1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module java.nonstandard.sync.condition;
17 
18 version(Tango){
19 
20     public import tango.core.sync.Condition;
21 
22 } else { // Phobos
23 
24 public import java.nonstandard.sync.exception;
25 public import java.nonstandard.sync.mutex;
26 public import core.time;
27 
28 version( Windows )
29 {
30     private import core.sync.semaphore;
31     private import core.sys.windows.windows;
32 }
33 else version( Posix )
34 {
35     private import core.sync.config;
36     private import core.stdc.errno;
37     private import core.sys.posix.pthread;
38     private import core.sys.posix.time;
39 }
40 else
41 {
42     static assert(false, "Platform not supported");
43 }
44 
45 
46 ////////////////////////////////////////////////////////////////////////////////
47 // Condition
48 //
49 // void wait();
50 // void notify();
51 // void notifyAll();
52 ////////////////////////////////////////////////////////////////////////////////
53 
54 
55 /**
56  * This class represents a condition variable as conceived by C.A.R. Hoare.  As
57  * per Mesa type monitors however, "signal" has been replaced with "notify" to
58  * indicate that control is not transferred to the waiter when a notification
59  * is sent.
60  */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * On Windows this creates the two semaphores and the critical section
     * used by the emulation below; on Posix it initializes a pthread
     * condition variable with default attributes.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  SyncException on error.
     */
    this( Mutex m ) @trusted // @trusted is used to stay backwards compatible
    {                        // with versions older than 2.067.0 where
                             // core.sys.posix.pthread.pthread_cond_init is
                             // @system
        version( Windows )
        {
            // The gate semaphore starts "open" (count 1, maximum 1).
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                throw new SyncException( "Unable to initialize condition" );
            // Release the gate again if a later step of construction throws.
            scope(failure) CloseHandle( m_blockLock );

            // The queue semaphore starts empty; waiters block on it.
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                throw new SyncException( "Unable to initialize condition" );
            scope(failure) CloseHandle( m_blockQueue );

            InitializeCriticalSection( &m_unblockLock );
            m_assocMutex = m;
        }
        else version( Posix )
        {
            m_assocMutex = m;
            int rc = pthread_cond_init( &m_hndl, null );
            if( rc )
                throw new SyncException( "Unable to initialize condition" );
        }
    }


    /**
     * Releases the operating system resources acquired by the constructor.
     *
     * Failures are only checked with assert, so they are not reported when
     * assertions are compiled out.
     */
    ~this()
    {
        version( Windows )
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * On Windows this is a timed wait with an INFINITE timeout; on Posix it
     * calls pthread_cond_wait with the handle of the associated mutex.
     *
     * Throws:
     *  SyncException on error.
     */
    void wait()
    {
        version( Windows )
        {
            timedWait( INFINITE );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() );
            if( rc )
                throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  SyncException on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration val )
    in
    {
        assert( !val.isNegative );
    }
    body
    {
        version( Windows )
        {
            // timedWait takes a DWORD millisecond count, so longer durations
            // are consumed in chunks. NOTE(review): the chunk is presumably
            // capped at uint.max - 1 because uint.max is INFINITE - confirm.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while( val > maxWaitMillis )
            {
                if( timedWait( cast(uint)
                               maxWaitMillis.total!"msecs" ) )
                    return true;
                val -= maxWaitMillis;
            }
            return timedWait( cast(uint) val.total!"msecs" );
        }
        else version( Posix )
        {
            // mktspec fills t from the relative duration val.
            timespec t = void;
            mktspec( t, val );

            int rc = pthread_cond_timedwait( &m_hndl,
                                             m_assocMutex.handleAddr(),
                                             &t );
            if( !rc )
                return true;
            if( rc == ETIMEDOUT )
                return false;
            throw new SyncException( "Unable to wait for condition" );
        }
    }


    /**
     * Notifies one waiter.
     *
     * Throws:
     *  SyncException on error.
     */
    void notify()
    {
        version( Windows )
        {
            notify( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


    /**
     * Notifies all waiters.
     *
     * Throws:
     *  SyncException on error.
     */
    void notifyAll()
    {
        version( Windows )
        {
            notify( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( &m_hndl );
            if( rc )
                throw new SyncException( "Unable to notify condition" );
        }
    }


private:
    version( Windows )
    {
        /**
         * Windows implementation of a (possibly timed) wait.
         *
         * The caller is registered as a blocked waiter behind the gate
         * semaphore, the associated mutex is unlocked, and the thread then
         * blocks on the queue semaphore for at most timeout milliseconds.
         * After waking, the waiter counters are updated under the unblock
         * critical section and the associated mutex is locked again before
         * returning.
         *
         * Params:
         *  timeout = Timeout in milliseconds, passed to WaitForSingleObject
         *            on the queue semaphore (INFINITE is used by wait()).
         *
         * Returns:
         *  true if the queue semaphore was acquired, false if the wait on it
         *  timed out.
         */
        bool timedWait( DWORD timeout )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Pass through the gate to register as a blocked waiter.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            // Restore the caller's lock if anything below throws.
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a nonzero value, so
                // it is checked for truth like every other release in this
                // class rather than compared against WAIT_OBJECT_0.
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        /**
         * Windows implementation shared by notify() and notifyAll().
         *
         * Under the unblock critical section, blocked waiters are moved to
         * the "to unblock" count. If no unblocking is already in progress
         * and there are more blocked waiters than departed ones, the gate
         * semaphore is acquired first and one permit is released on the
         * queue semaphore to start waking waiters.
         *
         * Params:
         *  all = true to release every currently blocked waiter, false to
         *        release a single one.
         */
        void notify( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                // An unblock is already in progress: only adjust the counts.
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Close the gate so that no new waiter can register while
                // the counters are being settled.
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                // Nobody to wake.
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        // Waiters that left timedWait while no unblock was pending.
        int                 m_numWaitersGone        = 0;
        // Waiters registered in timedWait and not yet selected by notify.
        int                 m_numWaitersBlocked     = 0;
        // Waiters selected by notify that have not finished leaving.
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        Mutex               m_assocMutex;   // mutex passed to the constructor
        pthread_cond_t      m_hndl;         // native condition variable
    }
}
432 
433 
434 ////////////////////////////////////////////////////////////////////////////////
435 // Unit Tests
436 ////////////////////////////////////////////////////////////////////////////////
437 
438 
439 version( unittest )
440 {
441     private import core.thread;
442     private import java.nonstandard.sync.mutex;
443     private import core.sync.semaphore;
444 
445 
446     void testNotify()
447     {
448         auto mutex      = new Mutex;
449         auto condReady  = new Condition( mutex );
450         auto semDone    = new Semaphore;
451         auto synLoop    = new Object;
452         int  numWaiters = 10;
453         int  numTries   = 10;
454         int  numReady   = 0;
455         int  numTotal   = 0;
456         int  numDone    = 0;
457         int  numPost    = 0;
458 
459         void waiter()
460         {
461             for( int i = 0; i < numTries; ++i )
462             {
463                 synchronized( mutex )
464                 {
465                     while( numReady < 1 )
466                     {
467                         condReady.wait();
468                     }
469                     --numReady;
470                     ++numTotal;
471                 }
472 
473                 synchronized( synLoop )
474                 {
475                     ++numDone;
476                 }
477                 semDone.wait();
478             }
479         }
480 
481         auto group = new ThreadGroup;
482 
483         for( int i = 0; i < numWaiters; ++i )
484             group.create( &waiter );
485 
486         for( int i = 0; i < numTries; ++i )
487         {
488             for( int j = 0; j < numWaiters; ++j )
489             {
490                 synchronized( mutex )
491                 {
492                     ++numReady;
493                     condReady.notify();
494                 }
495             }
496             while( true )
497             {
498                 synchronized( synLoop )
499                 {
500                     if( numDone >= numWaiters )
501                         break;
502                 }
503                 Thread.yield();
504             }
505             for( int j = 0; j < numWaiters; ++j )
506             {
507                 semDone.notify();
508             }
509         }
510 
511         group.joinAll();
512         assert( numTotal == numWaiters * numTries );
513     }
514 
515 
516     void testNotifyAll()
517     {
518         auto mutex      = new Mutex;
519         auto condReady  = new Condition( mutex );
520         int  numWaiters = 10;
521         int  numReady   = 0;
522         int  numDone    = 0;
523         bool alert      = false;
524 
525         void waiter()
526         {
527             synchronized( mutex )
528             {
529                 ++numReady;
530                 while( !alert )
531                     condReady.wait();
532                 ++numDone;
533             }
534         }
535 
536         auto group = new ThreadGroup;
537 
538         for( int i = 0; i < numWaiters; ++i )
539             group.create( &waiter );
540 
541         while( true )
542         {
543             synchronized( mutex )
544             {
545                 if( numReady >= numWaiters )
546                 {
547                     alert = true;
548                     condReady.notifyAll();
549                     break;
550                 }
551             }
552             Thread.yield();
553         }
554         group.joinAll();
555         assert( numReady == numWaiters && numDone == numWaiters );
556     }
557 
558 
559     void testWaitTimeout()
560     {
561         auto mutex      = new Mutex;
562         auto condReady  = new Condition( mutex );
563         bool waiting    = false;
564         bool alertedOne = true;
565         bool alertedTwo = true;
566 
567         void waiter()
568         {
569             synchronized( mutex )
570             {
571                 waiting    = true;
572                 // we never want to miss the notification (30s)
573                 alertedOne = condReady.wait( dur!"seconds"(30) );
574                 // but we don't want to wait long for the timeout (10ms)
575                 alertedTwo = condReady.wait( dur!"msecs"(10) );
576             }
577         }
578 
579         auto thread = new Thread( &waiter );
580         thread.start();
581 
582         while( true )
583         {
584             synchronized( mutex )
585             {
586                 if( waiting )
587                 {
588                     condReady.notify();
589                     break;
590                 }
591             }
592             Thread.yield();
593         }
594         thread.join();
595         assert( waiting );
596         assert( alertedOne );
597         assert( !alertedTwo );
598     }
599 
600 
601     unittest
602     {
603         testNotify();
604         testNotifyAll();
605         testWaitTimeout();
606     }
607 }
608 
609 }