1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 *
6 * $Id: lock.c,v 12.59 2008/05/07 12:27:35 bschmeck Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/lock.h"
13#include "dbinc/log.h"
14
15static int __lock_allocobj __P((DB_LOCKTAB *, u_int32_t));
16static int __lock_alloclock __P((DB_LOCKTAB *, u_int32_t));
17static int  __lock_freelock __P((DB_LOCKTAB *,
18		struct __db_lock *, DB_LOCKER *, u_int32_t));
19static int  __lock_getobj
20		__P((DB_LOCKTAB *, const DBT *, u_int32_t, int, DB_LOCKOBJ **));
21static int __lock_get_api __P((ENV *,
22		u_int32_t, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
23static int  __lock_inherit_locks __P ((DB_LOCKTAB *, DB_LOCKER *, u_int32_t));
24static int  __lock_is_parent __P((DB_LOCKTAB *, roff_t, DB_LOCKER *));
25static int  __lock_put_internal __P((DB_LOCKTAB *,
26		struct __db_lock *, u_int32_t,  u_int32_t));
27static int  __lock_put_nolock __P((ENV *, DB_LOCK *, int *, u_int32_t));
28static int __lock_remove_waiter __P((DB_LOCKTAB *,
29		DB_LOCKOBJ *, struct __db_lock *, db_status_t));
30static int __lock_trade __P((ENV *, DB_LOCK *, DB_LOCKER *));
31static int __lock_vec_api __P((ENV *,
32		u_int32_t, u_int32_t,  DB_LOCKREQ *, int, DB_LOCKREQ **));
33
34static const char __db_lock_invalid[] = "%s: Lock is no longer valid";
35static const char __db_locker_invalid[] = "Locker is not valid";
36
37/*
38 * __lock_vec_pp --
39 *	ENV->lock_vec pre/post processing.
40 *
41 * PUBLIC: int __lock_vec_pp __P((DB_ENV *,
42 * PUBLIC:     u_int32_t, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
43 */
44int
45__lock_vec_pp(dbenv, lid, flags, list, nlist, elistp)
46	DB_ENV *dbenv;
47	u_int32_t lid, flags;
48	int nlist;
49	DB_LOCKREQ *list, **elistp;
50{
51	DB_THREAD_INFO *ip;
52	ENV *env;
53	int ret;
54
55	env = dbenv->env;
56
57	ENV_REQUIRES_CONFIG(env,
58	    env->lk_handle, "DB_ENV->lock_vec", DB_INIT_LOCK);
59
60	/* Validate arguments. */
61	if ((ret = __db_fchk(env,
62	     "DB_ENV->lock_vec", flags, DB_LOCK_NOWAIT)) != 0)
63		return (ret);
64
65	ENV_ENTER(env, ip);
66	REPLICATION_WRAP(env,
67	     (__lock_vec_api(env, lid, flags, list, nlist, elistp)), 0, ret);
68	ENV_LEAVE(env, ip);
69	return (ret);
70}
71
72static int
73__lock_vec_api(env, lid, flags, list, nlist, elistp)
74	ENV *env;
75	u_int32_t lid, flags;
76	int nlist;
77	DB_LOCKREQ *list, **elistp;
78{
79	DB_LOCKER *sh_locker;
80	int ret;
81
82	if ((ret =
83	    __lock_getlocker(env->lk_handle, lid, 0, &sh_locker)) == 0)
84		ret = __lock_vec(env, sh_locker, flags, list, nlist, elistp);
85	return (ret);
86}
87
88/*
89 * __lock_vec --
90 *	ENV->lock_vec.
91 *
92 *	Vector lock routine.  This function takes a set of operations
93 *	and performs them all at once.  In addition, lock_vec provides
94 *	functionality for lock inheritance, releasing all locks for a
95 *	given locker (used during transaction commit/abort), releasing
96 *	all locks on a given object, and generating debugging information.
97 *
98 * PUBLIC: int __lock_vec __P((ENV *,
99 * PUBLIC:     DB_LOCKER *, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
100 */
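/*
 * As a rough illustration of how an application might drive this routine
 * through the public DB_ENV->lock_vec method (the names "locker_id",
 * "key_dbt" and the request contents below are hypothetical, not taken
 * from this file):
 *
 *	DB_LOCKREQ reqs[2], *failed_req;
 *	DBT key_dbt;
 *
 *	memset(&key_dbt, 0, sizeof(key_dbt));
 *	key_dbt.data = "my-object";
 *	key_dbt.size = sizeof("my-object") - 1;
 *
 *	reqs[0].op = DB_LOCK_GET;
 *	reqs[0].mode = DB_LOCK_READ;
 *	reqs[0].timeout = 0;
 *	reqs[0].obj = &key_dbt;
 *	reqs[1].op = DB_LOCK_PUT_ALL;
 *
 *	ret = dbenv->lock_vec(dbenv, locker_id, 0, reqs, 2, &failed_req);
 *
 * On a non-zero return, failed_req points at the request that failed.
 */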
101int
102__lock_vec(env, sh_locker, flags, list, nlist, elistp)
103	ENV *env;
104	DB_LOCKER *sh_locker;
105	u_int32_t flags;
106	int nlist;
107	DB_LOCKREQ *list, **elistp;
108{
109	struct __db_lock *lp, *next_lock;
	DB_LOCK lock;
	DB_LOCKOBJ *sh_obj;
111	DB_LOCKREGION *region;
112	DB_LOCKTAB *lt;
113	DBT *objlist, *np;
114	u_int32_t ndx;
115	int did_abort, i, ret, run_dd, upgrade, writes;
116
117	/* Check if locks have been globally turned off. */
118	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
119		return (0);
120
121	lt = env->lk_handle;
122	region = lt->reginfo.primary;
123
124	run_dd = 0;
125	LOCK_SYSTEM_LOCK(lt, region);
126	for (i = 0, ret = 0; i < nlist && ret == 0; i++)
127		switch (list[i].op) {
128		case DB_LOCK_GET_TIMEOUT:
129			LF_SET(DB_LOCK_SET_TIMEOUT);
130			/* FALLTHROUGH */
131		case DB_LOCK_GET:
132			if (IS_RECOVERING(env)) {
133				LOCK_INIT(list[i].lock);
134				break;
135			}
136			ret = __lock_get_internal(lt,
137			    sh_locker, flags, list[i].obj,
138			    list[i].mode, list[i].timeout, &list[i].lock);
139			break;
140		case DB_LOCK_INHERIT:
141			ret = __lock_inherit_locks(lt, sh_locker, flags);
142			break;
143		case DB_LOCK_PUT:
144			ret = __lock_put_nolock(env,
145			    &list[i].lock, &run_dd, flags);
146			break;
147		case DB_LOCK_PUT_ALL:		/* Put all locks. */
148		case DB_LOCK_PUT_READ:		/* Put read locks. */
149		case DB_LOCK_UPGRADE_WRITE:
150				/* Upgrade was_write and put read locks. */
151			/*
152			 * Since the locker may hold no
153			 * locks (i.e., you could call abort before you've
154			 * done any work), it's perfectly reasonable for there
155			 * to be no locker; this is not an error.
156			 */
157			if (sh_locker == NULL)
158				/*
159				 * If ret is set, then we'll generate an
160				 * error.  If it's not set, we have nothing
161				 * to do.
162				 */
163				break;
164			upgrade = 0;
165			writes = 1;
166			if (list[i].op == DB_LOCK_PUT_READ)
167				writes = 0;
168			else if (list[i].op == DB_LOCK_UPGRADE_WRITE) {
169				if (F_ISSET(sh_locker, DB_LOCKER_DIRTY))
170					upgrade = 1;
171				writes = 0;
172			}
173			objlist = list[i].obj;
174			if (objlist != NULL) {
175				/*
176				 * We know these should be ilocks,
177				 * but they could be something else,
178				 * so allocate room for the size too.
179				 */
180				objlist->size =
181				     sh_locker->nwrites * sizeof(DBT);
182				if ((ret = __os_malloc(env,
183				     objlist->size, &objlist->data)) != 0)
184					goto up_done;
185				memset(objlist->data, 0, objlist->size);
186				np = (DBT *) objlist->data;
187			} else
188				np = NULL;
189
190			/* Now traverse the locks, releasing each one. */
191			for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
192			    lp != NULL; lp = next_lock) {
193				sh_obj = (DB_LOCKOBJ *)
194				    ((u_int8_t *)lp + lp->obj);
195				next_lock = SH_LIST_NEXT(lp,
196				    locker_links, __db_lock);
197				if (writes == 1 ||
198				    lp->mode == DB_LOCK_READ ||
199				    lp->mode == DB_LOCK_READ_UNCOMMITTED) {
200					SH_LIST_REMOVE(lp,
201					    locker_links, __db_lock);
202					sh_obj = (DB_LOCKOBJ *)
203					    ((u_int8_t *)lp + lp->obj);
204					ndx = sh_obj->indx;
205					OBJECT_LOCK_NDX(lt, region, ndx);
206					/*
207					 * We are not letting lock_put_internal
208					 * unlink the lock, so we'll have to
209					 * update counts here.
210					 */
211					if (lp->status == DB_LSTAT_HELD) {
212						DB_ASSERT(env,
213						    sh_locker->nlocks != 0);
214						sh_locker->nlocks--;
215						if (IS_WRITELOCK(lp->mode))
216							sh_locker->nwrites--;
217					}
218					ret = __lock_put_internal(lt, lp,
219					    sh_obj->indx,
220					    DB_LOCK_FREE | DB_LOCK_DOALL);
221					OBJECT_UNLOCK(lt, region, ndx);
222					if (ret != 0)
223						break;
224					continue;
225				}
226				if (objlist != NULL) {
227					DB_ASSERT(env, (u_int8_t *)np <
228					     (u_int8_t *)objlist->data +
229					     objlist->size);
230					np->data = SH_DBT_PTR(&sh_obj->lockobj);
231					np->size = sh_obj->lockobj.size;
232					np++;
233				}
234			}
235			if (ret != 0)
236				goto up_done;
237
238			if (objlist != NULL)
239				if ((ret = __lock_fix_list(env,
240				     objlist, sh_locker->nwrites)) != 0)
241					goto up_done;
242			switch (list[i].op) {
243			case DB_LOCK_UPGRADE_WRITE:
244				/*
245				 * Upgrade all WWRITE locks to WRITE so
246				 * that we can abort a transaction which
247				 * was supporting dirty readers.
248				 */
249				if (upgrade != 1)
250					goto up_done;
251				SH_LIST_FOREACH(lp, &sh_locker->heldby,
252				    locker_links, __db_lock) {
253					if (lp->mode != DB_LOCK_WWRITE)
254						continue;
255					lock.off = R_OFFSET(&lt->reginfo, lp);
256					lock.gen = lp->gen;
257					F_SET(sh_locker, DB_LOCKER_INABORT);
258					if ((ret = __lock_get_internal(lt,
259					    sh_locker, flags | DB_LOCK_UPGRADE,
					    NULL, DB_LOCK_WRITE, 0, &lock)) != 0)
261						break;
262				}
263			up_done:
264				/* FALLTHROUGH */
265			case DB_LOCK_PUT_READ:
266			case DB_LOCK_PUT_ALL:
267				break;
268			default:
269				break;
270			}
271			break;
272		case DB_LOCK_PUT_OBJ:
273			/* Remove all the locks associated with an object. */
274			OBJECT_LOCK(lt, region, list[i].obj, ndx);
275			if ((ret = __lock_getobj(lt, list[i].obj,
276			    ndx, 0, &sh_obj)) != 0 || sh_obj == NULL) {
277				if (ret == 0)
278					ret = EINVAL;
279				OBJECT_UNLOCK(lt, region, ndx);
280				break;
281			}
282
283			/*
284			 * Go through both waiters and holders.  Don't bother
285			 * to run promotion, because everyone is getting
286			 * released.  The processes waiting will still get
287			 * awakened as their waiters are released.
288			 */
289			for (lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock);
290			    ret == 0 && lp != NULL;
291			    lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock))
292				ret = __lock_put_internal(lt, lp, ndx,
293				    DB_LOCK_UNLINK |
294				    DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
295
296			/*
297			 * On the last time around, the object will get
			 * reclaimed by __lock_put_internal, so structure the
299			 * loop carefully so we do not get bitten.
300			 */
301			for (lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
302			    ret == 0 && lp != NULL;
303			    lp = next_lock) {
304				next_lock = SH_TAILQ_NEXT(lp, links, __db_lock);
305				ret = __lock_put_internal(lt, lp, ndx,
306				    DB_LOCK_UNLINK |
307				    DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
308			}
309			OBJECT_UNLOCK(lt, region, ndx);
310			break;
311
312		case DB_LOCK_TIMEOUT:
313			ret = __lock_set_timeout_internal(env,
314			    sh_locker, 0, DB_SET_TXN_NOW);
315			break;
316
317		case DB_LOCK_TRADE:
318			/*
319			 * INTERNAL USE ONLY.
320			 * Change the holder of the lock described in
321			 * list[i].lock to the locker-id specified by
322			 * the locker parameter.
323			 */
324			/*
325			 * You had better know what you're doing here.
326			 * We are trading locker-id's on a lock to
327			 * facilitate file locking on open DB handles.
328			 * We do not do any conflict checking on this,
329			 * so heaven help you if you use this flag under
330			 * any other circumstances.
331			 */
332			ret = __lock_trade(env, &list[i].lock, sh_locker);
333			break;
334#if defined(DEBUG) && defined(HAVE_STATISTICS)
335		case DB_LOCK_DUMP:
336			if (sh_locker == NULL)
337				break;
338
339			SH_LIST_FOREACH(
340			    lp, &sh_locker->heldby, locker_links, __db_lock)
341				__lock_printlock(lt, NULL, lp, 1);
342			break;
343#endif
344		default:
345			__db_errx(env,
346			    "Invalid lock operation: %d", list[i].op);
347			ret = EINVAL;
348			break;
349		}
350
351	if (ret == 0 && region->detect != DB_LOCK_NORUN &&
352	     (region->need_dd || timespecisset(&region->next_timeout)))
353		run_dd = 1;
354	LOCK_SYSTEM_UNLOCK(lt, region);
355
356	if (run_dd)
357		(void)__lock_detect(env, region->detect, &did_abort);
358
359	if (ret != 0 && elistp != NULL)
360		*elistp = &list[i - 1];
361
362	return (ret);
363}
364
365/*
366 * __lock_get_pp --
367 *	ENV->lock_get pre/post processing.
368 *
369 * PUBLIC: int __lock_get_pp __P((DB_ENV *,
370 * PUBLIC:     u_int32_t, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
371 */
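/*
 * A minimal sketch of a caller of the public DB_ENV->lock_get method (the
 * locker id and the DBT contents here are hypothetical placeholders):
 *
 *	DB_LOCK lock;
 *	DBT obj_dbt;
 *
 *	memset(&obj_dbt, 0, sizeof(obj_dbt));
 *	obj_dbt.data = "my-object";
 *	obj_dbt.size = sizeof("my-object") - 1;
 *
 *	ret = dbenv->lock_get(dbenv,
 *	    locker_id, DB_LOCK_NOWAIT, &obj_dbt, DB_LOCK_WRITE, &lock);
 *
 * With DB_LOCK_NOWAIT, a conflicting lock produces DB_LOCK_NOTGRANTED
 * rather than blocking the calling thread.
 */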
372int
373__lock_get_pp(dbenv, locker, flags, obj, lock_mode, lock)
374	DB_ENV *dbenv;
375	u_int32_t locker, flags;
376	const DBT *obj;
377	db_lockmode_t lock_mode;
378	DB_LOCK *lock;
379{
380	DB_THREAD_INFO *ip;
381	ENV *env;
382	int ret;
383
384	env = dbenv->env;
385
386	ENV_REQUIRES_CONFIG(env,
387	    env->lk_handle, "DB_ENV->lock_get", DB_INIT_LOCK);
388
389	/* Validate arguments. */
390	if ((ret = __db_fchk(env, "DB_ENV->lock_get", flags,
391	    DB_LOCK_NOWAIT | DB_LOCK_UPGRADE | DB_LOCK_SWITCH)) != 0)
392		return (ret);
393
394	ENV_ENTER(env, ip);
395	REPLICATION_WRAP(env,
396	     (__lock_get_api(env, locker, flags, obj, lock_mode, lock)),
397	     0, ret);
398	ENV_LEAVE(env, ip);
399	return (ret);
400}
401
402static int
403__lock_get_api(env, locker, flags, obj, lock_mode, lock)
404	ENV *env;
405	u_int32_t locker, flags;
406	const DBT *obj;
407	db_lockmode_t lock_mode;
408	DB_LOCK *lock;
409{
410	DB_LOCKER *sh_locker;
411	DB_LOCKREGION *region;
412	int ret;
413
414	COMPQUIET(region, NULL);
415
416	region = env->lk_handle->reginfo.primary;
417
418	LOCK_SYSTEM_LOCK(env->lk_handle, region);
419	LOCK_LOCKERS(env, region);
420	ret = __lock_getlocker_int(env->lk_handle, locker, 0, &sh_locker);
421	UNLOCK_LOCKERS(env, region);
422	if (ret == 0)
423		ret = __lock_get_internal(env->lk_handle,
424		    sh_locker, flags, obj, lock_mode, 0, lock);
425	LOCK_SYSTEM_UNLOCK(env->lk_handle, region);
426	return (ret);
427}
428
429/*
430 * __lock_get --
431 *	ENV->lock_get.
432 *
433 * PUBLIC: int __lock_get __P((ENV *,
434 * PUBLIC:     DB_LOCKER *, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
435 */
436int
437__lock_get(env, locker, flags, obj, lock_mode, lock)
438	ENV *env;
439	DB_LOCKER *locker;
440	u_int32_t flags;
441	const DBT *obj;
442	db_lockmode_t lock_mode;
443	DB_LOCK *lock;
444{
445	DB_LOCKTAB *lt;
446	int ret;
447
448	lt = env->lk_handle;
449
450	if (IS_RECOVERING(env)) {
451		LOCK_INIT(*lock);
452		return (0);
453	}
454
455	LOCK_SYSTEM_LOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
456	ret = __lock_get_internal(lt, locker, flags, obj, lock_mode, 0, lock);
457	LOCK_SYSTEM_UNLOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
458	return (ret);
459}
460/*
461 * __lock_alloclock -- allocate a lock from another partition.
462 *	We assume we have the partition locked on entry and leave
463 * it unlocked on exit since we will have to retry the lock operation.
464 */
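/*
 * Note that a successfully "stolen" lock is not handed back to the caller
 * directly: it is pushed onto the free list of the partition we were asked
 * to fill and we return 0, after which the caller (see __lock_get_internal)
 * reacquires the bucket lock and retries its allocation from that free list.
 */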
465static int
466__lock_alloclock(lt, part_id)
467	DB_LOCKTAB *lt;
468	u_int32_t part_id;
469{
470	struct __db_lock *sh_lock;
471	DB_LOCKPART *end_p, *cur_p;
472	DB_LOCKREGION *region;
473	int begin;
474
475	region = lt->reginfo.primary;
476
477	if (region->part_t_size == 1)
478		goto err;
479
480	begin = 0;
481	sh_lock = NULL;
482	cur_p = &lt->part_array[part_id];
483	MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
484	end_p = &lt->part_array[region->part_t_size];
485	/*
486	 * Start looking at the next partition and wrap around.  If
487	 * we get back to our partition then raise an error.
488	 */
489again:	for (cur_p++; sh_lock == NULL && cur_p < end_p; cur_p++) {
490		MUTEX_LOCK(lt->env, cur_p->mtx_part);
491		if ((sh_lock =
492		    SH_TAILQ_FIRST(&cur_p->free_locks, __db_lock)) != NULL)
493			SH_TAILQ_REMOVE(&cur_p->free_locks,
494			    sh_lock, links, __db_lock);
495		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
496	}
497	if (sh_lock != NULL) {
498		cur_p = &lt->part_array[part_id];
499		MUTEX_LOCK(lt->env, cur_p->mtx_part);
500		SH_TAILQ_INSERT_HEAD(&cur_p->free_locks,
501		    sh_lock, links, __db_lock);
502		STAT(cur_p->part_stat.st_locksteals++);
503		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
504		return (0);
505	}
506	if (!begin) {
507		begin = 1;
508		cur_p = lt->part_array;
509		end_p = &lt->part_array[part_id];
510		goto again;
511	}
512
513err:	return (__lock_nomem(lt->env, "lock entries"));
514}
515
516/*
517 * __lock_get_internal --
518 *	All the work for lock_get (and for the GET option of lock_vec) is done
519 *	inside of lock_get_internal.
520 *
521 * PUBLIC: int  __lock_get_internal __P((DB_LOCKTAB *, DB_LOCKER *, u_int32_t,
522 * PUBLIC:     const DBT *, db_lockmode_t, db_timeout_t, DB_LOCK *));
523 */
524int
525__lock_get_internal(lt, sh_locker, flags, obj, lock_mode, timeout, lock)
526	DB_LOCKTAB *lt;
527	DB_LOCKER *sh_locker;
528	u_int32_t flags;
529	const DBT *obj;
530	db_lockmode_t lock_mode;
531	db_timeout_t timeout;
532	DB_LOCK *lock;
533{
534	struct __db_lock *newl, *lp;
535	ENV *env;
536	DB_LOCKOBJ *sh_obj;
537	DB_LOCKREGION *region;
538	DB_THREAD_INFO *ip;
539	u_int32_t ndx, part_id;
540	int did_abort, ihold, grant_dirty, no_dd, ret, t_ret;
541	roff_t holder, sh_off;
542
543	/*
544	 * We decide what action to take based on what locks are already held
545	 * and what locks are in the wait queue.
546	 */
547	enum {
548		GRANT,		/* Grant the lock. */
549		UPGRADE,	/* Upgrade the lock. */
550		HEAD,		/* Wait at head of wait queue. */
551		SECOND,		/* Wait as the second waiter. */
552		TAIL		/* Wait at tail of the wait queue. */
553	} action;
554
555	env = lt->env;
556	region = lt->reginfo.primary;
557
558	/* Check if locks have been globally turned off. */
559	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
560		return (0);
561
562	if (sh_locker == NULL) {
563		__db_errx(env, "Locker does not exist");
564		return (EINVAL);
565	}
566
567	no_dd = ret = 0;
568	newl = NULL;
569	sh_obj = NULL;
570
571	/* Check that the lock mode is valid.  */
572	if (lock_mode >= (db_lockmode_t)region->stat.st_nmodes) {
573		__db_errx(env, "DB_ENV->lock_get: invalid lock mode %lu",
574		    (u_long)lock_mode);
575		return (EINVAL);
576	}
577
578again:	if (obj == NULL) {
579		DB_ASSERT(env, LOCK_ISSET(*lock));
580		lp = R_ADDR(&lt->reginfo, lock->off);
581		sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
582		ndx = sh_obj->indx;
583		OBJECT_LOCK_NDX(lt, region, ndx);
584	} else {
		/* Allocate a new object in shared memory. */
586		OBJECT_LOCK(lt, region, obj, lock->ndx);
587		ndx = lock->ndx;
588		if ((ret = __lock_getobj(lt, obj, lock->ndx, 1, &sh_obj)) != 0)
589			goto err;
590	}
591
592#ifdef HAVE_STATISTICS
593	if (LF_ISSET(DB_LOCK_UPGRADE))
594		lt->obj_stat[ndx].st_nupgrade++;
595	else if (!LF_ISSET(DB_LOCK_SWITCH))
596		lt->obj_stat[ndx].st_nrequests++;
597#endif
598
599	/*
600	 * Figure out if we can grant this lock or if it should wait.
601	 * By default, we can grant the new lock if it does not conflict with
602	 * anyone on the holders list OR anyone on the waiters list.
603	 * The reason that we don't grant if there's a conflict is that
604	 * this can lead to starvation (a writer waiting on a popularly
605	 * read item will never be granted).  The downside of this is that
606	 * a waiting reader can prevent an upgrade from reader to writer,
607	 * which is not uncommon.
608	 *
609	 * There are two exceptions to the no-conflict rule.  First, if
610	 * a lock is held by the requesting locker AND the new lock does
611	 * not conflict with any other holders, then we grant the lock.
612	 * The most common place this happens is when the holder has a
613	 * WRITE lock and a READ lock request comes in for the same locker.
614	 * If we do not grant the read lock, then we guarantee deadlock.
615	 * Second, dirty readers are granted if at all possible while
616	 * avoiding starvation, see below.
617	 *
618	 * In case of conflict, we put the new lock on the end of the waiters
619	 * list, unless we are upgrading or this is a dirty reader in which
620	 * case the locker goes at or near the front of the list.
621	 */
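	/*
	 * For example (an illustrative case, not an exhaustive list): if
	 * this locker already holds a WRITE lock on the object and now asks
	 * for a READ lock, no other holder can conflict with the READ, so
	 * it is granted immediately; refusing it would deadlock the locker
	 * against itself.
	 */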
622	ihold = 0;
623	grant_dirty = 0;
624	holder = 0;
625
626	/*
627	 * SWITCH is a special case, used by the queue access method
628	 * when we want to get an entry which is past the end of the queue.
	 * We have a DB_LOCK_READ lock and need to switch it to DB_LOCK_WAIT and
630	 * join the waiters queue.  This must be done as a single operation
631	 * so that another locker cannot get in and fail to wake us up.
632	 */
633	if (LF_ISSET(DB_LOCK_SWITCH))
634		lp = NULL;
635	else
636		lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
637
638	sh_off = R_OFFSET(&lt->reginfo, sh_locker);
639	for (; lp != NULL; lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
640		DB_ASSERT(env, lp->status != DB_LSTAT_FREE);
641		if (sh_off == lp->holder) {
642			if (lp->mode == lock_mode &&
643			    lp->status == DB_LSTAT_HELD) {
644				if (LF_ISSET(DB_LOCK_UPGRADE))
645					goto upgrade;
646
647				/*
648				 * Lock is held, so we can increment the
649				 * reference count and return this lock
650				 * to the caller.  We do not count reference
651				 * increments towards the locks held by
652				 * the locker.
653				 */
654				lp->refcount++;
655				lock->off = R_OFFSET(&lt->reginfo, lp);
656				lock->gen = lp->gen;
657				lock->mode = lp->mode;
658				goto done;
659			} else {
660				ihold = 1;
661			}
662		} else if (__lock_is_parent(lt, lp->holder, sh_locker))
663			ihold = 1;
664		else if (CONFLICTS(lt, region, lp->mode, lock_mode))
665			break;
666		else if (lp->mode == DB_LOCK_READ ||
667		     lp->mode == DB_LOCK_WWRITE) {
668			grant_dirty = 1;
669			holder = lp->holder;
670		}
671	}
672
673	/*
674	 * If there are conflicting holders we will have to wait.  If we
675	 * already hold a lock on this object or are doing an upgrade or
676	 * this is a dirty reader it goes to the head of the queue, everyone
677	 * else to the back.
678	 */
679	if (lp != NULL) {
680		if (ihold || LF_ISSET(DB_LOCK_UPGRADE) ||
681		    lock_mode == DB_LOCK_READ_UNCOMMITTED)
682			action = HEAD;
683		else
684			action = TAIL;
685	} else {
686		if (LF_ISSET(DB_LOCK_SWITCH))
687			action = TAIL;
688		else if (LF_ISSET(DB_LOCK_UPGRADE))
689			action = UPGRADE;
690		else  if (ihold)
691			action = GRANT;
692		else {
693			/*
694			 * Look for conflicting waiters.
695			 */
696			SH_TAILQ_FOREACH(
697			    lp, &sh_obj->waiters, links, __db_lock)
698				if (CONFLICTS(lt, region, lp->mode,
699				     lock_mode) && sh_off != lp->holder)
700					break;
701
702			/*
703			 * If there are no conflicting holders or waiters,
704			 * then we grant. Normally when we wait, we
705			 * wait at the end (TAIL).  However, the goal of
			 * DIRTY_READ locks is to allow forward progress in the
707			 * face of updating transactions, so we try to allow
708			 * all DIRTY_READ requests to proceed as rapidly
709			 * as possible, so long as we can prevent starvation.
710			 *
711			 * When determining how to queue a DIRTY_READ
712			 * request:
713			 *
714			 *	1. If there is a waiting upgrading writer,
715			 *	   then we enqueue the dirty reader BEHIND it
716			 *	   (second in the queue).
717			 *	2. Else, if the current holders are either
			 *	   READ or WWRITE, we grant.
			 *	3. Else, queue SECOND, i.e., behind the first
720			 *	   waiter.
721			 *
722			 * The end result is that dirty_readers get to run
723			 * so long as other lockers are blocked.  Once
724			 * there is a locker which is only waiting on
725			 * dirty readers then they queue up behind that
726			 * locker so that it gets to run.  In general
727			 * this locker will be a WRITE which will shortly
728			 * get downgraded to a WWRITE, permitting the
729			 * DIRTY locks to be granted.
730			 */
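			/*
			 * For instance, if the holders are all READ or
			 * WWRITE locks and the first waiter is that holder's
			 * WRITE upgrade, a new READ_UNCOMMITTED request is
			 * queued SECOND, directly behind the upgrader, rather
			 * than being granted in front of it.
			 */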
731			if (lp == NULL)
732				action = GRANT;
733			else if (grant_dirty &&
734			    lock_mode == DB_LOCK_READ_UNCOMMITTED) {
735				/*
736				 * An upgrade will be at the head of the
737				 * queue.
738				 */
739				lp = SH_TAILQ_FIRST(
740				     &sh_obj->waiters, __db_lock);
741				if (lp->mode == DB_LOCK_WRITE &&
742				     lp->holder == holder)
743					action = SECOND;
744				else
745					action = GRANT;
746			} else if (lock_mode == DB_LOCK_READ_UNCOMMITTED)
747				action = SECOND;
748			else
749				action = TAIL;
750		}
751	}
752
753	switch (action) {
754	case HEAD:
755	case TAIL:
756	case SECOND:
757	case GRANT:
758		part_id = LOCK_PART(region, ndx);
759		/* Allocate a new lock. */
760		if ((newl = SH_TAILQ_FIRST(
761		    &FREE_LOCKS(lt, part_id), __db_lock)) == NULL) {
762			if ((ret = __lock_alloclock(lt, part_id)) != 0)
763				goto err;
			/* Dropped the mutex, so start over. */
765			goto again;
766		}
767		SH_TAILQ_REMOVE(
768		    &FREE_LOCKS(lt, part_id), newl, links, __db_lock);
769
770#ifdef HAVE_STATISTICS
771		/*
772		 * Keep track of the maximum number of locks allocated
773		 * in each partition and the maximum number of locks
774		 * used by any one bucket.
775		 */
776		if (++lt->obj_stat[ndx].st_nlocks >
777		    lt->obj_stat[ndx].st_maxnlocks)
778			lt->obj_stat[ndx].st_maxnlocks =
779			    lt->obj_stat[ndx].st_nlocks;
780		if (++lt->part_array[part_id].part_stat.st_nlocks >
781		    lt->part_array[part_id].part_stat.st_maxnlocks)
782			lt->part_array[part_id].part_stat.st_maxnlocks =
783			    lt->part_array[part_id].part_stat.st_nlocks;
784#endif
785
786		/*
787		 * Allocate a mutex if we do not have a mutex backing the lock.
788		 *
789		 * Use the lock mutex to block the thread; lock the mutex
790		 * when it is allocated so that we will block when we try
791		 * to lock it again.  We will wake up when another thread
792		 * grants the lock and releases the mutex.  We leave it
793		 * locked for the next use of this lock object.
794		 */
795		if (newl->mtx_lock == MUTEX_INVALID) {
796			if ((ret = __mutex_alloc(env, MTX_LOGICAL_LOCK,
797			    DB_MUTEX_LOGICAL_LOCK | DB_MUTEX_SELF_BLOCK,
798			    &newl->mtx_lock)) != 0)
799				goto err;
800			MUTEX_LOCK(env, newl->mtx_lock);
801		}
802
803		newl->holder = R_OFFSET(&lt->reginfo, sh_locker);
804		newl->refcount = 1;
805		newl->mode = lock_mode;
806		newl->obj = (roff_t)SH_PTR_TO_OFF(newl, sh_obj);
807		newl->indx = sh_obj->indx;
808		/*
809		 * Now, insert the lock onto its locker's list.
		 * If the locker does not currently hold any locks,
		 * there's no reason to run the deadlock detector;
		 * save that information.
813		 */
814		no_dd = sh_locker->master_locker == INVALID_ROFF &&
815		    SH_LIST_FIRST(
816		    &sh_locker->child_locker, __db_locker) == NULL &&
817		    SH_LIST_FIRST(&sh_locker->heldby, __db_lock) == NULL;
818
819		SH_LIST_INSERT_HEAD(
820		    &sh_locker->heldby, newl, locker_links, __db_lock);
821		break;
822
823	case UPGRADE:
824upgrade:	lp = R_ADDR(&lt->reginfo, lock->off);
825		if (IS_WRITELOCK(lock_mode) && !IS_WRITELOCK(lp->mode))
826			sh_locker->nwrites++;
827		lp->mode = lock_mode;
828		goto done;
829	}
830
831	switch (action) {
832	case UPGRADE:
833		DB_ASSERT(env, 0);
834		break;
835	case GRANT:
836		newl->status = DB_LSTAT_HELD;
837		SH_TAILQ_INSERT_TAIL(&sh_obj->holders, newl, links);
838		break;
839	case HEAD:
840	case TAIL:
841	case SECOND:
842		if (LF_ISSET(DB_LOCK_NOWAIT)) {
843			ret = DB_LOCK_NOTGRANTED;
844			STAT(region->stat.st_lock_nowait++);
845			goto err;
846		}
847		if ((lp =
848		    SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock)) == NULL) {
849			LOCK_DD(env, region);
850			SH_TAILQ_INSERT_HEAD(&region->dd_objs,
851				    sh_obj, dd_links, __db_lockobj);
852			UNLOCK_DD(env, region);
853		}
854		switch (action) {
855		case HEAD:
856			SH_TAILQ_INSERT_HEAD(
857			     &sh_obj->waiters, newl, links, __db_lock);
858			break;
859		case SECOND:
860			SH_TAILQ_INSERT_AFTER(
861			     &sh_obj->waiters, lp, newl, links, __db_lock);
862			break;
863		case TAIL:
864			SH_TAILQ_INSERT_TAIL(&sh_obj->waiters, newl, links);
865			break;
866		default:
867			DB_ASSERT(env, 0);
868		}
869
870		/*
871		 * First check to see if this txn has expired.
		 * If not, then see if the lock timeout is past
		 * the expiration of the txn; if it is, use
		 * the txn expiration time.  lk_expire is passed
875		 * to avoid an extra call to get the time.
876		 */
877		if (__lock_expired(env,
878		    &sh_locker->lk_expire, &sh_locker->tx_expire)) {
879			newl->status = DB_LSTAT_EXPIRED;
880			sh_locker->lk_expire = sh_locker->tx_expire;
881
882			/* We are done. */
883			goto expired;
884		}
885
886		/*
887		 * If a timeout was specified in this call then it
888		 * takes priority.  If a lock timeout has been specified
		 * for this transaction then use that; otherwise use
890		 * the global timeout value.
891		 */
892		if (!LF_ISSET(DB_LOCK_SET_TIMEOUT)) {
893			if (F_ISSET(sh_locker, DB_LOCKER_TIMEOUT))
894				timeout = sh_locker->lk_timeout;
895			else
896				timeout = region->lk_timeout;
897		}
898		if (timeout != 0)
899			__lock_expires(env, &sh_locker->lk_expire, timeout);
900		else
901			timespecclear(&sh_locker->lk_expire);
902
903		if (timespecisset(&sh_locker->tx_expire) &&
904			(timeout == 0 || __lock_expired(env,
905			    &sh_locker->lk_expire, &sh_locker->tx_expire)))
906				sh_locker->lk_expire = sh_locker->tx_expire;
907		if (timespecisset(&sh_locker->lk_expire) &&
908		    (!timespecisset(&region->next_timeout) ||
909		    timespeccmp(
910		    &region->next_timeout, &sh_locker->lk_expire, >)))
911			region->next_timeout = sh_locker->lk_expire;
912
913		newl->status = DB_LSTAT_WAITING;
914		STAT(lt->obj_stat[ndx].st_lock_wait++);
		/* We are about to block; the deadlock detector must run. */
916		region->need_dd = 1;
917
918		OBJECT_UNLOCK(lt, region, sh_obj->indx);
919
		/* If we are switching, drop the lock we had. */
921		if (LF_ISSET(DB_LOCK_SWITCH) &&
922		    (ret = __lock_put_nolock(env,
923		    lock, &ihold, DB_LOCK_NOWAITERS)) != 0) {
924			OBJECT_LOCK_NDX(lt, region, sh_obj->indx);
925			(void)__lock_remove_waiter(
926			    lt, sh_obj, newl, DB_LSTAT_FREE);
927			goto err;
928		}
929
930		LOCK_SYSTEM_UNLOCK(lt, region);
931
932		/*
933		 * Before waiting, see if the deadlock detector should run.
934		 */
935		if (region->detect != DB_LOCK_NORUN && !no_dd)
936			(void)__lock_detect(env, region->detect, &did_abort);
937
938		ip = NULL;
939		if (env->thr_hashtab != NULL &&
940		     (ret = __env_set_state(env, &ip, THREAD_BLOCKED)) != 0) {
941			LOCK_SYSTEM_LOCK(lt, region);
942			OBJECT_LOCK_NDX(lt, region, ndx);
943			goto err;
944		}
945
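		/*
		 * Block on the lock's self-blocking mutex: it was left in a
		 * locked state, so this MUTEX_LOCK does not return until the
		 * thread that grants (or aborts/expires) the lock wakes us by
		 * unlocking it, as __lock_promote and __lock_remove_waiter do.
		 */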
946		MUTEX_LOCK(env, newl->mtx_lock);
947		if (ip != NULL)
948			ip->dbth_state = THREAD_ACTIVE;
949
950		LOCK_SYSTEM_LOCK(lt, region);
951		OBJECT_LOCK_NDX(lt, region, ndx);
952
953		/* Turn off lock timeout. */
954		if (newl->status != DB_LSTAT_EXPIRED)
955			timespecclear(&sh_locker->lk_expire);
956
957		switch (newl->status) {
958		case DB_LSTAT_ABORTED:
959			ret = DB_LOCK_DEADLOCK;
960			goto err;
961		case DB_LSTAT_EXPIRED:
962expired:		ret = __lock_put_internal(lt, newl,
963			    ndx, DB_LOCK_UNLINK | DB_LOCK_FREE);
964			newl = NULL;
965			if (ret != 0)
966				goto err;
967#ifdef HAVE_STATISTICS
968			if (timespeccmp(
969			    &sh_locker->lk_expire, &sh_locker->tx_expire, ==))
970				lt->obj_stat[ndx].st_ntxntimeouts++;
971			else
972				lt->obj_stat[ndx].st_nlocktimeouts++;
973#endif
974			ret = DB_LOCK_NOTGRANTED;
975			timespecclear(&sh_locker->lk_expire);
976			goto err;
977		case DB_LSTAT_PENDING:
978			if (LF_ISSET(DB_LOCK_UPGRADE)) {
979				/*
980				 * The lock just granted got put on the holders
981				 * list.  Since we're upgrading some other lock,
982				 * we've got to remove it here.
983				 */
984				SH_TAILQ_REMOVE(
985				    &sh_obj->holders, newl, links, __db_lock);
986				/*
				 * Ensure the lock is not believed to be on
				 * the object's lists, in case we're traversing
				 * by locker.
990				 */
991				newl->links.stqe_prev = -1;
992				goto upgrade;
993			} else
994				newl->status = DB_LSTAT_HELD;
995			break;
996		case DB_LSTAT_FREE:
997		case DB_LSTAT_HELD:
998		case DB_LSTAT_WAITING:
999		default:
1000			__db_errx(env,
1001			    "Unexpected lock status: %d", (int)newl->status);
1002			ret = __env_panic(env, EINVAL);
1003			goto err;
1004		}
1005	}
1006
1007	lock->off = R_OFFSET(&lt->reginfo, newl);
1008	lock->gen = newl->gen;
1009	lock->mode = newl->mode;
1010	sh_locker->nlocks++;
1011	if (IS_WRITELOCK(newl->mode)) {
1012		sh_locker->nwrites++;
1013		if (newl->mode == DB_LOCK_WWRITE)
1014			F_SET(sh_locker, DB_LOCKER_DIRTY);
1015	}
1016
1017	OBJECT_UNLOCK(lt, region, ndx);
1018	return (0);
1019
1020err:	if (!LF_ISSET(DB_LOCK_UPGRADE | DB_LOCK_SWITCH))
1021		LOCK_INIT(*lock);
1022
1023done:	if (newl != NULL &&
1024	     (t_ret = __lock_freelock(lt, newl, sh_locker,
1025	     DB_LOCK_FREE | DB_LOCK_UNLINK)) != 0 && ret == 0)
1026		ret = t_ret;
1027	OBJECT_UNLOCK(lt, region, ndx);
1028
1029	return (ret);
1030}
1031
1032/*
1033 * __lock_put_pp --
1034 *	ENV->lock_put pre/post processing.
1035 *
1036 * PUBLIC: int  __lock_put_pp __P((DB_ENV *, DB_LOCK *));
1037 */
1038int
1039__lock_put_pp(dbenv, lock)
1040	DB_ENV *dbenv;
1041	DB_LOCK *lock;
1042{
1043	DB_THREAD_INFO *ip;
1044	ENV *env;
1045	int ret;
1046
1047	env = dbenv->env;
1048
1049	ENV_REQUIRES_CONFIG(env,
1050	    env->lk_handle, "DB_LOCK->lock_put", DB_INIT_LOCK);
1051
1052	ENV_ENTER(env, ip);
1053	REPLICATION_WRAP(env, (__lock_put(env, lock)), 0, ret);
1054	ENV_LEAVE(env, ip);
1055	return (ret);
1056}
1057
1058/*
 * __lock_put --
 *	Internal lock_put interface.
 *
 * PUBLIC: int  __lock_put __P((ENV *, DB_LOCK *));
1063 */
1064int
1065__lock_put(env, lock)
1066	ENV *env;
1067	DB_LOCK *lock;
1068{
1069	DB_LOCKTAB *lt;
1070	int ret, run_dd;
1071
1072	if (IS_RECOVERING(env))
1073		return (0);
1074
1075	lt = env->lk_handle;
1076
1077	LOCK_SYSTEM_LOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
1078	ret = __lock_put_nolock(env, lock, &run_dd, 0);
1079	LOCK_SYSTEM_UNLOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
1080
1081	/*
1082	 * Only run the lock detector if put told us to AND we are running
1083	 * in auto-detect mode.  If we are not running in auto-detect, then
1084	 * a call to lock_detect here will 0 the need_dd bit, but will not
1085	 * actually abort anything.
1086	 */
1087	if (ret == 0 && run_dd)
1088		(void)__lock_detect(env,
1089		    ((DB_LOCKREGION *)lt->reginfo.primary)->detect, NULL);
1090	return (ret);
1091}
1092
1093static int
1094__lock_put_nolock(env, lock, runp, flags)
1095	ENV *env;
1096	DB_LOCK *lock;
1097	int *runp;
1098	u_int32_t flags;
1099{
1100	struct __db_lock *lockp;
1101	DB_LOCKREGION *region;
1102	DB_LOCKTAB *lt;
1103	int ret;
1104
1105	/* Check if locks have been globally turned off. */
1106	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1107		return (0);
1108
1109	lt = env->lk_handle;
1110	region = lt->reginfo.primary;
1111
1112	lockp = R_ADDR(&lt->reginfo, lock->off);
1113	if (lock->gen != lockp->gen) {
1114		__db_errx(env, __db_lock_invalid, "DB_LOCK->lock_put");
1115		LOCK_INIT(*lock);
1116		return (EINVAL);
1117	}
1118
1119	OBJECT_LOCK_NDX(lt, region, lock->ndx);
1120	ret = __lock_put_internal(lt,
1121	    lockp, lock->ndx, flags | DB_LOCK_UNLINK | DB_LOCK_FREE);
1122	OBJECT_UNLOCK(lt, region, lock->ndx);
1123
1124	LOCK_INIT(*lock);
1125
1126	*runp = 0;
1127	if (ret == 0 && region->detect != DB_LOCK_NORUN &&
1128	     (region->need_dd || timespecisset(&region->next_timeout)))
1129		*runp = 1;
1130
1131	return (ret);
1132}
1133
1134/*
1135 * __lock_downgrade --
1136 *
 * Used to downgrade locks.  Currently this is used in three places: 1) by the
 * Concurrent Data Store product to downgrade write locks back to iwrite locks,
 * 2) to downgrade write-handle locks to read-handle locks at the end of an
 * open/create, and 3) to downgrade write locks to was_write to support dirty
 * reads.
1142 *
1143 * PUBLIC: int __lock_downgrade __P((ENV *,
1144 * PUBLIC:     DB_LOCK *, db_lockmode_t, u_int32_t));
1145 */
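/*
 * A minimal sketch of the dirty-read case (purely illustrative; the
 * DB_LOCK named "wlock" stands for a write lock the caller already holds):
 *
 *	ret = __lock_downgrade(env, &wlock, DB_LOCK_WWRITE, 0);
 *
 * After the mode change, __lock_promote is run on the object so that any
 * waiters that are compatible with the weaker mode (for example,
 * DB_LOCK_READ_UNCOMMITTED requests) can be granted.
 */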
1146int
1147__lock_downgrade(env, lock, new_mode, flags)
1148	ENV *env;
1149	DB_LOCK *lock;
1150	db_lockmode_t new_mode;
1151	u_int32_t flags;
1152{
1153	struct __db_lock *lockp;
1154	DB_LOCKER *sh_locker;
1155	DB_LOCKOBJ *obj;
1156	DB_LOCKREGION *region;
1157	DB_LOCKTAB *lt;
1158	int ret;
1159
1160	ret = 0;
1161
1162	/* Check if locks have been globally turned off. */
1163	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1164		return (0);
1165
1166	lt = env->lk_handle;
1167	region = lt->reginfo.primary;
1168
1169	LOCK_SYSTEM_LOCK(lt, region);
1170
1171	lockp = R_ADDR(&lt->reginfo, lock->off);
1172	if (lock->gen != lockp->gen) {
1173		__db_errx(env, __db_lock_invalid, "lock_downgrade");
1174		ret = EINVAL;
1175		goto out;
1176	}
1177
1178	sh_locker = R_ADDR(&lt->reginfo, lockp->holder);
1179
1180	if (IS_WRITELOCK(lockp->mode) && !IS_WRITELOCK(new_mode))
1181		sh_locker->nwrites--;
1182
1183	lockp->mode = new_mode;
1184	lock->mode = new_mode;
1185
1186	/* Get the object associated with this lock. */
1187	obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
1188	OBJECT_LOCK_NDX(lt, region, obj->indx);
1189	STAT(lt->obj_stat[obj->indx].st_ndowngrade++);
1190	ret = __lock_promote(lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS));
1191	OBJECT_UNLOCK(lt, region, obj->indx);
1192
1193out:	LOCK_SYSTEM_UNLOCK(lt, region);
1194	return (ret);
1195}
1196
1197/*
1198 * __lock_put_internal -- put a lock structure
1199 * We assume that we are called with the proper object locked.
1200 */
1201static int
1202__lock_put_internal(lt, lockp, obj_ndx, flags)
1203	DB_LOCKTAB *lt;
1204	struct __db_lock *lockp;
1205	u_int32_t obj_ndx, flags;
1206{
1207	DB_LOCKOBJ *sh_obj;
1208	DB_LOCKREGION *region;
1209	ENV *env;
1210	u_int32_t part_id;
1211	int ret, state_changed;
1212
1213	COMPQUIET(env, NULL);
1214	env = lt->env;
1215	region = lt->reginfo.primary;
1216	ret = state_changed = 0;
1217
1218	if (!OBJ_LINKS_VALID(lockp)) {
1219		/*
1220		 * Someone removed this lock while we were doing a release
1221		 * by locker id.  We are trying to free this lock, but it's
1222		 * already been done; all we need to do is return it to the
1223		 * free list.
1224		 */
1225		(void)__lock_freelock(lt, lockp, NULL, DB_LOCK_FREE);
1226		return (0);
1227	}
1228
1229#ifdef HAVE_STATISTICS
1230	if (LF_ISSET(DB_LOCK_DOALL))
1231		lt->obj_stat[obj_ndx].st_nreleases += lockp->refcount;
1232	else
1233		lt->obj_stat[obj_ndx].st_nreleases++;
1234#endif
1235
1236	if (!LF_ISSET(DB_LOCK_DOALL) && lockp->refcount > 1) {
1237		lockp->refcount--;
1238		return (0);
1239	}
1240
1241	/* Increment generation number. */
1242	lockp->gen++;
1243
1244	/* Get the object associated with this lock. */
1245	sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
1246
1247	/*
1248	 * Remove this lock from its holders/waitlist.  Set its status
1249	 * to ABORTED.  It may get freed below, but if not then the
1250	 * waiter has been aborted (it will panic if the lock is
1251	 * free).
1252	 */
1253	if (lockp->status != DB_LSTAT_HELD &&
1254	    lockp->status != DB_LSTAT_PENDING) {
1255		if ((ret = __lock_remove_waiter(
1256		    lt, sh_obj, lockp, DB_LSTAT_ABORTED)) != 0)
1257			return (ret);
1258	} else {
1259		SH_TAILQ_REMOVE(&sh_obj->holders, lockp, links, __db_lock);
1260		lockp->links.stqe_prev = -1;
1261	}
1262
1263	if (LF_ISSET(DB_LOCK_NOPROMOTE))
1264		state_changed = 0;
1265	else
1266		if ((ret = __lock_promote(lt, sh_obj, &state_changed,
1267		    LF_ISSET(DB_LOCK_NOWAITERS))) != 0)
1268			return (ret);
1269
1270	/* Check if object should be reclaimed. */
1271	if (SH_TAILQ_FIRST(&sh_obj->holders, __db_lock) == NULL &&
1272	    SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
1273		part_id = LOCK_PART(region, obj_ndx);
1274		SH_TAILQ_REMOVE(
1275		    &lt->obj_tab[obj_ndx], sh_obj, links, __db_lockobj);
1276		if (sh_obj->lockobj.size > sizeof(sh_obj->objdata)) {
1277			LOCK_REGION_LOCK(env);
1278			__env_alloc_free(&lt->reginfo,
1279			    SH_DBT_PTR(&sh_obj->lockobj));
1280			LOCK_REGION_UNLOCK(env);
1281		}
1282		SH_TAILQ_INSERT_HEAD(
1283		    &FREE_OBJS(lt, part_id), sh_obj, links, __db_lockobj);
1284		sh_obj->generation++;
1285		STAT(lt->part_array[part_id].part_stat.st_nobjects--);
1286		STAT(lt->obj_stat[obj_ndx].st_nobjects--);
1287		state_changed = 1;
1288	}
1289
1290	/* Free lock. */
1291	if (LF_ISSET(DB_LOCK_UNLINK | DB_LOCK_FREE))
1292		ret = __lock_freelock(lt, lockp,
1293		     R_ADDR(&lt->reginfo, lockp->holder), flags);
1294
1295	/*
	 * If we did not promote anyone, we need to run the deadlock
1297	 * detector again.
1298	 */
1299	if (state_changed == 0)
1300		region->need_dd = 1;
1301
1302	return (ret);
1303}
1304
1305/*
1306 * __lock_freelock --
1307 *	Free a lock.  Unlink it from its locker if necessary.
1308 * We must hold the object lock.
1309 *
1310 */
1311static int
1312__lock_freelock(lt, lockp, sh_locker, flags)
1313	DB_LOCKTAB *lt;
1314	struct __db_lock *lockp;
1315	DB_LOCKER *sh_locker;
1316	u_int32_t flags;
1317{
1318	DB_LOCKREGION *region;
1319	ENV *env;
1320	u_int32_t part_id;
1321	int ret;
1322
1323	env = lt->env;
1324	region = lt->reginfo.primary;
1325
1326	if (LF_ISSET(DB_LOCK_UNLINK)) {
1327
1328		SH_LIST_REMOVE(lockp, locker_links, __db_lock);
1329		if (lockp->status == DB_LSTAT_HELD) {
1330			sh_locker->nlocks--;
1331			if (IS_WRITELOCK(lockp->mode))
1332				sh_locker->nwrites--;
1333		}
1334	}
1335
1336	if (LF_ISSET(DB_LOCK_FREE)) {
1337		/*
		 * If the lock is not held, we cannot be sure of its mutex
		 * state, so we just destroy it and let it be re-created
1340		 * when needed.
1341		 */
1342		part_id = LOCK_PART(region, lockp->indx);
1343		if (lockp->mtx_lock != MUTEX_INVALID &&
1344		     lockp->status != DB_LSTAT_HELD &&
1345		     lockp->status != DB_LSTAT_EXPIRED &&
1346		     (ret = __mutex_free(env, &lockp->mtx_lock)) != 0)
1347			return (ret);
1348		lockp->status = DB_LSTAT_FREE;
1349		SH_TAILQ_INSERT_HEAD(&FREE_LOCKS(lt, part_id),
1350		     lockp, links, __db_lock);
1351		STAT(lt->part_array[part_id].part_stat.st_nlocks--);
1352		STAT(lt->obj_stat[lockp->indx].st_nlocks--);
1353	}
1354
1355	return (0);
1356}
1357
1358/*
 * __lock_allocobj -- allocate an object from another partition.
1360 *	We assume we have the partition locked on entry and leave
1361 * with the same partition locked on exit.
1362 */
1363static int
1364__lock_allocobj(lt, part_id)
1365	DB_LOCKTAB *lt;
1366	u_int32_t part_id;
1367{
1368	DB_LOCKOBJ *sh_obj;
1369	DB_LOCKPART *end_p, *cur_p;
1370	DB_LOCKREGION *region;
1371	int begin;
1372
1373	region = lt->reginfo.primary;
1374
1375	if (region->part_t_size == 1)
1376		goto err;
1377
1378	begin = 0;
1379	sh_obj = NULL;
1380	cur_p = &lt->part_array[part_id];
1381	MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
1382	end_p = &lt->part_array[region->part_t_size];
1383	/*
1384	 * Start looking at the next partition and wrap around.  If
1385	 * we get back to our partition then raise an error.
1386	 */
1387again:	for (cur_p++; sh_obj == NULL && cur_p < end_p; cur_p++) {
1388		MUTEX_LOCK(lt->env, cur_p->mtx_part);
1389		if ((sh_obj =
1390		    SH_TAILQ_FIRST(&cur_p->free_objs, __db_lockobj)) != NULL)
1391			SH_TAILQ_REMOVE(&cur_p->free_objs,
1392			    sh_obj, links, __db_lockobj);
1393		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
1394	}
1395	if (sh_obj != NULL) {
1396		cur_p = &lt->part_array[part_id];
1397		MUTEX_LOCK(lt->env, cur_p->mtx_part);
1398		SH_TAILQ_INSERT_HEAD(&cur_p->free_objs,
1399		    sh_obj, links, __db_lockobj);
1400		STAT(cur_p->part_stat.st_objectsteals++);
1401		return (0);
1402	}
1403	if (!begin) {
1404		begin = 1;
1405		cur_p = lt->part_array;
1406		end_p = &lt->part_array[part_id];
1407		goto again;
1408	}
1409	MUTEX_LOCK(lt->env, cur_p->mtx_part);
1410
1411err:	return (__lock_nomem(lt->env, "object entries"));
1412}
1413/*
1414 * __lock_getobj --
1415 *	Get an object in the object hash table.  The create parameter
1416 * indicates if the object should be created if it doesn't exist in
1417 * the table.
1418 *
1419 * This must be called with the object bucket locked.
1420 */
1421static int
1422__lock_getobj(lt, obj, ndx, create, retp)
1423	DB_LOCKTAB *lt;
1424	const DBT *obj;
1425	u_int32_t ndx;
1426	int create;
1427	DB_LOCKOBJ **retp;
1428{
1429	DB_LOCKOBJ *sh_obj;
1430	DB_LOCKREGION *region;
1431	ENV *env;
1432	int ret;
1433	void *p;
1434	u_int32_t len, part_id;
1435
1436	env = lt->env;
1437	region = lt->reginfo.primary;
1438	len = 0;
1439
1440	/* Look up the object in the hash table. */
1441retry:	SH_TAILQ_FOREACH(sh_obj, &lt->obj_tab[ndx], links, __db_lockobj) {
1442		len++;
1443		if (obj->size == sh_obj->lockobj.size &&
1444		    memcmp(obj->data,
1445		    SH_DBT_PTR(&sh_obj->lockobj), obj->size) == 0)
1446			break;
1447	}
1448
1449	/*
1450	 * If we found the object, then we can just return it.  If
1451	 * we didn't find the object, then we need to create it.
1452	 */
1453	if (sh_obj == NULL && create) {
1454		/* Create new object and then insert it into hash table. */
1455		part_id = LOCK_PART(region, ndx);
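		/*
		 * If this partition's free list is empty, __lock_allocobj
		 * steals an object from another partition.  That call drops
		 * this partition's mutex along the way, so on success we go
		 * back and search the hash bucket again in case the object
		 * was created by another thread in the meantime.
		 */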
1456		if ((sh_obj = SH_TAILQ_FIRST(&FREE_OBJS(
1457		    lt, part_id), __db_lockobj)) == NULL) {
1458			if ((ret = __lock_allocobj(lt, part_id)) == 0)
1459				goto retry;
1460			goto err;
1461		}
1462
1463		/*
1464		 * If we can fit this object in the structure, do so instead
1465		 * of alloc-ing space for it.
1466		 */
1467		if (obj->size <= sizeof(sh_obj->objdata))
1468			p = sh_obj->objdata;
1469		else {
1470			LOCK_REGION_LOCK(env);
1471			if ((ret =
1472			    __env_alloc(&lt->reginfo, obj->size, &p)) != 0) {
1473				__db_errx(env,
1474				    "No space for lock object storage");
1475				LOCK_REGION_UNLOCK(env);
1476				goto err;
1477			}
1478			LOCK_REGION_UNLOCK(env);
1479		}
1480
1481		memcpy(p, obj->data, obj->size);
1482
1483		SH_TAILQ_REMOVE(&FREE_OBJS(
1484		    lt, part_id), sh_obj, links, __db_lockobj);
1485#ifdef HAVE_STATISTICS
1486		/*
1487		 * Keep track of both the max number of objects allocated
1488		 * per partition and the max number of objects used by
1489		 * this bucket.
1490		 */
1491		len++;
1492		if (++lt->obj_stat[ndx].st_nobjects >
1493		    lt->obj_stat[ndx].st_maxnobjects)
1494			lt->obj_stat[ndx].st_maxnobjects =
1495			    lt->obj_stat[ndx].st_nobjects;
1496		if (++lt->part_array[part_id].part_stat.st_nobjects >
1497		    lt->part_array[part_id].part_stat.st_maxnobjects)
1498			lt->part_array[part_id].part_stat.st_maxnobjects =
1499			    lt->part_array[part_id].part_stat.st_nobjects;
1500#endif
1501
1502		sh_obj->indx = ndx;
1503		SH_TAILQ_INIT(&sh_obj->waiters);
1504		SH_TAILQ_INIT(&sh_obj->holders);
1505		sh_obj->lockobj.size = obj->size;
1506		sh_obj->lockobj.off =
1507		    (roff_t)SH_PTR_TO_OFF(&sh_obj->lockobj, p);
1508		SH_TAILQ_INSERT_HEAD(
1509		    &lt->obj_tab[ndx], sh_obj, links, __db_lockobj);
1510	}
1511
1512#ifdef HAVE_STATISTICS
1513	if (len > lt->obj_stat[ndx].st_hash_len)
1514		lt->obj_stat[ndx].st_hash_len = len;
1515#endif
1521
1522	*retp = sh_obj;
1523	return (0);
1524
1525err:	return (ret);
1526}
1527
1528/*
1529 * __lock_is_parent --
1530 *	Given a locker and a transaction, return 1 if the locker is
1531 * an ancestor of the designated transaction.  This is used to determine
1532 * if we should grant locks that appear to conflict, but don't because
1533 * the lock is already held by an ancestor.
1534 */
1535static int
1536__lock_is_parent(lt, l_off, sh_locker)
1537	DB_LOCKTAB *lt;
1538	roff_t l_off;
1539	DB_LOCKER *sh_locker;
1540{
1541	DB_LOCKER *parent;
1542
1543	parent = sh_locker;
1544	while (parent->parent_locker != INVALID_ROFF) {
1545		if (parent->parent_locker == l_off)
1546			return (1);
1547		parent = R_ADDR(&lt->reginfo, parent->parent_locker);
1548	}
1549
1550	return (0);
1551}
1552
1553/*
1554 * __lock_locker_is_parent --
1555 *	Determine if "locker" is an ancestor of "child".
1556 * *retp == 1 if so, 0 otherwise.
1557 *
1558 * PUBLIC: int __lock_locker_is_parent
1559 * PUBLIC:     __P((ENV *, DB_LOCKER *, DB_LOCKER *, int *));
1560 */
1561int
1562__lock_locker_is_parent(env, locker, child, retp)
1563	ENV *env;
1564	DB_LOCKER *locker;
1565	DB_LOCKER *child;
1566	int *retp;
1567{
1568	DB_LOCKTAB *lt;
1569
1570	lt = env->lk_handle;
1571
1572	/*
	 * The locker may not exist for this transaction; if not, then it has
1574	 * no parents.
1575	 */
1576	if (locker == NULL)
1577		*retp = 0;
1578	else
1579		*retp = __lock_is_parent(lt,
1580		     R_OFFSET(&lt->reginfo, locker), child);
1581	return (0);
1582}
1583
1584/*
1585 * __lock_inherit_locks --
1586 *	Called on child commit to merge child's locks with parent's.
1587 */
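/*
 * For example, if a child transaction holds a WRITE lock on a page that its
 * parent also holds in WRITE mode, the child's reference count is simply
 * folded into the parent's lock and the child's lock structure is freed;
 * if the parent holds nothing on that object, the child's lock is rechained
 * onto the parent's list and its holder field is updated.  Either way, a
 * sibling waiting on the object may now be able to make progress, so the
 * object is re-promoted.
 */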
1588static int
1589__lock_inherit_locks(lt, sh_locker, flags)
1590	DB_LOCKTAB *lt;
1591	DB_LOCKER *sh_locker;
1592	u_int32_t flags;
1593{
1594	DB_LOCKER *sh_parent;
1595	DB_LOCKOBJ *obj;
1596	DB_LOCKREGION *region;
1597	ENV *env;
1598	int ret;
1599	struct __db_lock *hlp, *lp;
1600	roff_t poff;
1601
1602	env = lt->env;
1603	region = lt->reginfo.primary;
1604
1605	/*
	 * Get the committing locker so that we can traverse its locker
	 * links without worrying that someone else is deleting locks
	 * out from under us.  If the locker doesn't exist, the caller
	 * has passed an invalid locker and we return an error.
1612	 */
1613	if (sh_locker == NULL) {
1614		__db_errx(env, __db_locker_invalid);
1615		return (EINVAL);
1616	}
1617
1618	/* Make sure we are a child transaction. */
1619	if (sh_locker->parent_locker == INVALID_ROFF) {
1620		__db_errx(env, "Not a child transaction");
1621		return (EINVAL);
1622	}
1623	sh_parent = R_ADDR(&lt->reginfo, sh_locker->parent_locker);
1624
1625	/*
1626	 * In order to make it possible for a parent to have
1627	 * many, many children who lock the same objects, and
1628	 * not require an inordinate number of locks, we try
1629	 * to merge the child's locks with its parent's.
1630	 */
1631	poff = R_OFFSET(&lt->reginfo, sh_parent);
1632	for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
1633	    lp != NULL;
1634	    lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock)) {
1635		SH_LIST_REMOVE(lp, locker_links, __db_lock);
1636
1637		/* See if the parent already has a lock. */
1638		obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
1639		OBJECT_LOCK_NDX(lt, region, obj->indx);
1640		SH_TAILQ_FOREACH(hlp, &obj->holders, links, __db_lock)
1641			if (hlp->holder == poff && lp->mode == hlp->mode)
1642				break;
1643
1644		if (hlp != NULL) {
1645			/* Parent already holds lock. */
1646			hlp->refcount += lp->refcount;
1647
1648			/* Remove lock from object list and free it. */
1649			DB_ASSERT(env, lp->status == DB_LSTAT_HELD);
1650			SH_TAILQ_REMOVE(&obj->holders, lp, links, __db_lock);
1651			(void)__lock_freelock(lt, lp, sh_locker, DB_LOCK_FREE);
1652		} else {
1653			/* Just move lock to parent chains. */
1654			SH_LIST_INSERT_HEAD(&sh_parent->heldby,
1655			    lp, locker_links, __db_lock);
1656			lp->holder = poff;
1657		}
1658
1659		/*
1660		 * We may need to promote regardless of whether we simply
1661		 * moved the lock to the parent or changed the parent's
1662		 * reference count, because there might be a sibling waiting,
1663		 * who will now be allowed to make forward progress.
1664		 */
1665		ret =
1666		     __lock_promote(lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS));
1667		OBJECT_UNLOCK(lt, region, obj->indx);
1668		if (ret != 0)
1669			return (ret);
1670	}
1671
1672	/* Transfer child counts to parent. */
1673	sh_parent->nlocks += sh_locker->nlocks;
1674	sh_parent->nwrites += sh_locker->nwrites;
1675
1676	return (0);
1677}
1678
1679/*
1680 * __lock_promote --
1681 *
1682 * Look through the waiters and holders lists and decide which (if any)
1683 * locks can be promoted.   Promote any that are eligible.
1684 *
1685 * PUBLIC: int __lock_promote
1686 * PUBLIC:    __P((DB_LOCKTAB *, DB_LOCKOBJ *, int *, u_int32_t));
1687 */
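/*
 * Promotion is strictly first-come, first-served: for example, with waiters
 * [READ, READ, WRITE, READ] and no remaining holders, the first two READ
 * locks are granted, the WRITE then conflicts with those new holders and
 * the scan stops, so the trailing READ keeps waiting behind the WRITE
 * rather than jumping ahead of it.
 */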
1688int
1689__lock_promote(lt, obj, state_changedp, flags)
1690	DB_LOCKTAB *lt;
1691	DB_LOCKOBJ *obj;
1692	int *state_changedp;
1693	u_int32_t flags;
1694{
1695	struct __db_lock *lp_w, *lp_h, *next_waiter;
1696	DB_LOCKREGION *region;
1697	int had_waiters, state_changed;
1698
1699	region = lt->reginfo.primary;
1700	had_waiters = 0;
1701
1702	/*
1703	 * We need to do lock promotion.  We also need to determine if we're
1704	 * going to need to run the deadlock detector again.  If we release
1705	 * locks, and there are waiters, but no one gets promoted, then we
1706	 * haven't fundamentally changed the lockmgr state, so we may still
1707	 * have a deadlock and we have to run again.  However, if there were
1708	 * no waiters, or we actually promoted someone, then we are OK and we
1709	 * don't have to run it immediately.
1710	 *
1711	 * During promotion, we look for state changes so we can return this
1712	 * information to the caller.
1713	 */
1714
1715	for (lp_w = SH_TAILQ_FIRST(&obj->waiters, __db_lock),
1716	    state_changed = lp_w == NULL;
1717	    lp_w != NULL;
1718	    lp_w = next_waiter) {
1719		had_waiters = 1;
1720		next_waiter = SH_TAILQ_NEXT(lp_w, links, __db_lock);
1721
1722		/* Waiter may have aborted or expired. */
1723		if (lp_w->status != DB_LSTAT_WAITING)
1724			continue;
1725		/* Are we switching locks? */
1726		if (LF_ISSET(DB_LOCK_NOWAITERS) && lp_w->mode == DB_LOCK_WAIT)
1727			continue;
1728
1729		SH_TAILQ_FOREACH(lp_h, &obj->holders, links, __db_lock) {
1730			if (lp_h->holder != lp_w->holder &&
1731			    CONFLICTS(lt, region, lp_h->mode, lp_w->mode)) {
1732				if (!__lock_is_parent(lt, lp_h->holder,
1733				     R_ADDR(&lt->reginfo, lp_w->holder)))
1734					break;
1735			}
1736		}
1737		if (lp_h != NULL)	/* Found a conflict. */
1738			break;
1739
1740		/* No conflict, promote the waiting lock. */
1741		SH_TAILQ_REMOVE(&obj->waiters, lp_w, links, __db_lock);
1742		lp_w->status = DB_LSTAT_PENDING;
1743		SH_TAILQ_INSERT_TAIL(&obj->holders, lp_w, links);
1744
1745		/* Wake up waiter. */
1746		MUTEX_UNLOCK(lt->env, lp_w->mtx_lock);
1747		state_changed = 1;
1748	}
1749
1750	/*
	 * If this object had waiters and no longer does, then we need
	 * to remove it from the dd_objs list.
1753	 */
1754	if (had_waiters && SH_TAILQ_FIRST(&obj->waiters, __db_lock) == NULL) {
1755		LOCK_DD(lt->env, region);
1756		/*
1757		 * Bump the generation when removing an object from the
1758		 * queue so that the deadlock detector will retry.
1759		 */
1760		obj->generation++;
1761		SH_TAILQ_REMOVE(&region->dd_objs, obj, dd_links, __db_lockobj);
1762		UNLOCK_DD(lt->env, region);
1763	}
1764
1765	if (state_changedp != NULL)
1766		*state_changedp = state_changed;
1767
1768	return (0);
1769}
1770
1771/*
1772 * __lock_remove_waiter --
1773 *	Any lock on the waitlist has a process waiting for it.  Therefore,
1774 * we can't return the lock to the freelist immediately.  Instead, we can
1775 * remove the lock from the list of waiters, set the status field of the
1776 * lock, and then let the process waking up return the lock to the
1777 * free list.
1778 *
1779 * This must be called with the Object bucket locked.
1780 */
1781static int
1782__lock_remove_waiter(lt, sh_obj, lockp, status)
1783	DB_LOCKTAB *lt;
1784	DB_LOCKOBJ *sh_obj;
1785	struct __db_lock *lockp;
1786	db_status_t status;
1787{
1788	DB_LOCKREGION *region;
1789	int do_wakeup;
1790
1791	region = lt->reginfo.primary;
1792
1793	do_wakeup = lockp->status == DB_LSTAT_WAITING;
1794
1795	SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);
1796	lockp->links.stqe_prev = -1;
1797	lockp->status = status;
1798	if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
1799		LOCK_DD(lt->env, region);
1800		sh_obj->generation++;
1801		SH_TAILQ_REMOVE(
1802		    &region->dd_objs,
1803		    sh_obj, dd_links, __db_lockobj);
1804		UNLOCK_DD(lt->env, region);
1805	}
1806
1807	/*
1808	 * Wake whoever is waiting on this lock.
1809	 */
1810	if (do_wakeup)
1811		MUTEX_UNLOCK(lt->env, lockp->mtx_lock);
1812
1813	return (0);
1814}
1815
1816/*
1817 * __lock_trade --
1818 *
1819 * Trade locker ids on a lock.  This is used to reassign file locks from
1820 * a transactional locker id to a long-lived locker id.  This should be
1821 * called with the region mutex held.
1822 */
1823static int
1824__lock_trade(env, lock, new_locker)
1825	ENV *env;
1826	DB_LOCK *lock;
1827	DB_LOCKER *new_locker;
1828{
1829	struct __db_lock *lp;
1830	DB_LOCKTAB *lt;
1831	int ret;
1832
1833	lt = env->lk_handle;
1834	lp = R_ADDR(&lt->reginfo, lock->off);
1835
1836	/* If the lock is already released, simply return. */
1837	if (lp->gen != lock->gen)
1838		return (DB_NOTFOUND);
1839
1840	if (new_locker == NULL) {
1841		__db_errx(env, "Locker does not exist");
1842		return (EINVAL);
1843	}
1844
1845	/* Remove the lock from its current locker. */
1846	if ((ret = __lock_freelock(lt,
1847	    lp, R_ADDR(&lt->reginfo, lp->holder), DB_LOCK_UNLINK)) != 0)
1848		return (ret);
1849
1850	/* Add lock to its new locker. */
1851	SH_LIST_INSERT_HEAD(&new_locker->heldby, lp, locker_links, __db_lock);
1852	new_locker->nlocks++;
1853	if (IS_WRITELOCK(lp->mode))
1854		new_locker->nwrites++;
1855	lp->holder = R_OFFSET(&lt->reginfo, new_locker);
1856
1857	return (0);
1858}
1859