/*	$NetBSD: rwlock.c,v 1.14 2024/02/21 22:52:29 christos Exp $	*/

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

/*! \file */
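
/*
 * A minimal usage sketch (illustrative only; error handling is trimmed,
 * a read quota other than 0 is not supported, and a write quota of 0
 * selects the default):
 *
 *	isc_rwlock_t rwl;
 *
 *	isc_rwlock_init(&rwl, 0, 0);
 *	RUNTIME_CHECK(isc_rwlock_lock(&rwl, isc_rwlocktype_read) ==
 *		      ISC_R_SUCCESS);
 *	... read the shared state ...
 *	RUNTIME_CHECK(isc_rwlock_unlock(&rwl, isc_rwlocktype_read) ==
 *		      ISC_R_SUCCESS);
 *	isc_rwlock_destroy(&rwl);
 */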

#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>

#if defined(sun) && (defined(__sparc) || defined(__sparc__))
#include <synch.h> /* for smt_pause(3c) */
#endif /* if defined(sun) && (defined(__sparc) || defined(__sparc__)) */

#include <isc/atomic.h>
#include <isc/magic.h>
#include <isc/print.h>
#include <isc/rwlock.h>
#include <isc/util.h>

#if USE_PTHREAD_RWLOCK

#include <errno.h>
#include <pthread.h>

void
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
		unsigned int write_quota) {
	UNUSED(read_quota);
	UNUSED(write_quota);
	REQUIRE(pthread_rwlock_init(&rwl->rwlock, NULL) == 0);
	atomic_init(&rwl->downgrade, false);
}

isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	switch (type) {
	case isc_rwlocktype_read:
		REQUIRE(pthread_rwlock_rdlock(&rwl->rwlock) == 0);
		break;
	case isc_rwlocktype_write:
		while (true) {
			REQUIRE(pthread_rwlock_wrlock(&rwl->rwlock) == 0);
			/*
			 * If a downgrade is in progress, back off: release
			 * the write lock and spin until the downgrading
			 * thread has reacquired the lock as a reader and
			 * cleared the flag (see isc_rwlock_downgrade()).
			 */
			if (atomic_load_acquire(&rwl->downgrade)) {
				REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) ==
					0);
				while (atomic_load_acquire(&rwl->downgrade)) {
				}
				continue;
			}
			break;
		}
		break;
	default:
		UNREACHABLE();
	}
	return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	int ret = 0;
	switch (type) {
	case isc_rwlocktype_read:
		ret = pthread_rwlock_tryrdlock(&rwl->rwlock);
		break;
	case isc_rwlocktype_write:
		ret = pthread_rwlock_trywrlock(&rwl->rwlock);
		if ((ret == 0) && atomic_load_acquire(&rwl->downgrade)) {
			isc_rwlock_unlock(rwl, type);
			return (ISC_R_LOCKBUSY);
		}
		break;
	default:
		UNREACHABLE();
	}

	switch (ret) {
	case 0:
		return (ISC_R_SUCCESS);
	case EBUSY:
		return (ISC_R_LOCKBUSY);
	case EAGAIN:
		return (ISC_R_LOCKBUSY);
	default:
		UNREACHABLE();
	}
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	UNUSED(type);
	REQUIRE(pthread_rwlock_unlock(&rwl->rwlock) == 0);
	return (ISC_R_SUCCESS);
}

isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
	UNUSED(rwl);

	/* Upgrading is not supported by the pthread-based implementation. */
	return (ISC_R_LOCKBUSY);
}

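/*
 * Downgrade a write lock to a read lock.  POSIX rwlocks have no native
 * downgrade, so the "downgrade" flag is used to hold competing writers at
 * bay: the flag is set, the write lock is released, a read lock is
 * reacquired, and only then is the flag cleared.  A writer that wins the
 * lock in that window notices the flag in isc_rwlock_lock(), releases the
 * lock again and spins until the flag is cleared, so the downgrading
 * thread can reacquire read access without an intervening writer.
 */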
void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
	isc_result_t result;
	atomic_store_release(&rwl->downgrade, true);
	result = isc_rwlock_unlock(rwl, isc_rwlocktype_write);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);
	result = isc_rwlock_lock(rwl, isc_rwlocktype_read);
	RUNTIME_CHECK(result == ISC_R_SUCCESS);
	atomic_store_release(&rwl->downgrade, false);
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	pthread_rwlock_destroy(&rwl->rwlock);
}

#else /* if USE_PTHREAD_RWLOCK */

#define RWLOCK_MAGIC	  ISC_MAGIC('R', 'W', 'L', 'k')
#define VALID_RWLOCK(rwl) ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)

#ifndef RWLOCK_DEFAULT_READ_QUOTA
#define RWLOCK_DEFAULT_READ_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_READ_QUOTA */

#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
#define RWLOCK_DEFAULT_WRITE_QUOTA 4
#endif /* ifndef RWLOCK_DEFAULT_WRITE_QUOTA */

#ifndef RWLOCK_MAX_ADAPTIVE_COUNT
#define RWLOCK_MAX_ADAPTIVE_COUNT 100
#endif /* ifndef RWLOCK_MAX_ADAPTIVE_COUNT */

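/*
 * isc_rwlock_pause() issues a CPU-specific "pause"/"yield" hint; it is
 * used by the adaptive spin in isc_rwlock_lock() below to be gentler on
 * sibling hardware threads while busy-waiting.  On platforms without such
 * a hint it expands to nothing.
 */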
#ifdef __lint__
# define isc_rwlock_pause()
#else
#if defined(_MSC_VER)
#include <intrin.h>
#define isc_rwlock_pause() YieldProcessor()
#elif defined(__x86_64__)
#include <immintrin.h>
#define isc_rwlock_pause() _mm_pause()
#elif defined(__i386__)
#define isc_rwlock_pause() __asm__ __volatile__("rep; nop")
#elif defined(__ia64__)
#define isc_rwlock_pause() __asm__ __volatile__("hint @pause")
#elif defined(__arm__) && (defined(_ARM_ARCH_6) || HAVE_ARM_YIELD)
#define isc_rwlock_pause() __asm__ __volatile__("yield")
#elif defined(sun) && (defined(__sparc) || defined(__sparc__))
#define isc_rwlock_pause() smt_pause()
#elif (defined(__sparc) || defined(__sparc__)) && HAVE_SPARC_PAUSE
#define isc_rwlock_pause() __asm__ __volatile__("pause")
#elif defined(__ppc__) || defined(_ARCH_PPC) || defined(_ARCH_PWR) || \
	defined(_ARCH_PWR2) || defined(_POWER)
#define isc_rwlock_pause() __asm__ volatile("or 27,27,27")
#else /* if defined(_MSC_VER) */
#define isc_rwlock_pause()
#endif /* if defined(_MSC_VER) */
#endif

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type);

#ifdef ISC_RWLOCK_TRACE
#include <stdio.h> /* Required for fprintf/stderr. */

#include <isc/thread.h> /* Required for isc_thread_self(). */

static void
print_lock(const char *operation, isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	fprintf(stderr,
		"rwlock %p thread %" PRIuPTR " %s(%s): "
		"write_requests=%u, write_completions=%u, "
		"cnt_and_flag=0x%x, readers_waiting=%u, "
		"write_granted=%u, write_quota=%u\n",
		rwl, isc_thread_self(), operation,
		(type == isc_rwlocktype_read ? "read" : "write"),
		atomic_load_acquire(&rwl->write_requests),
		atomic_load_acquire(&rwl->write_completions),
		atomic_load_acquire(&rwl->cnt_and_flag), rwl->readers_waiting,
		atomic_load_acquire(&rwl->write_granted), rwl->write_quota);
}
#endif			/* ISC_RWLOCK_TRACE */

void
isc_rwlock_init(isc_rwlock_t *rwl, unsigned int read_quota,
		unsigned int write_quota) {
	REQUIRE(rwl != NULL);

	/*
	 * In case there's trouble initializing, we zero magic now.  If all
	 * goes well, we'll set it to RWLOCK_MAGIC.
	 */
	rwl->magic = 0;

	atomic_init(&rwl->spins, 0);
	atomic_init(&rwl->write_requests, 0);
	atomic_init(&rwl->write_completions, 0);
	atomic_init(&rwl->cnt_and_flag, 0);
	rwl->readers_waiting = 0;
	atomic_init(&rwl->write_granted, 0);
	if (read_quota != 0) {
		UNEXPECTED_ERROR("read quota is not supported");
	}
	if (write_quota == 0) {
		write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
	}
	rwl->write_quota = write_quota;

	isc_mutex_init(&rwl->lock);

	isc_condition_init(&rwl->readable);
	isc_condition_init(&rwl->writeable);

	rwl->magic = RWLOCK_MAGIC;
}

void
isc_rwlock_destroy(isc_rwlock_t *rwl) {
	REQUIRE(VALID_RWLOCK(rwl));

	REQUIRE(atomic_load_acquire(&rwl->write_requests) ==
			atomic_load_acquire(&rwl->write_completions) &&
		atomic_load_acquire(&rwl->cnt_and_flag) == 0 &&
		rwl->readers_waiting == 0);

	rwl->magic = 0;
	(void)isc_condition_destroy(&rwl->readable);
	(void)isc_condition_destroy(&rwl->writeable);
	isc_mutex_destroy(&rwl->lock);
}

/*
 * When architecture-dependent atomic operations are available, this rwlock
 * can be more efficient than a generic mutex-based implementation.
 * The basic algorithm is described at the following URL:
 *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
 *
 * The key is to use the following integer variables, modified atomically:
 *   write_requests, write_completions, and cnt_and_flag.
 *
 * write_requests and write_completions act as a waiting queue for writers
 * in order to ensure FIFO order.  Both variables begin with an initial
 * value of 0.  When a new writer tries to get a write lock, it increments
 * write_requests and takes the previous value of the variable as its
 * "ticket".  When write_completions reaches the ticket number, the new
 * writer can start writing.  When the writer completes its work, it
 * increments write_completions so that the next writer can start working.
 * If write_requests is not equal to write_completions, a writer is
 * currently working or waiting.  In this case new readers cannot start
 * reading; in other words, this algorithm basically prefers writers.
 *
 * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
 * variable is a kind of structure with two members: writer_flag (1 bit) and
 * reader_count (31 bits).  The writer_flag shows whether a writer is working,
 * and the reader_count shows the number of readers currently working or
 * almost ready to work.  The writer holding the current "ticket" tries to
 * get the lock by exclusively setting the writer_flag to 1, provided that
 * the whole 32-bit value is 0 (meaning no readers or writers are working).
 * A new reader, on the other hand, tries to increment the reader_count
 * field provided that the writer_flag is 0 (meaning no writer is working).
 *
 * If any of the above operations fail, the reader or writer sleeps until
 * the related condition changes.  When a working reader or writer completes
 * its work, and readers or writers are sleeping whose blocking condition
 * has now changed, it wakes up the sleeping readers or writers.
 *
 * As already noted, this algorithm basically prefers writers.  In order to
 * prevent readers from starving, however, the algorithm also introduces a
 * "writer quota" (Q).  When Q consecutive writers have completed their work
 * while readers have been kept waiting, the last writer wakes up the
 * readers, even if another writer is waiting.
 *
 * Implementation-specific note: due to the combination of atomic operations
 * and a mutex lock, the ordering between the atomic operations and the locks
 * can be very sensitive in some cases.  In particular, it is generally very
 * important to check the atomic variable that requires a reader or writer
 * to sleep after locking the mutex and before actually sleeping; otherwise,
 * a deadlock is very likely.  For example, assuming "var" is a variable
 * modified atomically, the corresponding code would be:
 *	if (var == need_sleep) {
 *		LOCK(lock);
 *		if (var == need_sleep)
 *			WAIT(cond, lock);
 *		UNLOCK(lock);
 *	}
 * The second check is important, since "var" is protected by the atomic
 * operation, not by the mutex, and can change just before sleeping.
 * (The first "if" could be omitted, but it is also important in order to
 * keep the code efficient by avoiding use of the mutex unless it is
 * really necessary.)
 */

#define WRITER_ACTIVE 0x1
#define READER_INCR   0x2
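
/*
 * Stripped of the mutex/condition-variable double checks, the tracing, and
 * the adaptive spin, the lock and unlock paths below reduce to roughly the
 * following (illustrative pseudocode only, not part of the build):
 *
 * Writer:
 *	ticket = atomic_fetch_add(&write_requests, 1);
 *	while (atomic_load(&write_completions) != ticket)
 *		wait on the "writeable" condition;
 *	while (!atomic_compare_exchange(&cnt_and_flag, 0, WRITER_ACTIVE))
 *		wait on the "writeable" condition;
 *	... write ...
 *	atomic_fetch_sub(&cnt_and_flag, WRITER_ACTIVE);
 *	atomic_fetch_add(&write_completions, 1);
 *	wake waiting readers and/or the next writer (see write_quota);
 *
 * Reader:
 *	while (atomic_load(&write_requests) !=
 *	       atomic_load(&write_completions))
 *		wait on the "readable" condition;
 *	atomic_fetch_add(&cnt_and_flag, READER_INCR);
 *	while ((atomic_load(&cnt_and_flag) & WRITER_ACTIVE) != 0)
 *		wait on the "readable" condition;
 *	... read ...
 *	if (atomic_fetch_sub(&cnt_and_flag, READER_INCR) == READER_INCR &&
 *	    a writer is queued)
 *		wake the waiting writers;
 */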

static isc_result_t
isc__rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	if (type == isc_rwlocktype_read) {
		if (atomic_load_acquire(&rwl->write_requests) !=
		    atomic_load_acquire(&rwl->write_completions))
		{
			/* there is a waiting or active writer */
			LOCK(&rwl->lock);
			if (atomic_load_acquire(&rwl->write_requests) !=
			    atomic_load_acquire(&rwl->write_completions))
			{
				rwl->readers_waiting++;
				WAIT(&rwl->readable, &rwl->lock);
				rwl->readers_waiting--;
			}
			UNLOCK(&rwl->lock);
		}

		cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
						   READER_INCR);
		POST(cntflag);
		while (1) {
			if ((atomic_load_acquire(&rwl->cnt_and_flag) &
			     WRITER_ACTIVE) == 0)
			{
				break;
			}

			/* A writer is still working */
			LOCK(&rwl->lock);
			rwl->readers_waiting++;
			if ((atomic_load_acquire(&rwl->cnt_and_flag) &
			     WRITER_ACTIVE) != 0)
			{
				WAIT(&rwl->readable, &rwl->lock);
			}
			rwl->readers_waiting--;
			UNLOCK(&rwl->lock);

			/*
			 * Typically, the reader should be able to get the lock
			 * at this stage:
			 *   (1) there should have been no pending writer when
			 *       the reader was trying to increment the
			 *       counter; otherwise, the writer should be in
			 *       the waiting queue, preventing the reader from
			 *       proceeding to this point.
			 *   (2) once the reader increments the counter, no
			 *       more writers can get the lock.
			 * Still, it is possible that another writer is working
			 * at this point, e.g. in the following scenario:
			 *   A previous writer unlocks the writer lock.
			 *   This reader proceeds to point (1).
			 *   A new writer appears, and gets a new lock before
			 *   the reader increments the counter.
			 *   The reader then increments the counter.
			 *   The previous writer notices there is a waiting
			 *   reader who is almost ready, and wakes it up.
			 * So the reader needs to confirm explicitly whether it
			 * can now read (hence the loop).  Note that this is
			 * not an infinite process, since the reader has
			 * incremented the counter at this point.
			 */
		}

		/*
		 * If we have temporarily been preferred over writers due to
		 * the writer quota, reset the condition (a race among readers
		 * doesn't matter).
		 */
		atomic_store_release(&rwl->write_granted, 0);
	} else {
		int32_t prev_writer;

		/* enter the waiting queue, and wait for our turn */
		prev_writer = atomic_fetch_add_release(&rwl->write_requests, 1);
		while (atomic_load_acquire(&rwl->write_completions) !=
		       prev_writer)
		{
			LOCK(&rwl->lock);
			if (atomic_load_acquire(&rwl->write_completions) !=
			    prev_writer)
			{
				WAIT(&rwl->writeable, &rwl->lock);
				UNLOCK(&rwl->lock);
				continue;
			}
			UNLOCK(&rwl->lock);
			break;
		}

		while (!atomic_compare_exchange_weak_acq_rel(
			&rwl->cnt_and_flag, &(int_fast32_t){ 0 },
			WRITER_ACTIVE))
		{
			/* Another active reader or writer is working. */
			LOCK(&rwl->lock);
			if (atomic_load_acquire(&rwl->cnt_and_flag) != 0) {
				WAIT(&rwl->writeable, &rwl->lock);
			}
			UNLOCK(&rwl->lock);
		}

		INSIST((atomic_load_acquire(&rwl->cnt_and_flag) &
			WRITER_ACTIVE));
		atomic_fetch_add_release(&rwl->write_granted, 1);
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	return (ISC_R_SUCCESS);
}

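/*
 * Acquire the lock with bounded adaptive spinning: call
 * isc_rwlock_trylock() up to max_cnt times, pausing between attempts, and
 * fall back to the blocking isc__rwlock_lock() only when that budget is
 * exhausted.  rwl->spins keeps a rough running estimate of how much
 * spinning recent acquisitions needed, and is nudged toward the count
 * actually used after every acquisition.
 */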
isc_result_t
isc_rwlock_lock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	int32_t cnt = 0;
	int32_t spins = atomic_load_acquire(&rwl->spins) * 2 + 10;
	int32_t max_cnt = ISC_MAX(spins, RWLOCK_MAX_ADAPTIVE_COUNT);
	isc_result_t result = ISC_R_SUCCESS;

	do {
		if (cnt++ >= max_cnt) {
			result = isc__rwlock_lock(rwl, type);
			break;
		}
		isc_rwlock_pause();
	} while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);

	atomic_fetch_add_release(&rwl->spins, (cnt - spins) / 8);

	return (result);
}

isc_result_t
isc_rwlock_trylock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	int32_t cntflag;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock("prelock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	if (type == isc_rwlocktype_read) {
		/* If a writer is waiting or working, we fail. */
		if (atomic_load_acquire(&rwl->write_requests) !=
		    atomic_load_acquire(&rwl->write_completions))
		{
			return (ISC_R_LOCKBUSY);
		}

		/* Otherwise, be ready for reading. */
		cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
						   READER_INCR);
		if ((cntflag & WRITER_ACTIVE) != 0) {
			/*
			 * A writer is working.  We lose, and cancel the read
			 * request.
			 */
			cntflag = atomic_fetch_sub_release(&rwl->cnt_and_flag,
							   READER_INCR);
			/*
			 * If no other readers are waiting and we've suspended
			 * new writers in this short period, wake them up.
			 */
			if (cntflag == READER_INCR &&
			    atomic_load_acquire(&rwl->write_completions) !=
				    atomic_load_acquire(&rwl->write_requests))
			{
				LOCK(&rwl->lock);
				BROADCAST(&rwl->writeable);
				UNLOCK(&rwl->lock);
			}

			return (ISC_R_LOCKBUSY);
		}
	} else {
		/* Try locking without entering the waiting queue. */
		int_fast32_t zero = 0;
		if (!atomic_compare_exchange_strong_acq_rel(
			    &rwl->cnt_and_flag, &zero, WRITER_ACTIVE))
		{
			return (ISC_R_LOCKBUSY);
		}

		/*
		 * XXXJT: jump into the queue, possibly breaking the writer
		 * order.
		 */
		atomic_fetch_sub_release(&rwl->write_completions, 1);
		atomic_fetch_add_release(&rwl->write_granted, 1);
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock("postlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	return (ISC_R_SUCCESS);
}

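/*
 * Try to upgrade a read lock to a write lock without releasing it.  This
 * succeeds only if the caller is the sole reader and no writer is active;
 * otherwise ISC_R_LOCKBUSY is returned and the caller keeps its read lock.
 * On success the caller effectively jumps to the head of the writer queue
 * (write_completions is decremented instead of taking a new ticket).
 */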
isc_result_t
isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
	REQUIRE(VALID_RWLOCK(rwl));

	int_fast32_t reader_incr = READER_INCR;

	/* Try to acquire write access. */
	atomic_compare_exchange_strong_acq_rel(&rwl->cnt_and_flag, &reader_incr,
					       WRITER_ACTIVE);
	/*
	 * There must have been no writer, and there must have
	 * been at least one reader.
	 */
	INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
	       (reader_incr & ~WRITER_ACTIVE) != 0);

	if (reader_incr == READER_INCR) {
		/*
		 * We are the only reader and have been upgraded.
		 * Now jump into the head of the writer waiting queue.
		 */
		atomic_fetch_sub_release(&rwl->write_completions, 1);
	} else {
		return (ISC_R_LOCKBUSY);
	}

	return (ISC_R_SUCCESS);
}

void
isc_rwlock_downgrade(isc_rwlock_t *rwl) {
	int32_t prev_readers;

	REQUIRE(VALID_RWLOCK(rwl));

	/* Become an active reader. */
	prev_readers = atomic_fetch_add_release(&rwl->cnt_and_flag,
						READER_INCR);
	/* We must have been a writer. */
	INSIST((prev_readers & WRITER_ACTIVE) != 0);

	/* Complete write */
	atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
	atomic_fetch_add_release(&rwl->write_completions, 1);

	/* Resume other readers */
	LOCK(&rwl->lock);
	if (rwl->readers_waiting > 0) {
		BROADCAST(&rwl->readable);
	}
	UNLOCK(&rwl->lock);
}

isc_result_t
isc_rwlock_unlock(isc_rwlock_t *rwl, isc_rwlocktype_t type) {
	int32_t prev_cnt;

	REQUIRE(VALID_RWLOCK(rwl));

#ifdef ISC_RWLOCK_TRACE
	print_lock("preunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	if (type == isc_rwlocktype_read) {
		prev_cnt = atomic_fetch_sub_release(&rwl->cnt_and_flag,
						    READER_INCR);
		/*
		 * If we're the last reader and any writers are waiting, wake
		 * them up.  We need to wake up all of them to ensure the
		 * FIFO order.
		 */
		if (prev_cnt == READER_INCR &&
		    atomic_load_acquire(&rwl->write_completions) !=
			    atomic_load_acquire(&rwl->write_requests))
		{
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	} else {
		bool wakeup_writers = true;

		/*
		 * Reset the flag, and (implicitly) tell other writers
		 * we are done.
		 */
		atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
		atomic_fetch_add_release(&rwl->write_completions, 1);

		if ((atomic_load_acquire(&rwl->write_granted) >=
		     rwl->write_quota) ||
		    (atomic_load_acquire(&rwl->write_requests) ==
		     atomic_load_acquire(&rwl->write_completions)) ||
		    (atomic_load_acquire(&rwl->cnt_and_flag) & ~WRITER_ACTIVE))
		{
			/*
			 * We have passed the write quota, no writer is
			 * waiting, or some readers are almost ready, pending
			 * possible writers.  Note that the last case can
			 * happen even if write_requests != write_completions
			 * (which means a new writer in the queue), so we need
			 * to catch the case explicitly.
			 */
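			/*
			 * For example, with the default write quota of 4:
			 * after four consecutive write grants with no
			 * intervening reader, the unlocking writer wakes any
			 * waiting readers instead of the queued writers,
			 * giving readers a turn.
			 */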
			LOCK(&rwl->lock);
			if (rwl->readers_waiting > 0) {
				wakeup_writers = false;
				BROADCAST(&rwl->readable);
			}
			UNLOCK(&rwl->lock);
		}

		if ((atomic_load_acquire(&rwl->write_requests) !=
		     atomic_load_acquire(&rwl->write_completions)) &&
		    wakeup_writers)
		{
			LOCK(&rwl->lock);
			BROADCAST(&rwl->writeable);
			UNLOCK(&rwl->lock);
		}
	}

#ifdef ISC_RWLOCK_TRACE
	print_lock("postunlock", rwl, type);
#endif /* ifdef ISC_RWLOCK_TRACE */

	return (ISC_R_SUCCESS);
}

#endif /* USE_PTHREAD_RWLOCK */
