1/*
2 * Copyright (C) 2004, 2005, 2007, 2009  Internet Systems Consortium, Inc. ("ISC")
3 * Copyright (C) 1998-2001, 2003  Internet Software Consortium.
4 *
5 * Permission to use, copy, modify, and/or distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
8 *
9 * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
10 * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
11 * AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
12 * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13 * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
14 * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15 * PERFORMANCE OF THIS SOFTWARE.
16 */
17
18/* $Id: rwlock.c,v 1.44.332.2 2009/01/18 23:47:41 tbox Exp $ */
19
20/*! \file */
21
22#include <config.h>
23
24#include <stddef.h>
25
26#include <isc/atomic.h>
27#include <isc/magic.h>
28#include <isc/msgs.h>
29#include <isc/platform.h>
30#include <isc/rwlock.h>
31#include <isc/util.h>
32
33#define RWLOCK_MAGIC		ISC_MAGIC('R', 'W', 'L', 'k')
34#define VALID_RWLOCK(rwl)	ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
35
36#ifdef ISC_PLATFORM_USETHREADS
37
38#ifndef RWLOCK_DEFAULT_READ_QUOTA
39#define RWLOCK_DEFAULT_READ_QUOTA 4
40#endif
41
42#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
43#define RWLOCK_DEFAULT_WRITE_QUOTA 4
44#endif
45
46#ifdef ISC_RWLOCK_TRACE
47#include <stdio.h>		/* Required for fprintf/stderr. */
48#include <isc/thread.h>		/* Required for isc_thread_self(). */
49
/*
 * Debug helper (compiled only under ISC_RWLOCK_TRACE): dump the current
 * state of 'rwl' to stderr, labelled with 'operation' (e.g. "prelock")
 * and the requested lock 'type'.  All strings go through the message
 * catalog so they can be localized.
 *
 * NOTE(review): this prints rwl->type, rwl->active, rwl->granted and
 * rwl->writers_waiting, which are only set up by the generic (non-atomic)
 * implementation of isc_rwlock_init() below; enabling ISC_RWLOCK_TRACE
 * together with the HAVEXADD/HAVECMPXCHG build presumably will not
 * compile -- confirm before turning tracing on for that configuration.
 */
static void
print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	fprintf(stderr,
		isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
			       ISC_MSG_PRINTLOCK,
			       "rwlock %p thread %lu %s(%s): %s, %u active, "
			       "%u granted, %u rwaiting, %u wwaiting\n"),
		rwl, isc_thread_self(), operation,
		(type == isc_rwlocktype_read ?
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_READ, "read") :
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_WRITE, "write")),
		(rwl->type == isc_rwlocktype_read ?
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_READING, "reading") :
		 isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				ISC_MSG_WRITING, "writing")),
		rwl->active, rwl->granted, rwl->readers_waiting,
		rwl->writers_waiting);
}
71#endif
72
/*
 * Initialize a read-write lock.  A quota argument of 0 selects the
 * compiled-in default (RWLOCK_DEFAULT_READ_QUOTA / _WRITE_QUOTA).
 *
 * Returns ISC_R_SUCCESS on success; on failure any partially created
 * primitives are torn down and an error result is returned, leaving
 * rwl->magic zero (i.e. the lock is not VALID_RWLOCK()).
 */
isc_result_t
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
		unsigned int write_quota)
{
	isc_result_t result;

	REQUIRE(rwl != NULL);

	/*
	 * In case there's trouble initializing, we zero magic now.  If all
	 * goes well, we'll set it to RWLOCK_MAGIC.
	 */
	rwl->magic = 0;

#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
	/* Atomic implementation: see the algorithm description below. */
	rwl->write_requests = 0;
	rwl->write_completions = 0;
	rwl->cnt_and_flag = 0;
	rwl->readers_waiting = 0;
	rwl->write_granted = 0;
	/*
	 * The atomic variant prefers writers and has no read quota; warn
	 * (but continue) if the caller asked for one.
	 */
	if (read_quota != 0) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "read quota is not supported");
	}
	if (write_quota == 0)
		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
	rwl->write_quota = write_quota;
