1 /** 2 * The condition module provides a primitive for synchronized condition 3 * checking. 4 * 5 * Copyright: Copyright Sean Kelly 2005 - 2009. 6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 * Authors: Sean Kelly 8 * Source: $(DRUNTIMESRC core/sync/_condition.d) 9 */ 10 11 /* Copyright Sean Kelly 2005 - 2009. 12 * Distributed under the Boost Software License, Version 1.0. 13 * (See accompanying file LICENSE or copy at 14 * http://www.boost.org/LICENSE_1_0.txt) 15 */ 16 module java.nonstandard.sync.condition; 17 18 version(Tango){ 19 20 public import tango.core.sync.Condition; 21 22 } else { // Phobos 23 24 public import java.nonstandard.sync.exception; 25 public import java.nonstandard.sync.mutex; 26 public import core.time; 27 28 version( Windows ) 29 { 30 private import core.sync.semaphore; 31 private import core.sys.windows.windows; 32 } 33 else version( Posix ) 34 { 35 private import core.sync.config; 36 private import core.stdc.errno; 37 private import core.sys.posix.pthread; 38 private import core.sys.posix.time; 39 } 40 else 41 { 42 static assert(false, "Platform not supported"); 43 } 44 45 46 //////////////////////////////////////////////////////////////////////////////// 47 // Condition 48 // 49 // void wait(); 50 // void notify(); 51 // void notifyAll(); 52 //////////////////////////////////////////////////////////////////////////////// 53 54 55 /** 56 * This class represents a condition variable as conceived by C.A.R. Hoare. As 57 * per Mesa type monitors however, "signal" has been replaced with "notify" to 58 * indicate that control is not transferred to the waiter when a notification 59 * is sent. 60 */ 61 class Condition 62 { 63 //////////////////////////////////////////////////////////////////////////// 64 // Initialization 65 //////////////////////////////////////////////////////////////////////////// 66 67 /** 68 * Initializes a condition object which is associated with the supplied 69 * mutex object. 70 * 71 * Params: 72 * m = The mutex with which this condition will be associated. 73 * 74 * Throws: 75 * SyncException on error. 76 */ 77 this( Mutex m ) @trusted // @trusted is used to stay backwards compatible 78 { // versions older than 2.067.0 where 79 // core.sys.posix.pthread.pthread_cond_init is 80 // @system 81 version( Windows ) 82 { 83 m_blockLock = CreateSemaphoreA( null, 1, 1, null ); 84 if( m_blockLock == m_blockLock.init ) 85 throw new SyncException( "Unable to initialize condition" ); 86 scope(failure) CloseHandle( m_blockLock ); 87 88 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); 89 if( m_blockQueue == m_blockQueue.init ) 90 throw new SyncException( "Unable to initialize condition" ); 91 scope(failure) CloseHandle( m_blockQueue ); 92 93 InitializeCriticalSection( &m_unblockLock ); 94 m_assocMutex = m; 95 } 96 else version( Posix ) 97 { 98 m_assocMutex = m; 99 int rc = pthread_cond_init( &m_hndl, null ); 100 if( rc ) 101 throw new SyncException( "Unable to initialize condition" ); 102 } 103 } 104 105 106 ~this() 107 { 108 version( Windows ) 109 { 110 BOOL rc = CloseHandle( m_blockLock ); 111 assert( rc, "Unable to destroy condition" ); 112 rc = CloseHandle( m_blockQueue ); 113 assert( rc, "Unable to destroy condition" ); 114 DeleteCriticalSection( &m_unblockLock ); 115 } 116 else version( Posix ) 117 { 118 int rc = pthread_cond_destroy( &m_hndl ); 119 assert( !rc, "Unable to destroy condition" ); 120 } 121 } 122 123 124 //////////////////////////////////////////////////////////////////////////// 125 // General Properties 126 //////////////////////////////////////////////////////////////////////////// 127 128 129 /** 130 * Gets the mutex associated with this condition. 131 * 132 * Returns: 133 * The mutex associated with this condition. 134 */ 135 @property Mutex mutex() 136 { 137 return m_assocMutex; 138 } 139 140 141 //////////////////////////////////////////////////////////////////////////// 142 // General Actions 143 //////////////////////////////////////////////////////////////////////////// 144 145 146 /** 147 * Wait until notified. 148 * 149 * Throws: 150 * SyncException on error. 151 */ 152 void wait() 153 { 154 version( Windows ) 155 { 156 timedWait( INFINITE ); 157 } 158 else version( Posix ) 159 { 160 int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() ); 161 if( rc ) 162 throw new SyncException( "Unable to wait for condition" ); 163 } 164 } 165 166 167 /** 168 * Suspends the calling thread until a notification occurs or until the 169 * supplied time period has elapsed. 170 * 171 * Params: 172 * val = The time to wait. 173 * 174 * In: 175 * val must be non-negative. 176 * 177 * Throws: 178 * SyncException on error. 179 * 180 * Returns: 181 * true if notified before the timeout and false if not. 182 */ 183 bool wait( Duration val ) 184 in 185 { 186 assert( !val.isNegative ); 187 } 188 body 189 { 190 version( Windows ) 191 { 192 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 193 194 while( val > maxWaitMillis ) 195 { 196 if( timedWait( cast(uint) 197 maxWaitMillis.total!"msecs" ) ) 198 return true; 199 val -= maxWaitMillis; 200 } 201 return timedWait( cast(uint) val.total!"msecs" ); 202 } 203 else version( Posix ) 204 { 205 timespec t = void; 206 mktspec( t, val ); 207 208 int rc = pthread_cond_timedwait( &m_hndl, 209 m_assocMutex.handleAddr(), 210 &t ); 211 if( !rc ) 212 return true; 213 if( rc == ETIMEDOUT ) 214 return false; 215 throw new SyncException( "Unable to wait for condition" ); 216 } 217 } 218 219 220 /** 221 * Notifies one waiter. 222 * 223 * Throws: 224 * SyncException on error. 225 */ 226 void notify() 227 { 228 version( Windows ) 229 { 230 notify( false ); 231 } 232 else version( Posix ) 233 { 234 int rc = pthread_cond_signal( &m_hndl ); 235 if( rc ) 236 throw new SyncException( "Unable to notify condition" ); 237 } 238 } 239 240 241 /** 242 * Notifies all waiters. 243 * 244 * Throws: 245 * SyncException on error. 246 */ 247 void notifyAll() 248 { 249 version( Windows ) 250 { 251 notify( true ); 252 } 253 else version( Posix ) 254 { 255 int rc = pthread_cond_broadcast( &m_hndl ); 256 if( rc ) 257 throw new SyncException( "Unable to notify condition" ); 258 } 259 } 260 261 262 private: 263 version( Windows ) 264 { 265 bool timedWait( DWORD timeout ) 266 { 267 int numSignalsLeft; 268 int numWaitersGone; 269 DWORD rc; 270 271 rc = WaitForSingleObject( m_blockLock, INFINITE ); 272 assert( rc == WAIT_OBJECT_0 ); 273 274 m_numWaitersBlocked++; 275 276 rc = ReleaseSemaphore( m_blockLock, 1, null ); 277 assert( rc ); 278 279 m_assocMutex.unlock(); 280 scope(failure) m_assocMutex.lock(); 281 282 rc = WaitForSingleObject( m_blockQueue, timeout ); 283 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 284 bool timedOut = (rc == WAIT_TIMEOUT); 285 286 EnterCriticalSection( &m_unblockLock ); 287 scope(failure) LeaveCriticalSection( &m_unblockLock ); 288 289 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 290 { 291 if ( timedOut ) 292 { 293 // timeout (or canceled) 294 if( m_numWaitersBlocked != 0 ) 295 { 296 m_numWaitersBlocked--; 297 // do not unblock next waiter below (already unblocked) 298 numSignalsLeft = 0; 299 } 300 else 301 { 302 // spurious wakeup pending!! 303 m_numWaitersGone = 1; 304 } 305 } 306 if( --m_numWaitersToUnblock == 0 ) 307 { 308 if( m_numWaitersBlocked != 0 ) 309 { 310 // open the gate 311 rc = ReleaseSemaphore( m_blockLock, 1, null ); 312 assert( rc ); 313 // do not open the gate below again 314 numSignalsLeft = 0; 315 } 316 else if( (numWaitersGone = m_numWaitersGone) != 0 ) 317 { 318 m_numWaitersGone = 0; 319 } 320 } 321 } 322 else if( ++m_numWaitersGone == int.max / 2 ) 323 { 324 // timeout/canceled or spurious event :-) 325 rc = WaitForSingleObject( m_blockLock, INFINITE ); 326 assert( rc == WAIT_OBJECT_0 ); 327 // something is going on here - test of timeouts? 328 m_numWaitersBlocked -= m_numWaitersGone; 329 rc = ReleaseSemaphore( m_blockLock, 1, null ); 330 assert( rc == WAIT_OBJECT_0 ); 331 m_numWaitersGone = 0; 332 } 333 334 LeaveCriticalSection( &m_unblockLock ); 335 336 if( numSignalsLeft == 1 ) 337 { 338 // better now than spurious later (same as ResetEvent) 339 for( ; numWaitersGone > 0; --numWaitersGone ) 340 { 341 rc = WaitForSingleObject( m_blockQueue, INFINITE ); 342 assert( rc == WAIT_OBJECT_0 ); 343 } 344 // open the gate 345 rc = ReleaseSemaphore( m_blockLock, 1, null ); 346 assert( rc ); 347 } 348 else if( numSignalsLeft != 0 ) 349 { 350 // unblock next waiter 351 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 352 assert( rc ); 353 } 354 m_assocMutex.lock(); 355 return !timedOut; 356 } 357 358 359 void notify( bool all ) 360 { 361 DWORD rc; 362 363 EnterCriticalSection( &m_unblockLock ); 364 scope(failure) LeaveCriticalSection( &m_unblockLock ); 365 366 if( m_numWaitersToUnblock != 0 ) 367 { 368 if( m_numWaitersBlocked == 0 ) 369 { 370 LeaveCriticalSection( &m_unblockLock ); 371 return; 372 } 373 if( all ) 374 { 375 m_numWaitersToUnblock += m_numWaitersBlocked; 376 m_numWaitersBlocked = 0; 377 } 378 else 379 { 380 m_numWaitersToUnblock++; 381 m_numWaitersBlocked--; 382 } 383 LeaveCriticalSection( &m_unblockLock ); 384 } 385 else if( m_numWaitersBlocked > m_numWaitersGone ) 386 { 387 rc = WaitForSingleObject( m_blockLock, INFINITE ); 388 assert( rc == WAIT_OBJECT_0 ); 389 if( 0 != m_numWaitersGone ) 390 { 391 m_numWaitersBlocked -= m_numWaitersGone; 392 m_numWaitersGone = 0; 393 } 394 if( all ) 395 { 396 m_numWaitersToUnblock = m_numWaitersBlocked; 397 m_numWaitersBlocked = 0; 398 } 399 else 400 { 401 m_numWaitersToUnblock = 1; 402 m_numWaitersBlocked--; 403 } 404 LeaveCriticalSection( &m_unblockLock ); 405 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 406 assert( rc ); 407 } 408 else 409 { 410 LeaveCriticalSection( &m_unblockLock ); 411 } 412 } 413 414 415 // NOTE: This implementation uses Algorithm 8c as described here: 416 // http://groups.google.com/group/comp.programming.threads/ 417 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 418 HANDLE m_blockLock; // auto-reset event (now semaphore) 419 HANDLE m_blockQueue; // auto-reset event (now semaphore) 420 Mutex m_assocMutex; // external mutex/CS 421 CRITICAL_SECTION m_unblockLock; // internal mutex/CS 422 int m_numWaitersGone = 0; 423 int m_numWaitersBlocked = 0; 424 int m_numWaitersToUnblock = 0; 425 } 426 else version( Posix ) 427 { 428 Mutex m_assocMutex; 429 pthread_cond_t m_hndl; 430 } 431 } 432 433 434 //////////////////////////////////////////////////////////////////////////////// 435 // Unit Tests 436 //////////////////////////////////////////////////////////////////////////////// 437 438 439 version( unittest ) 440 { 441 private import core.thread; 442 private import java.nonstandard.sync.mutex; 443 private import core.sync.semaphore; 444 445 446 void testNotify() 447 { 448 auto mutex = new Mutex; 449 auto condReady = new Condition( mutex ); 450 auto semDone = new Semaphore; 451 auto synLoop = new Object; 452 int numWaiters = 10; 453 int numTries = 10; 454 int numReady = 0; 455 int numTotal = 0; 456 int numDone = 0; 457 int numPost = 0; 458 459 void waiter() 460 { 461 for( int i = 0; i < numTries; ++i ) 462 { 463 synchronized( mutex ) 464 { 465 while( numReady < 1 ) 466 { 467 condReady.wait(); 468 } 469 --numReady; 470 ++numTotal; 471 } 472 473 synchronized( synLoop ) 474 { 475 ++numDone; 476 } 477 semDone.wait(); 478 } 479 } 480 481 auto group = new ThreadGroup; 482 483 for( int i = 0; i < numWaiters; ++i ) 484 group.create( &waiter ); 485 486 for( int i = 0; i < numTries; ++i ) 487 { 488 for( int j = 0; j < numWaiters; ++j ) 489 { 490 synchronized( mutex ) 491 { 492 ++numReady; 493 condReady.notify(); 494 } 495 } 496 while( true ) 497 { 498 synchronized( synLoop ) 499 { 500 if( numDone >= numWaiters ) 501 break; 502 } 503 Thread.yield(); 504 } 505 for( int j = 0; j < numWaiters; ++j ) 506 { 507 semDone.notify(); 508 } 509 } 510 511 group.joinAll(); 512 assert( numTotal == numWaiters * numTries ); 513 } 514 515 516 void testNotifyAll() 517 { 518 auto mutex = new Mutex; 519 auto condReady = new Condition( mutex ); 520 int numWaiters = 10; 521 int numReady = 0; 522 int numDone = 0; 523 bool alert = false; 524 525 void waiter() 526 { 527 synchronized( mutex ) 528 { 529 ++numReady; 530 while( !alert ) 531 condReady.wait(); 532 ++numDone; 533 } 534 } 535 536 auto group = new ThreadGroup; 537 538 for( int i = 0; i < numWaiters; ++i ) 539 group.create( &waiter ); 540 541 while( true ) 542 { 543 synchronized( mutex ) 544 { 545 if( numReady >= numWaiters ) 546 { 547 alert = true; 548 condReady.notifyAll(); 549 break; 550 } 551 } 552 Thread.yield(); 553 } 554 group.joinAll(); 555 assert( numReady == numWaiters && numDone == numWaiters ); 556 } 557 558 559 void testWaitTimeout() 560 { 561 auto mutex = new Mutex; 562 auto condReady = new Condition( mutex ); 563 bool waiting = false; 564 bool alertedOne = true; 565 bool alertedTwo = true; 566 567 void waiter() 568 { 569 synchronized( mutex ) 570 { 571 waiting = true; 572 // we never want to miss the notification (30s) 573 alertedOne = condReady.wait( dur!"seconds"(30) ); 574 // but we don't want to wait long for the timeout (10ms) 575 alertedTwo = condReady.wait( dur!"msecs"(10) ); 576 } 577 } 578 579 auto thread = new Thread( &waiter ); 580 thread.start(); 581 582 while( true ) 583 { 584 synchronized( mutex ) 585 { 586 if( waiting ) 587 { 588 condReady.notify(); 589 break; 590 } 591 } 592 Thread.yield(); 593 } 594 thread.join(); 595 assert( waiting ); 596 assert( alertedOne ); 597 assert( !alertedTwo ); 598 } 599 600 601 unittest 602 { 603 testNotify(); 604 testNotifyAll(); 605 testWaitTimeout(); 606 } 607 } 608 609 }