kern_rwlock.c revision 242515
1/*-
2 * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the author nor the names of any co-contributors
14 *    may be used to endorse or promote products derived from this software
15 *    without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30/*
31 * Machine independent bits of reader/writer lock implementation.
32 */
33
34#include <sys/cdefs.h>
35__FBSDID("$FreeBSD: head/sys/kern/kern_rwlock.c 242515 2012-11-03 15:57:37Z attilio $");
36
37#include "opt_ddb.h"
38#include "opt_hwpmc_hooks.h"
39#include "opt_kdtrace.h"
40#include "opt_no_adaptive_rwlocks.h"
41
42#include <sys/param.h>
43#include <sys/ktr.h>
44#include <sys/kernel.h>
45#include <sys/lock.h>
46#include <sys/mutex.h>
47#include <sys/proc.h>
48#include <sys/rwlock.h>
49#include <sys/sysctl.h>
50#include <sys/systm.h>
51#include <sys/turnstile.h>
52
53#include <machine/cpu.h>
54
55#if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
56#define	ADAPTIVE_RWLOCKS
57#endif
58
59#ifdef HWPMC_HOOKS
60#include <sys/pmckern.h>
61PMC_SOFT_DECLARE( , , lock, failed);
62#endif
63
64/*
65 * Return the rwlock address when the lock cookie address is provided.
66 * This functionality assumes that struct rwlock has a member named rw_lock.
67 */
68#define	rwlock2rw(c)	(__containerof(c, struct rwlock, rw_lock))
69
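/*
 * Illustrative sketch (hypothetical helper, not part of this file): a rough
 * hand expansion of what __containerof() does for this particular macro.
 */
#if 0
static __inline struct rwlock *
example_rwlock2rw(volatile uintptr_t *c)
{

	/* Step back from the rw_lock member to the enclosing structure. */
	return ((struct rwlock *)((char *)__DEVOLATILE(uintptr_t *, c) -
	    __offsetof(struct rwlock, rw_lock)));
}
#endif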
70#ifdef ADAPTIVE_RWLOCKS
71static int rowner_retries = 10;
72static int rowner_loops = 10000;
73static SYSCTL_NODE(_debug, OID_AUTO, rwlock, CTLFLAG_RD, NULL,
74    "rwlock debugging");
75SYSCTL_INT(_debug_rwlock, OID_AUTO, retry, CTLFLAG_RW, &rowner_retries, 0, "");
76SYSCTL_INT(_debug_rwlock, OID_AUTO, loops, CTLFLAG_RW, &rowner_loops, 0, "");
77#endif
78
79#ifdef DDB
80#include <ddb/ddb.h>
81
82static void	db_show_rwlock(const struct lock_object *lock);
83#endif
84static void	assert_rw(const struct lock_object *lock, int what);
85static void	lock_rw(struct lock_object *lock, int how);
86#ifdef KDTRACE_HOOKS
87static int	owner_rw(const struct lock_object *lock, struct thread **owner);
88#endif
89static int	unlock_rw(struct lock_object *lock);
90
91struct lock_class lock_class_rw = {
92	.lc_name = "rw",
93	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
94	.lc_assert = assert_rw,
95#ifdef DDB
96	.lc_ddb_show = db_show_rwlock,
97#endif
98	.lc_lock = lock_rw,
99	.lc_unlock = unlock_rw,
100#ifdef KDTRACE_HOOKS
101	.lc_owner = owner_rw,
102#endif
103};
104
105/*
106 * Return a pointer to the owning thread if the lock is write-locked or
107 * NULL if the lock is unlocked or read-locked.
108 */
109#define	rw_wowner(rw)							\
110	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
111	    (struct thread *)RW_OWNER((rw)->rw_lock))
112
113/*
114 * Return whether the write owner is recursed.  Write ownership is not
115 * verified here; the caller must check it beforehand.
116 */
117#define	rw_recursed(rw)		((rw)->rw_recurse != 0)
118
119/*
120 * Return true if curthread holds the lock.
121 */
122#define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)
123
124/*
125 * Return a pointer to the owning thread for this lock who should receive
126 * any priority lent by threads that block on this lock.  Currently this
127 * is identical to rw_wowner().
128 */
129#define	rw_owner(rw)		rw_wowner(rw)
130
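/*
 * Illustrative sketch (hypothetical helper, not from this file): how the
 * macros above decode the lock word.  With RW_LOCK_READ clear the word
 * carries the owning thread pointer; otherwise it carries a reader count
 * plus waiter flags, with "unlocked" encoded as zero readers.
 */
#if 0
static void
example_describe_rw(const struct rwlock *rw)
{
	uintptr_t v;

	v = rw->rw_lock;
	if (v == RW_UNLOCKED)
		printf("%s: unlocked\n", rw->lock_object.lo_name);
	else if (v & RW_LOCK_READ)
		printf("%s: read locked, %ju readers\n",
		    rw->lock_object.lo_name, (uintmax_t)RW_READERS(v));
	else
		printf("%s: write locked by %p%s\n", rw->lock_object.lo_name,
		    (void *)rw_wowner(rw), rw_recursed(rw) ?
		    " (recursed)" : "");
}
#endif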
131#ifndef INVARIANTS
132#define	__rw_assert(c, what, file, line)
133#endif
134
135void
136assert_rw(const struct lock_object *lock, int what)
137{
138
139	rw_assert((const struct rwlock *)lock, what);
140}
141
142void
143lock_rw(struct lock_object *lock, int how)
144{
145	struct rwlock *rw;
146
147	rw = (struct rwlock *)lock;
148	if (how)
149		rw_wlock(rw);
150	else
151		rw_rlock(rw);
152}
153
154int
155unlock_rw(struct lock_object *lock)
156{
157	struct rwlock *rw;
158
159	rw = (struct rwlock *)lock;
160	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
161	if (rw->rw_lock & RW_LOCK_READ) {
162		rw_runlock(rw);
163		return (0);
164	} else {
165		rw_wunlock(rw);
166		return (1);
167	}
168}
169
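/*
 * Illustrative sketch (hypothetical, not from this file): lock_rw() and
 * unlock_rw() back the generic lock_class interface, where the value
 * returned by the unlock method (1 for write, 0 for read here) is later
 * handed back to the lock method to reacquire the lock in the same mode.
 */
#if 0
static void
example_drop_and_reacquire(struct rwlock *rw)
{
	struct lock_class *lc;
	int how;

	lc = LOCK_CLASS(&rw->lock_object);
	how = lc->lc_unlock(&rw->lock_object);	/* remember the mode */
	/* ... run without the lock held ... */
	lc->lc_lock(&rw->lock_object, how);	/* reacquire in that mode */
}
#endif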
170#ifdef KDTRACE_HOOKS
171int
172owner_rw(const struct lock_object *lock, struct thread **owner)
173{
174	const struct rwlock *rw = (const struct rwlock *)lock;
175	uintptr_t x = rw->rw_lock;
176
177	*owner = rw_wowner(rw);
178	return ((x & RW_LOCK_READ) != 0 ?  (RW_READERS(x) != 0) :
179	    (*owner != NULL));
180}
181#endif
182
183void
184_rw_init_flags(volatile uintptr_t *c, const char *name, int opts)
185{
186	struct rwlock *rw;
187	int flags;
188
189	rw = rwlock2rw(c);
190
191	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
192	    RW_RECURSE)) == 0);
193	ASSERT_ATOMIC_LOAD_PTR(rw->rw_lock,
194	    ("%s: rw_lock not aligned for %s: %p", __func__, name,
195	    &rw->rw_lock));
196
197	flags = LO_UPGRADABLE;
198	if (opts & RW_DUPOK)
199		flags |= LO_DUPOK;
200	if (opts & RW_NOPROFILE)
201		flags |= LO_NOPROFILE;
202	if (!(opts & RW_NOWITNESS))
203		flags |= LO_WITNESS;
204	if (opts & RW_RECURSE)
205		flags |= LO_RECURSABLE;
206	if (opts & RW_QUIET)
207		flags |= LO_QUIET;
208
209	rw->rw_lock = RW_UNLOCKED;
210	rw->rw_recurse = 0;
211	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
212}
213
214void
215_rw_destroy(volatile uintptr_t *c)
216{
217	struct rwlock *rw;
218
219	rw = rwlock2rw(c);
220
221	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock %p not unlocked", rw));
222	KASSERT(rw->rw_recurse == 0, ("rw lock %p still recursed", rw));
223	rw->rw_lock = RW_DESTROYED;
224	lock_destroy(&rw->lock_object);
225}
226
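/*
 * Illustrative sketch (hypothetical consumer code, not from this file):
 * the usual lifecycle of an rwlock embedded in a driver softc.
 */
#if 0
struct example_softc {
	struct rwlock	sc_lock;	/* hypothetical structure */
	int		sc_count;
};

static void
example_attach(struct example_softc *sc)
{

	rw_init_flags(&sc->sc_lock, "example softc lock", RW_DUPOK);
	sc->sc_count = 0;
}

static void
example_detach(struct example_softc *sc)
{

	/* The lock must be unlocked and not recursed at destroy time. */
	rw_destroy(&sc->sc_lock);
}
#endif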
227void
228rw_sysinit(void *arg)
229{
230	struct rw_args *args = arg;
231
232	rw_init((struct rwlock *)args->ra_rw, args->ra_desc);
233}
234
235void
236rw_sysinit_flags(void *arg)
237{
238	struct rw_args_flags *args = arg;
239
240	rw_init_flags((struct rwlock *)args->ra_rw, args->ra_desc,
241	    args->ra_flags);
242}
243
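/*
 * Illustrative sketch (hypothetical, not from this file): these sysinit
 * handlers back the RW_SYSINIT() and RW_SYSINIT_FLAGS() macros from
 * <sys/rwlock.h>, which initialize a global rwlock during boot.
 */
#if 0
static struct rwlock example_global_lock;	/* hypothetical lock */
RW_SYSINIT(example_global, &example_global_lock, "example global lock");
#endif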
244int
245_rw_wowned(const volatile uintptr_t *c)
246{
247
248	return (rw_wowner(rwlock2rw(c)) == curthread);
249}
250
251void
252_rw_wlock_cookie(volatile uintptr_t *c, const char *file, int line)
253{
254	struct rwlock *rw;
255
256	if (SCHEDULER_STOPPED())
257		return;
258
259	rw = rwlock2rw(c);
260
261	KASSERT(!TD_IS_IDLETHREAD(curthread),
262	    ("rw_wlock() by idle thread %p on rwlock %s @ %s:%d",
263	    curthread, rw->lock_object.lo_name, file, line));
264	KASSERT(rw->rw_lock != RW_DESTROYED,
265	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
266	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
267	    line, NULL);
268	__rw_wlock(rw, curthread, file, line);
269	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
270	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
271	curthread->td_locks++;
272}
273
274int
275__rw_try_wlock(volatile uintptr_t *c, const char *file, int line)
276{
277	struct rwlock *rw;
278	int rval;
279
280	if (SCHEDULER_STOPPED())
281		return (1);
282
283	rw = rwlock2rw(c);
284
285	KASSERT(!TD_IS_IDLETHREAD(curthread),
286	    ("rw_try_wlock() by idle thread %p on rwlock %s @ %s:%d",
287	    curthread, rw->lock_object.lo_name, file, line));
288	KASSERT(rw->rw_lock != RW_DESTROYED,
289	    ("rw_try_wlock() of destroyed rwlock @ %s:%d", file, line));
290
291	if (rw_wlocked(rw) &&
292	    (rw->lock_object.lo_flags & LO_RECURSABLE) != 0) {
293		rw->rw_recurse++;
294		rval = 1;
295	} else
296		rval = atomic_cmpset_acq_ptr(&rw->rw_lock, RW_UNLOCKED,
297		    (uintptr_t)curthread);
298
299	LOCK_LOG_TRY("WLOCK", &rw->lock_object, 0, rval, file, line);
300	if (rval) {
301		WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
302		    file, line);
303		curthread->td_locks++;
304	}
305	return (rval);
306}
307
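/*
 * Illustrative sketch (hypothetical consumer code, not from this file,
 * reusing the example_softc from an earlier sketch): rw_try_wlock() lets
 * deferrable work back off instead of blocking when the lock is contended.
 */
#if 0
static int
example_reset_if_uncontended(struct example_softc *sc)
{

	if (!rw_try_wlock(&sc->sc_lock))
		return (EBUSY);		/* contended; let the caller retry */
	sc->sc_count = 0;
	rw_wunlock(&sc->sc_lock);
	return (0);
}
#endif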
308void
309_rw_wunlock_cookie(volatile uintptr_t *c, const char *file, int line)
310{
311	struct rwlock *rw;
312
313	if (SCHEDULER_STOPPED())
314		return;
315
316	rw = rwlock2rw(c);
317
318	KASSERT(rw->rw_lock != RW_DESTROYED,
319	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
320	__rw_assert(c, RA_WLOCKED, file, line);
321	curthread->td_locks--;
322	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
323	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
324	    line);
325	if (!rw_recursed(rw))
326		LOCKSTAT_PROFILE_RELEASE_LOCK(LS_RW_WUNLOCK_RELEASE, rw);
327	__rw_wunlock(rw, curthread, file, line);
328}
}
329/*
330 * Determines whether a new reader can acquire a lock.  Succeeds if the
331 * reader already owns a read lock and the lock is read locked, which
332 * prevents deadlock from reader recursion.  Also succeeds if the lock is
333 * unlocked or read locked with no write waiters or spinners present.
334 * Otherwise it fails, which gives waiting writers priority over readers.
335 */
336#define	RW_CAN_READ(_rw)						\
337    ((curthread->td_rw_rlocks && (_rw) & RW_LOCK_READ) || ((_rw) &	\
338    (RW_LOCK_READ | RW_LOCK_WRITE_WAITERS | RW_LOCK_WRITE_SPINNER)) ==	\
339    RW_LOCK_READ)
340
341void
342__rw_rlock(volatile uintptr_t *c, const char *file, int line)
343{
344	struct rwlock *rw;
345	struct turnstile *ts;
346#ifdef ADAPTIVE_RWLOCKS
347	volatile struct thread *owner;
348	int spintries = 0;
349	int i;
350#endif
351#ifdef LOCK_PROFILING
352	uint64_t waittime = 0;
353	int contested = 0;
354#endif
355	uintptr_t v;
356#ifdef KDTRACE_HOOKS
357	uint64_t spin_cnt = 0;
358	uint64_t sleep_cnt = 0;
359	int64_t sleep_time = 0;
360#endif
361
362	if (SCHEDULER_STOPPED())
363		return;
364
365	rw = rwlock2rw(c);
366
367	KASSERT(!TD_IS_IDLETHREAD(curthread),
368	    ("rw_rlock() by idle thread %p on rwlock %s @ %s:%d",
369	    curthread, rw->lock_object.lo_name, file, line));
370	KASSERT(rw->rw_lock != RW_DESTROYED,
371	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
372	KASSERT(rw_wowner(rw) != curthread,
373	    ("%s (%s): wlock already held @ %s:%d", __func__,
374	    rw->lock_object.lo_name, file, line));
375	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line, NULL);
376
377	for (;;) {
378#ifdef KDTRACE_HOOKS
379		spin_cnt++;
380#endif
381		/*
382		 * Handle the easy case.  If no other thread has a write
383		 * lock, then try to bump up the count of read locks.  Note
384		 * that we have to preserve the current state of the
385		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
386		 * read lock, then rw_lock must have changed, so restart
387		 * the loop.  Note that this handles the case of a
388		 * completely unlocked rwlock since such a lock is encoded
389		 * as a read lock with no waiters.
390		 */
391		v = rw->rw_lock;
392		if (RW_CAN_READ(v)) {
393			/*
394			 * The RW_LOCK_READ_WAITERS flag should only be set
395			 * if the lock has been unlocked and write waiters
396			 * were present.
397			 */
398			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v,
399			    v + RW_ONE_READER)) {
400				if (LOCK_LOG_TEST(&rw->lock_object, 0))
401					CTR4(KTR_LOCK,
402					    "%s: %p succeed %p -> %p", __func__,
403					    rw, (void *)v,
404					    (void *)(v + RW_ONE_READER));
405				break;
406			}
407			continue;
408		}
409#ifdef HWPMC_HOOKS
410		PMC_SOFT_CALL( , , lock, failed);
411#endif
412		lock_profile_obtain_lock_failed(&rw->lock_object,
413		    &contested, &waittime);
414
415#ifdef ADAPTIVE_RWLOCKS
416		/*
417		 * If the owner is running on another CPU, spin until
418		 * the owner stops running or the state of the lock
419		 * changes.
420		 */
421		if ((v & RW_LOCK_READ) == 0) {
422			owner = (struct thread *)RW_OWNER(v);
423			if (TD_IS_RUNNING(owner)) {
424				if (LOCK_LOG_TEST(&rw->lock_object, 0))
425					CTR3(KTR_LOCK,
426					    "%s: spinning on %p held by %p",
427					    __func__, rw, owner);
428				while ((struct thread*)RW_OWNER(rw->rw_lock) ==
429				    owner && TD_IS_RUNNING(owner)) {
430					cpu_spinwait();
431#ifdef KDTRACE_HOOKS
432					spin_cnt++;
433#endif
434				}
435				continue;
436			}
437		} else if (spintries < rowner_retries) {
438			spintries++;
439			for (i = 0; i < rowner_loops; i++) {
440				v = rw->rw_lock;
441				if ((v & RW_LOCK_READ) == 0 || RW_CAN_READ(v))
442					break;
443				cpu_spinwait();
444			}
445			if (i != rowner_loops)
446				continue;
447		}
448#endif
449
450		/*
451		 * Okay, now it's the hard case.  Some other thread already
452		 * has a write lock or there are write waiters present, so
453		 * acquire the turnstile lock to begin the process
454		 * of blocking.
455		 */
456		ts = turnstile_trywait(&rw->lock_object);
457
458		/*
459		 * The lock might have been released while we spun, so
460		 * recheck its state and restart the loop if needed.
461		 */
462		v = rw->rw_lock;
463		if (RW_CAN_READ(v)) {
464			turnstile_cancel(ts);
465			continue;
466		}
467
468#ifdef ADAPTIVE_RWLOCKS
469		/*
470		 * The current lock owner might have started executing
471		 * on another CPU (or the lock could have changed
472		 * owners) while we were waiting on the turnstile
473		 * chain lock.  If so, drop the turnstile lock and try
474		 * again.
475		 */
476		if ((v & RW_LOCK_READ) == 0) {
477			owner = (struct thread *)RW_OWNER(v);
478			if (TD_IS_RUNNING(owner)) {
479				turnstile_cancel(ts);
480				continue;
481			}
482		}
483#endif
484
485		/*
486		 * The lock is held in write mode or it already has waiters.
487		 */
488		MPASS(!RW_CAN_READ(v));
489
490		/*
491		 * If the RW_LOCK_READ_WAITERS flag is already set, then
492		 * we can go ahead and block.  If it is not set, then try
493		 * to set it.  If we fail to set it, drop the turnstile
494		 * lock and restart the loop.
495		 */
496		if (!(v & RW_LOCK_READ_WAITERS)) {
497			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
498			    v | RW_LOCK_READ_WAITERS)) {
499				turnstile_cancel(ts);
500				continue;
501			}
502			if (LOCK_LOG_TEST(&rw->lock_object, 0))
503				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
504				    __func__, rw);
505		}
506
507		/*
508		 * We were unable to acquire the lock and the read waiters
509		 * flag is set, so we must block on the turnstile.
510		 */
511		if (LOCK_LOG_TEST(&rw->lock_object, 0))
512			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
513			    rw);
514#ifdef KDTRACE_HOOKS
515		sleep_time -= lockstat_nsecs();
516#endif
517		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
518#ifdef KDTRACE_HOOKS
519		sleep_time += lockstat_nsecs();
520		sleep_cnt++;
521#endif
522		if (LOCK_LOG_TEST(&rw->lock_object, 0))
523			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
524			    __func__, rw);
525	}
526
527	/*
528	 * TODO: acquire "owner of record" here.  Here be turnstile dragons
529	 * however.  turnstiles don't like owners changing between calls to
530	 * turnstile_wait() currently.
531	 */
532	LOCKSTAT_PROFILE_OBTAIN_LOCK_SUCCESS(LS_RW_RLOCK_ACQUIRE, rw, contested,
533	    waittime, file, line);
534	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
535	WITNESS_LOCK(&rw->lock_object, 0, file, line);
536	curthread->td_locks++;
537	curthread->td_rw_rlocks++;
538#ifdef KDTRACE_HOOKS
539	if (sleep_time)
540		LOCKSTAT_RECORD1(LS_RW_RLOCK_BLOCK, rw, sleep_time);
541
542	/*
543	 * Record only the loops spinning and not sleeping.
544	 */
545	if (spin_cnt > sleep_cnt)
546		LOCKSTAT_RECORD1(LS_RW_RLOCK_SPIN, rw, (spin_cnt - sleep_cnt));
547#endif
548}
549
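/*
 * Illustrative sketch (hypothetical consumer code, not from this file,
 * reusing the example_softc from an earlier sketch): the common read-side
 * pattern around rw_rlock()/rw_runlock().
 */
#if 0
static int
example_get_count(struct example_softc *sc)
{
	int count;

	rw_rlock(&sc->sc_lock);
	rw_assert(&sc->sc_lock, RA_RLOCKED);
	count = sc->sc_count;
	rw_runlock(&sc->sc_lock);
	return (count);
}
#endif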
550int
551__rw_try_rlock(volatile uintptr_t *c, const char *file, int line)
552{
553	struct rwlock *rw;
554	uintptr_t x;
555
556	if (SCHEDULER_STOPPED())
557		return (1);
558
559	rw = rwlock2rw(c);
560
561	KASSERT(!TD_IS_IDLETHREAD(curthread),
562	    ("rw_try_rlock() by idle thread %p on rwlock %s @ %s:%d",
563	    curthread, rw->lock_object.lo_name, file, line));
564
565	for (;;) {
566		x = rw->rw_lock;
567		KASSERT(rw->rw_lock != RW_DESTROYED,
568		    ("rw_try_rlock() of destroyed rwlock @ %s:%d", file, line));
569		if (!(x & RW_LOCK_READ))
570			break;
571		if (atomic_cmpset_acq_ptr(&rw->rw_lock, x, x + RW_ONE_READER)) {
572			LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 1, file,
573			    line);
574			WITNESS_LOCK(&rw->lock_object, LOP_TRYLOCK, file, line);
575			curthread->td_locks++;
576			curthread->td_rw_rlocks++;
577			return (1);
578		}
579	}
580
581	LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 0, file, line);
582	return (0);
583}
584
585void
586_rw_runlock_cookie(volatile uintptr_t *c, const char *file, int line)
587{
588	struct rwlock *rw;
589	struct turnstile *ts;
590	uintptr_t x, v, queue;
591
592	if (SCHEDULER_STOPPED())
593		return;
594
595	rw = rwlock2rw(c);
596
597	KASSERT(rw->rw_lock != RW_DESTROYED,
598	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
599	__rw_assert(c, RA_RLOCKED, file, line);
600	curthread->td_locks--;
601	curthread->td_rw_rlocks--;
602	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
603	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);
604
605	/* TODO: drop "owner of record" here. */
606
607	for (;;) {
608		/*
609		 * See if there is more than one read lock held.  If so,
610		 * just drop one and return.
611		 */
612		x = rw->rw_lock;
613		if (RW_READERS(x) > 1) {
614			if (atomic_cmpset_rel_ptr(&rw->rw_lock, x,
615			    x - RW_ONE_READER)) {
616				if (LOCK_LOG_TEST(&rw->lock_object, 0))
617					CTR4(KTR_LOCK,
618					    "%s: %p succeeded %p -> %p",
619					    __func__, rw, (void *)x,
620					    (void *)(x - RW_ONE_READER));
621				break;
622			}
623			continue;
624		}
625		/*
626		 * If there aren't any waiters for a write lock, then try
627		 * to drop it quickly.
628		 */
629		if (!(x & RW_LOCK_WAITERS)) {
630			MPASS((x & ~RW_LOCK_WRITE_SPINNER) ==
631			    RW_READERS_LOCK(1));
632			if (atomic_cmpset_rel_ptr(&rw->rw_lock, x,
633			    RW_UNLOCKED)) {
634				if (LOCK_LOG_TEST(&rw->lock_object, 0))
635					CTR2(KTR_LOCK, "%s: %p last succeeded",
636					    __func__, rw);
637				break;
638			}
639			continue;
640		}
641		/*
642		 * Ok, we know we have waiters and we think we are the
643		 * last reader, so grab the turnstile lock.
644		 */
645		turnstile_chain_lock(&rw->lock_object);
646		v = rw->rw_lock & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
647		MPASS(v & RW_LOCK_WAITERS);
648
649		/*
650		 * Try to drop our lock leaving the lock in an unlocked
651		 * state.
652		 *
653		 * If you wanted to do explicit lock handoff you'd have to
654		 * do it here.  You'd also want to use turnstile_signal()
655		 * and you'd have to handle the race where a higher
656		 * priority thread blocks on the write lock before the
657		 * thread you wake up actually runs and have the new thread
658		 * "steal" the lock.  For now it's a lot simpler to just
659		 * wake up all of the waiters.
660		 *
661		 * As above, if we fail, then another thread might have
662		 * acquired a read lock, so drop the turnstile lock and
663		 * restart.
664		 */
665		x = RW_UNLOCKED;
666		if (v & RW_LOCK_WRITE_WAITERS) {
667			queue = TS_EXCLUSIVE_QUEUE;
668			x |= (v & RW_LOCK_READ_WAITERS);
669		} else
670			queue = TS_SHARED_QUEUE;
671		if (!atomic_cmpset_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
672		    x)) {
673			turnstile_chain_unlock(&rw->lock_object);
674			continue;
675		}
676		if (LOCK_LOG_TEST(&rw->lock_object, 0))
677			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
678			    __func__, rw);
679
680		/*
681		 * Ok.  The lock is released and all that's left is to
682		 * wake up the waiters.  Note that the lock might not be
683		 * free anymore, but in that case the writers will just
684		 * block again if they run before the new lock holder(s)
685		 * release the lock.
686		 */
687		ts = turnstile_lookup(&rw->lock_object);
688		MPASS(ts != NULL);
689		turnstile_broadcast(ts, queue);
690		turnstile_unpend(ts, TS_SHARED_LOCK);
691		turnstile_chain_unlock(&rw->lock_object);
692		break;
693	}
694	LOCKSTAT_PROFILE_RELEASE_LOCK(LS_RW_RUNLOCK_RELEASE, rw);
695}
696
697/*
698 * This function is called when we are unable to obtain a write lock on the
699 * first try.  This means that at least one other thread holds either a
700 * read or write lock.
701 */
702void
703__rw_wlock_hard(volatile uintptr_t *c, uintptr_t tid, const char *file,
704    int line)
705{
706	struct rwlock *rw;
707	struct turnstile *ts;
708#ifdef ADAPTIVE_RWLOCKS
709	volatile struct thread *owner;
710	int spintries = 0;
711	int i;
712#endif
713	uintptr_t v, x;
714#ifdef LOCK_PROFILING
715	uint64_t waittime = 0;
716	int contested = 0;
717#endif
718#ifdef KDTRACE_HOOKS
719	uint64_t spin_cnt = 0;
720	uint64_t sleep_cnt = 0;
721	int64_t sleep_time = 0;
722#endif
723
724	if (SCHEDULER_STOPPED())
725		return;
726
727	rw = rwlock2rw(c);
728
729	if (rw_wlocked(rw)) {
730		KASSERT(rw->lock_object.lo_flags & LO_RECURSABLE,
731		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
732		    __func__, rw->lock_object.lo_name, file, line));
733		rw->rw_recurse++;
734		if (LOCK_LOG_TEST(&rw->lock_object, 0))
735			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
736		return;
737	}
738
739	if (LOCK_LOG_TEST(&rw->lock_object, 0))
740		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
741		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);
742
743	while (!_rw_write_lock(rw, tid)) {
744#ifdef KDTRACE_HOOKS
745		spin_cnt++;
746#endif
747#ifdef HWPMC_HOOKS
748		PMC_SOFT_CALL( , , lock, failed);
749#endif
750		lock_profile_obtain_lock_failed(&rw->lock_object,
751		    &contested, &waittime);
752#ifdef ADAPTIVE_RWLOCKS
753		/*
754		 * If the lock is write locked and the owner is
755		 * running on another CPU, spin until the owner stops
756		 * running or the state of the lock changes.
757		 */
758		v = rw->rw_lock;
759		owner = (struct thread *)RW_OWNER(v);
760		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
761			if (LOCK_LOG_TEST(&rw->lock_object, 0))
762				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
763				    __func__, rw, owner);
764			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
765			    TD_IS_RUNNING(owner)) {
766				cpu_spinwait();
767#ifdef KDTRACE_HOOKS
768				spin_cnt++;
769#endif
770			}
771			continue;
772		}
773		if ((v & RW_LOCK_READ) && RW_READERS(v) &&
774		    spintries < rowner_retries) {
775			if (!(v & RW_LOCK_WRITE_SPINNER)) {
776				if (!atomic_cmpset_ptr(&rw->rw_lock, v,
777				    v | RW_LOCK_WRITE_SPINNER)) {
778					continue;
779				}
780			}
781			spintries++;
782			for (i = 0; i < rowner_loops; i++) {
783				if ((rw->rw_lock & RW_LOCK_WRITE_SPINNER) == 0)
784					break;
785				cpu_spinwait();
786			}
787#ifdef KDTRACE_HOOKS
788			spin_cnt += rowner_loops - i;
789#endif
790			if (i != rowner_loops)
791				continue;
792		}
793#endif
794		ts = turnstile_trywait(&rw->lock_object);
795		v = rw->rw_lock;
796
797#ifdef ADAPTIVE_RWLOCKS
798		/*
799		 * The current lock owner might have started executing
800		 * on another CPU (or the lock could have changed
801		 * owners) while we were waiting on the turnstile
802		 * chain lock.  If so, drop the turnstile lock and try
803		 * again.
804		 */
805		if (!(v & RW_LOCK_READ)) {
806			owner = (struct thread *)RW_OWNER(v);
807			if (TD_IS_RUNNING(owner)) {
808				turnstile_cancel(ts);
809				continue;
810			}
811		}
812#endif
813		/*
814		 * Check the waiter flags on this rwlock.  If the lock was
815		 * released without leaving any pending waiters behind, simply
816		 * try to acquire it.  If a pending waiters queue is present,
817		 * claim the lock ownership while preserving the waiter flags
818		 * and the queue.
819		 */
820		x = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
821		if ((v & ~x) == RW_UNLOCKED) {
822			x &= ~RW_LOCK_WRITE_SPINNER;
823			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v, tid | x)) {
824				if (x)
825					turnstile_claim(ts);
826				else
827					turnstile_cancel(ts);
828				break;
829			}
830			turnstile_cancel(ts);
831			continue;
832		}
833		/*
834		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
835		 * set it.  If we fail to set it, then loop back and try
836		 * again.
837		 */
838		if (!(v & RW_LOCK_WRITE_WAITERS)) {
839			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
840			    v | RW_LOCK_WRITE_WAITERS)) {
841				turnstile_cancel(ts);
842				continue;
843			}
844			if (LOCK_LOG_TEST(&rw->lock_object, 0))
845				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
846				    __func__, rw);
847		}
848		/*
849		 * We were unable to acquire the lock and the write waiters
850		 * flag is set, so we must block on the turnstile.
851		 */
852		if (LOCK_LOG_TEST(&rw->lock_object, 0))
853			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
854			    rw);
855#ifdef KDTRACE_HOOKS
856		sleep_time -= lockstat_nsecs();
857#endif
858		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
859#ifdef KDTRACE_HOOKS
860		sleep_time += lockstat_nsecs();
861		sleep_cnt++;
862#endif
863		if (LOCK_LOG_TEST(&rw->lock_object, 0))
864			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
865			    __func__, rw);
866#ifdef ADAPTIVE_RWLOCKS
867		spintries = 0;
868#endif
869	}
870	LOCKSTAT_PROFILE_OBTAIN_LOCK_SUCCESS(LS_RW_WLOCK_ACQUIRE, rw, contested,
871	    waittime, file, line);
872#ifdef KDTRACE_HOOKS
873	if (sleep_time)
874		LOCKSTAT_RECORD1(LS_RW_WLOCK_BLOCK, rw, sleep_time);
875
876	/*
877	 * Record only the loops spinning and not sleeping.
878	 */
879	if (spin_cnt > sleep_cnt)
880		LOCKSTAT_RECORD1(LS_RW_WLOCK_SPIN, rw, (spin_cnt - sleep_cnt));
881#endif
882}
883
884/*
885 * This function is called if the first try at releasing a write lock failed.
886 * This means that one of the two waiter bits must be set, indicating that at
887 * least one thread is waiting on this lock.
888 */
889void
890__rw_wunlock_hard(volatile uintptr_t *c, uintptr_t tid, const char *file,
891    int line)
892{
893	struct rwlock *rw;
894	struct turnstile *ts;
895	uintptr_t v;
896	int queue;
897
898	if (SCHEDULER_STOPPED())
899		return;
900
901	rw = rwlock2rw(c);
902
903	if (rw_wlocked(rw) && rw_recursed(rw)) {
904		rw->rw_recurse--;
905		if (LOCK_LOG_TEST(&rw->lock_object, 0))
906			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
907		return;
908	}
909
910	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
911	    ("%s: neither of the waiter flags are set", __func__));
912
913	if (LOCK_LOG_TEST(&rw->lock_object, 0))
914		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);
915
916	turnstile_chain_lock(&rw->lock_object);
917	ts = turnstile_lookup(&rw->lock_object);
918	MPASS(ts != NULL);
919
920	/*
921	 * Use the same algo as sx locks for now.  Prefer waking up shared
922	 * waiters if we have any over writers.  This is probably not ideal.
923	 *
924	 * 'v' is the value we are going to write back to rw_lock.  If we
925	 * have waiters on both queues, we need to preserve the state of
926	 * the waiter flag for the queue we don't wake up.  For now this is
927	 * hardcoded for the algorithm mentioned above.
928	 *
929	 * In the case of both readers and writers waiting we wakeup the
930	 * readers but leave the RW_LOCK_WRITE_WAITERS flag set.  If a
931	 * new writer comes in before a reader it will claim the lock up
932	 * above.  There is probably a potential priority inversion in
933	 * there that could be worked around either by waking both queues
934	 * of waiters or doing some complicated lock handoff gymnastics.
935	 */
936	v = RW_UNLOCKED;
937	if (rw->rw_lock & RW_LOCK_WRITE_WAITERS) {
938		queue = TS_EXCLUSIVE_QUEUE;
939		v |= (rw->rw_lock & RW_LOCK_READ_WAITERS);
940	} else
941		queue = TS_SHARED_QUEUE;
942
943	/* Wake up all waiters for the specific queue. */
944	if (LOCK_LOG_TEST(&rw->lock_object, 0))
945		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
946		    queue == TS_SHARED_QUEUE ? "read" : "write");
947	turnstile_broadcast(ts, queue);
948	atomic_store_rel_ptr(&rw->rw_lock, v);
949	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
950	turnstile_chain_unlock(&rw->lock_object);
951}
952
953/*
954 * Attempt to do a non-blocking upgrade from a read lock to a write
955 * lock.  This will only succeed if this thread holds a single read
956 * lock.  Returns true if the upgrade succeeded and false otherwise.
957 */
958int
959__rw_try_upgrade(volatile uintptr_t *c, const char *file, int line)
960{
961	struct rwlock *rw;
962	uintptr_t v, x, tid;
963	struct turnstile *ts;
964	int success;
965
966	if (SCHEDULER_STOPPED())
967		return (1);
968
969	rw = rwlock2rw(c);
970
971	KASSERT(rw->rw_lock != RW_DESTROYED,
972	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
973	__rw_assert(c, RA_RLOCKED, file, line);
974
975	/*
976	 * Attempt to switch from one reader to a writer.  If there
977	 * are any write waiters, then we will have to lock the
978	 * turnstile first to prevent races with another writer
979	 * calling turnstile_wait() before we have claimed this
980	 * turnstile.  So, do the simple case of no waiters first.
981	 */
982	tid = (uintptr_t)curthread;
983	success = 0;
984	for (;;) {
985		v = rw->rw_lock;
986		if (RW_READERS(v) > 1)
987			break;
988		if (!(v & RW_LOCK_WAITERS)) {
989			success = atomic_cmpset_ptr(&rw->rw_lock, v, tid);
990			if (!success)
991				continue;
992			break;
993		}
994
995		/*
996		 * Ok, we think we have waiters, so lock the turnstile.
997		 */
998		ts = turnstile_trywait(&rw->lock_object);
999		v = rw->rw_lock;
1000		if (RW_READERS(v) > 1) {
1001			turnstile_cancel(ts);
1002			break;
1003		}
1004		/*
1005		 * Try to switch from one reader to a writer again.  This time
1006		 * we honor the current state of the waiters flags.
1007		 * If we obtain the lock with the flags set, then claim
1008		 * ownership of the turnstile.
1009		 */
1010		x = rw->rw_lock & RW_LOCK_WAITERS;
1011		success = atomic_cmpset_ptr(&rw->rw_lock, v, tid | x);
1012		if (success) {
1013			if (x)
1014				turnstile_claim(ts);
1015			else
1016				turnstile_cancel(ts);
1017			break;
1018		}
1019		turnstile_cancel(ts);
1020	}
1021	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
1022	if (success) {
1023		curthread->td_rw_rlocks--;
1024		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
1025		    file, line);
1026		LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, rw);
1027	}
1028	return (success);
1029}
1030
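/*
 * Illustrative sketch (hypothetical consumer code, not from this file,
 * reusing the example_softc from an earlier sketch): since the upgrade can
 * fail, callers typically fall back to dropping the read lock, taking the
 * write lock and revalidating any state read under the read lock.
 */
#if 0
static void
example_bump(struct example_softc *sc)
{

	rw_rlock(&sc->sc_lock);
	if (!rw_try_upgrade(&sc->sc_lock)) {
		/* Upgrade failed: drop and relock exclusively. */
		rw_runlock(&sc->sc_lock);
		rw_wlock(&sc->sc_lock);
		/* State may have changed while the lock was dropped. */
	}
	sc->sc_count++;
	rw_wunlock(&sc->sc_lock);
}
#endif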
1031/*
1032 * Downgrade a write lock into a single read lock.
1033 */
1034void
1035__rw_downgrade(volatile uintptr_t *c, const char *file, int line)
1036{
1037	struct rwlock *rw;
1038	struct turnstile *ts;
1039	uintptr_t tid, v;
1040	int rwait, wwait;
1041
1042	if (SCHEDULER_STOPPED())
1043		return;
1044
1045	rw = rwlock2rw(c);
1046
1047	KASSERT(rw->rw_lock != RW_DESTROYED,
1048	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
1049	__rw_assert(c, RA_WLOCKED | RA_NOTRECURSED, file, line);
1050#ifndef INVARIANTS
1051	if (rw_recursed(rw))
1052		panic("downgrade of a recursed lock");
1053#endif
1054
1055	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);
1056
1057	/*
1058	 * Convert from a writer to a single reader.  First we handle
1059	 * the easy case with no waiters.  If there are any waiters, we
1060	 * lock the turnstile and "disown" the lock.
1061	 */
1062	tid = (uintptr_t)curthread;
1063	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
1064		goto out;
1065
1066	/*
1067	 * Ok, we think we have waiters, so lock the turnstile so we can
1068	 * read the waiter flags without any races.
1069	 */
1070	turnstile_chain_lock(&rw->lock_object);
1071	v = rw->rw_lock & RW_LOCK_WAITERS;
1072	rwait = v & RW_LOCK_READ_WAITERS;
1073	wwait = v & RW_LOCK_WRITE_WAITERS;
1074	MPASS(rwait | wwait);
1075
1076	/*
1077	 * Downgrade from a write lock while preserving waiters flag
1078	 * and give up ownership of the turnstile.
1079	 */
1080	ts = turnstile_lookup(&rw->lock_object);
1081	MPASS(ts != NULL);
1082	if (!wwait)
1083		v &= ~RW_LOCK_READ_WAITERS;
1084	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v);
1085	/*
1086	 * Wake other readers if there are no writers pending.  Otherwise they
1087	 * won't be able to acquire the lock anyway.
1088	 */
1089	if (rwait && !wwait) {
1090		turnstile_broadcast(ts, TS_SHARED_QUEUE);
1091		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
1092	} else
1093		turnstile_disown(ts);
1094	turnstile_chain_unlock(&rw->lock_object);
1095out:
1096	curthread->td_rw_rlocks++;
1097	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
1098	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, rw);
1099}
1100
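/*
 * Illustrative sketch (hypothetical consumer code, not from this file,
 * reusing the example_softc from an earlier sketch; example_scan_locked()
 * is a hypothetical helper): a writer that only needs read access for the
 * rest of its work can downgrade instead of unlocking and relocking.
 */
#if 0
static void
example_update_then_scan(struct example_softc *sc)
{

	rw_wlock(&sc->sc_lock);
	sc->sc_count++;				/* exclusive portion */
	rw_downgrade(&sc->sc_lock);		/* keep a single read lock */
	example_scan_locked(sc);		/* shared portion */
	rw_runlock(&sc->sc_lock);
}
#endif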
1101#ifdef INVARIANT_SUPPORT
1102#ifndef INVARIANTS
1103#undef __rw_assert
1104#endif
1105
1106/*
1107 * In the non-WITNESS case, rw_assert() can only detect that at least
1108 * *some* thread owns an rlock, but it cannot guarantee that *this*
1109 * thread owns an rlock.
1110 */
1111void
1112__rw_assert(const volatile uintptr_t *c, int what, const char *file, int line)
1113{
1114	const struct rwlock *rw;
1115
1116	if (panicstr != NULL)
1117		return;
1118
1119	rw = rwlock2rw(c);
1120
1121	switch (what) {
1122	case RA_LOCKED:
1123	case RA_LOCKED | RA_RECURSED:
1124	case RA_LOCKED | RA_NOTRECURSED:
1125	case RA_RLOCKED:
1126#ifdef WITNESS
1127		witness_assert(&rw->lock_object, what, file, line);
1128#else
1129		/*
1130		 * If some other thread has a write lock or we have one
1131		 * and are asserting a read lock, fail.  Also, if no one
1132		 * has a lock at all, fail.
1133		 */
1134		if (rw->rw_lock == RW_UNLOCKED ||
1135		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
1136		    rw_wowner(rw) != curthread)))
1137			panic("Lock %s not %slocked @ %s:%d\n",
1138			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
1139			    "read " : "", file, line);
1140
1141		if (!(rw->rw_lock & RW_LOCK_READ)) {
1142			if (rw_recursed(rw)) {
1143				if (what & RA_NOTRECURSED)
1144					panic("Lock %s recursed @ %s:%d\n",
1145					    rw->lock_object.lo_name, file,
1146					    line);
1147			} else if (what & RA_RECURSED)
1148				panic("Lock %s not recursed @ %s:%d\n",
1149				    rw->lock_object.lo_name, file, line);
1150		}
1151#endif
1152		break;
1153	case RA_WLOCKED:
1154	case RA_WLOCKED | RA_RECURSED:
1155	case RA_WLOCKED | RA_NOTRECURSED:
1156		if (rw_wowner(rw) != curthread)
1157			panic("Lock %s not exclusively locked @ %s:%d\n",
1158			    rw->lock_object.lo_name, file, line);
1159		if (rw_recursed(rw)) {
1160			if (what & RA_NOTRECURSED)
1161				panic("Lock %s recursed @ %s:%d\n",
1162				    rw->lock_object.lo_name, file, line);
1163		} else if (what & RA_RECURSED)
1164			panic("Lock %s not recursed @ %s:%d\n",
1165			    rw->lock_object.lo_name, file, line);
1166		break;
1167	case RA_UNLOCKED:
1168#ifdef WITNESS
1169		witness_assert(&rw->lock_object, what, file, line);
1170#else
1171		/*
1172		 * If we hold a write lock, fail.  We can't reliably check
1173		 * to see if we hold a read lock or not.
1174		 */
1175		if (rw_wowner(rw) == curthread)
1176			panic("Lock %s exclusively locked @ %s:%d\n",
1177			    rw->lock_object.lo_name, file, line);
1178#endif
1179		break;
1180	default:
1181		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
1182		    line);
1183	}
1184}
1185#endif /* INVARIANT_SUPPORT */
1186
1187#ifdef DDB
1188void
1189db_show_rwlock(const struct lock_object *lock)
1190{
1191	const struct rwlock *rw;
1192	struct thread *td;
1193
1194	rw = (const struct rwlock *)lock;
1195
1196	db_printf(" state: ");
1197	if (rw->rw_lock == RW_UNLOCKED)
1198		db_printf("UNLOCKED\n");
1199	else if (rw->rw_lock == RW_DESTROYED) {
1200		db_printf("DESTROYED\n");
1201		return;
1202	} else if (rw->rw_lock & RW_LOCK_READ)
1203		db_printf("RLOCK: %ju locks\n",
1204		    (uintmax_t)(RW_READERS(rw->rw_lock)));
1205	else {
1206		td = rw_wowner(rw);
1207		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
1208		    td->td_tid, td->td_proc->p_pid, td->td_name);
1209		if (rw_recursed(rw))
1210			db_printf(" recursed: %u\n", rw->rw_recurse);
1211	}
1212	db_printf(" waiters: ");
1213	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
1214	case RW_LOCK_READ_WAITERS:
1215		db_printf("readers\n");
1216		break;
1217	case RW_LOCK_WRITE_WAITERS:
1218		db_printf("writers\n");
1219		break;
1220	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
1221		db_printf("readers and writers\n");
1222		break;
1223	default:
1224		db_printf("none\n");
1225		break;
1226	}
1227}
1228
1229#endif
1230