1/*-
2 * Copyright (c) 2006 John Baldwin <jhb@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the author nor the names of any co-contributors
14 *    may be used to endorse or promote products derived from this software
15 *    without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30/*
31 * Machine independent bits of reader/writer lock implementation.
32 */
33
34#include <sys/cdefs.h>
35__FBSDID("$FreeBSD: head/sys/kern/kern_rwlock.c 205626 2010-03-24 19:21:26Z bz $");
36
37#include "opt_ddb.h"
38#include "opt_kdtrace.h"
39#include "opt_no_adaptive_rwlocks.h"
40
41#include <sys/param.h>
42#include <sys/ktr.h>
43#include <sys/kernel.h>
44#include <sys/lock.h>
45#include <sys/mutex.h>
46#include <sys/proc.h>
47#include <sys/rwlock.h>
48#include <sys/sysctl.h>
49#include <sys/systm.h>
50#include <sys/turnstile.h>
51
52#include <machine/cpu.h>
53
54#if defined(SMP) && !defined(NO_ADAPTIVE_RWLOCKS)
55#define	ADAPTIVE_RWLOCKS
56#endif
57
58#ifdef ADAPTIVE_RWLOCKS
59static int rowner_retries = 10;
60static int rowner_loops = 10000;
61SYSCTL_NODE(_debug, OID_AUTO, rwlock, CTLFLAG_RD, NULL, "rwlock debugging");
62SYSCTL_INT(_debug_rwlock, OID_AUTO, retry, CTLFLAG_RW, &rowner_retries, 0, "");
63SYSCTL_INT(_debug_rwlock, OID_AUTO, loops, CTLFLAG_RW, &rowner_loops, 0, "");
64#endif
65
66#ifdef DDB
67#include <ddb/ddb.h>
68
69static void	db_show_rwlock(struct lock_object *lock);
70#endif
71static void	assert_rw(struct lock_object *lock, int what);
72static void	lock_rw(struct lock_object *lock, int how);
73#ifdef KDTRACE_HOOKS
74static int	owner_rw(struct lock_object *lock, struct thread **owner);
75#endif
76static int	unlock_rw(struct lock_object *lock);
77
78struct lock_class lock_class_rw = {
79	.lc_name = "rw",
80	.lc_flags = LC_SLEEPLOCK | LC_RECURSABLE | LC_UPGRADABLE,
81	.lc_assert = assert_rw,
82#ifdef DDB
83	.lc_ddb_show = db_show_rwlock,
84#endif
85	.lc_lock = lock_rw,
86	.lc_unlock = unlock_rw,
87#ifdef KDTRACE_HOOKS
88	.lc_owner = owner_rw,
89#endif
90};
91
92/*
93 * Return a pointer to the owning thread if the lock is write-locked or
94 * NULL if the lock is unlocked or read-locked.
95 */
96#define	rw_wowner(rw)							\
97	((rw)->rw_lock & RW_LOCK_READ ? NULL :				\
98	    (struct thread *)RW_OWNER((rw)->rw_lock))
99
100/*
101 * Returns whether the write lock is held recursively.  Write ownership is
102 * not assured here and must be checked by the caller beforehand.
103 */
104#define	rw_recursed(rw)		((rw)->rw_recurse != 0)
105
106/*
107 * Return true if curthread holds the write lock.
108 */
109#define	rw_wlocked(rw)		(rw_wowner((rw)) == curthread)
110
111/*
112 * Return a pointer to the thread that owns this lock and should receive
113 * any priority lent by threads that block on it.  Currently this
114 * is identical to rw_wowner().
115 */
116#define	rw_owner(rw)		rw_wowner(rw)
117
118#ifndef INVARIANTS
119#define	_rw_assert(rw, what, file, line)
120#endif
121
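/*
 * Implementations of the lock_class_rw callbacks declared above.  These
 * let generic lock(9) framework code assert, acquire, release and (with
 * KDTRACE) query the owner of an rwlock through its lock_object.
 */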
122void
123assert_rw(struct lock_object *lock, int what)
124{
125
126	rw_assert((struct rwlock *)lock, what);
127}
128
129void
130lock_rw(struct lock_object *lock, int how)
131{
132	struct rwlock *rw;
133
134	rw = (struct rwlock *)lock;
135	if (how)
136		rw_wlock(rw);
137	else
138		rw_rlock(rw);
139}
140
141int
142unlock_rw(struct lock_object *lock)
143{
144	struct rwlock *rw;
145
146	rw = (struct rwlock *)lock;
147	rw_assert(rw, RA_LOCKED | LA_NOTRECURSED);
148	if (rw->rw_lock & RW_LOCK_READ) {
149		rw_runlock(rw);
150		return (0);
151	} else {
152		rw_wunlock(rw);
153		return (1);
154	}
155}
156
157#ifdef KDTRACE_HOOKS
158int
159owner_rw(struct lock_object *lock, struct thread **owner)
160{
161	struct rwlock *rw = (struct rwlock *)lock;
162	uintptr_t x = rw->rw_lock;
163
164	*owner = rw_wowner(rw);
165	return ((x & RW_LOCK_READ) != 0 ?  (RW_READERS(x) != 0) :
166	    (*owner != NULL));
167}
168#endif
169
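/*
 * Initialize an rwlock.  The RW_* option flags supplied by the caller are
 * translated into the corresponding LO_* flags before the lock is handed
 * to lock_init() with lock_class_rw.
 */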
170void
171rw_init_flags(struct rwlock *rw, const char *name, int opts)
172{
173	int flags;
174
175	MPASS((opts & ~(RW_DUPOK | RW_NOPROFILE | RW_NOWITNESS | RW_QUIET |
176	    RW_RECURSE)) == 0);
177	ASSERT_ATOMIC_LOAD_PTR(rw->rw_lock,
178	    ("%s: rw_lock not aligned for %s: %p", __func__, name,
179	    &rw->rw_lock));
180
181	flags = LO_UPGRADABLE;
182	if (opts & RW_DUPOK)
183		flags |= LO_DUPOK;
184	if (opts & RW_NOPROFILE)
185		flags |= LO_NOPROFILE;
186	if (!(opts & RW_NOWITNESS))
187		flags |= LO_WITNESS;
188	if (opts & RW_RECURSE)
189		flags |= LO_RECURSABLE;
190	if (opts & RW_QUIET)
191		flags |= LO_QUIET;
192
193	rw->rw_lock = RW_UNLOCKED;
194	rw->rw_recurse = 0;
195	lock_init(&rw->lock_object, &lock_class_rw, name, NULL, flags);
196}
197
198void
199rw_destroy(struct rwlock *rw)
200{
201
202	KASSERT(rw->rw_lock == RW_UNLOCKED, ("rw lock %p not unlocked", rw));
203	KASSERT(rw->rw_recurse == 0, ("rw lock %p still recursed", rw));
204	rw->rw_lock = RW_DESTROYED;
205	lock_destroy(&rw->lock_object);
206}
207
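/*
 * SYSINIT callbacks used to initialize statically declared rwlocks at
 * boot time (see the RW_SYSINIT macros in <sys/rwlock.h>).
 */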
208void
209rw_sysinit(void *arg)
210{
211	struct rw_args *args = arg;
212
213	rw_init(args->ra_rw, args->ra_desc);
214}
215
216void
217rw_sysinit_flags(void *arg)
218{
219	struct rw_args_flags *args = arg;
220
221	rw_init_flags(args->ra_rw, args->ra_desc, args->ra_flags);
222}
223
224int
225rw_wowned(struct rwlock *rw)
226{
227
228	return (rw_wowner(rw) == curthread);
229}
230
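/*
 * Acquire a write lock.  The uncontested case is handled inline by
 * __rw_wlock(); contested acquires fall through to _rw_wlock_hard() below.
 */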
231void
232_rw_wlock(struct rwlock *rw, const char *file, int line)
233{
234
235	MPASS(curthread != NULL);
236	KASSERT(rw->rw_lock != RW_DESTROYED,
237	    ("rw_wlock() of destroyed rwlock @ %s:%d", file, line));
238	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER | LOP_EXCLUSIVE, file,
239	    line, NULL);
240	__rw_wlock(rw, curthread, file, line);
241	LOCK_LOG_LOCK("WLOCK", &rw->lock_object, 0, rw->rw_recurse, file, line);
242	WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
243	curthread->td_locks++;
244}
245
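/*
 * Try to acquire a write lock without blocking.  This succeeds only if
 * the lock is completely unlocked, or if we already own it and it was
 * initialized with RW_RECURSE.
 */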
246int
247_rw_try_wlock(struct rwlock *rw, const char *file, int line)
248{
249	int rval;
250
251	KASSERT(rw->rw_lock != RW_DESTROYED,
252	    ("rw_try_wlock() of destroyed rwlock @ %s:%d", file, line));
253
254	if (rw_wlocked(rw) &&
255	    (rw->lock_object.lo_flags & LO_RECURSABLE) != 0) {
256		rw->rw_recurse++;
257		rval = 1;
258	} else
259		rval = atomic_cmpset_acq_ptr(&rw->rw_lock, RW_UNLOCKED,
260		    (uintptr_t)curthread);
261
262	LOCK_LOG_TRY("WLOCK", &rw->lock_object, 0, rval, file, line);
263	if (rval) {
264		WITNESS_LOCK(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
265		    file, line);
266		curthread->td_locks++;
267	}
268	return (rval);
269}
270
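/*
 * Release a write lock.  The uncontested case is handled by __rw_wunlock();
 * _rw_wunlock_hard() below takes care of waking up any waiters.
 */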
271void
272_rw_wunlock(struct rwlock *rw, const char *file, int line)
273{
274
275	MPASS(curthread != NULL);
276	KASSERT(rw->rw_lock != RW_DESTROYED,
277	    ("rw_wunlock() of destroyed rwlock @ %s:%d", file, line));
278	_rw_assert(rw, RA_WLOCKED, file, line);
279	curthread->td_locks--;
280	WITNESS_UNLOCK(&rw->lock_object, LOP_EXCLUSIVE, file, line);
281	LOCK_LOG_LOCK("WUNLOCK", &rw->lock_object, 0, rw->rw_recurse, file,
282	    line);
283	if (!rw_recursed(rw))
284		LOCKSTAT_PROFILE_RELEASE_LOCK(LS_RW_WUNLOCK_RELEASE, rw);
285	__rw_wunlock(rw, curthread, file, line);
286}
287}
/*
288 * Determines whether a new reader can acquire a lock.  Succeeds if the
289 * reader already owns a read lock and the lock is locked for read, in
290 * order to prevent deadlock from reader recursion.  Also succeeds if the
291 * lock is read-locked or unlocked and has no write waiters or spinners.
292 * Failing otherwise gives writers priority over readers.
293 */
294#define	RW_CAN_READ(_rw)						\
295    ((curthread->td_rw_rlocks && (_rw) & RW_LOCK_READ) || ((_rw) &	\
296    (RW_LOCK_READ | RW_LOCK_WRITE_WAITERS | RW_LOCK_WRITE_SPINNER)) ==	\
297    RW_LOCK_READ)
298
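/*
 * Acquire a read lock.  The loop below bumps the reader count whenever
 * RW_CAN_READ() allows it; otherwise it may spin adaptively on a running
 * write owner and eventually blocks on the turnstile's shared queue.
 */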
299void
300_rw_rlock(struct rwlock *rw, const char *file, int line)
301{
302	struct turnstile *ts;
303#ifdef ADAPTIVE_RWLOCKS
304	volatile struct thread *owner;
305	int spintries = 0;
306	int i;
307#endif
308#ifdef LOCK_PROFILING
309	uint64_t waittime = 0;
310	int contested = 0;
311#endif
312	uintptr_t v;
313#ifdef KDTRACE_HOOKS
314	uint64_t spin_cnt = 0;
315	uint64_t sleep_cnt = 0;
316	int64_t sleep_time = 0;
317#endif
318
319	KASSERT(rw->rw_lock != RW_DESTROYED,
320	    ("rw_rlock() of destroyed rwlock @ %s:%d", file, line));
321	KASSERT(rw_wowner(rw) != curthread,
322	    ("%s (%s): wlock already held @ %s:%d", __func__,
323	    rw->lock_object.lo_name, file, line));
324	WITNESS_CHECKORDER(&rw->lock_object, LOP_NEWORDER, file, line, NULL);
325
326	for (;;) {
327#ifdef KDTRACE_HOOKS
328		spin_cnt++;
329#endif
330		/*
331		 * Handle the easy case.  If no other thread has a write
332		 * lock, then try to bump up the count of read locks.  Note
333		 * that we have to preserve the current state of the
334		 * RW_LOCK_WRITE_WAITERS flag.  If we fail to acquire a
335		 * read lock, then rw_lock must have changed, so restart
336		 * the loop.  Note that this handles the case of a
337		 * completely unlocked rwlock since such a lock is encoded
338		 * as a read lock with no waiters.
339		 */
340		v = rw->rw_lock;
341		if (RW_CAN_READ(v)) {
342			/*
343			 * The RW_LOCK_READ_WAITERS flag should only be set
344			 * if the lock has been unlocked and write waiters
345			 * were present.
346			 */
347			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v,
348			    v + RW_ONE_READER)) {
349				if (LOCK_LOG_TEST(&rw->lock_object, 0))
350					CTR4(KTR_LOCK,
351					    "%s: %p succeed %p -> %p", __func__,
352					    rw, (void *)v,
353					    (void *)(v + RW_ONE_READER));
354				break;
355			}
356			continue;
357		}
358		lock_profile_obtain_lock_failed(&rw->lock_object,
359		    &contested, &waittime);
360
361#ifdef ADAPTIVE_RWLOCKS
362		/*
363		 * If the owner is running on another CPU, spin until
364		 * the owner stops running or the state of the lock
365		 * changes.
366		 */
367		if ((v & RW_LOCK_READ) == 0) {
368			owner = (struct thread *)RW_OWNER(v);
369			if (TD_IS_RUNNING(owner)) {
370				if (LOCK_LOG_TEST(&rw->lock_object, 0))
371					CTR3(KTR_LOCK,
372					    "%s: spinning on %p held by %p",
373					    __func__, rw, owner);
374				while ((struct thread*)RW_OWNER(rw->rw_lock) ==
375				    owner && TD_IS_RUNNING(owner)) {
376					cpu_spinwait();
377#ifdef KDTRACE_HOOKS
378					spin_cnt++;
379#endif
380				}
381				continue;
382			}
383		} else if (spintries < rowner_retries) {
384			spintries++;
385			for (i = 0; i < rowner_loops; i++) {
386				v = rw->rw_lock;
387				if ((v & RW_LOCK_READ) == 0 || RW_CAN_READ(v))
388					break;
389				cpu_spinwait();
390			}
391			if (i != rowner_loops)
392				continue;
393		}
394#endif
395
396		/*
397		 * Okay, now it's the hard case.  Some other thread already
398		 * has a write lock or there are write waiters present.
399		 * Acquire the turnstile lock so we can begin the process
400		 * of blocking.
401		 */
402		ts = turnstile_trywait(&rw->lock_object);
403
404		/*
405		 * The lock might have been released while we spun, so
406		 * recheck its state and restart the loop if needed.
407		 */
408		v = rw->rw_lock;
409		if (RW_CAN_READ(v)) {
410			turnstile_cancel(ts);
411			continue;
412		}
413
414#ifdef ADAPTIVE_RWLOCKS
415		/*
416		 * The current lock owner might have started executing
417		 * on another CPU (or the lock could have changed
418		 * owners) while we were waiting on the turnstile
419		 * chain lock.  If so, drop the turnstile lock and try
420		 * again.
421		 */
422		if ((v & RW_LOCK_READ) == 0) {
423			owner = (struct thread *)RW_OWNER(v);
424			if (TD_IS_RUNNING(owner)) {
425				turnstile_cancel(ts);
426				continue;
427			}
428		}
429#endif
430
431		/*
432		 * The lock is held in write mode or it already has waiters.
433		 */
434		MPASS(!RW_CAN_READ(v));
435
436		/*
437		 * If the RW_LOCK_READ_WAITERS flag is already set, then
438		 * we can go ahead and block.  If it is not set, then try
439		 * to set it.  If we fail to set it, drop the turnstile
440		 * lock and restart the loop.
441		 */
442		if (!(v & RW_LOCK_READ_WAITERS)) {
443			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
444			    v | RW_LOCK_READ_WAITERS)) {
445				turnstile_cancel(ts);
446				continue;
447			}
448			if (LOCK_LOG_TEST(&rw->lock_object, 0))
449				CTR2(KTR_LOCK, "%s: %p set read waiters flag",
450				    __func__, rw);
451		}
452
453		/*
454		 * We were unable to acquire the lock and the read waiters
455		 * flag is set, so we must block on the turnstile.
456		 */
457		if (LOCK_LOG_TEST(&rw->lock_object, 0))
458			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
459			    rw);
460#ifdef KDTRACE_HOOKS
461		sleep_time -= lockstat_nsecs();
462#endif
463		turnstile_wait(ts, rw_owner(rw), TS_SHARED_QUEUE);
464#ifdef KDTRACE_HOOKS
465		sleep_time += lockstat_nsecs();
466		sleep_cnt++;
467#endif
468		if (LOCK_LOG_TEST(&rw->lock_object, 0))
469			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
470			    __func__, rw);
471	}
472
473	/*
474	 * TODO: acquire "owner of record" here.  Here be turnstile dragons
475	 * however.  Turnstiles currently don't like owners changing between
476	 * calls to turnstile_wait().
477	 */
478	LOCKSTAT_PROFILE_OBTAIN_LOCK_SUCCESS(LS_RW_RLOCK_ACQUIRE, rw, contested,
479	    waittime, file, line);
480	LOCK_LOG_LOCK("RLOCK", &rw->lock_object, 0, 0, file, line);
481	WITNESS_LOCK(&rw->lock_object, 0, file, line);
482	curthread->td_locks++;
483	curthread->td_rw_rlocks++;
484#ifdef KDTRACE_HOOKS
485	if (sleep_time)
486		LOCKSTAT_RECORD1(LS_RW_RLOCK_BLOCK, rw, sleep_time);
487
488	/*
489	 * Record only the iterations that spun rather than slept.
490	 */
491	if (spin_cnt > sleep_cnt)
492		LOCKSTAT_RECORD1(LS_RW_RLOCK_SPIN, rw, (spin_cnt - sleep_cnt));
493#endif
494}
495
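/*
 * Try to acquire a read lock without blocking.  This fails as soon as the
 * lock is seen to be write-locked; otherwise the atomic bump of the reader
 * count is retried until it succeeds or a writer appears.
 */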
496int
497_rw_try_rlock(struct rwlock *rw, const char *file, int line)
498{
499	uintptr_t x;
500
501	for (;;) {
502		x = rw->rw_lock;
503		KASSERT(rw->rw_lock != RW_DESTROYED,
504		    ("rw_try_rlock() of destroyed rwlock @ %s:%d", file, line));
505		if (!(x & RW_LOCK_READ))
506			break;
507		if (atomic_cmpset_acq_ptr(&rw->rw_lock, x, x + RW_ONE_READER)) {
508			LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 1, file,
509			    line);
510			WITNESS_LOCK(&rw->lock_object, LOP_TRYLOCK, file, line);
511			curthread->td_locks++;
512			curthread->td_rw_rlocks++;
513			return (1);
514		}
515	}
516
517	LOCK_LOG_TRY("RLOCK", &rw->lock_object, 0, 0, file, line);
518	return (0);
519}
520
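/*
 * Release a read lock.  If other readers remain, just drop one reference.
 * The last reader wakes up any queued waiters, preferring the exclusive
 * (writer) queue over the shared queue.
 */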
521void
522_rw_runlock(struct rwlock *rw, const char *file, int line)
523{
524	struct turnstile *ts;
525	uintptr_t x, v, queue;
526
527	KASSERT(rw->rw_lock != RW_DESTROYED,
528	    ("rw_runlock() of destroyed rwlock @ %s:%d", file, line));
529	_rw_assert(rw, RA_RLOCKED, file, line);
530	curthread->td_locks--;
531	curthread->td_rw_rlocks--;
532	WITNESS_UNLOCK(&rw->lock_object, 0, file, line);
533	LOCK_LOG_LOCK("RUNLOCK", &rw->lock_object, 0, 0, file, line);
534
535	/* TODO: drop "owner of record" here. */
536
537	for (;;) {
538		/*
539		 * See if there is more than one read lock held.  If so,
540		 * just drop one and return.
541		 */
542		x = rw->rw_lock;
543		if (RW_READERS(x) > 1) {
544			if (atomic_cmpset_rel_ptr(&rw->rw_lock, x,
545			    x - RW_ONE_READER)) {
546				if (LOCK_LOG_TEST(&rw->lock_object, 0))
547					CTR4(KTR_LOCK,
548					    "%s: %p succeeded %p -> %p",
549					    __func__, rw, (void *)x,
550					    (void *)(x - RW_ONE_READER));
551				break;
552			}
553			continue;
554		}
555		/*
556		 * If there aren't any waiters, then we are the last reader,
557		 * so just try to drop the lock quickly.
558		 */
559		if (!(x & RW_LOCK_WAITERS)) {
560			MPASS((x & ~RW_LOCK_WRITE_SPINNER) ==
561			    RW_READERS_LOCK(1));
562			if (atomic_cmpset_rel_ptr(&rw->rw_lock, x,
563			    RW_UNLOCKED)) {
564				if (LOCK_LOG_TEST(&rw->lock_object, 0))
565					CTR2(KTR_LOCK, "%s: %p last succeeded",
566					    __func__, rw);
567				break;
568			}
569			continue;
570		}
571		/*
572		 * Ok, we know we have waiters and we think we are the
573		 * last reader, so grab the turnstile lock.
574		 */
575		turnstile_chain_lock(&rw->lock_object);
576		v = rw->rw_lock & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
577		MPASS(v & RW_LOCK_WAITERS);
578
579		/*
580		 * Try to drop our lock, leaving it in an unlocked
581		 * state.
582		 *
583		 * If you wanted to do explicit lock handoff you'd have to
584		 * do it here.  You'd also want to use turnstile_signal()
585		 * and you'd have to handle the race where a higher
586		 * priority thread blocks on the write lock before the
587		 * thread you wake up actually runs, letting the new thread
588		 * "steal" the lock.  For now it's a lot simpler to just
589		 * wakeup all of the waiters.
590		 *
591		 * As above, if we fail, then another thread might have
592		 * acquired a read lock, so drop the turnstile lock and
593		 * restart.
594		 */
595		x = RW_UNLOCKED;
596		if (v & RW_LOCK_WRITE_WAITERS) {
597			queue = TS_EXCLUSIVE_QUEUE;
598			x |= (v & RW_LOCK_READ_WAITERS);
599		} else
600			queue = TS_SHARED_QUEUE;
601		if (!atomic_cmpset_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v,
602		    x)) {
603			turnstile_chain_unlock(&rw->lock_object);
604			continue;
605		}
606		if (LOCK_LOG_TEST(&rw->lock_object, 0))
607			CTR2(KTR_LOCK, "%s: %p last succeeded with waiters",
608			    __func__, rw);
609
610		/*
611		 * Ok.  The lock is released and all that's left is to
612		 * wake up the waiters.  Note that the lock might not be
613		 * free anymore, but in that case the writers will just
614		 * block again if they run before the new lock holder(s)
615		 * release the lock.
616		 */
617		ts = turnstile_lookup(&rw->lock_object);
618		MPASS(ts != NULL);
619		turnstile_broadcast(ts, queue);
620		turnstile_unpend(ts, TS_SHARED_LOCK);
621		turnstile_chain_unlock(&rw->lock_object);
622		break;
623	}
624	LOCKSTAT_PROFILE_RELEASE_LOCK(LS_RW_RUNLOCK_RELEASE, rw);
625}
626
627/*
628 * This function is called when we are unable to obtain a write lock on the
629 * first try.  This means that at least one other thread holds either a
630 * read or write lock.
631 */
632void
633_rw_wlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
634{
635	struct turnstile *ts;
636#ifdef ADAPTIVE_RWLOCKS
637	volatile struct thread *owner;
638	int spintries = 0;
639	int i;
640#endif
641	uintptr_t v, x;
642#ifdef LOCK_PROFILING
643	uint64_t waittime = 0;
644	int contested = 0;
645#endif
646#ifdef KDTRACE_HOOKS
647	uint64_t spin_cnt = 0;
648	uint64_t sleep_cnt = 0;
649	int64_t sleep_time = 0;
650#endif
651
652	if (rw_wlocked(rw)) {
653		KASSERT(rw->lock_object.lo_flags & LO_RECURSABLE,
654		    ("%s: recursing but non-recursive rw %s @ %s:%d\n",
655		    __func__, rw->lock_object.lo_name, file, line));
656		rw->rw_recurse++;
657		if (LOCK_LOG_TEST(&rw->lock_object, 0))
658			CTR2(KTR_LOCK, "%s: %p recursing", __func__, rw);
659		return;
660	}
661
662	if (LOCK_LOG_TEST(&rw->lock_object, 0))
663		CTR5(KTR_LOCK, "%s: %s contested (lock=%p) at %s:%d", __func__,
664		    rw->lock_object.lo_name, (void *)rw->rw_lock, file, line);
665
666	while (!_rw_write_lock(rw, tid)) {
667#ifdef KDTRACE_HOOKS
668		spin_cnt++;
669#endif
670		lock_profile_obtain_lock_failed(&rw->lock_object,
671		    &contested, &waittime);
672#ifdef ADAPTIVE_RWLOCKS
673		/*
674		 * If the lock is write locked and the owner is
675		 * running on another CPU, spin until the owner stops
676		 * running or the state of the lock changes.
677		 */
678		v = rw->rw_lock;
679		owner = (struct thread *)RW_OWNER(v);
680		if (!(v & RW_LOCK_READ) && TD_IS_RUNNING(owner)) {
681			if (LOCK_LOG_TEST(&rw->lock_object, 0))
682				CTR3(KTR_LOCK, "%s: spinning on %p held by %p",
683				    __func__, rw, owner);
684			while ((struct thread*)RW_OWNER(rw->rw_lock) == owner &&
685			    TD_IS_RUNNING(owner)) {
686				cpu_spinwait();
687#ifdef KDTRACE_HOOKS
688				spin_cnt++;
689#endif
690			}
691			continue;
692		}
693		if ((v & RW_LOCK_READ) && RW_READERS(v) &&
694		    spintries < rowner_retries) {
695			if (!(v & RW_LOCK_WRITE_SPINNER)) {
696				if (!atomic_cmpset_ptr(&rw->rw_lock, v,
697				    v | RW_LOCK_WRITE_SPINNER)) {
698					continue;
699				}
700			}
701			spintries++;
702			for (i = 0; i < rowner_loops; i++) {
703				if ((rw->rw_lock & RW_LOCK_WRITE_SPINNER) == 0)
704					break;
705				cpu_spinwait();
706			}
707#ifdef KDTRACE_HOOKS
708			spin_cnt += rowner_loops - i;
709#endif
710			if (i != rowner_loops)
711				continue;
712		}
713#endif
714		ts = turnstile_trywait(&rw->lock_object);
715		v = rw->rw_lock;
716
717#ifdef ADAPTIVE_RWLOCKS
718		/*
719		 * The current lock owner might have started executing
720		 * on another CPU (or the lock could have changed
721		 * owners) while we were waiting on the turnstile
722		 * chain lock.  If so, drop the turnstile lock and try
723		 * again.
724		 */
725		if (!(v & RW_LOCK_READ)) {
726			owner = (struct thread *)RW_OWNER(v);
727			if (TD_IS_RUNNING(owner)) {
728				turnstile_cancel(ts);
729				continue;
730			}
731		}
732#endif
733		/*
734		 * Check the waiters flags for this rwlock.
735		 * If the lock was released without leaving a pending
736		 * waiters queue behind, simply try to acquire it.
737		 * If a pending waiters queue is present, claim lock
738		 * ownership and preserve the pending queue.
739		 */
740		x = v & (RW_LOCK_WAITERS | RW_LOCK_WRITE_SPINNER);
741		if ((v & ~x) == RW_UNLOCKED) {
742			x &= ~RW_LOCK_WRITE_SPINNER;
743			if (atomic_cmpset_acq_ptr(&rw->rw_lock, v, tid | x)) {
744				if (x)
745					turnstile_claim(ts);
746				else
747					turnstile_cancel(ts);
748				break;
749			}
750			turnstile_cancel(ts);
751			continue;
752		}
753		/*
754		 * If the RW_LOCK_WRITE_WAITERS flag isn't set, then try to
755		 * set it.  If we fail to set it, then loop back and try
756		 * again.
757		 */
758		if (!(v & RW_LOCK_WRITE_WAITERS)) {
759			if (!atomic_cmpset_ptr(&rw->rw_lock, v,
760			    v | RW_LOCK_WRITE_WAITERS)) {
761				turnstile_cancel(ts);
762				continue;
763			}
764			if (LOCK_LOG_TEST(&rw->lock_object, 0))
765				CTR2(KTR_LOCK, "%s: %p set write waiters flag",
766				    __func__, rw);
767		}
768		/*
769		 * We were unable to acquire the lock and the write waiters
770		 * flag is set, so we must block on the turnstile.
771		 */
772		if (LOCK_LOG_TEST(&rw->lock_object, 0))
773			CTR2(KTR_LOCK, "%s: %p blocking on turnstile", __func__,
774			    rw);
775#ifdef KDTRACE_HOOKS
776		sleep_time -= lockstat_nsecs();
777#endif
778		turnstile_wait(ts, rw_owner(rw), TS_EXCLUSIVE_QUEUE);
779#ifdef KDTRACE_HOOKS
780		sleep_time += lockstat_nsecs();
781		sleep_cnt++;
782#endif
783		if (LOCK_LOG_TEST(&rw->lock_object, 0))
784			CTR2(KTR_LOCK, "%s: %p resuming from turnstile",
785			    __func__, rw);
786#ifdef ADAPTIVE_RWLOCKS
787		spintries = 0;
788#endif
789	}
790	LOCKSTAT_PROFILE_OBTAIN_LOCK_SUCCESS(LS_RW_WLOCK_ACQUIRE, rw, contested,
791	    waittime, file, line);
792#ifdef KDTRACE_HOOKS
793	if (sleep_time)
794		LOCKSTAT_RECORD1(LS_RW_WLOCK_BLOCK, rw, sleep_time);
795
796	/*
797	 * Record only the iterations that spun rather than slept.
798	 */
799	if (spin_cnt > sleep_cnt)
800		LOCKSTAT_RECORD1(LS_RW_WLOCK_SPIN, rw, (spin_cnt - sleep_cnt));
801#endif
802}
803
804/*
805 * This function is called if the first try at releasing a write lock failed.
806 * This means that one of the two waiter bits must be set, indicating that at
807 * least one thread is waiting on this lock.
808 */
809void
810_rw_wunlock_hard(struct rwlock *rw, uintptr_t tid, const char *file, int line)
811{
812	struct turnstile *ts;
813	uintptr_t v;
814	int queue;
815
816	if (rw_wlocked(rw) && rw_recursed(rw)) {
817		rw->rw_recurse--;
818		if (LOCK_LOG_TEST(&rw->lock_object, 0))
819			CTR2(KTR_LOCK, "%s: %p unrecursing", __func__, rw);
820		return;
821	}
822
823	KASSERT(rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS),
824	    ("%s: neither of the waiter flags are set", __func__));
825
826	if (LOCK_LOG_TEST(&rw->lock_object, 0))
827		CTR2(KTR_LOCK, "%s: %p contested", __func__, rw);
828
829	turnstile_chain_lock(&rw->lock_object);
830	ts = turnstile_lookup(&rw->lock_object);
831	MPASS(ts != NULL);
832
833	/*
834	 * Use the same algorithm as sx locks for now.  Prefer waking up shared
835	 * waiters, if we have any, over writers.  This is probably not ideal.
836	 *
837	 * 'v' is the value we are going to write back to rw_lock.  If we
838	 * have waiters on both queues, we need to preserve the state of
839	 * the waiter flag for the queue we don't wake up.  For now this is
840	 * hardcoded for the algorithm mentioned above.
841	 *
842	 * In the case of both readers and writers waiting we wakeup the
843	 * readers but leave the RW_LOCK_WRITE_WAITERS flag set.  If a
844	 * new writer comes in before a reader it will claim the lock up
845	 * above.  There is probably a potential priority inversion in
846	 * there that could be worked around either by waking both queues
847	 * of waiters or doing some complicated lock handoff gymnastics.
848	 */
849	v = RW_UNLOCKED;
850	if (rw->rw_lock & RW_LOCK_WRITE_WAITERS) {
851		queue = TS_EXCLUSIVE_QUEUE;
852		v |= (rw->rw_lock & RW_LOCK_READ_WAITERS);
853	} else
854		queue = TS_SHARED_QUEUE;
855
856	/* Wake up all waiters for the specific queue. */
857	if (LOCK_LOG_TEST(&rw->lock_object, 0))
858		CTR3(KTR_LOCK, "%s: %p waking up %s waiters", __func__, rw,
859		    queue == TS_SHARED_QUEUE ? "read" : "write");
860	turnstile_broadcast(ts, queue);
861	atomic_store_rel_ptr(&rw->rw_lock, v);
862	turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
863	turnstile_chain_unlock(&rw->lock_object);
864}
865
866/*
867 * Attempt to do a non-blocking upgrade from a read lock to a write
868 * lock.  This will only succeed if this thread holds a single read
869 * lock.  Returns true if the upgrade succeeded and false otherwise.
870 */
871int
872_rw_try_upgrade(struct rwlock *rw, const char *file, int line)
873{
874	uintptr_t v, x, tid;
875	struct turnstile *ts;
876	int success;
877
878	KASSERT(rw->rw_lock != RW_DESTROYED,
879	    ("rw_try_upgrade() of destroyed rwlock @ %s:%d", file, line));
880	_rw_assert(rw, RA_RLOCKED, file, line);
881
882	/*
883	 * Attempt to switch from one reader to a writer.  If there
884	 * are any write waiters, then we will have to lock the
885	 * turnstile first to prevent races with another writer
886	 * calling turnstile_wait() before we have claimed this
887	 * turnstile.  So, do the simple case of no waiters first.
888	 */
889	tid = (uintptr_t)curthread;
890	success = 0;
891	for (;;) {
892		v = rw->rw_lock;
893		if (RW_READERS(v) > 1)
894			break;
895		if (!(v & RW_LOCK_WAITERS)) {
896			success = atomic_cmpset_ptr(&rw->rw_lock, v, tid);
897			if (!success)
898				continue;
899			break;
900		}
901
902		/*
903		 * Ok, we think we have waiters, so lock the turnstile.
904		 */
905		ts = turnstile_trywait(&rw->lock_object);
906		v = rw->rw_lock;
907		if (RW_READERS(v) > 1) {
908			turnstile_cancel(ts);
909			break;
910		}
911		/*
912		 * Try to switch from one reader to a writer again.  This time
913		 * we honor the current state of the waiters flags.
914		 * If we obtain the lock with the flags set, then claim
915		 * ownership of the turnstile.
916		 */
917		x = rw->rw_lock & RW_LOCK_WAITERS;
918		success = atomic_cmpset_ptr(&rw->rw_lock, v, tid | x);
919		if (success) {
920			if (x)
921				turnstile_claim(ts);
922			else
923				turnstile_cancel(ts);
924			break;
925		}
926		turnstile_cancel(ts);
927	}
928	LOCK_LOG_TRY("WUPGRADE", &rw->lock_object, 0, success, file, line);
929	if (success) {
930		curthread->td_rw_rlocks--;
931		WITNESS_UPGRADE(&rw->lock_object, LOP_EXCLUSIVE | LOP_TRYLOCK,
932		    file, line);
933		LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, rw);
934	}
935	return (success);
936}
937
938/*
939 * Downgrade a write lock into a single read lock.
940 */
941void
942_rw_downgrade(struct rwlock *rw, const char *file, int line)
943{
944	struct turnstile *ts;
945	uintptr_t tid, v;
946	int rwait, wwait;
947
948	KASSERT(rw->rw_lock != RW_DESTROYED,
949	    ("rw_downgrade() of destroyed rwlock @ %s:%d", file, line));
950	_rw_assert(rw, RA_WLOCKED | RA_NOTRECURSED, file, line);
951#ifndef INVARIANTS
952	if (rw_recursed(rw))
953		panic("downgrade of a recursed lock");
954#endif
955
956	WITNESS_DOWNGRADE(&rw->lock_object, 0, file, line);
957
958	/*
959	 * Convert from a writer to a single reader.  First we handle
960	 * the easy case with no waiters.  If there are any waiters, we
961	 * lock the turnstile and "disown" the lock.
962	 */
963	tid = (uintptr_t)curthread;
964	if (atomic_cmpset_rel_ptr(&rw->rw_lock, tid, RW_READERS_LOCK(1)))
965		goto out;
966
967	/*
968	 * Ok, we think we have waiters, so lock the turnstile so we can
969	 * read the waiter flags without any races.
970	 */
971	turnstile_chain_lock(&rw->lock_object);
972	v = rw->rw_lock & RW_LOCK_WAITERS;
973	rwait = v & RW_LOCK_READ_WAITERS;
974	wwait = v & RW_LOCK_WRITE_WAITERS;
975	MPASS(rwait | wwait);
976
977	/*
978	 * Downgrade from a write lock while preserving the waiters flag
979	 * and give up ownership of the turnstile.
980	 */
981	ts = turnstile_lookup(&rw->lock_object);
982	MPASS(ts != NULL);
983	if (!wwait)
984		v &= ~RW_LOCK_READ_WAITERS;
985	atomic_store_rel_ptr(&rw->rw_lock, RW_READERS_LOCK(1) | v);
986	/*
987	 * Wake other readers if there are no writers pending.  Otherwise they
988	 * won't be able to acquire the lock anyway.
989	 */
990	if (rwait && !wwait) {
991		turnstile_broadcast(ts, TS_SHARED_QUEUE);
992		turnstile_unpend(ts, TS_EXCLUSIVE_LOCK);
993	} else
994		turnstile_disown(ts);
995	turnstile_chain_unlock(&rw->lock_object);
996out:
997	curthread->td_rw_rlocks++;
998	LOCK_LOG_LOCK("WDOWNGRADE", &rw->lock_object, 0, 0, file, line);
999	LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, rw);
1000}
1001
1002#ifdef INVARIANT_SUPPORT
1003#ifndef INVARIANTS
1004#undef _rw_assert
1005#endif
1006
1007/*
1008 * In the non-WITNESS case, rw_assert() can only detect that at least
1009 * *some* thread owns an rlock, but it cannot guarantee that *this*
1010 * thread owns an rlock.
1011 */
1012void
1013_rw_assert(struct rwlock *rw, int what, const char *file, int line)
1014{
1015
1016	if (panicstr != NULL)
1017		return;
1018	switch (what) {
1019	case RA_LOCKED:
1020	case RA_LOCKED | RA_RECURSED:
1021	case RA_LOCKED | RA_NOTRECURSED:
1022	case RA_RLOCKED:
1023#ifdef WITNESS
1024		witness_assert(&rw->lock_object, what, file, line);
1025#else
1026		/*
1027		 * If some other thread has a write lock or we have one
1028		 * and are asserting a read lock, fail.  Also, if no one
1029		 * has a lock at all, fail.
1030		 */
1031		if (rw->rw_lock == RW_UNLOCKED ||
1032		    (!(rw->rw_lock & RW_LOCK_READ) && (what == RA_RLOCKED ||
1033		    rw_wowner(rw) != curthread)))
1034			panic("Lock %s not %slocked @ %s:%d\n",
1035			    rw->lock_object.lo_name, (what == RA_RLOCKED) ?
1036			    "read " : "", file, line);
1037
1038		if (!(rw->rw_lock & RW_LOCK_READ)) {
1039			if (rw_recursed(rw)) {
1040				if (what & RA_NOTRECURSED)
1041					panic("Lock %s recursed @ %s:%d\n",
1042					    rw->lock_object.lo_name, file,
1043					    line);
1044			} else if (what & RA_RECURSED)
1045				panic("Lock %s not recursed @ %s:%d\n",
1046				    rw->lock_object.lo_name, file, line);
1047		}
1048#endif
1049		break;
1050	case RA_WLOCKED:
1051	case RA_WLOCKED | RA_RECURSED:
1052	case RA_WLOCKED | RA_NOTRECURSED:
1053		if (rw_wowner(rw) != curthread)
1054			panic("Lock %s not exclusively locked @ %s:%d\n",
1055			    rw->lock_object.lo_name, file, line);
1056		if (rw_recursed(rw)) {
1057			if (what & RA_NOTRECURSED)
1058				panic("Lock %s recursed @ %s:%d\n",
1059				    rw->lock_object.lo_name, file, line);
1060		} else if (what & RA_RECURSED)
1061			panic("Lock %s not recursed @ %s:%d\n",
1062			    rw->lock_object.lo_name, file, line);
1063		break;
1064	case RA_UNLOCKED:
1065#ifdef WITNESS
1066		witness_assert(&rw->lock_object, what, file, line);
1067#else
1068		/*
1069		 * If we hold a write lock, fail.  We can't reliably check
1070		 * to see if we hold a read lock or not.
1071		 */
1072		if (rw_wowner(rw) == curthread)
1073			panic("Lock %s exclusively locked @ %s:%d\n",
1074			    rw->lock_object.lo_name, file, line);
1075#endif
1076		break;
1077	default:
1078		panic("Unknown rw lock assertion: %d @ %s:%d", what, file,
1079		    line);
1080	}
1081}
1082#endif /* INVARIANT_SUPPORT */
1083
1084#ifdef DDB
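/*
 * Print the state of an rwlock (owner, reader count, recursion and pending
 * waiters) for DDB's "show lock" command.
 */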
1085void
1086db_show_rwlock(struct lock_object *lock)
1087{
1088	struct rwlock *rw;
1089	struct thread *td;
1090
1091	rw = (struct rwlock *)lock;
1092
1093	db_printf(" state: ");
1094	if (rw->rw_lock == RW_UNLOCKED)
1095		db_printf("UNLOCKED\n");
1096	else if (rw->rw_lock == RW_DESTROYED) {
1097		db_printf("DESTROYED\n");
1098		return;
1099	} else if (rw->rw_lock & RW_LOCK_READ)
1100		db_printf("RLOCK: %ju locks\n",
1101		    (uintmax_t)(RW_READERS(rw->rw_lock)));
1102	else {
1103		td = rw_wowner(rw);
1104		db_printf("WLOCK: %p (tid %d, pid %d, \"%s\")\n", td,
1105		    td->td_tid, td->td_proc->p_pid, td->td_name);
1106		if (rw_recursed(rw))
1107			db_printf(" recursed: %u\n", rw->rw_recurse);
1108	}
1109	db_printf(" waiters: ");
1110	switch (rw->rw_lock & (RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS)) {
1111	case RW_LOCK_READ_WAITERS:
1112		db_printf("readers\n");
1113		break;
1114	case RW_LOCK_WRITE_WAITERS:
1115		db_printf("writers\n");
1116		break;
1117	case RW_LOCK_READ_WAITERS | RW_LOCK_WRITE_WAITERS:
1118		db_printf("readers and writers\n");
1119		break;
1120	default:
1121		db_printf("none\n");
1122		break;
1123	}
1124}
1125
1126#endif
1127