1258945Sroberto/*
2280849Scy * Copyright (C) 2004, 2005, 2007, 2009, 2011, 2012  Internet Systems Consortium, Inc. ("ISC")
3258945Sroberto * Copyright (C) 1998-2001, 2003  Internet Software Consortium.
4258945Sroberto *
5258945Sroberto * Permission to use, copy, modify, and/or distribute this software for any
6258945Sroberto * purpose with or without fee is hereby granted, provided that the above
7258945Sroberto * copyright notice and this permission notice appear in all copies.
8258945Sroberto *
9258945Sroberto * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
10258945Sroberto * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
11258945Sroberto * AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
12258945Sroberto * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13258945Sroberto * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
14258945Sroberto * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15258945Sroberto * PERFORMANCE OF THIS SOFTWARE.
16258945Sroberto */
17258945Sroberto
18280849Scy/* $Id$ */
19258945Sroberto
20258945Sroberto/*! \file */
21258945Sroberto
22258945Sroberto#include <config.h>
23258945Sroberto
24258945Sroberto#include <stddef.h>
25258945Sroberto
26258945Sroberto#include <isc/atomic.h>
27258945Sroberto#include <isc/magic.h>
28258945Sroberto#include <isc/msgs.h>
29258945Sroberto#include <isc/platform.h>
30258945Sroberto#include <isc/rwlock.h>
31258945Sroberto#include <isc/util.h>
32258945Sroberto
/*
 * Magic value ('R', 'W', 'L', 'k') stored in the 'magic' member of an
 * initialized rwlock, and the validity test the entry points REQUIRE.
 */
#define RWLOCK_MAGIC		ISC_MAGIC('R', 'W', 'L', 'k')
#define VALID_RWLOCK(rwl)	ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
35258945Sroberto
36258945Sroberto#ifdef ISC_PLATFORM_USETHREADS
37258945Sroberto
/*
 * Quotas substituted by isc_rwlock_init() when the caller passes 0.
 * Either may be overridden at build time by defining it beforehand.
 */
#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif

#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
#define RWLOCK_DEFAULT_WRITE_QUOTA 4
#endif
45258945Sroberto
46258945Sroberto#ifdef ISC_RWLOCK_TRACE
47258945Sroberto#include <stdio.h>		/* Required for fprintf/stderr. */
48258945Sroberto#include <isc/thread.h>		/* Required for isc_thread_self(). */
49258945Sroberto
50258945Srobertostatic void
51258945Srobertoprint_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
52258945Sroberto	fprintf(stderr,
53258945Sroberto		isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
54258945Sroberto			       ISC_MSG_PRINTLOCK,
55258945Sroberto			       "rwlock %p thread %lu %s(%s): %s, %u active, "
56258945Sroberto			       "%u granted, %u rwaiting, %u wwaiting\n"),
57258945Sroberto		rwl, isc_thread_self(), operation,
58258945Sroberto		(type == isc_rwlocktype_read ?
59258945Sroberto		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
60258945Sroberto				ISC_MSG_READ, "read") :
61258945Sroberto		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
62258945Sroberto				ISC_MSG_WRITE, "write")),
63258945Sroberto		(rwl->type == isc_rwlocktype_read ?
64258945Sroberto		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
65258945Sroberto				ISC_MSG_READING, "reading") :
66258945Sroberto		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
67258945Sroberto				ISC_MSG_WRITING, "writing")),
68258945Sroberto		rwl->active, rwl->granted, rwl->readers_waiting,
69258945Sroberto		rwl->writers_waiting);
70258945Sroberto}
71258945Sroberto#endif
72258945Sroberto
73258945Srobertoisc_result_t
74258945Srobertoisc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
75258945Sroberto		unsigned int write_quota)
76258945Sroberto{
77258945Sroberto	isc_result_t result;
78258945Sroberto
79258945Sroberto	REQUIRE(rwl != NULL);
80258945Sroberto
81258945Sroberto	/*
82258945Sroberto	 * In case there's trouble initializing, we zero magic now.  If all
83258945Sroberto	 * goes well, we'll set it to RWLOCK_MAGIC.
84258945Sroberto	 */
85258945Sroberto	rwl->magic = 0;
86258945Sroberto
87258945Sroberto#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
88258945Sroberto	rwl->write_requests = 0;
89258945Sroberto	rwl->write_completions = 0;
90258945Sroberto	rwl->cnt_and_flag = 0;
91258945Sroberto	rwl->readers_waiting = 0;
92258945Sroberto	rwl->write_granted = 0;
93258945Sroberto	if (read_quota != 0) {
94258945Sroberto		UNEXPECTED_ERROR(__FILE__, __LINE__,
95258945Sroberto				 "read quota is not supported");
96258945Sroberto	}
97258945Sroberto	if (write_quota == 0)
98258945Sroberto		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
99258945Sroberto	rwl->write_quota = write_quota;
100258945Sroberto#else
101258945Sroberto	rwl->type = isc_rwlocktype_read;
102258945Sroberto	rwl->original = isc_rwlocktype_none;
103258945Sroberto	rwl->active = 0;
104258945Sroberto	rwl->granted = 0;
105258945Sroberto	rwl->readers_waiting = 0;
106258945Sroberto	rwl->writers_waiting = 0;
107258945Sroberto	if (read_quota == 0)
108258945Sroberto		read_quota = RWLOCK_DEFAULT_READ_QUOTA;
109258945Sroberto	rwl->read_quota = read_quota;
110258945Sroberto	if (write_quota == 0)
111258945Sroberto		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
112258945Sroberto	rwl->write_quota = write_quota;
113258945Sroberto#endif
114258945Sroberto
115258945Sroberto	result = isc_mutex_init(&rwl->lock);
116258945Sroberto	if (result != ISC_R_SUCCESS)
117258945Sroberto		return (result);
118258945Sroberto
119258945Sroberto	result = isc_condition_init(&rwl->readable);
120258945Sroberto	if (result != ISC_R_SUCCESS) {
121258945Sroberto		UNEXPECTED_ERROR(__FILE__, __LINE__,
122258945Sroberto				 "isc_condition_init(readable) %s: %s",
123258945Sroberto				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
124258945Sroberto						ISC_MSG_FAILED, "failed"),
125258945Sroberto				 isc_result_totext(result));
126258945Sroberto		result = ISC_R_UNEXPECTED;
127258945Sroberto		goto destroy_lock;
128258945Sroberto	}
129258945Sroberto	result = isc_condition_init(&rwl->writeable);
130258945Sroberto	if (result != ISC_R_SUCCESS) {
131258945Sroberto		UNEXPECTED_ERROR(__FILE__, __LINE__,
132258945Sroberto				 "isc_condition_init(writeable) %s: %s",
133258945Sroberto				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
134258945Sroberto						ISC_MSG_FAILED, "failed"),
135258945Sroberto				 isc_result_totext(result));
136258945Sroberto		result = ISC_R_UNEXPECTED;
137258945Sroberto		goto destroy_rcond;
138258945Sroberto	}
139258945Sroberto
140258945Sroberto	rwl->magic = RWLOCK_MAGIC;
141258945Sroberto
142258945Sroberto	return (ISC_R_SUCCESS);
143258945Sroberto
144258945Sroberto  destroy_rcond:
145258945Sroberto	(void)isc_condition_destroy(&rwl->readable);
146258945Sroberto  destroy_lock:
147258945Sroberto	DESTROYLOCK(&rwl->lock);
148258945Sroberto
149258945Sroberto	return (result);
150258945Sroberto}
151258945Sroberto
152258945Srobertovoid
153258945Srobertoisc_rwlock_destroy(isc_rwlock_t *rwl) {
154258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
155258945Sroberto
156258945Sroberto#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
157258945Sroberto	REQUIRE(rwl->write_requests == rwl->write_completions &&
158258945Sroberto		rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
159258945Sroberto#else
160258945Sroberto	LOCK(&rwl->lock);
161258945Sroberto	REQUIRE(rwl->active == 0 &&
162258945Sroberto		rwl->readers_waiting == 0 &&
163258945Sroberto		rwl->writers_waiting == 0);
164258945Sroberto	UNLOCK(&rwl->lock);
165258945Sroberto#endif
166258945Sroberto
167258945Sroberto	rwl->magic = 0;
168258945Sroberto	(void)isc_condition_destroy(&rwl->readable);
169258945Sroberto	(void)isc_condition_destroy(&rwl->writeable);
170258945Sroberto	DESTROYLOCK(&rwl->lock);
171258945Sroberto}
172258945Sroberto
173258945Sroberto#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
174258945Sroberto
175258945Sroberto/*
176258945Sroberto * When some architecture-dependent atomic operations are available,
177258945Sroberto * rwlock can be more efficient than the generic algorithm defined below.
178258945Sroberto * The basic algorithm is described in the following URL:
179258945Sroberto *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
180258945Sroberto *
181258945Sroberto * The key is to use the following integer variables modified atomically:
182258945Sroberto *   write_requests, write_completions, and cnt_and_flag.
183258945Sroberto *
184258945Sroberto * write_requests and write_completions act as a waiting queue for writers
185258945Sroberto * in order to ensure the FIFO order.  Both variables begin with the initial
186258945Sroberto * value of 0.  When a new writer tries to get a write lock, it increments
187258945Sroberto * write_requests and gets the previous value of the variable as a "ticket".
188258945Sroberto * When write_completions reaches the ticket number, the new writer can start
189258945Sroberto * writing.  When the writer completes its work, it increments
190258945Sroberto * write_completions so that another new writer can start working.  If the
191258945Sroberto * write_requests is not equal to write_completions, it means a writer is now
 * working or waiting.  In this case, a new reader cannot start reading, or
193258945Sroberto * in other words, this algorithm basically prefers writers.
194258945Sroberto *
195258945Sroberto * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
196258945Sroberto * variable is a kind of structure with two members: writer_flag (1 bit) and
197258945Sroberto * reader_count (31 bits).  The writer_flag shows whether a writer is working,
198258945Sroberto * and the reader_count shows the number of readers currently working or almost
199258945Sroberto * ready for working.  A writer who has the current "ticket" tries to get the
200258945Sroberto * lock by exclusively setting the writer_flag to 1, provided that the whole
201258945Sroberto * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
202258945Sroberto * a new reader tries to increment the "reader_count" field provided that
203258945Sroberto * the writer_flag is 0 (meaning there is no writer working).
204258945Sroberto *
205258945Sroberto * If some of the above operations fail, the reader or the writer sleeps
 * until the related condition changes.  When a working reader or writer
 * completes its work while some readers or writers are sleeping, and the
 * condition that suspended them has changed, it wakes up the sleeping
 * readers or writers.
210258945Sroberto *
211258945Sroberto * As already noted, this algorithm basically prefers writers.  In order to
212258945Sroberto * prevent readers from starving, however, the algorithm also introduces the
213258945Sroberto * "writer quota" (Q).  When Q consecutive writers have completed their work,
214258945Sroberto * suspending readers, the last writer will wake up the readers, even if a new
215258945Sroberto * writer is waiting.
216258945Sroberto *
217258945Sroberto * Implementation specific note: due to the combination of atomic operations
218258945Sroberto * and a mutex lock, ordering between the atomic operation and locks can be
219258945Sroberto * very sensitive in some cases.  In particular, it is generally very important
220258945Sroberto * to check the atomic variable that requires a reader or writer to sleep after
221258945Sroberto * locking the mutex and before actually sleeping; otherwise, it could be very
222258945Sroberto * likely to cause a deadlock.  For example, assume "var" is a variable
223258945Sroberto * atomically modified, then the corresponding code would be:
224258945Sroberto *	if (var == need_sleep) {
225258945Sroberto *		LOCK(lock);
226258945Sroberto *		if (var == need_sleep)
227258945Sroberto *			WAIT(cond, lock);
228258945Sroberto *		UNLOCK(lock);
229258945Sroberto *	}
230258945Sroberto * The second check is important, since "var" is protected by the atomic
231258945Sroberto * operation, not by the mutex, and can be changed just before sleeping.
232258945Sroberto * (The first "if" could be omitted, but this is also important in order to
233258945Sroberto * make the code efficient by avoiding the use of the mutex unless it is
234258945Sroberto * really necessary.)
235258945Sroberto */
236258945Sroberto
/*
 * Layout of cnt_and_flag: the lowest bit is the writer flag, and the
 * remaining bits hold the reader count, so one reader is added or removed
 * by adjusting the word by READER_INCR.
 */
#define WRITER_ACTIVE	0x1
#define READER_INCR	0x2
239258945Sroberto
240258945Srobertoisc_result_t
241258945Srobertoisc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
242258945Sroberto	isc_int32_t cntflag;
243258945Sroberto
244258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
245258945Sroberto
246258945Sroberto#ifdef ISC_RWLOCK_TRACE
247258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
248258945Sroberto				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
249258945Sroberto#endif
250258945Sroberto
251258945Sroberto	if (type == isc_rwlocktype_read) {
252258945Sroberto		if (rwl->write_requests != rwl->write_completions) {
253258945Sroberto			/* there is a waiting or active writer */
254258945Sroberto			LOCK(&rwl->lock);
255258945Sroberto			if (rwl->write_requests != rwl->write_completions) {
256258945Sroberto				rwl->readers_waiting++;
257258945Sroberto				WAIT(&rwl->readable, &rwl->lock);
258258945Sroberto				rwl->readers_waiting--;
259258945Sroberto			}
260258945Sroberto			UNLOCK(&rwl->lock);
261258945Sroberto		}
262258945Sroberto
263258945Sroberto		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
264280849Scy		POST(cntflag);
265258945Sroberto		while (1) {
266258945Sroberto			if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
267258945Sroberto				break;
268258945Sroberto
269258945Sroberto			/* A writer is still working */
270258945Sroberto			LOCK(&rwl->lock);
271258945Sroberto			rwl->readers_waiting++;
272258945Sroberto			if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
273258945Sroberto				WAIT(&rwl->readable, &rwl->lock);
274258945Sroberto			rwl->readers_waiting--;
275258945Sroberto			UNLOCK(&rwl->lock);
276258945Sroberto
277258945Sroberto			/*
278258945Sroberto			 * Typically, the reader should be able to get a lock
279258945Sroberto			 * at this stage:
280258945Sroberto			 *   (1) there should have been no pending writer when
281258945Sroberto			 *       the reader was trying to increment the
282258945Sroberto			 *       counter; otherwise, the writer should be in
283258945Sroberto			 *       the waiting queue, preventing the reader from
284258945Sroberto			 *       proceeding to this point.
285258945Sroberto			 *   (2) once the reader increments the counter, no
286258945Sroberto			 *       more writer can get a lock.
287258945Sroberto			 * Still, it is possible another writer can work at
288258945Sroberto			 * this point, e.g. in the following scenario:
289258945Sroberto			 *   A previous writer unlocks the writer lock.
290258945Sroberto			 *   This reader proceeds to point (1).
291258945Sroberto			 *   A new writer appears, and gets a new lock before
292258945Sroberto			 *   the reader increments the counter.
293258945Sroberto			 *   The reader then increments the counter.
294258945Sroberto			 *   The previous writer notices there is a waiting
295258945Sroberto			 *   reader who is almost ready, and wakes it up.
296258945Sroberto			 * So, the reader needs to confirm whether it can now
297258945Sroberto			 * read explicitly (thus we loop).  Note that this is
298258945Sroberto			 * not an infinite process, since the reader has
299258945Sroberto			 * incremented the counter at this point.
300258945Sroberto			 */
301258945Sroberto		}
302258945Sroberto
303258945Sroberto		/*
304258945Sroberto		 * If we are temporarily preferred to writers due to the writer
305258945Sroberto		 * quota, reset the condition (race among readers doesn't
306258945Sroberto		 * matter).
307258945Sroberto		 */
308258945Sroberto		rwl->write_granted = 0;
309258945Sroberto	} else {
310258945Sroberto		isc_int32_t prev_writer;
311258945Sroberto
312258945Sroberto		/* enter the waiting queue, and wait for our turn */
313258945Sroberto		prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
314258945Sroberto		while (rwl->write_completions != prev_writer) {
315258945Sroberto			LOCK(&rwl->lock);
316258945Sroberto			if (rwl->write_completions != prev_writer) {
317258945Sroberto				WAIT(&rwl->writeable, &rwl->lock);
318258945Sroberto				UNLOCK(&rwl->lock);
319258945Sroberto				continue;
320258945Sroberto			}
321258945Sroberto			UNLOCK(&rwl->lock);
322258945Sroberto			break;
323258945Sroberto		}
324258945Sroberto
325258945Sroberto		while (1) {
326258945Sroberto			cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
327258945Sroberto						     WRITER_ACTIVE);
328258945Sroberto			if (cntflag == 0)
329258945Sroberto				break;
330258945Sroberto
331258945Sroberto			/* Another active reader or writer is working. */
332258945Sroberto			LOCK(&rwl->lock);
333258945Sroberto			if (rwl->cnt_and_flag != 0)
334258945Sroberto				WAIT(&rwl->writeable, &rwl->lock);
335258945Sroberto			UNLOCK(&rwl->lock);
336258945Sroberto		}
337258945Sroberto
338258945Sroberto		INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
339258945Sroberto		rwl->write_granted++;
340258945Sroberto	}
341258945Sroberto
342258945Sroberto#ifdef ISC_RWLOCK_TRACE
343258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
344258945Sroberto				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
345258945Sroberto#endif
346258945Sroberto
347258945Sroberto	return (ISC_R_SUCCESS);
348258945Sroberto}
349258945Sroberto
350258945Srobertoisc_result_t
351258945Srobertoisc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
352258945Sroberto	isc_int32_t cntflag;
353258945Sroberto
354258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
355258945Sroberto
356258945Sroberto#ifdef ISC_RWLOCK_TRACE
357258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
358258945Sroberto				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
359258945Sroberto#endif
360258945Sroberto
361258945Sroberto	if (type == isc_rwlocktype_read) {
362258945Sroberto		/* If a writer is waiting or working, we fail. */
363258945Sroberto		if (rwl->write_requests != rwl->write_completions)
364258945Sroberto			return (ISC_R_LOCKBUSY);
365258945Sroberto
366258945Sroberto		/* Otherwise, be ready for reading. */
367258945Sroberto		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
368258945Sroberto		if ((cntflag & WRITER_ACTIVE) != 0) {
369258945Sroberto			/*
370258945Sroberto			 * A writer is working.  We lose, and cancel the read
371258945Sroberto			 * request.
372258945Sroberto			 */
373258945Sroberto			cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
374258945Sroberto						  -READER_INCR);
375258945Sroberto			/*
376258945Sroberto			 * If no other readers are waiting and we've suspended
377258945Sroberto			 * new writers in this short period, wake them up.
378258945Sroberto			 */
379258945Sroberto			if (cntflag == READER_INCR &&
380258945Sroberto			    rwl->write_completions != rwl->write_requests) {
381258945Sroberto				LOCK(&rwl->lock);
382258945Sroberto				BROADCAST(&rwl->writeable);
383258945Sroberto				UNLOCK(&rwl->lock);
384258945Sroberto			}
385258945Sroberto
386258945Sroberto			return (ISC_R_LOCKBUSY);
387258945Sroberto		}
388258945Sroberto	} else {
389258945Sroberto		/* Try locking without entering the waiting queue. */
390258945Sroberto		cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
391258945Sroberto					     WRITER_ACTIVE);
392258945Sroberto		if (cntflag != 0)
393258945Sroberto			return (ISC_R_LOCKBUSY);
394258945Sroberto
395258945Sroberto		/*
396258945Sroberto		 * XXXJT: jump into the queue, possibly breaking the writer
397258945Sroberto		 * order.
398258945Sroberto		 */
399258945Sroberto		(void)isc_atomic_xadd(&rwl->write_completions, -1);
400258945Sroberto
401258945Sroberto		rwl->write_granted++;
402258945Sroberto	}
403258945Sroberto
404258945Sroberto#ifdef ISC_RWLOCK_TRACE
405258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
406258945Sroberto				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
407258945Sroberto#endif
408258945Sroberto
409258945Sroberto	return (ISC_R_SUCCESS);
410258945Sroberto}
411258945Sroberto
412258945Srobertoisc_result_t
413258945Srobertoisc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
414258945Sroberto	isc_int32_t prevcnt;
415258945Sroberto
416258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
417258945Sroberto
418258945Sroberto	/* Try to acquire write access. */
419258945Sroberto	prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
420258945Sroberto				     READER_INCR, WRITER_ACTIVE);
421258945Sroberto	/*
422258945Sroberto	 * There must have been no writer, and there must have been at least
423258945Sroberto	 * one reader.
424258945Sroberto	 */
425258945Sroberto	INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
426258945Sroberto	       (prevcnt & ~WRITER_ACTIVE) != 0);
427258945Sroberto
428258945Sroberto	if (prevcnt == READER_INCR) {
429258945Sroberto		/*
430258945Sroberto		 * We are the only reader and have been upgraded.
431258945Sroberto		 * Now jump into the head of the writer waiting queue.
432258945Sroberto		 */
433258945Sroberto		(void)isc_atomic_xadd(&rwl->write_completions, -1);
434258945Sroberto	} else
435258945Sroberto		return (ISC_R_LOCKBUSY);
436258945Sroberto
437258945Sroberto	return (ISC_R_SUCCESS);
438258945Sroberto
439258945Sroberto}
440258945Sroberto
441258945Srobertovoid
442258945Srobertoisc_rwlock_downgrade(isc_rwlock_t *rwl) {
443258945Sroberto	isc_int32_t prev_readers;
444258945Sroberto
445258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
446258945Sroberto
447258945Sroberto	/* Become an active reader. */
448258945Sroberto	prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
449258945Sroberto	/* We must have been a writer. */
450258945Sroberto	INSIST((prev_readers & WRITER_ACTIVE) != 0);
451258945Sroberto
452258945Sroberto	/* Complete write */
453258945Sroberto	(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
454258945Sroberto	(void)isc_atomic_xadd(&rwl->write_completions, 1);
455258945Sroberto
456258945Sroberto	/* Resume other readers */
457258945Sroberto	LOCK(&rwl->lock);
458258945Sroberto	if (rwl->readers_waiting > 0)
459258945Sroberto		BROADCAST(&rwl->readable);
460258945Sroberto	UNLOCK(&rwl->lock);
461258945Sroberto}
462258945Sroberto
463258945Srobertoisc_result_t
464258945Srobertoisc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
465258945Sroberto	isc_int32_t prev_cnt;
466258945Sroberto
467258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
468258945Sroberto
469258945Sroberto#ifdef ISC_RWLOCK_TRACE
470258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
471258945Sroberto				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
472258945Sroberto#endif
473258945Sroberto
474258945Sroberto	if (type == isc_rwlocktype_read) {
475258945Sroberto		prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);
476258945Sroberto
477258945Sroberto		/*
478258945Sroberto		 * If we're the last reader and any writers are waiting, wake
479258945Sroberto		 * them up.  We need to wake up all of them to ensure the
480258945Sroberto		 * FIFO order.
481258945Sroberto		 */
482258945Sroberto		if (prev_cnt == READER_INCR &&
483258945Sroberto		    rwl->write_completions != rwl->write_requests) {
484258945Sroberto			LOCK(&rwl->lock);
485258945Sroberto			BROADCAST(&rwl->writeable);
486258945Sroberto			UNLOCK(&rwl->lock);
487258945Sroberto		}
488258945Sroberto	} else {
489258945Sroberto		isc_boolean_t wakeup_writers = ISC_TRUE;
490258945Sroberto
491258945Sroberto		/*
492258945Sroberto		 * Reset the flag, and (implicitly) tell other writers
493258945Sroberto		 * we are done.
494258945Sroberto		 */
495258945Sroberto		(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
496258945Sroberto		(void)isc_atomic_xadd(&rwl->write_completions, 1);
497258945Sroberto
498258945Sroberto		if (rwl->write_granted >= rwl->write_quota ||
499258945Sroberto		    rwl->write_requests == rwl->write_completions ||
500258945Sroberto		    (rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
501258945Sroberto			/*
502258945Sroberto			 * We have passed the write quota, no writer is
503258945Sroberto			 * waiting, or some readers are almost ready, pending
504258945Sroberto			 * possible writers.  Note that the last case can
505258945Sroberto			 * happen even if write_requests != write_completions
506258945Sroberto			 * (which means a new writer in the queue), so we need
507258945Sroberto			 * to catch the case explicitly.
508258945Sroberto			 */
509258945Sroberto			LOCK(&rwl->lock);
510258945Sroberto			if (rwl->readers_waiting > 0) {
511258945Sroberto				wakeup_writers = ISC_FALSE;
512258945Sroberto				BROADCAST(&rwl->readable);
513258945Sroberto			}
514258945Sroberto			UNLOCK(&rwl->lock);
515258945Sroberto		}
516258945Sroberto
517258945Sroberto		if (rwl->write_requests != rwl->write_completions &&
518258945Sroberto		    wakeup_writers) {
519258945Sroberto			LOCK(&rwl->lock);
520258945Sroberto			BROADCAST(&rwl->writeable);
521258945Sroberto			UNLOCK(&rwl->lock);
522258945Sroberto		}
523258945Sroberto	}
524258945Sroberto
525258945Sroberto#ifdef ISC_RWLOCK_TRACE
526258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
527258945Sroberto				  ISC_MSG_POSTUNLOCK, "postunlock"),
528258945Sroberto		   rwl, type);
529258945Sroberto#endif
530258945Sroberto
531258945Sroberto	return (ISC_R_SUCCESS);
532258945Sroberto}
533258945Sroberto
534258945Sroberto#else /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
535258945Sroberto
536258945Srobertostatic isc_result_t
537258945Srobertodoit(isc_rwlock_t *rwl, isc_rwlocktype_t type, isc_boolean_t nonblock) {
538258945Sroberto	isc_boolean_t skip = ISC_FALSE;
539258945Sroberto	isc_boolean_t done = ISC_FALSE;
540258945Sroberto	isc_result_t result = ISC_R_SUCCESS;
541258945Sroberto
542258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
543258945Sroberto
544258945Sroberto	LOCK(&rwl->lock);
545258945Sroberto
546258945Sroberto#ifdef ISC_RWLOCK_TRACE
547258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
548258945Sroberto				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
549258945Sroberto#endif
550258945Sroberto
551258945Sroberto	if (type == isc_rwlocktype_read) {
552258945Sroberto		if (rwl->readers_waiting != 0)
553258945Sroberto			skip = ISC_TRUE;
554258945Sroberto		while (!done) {
555258945Sroberto			if (!skip &&
556258945Sroberto			    ((rwl->active == 0 ||
557258945Sroberto			      (rwl->type == isc_rwlocktype_read &&
558258945Sroberto			       (rwl->writers_waiting == 0 ||
559258945Sroberto				rwl->granted < rwl->read_quota)))))
560258945Sroberto			{
561258945Sroberto				rwl->type = isc_rwlocktype_read;
562258945Sroberto				rwl->active++;
563258945Sroberto				rwl->granted++;
564258945Sroberto				done = ISC_TRUE;
565258945Sroberto			} else if (nonblock) {
566258945Sroberto				result = ISC_R_LOCKBUSY;
567258945Sroberto				done = ISC_TRUE;
568258945Sroberto			} else {
569258945Sroberto				skip = ISC_FALSE;
570258945Sroberto				rwl->readers_waiting++;
571258945Sroberto				WAIT(&rwl->readable, &rwl->lock);
572258945Sroberto				rwl->readers_waiting--;
573258945Sroberto			}
574258945Sroberto		}
575258945Sroberto	} else {
576258945Sroberto		if (rwl->writers_waiting != 0)
577258945Sroberto			skip = ISC_TRUE;
578258945Sroberto		while (!done) {
579258945Sroberto			if (!skip && rwl->active == 0) {
580258945Sroberto				rwl->type = isc_rwlocktype_write;
581258945Sroberto				rwl->active = 1;
582258945Sroberto				rwl->granted++;
583258945Sroberto				done = ISC_TRUE;
584258945Sroberto			} else if (nonblock) {
585258945Sroberto				result = ISC_R_LOCKBUSY;
586258945Sroberto				done = ISC_TRUE;
587258945Sroberto			} else {
588258945Sroberto				skip = ISC_FALSE;
589258945Sroberto				rwl->writers_waiting++;
590258945Sroberto				WAIT(&rwl->writeable, &rwl->lock);
591258945Sroberto				rwl->writers_waiting--;
592258945Sroberto			}
593258945Sroberto		}
594258945Sroberto	}
595258945Sroberto
596258945Sroberto#ifdef ISC_RWLOCK_TRACE
597258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
598258945Sroberto				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
599258945Sroberto#endif
600258945Sroberto
601258945Sroberto	UNLOCK(&rwl->lock);
602258945Sroberto
603258945Sroberto	return (result);
604258945Sroberto}
605258945Sroberto
606258945Srobertoisc_result_t
607258945Srobertoisc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
608258945Sroberto	return (doit(rwl, type, ISC_FALSE));
609258945Sroberto}
610258945Sroberto
611258945Srobertoisc_result_t
612258945Srobertoisc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
613258945Sroberto	return (doit(rwl, type, ISC_TRUE));
614258945Sroberto}
615258945Sroberto
616258945Srobertoisc_result_t
617258945Srobertoisc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
618258945Sroberto	isc_result_t result = ISC_R_SUCCESS;
619258945Sroberto
620258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
621258945Sroberto	LOCK(&rwl->lock);
622258945Sroberto	REQUIRE(rwl->type == isc_rwlocktype_read);
623258945Sroberto	REQUIRE(rwl->active != 0);
624258945Sroberto
625258945Sroberto	/* If we are the only reader then succeed. */
626258945Sroberto	if (rwl->active == 1) {
627258945Sroberto		rwl->original = (rwl->original == isc_rwlocktype_none) ?
628258945Sroberto				isc_rwlocktype_read : isc_rwlocktype_none;
629258945Sroberto		rwl->type = isc_rwlocktype_write;
630258945Sroberto	} else
631258945Sroberto		result = ISC_R_LOCKBUSY;
632258945Sroberto
633258945Sroberto	UNLOCK(&rwl->lock);
634258945Sroberto	return (result);
635258945Sroberto}
636258945Sroberto
637258945Srobertovoid
638258945Srobertoisc_rwlock_downgrade(isc_rwlock_t *rwl) {
639258945Sroberto
640258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
641258945Sroberto	LOCK(&rwl->lock);
642258945Sroberto	REQUIRE(rwl->type == isc_rwlocktype_write);
643258945Sroberto	REQUIRE(rwl->active == 1);
644258945Sroberto
645258945Sroberto	rwl->type = isc_rwlocktype_read;
646258945Sroberto	rwl->original = (rwl->original == isc_rwlocktype_none) ?
647258945Sroberto			isc_rwlocktype_write : isc_rwlocktype_none;
648258945Sroberto	/*
649258945Sroberto	 * Resume processing any read request that were blocked when
650258945Sroberto	 * we upgraded.
651258945Sroberto	 */
652258945Sroberto	if (rwl->original == isc_rwlocktype_none &&
653258945Sroberto	    (rwl->writers_waiting == 0 || rwl->granted < rwl->read_quota) &&
654258945Sroberto	    rwl->readers_waiting > 0)
655258945Sroberto		BROADCAST(&rwl->readable);
656258945Sroberto
657258945Sroberto	UNLOCK(&rwl->lock);
658258945Sroberto}
659258945Sroberto
660258945Srobertoisc_result_t
661258945Srobertoisc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
662258945Sroberto
663258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
664258945Sroberto	LOCK(&rwl->lock);
665258945Sroberto	REQUIRE(rwl->type == type);
666258945Sroberto
667258945Sroberto	UNUSED(type);
668258945Sroberto
669258945Sroberto#ifdef ISC_RWLOCK_TRACE
670258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
671258945Sroberto				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
672258945Sroberto#endif
673258945Sroberto
674258945Sroberto	INSIST(rwl->active > 0);
675258945Sroberto	rwl->active--;
676258945Sroberto	if (rwl->active == 0) {
677258945Sroberto		if (rwl->original != isc_rwlocktype_none) {
678258945Sroberto			rwl->type = rwl->original;
679258945Sroberto			rwl->original = isc_rwlocktype_none;
680258945Sroberto		}
681258945Sroberto		if (rwl->type == isc_rwlocktype_read) {
682258945Sroberto			rwl->granted = 0;
683258945Sroberto			if (rwl->writers_waiting > 0) {
684258945Sroberto				rwl->type = isc_rwlocktype_write;
685258945Sroberto				SIGNAL(&rwl->writeable);
686258945Sroberto			} else if (rwl->readers_waiting > 0) {
687258945Sroberto				/* Does this case ever happen? */
688258945Sroberto				BROADCAST(&rwl->readable);
689258945Sroberto			}
690258945Sroberto		} else {
691258945Sroberto			if (rwl->readers_waiting > 0) {
692258945Sroberto				if (rwl->writers_waiting > 0 &&
693258945Sroberto				    rwl->granted < rwl->write_quota) {
694258945Sroberto					SIGNAL(&rwl->writeable);
695258945Sroberto				} else {
696258945Sroberto					rwl->granted = 0;
697258945Sroberto					rwl->type = isc_rwlocktype_read;
698258945Sroberto					BROADCAST(&rwl->readable);
699258945Sroberto				}
700258945Sroberto			} else if (rwl->writers_waiting > 0) {
701258945Sroberto				rwl->granted = 0;
702258945Sroberto				SIGNAL(&rwl->writeable);
703258945Sroberto			} else {
704258945Sroberto				rwl->granted = 0;
705258945Sroberto			}
706258945Sroberto		}
707258945Sroberto	}
708258945Sroberto	INSIST(rwl->original == isc_rwlocktype_none);
709258945Sroberto
710258945Sroberto#ifdef ISC_RWLOCK_TRACE
711258945Sroberto	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
712258945Sroberto				  ISC_MSG_POSTUNLOCK, "postunlock"),
713258945Sroberto		   rwl, type);
714258945Sroberto#endif
715258945Sroberto
716258945Sroberto	UNLOCK(&rwl->lock);
717258945Sroberto
718258945Sroberto	return (ISC_R_SUCCESS);
719258945Sroberto}
720258945Sroberto
721258945Sroberto#endif /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
722258945Sroberto#else /* ISC_PLATFORM_USETHREADS */
723258945Sroberto
724258945Srobertoisc_result_t
725258945Srobertoisc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
726258945Sroberto		unsigned int write_quota)
727258945Sroberto{
728258945Sroberto	REQUIRE(rwl != NULL);
729258945Sroberto
730258945Sroberto	UNUSED(read_quota);
731258945Sroberto	UNUSED(write_quota);
732258945Sroberto
733258945Sroberto	rwl->type = isc_rwlocktype_read;
734258945Sroberto	rwl->active = 0;
735258945Sroberto	rwl->magic = RWLOCK_MAGIC;
736258945Sroberto
737258945Sroberto	return (ISC_R_SUCCESS);
738258945Sroberto}
739258945Sroberto
740258945Srobertoisc_result_t
741258945Srobertoisc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
742258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
743258945Sroberto
744258945Sroberto	if (type == isc_rwlocktype_read) {
745258945Sroberto		if (rwl->type != isc_rwlocktype_read && rwl->active != 0)
746258945Sroberto			return (ISC_R_LOCKBUSY);
747258945Sroberto		rwl->type = isc_rwlocktype_read;
748258945Sroberto		rwl->active++;
749258945Sroberto	} else {
750258945Sroberto		if (rwl->active != 0)
751258945Sroberto			return (ISC_R_LOCKBUSY);
752258945Sroberto		rwl->type = isc_rwlocktype_write;
753258945Sroberto		rwl->active = 1;
754258945Sroberto	}
755258945Sroberto	return (ISC_R_SUCCESS);
756258945Sroberto}
757258945Sroberto
758258945Srobertoisc_result_t
759258945Srobertoisc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
760258945Sroberto	return (isc_rwlock_lock(rwl, type));
761258945Sroberto}
762258945Sroberto
763258945Srobertoisc_result_t
764258945Srobertoisc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
765258945Sroberto	isc_result_t result = ISC_R_SUCCESS;
766258945Sroberto
767258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
768258945Sroberto	REQUIRE(rwl->type == isc_rwlocktype_read);
769258945Sroberto	REQUIRE(rwl->active != 0);
770258945Sroberto
771258945Sroberto	/* If we are the only reader then succeed. */
772258945Sroberto	if (rwl->active == 1)
773258945Sroberto		rwl->type = isc_rwlocktype_write;
774258945Sroberto	else
775258945Sroberto		result = ISC_R_LOCKBUSY;
776258945Sroberto	return (result);
777258945Sroberto}
778258945Sroberto
779258945Srobertovoid
780258945Srobertoisc_rwlock_downgrade(isc_rwlock_t *rwl) {
781258945Sroberto
782258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
783258945Sroberto	REQUIRE(rwl->type == isc_rwlocktype_write);
784258945Sroberto	REQUIRE(rwl->active == 1);
785258945Sroberto
786258945Sroberto	rwl->type = isc_rwlocktype_read;
787258945Sroberto}
788258945Sroberto
789258945Srobertoisc_result_t
790258945Srobertoisc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
791258945Sroberto	REQUIRE(VALID_RWLOCK(rwl));
792258945Sroberto	REQUIRE(rwl->type == type);
793258945Sroberto
794258945Sroberto	UNUSED(type);
795258945Sroberto
796258945Sroberto	INSIST(rwl->active > 0);
797258945Sroberto	rwl->active--;
798258945Sroberto
799258945Sroberto	return (ISC_R_SUCCESS);
800258945Sroberto}
801258945Sroberto
802258945Srobertovoid
803258945Srobertoisc_rwlock_destroy(isc_rwlock_t *rwl) {
804258945Sroberto	REQUIRE(rwl != NULL);
805258945Sroberto	REQUIRE(rwl->active == 0);
806258945Sroberto	rwl->magic = 0;
807258945Sroberto}
808258945Sroberto
809258945Sroberto#endif /* ISC_PLATFORM_USETHREADS */
810