#else
	/* Generic mutex/condition-variable implementation. */
	rwl->type = isc_rwlocktype_read;
	rwl->original = isc_rwlocktype_none;
	rwl->active = 0;
	rwl->granted = 0;
	rwl->readers_waiting = 0;
	rwl->writers_waiting = 0;
	if (read_quota == 0)
		read_quota = RWLOCK_DEFAULT_READ_QUOTA;
	rwl->read_quota = read_quota;
	if (write_quota == 0)
		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
	rwl->write_quota = write_quota;
#endif

	result = isc_mutex_init(&rwl->lock);
	if (result != ISC_R_SUCCESS)
		return (result);

	result = isc_condition_init(&rwl->readable);
	if (result != ISC_R_SUCCESS) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "isc_condition_init(readable) %s: %s",
				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"),
				 isc_result_totext(result));
		result = ISC_R_UNEXPECTED;
		goto destroy_lock;
	}
	result = isc_condition_init(&rwl->writeable);
	if (result != ISC_R_SUCCESS) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "isc_condition_init(writeable) %s: %s",
				 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
						ISC_MSG_FAILED, "failed"),
				 isc_result_totext(result));
		result = ISC_R_UNEXPECTED;
		goto destroy_rcond;
	}

	/* Fully initialized; stamp the magic number to mark it valid. */
	rwl->magic = RWLOCK_MAGIC;

	return (ISC_R_SUCCESS);

	/* Error path: unwind in reverse order of creation. */
  destroy_rcond:
	(void)isc_condition_destroy(&rwl->readable);
  destroy_lock:
	DESTROYLOCK(&rwl->lock);

	return (result);
}
151
/*
 * Destroy a rwlock.  The lock must be valid and completely idle: no
 * active holders and no waiters (enforced by the REQUIREs below).
 */
void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	REQUIRE(VALID_RWLOCK(rwl));

#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
	/*
	 * Idle means: every write request has completed, the count/flag
	 * word is clear (no readers, no writer), and no readers sleep.
	 */
	REQUIRE(rwl->write_requests == rwl->write_completions &&
		rwl->cnt_and_flag == 0 && rwl->readers_waiting == 0);
#else
	LOCK(&rwl->lock);
	REQUIRE(rwl->active == 0 &&
		rwl->readers_waiting == 0 &&
		rwl->writers_waiting == 0);
	UNLOCK(&rwl->lock);
