/** * The read/write mutex module provides a primitive for maintaining shared read * access and mutually exclusive write access. * * Copyright: Copyright Sean Kelly 2005 - 2009. * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) * Authors: Sean Kelly * Source: $(DRUNTIMESRC core/sync/_rwmutex.d) */ /* Copyright Sean Kelly 2005 - 2009. * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE or copy at * http://www.boost.org/LICENSE_1_0.txt) */ module core.sync.rwmutex; public import core.sync.exception; import core.sync.condition; import core.sync.mutex; import core.memory; version (Posix) { import core.sys.posix.pthread; } //////////////////////////////////////////////////////////////////////////////// // ReadWriteMutex // // Reader reader(); // Writer writer(); //////////////////////////////////////////////////////////////////////////////// /** * This class represents a mutex that allows any number of readers to enter, * but when a writer enters, all other readers and writers are blocked. * * Please note that this mutex is not recursive and is intended to guard access * to data only. Also, no deadlock checking is in place because doing so would * require dynamic memory allocation, which would reduce performance by an * unacceptable amount. As a result, any attempt to recursively acquire this * mutex may well deadlock the caller, particularly if a write lock is acquired * while holding a read lock, or vice-versa. In practice, this should not be * an issue however, because it is uncommon to call deeply into unknown code * while holding a lock that simply protects data. */ class ReadWriteMutex { /** * Defines the policy used by this mutex. Currently, two policies are * defined. * * The first will queue writers until no readers hold the mutex, then * pass the writers through one at a time. If a reader acquires the mutex * while there are still writers queued, the reader will take precedence. * * The second will queue readers if there are any writers queued. Writers * are passed through one at a time, and once there are no writers present, * all queued readers will be alerted. * * Future policies may offer a more even balance between reader and writer * precedence. */ enum Policy { PREFER_READERS, /// Readers get preference. This may starve writers. PREFER_WRITERS /// Writers get preference. This may starve readers. } //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes a read/write mutex object with the supplied policy. * * Params: * policy = The policy to use. * * Throws: * SyncError on error. */ this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow { m_commonMutex = new Mutex; if ( !m_commonMutex ) throw new SyncError( "Unable to initialize mutex" ); m_readerQueue = new Condition( m_commonMutex ); if ( !m_readerQueue ) throw new SyncError( "Unable to initialize mutex" ); m_writerQueue = new Condition( m_commonMutex ); if ( !m_writerQueue ) throw new SyncError( "Unable to initialize mutex" ); m_policy = policy; m_reader = new Reader; m_writer = new Writer; } /// ditto shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow { m_commonMutex = new shared Mutex; if ( !m_commonMutex ) throw new SyncError( "Unable to initialize mutex" ); m_readerQueue = new shared Condition( m_commonMutex ); if ( !m_readerQueue ) throw new SyncError( "Unable to initialize mutex" ); m_writerQueue = new shared Condition( m_commonMutex ); if ( !m_writerQueue ) throw new SyncError( "Unable to initialize mutex" ); m_policy = policy; m_reader = new shared Reader; m_writer = new shared Writer; } //////////////////////////////////////////////////////////////////////////// // General Properties //////////////////////////////////////////////////////////////////////////// /** * Gets the policy used by this mutex. * * Returns: * The policy used by this mutex. */ @property Policy policy() @safe nothrow { return m_policy; } ///ditto @property Policy policy() shared @safe nothrow { return m_policy; } //////////////////////////////////////////////////////////////////////////// // Reader/Writer Handles //////////////////////////////////////////////////////////////////////////// /** * Gets an object representing the reader lock for the associated mutex. * * Returns: * A reader sub-mutex. */ @property Reader reader() @safe nothrow { return m_reader; } ///ditto @property shared(Reader) reader() shared @safe nothrow { return m_reader; } /** * Gets an object representing the writer lock for the associated mutex. * * Returns: * A writer sub-mutex. */ @property Writer writer() @safe nothrow { return m_writer; } ///ditto @property shared(Writer) writer() shared @safe nothrow { return m_writer; } //////////////////////////////////////////////////////////////////////////// // Reader //////////////////////////////////////////////////////////////////////////// /** * This class can be considered a mutex in its own right, and is used to * negotiate a read lock for the enclosing mutex. */ class Reader : Object.Monitor { /** * Initializes a read/write mutex reader proxy object. */ this(this Q)() @trusted nothrow if (is(Q == Reader) || is(Q == shared Reader)) { m_proxy.link = this; this.__monitor = cast(void*) &m_proxy; } /** * Acquires a read lock on the enclosing mutex. */ @trusted void lock() { synchronized( m_commonMutex ) { ++m_numQueuedReaders; scope(exit) --m_numQueuedReaders; while ( shouldQueueReader ) m_readerQueue.wait(); ++m_numActiveReaders; } } /// ditto @trusted void lock() shared { synchronized( m_commonMutex ) { ++(cast()m_numQueuedReaders); scope(exit) --(cast()m_numQueuedReaders); while ( shouldQueueReader ) m_readerQueue.wait(); ++(cast()m_numActiveReaders); } } /** * Releases a read lock on the enclosing mutex. */ @trusted void unlock() { synchronized( m_commonMutex ) { if ( --m_numActiveReaders < 1 ) { if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); } } } /// ditto @trusted void unlock() shared { synchronized( m_commonMutex ) { if ( --(cast()m_numActiveReaders) < 1 ) { if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); } } } /** * Attempts to acquire a read lock on the enclosing mutex. If one can * be obtained without blocking, the lock is acquired and true is * returned. If not, the lock is not acquired and false is returned. * * Returns: * true if the lock was acquired and false if not. */ @trusted bool tryLock() { synchronized( m_commonMutex ) { if ( shouldQueueReader ) return false; ++m_numActiveReaders; return true; } } /// ditto @trusted bool tryLock() shared { synchronized( m_commonMutex ) { if ( shouldQueueReader ) return false; ++(cast()m_numActiveReaders); return true; } } /** * Attempts to acquire a read lock on the enclosing mutex. If one can * be obtained without blocking, the lock is acquired and true is * returned. If not, the function blocks until either the lock can be * obtained or the time elapsed exceeds $(D_PARAM timeout), returning * true if the lock was acquired and false if the function timed out. * * Params: * timeout = maximum amount of time to wait for the lock * Returns: * true if the lock was acquired and false if not. */ @trusted bool tryLock(Duration timeout) { synchronized( m_commonMutex ) { if (!shouldQueueReader) { ++m_numActiveReaders; return true; } enum zero = Duration.zero(); if (timeout <= zero) return false; ++m_numQueuedReaders; scope(exit) --m_numQueuedReaders; enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration. const initialTime = MonoTime.currTime; m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall); while (shouldQueueReader) { const timeElapsed = MonoTime.currTime - initialTime; if (timeElapsed >= timeout) return false; auto nextWait = timeout - timeElapsed; m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); } ++m_numActiveReaders; return true; } } /// ditto @trusted bool tryLock(Duration timeout) shared { const initialTime = MonoTime.currTime; synchronized( m_commonMutex ) { ++(cast()m_numQueuedReaders); scope(exit) --(cast()m_numQueuedReaders); while (shouldQueueReader) { const timeElapsed = MonoTime.currTime - initialTime; if (timeElapsed >= timeout) return false; auto nextWait = timeout - timeElapsed; // Avoid problems calling wait(Duration) with huge arguments. enum maxWaitPerCall = dur!"hours"(24 * 365); m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); } ++(cast()m_numActiveReaders); return true; } } private: @property bool shouldQueueReader(this Q)() nothrow @safe @nogc if (is(Q == Reader) || is(Q == shared Reader)) { if ( m_numActiveWriters > 0 ) return true; switch ( m_policy ) { case Policy.PREFER_WRITERS: return m_numQueuedWriters > 0; case Policy.PREFER_READERS: default: break; } return false; } struct MonitorProxy { Object.Monitor link; } MonitorProxy m_proxy; } //////////////////////////////////////////////////////////////////////////// // Writer //////////////////////////////////////////////////////////////////////////// /** * This class can be considered a mutex in its own right, and is used to * negotiate a write lock for the enclosing mutex. */ class Writer : Object.Monitor { /** * Initializes a read/write mutex writer proxy object. */ this(this Q)() @trusted nothrow if (is(Q == Writer) || is(Q == shared Writer)) { m_proxy.link = this; this.__monitor = cast(void*) &m_proxy; } /** * Acquires a write lock on the enclosing mutex. */ @trusted void lock() { synchronized( m_commonMutex ) { ++m_numQueuedWriters; scope(exit) --m_numQueuedWriters; while ( shouldQueueWriter ) m_writerQueue.wait(); ++m_numActiveWriters; } } /// ditto @trusted void lock() shared { synchronized( m_commonMutex ) { ++(cast()m_numQueuedWriters); scope(exit) --(cast()m_numQueuedWriters); while ( shouldQueueWriter ) m_writerQueue.wait(); ++(cast()m_numActiveWriters); } } /** * Releases a write lock on the enclosing mutex. */ @trusted void unlock() { synchronized( m_commonMutex ) { if ( --m_numActiveWriters < 1 ) { switch ( m_policy ) { default: case Policy.PREFER_READERS: if ( m_numQueuedReaders > 0 ) m_readerQueue.notifyAll(); else if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); break; case Policy.PREFER_WRITERS: if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); else if ( m_numQueuedReaders > 0 ) m_readerQueue.notifyAll(); } } } } /// ditto @trusted void unlock() shared { synchronized( m_commonMutex ) { if ( --(cast()m_numActiveWriters) < 1 ) { switch ( m_policy ) { default: case Policy.PREFER_READERS: if ( m_numQueuedReaders > 0 ) m_readerQueue.notifyAll(); else if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); break; case Policy.PREFER_WRITERS: if ( m_numQueuedWriters > 0 ) m_writerQueue.notify(); else if ( m_numQueuedReaders > 0 ) m_readerQueue.notifyAll(); } } } } /** * Attempts to acquire a write lock on the enclosing mutex. If one can * be obtained without blocking, the lock is acquired and true is * returned. If not, the lock is not acquired and false is returned. * * Returns: * true if the lock was acquired and false if not. */ @trusted bool tryLock() { synchronized( m_commonMutex ) { if ( shouldQueueWriter ) return false; ++m_numActiveWriters; return true; } } /// ditto @trusted bool tryLock() shared { synchronized( m_commonMutex ) { if ( shouldQueueWriter ) return false; ++(cast()m_numActiveWriters); return true; } } /** * Attempts to acquire a write lock on the enclosing mutex. If one can * be obtained without blocking, the lock is acquired and true is * returned. If not, the function blocks until either the lock can be * obtained or the time elapsed exceeds $(D_PARAM timeout), returning * true if the lock was acquired and false if the function timed out. * * Params: * timeout = maximum amount of time to wait for the lock * Returns: * true if the lock was acquired and false if not. */ @trusted bool tryLock(Duration timeout) { synchronized( m_commonMutex ) { if (!shouldQueueWriter) { ++m_numActiveWriters; return true; } enum zero = Duration.zero(); if (timeout <= zero) return false; ++m_numQueuedWriters; scope(exit) --m_numQueuedWriters; enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration. const initialTime = MonoTime.currTime; m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall); while (shouldQueueWriter) { const timeElapsed = MonoTime.currTime - initialTime; if (timeElapsed >= timeout) return false; auto nextWait = timeout - timeElapsed; m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); } ++m_numActiveWriters; return true; } } /// ditto @trusted bool tryLock(Duration timeout) shared { const initialTime = MonoTime.currTime; synchronized( m_commonMutex ) { ++(cast()m_numQueuedWriters); scope(exit) --(cast()m_numQueuedWriters); while (shouldQueueWriter) { const timeElapsed = MonoTime.currTime - initialTime; if (timeElapsed >= timeout) return false; auto nextWait = timeout - timeElapsed; // Avoid problems calling wait(Duration) with huge arguments. enum maxWaitPerCall = dur!"hours"(24 * 365); m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); } ++(cast()m_numActiveWriters); return true; } } private: @property bool shouldQueueWriter(this Q)() if (is(Q == Writer) || is(Q == shared Writer)) { if ( m_numActiveWriters > 0 || m_numActiveReaders > 0 ) return true; switch ( m_policy ) { case Policy.PREFER_READERS: return m_numQueuedReaders > 0; case Policy.PREFER_WRITERS: default: break; } return false; } struct MonitorProxy { Object.Monitor link; } MonitorProxy m_proxy; } private: Policy m_policy; Reader m_reader; Writer m_writer; Mutex m_commonMutex; Condition m_readerQueue; Condition m_writerQueue; int m_numQueuedReaders; int m_numActiveReaders; int m_numQueuedWriters; int m_numActiveWriters; } //////////////////////////////////////////////////////////////////////////////// // Unit Tests //////////////////////////////////////////////////////////////////////////////// unittest { import core.atomic, core.thread, core.sync.semaphore; static void runTest(ReadWriteMutex.Policy policy) { scope mutex = new ReadWriteMutex(policy); scope rdSemA = new Semaphore, rdSemB = new Semaphore, wrSemA = new Semaphore, wrSemB = new Semaphore; shared size_t numReaders, numWriters; void readerFn() { synchronized (mutex.reader) { atomicOp!"+="(numReaders, 1); rdSemA.notify(); rdSemB.wait(); atomicOp!"-="(numReaders, 1); } } void writerFn() { synchronized (mutex.writer) { atomicOp!"+="(numWriters, 1); wrSemA.notify(); wrSemB.wait(); atomicOp!"-="(numWriters, 1); } } void waitQueued(size_t queuedReaders, size_t queuedWriters) { for (;;) { synchronized (mutex.m_commonMutex) { if (mutex.m_numQueuedReaders == queuedReaders && mutex.m_numQueuedWriters == queuedWriters) break; } Thread.yield(); } } scope group = new ThreadGroup; // 2 simultaneous readers group.create(&readerFn); group.create(&readerFn); rdSemA.wait(); rdSemA.wait(); assert(numReaders == 2); rdSemB.notify(); rdSemB.notify(); group.joinAll(); assert(numReaders == 0); foreach (t; group) group.remove(t); // 1 writer at a time group.create(&writerFn); group.create(&writerFn); wrSemA.wait(); assert(!wrSemA.tryWait()); assert(numWriters == 1); wrSemB.notify(); wrSemA.wait(); assert(numWriters == 1); wrSemB.notify(); group.joinAll(); assert(numWriters == 0); foreach (t; group) group.remove(t); // reader and writer are mutually exclusive group.create(&readerFn); rdSemA.wait(); group.create(&writerFn); waitQueued(0, 1); assert(!wrSemA.tryWait()); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); // writer and reader are mutually exclusive group.create(&writerFn); wrSemA.wait(); group.create(&readerFn); waitQueued(1, 0); assert(!rdSemA.tryWait()); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); // policy determines whether queued reader or writers progress first group.create(&writerFn); wrSemA.wait(); group.create(&readerFn); group.create(&writerFn); waitQueued(1, 1); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); if (policy == ReadWriteMutex.Policy.PREFER_READERS) { rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); } else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS) { wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); } group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); } runTest(ReadWriteMutex.Policy.PREFER_READERS); runTest(ReadWriteMutex.Policy.PREFER_WRITERS); } unittest { import core.atomic, core.thread; __gshared ReadWriteMutex rwmutex; shared static bool threadTriedOnceToGetLock; shared static bool threadFinallyGotLock; rwmutex = new ReadWriteMutex(); atomicFence; const maxTimeAllowedForTest = dur!"seconds"(20); // Test ReadWriteMutex.Reader.tryLock(Duration). { static void testReaderTryLock() { assert(!rwmutex.reader.tryLock(Duration.min)); threadTriedOnceToGetLock.atomicStore(true); assert(rwmutex.reader.tryLock(Duration.max)); threadFinallyGotLock.atomicStore(true); rwmutex.reader.unlock; } assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); auto otherThread = new Thread(&testReaderTryLock).start; const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; Thread.yield; // We started otherThread with the writer lock held so otherThread's // first rwlock.reader.tryLock with timeout Duration.min should fail. while (!threadTriedOnceToGetLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } rwmutex.writer.unlock; // Soon after we release the writer lock otherThread's second // rwlock.reader.tryLock with timeout Duration.max should succeed. while (!threadFinallyGotLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } otherThread.join; } threadTriedOnceToGetLock.atomicStore(false); // Reset. threadFinallyGotLock.atomicStore(false); // Reset. // Test ReadWriteMutex.Writer.tryLock(Duration). { static void testWriterTryLock() { assert(!rwmutex.writer.tryLock(Duration.min)); threadTriedOnceToGetLock.atomicStore(true); assert(rwmutex.writer.tryLock(Duration.max)); threadFinallyGotLock.atomicStore(true); rwmutex.writer.unlock; } assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); auto otherThread = new Thread(&testWriterTryLock).start; const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; Thread.yield; // We started otherThread with the reader lock held so otherThread's // first rwlock.writer.tryLock with timeout Duration.min should fail. while (!threadTriedOnceToGetLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } rwmutex.reader.unlock; // Soon after we release the reader lock otherThread's second // rwlock.writer.tryLock with timeout Duration.max should succeed. while (!threadFinallyGotLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } otherThread.join; } } unittest { import core.atomic, core.thread, core.sync.semaphore; static void runTest(ReadWriteMutex.Policy policy) { shared scope mutex = new shared ReadWriteMutex(policy); scope rdSemA = new Semaphore, rdSemB = new Semaphore, wrSemA = new Semaphore, wrSemB = new Semaphore; shared size_t numReaders, numWriters; void readerFn() { synchronized (mutex.reader) { atomicOp!"+="(numReaders, 1); rdSemA.notify(); rdSemB.wait(); atomicOp!"-="(numReaders, 1); } } void writerFn() { synchronized (mutex.writer) { atomicOp!"+="(numWriters, 1); wrSemA.notify(); wrSemB.wait(); atomicOp!"-="(numWriters, 1); } } void waitQueued(size_t queuedReaders, size_t queuedWriters) { for (;;) { synchronized (mutex.m_commonMutex) { if (mutex.m_numQueuedReaders == queuedReaders && mutex.m_numQueuedWriters == queuedWriters) break; } Thread.yield(); } } scope group = new ThreadGroup; // 2 simultaneous readers group.create(&readerFn); group.create(&readerFn); rdSemA.wait(); rdSemA.wait(); assert(numReaders == 2); rdSemB.notify(); rdSemB.notify(); group.joinAll(); assert(numReaders == 0); foreach (t; group) group.remove(t); // 1 writer at a time group.create(&writerFn); group.create(&writerFn); wrSemA.wait(); assert(!wrSemA.tryWait()); assert(numWriters == 1); wrSemB.notify(); wrSemA.wait(); assert(numWriters == 1); wrSemB.notify(); group.joinAll(); assert(numWriters == 0); foreach (t; group) group.remove(t); // reader and writer are mutually exclusive group.create(&readerFn); rdSemA.wait(); group.create(&writerFn); waitQueued(0, 1); assert(!wrSemA.tryWait()); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); // writer and reader are mutually exclusive group.create(&writerFn); wrSemA.wait(); group.create(&readerFn); waitQueued(1, 0); assert(!rdSemA.tryWait()); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); // policy determines whether queued reader or writers progress first group.create(&writerFn); wrSemA.wait(); group.create(&readerFn); group.create(&writerFn); waitQueued(1, 1); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); if (policy == ReadWriteMutex.Policy.PREFER_READERS) { rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); } else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS) { wrSemA.wait(); assert(numReaders == 0 && numWriters == 1); wrSemB.notify(); rdSemA.wait(); assert(numReaders == 1 && numWriters == 0); rdSemB.notify(); } group.joinAll(); assert(numReaders == 0 && numWriters == 0); foreach (t; group) group.remove(t); } runTest(ReadWriteMutex.Policy.PREFER_READERS); runTest(ReadWriteMutex.Policy.PREFER_WRITERS); } unittest { import core.atomic, core.thread; shared static ReadWriteMutex rwmutex; shared static bool threadTriedOnceToGetLock; shared static bool threadFinallyGotLock; rwmutex = new shared ReadWriteMutex(); atomicFence; const maxTimeAllowedForTest = dur!"seconds"(20); // Test ReadWriteMutex.Reader.tryLock(Duration). { static void testReaderTryLock() { assert(!rwmutex.reader.tryLock(Duration.min)); threadTriedOnceToGetLock.atomicStore(true); assert(rwmutex.reader.tryLock(Duration.max)); threadFinallyGotLock.atomicStore(true); rwmutex.reader.unlock; } assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); auto otherThread = new Thread(&testReaderTryLock).start; const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; Thread.yield; // We started otherThread with the writer lock held so otherThread's // first rwlock.reader.tryLock with timeout Duration.min should fail. while (!threadTriedOnceToGetLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } rwmutex.writer.unlock; // Soon after we release the writer lock otherThread's second // rwlock.reader.tryLock with timeout Duration.max should succeed. while (!threadFinallyGotLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } otherThread.join; } threadTriedOnceToGetLock.atomicStore(false); // Reset. threadFinallyGotLock.atomicStore(false); // Reset. // Test ReadWriteMutex.Writer.tryLock(Duration). { static void testWriterTryLock() { assert(!rwmutex.writer.tryLock(Duration.min)); threadTriedOnceToGetLock.atomicStore(true); assert(rwmutex.writer.tryLock(Duration.max)); threadFinallyGotLock.atomicStore(true); rwmutex.writer.unlock; } assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); auto otherThread = new Thread(&testWriterTryLock).start; const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; Thread.yield; // We started otherThread with the reader lock held so otherThread's // first rwlock.writer.tryLock with timeout Duration.min should fail. while (!threadTriedOnceToGetLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } rwmutex.reader.unlock; // Soon after we release the reader lock otherThread's second // rwlock.writer.tryLock with timeout Duration.max should succeed. while (!threadFinallyGotLock.atomicLoad) { assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); Thread.yield; } otherThread.join; } }