/** * The semaphore module provides a general use semaphore for synchronization. * * Copyright: Copyright Sean Kelly 2005 - 2009. * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) * Authors: Sean Kelly * Source: $(DRUNTIMESRC core/sync/_semaphore.d) */ /* Copyright Sean Kelly 2005 - 2009. * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE or copy at * http://www.boost.org/LICENSE_1_0.txt) */ module core.sync.semaphore; public import core.sync.exception; public import core.time; version (OSX) version = Darwin; else version (iOS) version = Darwin; else version (TVOS) version = Darwin; else version (WatchOS) version = Darwin; version (Windows) { import core.sys.windows.basetsd /+: HANDLE+/; import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; import core.sys.windows.windef /+: BOOL, DWORD+/; import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; } else version (Darwin) { import core.sync.config; import core.stdc.errno; import core.sys.posix.time; import core.sys.darwin.mach.semaphore; } else version (Posix) { import core.sync.config; import core.stdc.errno; import core.sys.posix.pthread; import core.sys.posix.semaphore; } else { static assert(false, "Platform not supported"); } //////////////////////////////////////////////////////////////////////////////// // Semaphore // // void wait(); // void notify(); // bool tryWait(); //////////////////////////////////////////////////////////////////////////////// /** * This class represents a general counting semaphore as concieved by Edsger * Dijkstra. As per Mesa type monitors however, "signal" has been replaced * with "notify" to indicate that control is not transferred to the waiter when * a notification is sent. */ class Semaphore { //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes a semaphore object with the specified initial count. * * Params: * count = The initial count for the semaphore. * * Throws: * SyncError on error. */ this( uint count = 0 ) { version (Windows) { m_hndl = CreateSemaphoreA( null, count, int.max, null ); if ( m_hndl == m_hndl.init ) throw new SyncError( "Unable to create semaphore" ); } else version (Darwin) { auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); if ( rc ) throw new SyncError( "Unable to create semaphore" ); } else version (Posix) { int rc = sem_init( &m_hndl, 0, count ); if ( rc ) throw new SyncError( "Unable to create semaphore" ); } } ~this() { version (Windows) { BOOL rc = CloseHandle( m_hndl ); assert( rc, "Unable to destroy semaphore" ); } else version (Darwin) { auto rc = semaphore_destroy( mach_task_self(), m_hndl ); assert( !rc, "Unable to destroy semaphore" ); } else version (Posix) { int rc = sem_destroy( &m_hndl ); assert( !rc, "Unable to destroy semaphore" ); } } //////////////////////////////////////////////////////////////////////////// // General Actions //////////////////////////////////////////////////////////////////////////// /** * Wait until the current count is above zero, then atomically decrement * the count by one and return. * * Throws: * SyncError on error. */ void wait() { version (Windows) { DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); if ( rc != WAIT_OBJECT_0 ) throw new SyncError( "Unable to wait for semaphore" ); } else version (Darwin) { while ( true ) { auto rc = semaphore_wait( m_hndl ); if ( !rc ) return; if ( rc == KERN_ABORTED && errno == EINTR ) continue; throw new SyncError( "Unable to wait for semaphore" ); } } else version (Posix) { while ( true ) { if ( !sem_wait( &m_hndl ) ) return; if ( errno != EINTR ) throw new SyncError( "Unable to wait for semaphore" ); } } } /** * Suspends the calling thread until the current count moves above zero or * until the supplied time period has elapsed. If the count moves above * zero in this interval, then atomically decrement the count by one and * return true. Otherwise, return false. * * Params: * period = The time to wait. * * In: * period must be non-negative. * * Throws: * SyncError on error. * * Returns: * true if notified before the timeout and false if not. */ bool wait( Duration period ) in { assert( !period.isNegative ); } do { version (Windows) { auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); while ( period > maxWaitMillis ) { auto rc = WaitForSingleObject( m_hndl, cast(uint) maxWaitMillis.total!"msecs" ); switch ( rc ) { case WAIT_OBJECT_0: return true; case WAIT_TIMEOUT: period -= maxWaitMillis; continue; default: throw new SyncError( "Unable to wait for semaphore" ); } } switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) { case WAIT_OBJECT_0: return true; case WAIT_TIMEOUT: return false; default: throw new SyncError( "Unable to wait for semaphore" ); } } else version (Darwin) { mach_timespec_t t = void; (cast(byte*) &t)[0 .. t.sizeof] = 0; if ( period.total!"seconds" > t.tv_sec.max ) { t.tv_sec = t.tv_sec.max; t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; } else period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); while ( true ) { auto rc = semaphore_timedwait( m_hndl, t ); if ( !rc ) return true; if ( rc == KERN_OPERATION_TIMED_OUT ) return false; if ( rc != KERN_ABORTED || errno != EINTR ) throw new SyncError( "Unable to wait for semaphore" ); } } else version (Posix) { import core.sys.posix.time : clock_gettime, CLOCK_REALTIME; timespec t = void; clock_gettime( CLOCK_REALTIME, &t ); mvtspec( t, period ); while ( true ) { if ( !sem_timedwait( &m_hndl, &t ) ) return true; if ( errno == ETIMEDOUT ) return false; if ( errno != EINTR ) throw new SyncError( "Unable to wait for semaphore" ); } } } /** * Atomically increment the current count by one. This will notify one * waiter, if there are any in the queue. * * Throws: * SyncError on error. */ void notify() { version (Windows) { if ( !ReleaseSemaphore( m_hndl, 1, null ) ) throw new SyncError( "Unable to notify semaphore" ); } else version (Darwin) { auto rc = semaphore_signal( m_hndl ); if ( rc ) throw new SyncError( "Unable to notify semaphore" ); } else version (Posix) { int rc = sem_post( &m_hndl ); if ( rc ) throw new SyncError( "Unable to notify semaphore" ); } } /** * If the current count is equal to zero, return. Otherwise, atomically * decrement the count by one and return true. * * Throws: * SyncError on error. * * Returns: * true if the count was above zero and false if not. */ bool tryWait() { version (Windows) { switch ( WaitForSingleObject( m_hndl, 0 ) ) { case WAIT_OBJECT_0: return true; case WAIT_TIMEOUT: return false; default: throw new SyncError( "Unable to wait for semaphore" ); } } else version (Darwin) { return wait( dur!"hnsecs"(0) ); } else version (Posix) { while ( true ) { if ( !sem_trywait( &m_hndl ) ) return true; if ( errno == EAGAIN ) return false; if ( errno != EINTR ) throw new SyncError( "Unable to wait for semaphore" ); } } } protected: /// Aliases the operating-system-specific semaphore type. version (Windows) alias Handle = HANDLE; /// ditto else version (Darwin) alias Handle = semaphore_t; /// ditto else version (Posix) alias Handle = sem_t; /// Handle to the system-specific semaphore. Handle m_hndl; } //////////////////////////////////////////////////////////////////////////////// // Unit Tests //////////////////////////////////////////////////////////////////////////////// unittest { import core.thread, core.atomic; void testWait() { auto semaphore = new Semaphore; shared bool stopConsumption = false; immutable numToProduce = 20; immutable numConsumers = 10; shared size_t numConsumed; shared size_t numComplete; void consumer() { while (true) { semaphore.wait(); if (atomicLoad(stopConsumption)) break; atomicOp!"+="(numConsumed, 1); } atomicOp!"+="(numComplete, 1); } void producer() { assert(!semaphore.tryWait()); foreach (_; 0 .. numToProduce) semaphore.notify(); // wait until all items are consumed while (atomicLoad(numConsumed) != numToProduce) Thread.yield(); // mark consumption as finished atomicStore(stopConsumption, true); // wake all consumers foreach (_; 0 .. numConsumers) semaphore.notify(); // wait until all consumers completed while (atomicLoad(numComplete) != numConsumers) Thread.yield(); assert(!semaphore.tryWait()); semaphore.notify(); assert(semaphore.tryWait()); assert(!semaphore.tryWait()); } auto group = new ThreadGroup; for ( int i = 0; i < numConsumers; ++i ) group.create(&consumer); group.create(&producer); group.joinAll(); } void testWaitTimeout() { auto sem = new Semaphore; shared bool semReady; bool alertedOne, alertedTwo; void waiter() { while (!atomicLoad(semReady)) Thread.yield(); alertedOne = sem.wait(dur!"msecs"(1)); alertedTwo = sem.wait(dur!"msecs"(1)); assert(alertedOne && !alertedTwo); } auto thread = new Thread(&waiter); thread.start(); sem.notify(); atomicStore(semReady, true); thread.join(); assert(alertedOne && !alertedTwo); } testWait(); testWaitTimeout(); }