#endif

	/* Invalidate the magic before tearing down the primitives. */
	rwl->magic = 0;
	(void)isc_condition_destroy(&rwl->readable);
	(void)isc_condition_destroy(&rwl->writeable);
	DESTROYLOCK(&rwl->lock);
}
172
173#if defined(ISC_PLATFORM_HAVEXADD) && defined(ISC_PLATFORM_HAVECMPXCHG)
174
175/*
176 * When some architecture-dependent atomic operations are available,
177 * rwlock can be more efficient than the generic algorithm defined below.
178 * The basic algorithm is described in the following URL:
179 *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
180 *
181 * The key is to use the following integer variables modified atomically:
182 *   write_requests, write_completions, and cnt_and_flag.
183 *
184 * write_requests and write_completions act as a waiting queue for writers
185 * in order to ensure the FIFO order.  Both variables begin with the initial
186 * value of 0.  When a new writer tries to get a write lock, it increments
187 * write_requests and gets the previous value of the variable as a "ticket".
188 * When write_completions reaches the ticket number, the new writer can start
189 * writing.  When the writer completes its work, it increments
190 * write_completions so that another new writer can start working.  If the
191 * write_requests is not equal to write_completions, it means a writer is now
 * working or waiting.  In this case, a new reader cannot start reading, or
193 * in other words, this algorithm basically prefers writers.
194 *
195 * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
196 * variable is a kind of structure with two members: writer_flag (1 bit) and
197 * reader_count (31 bits).  The writer_flag shows whether a writer is working,
198 * and the reader_count shows the number of readers currently working or almost
199 * ready for working.  A writer who has the current "ticket" tries to get the
200 * lock by exclusively setting the writer_flag to 1, provided that the whole
201 * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
202 * a new reader tries to increment the "reader_count" field provided that
203 * the writer_flag is 0 (meaning there is no writer working).
204 *
 * If some of the above operations fail, the reader or the writer sleeps
 * until the related condition changes.  When a working reader or writer
 * completes its work, and there are sleeping readers or writers whose
 * suspending condition has now changed, it wakes up those sleeping
 * readers or writers.
210 *
211 * As already noted, this algorithm basically prefers writers.  In order to
212 * prevent readers from starving, however, the algorithm also introduces the
213 * "writer quota" (Q).  When Q consecutive writers have completed their work,
214 * suspending readers, the last writer will wake up the readers, even if a new
215 * writer is waiting.
216 *
217 * Implementation specific note: due to the combination of atomic operations
218 * and a mutex lock, ordering between the atomic operation and locks can be
219 * very sensitive in some cases.  In particular, it is generally very important
220 * to check the atomic variable that requires a reader or writer to sleep after
221 * locking the mutex and before actually sleeping; otherwise, it could be very
222 * likely to cause a deadlock.  For example, assume "var" is a variable
223 * atomically modified, then the corresponding code would be:
224 *	if (var == need_sleep) {
225 *		LOCK(lock);
226 *		if (var == need_sleep)
227 *			WAIT(cond, lock);
228 *		UNLOCK(lock);
229 *	}
230 * The second check is important, since "var" is protected by the atomic
231 * operation, not by the mutex, and can be changed just before sleeping.
232 * (The first "if" could be omitted, but this is also important in order to
233 * make the code efficient by avoiding the use of the mutex unless it is
234 * really necessary.)
235 */
236
237#define WRITER_ACTIVE	0x1
238#define READER_INCR	0x2
239
/*
 * Acquire 'rwl' for the given 'type', blocking until the lock is granted.
 *
 * Readers: first wait while any writer is queued or active (this is the
 * writer-preference part), then register via cnt_and_flag and re-check
 * the writer flag in a loop.
 * Writers: take a FIFO ticket from write_requests, wait until
 * write_completions reaches the ticket, then atomically claim the
 * cnt_and_flag word.
 *
 * Always returns ISC_R_SUCCESS.  See the algorithm description above for
 * why each atomic variable is re-checked after taking the mutex.
 */
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		if (rwl->write_requests != rwl->write_completions) {
			/* there is a waiting or active writer */
			LOCK(&rwl->lock);
			if (rwl->write_requests != rwl->write_completions) {
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
			UNLOCK(&rwl->lock);
		}

		/* Register as a reader; the old value is not needed here. */
		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
		while (1) {
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) == 0)
				break;

			/* A writer is still working */
			LOCK(&rwl->lock);
			rwl->readers_waiting++;
			if ((rwl->cnt_and_flag & WRITER_ACTIVE) != 0)
				WAIT(&rwl->readable, &rwl->lock);
			rwl->readers_waiting--;
			UNLOCK(&rwl->lock);

			/*
			 * Typically, the reader should be able to get a lock
			 * at this stage:
			 *   (1) there should have been no pending writer when
			 *       the reader was trying to increment the
			 *       counter; otherwise, the writer should be in
			 *       the waiting queue, preventing the reader from
			 *       proceeding to this point.
			 *   (2) once the reader increments the counter, no
			 *       more writer can get a lock.
			 * Still, it is possible another writer can work at
			 * this point, e.g. in the following scenario:
			 *   A previous writer unlocks the writer lock.
			 *   This reader proceeds to point (1).
			 *   A new writer appears, and gets a new lock before
			 *   the reader increments the counter.
			 *   The reader then increments the counter.
			 *   The previous writer notices there is a waiting
			 *   reader who is almost ready, and wakes it up.
			 * So, the reader needs to confirm whether it can now
			 * read explicitly (thus we loop).  Note that this is
			 * not an infinite process, since the reader has
			 * incremented the counter at this point.
			 */
		}

		/*
		 * If we are temporarily preferred to writers due to the writer
		 * quota, reset the condition (race among readers doesn't
		 * matter).
		 */
		rwl->write_granted = 0;
	} else {
		isc_int32_t prev_writer;

		/* enter the waiting queue, and wait for our turn */
		prev_writer = isc_atomic_xadd(&rwl->write_requests, 1);
		while (rwl->write_completions != prev_writer) {
			LOCK(&rwl->lock);
			if (rwl->write_completions != prev_writer) {
				WAIT(&rwl->writeable, &rwl->lock);
				UNLOCK(&rwl->lock);
				continue;
			}
			UNLOCK(&rwl->lock);
			break;
		}

		/*
		 * Our ticket has come up; now wait for all readers (and any
		 * lingering writer) to drain and claim the flag atomically.
		 */
		while (1) {
			cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
						     WRITER_ACTIVE);
			if (cntflag == 0)
				break;

			/* Another active reader or writer is working. */
			LOCK(&rwl->lock);
			if (rwl->cnt_and_flag != 0)
				WAIT(&rwl->writeable, &rwl->lock);
			UNLOCK(&rwl->lock);
		}

		INSIST((rwl->cnt_and_flag & WRITER_ACTIVE) != 0);
		/* Count consecutive write grants toward the write quota. */
		rwl->write_granted++;
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
348
/*
 * Non-blocking acquisition: return ISC_R_LOCKBUSY instead of waiting
 * whenever the lock cannot be granted immediately; otherwise behave like
 * isc_rwlock_lock() and return ISC_R_SUCCESS.
 */
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		/* If a writer is waiting or working, we fail. */
		if (rwl->write_requests != rwl->write_completions)
			return (ISC_R_LOCKBUSY);

		/* Otherwise, be ready for reading. */
		cntflag = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
		if ((cntflag & WRITER_ACTIVE) != 0) {
			/*
			 * A writer is working.  We lose, and cancel the read
			 * request.
			 */
			cntflag = isc_atomic_xadd(&rwl->cnt_and_flag,
						  -READER_INCR);
			/*
			 * If no other readers are waiting and we've suspended
			 * new writers in this short period, wake them up.
			 */
			if (cntflag == READER_INCR &&
			    rwl->write_completions != rwl->write_requests) {
				LOCK(&rwl->lock);
				BROADCAST(&rwl->writeable);
				UNLOCK(&rwl->lock);
			}

			return (ISC_R_LOCKBUSY);
		}
	} else {
		/* Try locking without entering the waiting queue. */
		cntflag = isc_atomic_cmpxchg(&rwl->cnt_and_flag, 0,
					     WRITER_ACTIVE);
		if (cntflag != 0)
			return (ISC_R_LOCKBUSY);

		/*
		 * XXXJT: jump into the queue, possibly breaking the writer
		 * order.
		 */
		(void)isc_atomic_xadd(&rwl->write_completions, -1);

		rwl->write_granted++;
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
410
/*
 * Attempt to convert a read lock held by the caller into a write lock.
 * Succeeds (ISC_R_SUCCESS) only if the caller is the sole reader and no
 * writer is active; otherwise returns ISC_R_LOCKBUSY and the caller
 * keeps its read lock.  The caller MUST hold a read lock on entry.
 */
isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
	isc_int32_t prevcnt;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Try to acquire write access. */
	prevcnt = isc_atomic_cmpxchg(&rwl->cnt_and_flag,
				     READER_INCR, WRITER_ACTIVE);
	/*
	 * There must have been no writer, and there must have been at least
	 * one reader.
	 */
	INSIST((prevcnt & WRITER_ACTIVE) == 0 &&
	       (prevcnt & ~WRITER_ACTIVE) != 0);

	if (prevcnt == READER_INCR) {
		/*
		 * We are the only reader and have been upgraded.
		 * Now jump into the head of the writer waiting queue.
		 */
		(void)isc_atomic_xadd(&rwl->write_completions, -1);
	} else
		return (ISC_R_LOCKBUSY);

	return (ISC_R_SUCCESS);

}
439
/*
 * Convert the caller's write lock into a read lock.  The caller is
 * registered as a reader (READER_INCR) before the writer flag is
 * cleared, so no competing writer can acquire the lock in between.
 * Waiting readers are then woken so they can share the lock.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
	isc_int32_t prev_readers;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Become an active reader. */
	prev_readers = isc_atomic_xadd(&rwl->cnt_and_flag, READER_INCR);
	/* We must have been a writer. */
	INSIST((prev_readers & WRITER_ACTIVE) != 0);

	/* Complete write */
	(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
	(void)isc_atomic_xadd(&rwl->write_completions, 1);

	/* Resume other readers */
	LOCK(&rwl->lock);
	if (rwl->readers_waiting > 0)
		BROADCAST(&rwl->readable);
	UNLOCK(&rwl->lock);
}
461
/*
 * Release 'rwl'.  'type' must match the lock actually held.
 *
 * Readers: the last reader to leave wakes any queued writers (all of
 * them, to preserve the FIFO ticket order).
 * Writers: clear the writer flag and retire our ticket; then wake
 * waiting readers if the write quota has been consumed, no writer is
 * queued, or readers have already registered in cnt_and_flag --
 * otherwise wake the next writer.  Always returns ISC_R_SUCCESS.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	isc_int32_t prev_cnt;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		prev_cnt = isc_atomic_xadd(&rwl->cnt_and_flag, -READER_INCR);

		/*
		 * If we're the last reader and any writers are waiting, wake
		 * them up.  We need to wake up all of them to ensure the
		 * FIFO order.
		 */
		if (prev_cnt == READER_INCR &&
		    rwl->write_completions != rwl->write_requests) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	} else {
		isc_boolean_t wakeup_writers = ISC_TRUE;

		/*
		 * Reset the flag, and (implicitly) tell other writers
		 * we are done.
		 */
		(void)isc_atomic_xadd(&rwl->cnt_and_flag, -WRITER_ACTIVE);
		(void)isc_atomic_xadd(&rwl->write_completions, 1);

		if (rwl->write_granted >= rwl->write_quota ||
		    rwl->write_requests == rwl->write_completions ||
		    (rwl->cnt_and_flag & ~WRITER_ACTIVE) != 0) {
			/*
			 * We have passed the write quota, no writer is
			 * waiting, or some readers are almost ready, pending
			 * possible writers.  Note that the last case can
			 * happen even if write_requests != write_completions
			 * (which means a new writer in the queue), so we need
			 * to catch the case explicitly.
			 */
			LOCK(&rwl->lock);
			if (rwl->readers_waiting > 0) {
				/* Readers get this turn; skip the writers. */
				wakeup_writers = ISC_FALSE;
				BROADCAST(&rwl->readable);
			}
			UNLOCK(&rwl->lock);
		}

		if (rwl->write_requests != rwl->write_completions &&
		    wakeup_writers) {
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTUNLOCK, "postunlock"),
		   rwl, type);
