/*-
 * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the author nor the names of any co-contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Machine independent bits of reader/writer lock implementation.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/kern_rwlock.c 189074 2009-02-26 15:51:54Z ed $");

#include "opt_ddb.h"
#include "opt_no_adaptive_rwlocks.h"

#include <sys/param.h>
#include <sys/ktr.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/rwlock.h>
#include <sys/sysctl.h>
#include <sys/systm.h>
#include <sys/turnstile.h>

#include <machine/cpu.h>

CTASSERT((RW_RECURSE & LO_CLASSFLAGS) == RW_RECURSE);

#if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
#define	ADAPTIVE_RWLOCKS
#endif

#ifdef ADAPTIVE_RWLOCKS
static int rowner_retries = 10;
static int rowner_loops = 10000;
SYSCTL_NODE(_debug, OID_AUTO, rwlock, CTLFLAG_RD, NULL, "rwlock debugging");
SYSCTL_INT(_debug_rwlock, OID_AUTO, retry, CTLFLAG_RW, &rowner_retries, 0, "");
SYSCTL_INT(_debug_rwlock, OID_AUTO, loops, CTLFLAG_RW, &rowner_loops, 0, "");
#endif

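/*
 * With ADAPTIVE_RWLOCKS enabled, the spin behaviour is tunable at run time
 * through the sysctl knobs declared above.  A minimal sketch of adjusting
 * them (the values below are purely illustrative, not recommendations):
 *
 *	# sysctl debug.rwlock.retry=20
 *	# sysctl debug.rwlock.loops=20000
 *
 * rowner_retries bounds how many spin rounds a waiter attempts and
 * rowner_loops bounds the length of each round; see their uses in
 * _rw_rlock() and _rw_wlock_hard() below.
 */
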
#ifdef DDB
#include <ddb/ddb.h>

static void	db_show_rwlock(struct lock_object *lock);
#endif
static void	assert_rw(struct lock_object *lock, int what);
static void	lock_rw(struct lock_object *lock, int how);
static int	unlock_rw(struct lock_object *lock);

struct lock_class lock_class_rw = {
	.lc_name = "rw",
	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
	.lc_assert = assert_rw,
#ifdef DDB
	.lc_ddb_show = db_show_rwlock,
#endif
	.lc_lock = lock_rw,
	.lc_unlock = unlock_rw,
};

/*
 * Return a pointer to the owning thread if the lock is write-locked or
 * NULL if the lock is unlocked or read-locked.
 */
#define	rw_wowner(rw)							\
	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
	    (struct thread *)RW_OWNER((rw)->rw_lock))

/*
 * Returns whether a write owner is recursed.  Write ownership is not
 * assured here and should be checked beforehand.
 */
#define	rw_recursed(rw)		((rw)->rw_recurse != 0)

/*
 * Return true if curthread holds the lock.
 */
#define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)

/*
 * Return a pointer to the thread that owns this lock and should receive
 * any priority lent by threads that block on this lock.  Currently this
 * is identical to rw_wowner().
 */
#define	rw_owner(rw)		rw_wowner(rw)

#ifndef INVARIANTS
#define	_rw_assert(rw, what, file, line)
#endif

void
assert_rw(struct lock_object *lock, int what)
{

	rw_assert((struct rwlock *)lock, what);
}

void
lock_rw(struct lock_object *lock, int how)
{
	struct rwlock *rw;

	rw = (struct rwlock *)lock;
	if (how)
		rw_wlock(rw);
	else
		rw_rlock(rw);
}

int
unlock_rw(struct lock_object *lock)
{
	struct rwlock *rw;

	rw = (struct rwlock *)lock;
	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
	if (rw->rw_lock & RW_LOCK_READ) {
		rw_runlock(rw);
		return (0);
	} else {
		rw_wunlock(rw);
		return (1);
	}
}

void
rw_init_flags(struct rwlock *rw, const char *name, int opts)
{
	int flags;

	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
	    RW_RECURSE)) == 0);

	flags = LO_UPGRADABLE | LO_RECURSABLE;
	if (opts & RW_DUPOK)
		flags |= LO_DUPOK;
	if (opts & RW_NOPROFILE)
		flags |= LO_NOPROFILE;
	if (!(opts & RW_NOWITNESS))
		flags |= LO_WITNESS;
	if (opts & RW_QUIET)
		flags |= LO_QUIET;
	flags |= opts & RW_RECURSE;

	rw->rw_lock = RW_UNLOCKED;
	rw->rw_recurse = 0;
	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
}

void
rw_destroy(struct rwlock *rw)
{

	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock not unlocked"));
	KASSERT(rw->rw_recurse == 0, ("rw lock still recursed"));
	rw->rw_lock = RW_DESTROYED;
	lock_destroy(&rw->lock_object);
}

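/*
 * A minimal usage sketch of the lifecycle implemented above (names such as
 * foo_lock are hypothetical, not part of this file).  A consumer initializes
 * the lock once, takes it in read or write mode around accesses to the data
 * it protects, and destroys it when the data goes away:
 *
 *	static struct rwlock foo_lock;
 *
 *	rw_init_flags(&foo_lock, "foo lock", RW_RECURSE);
 *
 *	rw_rlock(&foo_lock);		read-only access to foo data
 *	rw_runlock(&foo_lock);
 *
 *	rw_wlock(&foo_lock);		exclusive access to foo data
 *	rw_wunlock(&foo_lock);
 *
 *	rw_destroy(&foo_lock);		the lock must be unlocked here
 */
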
void
rw_sysinit(void *arg)
{
	struct rw_args *args = arg;

	rw_init(args->ra_rw, args->ra_desc);
}

void
rw_sysinit_flags(void *arg)
{
	struct rw_args_flags *args = arg;

	rw_init_flags(args->ra_rw, args->ra_desc, args->ra_flags);
}

int
rw_wowned(struct rwlock *rw)
{

	return (rw_wowner(rw) == curthread);
}

void
_rw_wlock(struct rwlock *rw, const char *file, int line)
{

	MPASS(curthread != NULL);
	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
	    line, NULL);
	__rw_wlock(rw, curthread, file, line);
	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
	curthread->td_locks++;
}

int
_rw_try_wlock(struct rwlock *rw, const char *file, int line)
{
	int rval;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_try_wlock() of destroyed rwlock @ %s:%d", file, line));

	if (rw_wlocked(rw) && (rw->lock_object.lo_flags & RW_RECURSE) != 0) {
		rw->rw_recurse++;
		rval = 1;
	} else
		rval = atomic_cmpset_acq_ptr(&rw->rw_lock, RW_UNLOCKED,
		    (uintptr_t)curthread);

	LOCK_LOG_TRY("WLOCK", &rw->lock_object, 0, rval, file, line);
	if (rval) {
		WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
		    file, line);
		curthread->td_locks++;
	}
	return (rval);
}

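/*
 * A short, hypothetical sketch of the non-blocking write-lock pattern built
 * on _rw_try_wlock() (exposed to consumers as rw_try_wlock(9)):
 *
 *	if (rw_try_wlock(&foo_lock)) {
 *		... update foo data ...
 *		rw_wunlock(&foo_lock);
 *	} else {
 *		... defer or retry; the lock is not held on this path ...
 *	}
 *
 * The try variant never sleeps, which makes it useful when the caller
 * cannot, or does not want to, wait for the lock.
 */
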
void
_rw_wunlock(struct rwlock *rw, const char *file, int line)
{

	MPASS(curthread != NULL);
	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_WLOCKED, file, line);
	curthread->td_locks--;
	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
	    line);
	if (!rw_recursed(rw))
		lock_profile_release_lock(&rw->lock_object);
	__rw_wunlock(rw, curthread, file, line);
}

/*
 * Determines whether a new reader can acquire a lock.  Succeeds if the
 * reader already owns a read lock and the lock is locked for read, which
 * prevents deadlock from reader recursion.  Also succeeds if the lock is
 * unlocked or read-locked and has no write waiters or spinners.  Failing
 * in all other cases gives writers priority over readers.
 */
#define	RW_CAN_READ(_rw)						\
    ((curthread->td_rw_rlocks && (_rw) & RW_LOCK_READ) || ((_rw) &	\
    (RW_LOCK_READ | RW_LOCK_WRITE_WAITERS | RW_LOCK_WRITE_SPINNER)) ==	\
    RW_LOCK_READ)

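/*
 * Two schematic example lock words for the macro above (assuming the flag
 * encoding in sys/rwlock.h, where RW_READERS_LOCK() includes RW_LOCK_READ):
 *
 *	v = RW_READERS_LOCK(2)
 *	    RW_CAN_READ(v) is true: read-locked, no write waiters or
 *	    spinner, so a new reader may join.
 *
 *	v = RW_READERS_LOCK(2) | RW_LOCK_WRITE_WAITERS
 *	    RW_CAN_READ(v) is false for a thread holding no read locks
 *	    (it must queue behind the waiting writer), but true for a
 *	    thread with td_rw_rlocks != 0, which avoids self-deadlock on
 *	    reader recursion.
 */
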
void
_rw_rlock(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
#ifdef ADAPTIVE_RWLOCKS
	volatile struct thread *owner;
	int spintries = 0;
	int i;
#endif
	uint64_t waittime = 0;
	int contested = 0;
	uintptr_t v;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
	KASSERT(rw_wowner(rw) != curthread,
	    ("%s (%s): wlock already held @ %s:%d", __func__,
	    rw->lock_object.lo_name, file, line));
	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line, NULL);

	for (;;) {
		/*
		 * Handle the easy case.  If no other thread has a write
		 * lock, then try to bump up the count of read locks.  Note
		 * that we have to preserve the current state of the
		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
		 * read lock, then rw_lock must have changed, so restart
		 * the loop.  Note that this handles the case of a
		 * completely unlocked rwlock since such a lock is encoded
		 * as a read lock with no waiters.
		 */
		v = rw->rw_lock;
		if (RW_CAN_READ(v)) {
			/*
			 * The RW_LOCK_READ_WAITERS flag should only be set
			 * if the lock has been unlocked and write waiters
			 * were present.
			 */
			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v,
			    v + RW_ONE_READER)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR4(KTR_LOCK,
					    "%s: %p succeed %p -> %p", __func__,
					    rw, (void *)v,
					    (void *)(v + RW_ONE_READER));
				break;
			}
			cpu_spinwait();
			continue;
		}
		lock_profile_obtain_lock_failed(&rw->lock_object,
		    &contested, &waittime);

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the owner is running on another CPU, spin until
		 * the owner stops running or the state of the lock
		 * changes.
		 */
		if ((v & RW_LOCK_READ) == 0) {
			owner = (struct thread *)RW_OWNER(v);
			if (TD_IS_RUNNING(owner)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR3(KTR_LOCK,
					    "%s: spinning on %p held by %p",
					    __func__, rw, owner);
				while ((struct thread*)RW_OWNER(rw->rw_lock) ==
				    owner && TD_IS_RUNNING(owner))
					cpu_spinwait();
				continue;
			}
		} else if (spintries < rowner_retries) {
			spintries++;
			for (i = 0; i < rowner_loops; i++) {
				v = rw->rw_lock;
				if ((v & RW_LOCK_READ) == 0 || RW_CAN_READ(v))
					break;
				cpu_spinwait();
			}
			if (i != rowner_loops)
				continue;
		}
#endif

		/*
		 * Okay, now it's the hard case.  Some other thread already
		 * has a write lock or there are write waiters present, so
		 * acquire the turnstile lock so we can begin the process
		 * of blocking.
		 */
		ts = turnstile_trywait(&rw->lock_object);

		/*
		 * The lock might have been released while we spun, so
		 * recheck its state and restart the loop if needed.
		 */
		v = rw->rw_lock;
		if (RW_CAN_READ(v)) {
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the current owner of the lock is executing on another
		 * CPU, quit the hard path and try to spin.
		 */
		if ((v & RW_LOCK_READ) == 0) {
			owner = (struct thread *)RW_OWNER(v);
			if (TD_IS_RUNNING(owner)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
		}
#endif

		/*
		 * The lock is held in write mode or it already has waiters.
		 */
		MPASS(!RW_CAN_READ(v));

		/*
		 * If the RW_LOCK_READ_WAITERS flag is already set, then
		 * we can go ahead and block.  If it is not set, then try
		 * to set it.  If we fail to set it, drop the turnstile
		 * lock and restart the loop.
		 */
		if (!(v & RW_LOCK_READ_WAITERS)) {
			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
			    v | RW_LOCK_READ_WAITERS)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
				    __func__, rw);
		}

		/*
		 * We were unable to acquire the lock and the read waiters
		 * flag is set, so we must block on the turnstile.
		 */
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
			    rw);
		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
			    __func__, rw);
	}

	/*
	 * TODO: acquire "owner of record" here.  Here be turnstile dragons,
	 * however: turnstiles currently don't like owners changing between
	 * calls to turnstile_wait().
	 */
	lock_profile_obtain_lock_success(&rw->lock_object, contested,
	    waittime, file, line);
	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
	WITNESS_LOCK(&rw->lock_object, 0, file, line);
	curthread->td_locks++;
	curthread->td_rw_rlocks++;
}

int
_rw_try_rlock(struct rwlock *rw, const char *file, int line)
{
	uintptr_t x;

	for (;;) {
		x = rw->rw_lock;
		KASSERT(rw->rw_lock != RW_DESTROYED,
		    ("rw_try_rlock() of destroyed rwlock @ %s:%d", file, line));
		if (!(x & RW_LOCK_READ))
			break;
		if (atomic_cmpset_acq_ptr(&rw->rw_lock, x, x + RW_ONE_READER)) {
			LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 1, file,
			    line);
			WITNESS_LOCK(&rw->lock_object, LOP_TRYLOCK, file, line);
			curthread->td_locks++;
			curthread->td_rw_rlocks++;
			return (1);
		}
	}

	LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 0, file, line);
	return (0);
}

void
_rw_runlock(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t x, v, queue;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_RLOCKED, file, line);
	curthread->td_locks--;
	curthread->td_rw_rlocks--;
	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);

	/* TODO: drop "owner of record" here. */

	for (;;) {
		/*
		 * See if there is more than one read lock held.  If so,
		 * just drop one and return.
		 */
		x = rw->rw_lock;
		if (RW_READERS(x) > 1) {
			if (atomic_cmpset_ptr(&rw->rw_lock, x,
			    x - RW_ONE_READER)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR4(KTR_LOCK,
					    "%s: %p succeeded %p -> %p",
					    __func__, rw, (void *)x,
					    (void *)(x - RW_ONE_READER));
				break;
			}
			continue;
		}
		/*
		 * If there aren't any waiters for a write lock, then try
		 * to drop it quickly.
		 */
		if (!(x & RW_LOCK_WAITERS)) {
			MPASS((x & ~RW_LOCK_WRITE_SPINNER) ==
			    RW_READERS_LOCK(1));
			if (atomic_cmpset_ptr(&rw->rw_lock, x, RW_UNLOCKED)) {
				if (LOCK_LOG_TEST(&rw->lock_object, 0))
					CTR2(KTR_LOCK, "%s: %p last succeeded",
					    __func__, rw);
				break;
			}
			continue;
		}
		/*
		 * Ok, we know we have waiters and we think we are the
		 * last reader, so grab the turnstile lock.
		 */
		turnstile_chain_lock(&rw->lock_object);
		v = rw->rw_lock & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
		MPASS(v & RW_LOCK_WAITERS);

		/*
		 * Try to drop our lock, leaving the lock in an unlocked
		 * state.
		 *
		 * If you wanted to do explicit lock handoff you'd have to
		 * do it here.  You'd also want to use turnstile_signal()
		 * and you'd have to handle the race where a higher
		 * priority thread blocks on the write lock before the
		 * thread you wake up actually runs and "steals" the lock.
		 * For now it's a lot simpler to just wake up all of the
		 * waiters.
		 *
		 * As above, if we fail, then another thread might have
		 * acquired a read lock, so drop the turnstile lock and
		 * restart.
		 */
		x = RW_UNLOCKED;
		if (v & RW_LOCK_WRITE_WAITERS) {
			queue = TS_EXCLUSIVE_QUEUE;
			x |= (v & RW_LOCK_READ_WAITERS);
		} else
			queue = TS_SHARED_QUEUE;
		if (!atomic_cmpset_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
		    x)) {
			turnstile_chain_unlock(&rw->lock_object);
			continue;
		}
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
			    __func__, rw);

		/*
		 * Ok.  The lock is released and all that's left is to
		 * wake up the waiters.  Note that the lock might not be
		 * free anymore, but in that case the writers will just
		 * block again if they run before the new lock holder(s)
		 * release the lock.
		 */
		ts = turnstile_lookup(&rw->lock_object);
		MPASS(ts != NULL);
		turnstile_broadcast(ts, queue);
		turnstile_unpend(ts, TS_SHARED_LOCK);
		turnstile_chain_unlock(&rw->lock_object);
		break;
	}
	lock_profile_release_lock(&rw->lock_object);
}

/*
 * This function is called when we are unable to obtain a write lock on the
 * first try.  This means that at least one other thread holds either a
 * read or write lock.
 */
void
_rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
{
	struct turnstile *ts;
#ifdef ADAPTIVE_RWLOCKS
	volatile struct thread *owner;
	int spintries = 0;
	int i;
#endif
	uint64_t waittime = 0;
	uintptr_t v, x;
	int contested = 0;

	if (rw_wlocked(rw)) {
		KASSERT(rw->lock_object.lo_flags & RW_RECURSE,
		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
		    __func__, rw->lock_object.lo_name, file, line));
		rw->rw_recurse++;
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
		return;
	}

	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);

	while (!_rw_write_lock(rw, tid)) {
		lock_profile_obtain_lock_failed(&rw->lock_object,
		    &contested, &waittime);
#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the lock is write locked and the owner is
		 * running on another CPU, spin until the owner stops
		 * running or the state of the lock changes.
		 */
		v = rw->rw_lock;
		owner = (struct thread *)RW_OWNER(v);
		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
				    __func__, rw, owner);
			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
			    TD_IS_RUNNING(owner))
				cpu_spinwait();
			continue;
		}
		if ((v & RW_LOCK_READ) && RW_READERS(v) &&
		    spintries < rowner_retries) {
			if (!(v & RW_LOCK_WRITE_SPINNER)) {
				if (!atomic_cmpset_ptr(&rw->rw_lock, v,
				    v | RW_LOCK_WRITE_SPINNER)) {
					cpu_spinwait();
					continue;
				}
			}
			spintries++;
			for (i = 0; i < rowner_loops; i++) {
				if ((rw->rw_lock & RW_LOCK_WRITE_SPINNER) == 0)
					break;
				cpu_spinwait();
			}
			if (i != rowner_loops)
				continue;
		}
#endif
		ts = turnstile_trywait(&rw->lock_object);
		v = rw->rw_lock;

#ifdef ADAPTIVE_RWLOCKS
		/*
		 * If the current owner of the lock is executing on another
		 * CPU, quit the hard path and try to spin.
		 */
		if (!(v & RW_LOCK_READ)) {
			owner = (struct thread *)RW_OWNER(v);
			if (TD_IS_RUNNING(owner)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
		}
#endif
		/*
		 * Check the waiter flags for this rwlock.  If the lock was
		 * released without leaving any waiters queued, simply try
		 * to acquire it.  If a queue of pending waiters is present,
		 * claim ownership of the lock while preserving the waiter
		 * flags.
		 */
		x = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
		if ((v & ~x) == RW_UNLOCKED) {
			x &= ~RW_LOCK_WRITE_SPINNER;
			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v, tid | x)) {
				if (x)
					turnstile_claim(ts);
				else
					turnstile_cancel(ts);
				break;
			}
			turnstile_cancel(ts);
			cpu_spinwait();
			continue;
		}
		/*
		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
		 * set it.  If we fail to set it, then loop back and try
		 * again.
		 */
		if (!(v & RW_LOCK_WRITE_WAITERS)) {
			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
			    v | RW_LOCK_WRITE_WAITERS)) {
				turnstile_cancel(ts);
				cpu_spinwait();
				continue;
			}
			if (LOCK_LOG_TEST(&rw->lock_object, 0))
				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
				    __func__, rw);
		}
		/*
		 * We were unable to acquire the lock and the write waiters
		 * flag is set, so we must block on the turnstile.
		 */
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
			    rw);
		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
			    __func__, rw);
#ifdef ADAPTIVE_RWLOCKS
		spintries = 0;
#endif
	}
	lock_profile_obtain_lock_success(&rw->lock_object, contested, waittime,
	    file, line);
}

/*
 * This function is called if the first try at releasing a write lock failed.
 * This means that one of the two waiter bits must be set, indicating that at
 * least one thread is waiting on this lock.
 */
void
_rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t v;
	int queue;

	if (rw_wlocked(rw) && rw_recursed(rw)) {
		rw->rw_recurse--;
		if (LOCK_LOG_TEST(&rw->lock_object, 0))
			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
		return;
	}

	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
	    ("%s: neither of the waiter flags are set", __func__));

	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);

	turnstile_chain_lock(&rw->lock_object);
	ts = turnstile_lookup(&rw->lock_object);
	MPASS(ts != NULL);

	/*
	 * Use the same algorithm as sx locks for now.  Prefer waking up
	 * exclusive (write) waiters if there are any over shared waiters.
	 * This is probably not ideal.
	 *
	 * 'v' is the value we are going to write back to rw_lock.  If we
	 * have waiters on both queues, we need to preserve the state of
	 * the waiter flag for the queue we don't wake up.  For now this is
	 * hardcoded for the algorithm mentioned above.
	 *
	 * In the case of both readers and writers waiting, we wake up the
	 * writers but leave the RW_LOCK_READ_WAITERS flag set.  If a new
	 * reader comes in before a woken writer runs, it can still claim
	 * the lock up above.  There is probably a potential priority
	 * inversion in there that could be worked around either by waking
	 * both queues of waiters or doing some complicated lock handoff
	 * gymnastics.
	 */
	v = RW_UNLOCKED;
	if (rw->rw_lock & RW_LOCK_WRITE_WAITERS) {
		queue = TS_EXCLUSIVE_QUEUE;
		v |= (rw->rw_lock & RW_LOCK_READ_WAITERS);
	} else
		queue = TS_SHARED_QUEUE;

	/* Wake up all waiters for the specific queue. */
	if (LOCK_LOG_TEST(&rw->lock_object, 0))
		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
		    queue == TS_SHARED_QUEUE ? "read" : "write");
	turnstile_broadcast(ts, queue);
	atomic_store_rel_ptr(&rw->rw_lock, v);
	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
	turnstile_chain_unlock(&rw->lock_object);
}

/*
 * Attempt to do a non-blocking upgrade from a read lock to a write
 * lock.  This will only succeed if this thread holds a single read
 * lock.  Returns true if the upgrade succeeded and false otherwise.
 */
int
_rw_try_upgrade(struct rwlock *rw, const char *file, int line)
{
	uintptr_t v, x, tid;
	struct turnstile *ts;
	int success;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_RLOCKED, file, line);

	/*
	 * Attempt to switch from one reader to a writer.  If there
	 * are any write waiters, then we will have to lock the
	 * turnstile first to prevent races with another writer
	 * calling turnstile_wait() before we have claimed this
	 * turnstile.  So, do the simple case of no waiters first.
	 */
	tid = (uintptr_t)curthread;
	success = 0;
	for (;;) {
		v = rw->rw_lock;
		if (RW_READERS(v) > 1)
			break;
		if (!(v & RW_LOCK_WAITERS)) {
			success = atomic_cmpset_ptr(&rw->rw_lock, v, tid);
			if (!success)
				continue;
			break;
		}

		/*
		 * Ok, we think we have waiters, so lock the turnstile.
		 */
		ts = turnstile_trywait(&rw->lock_object);
		v = rw->rw_lock;
		if (RW_READERS(v) > 1) {
			turnstile_cancel(ts);
			break;
		}
		/*
		 * Try to switch from one reader to a writer again.  This time
		 * we honor the current state of the waiters flags.
		 * If we obtain the lock with the flags set, then claim
		 * ownership of the turnstile.
		 */
		x = rw->rw_lock & RW_LOCK_WAITERS;
		success = atomic_cmpset_ptr(&rw->rw_lock, v, tid | x);
		if (success) {
			if (x)
				turnstile_claim(ts);
			else
				turnstile_cancel(ts);
			break;
		}
		turnstile_cancel(ts);
	}
	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
	if (success) {
		curthread->td_rw_rlocks--;
		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
		    file, line);
	}
	return (success);
}

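/*
 * A common, hypothetical caller pattern for the upgrade path above
 * (rw_try_upgrade(9)): fall back to dropping the read lock and taking the
 * write lock when the non-blocking upgrade loses the race, then revalidate
 * whatever was observed under the read lock:
 *
 *	rw_rlock(&foo_lock);
 *	... decide that foo data must be modified ...
 *	if (!rw_try_upgrade(&foo_lock)) {
 *		rw_runlock(&foo_lock);
 *		rw_wlock(&foo_lock);
 *		... re-check the foo state; it may have changed ...
 *	}
 *	... modify foo data ...
 *	rw_wunlock(&foo_lock);
 */
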
/*
 * Downgrade a write lock into a single read lock.
 */
void
_rw_downgrade(struct rwlock *rw, const char *file, int line)
{
	struct turnstile *ts;
	uintptr_t tid, v;
	int rwait, wwait;

	KASSERT(rw->rw_lock != RW_DESTROYED,
	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
	_rw_assert(rw, RA_WLOCKED | RA_NOTRECURSED, file, line);
#ifndef INVARIANTS
	if (rw_recursed(rw))
		panic("downgrade of a recursed lock");
#endif

	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);

	/*
	 * Convert from a writer to a single reader.  First we handle
	 * the easy case with no waiters.  If there are any waiters, we
	 * lock the turnstile and "disown" the lock.
	 */
	tid = (uintptr_t)curthread;
	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
		goto out;

	/*
	 * Ok, we think we have waiters, so lock the turnstile so we can
	 * read the waiter flags without any races.
	 */
	turnstile_chain_lock(&rw->lock_object);
	v = rw->rw_lock & RW_LOCK_WAITERS;
	rwait = v & RW_LOCK_READ_WAITERS;
	wwait = v & RW_LOCK_WRITE_WAITERS;
	MPASS(rwait | wwait);

	/*
	 * Downgrade from a write lock while preserving the waiters flag
	 * and give up ownership of the turnstile.
	 */
	ts = turnstile_lookup(&rw->lock_object);
	MPASS(ts != NULL);
	if (!wwait)
		v &= ~RW_LOCK_READ_WAITERS;
	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v);
	/*
	 * Wake other readers if there are no writers pending.  Otherwise they
	 * won't be able to acquire the lock anyway.
	 */
	if (rwait && !wwait) {
		turnstile_broadcast(ts, TS_SHARED_QUEUE);
		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
	} else
		turnstile_disown(ts);
	turnstile_chain_unlock(&rw->lock_object);
out:
	curthread->td_rw_rlocks++;
	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
}

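/*
 * A hypothetical example of the downgrade pattern served by the function
 * above (rw_downgrade(9)): perform an update under the write lock, then
 * keep reading the result without letting another writer in between:
 *
 *	rw_wlock(&foo_lock);
 *	... update foo data ...
 *	rw_downgrade(&foo_lock);
 *	... use foo data read-only; other readers may now join ...
 *	rw_runlock(&foo_lock);
 */
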
#ifdef INVARIANT_SUPPORT
#ifndef INVARIANTS
#undef _rw_assert
#endif

/*
 * In the non-WITNESS case, rw_assert() can only detect that at least
 * *some* thread owns an rlock, but it cannot guarantee that *this*
 * thread owns an rlock.
 */
void
_rw_assert(struct rwlock *rw, int what, const char *file, int line)
{

	if (panicstr != NULL)
		return;
	switch (what) {
	case RA_LOCKED:
	case RA_LOCKED | RA_RECURSED:
	case RA_LOCKED | RA_NOTRECURSED:
	case RA_RLOCKED:
#ifdef WITNESS
		witness_assert(&rw->lock_object, what, file, line);
#else
		/*
		 * If some other thread has a write lock or we have one
		 * and are asserting a read lock, fail.  Also, if no one
		 * has a lock at all, fail.
		 */
		if (rw->rw_lock == RW_UNLOCKED ||
		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
		    rw_wowner(rw) != curthread)))
			panic("Lock %s not %slocked @ %s:%d\n",
			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
			    "read " : "", file, line);

		if (!(rw->rw_lock & RW_LOCK_READ)) {
			if (rw_recursed(rw)) {
				if (what & RA_NOTRECURSED)
					panic("Lock %s recursed @ %s:%d\n",
					    rw->lock_object.lo_name, file,
					    line);
			} else if (what & RA_RECURSED)
				panic("Lock %s not recursed @ %s:%d\n",
				    rw->lock_object.lo_name, file, line);
		}
#endif
		break;
	case RA_WLOCKED:
	case RA_WLOCKED | RA_RECURSED:
	case RA_WLOCKED | RA_NOTRECURSED:
		if (rw_wowner(rw) != curthread)
			panic("Lock %s not exclusively locked @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
		if (rw_recursed(rw)) {
			if (what & RA_NOTRECURSED)
				panic("Lock %s recursed @ %s:%d\n",
				    rw->lock_object.lo_name, file, line);
		} else if (what & RA_RECURSED)
			panic("Lock %s not recursed @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
		break;
	case RA_UNLOCKED:
#ifdef WITNESS
		witness_assert(&rw->lock_object, what, file, line);
#else
		/*
		 * If we hold a write lock, fail.  We can't reliably check
		 * to see if we hold a read lock or not.
		 */
		if (rw_wowner(rw) == curthread)
			panic("Lock %s exclusively locked @ %s:%d\n",
			    rw->lock_object.lo_name, file, line);
#endif
		break;
	default:
		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
		    line);
	}
}
#endif /* INVARIANT_SUPPORT */

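/*
 * A brief, hypothetical illustration of how consumers use rw_assert(9),
 * which the assertion machinery above backs when INVARIANTS is enabled:
 *
 *	static void
 *	foo_update(struct foo *fp)
 *	{
 *
 *		rw_assert(&foo_lock, RA_WLOCKED);
 *		... callers are required to hold the write lock here ...
 *	}
 */
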
#ifdef DDB
void
db_show_rwlock(struct lock_object *lock)
{
	struct rwlock *rw;
	struct thread *td;

	rw = (struct rwlock *)lock;

	db_printf(" state: ");
	if (rw->rw_lock == RW_UNLOCKED)
		db_printf("UNLOCKED\n");
	else if (rw->rw_lock == RW_DESTROYED) {
		db_printf("DESTROYED\n");
		return;
	} else if (rw->rw_lock & RW_LOCK_READ)
		db_printf("RLOCK: %ju locks\n",
		    (uintmax_t)(RW_READERS(rw->rw_lock)));
	else {
		td = rw_wowner(rw);
		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
		    td->td_tid, td->td_proc->p_pid, td->td_name);
		if (rw_recursed(rw))
			db_printf(" recursed: %u\n", rw->rw_recurse);
	}
	db_printf(" waiters: ");
	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
	case RW_LOCK_READ_WAITERS:
		db_printf("readers\n");
		break;
	case RW_LOCK_WRITE_WAITERS:
		db_printf("writers\n");
		break;
	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
		db_printf("readers and writers\n");
		break;
	default:
		db_printf("none\n");
		break;
	}
}

#endif