#endif

	return (ISC_R_SUCCESS);
}
532
533#else /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
534
/*
 * Common acquisition path for the generic (mutex + condition variable)
 * implementation.  'nonblock' selects trylock semantics: instead of
 * waiting on a condition variable, return ISC_R_LOCKBUSY when the lock
 * cannot be granted right away.
 *
 * 'skip' is a simple fairness device: if waiters of our own kind are
 * already queued when we arrive, we wait once before competing, instead
 * of overtaking them.
 */
static isc_result_t
doit(isc_rwlock_t *rwl, isc_rwlocktype_t type, isc_boolean_t nonblock) {
	isc_boolean_t skip = ISC_FALSE;
	isc_boolean_t done = ISC_FALSE;
	isc_result_t result = ISC_R_SUCCESS;

	REQUIRE(VALID_RWLOCK(rwl));

	LOCK(&rwl->lock);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PRELOCK, "prelock"), rwl, type);
#endif

	if (type == isc_rwlocktype_read) {
		if (rwl->readers_waiting != 0)
			skip = ISC_TRUE;
		while (!done) {
			/*
			 * A read lock is granted when the lock is idle, or
			 * already held by readers and either no writer is
			 * waiting or the read quota is not yet exhausted.
			 */
			if (!skip &&
			    ((rwl->active == 0 ||
			      (rwl->type == isc_rwlocktype_read &&
			       (rwl->writers_waiting == 0 ||
				rwl->granted < rwl->read_quota)))))
			{
				rwl->type = isc_rwlocktype_read;
				rwl->active++;
				rwl->granted++;
				done = ISC_TRUE;
			} else if (nonblock) {
				result = ISC_R_LOCKBUSY;
				done = ISC_TRUE;
			} else {
				skip = ISC_FALSE;
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
		}
	} else {
		if (rwl->writers_waiting != 0)
			skip = ISC_TRUE;
		while (!done) {
			/* A write lock requires the lock to be fully idle. */
			if (!skip && rwl->active == 0) {
				rwl->type = isc_rwlocktype_write;
				rwl->active = 1;
				rwl->granted++;
				done = ISC_TRUE;
			} else if (nonblock) {
				result = ISC_R_LOCKBUSY;
				done = ISC_TRUE;
			} else {
				skip = ISC_FALSE;
				rwl->writers_waiting++;
				WAIT(&rwl->writeable, &rwl->lock);
				rwl->writers_waiting--;
			}
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTLOCK, "postlock"), rwl, type);
#endif

	UNLOCK(&rwl->lock);

	return (result);
}
604
/*
 * Acquire 'rwl' for reading or writing, blocking as necessary.
 */
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	return (doit(rwl, type, ISC_FALSE));
}
609
/*
 * Acquire 'rwl' without blocking; returns ISC_R_LOCKBUSY if unavailable.
 */
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	return (doit(rwl, type, ISC_TRUE));
}
614
615isc_result_t
616isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
617	isc_result_t result = ISC_R_SUCCESS;
618
619	REQUIRE(VALID_RWLOCK(rwl));
620	LOCK(&rwl->lock);
621	REQUIRE(rwl->type == isc_rwlocktype_read);
622	REQUIRE(rwl->active != 0);
623
624	/* If we are the only reader then succeed. */
625	if (rwl->active == 1) {
626		rwl->original = (rwl->original == isc_rwlocktype_none) ?
627				isc_rwlocktype_read : isc_rwlocktype_none;
628		rwl->type = isc_rwlocktype_write;
629	} else
630		result = ISC_R_LOCKBUSY;
631
632	UNLOCK(&rwl->lock);
633	return (result);
634}
635
/*
 * Convert the caller's write lock back into a read lock.  rwl->original
 * tracks upgrade/downgrade nesting: a downgrade either cancels a record
 * left by a previous tryupgrade (back to none) or, for a lock originally
 * taken for writing, records 'write' so that isc_rwlock_unlock() can
 * restore the type.  The caller MUST hold the write lock exclusively.
 */
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {

	REQUIRE(VALID_RWLOCK(rwl));
	LOCK(&rwl->lock);
	REQUIRE(rwl->type == isc_rwlocktype_write);
	REQUIRE(rwl->active == 1);

	rwl->type = isc_rwlocktype_read;
	rwl->original = (rwl->original == isc_rwlocktype_none) ?
			isc_rwlocktype_write : isc_rwlocktype_none;
	/*
	 * Resume processing any read request that were blocked when
	 * we upgraded.
	 */
	if (rwl->original == isc_rwlocktype_none &&
	    (rwl->writers_waiting == 0 || rwl->granted < rwl->read_quota) &&
	    rwl->readers_waiting > 0)
		BROADCAST(&rwl->readable);

	UNLOCK(&rwl->lock);
}
658
/*
 * Release 'rwl'.  'type' must match the lock held.  When the last holder
 * leaves, choose who runs next: after readers, a waiting writer gets the
 * lock; after a writer, waiting readers get it unless writers still have
 * quota left.  Always returns ISC_R_SUCCESS.
 */
isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {

	REQUIRE(VALID_RWLOCK(rwl));
	LOCK(&rwl->lock);
	REQUIRE(rwl->type == type);

	UNUSED(type);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_PREUNLOCK, "preunlock"), rwl, type);
#endif

	INSIST(rwl->active > 0);
	rwl->active--;
	if (rwl->active == 0) {
		/* Restore the lock type recorded by upgrade/downgrade. */
		if (rwl->original != isc_rwlocktype_none) {
			rwl->type = rwl->original;
			rwl->original = isc_rwlocktype_none;
		}
		if (rwl->type == isc_rwlocktype_read) {
			/* Last reader out: give writers the next turn. */
			rwl->granted = 0;
			if (rwl->writers_waiting > 0) {
				rwl->type = isc_rwlocktype_write;
				SIGNAL(&rwl->writeable);
			} else if (rwl->readers_waiting > 0) {
				/* Does this case ever happen? */
				BROADCAST(&rwl->readable);
			}
		} else {
			if (rwl->readers_waiting > 0) {
				/*
				 * Writers keep the lock until their quota
				 * is exhausted; then readers get a turn.
				 */
				if (rwl->writers_waiting > 0 &&
				    rwl->granted < rwl->write_quota) {
					SIGNAL(&rwl->writeable);
				} else {
					rwl->granted = 0;
					rwl->type = isc_rwlocktype_read;
					BROADCAST(&rwl->readable);
				}
			} else if (rwl->writers_waiting > 0) {
				rwl->granted = 0;
				SIGNAL(&rwl->writeable);
			} else {
				rwl->granted = 0;
			}
		}
	}
	INSIST(rwl->original == isc_rwlocktype_none);

#ifdef ISC_RWLOCK_TRACE
	print_lock(isc_msgcat_get(isc_msgcat, ISC_MSGSET_RWLOCK,
				  ISC_MSG_POSTUNLOCK, "postunlock"),
		   rwl, type);
#endif

	UNLOCK(&rwl->lock);

	return (ISC_R_SUCCESS);
}
719
720#endif /* ISC_PLATFORM_HAVEXADD && ISC_PLATFORM_HAVECMPXCHG */
721#else /* ISC_PLATFORM_USETHREADS */
722
723isc_result_t
724isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
725		unsigned int write_quota)
726{
727	REQUIRE(rwl != NULL);
728
729	UNUSED(read_quota);
730	UNUSED(write_quota);
731
732	rwl->type = isc_rwlocktype_read;
733	rwl->active = 0;
734	rwl->magic = RWLOCK_MAGIC;
735
736	return (ISC_R_SUCCESS);
737}
738
739isc_result_t
740isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
741	REQUIRE(VALID_RWLOCK(rwl));
742
743	if (type == isc_rwlocktype_read) {
744		if (rwl->type != isc_rwlocktype_read && rwl->active != 0)
745			return (ISC_R_LOCKBUSY);
746		rwl->type = isc_rwlocktype_read;
747		rwl->active++;
748	} else {
749		if (rwl->active != 0)
750			return (ISC_R_LOCKBUSY);
751		rwl->type = isc_rwlocktype_write;
752		rwl->active = 1;
753	}
754	return (ISC_R_SUCCESS);
755}
756
/*
 * Without threads, locking never blocks, so trylock is identical to lock.
 */
isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	return (isc_rwlock_lock(rwl, type));
}
761
762isc_result_t
763isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
764	isc_result_t result = ISC_R_SUCCESS;
765
766	REQUIRE(VALID_RWLOCK(rwl));
767	REQUIRE(rwl->type == isc_rwlocktype_read);
768	REQUIRE(rwl->active != 0);
769
770	/* If we are the only reader then succeed. */
771	if (rwl->active == 1)
772		rwl->type = isc_rwlocktype_write;
773	else
774		result = ISC_R_LOCKBUSY;
775	return (result);
776}
777
778void
779isc_rwlock_downgrade(isc_rwlock_t *rwl) {
780
781	REQUIRE(VALID_RWLOCK(rwl));
782	REQUIRE(rwl->type == isc_rwlocktype_write);
783	REQUIRE(rwl->active == 1);
784
785	rwl->type = isc_rwlocktype_read;
786}
787
788isc_result_t
789isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
790	REQUIRE(VALID_RWLOCK(rwl));
791	REQUIRE(rwl->type == type);
792
793	UNUSED(type);
794
795	INSIST(rwl->active > 0);
796	rwl->active--;
797
798	return (ISC_R_SUCCESS);
799}
800
801void
802isc_rwlock_destroy(isc_rwlock_t *rwl) {
803	REQUIRE(rwl != NULL);
804	REQUIRE(rwl->active == 0);
805	rwl->magic = 0;
806}
807
808#endif /* ISC_PLATFORM_USETHREADS */
809