1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/lock.h"
13#include "dbinc/log.h"
14
15static int __lock_allocobj __P((DB_LOCKTAB *, u_int32_t));
16static int __lock_alloclock __P((DB_LOCKTAB *, u_int32_t));
17static int  __lock_freelock __P((DB_LOCKTAB *,
18		struct __db_lock *, DB_LOCKER *, u_int32_t));
19static int  __lock_getobj
20		__P((DB_LOCKTAB *, const DBT *, u_int32_t, int, DB_LOCKOBJ **));
21static int __lock_get_api __P((ENV *,
22		u_int32_t, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
static int  __lock_inherit_locks __P((DB_LOCKTAB *, DB_LOCKER *, u_int32_t));
24static int  __lock_is_parent __P((DB_LOCKTAB *, roff_t, DB_LOCKER *));
25static int  __lock_put_internal __P((DB_LOCKTAB *,
26		struct __db_lock *, u_int32_t,  u_int32_t));
27static int  __lock_put_nolock __P((ENV *, DB_LOCK *, int *, u_int32_t));
28static int __lock_remove_waiter __P((DB_LOCKTAB *,
29		DB_LOCKOBJ *, struct __db_lock *, db_status_t));
30static int __lock_trade __P((ENV *, DB_LOCK *, DB_LOCKER *));
31static int __lock_vec_api __P((ENV *,
32		u_int32_t, u_int32_t,  DB_LOCKREQ *, int, DB_LOCKREQ **));
33
34static const char __db_lock_invalid[] = "%s: Lock is no longer valid";
35static const char __db_locker_invalid[] = "Locker is not valid";
36
37/*
38 * __lock_vec_pp --
39 *	ENV->lock_vec pre/post processing.
40 *
41 * PUBLIC: int __lock_vec_pp __P((DB_ENV *,
42 * PUBLIC:     u_int32_t, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
43 */
44int
45__lock_vec_pp(dbenv, lid, flags, list, nlist, elistp)
46	DB_ENV *dbenv;
47	u_int32_t lid, flags;
48	int nlist;
49	DB_LOCKREQ *list, **elistp;
50{
51	DB_THREAD_INFO *ip;
52	ENV *env;
53	int ret;
54
55	env = dbenv->env;
56
57	ENV_REQUIRES_CONFIG(env,
58	    env->lk_handle, "DB_ENV->lock_vec", DB_INIT_LOCK);
59
60	/* Validate arguments. */
61	if ((ret = __db_fchk(env,
62	     "DB_ENV->lock_vec", flags, DB_LOCK_NOWAIT)) != 0)
63		return (ret);
64
65	ENV_ENTER(env, ip);
66	REPLICATION_WRAP(env,
67	     (__lock_vec_api(env, lid, flags, list, nlist, elistp)), 0, ret);
68	ENV_LEAVE(env, ip);
69	return (ret);
70}
71
72static int
73__lock_vec_api(env, lid, flags, list, nlist, elistp)
74	ENV *env;
75	u_int32_t lid, flags;
76	int nlist;
77	DB_LOCKREQ *list, **elistp;
78{
79	DB_LOCKER *sh_locker;
80	int ret;
81
82	if ((ret =
83	    __lock_getlocker(env->lk_handle, lid, 0, &sh_locker)) == 0)
84		ret = __lock_vec(env, sh_locker, flags, list, nlist, elistp);
85	return (ret);
86}
87
88/*
89 * __lock_vec --
90 *	ENV->lock_vec.
91 *
92 *	Vector lock routine.  This function takes a set of operations
93 *	and performs them all at once.  In addition, lock_vec provides
94 *	functionality for lock inheritance, releasing all locks for a
95 *	given locker (used during transaction commit/abort), releasing
96 *	all locks on a given object, and generating debugging information.
97 *
98 * PUBLIC: int __lock_vec __P((ENV *,
99 * PUBLIC:     DB_LOCKER *, u_int32_t, DB_LOCKREQ *, int, DB_LOCKREQ **));
100 */
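/*
 * Example: a minimal caller-side sketch of this interface through the public
 * DB_ENV->lock_vec method (assumes a DB_ENV named dbenv opened with
 * DB_INIT_LOCK; the object name and error handling are illustrative only).
 * The first request acquires a read lock on the object, the second releases
 * every lock the locker holds:
 *
 *	DB_LOCKREQ reqs[2], *failed;
 *	DBT obj;
 *	u_int32_t locker;
 *	int ret;
 *
 *	if ((ret = dbenv->lock_id(dbenv, &locker)) != 0)
 *		return (ret);
 *	memset(&obj, 0, sizeof(obj));
 *	obj.data = "example-object";
 *	obj.size = sizeof("example-object") - 1;
 *	memset(reqs, 0, sizeof(reqs));
 *	reqs[0].op = DB_LOCK_GET;
 *	reqs[0].mode = DB_LOCK_READ;
 *	reqs[0].obj = &obj;
 *	reqs[1].op = DB_LOCK_PUT_ALL;
 *	ret = dbenv->lock_vec(dbenv, locker, 0, reqs, 2, &failed);
 *
 * On failure, "failed" points at the request that returned the error.
 */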
101int
102__lock_vec(env, sh_locker, flags, list, nlist, elistp)
103	ENV *env;
104	DB_LOCKER *sh_locker;
105	u_int32_t flags;
106	int nlist;
107	DB_LOCKREQ *list, **elistp;
108{
109	struct __db_lock *lp, *next_lock;
	DB_LOCK lock;
	DB_LOCKOBJ *sh_obj;
111	DB_LOCKREGION *region;
112	DB_LOCKTAB *lt;
113	DBT *objlist, *np;
114	u_int32_t ndx;
115	int did_abort, i, ret, run_dd, upgrade, writes;
116
117	/* Check if locks have been globally turned off. */
118	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
119		return (0);
120
121	lt = env->lk_handle;
122	region = lt->reginfo.primary;
123
124	run_dd = 0;
125	LOCK_SYSTEM_LOCK(lt, region);
126	for (i = 0, ret = 0; i < nlist && ret == 0; i++)
127		switch (list[i].op) {
128		case DB_LOCK_GET_TIMEOUT:
129			LF_SET(DB_LOCK_SET_TIMEOUT);
130			/* FALLTHROUGH */
131		case DB_LOCK_GET:
132			if (IS_RECOVERING(env)) {
133				LOCK_INIT(list[i].lock);
134				break;
135			}
136			ret = __lock_get_internal(lt,
137			    sh_locker, flags, list[i].obj,
138			    list[i].mode, list[i].timeout, &list[i].lock);
139			break;
140		case DB_LOCK_INHERIT:
141			ret = __lock_inherit_locks(lt, sh_locker, flags);
142			break;
143		case DB_LOCK_PUT:
144			ret = __lock_put_nolock(env,
145			    &list[i].lock, &run_dd, flags);
146			break;
147		case DB_LOCK_PUT_ALL:		/* Put all locks. */
148		case DB_LOCK_PUT_READ:		/* Put read locks. */
149		case DB_LOCK_UPGRADE_WRITE:
150				/* Upgrade was_write and put read locks. */
151			/*
152			 * Since the locker may hold no
153			 * locks (i.e., you could call abort before you've
154			 * done any work), it's perfectly reasonable for there
155			 * to be no locker; this is not an error.
156			 */
157			if (sh_locker == NULL)
158				/*
159				 * If ret is set, then we'll generate an
160				 * error.  If it's not set, we have nothing
161				 * to do.
162				 */
163				break;
164			upgrade = 0;
165			writes = 1;
166			if (list[i].op == DB_LOCK_PUT_READ)
167				writes = 0;
168			else if (list[i].op == DB_LOCK_UPGRADE_WRITE) {
169				if (F_ISSET(sh_locker, DB_LOCKER_DIRTY))
170					upgrade = 1;
171				writes = 0;
172			}
173			objlist = list[i].obj;
174			if (objlist != NULL) {
175				/*
176				 * We know these should be ilocks,
177				 * but they could be something else,
178				 * so allocate room for the size too.
179				 */
180				objlist->size =
181				     sh_locker->nwrites * sizeof(DBT);
182				if ((ret = __os_malloc(env,
183				     objlist->size, &objlist->data)) != 0)
184					goto up_done;
185				memset(objlist->data, 0, objlist->size);
186				np = (DBT *) objlist->data;
187			} else
188				np = NULL;
189
190			/* Now traverse the locks, releasing each one. */
191			for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
192			    lp != NULL; lp = next_lock) {
193				sh_obj = (DB_LOCKOBJ *)
194				    ((u_int8_t *)lp + lp->obj);
195				next_lock = SH_LIST_NEXT(lp,
196				    locker_links, __db_lock);
197				if (writes == 1 ||
198				    lp->mode == DB_LOCK_READ ||
199				    lp->mode == DB_LOCK_READ_UNCOMMITTED) {
200					SH_LIST_REMOVE(lp,
201					    locker_links, __db_lock);
202					sh_obj = (DB_LOCKOBJ *)
203					    ((u_int8_t *)lp + lp->obj);
204					ndx = sh_obj->indx;
205					OBJECT_LOCK_NDX(lt, region, ndx);
206					/*
207					 * We are not letting lock_put_internal
208					 * unlink the lock, so we'll have to
209					 * update counts here.
210					 */
211					if (lp->status == DB_LSTAT_HELD) {
212						DB_ASSERT(env,
213						    sh_locker->nlocks != 0);
214						sh_locker->nlocks--;
215						if (IS_WRITELOCK(lp->mode))
216							sh_locker->nwrites--;
217					}
218					ret = __lock_put_internal(lt, lp,
219					    sh_obj->indx,
220					    DB_LOCK_FREE | DB_LOCK_DOALL);
221					OBJECT_UNLOCK(lt, region, ndx);
222					if (ret != 0)
223						break;
224					continue;
225				}
226				if (objlist != NULL) {
227					DB_ASSERT(env, (u_int8_t *)np <
228					     (u_int8_t *)objlist->data +
229					     objlist->size);
230					np->data = SH_DBT_PTR(&sh_obj->lockobj);
231					np->size = sh_obj->lockobj.size;
232					np++;
233				}
234			}
235			if (ret != 0)
236				goto up_done;
237
238			if (objlist != NULL)
239				if ((ret = __lock_fix_list(env,
240				     objlist, sh_locker->nwrites)) != 0)
241					goto up_done;
242			switch (list[i].op) {
243			case DB_LOCK_UPGRADE_WRITE:
244				/*
245				 * Upgrade all WWRITE locks to WRITE so
246				 * that we can abort a transaction which
247				 * was supporting dirty readers.
248				 */
249				if (upgrade != 1)
250					goto up_done;
251				SH_LIST_FOREACH(lp, &sh_locker->heldby,
252				    locker_links, __db_lock) {
253					if (lp->mode != DB_LOCK_WWRITE)
254						continue;
255					lock.off = R_OFFSET(&lt->reginfo, lp);
256					lock.gen = lp->gen;
257					F_SET(sh_locker, DB_LOCKER_INABORT);
258					if ((ret = __lock_get_internal(lt,
259					    sh_locker, flags | DB_LOCK_UPGRADE,
260					    NULL, DB_LOCK_WRITE, 0, &lock)) !=0)
261						break;
262				}
263			up_done:
264				/* FALLTHROUGH */
265			case DB_LOCK_PUT_READ:
266			case DB_LOCK_PUT_ALL:
267				break;
268			default:
269				break;
270			}
271			break;
272		case DB_LOCK_PUT_OBJ:
273			/* Remove all the locks associated with an object. */
274			OBJECT_LOCK(lt, region, list[i].obj, ndx);
275			if ((ret = __lock_getobj(lt, list[i].obj,
276			    ndx, 0, &sh_obj)) != 0 || sh_obj == NULL) {
277				if (ret == 0)
278					ret = EINVAL;
279				OBJECT_UNLOCK(lt, region, ndx);
280				break;
281			}
282
283			/*
284			 * Go through both waiters and holders.  Don't bother
285			 * to run promotion, because everyone is getting
286			 * released.  The processes waiting will still get
287			 * awakened as their waiters are released.
288			 */
289			for (lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock);
290			    ret == 0 && lp != NULL;
291			    lp = SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock))
292				ret = __lock_put_internal(lt, lp, ndx,
293				    DB_LOCK_UNLINK |
294				    DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
295
296			/*
			 * On the last time around, the object will get
			 * reclaimed by __lock_put_internal, so structure the
			 * loop carefully to avoid being bitten.
300			 */
301			for (lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
302			    ret == 0 && lp != NULL;
303			    lp = next_lock) {
304				next_lock = SH_TAILQ_NEXT(lp, links, __db_lock);
305				ret = __lock_put_internal(lt, lp, ndx,
306				    DB_LOCK_UNLINK |
307				    DB_LOCK_NOPROMOTE | DB_LOCK_DOALL);
308			}
309			OBJECT_UNLOCK(lt, region, ndx);
310			break;
311
312		case DB_LOCK_TIMEOUT:
313			ret = __lock_set_timeout_internal(env,
314			    sh_locker, 0, DB_SET_TXN_NOW);
315			break;
316
317		case DB_LOCK_TRADE:
318			/*
319			 * INTERNAL USE ONLY.
320			 * Change the holder of the lock described in
321			 * list[i].lock to the locker-id specified by
322			 * the locker parameter.
323			 */
324			/*
325			 * You had better know what you're doing here.
326			 * We are trading locker-id's on a lock to
327			 * facilitate file locking on open DB handles.
328			 * We do not do any conflict checking on this,
329			 * so heaven help you if you use this flag under
330			 * any other circumstances.
331			 */
332			ret = __lock_trade(env, &list[i].lock, sh_locker);
333			break;
334#if defined(DEBUG) && defined(HAVE_STATISTICS)
335		case DB_LOCK_DUMP:
336			if (sh_locker == NULL)
337				break;
338
339			SH_LIST_FOREACH(
340			    lp, &sh_locker->heldby, locker_links, __db_lock)
341				__lock_printlock(lt, NULL, lp, 1);
342			break;
343#endif
344		default:
345			__db_errx(env,
346			    "Invalid lock operation: %d", list[i].op);
347			ret = EINVAL;
348			break;
349		}
350
351	if (ret == 0 && region->detect != DB_LOCK_NORUN &&
352	     (region->need_dd || timespecisset(&region->next_timeout)))
353		run_dd = 1;
354	LOCK_SYSTEM_UNLOCK(lt, region);
355
356	if (run_dd)
357		(void)__lock_detect(env, region->detect, &did_abort);
358
359	if (ret != 0 && elistp != NULL)
360		*elistp = &list[i - 1];
361
362	return (ret);
363}
364
365/*
366 * __lock_get_pp --
367 *	ENV->lock_get pre/post processing.
368 *
369 * PUBLIC: int __lock_get_pp __P((DB_ENV *,
370 * PUBLIC:     u_int32_t, u_int32_t, DBT *, db_lockmode_t, DB_LOCK *));
371 */
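/*
 * Example: a sketch of the corresponding single-lock calls (assumes a DB_ENV
 * named dbenv opened with DB_INIT_LOCK and a locker id obtained from
 * lock_id; error handling abbreviated):
 *
 *	DB_LOCK lock;
 *	DBT obj;
 *	int ret;
 *
 *	memset(&obj, 0, sizeof(obj));
 *	obj.data = "example-object";
 *	obj.size = sizeof("example-object") - 1;
 *	ret = dbenv->lock_get(dbenv,
 *	    locker, DB_LOCK_NOWAIT, &obj, DB_LOCK_WRITE, &lock);
 *	if (ret == 0)
 *		ret = dbenv->lock_put(dbenv, &lock);
 *	else if (ret == DB_LOCK_NOTGRANTED)
 *		ret = 0;
 *
 * With DB_LOCK_NOWAIT the call returns DB_LOCK_NOTGRANTED instead of
 * blocking when the lock cannot be granted immediately; the caller can back
 * off and retry.
 */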
372int
373__lock_get_pp(dbenv, locker, flags, obj, lock_mode, lock)
374	DB_ENV *dbenv;
375	u_int32_t locker, flags;
376	DBT *obj;
377	db_lockmode_t lock_mode;
378	DB_LOCK *lock;
379{
380	DB_THREAD_INFO *ip;
381	ENV *env;
382	int ret;
383
384	env = dbenv->env;
385
386	ENV_REQUIRES_CONFIG(env,
387	    env->lk_handle, "DB_ENV->lock_get", DB_INIT_LOCK);
388
389	/* Validate arguments. */
390	if ((ret = __db_fchk(env, "DB_ENV->lock_get", flags,
391	    DB_LOCK_NOWAIT | DB_LOCK_UPGRADE | DB_LOCK_SWITCH)) != 0)
392		return (ret);
393
394	if ((ret = __dbt_usercopy(env, obj)) != 0)
395		return (ret);
396
397	ENV_ENTER(env, ip);
398	REPLICATION_WRAP(env,
399	     (__lock_get_api(env, locker, flags, obj, lock_mode, lock)),
400	     0, ret);
401	ENV_LEAVE(env, ip);
402	return (ret);
403}
404
405static int
406__lock_get_api(env, locker, flags, obj, lock_mode, lock)
407	ENV *env;
408	u_int32_t locker, flags;
409	const DBT *obj;
410	db_lockmode_t lock_mode;
411	DB_LOCK *lock;
412{
413	DB_LOCKER *sh_locker;
414	DB_LOCKREGION *region;
415	int ret;
416
417	COMPQUIET(region, NULL);
418
419	region = env->lk_handle->reginfo.primary;
420
421	LOCK_SYSTEM_LOCK(env->lk_handle, region);
422	LOCK_LOCKERS(env, region);
423	ret = __lock_getlocker_int(env->lk_handle, locker, 0, &sh_locker);
424	UNLOCK_LOCKERS(env, region);
425	if (ret == 0)
426		ret = __lock_get_internal(env->lk_handle,
427		    sh_locker, flags, obj, lock_mode, 0, lock);
428	LOCK_SYSTEM_UNLOCK(env->lk_handle, region);
429	return (ret);
430}
431
432/*
433 * __lock_get --
434 *	ENV->lock_get.
435 *
436 * PUBLIC: int __lock_get __P((ENV *,
437 * PUBLIC:     DB_LOCKER *, u_int32_t, const DBT *, db_lockmode_t, DB_LOCK *));
438 */
439int
440__lock_get(env, locker, flags, obj, lock_mode, lock)
441	ENV *env;
442	DB_LOCKER *locker;
443	u_int32_t flags;
444	const DBT *obj;
445	db_lockmode_t lock_mode;
446	DB_LOCK *lock;
447{
448	DB_LOCKTAB *lt;
449	int ret;
450
451	lt = env->lk_handle;
452
453	if (IS_RECOVERING(env)) {
454		LOCK_INIT(*lock);
455		return (0);
456	}
457
458	LOCK_SYSTEM_LOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
459	ret = __lock_get_internal(lt, locker, flags, obj, lock_mode, 0, lock);
460	LOCK_SYSTEM_UNLOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
461	return (ret);
462}

/*
 * __lock_alloclock -- allocate a lock from another partition.
 *	We assume the partition is locked on entry; on success we leave it
 * unlocked, since the caller will have to retry the lock operation anyway.
 * If we run out of space, the partition mutex is left locked.
 */
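/*
 * For example, with region->part_t_size == 4 and part_id == 2, the search
 * below first tries partition 3, then wraps around to the partitions below
 * 2; only if no free lock is found anywhere do we re-lock partition 2 and
 * report that the lock table is out of space.
 */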
469static int
470__lock_alloclock(lt, part_id)
471	DB_LOCKTAB *lt;
472	u_int32_t part_id;
473{
474	struct __db_lock *sh_lock;
475	DB_LOCKPART *end_p, *cur_p;
476	DB_LOCKREGION *region;
477	int begin;
478
479	region = lt->reginfo.primary;
480
481	if (region->part_t_size == 1)
482		goto err;
483
484	begin = 0;
485	sh_lock = NULL;
486	cur_p = &lt->part_array[part_id];
487	MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
488	end_p = &lt->part_array[region->part_t_size];
489	/*
490	 * Start looking at the next partition and wrap around.  If
491	 * we get back to our partition then raise an error.
492	 */
493again:	for (cur_p++; sh_lock == NULL && cur_p < end_p; cur_p++) {
494		MUTEX_LOCK(lt->env, cur_p->mtx_part);
495		if ((sh_lock =
496		    SH_TAILQ_FIRST(&cur_p->free_locks, __db_lock)) != NULL)
497			SH_TAILQ_REMOVE(&cur_p->free_locks,
498			    sh_lock, links, __db_lock);
499		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
500	}
501	if (sh_lock != NULL) {
502		cur_p = &lt->part_array[part_id];
503		MUTEX_LOCK(lt->env, cur_p->mtx_part);
504		SH_TAILQ_INSERT_HEAD(&cur_p->free_locks,
505		    sh_lock, links, __db_lock);
506		STAT(cur_p->part_stat.st_locksteals++);
507		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
508		return (0);
509	}
510	if (!begin) {
511		begin = 1;
512		cur_p = lt->part_array;
513		end_p = &lt->part_array[part_id];
514		goto again;
515	}
516
517	cur_p = &lt->part_array[part_id];
518	MUTEX_LOCK(lt->env, cur_p->mtx_part);
519
520err:	return (__lock_nomem(lt->env, "lock entries"));
521}
522
523/*
524 * __lock_get_internal --
525 *	All the work for lock_get (and for the GET option of lock_vec) is done
526 *	inside of lock_get_internal.
527 *
528 * PUBLIC: int  __lock_get_internal __P((DB_LOCKTAB *, DB_LOCKER *, u_int32_t,
529 * PUBLIC:     const DBT *, db_lockmode_t, db_timeout_t, DB_LOCK *));
530 */
531int
532__lock_get_internal(lt, sh_locker, flags, obj, lock_mode, timeout, lock)
533	DB_LOCKTAB *lt;
534	DB_LOCKER *sh_locker;
535	u_int32_t flags;
536	const DBT *obj;
537	db_lockmode_t lock_mode;
538	db_timeout_t timeout;
539	DB_LOCK *lock;
540{
541	struct __db_lock *newl, *lp;
542	ENV *env;
543	DB_LOCKOBJ *sh_obj;
544	DB_LOCKREGION *region;
545	DB_THREAD_INFO *ip;
546	u_int32_t ndx, part_id;
547	int did_abort, ihold, grant_dirty, no_dd, ret, t_ret;
548	roff_t holder, sh_off;
549
550	/*
551	 * We decide what action to take based on what locks are already held
552	 * and what locks are in the wait queue.
553	 */
554	enum {
555		GRANT,		/* Grant the lock. */
556		UPGRADE,	/* Upgrade the lock. */
557		HEAD,		/* Wait at head of wait queue. */
558		SECOND,		/* Wait as the second waiter. */
559		TAIL		/* Wait at tail of the wait queue. */
560	} action;
561
562	env = lt->env;
563	region = lt->reginfo.primary;
564
565	/* Check if locks have been globally turned off. */
566	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
567		return (0);
568
569	if (sh_locker == NULL) {
570		__db_errx(env, "Locker does not exist");
571		return (EINVAL);
572	}
573
574	no_dd = ret = 0;
575	newl = NULL;
576	sh_obj = NULL;
577
578	/* Check that the lock mode is valid.  */
579	if (lock_mode >= (db_lockmode_t)region->nmodes) {
580		__db_errx(env, "DB_ENV->lock_get: invalid lock mode %lu",
581		    (u_long)lock_mode);
582		return (EINVAL);
583	}
584
585again:	if (obj == NULL) {
586		DB_ASSERT(env, LOCK_ISSET(*lock));
587		lp = R_ADDR(&lt->reginfo, lock->off);
588		sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
589		ndx = sh_obj->indx;
590		OBJECT_LOCK_NDX(lt, region, ndx);
591	} else {
		/* Allocate a new object in shared memory. */
593		OBJECT_LOCK(lt, region, obj, lock->ndx);
594		ndx = lock->ndx;
595		if ((ret = __lock_getobj(lt, obj, lock->ndx, 1, &sh_obj)) != 0)
596			goto err;
597	}
598
599#ifdef HAVE_STATISTICS
600	if (LF_ISSET(DB_LOCK_UPGRADE))
601		lt->obj_stat[ndx].st_nupgrade++;
602	else if (!LF_ISSET(DB_LOCK_SWITCH))
603		lt->obj_stat[ndx].st_nrequests++;
604#endif
605
606	/*
607	 * Figure out if we can grant this lock or if it should wait.
608	 * By default, we can grant the new lock if it does not conflict with
609	 * anyone on the holders list OR anyone on the waiters list.
610	 * The reason that we don't grant if there's a conflict is that
611	 * this can lead to starvation (a writer waiting on a popularly
612	 * read item will never be granted).  The downside of this is that
613	 * a waiting reader can prevent an upgrade from reader to writer,
614	 * which is not uncommon.
615	 *
616	 * There are two exceptions to the no-conflict rule.  First, if
617	 * a lock is held by the requesting locker AND the new lock does
618	 * not conflict with any other holders, then we grant the lock.
619	 * The most common place this happens is when the holder has a
620	 * WRITE lock and a READ lock request comes in for the same locker.
621	 * If we do not grant the read lock, then we guarantee deadlock.
622	 * Second, dirty readers are granted if at all possible while
623	 * avoiding starvation, see below.
624	 *
	 * In case of conflict, we put the new lock on the end of the waiters
	 * list, unless we are upgrading or this is a dirty reader, in which
	 * case the locker goes at or near the front of the list.
628	 */
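	/*
	 * For example: if this locker already holds a WRITE lock on the
	 * object and now requests a READ lock while another locker waits for
	 * WRITE, the READ is granted (we hold a lock and conflict with no
	 * other holder); refusing it would deadlock the locker against
	 * itself.  If instead we held nothing and a writer were waiting, our
	 * READ request would queue at the tail rather than starve the
	 * writer.
	 */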
629	ihold = 0;
630	grant_dirty = 0;
631	holder = 0;
632
633	/*
634	 * SWITCH is a special case, used by the queue access method
635	 * when we want to get an entry which is past the end of the queue.
636	 * We have a DB_READ_LOCK and need to switch it to DB_LOCK_WAIT and
637	 * join the waiters queue.  This must be done as a single operation
638	 * so that another locker cannot get in and fail to wake us up.
639	 */
640	if (LF_ISSET(DB_LOCK_SWITCH))
641		lp = NULL;
642	else
643		lp = SH_TAILQ_FIRST(&sh_obj->holders, __db_lock);
644
645	sh_off = R_OFFSET(&lt->reginfo, sh_locker);
646	for (; lp != NULL; lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
647		DB_ASSERT(env, lp->status != DB_LSTAT_FREE);
648		if (sh_off == lp->holder) {
649			if (lp->mode == lock_mode &&
650			    lp->status == DB_LSTAT_HELD) {
651				if (LF_ISSET(DB_LOCK_UPGRADE))
652					goto upgrade;
653
654				/*
655				 * Lock is held, so we can increment the
656				 * reference count and return this lock
657				 * to the caller.  We do not count reference
658				 * increments towards the locks held by
659				 * the locker.
660				 */
661				lp->refcount++;
662				lock->off = R_OFFSET(&lt->reginfo, lp);
663				lock->gen = lp->gen;
664				lock->mode = lp->mode;
665				goto done;
666			} else {
667				ihold = 1;
668			}
669		} else if (__lock_is_parent(lt, lp->holder, sh_locker))
670			ihold = 1;
671		else if (CONFLICTS(lt, region, lp->mode, lock_mode))
672			break;
673		else if (lp->mode == DB_LOCK_READ ||
674		     lp->mode == DB_LOCK_WWRITE) {
675			grant_dirty = 1;
676			holder = lp->holder;
677		}
678	}
679
680	/*
	 * If there are conflicting holders, we will have to wait.  If we
	 * already hold a lock on this object, are doing an upgrade, or this
	 * is a dirty reader, the request goes to the head of the queue;
	 * everyone else goes to the back.
685	 */
686	if (lp != NULL) {
687		if (ihold || LF_ISSET(DB_LOCK_UPGRADE) ||
688		    lock_mode == DB_LOCK_READ_UNCOMMITTED)
689			action = HEAD;
690		else
691			action = TAIL;
692	} else {
693		if (LF_ISSET(DB_LOCK_SWITCH))
694			action = TAIL;
695		else if (LF_ISSET(DB_LOCK_UPGRADE))
696			action = UPGRADE;
697		else  if (ihold)
698			action = GRANT;
699		else {
700			/*
701			 * Look for conflicting waiters.
702			 */
703			SH_TAILQ_FOREACH(
704			    lp, &sh_obj->waiters, links, __db_lock)
705				if (CONFLICTS(lt, region, lp->mode,
706				     lock_mode) && sh_off != lp->holder)
707					break;
708
709			/*
710			 * If there are no conflicting holders or waiters,
			 * then we grant.  Normally when we wait, we
			 * wait at the end (TAIL).  However, the goal of
			 * DIRTY_READ locks is to allow forward progress in
			 * the face of updating transactions, so we try to
			 * allow all DIRTY_READ requests to proceed as rapidly
			 * as possible, so long as we can prevent starvation.
			 *
			 * When determining how to queue a DIRTY_READ
			 * request:
			 *
			 *	1. If there is a waiting upgrading writer,
			 *	   then we enqueue the dirty reader BEHIND it
			 *	   (second in the queue).
			 *	2. Else, if the current holders are either
			 *	   READ or WWRITE, we grant.
			 *	3. Else, queue SECOND, i.e., behind the first
			 *	   waiter.
728			 *
729			 * The end result is that dirty_readers get to run
730			 * so long as other lockers are blocked.  Once
731			 * there is a locker which is only waiting on
732			 * dirty readers then they queue up behind that
733			 * locker so that it gets to run.  In general
734			 * this locker will be a WRITE which will shortly
735			 * get downgraded to a WWRITE, permitting the
736			 * DIRTY locks to be granted.
737			 */
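			/*
			 * Concretely: if locker A holds a READ lock and is
			 * first on the wait queue trying to upgrade it to
			 * WRITE, a new DB_LOCK_READ_UNCOMMITTED request from
			 * locker B sees a dirty-grantable holder (A's READ)
			 * but queues SECOND, behind A's pending upgrade
			 * (rule 1).  If no such upgrade were waiting, B
			 * would be granted immediately (rule 2).
			 */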
738			if (lp == NULL)
739				action = GRANT;
740			else if (grant_dirty &&
741			    lock_mode == DB_LOCK_READ_UNCOMMITTED) {
742				/*
743				 * An upgrade will be at the head of the
744				 * queue.
745				 */
746				lp = SH_TAILQ_FIRST(
747				     &sh_obj->waiters, __db_lock);
748				if (lp->mode == DB_LOCK_WRITE &&
749				     lp->holder == holder)
750					action = SECOND;
751				else
752					action = GRANT;
753			} else if (lock_mode == DB_LOCK_READ_UNCOMMITTED)
754				action = SECOND;
755			else
756				action = TAIL;
757		}
758	}
759
760	switch (action) {
761	case HEAD:
762	case TAIL:
763	case SECOND:
764	case GRANT:
765		part_id = LOCK_PART(region, ndx);
766		/* Allocate a new lock. */
767		if ((newl = SH_TAILQ_FIRST(
768		    &FREE_LOCKS(lt, part_id), __db_lock)) == NULL) {
769			if ((ret = __lock_alloclock(lt, part_id)) != 0)
770				goto err;
			/* Dropped the mutex, so start over. */
772			goto again;
773		}
774		SH_TAILQ_REMOVE(
775		    &FREE_LOCKS(lt, part_id), newl, links, __db_lock);
776
777#ifdef HAVE_STATISTICS
778		/*
779		 * Keep track of the maximum number of locks allocated
780		 * in each partition and the maximum number of locks
781		 * used by any one bucket.
782		 */
783		if (++lt->obj_stat[ndx].st_nlocks >
784		    lt->obj_stat[ndx].st_maxnlocks)
785			lt->obj_stat[ndx].st_maxnlocks =
786			    lt->obj_stat[ndx].st_nlocks;
787		if (++lt->part_array[part_id].part_stat.st_nlocks >
788		    lt->part_array[part_id].part_stat.st_maxnlocks)
789			lt->part_array[part_id].part_stat.st_maxnlocks =
790			    lt->part_array[part_id].part_stat.st_nlocks;
791#endif
792
793		newl->holder = R_OFFSET(&lt->reginfo, sh_locker);
794		newl->refcount = 1;
795		newl->mode = lock_mode;
796		newl->obj = (roff_t)SH_PTR_TO_OFF(newl, sh_obj);
797		newl->indx = sh_obj->indx;
798		/*
		 * Now, insert the lock onto its locker's list.
		 * If the locker does not currently hold any locks,
		 * there's no reason to run the deadlock
		 * detector; save that information.
803		 */
804		no_dd = sh_locker->master_locker == INVALID_ROFF &&
805		    SH_LIST_FIRST(
806		    &sh_locker->child_locker, __db_locker) == NULL &&
807		    SH_LIST_FIRST(&sh_locker->heldby, __db_lock) == NULL;
808
809		SH_LIST_INSERT_HEAD(
810		    &sh_locker->heldby, newl, locker_links, __db_lock);
811
812		/*
813		 * Allocate a mutex if we do not have a mutex backing the lock.
814		 *
815		 * Use the lock mutex to block the thread; lock the mutex
816		 * when it is allocated so that we will block when we try
817		 * to lock it again.  We will wake up when another thread
818		 * grants the lock and releases the mutex.  We leave it
819		 * locked for the next use of this lock object.
820		 */
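		/*
		 * The resulting handshake: the requester locks mtx_lock once
		 * at allocation time, queues the lock, releases the region
		 * and calls MUTEX_LOCK again, which blocks.  When
		 * __lock_promote moves the lock to the holders list it calls
		 * MUTEX_UNLOCK on the same mutex, waking the requester, and
		 * the mutex is again left locked for the lock's next waiter.
		 */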
821		if (newl->mtx_lock == MUTEX_INVALID) {
822			if ((ret = __mutex_alloc(env, MTX_LOGICAL_LOCK,
823			    DB_MUTEX_LOGICAL_LOCK | DB_MUTEX_SELF_BLOCK,
824			    &newl->mtx_lock)) != 0)
825				goto err;
826			MUTEX_LOCK(env, newl->mtx_lock);
827		}
828		break;
829
830	case UPGRADE:
831upgrade:	lp = R_ADDR(&lt->reginfo, lock->off);
832		if (IS_WRITELOCK(lock_mode) && !IS_WRITELOCK(lp->mode))
833			sh_locker->nwrites++;
834		lp->mode = lock_mode;
835		goto done;
836	}
837
838	switch (action) {
839	case UPGRADE:
840		DB_ASSERT(env, 0);
841		break;
842	case GRANT:
843		newl->status = DB_LSTAT_HELD;
844		SH_TAILQ_INSERT_TAIL(&sh_obj->holders, newl, links);
845		break;
846	case HEAD:
847	case TAIL:
848	case SECOND:
849		if (LF_ISSET(DB_LOCK_NOWAIT)) {
850			ret = DB_LOCK_NOTGRANTED;
851			STAT(region->stat.st_lock_nowait++);
852			goto err;
853		}
854		if ((lp =
855		    SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock)) == NULL) {
856			LOCK_DD(env, region);
857			SH_TAILQ_INSERT_HEAD(&region->dd_objs,
858				    sh_obj, dd_links, __db_lockobj);
859			UNLOCK_DD(env, region);
860		}
861		switch (action) {
862		case HEAD:
863			SH_TAILQ_INSERT_HEAD(
864			     &sh_obj->waiters, newl, links, __db_lock);
865			break;
866		case SECOND:
867			SH_TAILQ_INSERT_AFTER(
868			     &sh_obj->waiters, lp, newl, links, __db_lock);
869			break;
870		case TAIL:
871			SH_TAILQ_INSERT_TAIL(&sh_obj->waiters, newl, links);
872			break;
873		default:
874			DB_ASSERT(env, 0);
875		}
876
877		/*
		 * First check to see if this txn has expired.
		 * If not, then see if the lock timeout is past
		 * the expiration of the txn; if it is, use
881		 * the txn expiration time.  lk_expire is passed
882		 * to avoid an extra call to get the time.
883		 */
884		if (__lock_expired(env,
885		    &sh_locker->lk_expire, &sh_locker->tx_expire)) {
886			newl->status = DB_LSTAT_EXPIRED;
887			sh_locker->lk_expire = sh_locker->tx_expire;
888
889			/* We are done. */
890			goto expired;
891		}
892
893		/*
894		 * If a timeout was specified in this call then it
895		 * takes priority.  If a lock timeout has been specified
896		 * for this transaction then use that, otherwise use
897		 * the global timeout value.
898		 */
899		if (!LF_ISSET(DB_LOCK_SET_TIMEOUT)) {
900			if (F_ISSET(sh_locker, DB_LOCKER_TIMEOUT))
901				timeout = sh_locker->lk_timeout;
902			else
903				timeout = region->lk_timeout;
904		}
905		if (timeout != 0)
906			__lock_expires(env, &sh_locker->lk_expire, timeout);
907		else
908			timespecclear(&sh_locker->lk_expire);
909
910		if (timespecisset(&sh_locker->tx_expire) &&
911			(timeout == 0 || __lock_expired(env,
912			    &sh_locker->lk_expire, &sh_locker->tx_expire)))
913				sh_locker->lk_expire = sh_locker->tx_expire;
914		if (timespecisset(&sh_locker->lk_expire) &&
915		    (!timespecisset(&region->next_timeout) ||
916		    timespeccmp(
917		    &region->next_timeout, &sh_locker->lk_expire, >)))
918			region->next_timeout = sh_locker->lk_expire;
919
920in_abort:	newl->status = DB_LSTAT_WAITING;
921		STAT(lt->obj_stat[ndx].st_lock_wait++);
922		/* We are about to block, deadlock detector must run. */
923		region->need_dd = 1;
924
925		OBJECT_UNLOCK(lt, region, sh_obj->indx);
926
927		/* If we are switching drop the lock we had. */
928		if (LF_ISSET(DB_LOCK_SWITCH) &&
929		    (ret = __lock_put_nolock(env,
930		    lock, &ihold, DB_LOCK_NOWAITERS)) != 0) {
931			OBJECT_LOCK_NDX(lt, region, sh_obj->indx);
932			(void)__lock_remove_waiter(
933			    lt, sh_obj, newl, DB_LSTAT_FREE);
934			goto err;
935		}
936
937		LOCK_SYSTEM_UNLOCK(lt, region);
938
939		/*
940		 * Before waiting, see if the deadlock detector should run.
941		 */
942		if (region->detect != DB_LOCK_NORUN && !no_dd)
943			(void)__lock_detect(env, region->detect, &did_abort);
944
945		ip = NULL;
946		if (env->thr_hashtab != NULL &&
947		     (ret = __env_set_state(env, &ip, THREAD_BLOCKED)) != 0) {
948			LOCK_SYSTEM_LOCK(lt, region);
949			OBJECT_LOCK_NDX(lt, region, ndx);
950			goto err;
951		}
952
953		MUTEX_LOCK(env, newl->mtx_lock);
954		if (ip != NULL)
955			ip->dbth_state = THREAD_ACTIVE;
956
957		LOCK_SYSTEM_LOCK(lt, region);
958		OBJECT_LOCK_NDX(lt, region, ndx);
959
960		/* Turn off lock timeout. */
961		if (newl->status != DB_LSTAT_EXPIRED)
962			timespecclear(&sh_locker->lk_expire);
963
964		switch (newl->status) {
965		case DB_LSTAT_ABORTED:
966			/*
			 * If we raced with the deadlock detector and it
			 * mistakenly picked this transaction to abort again,
			 * ignore the abort and request the lock again.
970			 */
971			if (F_ISSET(sh_locker, DB_LOCKER_INABORT))
972				goto in_abort;
973			ret = DB_LOCK_DEADLOCK;
974			goto err;
975		case DB_LSTAT_EXPIRED:
976expired:		ret = __lock_put_internal(lt, newl,
977			    ndx, DB_LOCK_UNLINK | DB_LOCK_FREE);
978			newl = NULL;
979			if (ret != 0)
980				goto err;
981#ifdef HAVE_STATISTICS
982			if (timespeccmp(
983			    &sh_locker->lk_expire, &sh_locker->tx_expire, ==))
984				lt->obj_stat[ndx].st_ntxntimeouts++;
985			else
986				lt->obj_stat[ndx].st_nlocktimeouts++;
987#endif
988			ret = DB_LOCK_NOTGRANTED;
989			timespecclear(&sh_locker->lk_expire);
990			goto err;
991		case DB_LSTAT_PENDING:
992			if (LF_ISSET(DB_LOCK_UPGRADE)) {
993				/*
994				 * The lock just granted got put on the holders
995				 * list.  Since we're upgrading some other lock,
996				 * we've got to remove it here.
997				 */
998				SH_TAILQ_REMOVE(
999				    &sh_obj->holders, newl, links, __db_lock);
1000				/*
				 * Ensure the lock is not believed to be on
				 * the object's lists if we're traversing by
				 * locker.
1004				 */
1005				newl->links.stqe_prev = -1;
1006				goto upgrade;
1007			} else
1008				newl->status = DB_LSTAT_HELD;
1009			break;
1010		case DB_LSTAT_FREE:
1011		case DB_LSTAT_HELD:
1012		case DB_LSTAT_WAITING:
1013		default:
1014			__db_errx(env,
1015			    "Unexpected lock status: %d", (int)newl->status);
1016			ret = __env_panic(env, EINVAL);
1017			goto err;
1018		}
1019	}
1020
1021	lock->off = R_OFFSET(&lt->reginfo, newl);
1022	lock->gen = newl->gen;
1023	lock->mode = newl->mode;
1024	sh_locker->nlocks++;
1025	if (IS_WRITELOCK(newl->mode)) {
1026		sh_locker->nwrites++;
1027		if (newl->mode == DB_LOCK_WWRITE)
1028			F_SET(sh_locker, DB_LOCKER_DIRTY);
1029	}
1030
1031	OBJECT_UNLOCK(lt, region, ndx);
1032	return (0);
1033
1034err:	if (!LF_ISSET(DB_LOCK_UPGRADE | DB_LOCK_SWITCH))
1035		LOCK_INIT(*lock);
1036
1037done:	if (newl != NULL &&
1038	     (t_ret = __lock_freelock(lt, newl, sh_locker,
1039	     DB_LOCK_FREE | DB_LOCK_UNLINK)) != 0 && ret == 0)
1040		ret = t_ret;
1041	OBJECT_UNLOCK(lt, region, ndx);
1042
1043	return (ret);
1044}
1045
1046/*
1047 * __lock_put_pp --
1048 *	ENV->lock_put pre/post processing.
1049 *
1050 * PUBLIC: int  __lock_put_pp __P((DB_ENV *, DB_LOCK *));
1051 */
1052int
1053__lock_put_pp(dbenv, lock)
1054	DB_ENV *dbenv;
1055	DB_LOCK *lock;
1056{
1057	DB_THREAD_INFO *ip;
1058	ENV *env;
1059	int ret;
1060
1061	env = dbenv->env;
1062
1063	ENV_REQUIRES_CONFIG(env,
1064	    env->lk_handle, "DB_LOCK->lock_put", DB_INIT_LOCK);
1065
1066	ENV_ENTER(env, ip);
1067	REPLICATION_WRAP(env, (__lock_put(env, lock)), 0, ret);
1068	ENV_LEAVE(env, ip);
1069	return (ret);
1070}
1071
/*
 * __lock_put --
 *	Internal lock_put interface.
 *
 * PUBLIC: int  __lock_put __P((ENV *, DB_LOCK *));
 */
1078int
1079__lock_put(env, lock)
1080	ENV *env;
1081	DB_LOCK *lock;
1082{
1083	DB_LOCKTAB *lt;
1084	int ret, run_dd;
1085
1086	if (IS_RECOVERING(env))
1087		return (0);
1088
1089	lt = env->lk_handle;
1090
1091	LOCK_SYSTEM_LOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
1092	ret = __lock_put_nolock(env, lock, &run_dd, 0);
1093	LOCK_SYSTEM_UNLOCK(lt, (DB_LOCKREGION *)lt->reginfo.primary);
1094
1095	/*
1096	 * Only run the lock detector if put told us to AND we are running
1097	 * in auto-detect mode.  If we are not running in auto-detect, then
	 * a call to lock_detect here will zero the need_dd bit, but will not
1099	 * actually abort anything.
1100	 */
1101	if (ret == 0 && run_dd)
1102		(void)__lock_detect(env,
1103		    ((DB_LOCKREGION *)lt->reginfo.primary)->detect, NULL);
1104	return (ret);
1105}
1106
1107static int
1108__lock_put_nolock(env, lock, runp, flags)
1109	ENV *env;
1110	DB_LOCK *lock;
1111	int *runp;
1112	u_int32_t flags;
1113{
1114	struct __db_lock *lockp;
1115	DB_LOCKREGION *region;
1116	DB_LOCKTAB *lt;
1117	int ret;
1118
1119	/* Check if locks have been globally turned off. */
1120	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1121		return (0);
1122
1123	lt = env->lk_handle;
1124	region = lt->reginfo.primary;
1125
1126	lockp = R_ADDR(&lt->reginfo, lock->off);
1127	if (lock->gen != lockp->gen) {
1128		__db_errx(env, __db_lock_invalid, "DB_LOCK->lock_put");
1129		LOCK_INIT(*lock);
1130		return (EINVAL);
1131	}
1132
1133	OBJECT_LOCK_NDX(lt, region, lock->ndx);
1134	ret = __lock_put_internal(lt,
1135	    lockp, lock->ndx, flags | DB_LOCK_UNLINK | DB_LOCK_FREE);
1136	OBJECT_UNLOCK(lt, region, lock->ndx);
1137
1138	LOCK_INIT(*lock);
1139
1140	*runp = 0;
1141	if (ret == 0 && region->detect != DB_LOCK_NORUN &&
1142	     (region->need_dd || timespecisset(&region->next_timeout)))
1143		*runp = 1;
1144
1145	return (ret);
1146}
1147
1148/*
1149 * __lock_downgrade --
1150 *
 * Used to downgrade locks.  Currently this is used in three places: 1) by
 * the Concurrent Data Store product, to downgrade write locks back to iwrite
 * locks; 2) to downgrade write-handle locks to read-handle locks at the end
 * of an open/create; and 3) to downgrade write locks to was_write, to
 * support dirty reads.
1156 *
1157 * PUBLIC: int __lock_downgrade __P((ENV *,
1158 * PUBLIC:     DB_LOCK *, db_lockmode_t, u_int32_t));
1159 */
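/*
 * Example: a sketch of case 3 above, downgrading a write lock so that dirty
 * readers can proceed (lockp here is a DB_LOCK acquired earlier in
 * DB_LOCK_WRITE mode; illustrative only):
 *
 *	ret = __lock_downgrade(env, lockp, DB_LOCK_WWRITE, 0);
 *
 * The holder keeps the same DB_LOCK handle; only the mode changes, and
 * __lock_promote is run on the object so that any waiters compatible with
 * the weaker mode can now be granted.
 */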
1160int
1161__lock_downgrade(env, lock, new_mode, flags)
1162	ENV *env;
1163	DB_LOCK *lock;
1164	db_lockmode_t new_mode;
1165	u_int32_t flags;
1166{
1167	struct __db_lock *lockp;
1168	DB_LOCKER *sh_locker;
1169	DB_LOCKOBJ *obj;
1170	DB_LOCKREGION *region;
1171	DB_LOCKTAB *lt;
1172	int ret;
1173
1174	ret = 0;
1175
1176	/* Check if locks have been globally turned off. */
1177	if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING))
1178		return (0);
1179
1180	lt = env->lk_handle;
1181	region = lt->reginfo.primary;
1182
1183	LOCK_SYSTEM_LOCK(lt, region);
1184
1185	lockp = R_ADDR(&lt->reginfo, lock->off);
1186	if (lock->gen != lockp->gen) {
1187		__db_errx(env, __db_lock_invalid, "lock_downgrade");
1188		ret = EINVAL;
1189		goto out;
1190	}
1191
1192	sh_locker = R_ADDR(&lt->reginfo, lockp->holder);
1193
1194	if (IS_WRITELOCK(lockp->mode) && !IS_WRITELOCK(new_mode))
1195		sh_locker->nwrites--;
1196
1197	lockp->mode = new_mode;
1198	lock->mode = new_mode;
1199
1200	/* Get the object associated with this lock. */
1201	obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
1202	OBJECT_LOCK_NDX(lt, region, obj->indx);
1203	STAT(lt->obj_stat[obj->indx].st_ndowngrade++);
1204	ret = __lock_promote(lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS));
1205	OBJECT_UNLOCK(lt, region, obj->indx);
1206
1207out:	LOCK_SYSTEM_UNLOCK(lt, region);
1208	return (ret);
1209}
1210
1211/*
1212 * __lock_put_internal -- put a lock structure
1213 * We assume that we are called with the proper object locked.
1214 */
1215static int
1216__lock_put_internal(lt, lockp, obj_ndx, flags)
1217	DB_LOCKTAB *lt;
1218	struct __db_lock *lockp;
1219	u_int32_t obj_ndx, flags;
1220{
1221	DB_LOCKOBJ *sh_obj;
1222	DB_LOCKREGION *region;
1223	ENV *env;
1224	u_int32_t part_id;
1225	int ret, state_changed;
1226
1227	COMPQUIET(env, NULL);
1228	env = lt->env;
1229	region = lt->reginfo.primary;
1230	ret = state_changed = 0;
1231
1232	if (!OBJ_LINKS_VALID(lockp)) {
1233		/*
1234		 * Someone removed this lock while we were doing a release
1235		 * by locker id.  We are trying to free this lock, but it's
1236		 * already been done; all we need to do is return it to the
1237		 * free list.
1238		 */
1239		(void)__lock_freelock(lt, lockp, NULL, DB_LOCK_FREE);
1240		return (0);
1241	}
1242
1243#ifdef HAVE_STATISTICS
1244	if (LF_ISSET(DB_LOCK_DOALL))
1245		lt->obj_stat[obj_ndx].st_nreleases += lockp->refcount;
1246	else
1247		lt->obj_stat[obj_ndx].st_nreleases++;
1248#endif
1249
1250	if (!LF_ISSET(DB_LOCK_DOALL) && lockp->refcount > 1) {
1251		lockp->refcount--;
1252		return (0);
1253	}
1254
1255	/* Increment generation number. */
1256	lockp->gen++;
1257
1258	/* Get the object associated with this lock. */
1259	sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);
1260
1261	/*
1262	 * Remove this lock from its holders/waitlist.  Set its status
1263	 * to ABORTED.  It may get freed below, but if not then the
1264	 * waiter has been aborted (it will panic if the lock is
1265	 * free).
1266	 */
1267	if (lockp->status != DB_LSTAT_HELD &&
1268	    lockp->status != DB_LSTAT_PENDING) {
1269		if ((ret = __lock_remove_waiter(
1270		    lt, sh_obj, lockp, DB_LSTAT_ABORTED)) != 0)
1271			return (ret);
1272	} else {
1273		SH_TAILQ_REMOVE(&sh_obj->holders, lockp, links, __db_lock);
1274		lockp->links.stqe_prev = -1;
1275	}
1276
1277	if (LF_ISSET(DB_LOCK_NOPROMOTE))
1278		state_changed = 0;
1279	else
1280		if ((ret = __lock_promote(lt, sh_obj, &state_changed,
1281		    LF_ISSET(DB_LOCK_NOWAITERS))) != 0)
1282			return (ret);
1283
1284	/* Check if object should be reclaimed. */
1285	if (SH_TAILQ_FIRST(&sh_obj->holders, __db_lock) == NULL &&
1286	    SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
1287		part_id = LOCK_PART(region, obj_ndx);
1288		SH_TAILQ_REMOVE(
1289		    &lt->obj_tab[obj_ndx], sh_obj, links, __db_lockobj);
1290		if (sh_obj->lockobj.size > sizeof(sh_obj->objdata)) {
1291			if (region->part_t_size != 1)
1292				LOCK_REGION_LOCK(env);
1293			__env_alloc_free(&lt->reginfo,
1294			    SH_DBT_PTR(&sh_obj->lockobj));
1295			if (region->part_t_size != 1)
1296				LOCK_REGION_UNLOCK(env);
1297		}
1298		SH_TAILQ_INSERT_HEAD(
1299		    &FREE_OBJS(lt, part_id), sh_obj, links, __db_lockobj);
1300		sh_obj->generation++;
1301		STAT(lt->part_array[part_id].part_stat.st_nobjects--);
1302		STAT(lt->obj_stat[obj_ndx].st_nobjects--);
1303		state_changed = 1;
1304	}
1305
1306	/* Free lock. */
1307	if (LF_ISSET(DB_LOCK_UNLINK | DB_LOCK_FREE))
1308		ret = __lock_freelock(lt, lockp,
1309		     R_ADDR(&lt->reginfo, lockp->holder), flags);
1310
1311	/*
	 * If we did not promote anyone, we need to run the deadlock
1313	 * detector again.
1314	 */
1315	if (state_changed == 0)
1316		region->need_dd = 1;
1317
1318	return (ret);
1319}
1320
1321/*
1322 * __lock_freelock --
1323 *	Free a lock.  Unlink it from its locker if necessary.
1324 * We must hold the object lock.
1325 *
1326 */
1327static int
1328__lock_freelock(lt, lockp, sh_locker, flags)
1329	DB_LOCKTAB *lt;
1330	struct __db_lock *lockp;
1331	DB_LOCKER *sh_locker;
1332	u_int32_t flags;
1333{
1334	DB_LOCKREGION *region;
1335	ENV *env;
1336	u_int32_t part_id;
1337	int ret;
1338
1339	env = lt->env;
1340	region = lt->reginfo.primary;
1341
1342	if (LF_ISSET(DB_LOCK_UNLINK)) {
1343		SH_LIST_REMOVE(lockp, locker_links, __db_lock);
1344		if (lockp->status == DB_LSTAT_HELD) {
1345			sh_locker->nlocks--;
1346			if (IS_WRITELOCK(lockp->mode))
1347				sh_locker->nwrites--;
1348		}
1349	}
1350
1351	if (LF_ISSET(DB_LOCK_FREE)) {
1352		/*
1353		 * If the lock is not held we cannot be sure of its mutex
1354		 * state so we just destroy it and let it be re-created
1355		 * when needed.
1356		 */
1357		part_id = LOCK_PART(region, lockp->indx);
1358		if (lockp->mtx_lock != MUTEX_INVALID &&
1359		     lockp->status != DB_LSTAT_HELD &&
1360		     lockp->status != DB_LSTAT_EXPIRED &&
1361		     (ret = __mutex_free(env, &lockp->mtx_lock)) != 0)
1362			return (ret);
1363		lockp->status = DB_LSTAT_FREE;
1364		SH_TAILQ_INSERT_HEAD(&FREE_LOCKS(lt, part_id),
1365		     lockp, links, __db_lock);
1366		STAT(lt->part_array[part_id].part_stat.st_nlocks--);
1367		STAT(lt->obj_stat[lockp->indx].st_nlocks--);
1368	}
1369
1370	return (0);
1371}
1372
1373/*
 * __lock_allocobj -- allocate an object from another partition.
1375 *	We assume we have the partition locked on entry and leave
1376 * with the same partition locked on exit.
1377 */
1378static int
1379__lock_allocobj(lt, part_id)
1380	DB_LOCKTAB *lt;
1381	u_int32_t part_id;
1382{
1383	DB_LOCKOBJ *sh_obj;
1384	DB_LOCKPART *end_p, *cur_p;
1385	DB_LOCKREGION *region;
1386	int begin;
1387
1388	region = lt->reginfo.primary;
1389
1390	if (region->part_t_size == 1)
1391		goto err;
1392
1393	begin = 0;
1394	sh_obj = NULL;
1395	cur_p = &lt->part_array[part_id];
1396	MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
1397	end_p = &lt->part_array[region->part_t_size];
1398	/*
1399	 * Start looking at the next partition and wrap around.  If
1400	 * we get back to our partition then raise an error.
1401	 */
1402again:	for (cur_p++; sh_obj == NULL && cur_p < end_p; cur_p++) {
1403		MUTEX_LOCK(lt->env, cur_p->mtx_part);
1404		if ((sh_obj =
1405		    SH_TAILQ_FIRST(&cur_p->free_objs, __db_lockobj)) != NULL)
1406			SH_TAILQ_REMOVE(&cur_p->free_objs,
1407			    sh_obj, links, __db_lockobj);
1408		MUTEX_UNLOCK(lt->env, cur_p->mtx_part);
1409	}
1410	if (sh_obj != NULL) {
1411		cur_p = &lt->part_array[part_id];
1412		MUTEX_LOCK(lt->env, cur_p->mtx_part);
1413		SH_TAILQ_INSERT_HEAD(&cur_p->free_objs,
1414		    sh_obj, links, __db_lockobj);
1415		STAT(cur_p->part_stat.st_objectsteals++);
1416		return (0);
1417	}
1418	if (!begin) {
1419		begin = 1;
1420		cur_p = lt->part_array;
1421		end_p = &lt->part_array[part_id];
1422		goto again;
1423	}
1424	MUTEX_LOCK(lt->env, cur_p->mtx_part);
1425
1426err:	return (__lock_nomem(lt->env, "object entries"));
1427}

/*
1429 * __lock_getobj --
1430 *	Get an object in the object hash table.  The create parameter
1431 * indicates if the object should be created if it doesn't exist in
1432 * the table.
1433 *
1434 * This must be called with the object bucket locked.
1435 */
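/*
 * For example, a page lock is described by a DB_LOCK_ILOCK, which is small
 * enough to be stored inline in sh_obj->objdata; a longer object, such as a
 * file name used for handle locking, is instead copied into space allocated
 * from the lock region.
 */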
1436static int
1437__lock_getobj(lt, obj, ndx, create, retp)
1438	DB_LOCKTAB *lt;
1439	const DBT *obj;
1440	u_int32_t ndx;
1441	int create;
1442	DB_LOCKOBJ **retp;
1443{
1444	DB_LOCKOBJ *sh_obj;
1445	DB_LOCKREGION *region;
1446	ENV *env;
1447	int ret;
1448	void *p;
1449	u_int32_t len, part_id;
1450
1451	env = lt->env;
1452	region = lt->reginfo.primary;
1453	len = 0;
1454
1455	/* Look up the object in the hash table. */
1456retry:	SH_TAILQ_FOREACH(sh_obj, &lt->obj_tab[ndx], links, __db_lockobj) {
1457		len++;
1458		if (obj->size == sh_obj->lockobj.size &&
1459		    memcmp(obj->data,
1460		    SH_DBT_PTR(&sh_obj->lockobj), obj->size) == 0)
1461			break;
1462	}
1463
1464	/*
1465	 * If we found the object, then we can just return it.  If
1466	 * we didn't find the object, then we need to create it.
1467	 */
1468	if (sh_obj == NULL && create) {
1469		/* Create new object and then insert it into hash table. */
1470		part_id = LOCK_PART(region, ndx);
1471		if ((sh_obj = SH_TAILQ_FIRST(&FREE_OBJS(
1472		    lt, part_id), __db_lockobj)) == NULL) {
1473			if ((ret = __lock_allocobj(lt, part_id)) == 0)
1474				goto retry;
1475			goto err;
1476		}
1477
1478		/*
1479		 * If we can fit this object in the structure, do so instead
1480		 * of alloc-ing space for it.
1481		 */
1482		if (obj->size <= sizeof(sh_obj->objdata))
1483			p = sh_obj->objdata;
1484		else {
1485			/*
1486			 * If we have only one partition, the region is locked.
1487			 */
1488			if (region->part_t_size != 1)
1489				LOCK_REGION_LOCK(env);
1490			if ((ret =
1491			    __env_alloc(&lt->reginfo, obj->size, &p)) != 0) {
1492				__db_errx(env,
1493				    "No space for lock object storage");
1494				if (region->part_t_size != 1)
1495					LOCK_REGION_UNLOCK(env);
1496				goto err;
1497			}
1498			if (region->part_t_size != 1)
1499				LOCK_REGION_UNLOCK(env);
1500		}
1501
1502		memcpy(p, obj->data, obj->size);
1503
1504		SH_TAILQ_REMOVE(&FREE_OBJS(
1505		    lt, part_id), sh_obj, links, __db_lockobj);
1506#ifdef HAVE_STATISTICS
1507		/*
1508		 * Keep track of both the max number of objects allocated
1509		 * per partition and the max number of objects used by
1510		 * this bucket.
1511		 */
1512		len++;
1513		if (++lt->obj_stat[ndx].st_nobjects >
1514		    lt->obj_stat[ndx].st_maxnobjects)
1515			lt->obj_stat[ndx].st_maxnobjects =
1516			    lt->obj_stat[ndx].st_nobjects;
1517		if (++lt->part_array[part_id].part_stat.st_nobjects >
1518		    lt->part_array[part_id].part_stat.st_maxnobjects)
1519			lt->part_array[part_id].part_stat.st_maxnobjects =
1520			    lt->part_array[part_id].part_stat.st_nobjects;
1521#endif
1522
1523		sh_obj->indx = ndx;
1524		SH_TAILQ_INIT(&sh_obj->waiters);
1525		SH_TAILQ_INIT(&sh_obj->holders);
1526		sh_obj->lockobj.size = obj->size;
1527		sh_obj->lockobj.off =
1528		    (roff_t)SH_PTR_TO_OFF(&sh_obj->lockobj, p);
1529		SH_TAILQ_INSERT_HEAD(
1530		    &lt->obj_tab[ndx], sh_obj, links, __db_lockobj);
1531	}
1532
1533#ifdef HAVE_STATISTICS
1534	if (len > lt->obj_stat[ndx].st_hash_len)
1535		lt->obj_stat[ndx].st_hash_len = len;
1536#endif
1542
1543	*retp = sh_obj;
1544	return (0);
1545
1546err:	return (ret);
1547}
1548
1549/*
1550 * __lock_is_parent --
1551 *	Given a locker and a transaction, return 1 if the locker is
1552 * an ancestor of the designated transaction.  This is used to determine
1553 * if we should grant locks that appear to conflict, but don't because
1554 * the lock is already held by an ancestor.
1555 */
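/*
 * For example, if transaction P has child C1, which in turn has child C2,
 * then a lock held by P does not block a request made by C2: walking C2's
 * parent_locker chain (C2 -> C1 -> P) reaches P's locker offset, so the lock
 * is treated as held by an ancestor.
 */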
1556static int
1557__lock_is_parent(lt, l_off, sh_locker)
1558	DB_LOCKTAB *lt;
1559	roff_t l_off;
1560	DB_LOCKER *sh_locker;
1561{
1562	DB_LOCKER *parent;
1563
1564	parent = sh_locker;
1565	while (parent->parent_locker != INVALID_ROFF) {
1566		if (parent->parent_locker == l_off)
1567			return (1);
1568		parent = R_ADDR(&lt->reginfo, parent->parent_locker);
1569	}
1570
1571	return (0);
1572}
1573
1574/*
1575 * __lock_locker_is_parent --
1576 *	Determine if "locker" is an ancestor of "child".
1577 * *retp == 1 if so, 0 otherwise.
1578 *
1579 * PUBLIC: int __lock_locker_is_parent
1580 * PUBLIC:     __P((ENV *, DB_LOCKER *, DB_LOCKER *, int *));
1581 */
1582int
1583__lock_locker_is_parent(env, locker, child, retp)
1584	ENV *env;
1585	DB_LOCKER *locker;
1586	DB_LOCKER *child;
1587	int *retp;
1588{
1589	DB_LOCKTAB *lt;
1590
1591	lt = env->lk_handle;
1592
1593	/*
1594	 * The locker may not exist for this transaction, if not then it has
1595	 * no parents.
1596	 */
1597	if (locker == NULL)
1598		*retp = 0;
1599	else
1600		*retp = __lock_is_parent(lt,
1601		     R_OFFSET(&lt->reginfo, locker), child);
1602	return (0);
1603}
1604
1605/*
1606 * __lock_inherit_locks --
1607 *	Called on child commit to merge child's locks with parent's.
1608 */
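/*
 * For example, when child transaction C commits while holding a WRITE lock
 * on a page its parent P already holds in WRITE mode, the parent's existing
 * lock simply absorbs the child's reference count and the child's lock
 * structure is freed; a lock the parent does not hold is re-linked onto the
 * parent's heldby list with its holder changed to the parent.
 */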
1609static int
1610__lock_inherit_locks(lt, sh_locker, flags)
1611	DB_LOCKTAB *lt;
1612	DB_LOCKER *sh_locker;
1613	u_int32_t flags;
1614{
1615	DB_LOCKER *sh_parent;
1616	DB_LOCKOBJ *obj;
1617	DB_LOCKREGION *region;
1618	ENV *env;
1619	int ret;
1620	struct __db_lock *hlp, *lp;
1621	roff_t poff;
1622
1623	env = lt->env;
1624	region = lt->reginfo.primary;
1625
1626	/*
1627	 * Get the committing locker and mark it as deleted.
1628	 * This allows us to traverse the locker links without
1629	 * worrying that someone else is deleting locks out
1630	 * from under us.  However, if the locker doesn't
1631	 * exist, that just means that the child holds no
1632	 * locks, so inheritance is easy!
1633	 */
1634	if (sh_locker == NULL) {
1635		__db_errx(env, __db_locker_invalid);
1636		return (EINVAL);
1637	}
1638
1639	/* Make sure we are a child transaction. */
1640	if (sh_locker->parent_locker == INVALID_ROFF) {
1641		__db_errx(env, "Not a child transaction");
1642		return (EINVAL);
1643	}
1644	sh_parent = R_ADDR(&lt->reginfo, sh_locker->parent_locker);
1645
1646	/*
1647	 * In order to make it possible for a parent to have
1648	 * many, many children who lock the same objects, and
1649	 * not require an inordinate number of locks, we try
1650	 * to merge the child's locks with its parent's.
1651	 */
1652	poff = R_OFFSET(&lt->reginfo, sh_parent);
1653	for (lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock);
1654	    lp != NULL;
1655	    lp = SH_LIST_FIRST(&sh_locker->heldby, __db_lock)) {
1656		SH_LIST_REMOVE(lp, locker_links, __db_lock);
1657
1658		/* See if the parent already has a lock. */
1659		obj = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
1660		OBJECT_LOCK_NDX(lt, region, obj->indx);
1661		SH_TAILQ_FOREACH(hlp, &obj->holders, links, __db_lock)
1662			if (hlp->holder == poff && lp->mode == hlp->mode)
1663				break;
1664
1665		if (hlp != NULL) {
1666			/* Parent already holds lock. */
1667			hlp->refcount += lp->refcount;
1668
1669			/* Remove lock from object list and free it. */
1670			DB_ASSERT(env, lp->status == DB_LSTAT_HELD);
1671			SH_TAILQ_REMOVE(&obj->holders, lp, links, __db_lock);
1672			(void)__lock_freelock(lt, lp, sh_locker, DB_LOCK_FREE);
1673		} else {
1674			/* Just move lock to parent chains. */
1675			SH_LIST_INSERT_HEAD(&sh_parent->heldby,
1676			    lp, locker_links, __db_lock);
1677			lp->holder = poff;
1678		}
1679
1680		/*
1681		 * We may need to promote regardless of whether we simply
1682		 * moved the lock to the parent or changed the parent's
1683		 * reference count, because there might be a sibling waiting,
1684		 * who will now be allowed to make forward progress.
1685		 */
1686		ret =
1687		     __lock_promote(lt, obj, NULL, LF_ISSET(DB_LOCK_NOWAITERS));
1688		OBJECT_UNLOCK(lt, region, obj->indx);
1689		if (ret != 0)
1690			return (ret);
1691	}
1692
1693	/* Transfer child counts to parent. */
1694	sh_parent->nlocks += sh_locker->nlocks;
1695	sh_parent->nwrites += sh_locker->nwrites;
1696
1697	return (0);
1698}
1699
1700/*
1701 * __lock_promote --
1702 *
1703 * Look through the waiters and holders lists and decide which (if any)
1704 * locks can be promoted.   Promote any that are eligible.
1705 *
1706 * PUBLIC: int __lock_promote
1707 * PUBLIC:    __P((DB_LOCKTAB *, DB_LOCKOBJ *, int *, u_int32_t));
1708 */
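/*
 * For example, when a WRITE lock is released and the wait queue holds two
 * READ requests followed by another WRITE, the two readers are promoted
 * (moved to the holders list, marked DB_LSTAT_PENDING, and their mutexes
 * unlocked to wake the waiting threads), while the writer remains queued
 * because it conflicts with the newly promoted readers.
 */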
1709int
1710__lock_promote(lt, obj, state_changedp, flags)
1711	DB_LOCKTAB *lt;
1712	DB_LOCKOBJ *obj;
1713	int *state_changedp;
1714	u_int32_t flags;
1715{
1716	struct __db_lock *lp_w, *lp_h, *next_waiter;
1717	DB_LOCKREGION *region;
1718	int had_waiters, state_changed;
1719
1720	region = lt->reginfo.primary;
1721	had_waiters = 0;
1722
1723	/*
1724	 * We need to do lock promotion.  We also need to determine if we're
1725	 * going to need to run the deadlock detector again.  If we release
1726	 * locks, and there are waiters, but no one gets promoted, then we
1727	 * haven't fundamentally changed the lockmgr state, so we may still
1728	 * have a deadlock and we have to run again.  However, if there were
1729	 * no waiters, or we actually promoted someone, then we are OK and we
1730	 * don't have to run it immediately.
1731	 *
1732	 * During promotion, we look for state changes so we can return this
1733	 * information to the caller.
1734	 */
1735
1736	for (lp_w = SH_TAILQ_FIRST(&obj->waiters, __db_lock),
1737	    state_changed = lp_w == NULL;
1738	    lp_w != NULL;
1739	    lp_w = next_waiter) {
1740		had_waiters = 1;
1741		next_waiter = SH_TAILQ_NEXT(lp_w, links, __db_lock);
1742
1743		/* Waiter may have aborted or expired. */
1744		if (lp_w->status != DB_LSTAT_WAITING)
1745			continue;
1746		/* Are we switching locks? */
1747		if (LF_ISSET(DB_LOCK_NOWAITERS) && lp_w->mode == DB_LOCK_WAIT)
1748			continue;
1749
1750		SH_TAILQ_FOREACH(lp_h, &obj->holders, links, __db_lock) {
1751			if (lp_h->holder != lp_w->holder &&
1752			    CONFLICTS(lt, region, lp_h->mode, lp_w->mode)) {
1753				if (!__lock_is_parent(lt, lp_h->holder,
1754				     R_ADDR(&lt->reginfo, lp_w->holder)))
1755					break;
1756			}
1757		}
1758		if (lp_h != NULL)	/* Found a conflict. */
1759			break;
1760
1761		/* No conflict, promote the waiting lock. */
1762		SH_TAILQ_REMOVE(&obj->waiters, lp_w, links, __db_lock);
1763		lp_w->status = DB_LSTAT_PENDING;
1764		SH_TAILQ_INSERT_TAIL(&obj->holders, lp_w, links);
1765
1766		/* Wake up waiter. */
1767		MUTEX_UNLOCK(lt->env, lp_w->mtx_lock);
1768		state_changed = 1;
1769	}
1770
1771	/*
	 * If this object had waiters and doesn't any more, then we need
	 * to remove it from the dd_objs list.
1774	 */
1775	if (had_waiters && SH_TAILQ_FIRST(&obj->waiters, __db_lock) == NULL) {
1776		LOCK_DD(lt->env, region);
1777		/*
1778		 * Bump the generation when removing an object from the
1779		 * queue so that the deadlock detector will retry.
1780		 */
1781		obj->generation++;
1782		SH_TAILQ_REMOVE(&region->dd_objs, obj, dd_links, __db_lockobj);
1783		UNLOCK_DD(lt->env, region);
1784	}
1785
1786	if (state_changedp != NULL)
1787		*state_changedp = state_changed;
1788
1789	return (0);
1790}
1791
1792/*
1793 * __lock_remove_waiter --
1794 *	Any lock on the waitlist has a process waiting for it.  Therefore,
1795 * we can't return the lock to the freelist immediately.  Instead, we can
1796 * remove the lock from the list of waiters, set the status field of the
1797 * lock, and then let the process waking up return the lock to the
1798 * free list.
1799 *
1800 * This must be called with the Object bucket locked.
1801 */
1802static int
1803__lock_remove_waiter(lt, sh_obj, lockp, status)
1804	DB_LOCKTAB *lt;
1805	DB_LOCKOBJ *sh_obj;
1806	struct __db_lock *lockp;
1807	db_status_t status;
1808{
1809	DB_LOCKREGION *region;
1810	int do_wakeup;
1811
1812	region = lt->reginfo.primary;
1813
1814	do_wakeup = lockp->status == DB_LSTAT_WAITING;
1815
1816	SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);
1817	lockp->links.stqe_prev = -1;
1818	lockp->status = status;
1819	if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
1820		LOCK_DD(lt->env, region);
1821		sh_obj->generation++;
1822		SH_TAILQ_REMOVE(
1823		    &region->dd_objs,
1824		    sh_obj, dd_links, __db_lockobj);
1825		UNLOCK_DD(lt->env, region);
1826	}
1827
1828	/*
1829	 * Wake whoever is waiting on this lock.
1830	 */
1831	if (do_wakeup)
1832		MUTEX_UNLOCK(lt->env, lockp->mtx_lock);
1833
1834	return (0);
1835}
1836
1837/*
1838 * __lock_trade --
1839 *
1840 * Trade locker ids on a lock.  This is used to reassign file locks from
1841 * a transactional locker id to a long-lived locker id.  This should be
1842 * called with the region mutex held.
1843 */
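/*
 * For example, when a database handle is opened inside a transaction, its
 * handle lock is initially acquired under the transaction's locker; if the
 * transaction commits, the lock must outlive it, so the lock is traded onto
 * the handle's long-lived locker id without ever being released.
 */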
1844static int
1845__lock_trade(env, lock, new_locker)
1846	ENV *env;
1847	DB_LOCK *lock;
1848	DB_LOCKER *new_locker;
1849{
1850	struct __db_lock *lp;
1851	DB_LOCKTAB *lt;
1852	int ret;
1853
1854	lt = env->lk_handle;
1855	lp = R_ADDR(&lt->reginfo, lock->off);
1856
1857	/* If the lock is already released, simply return. */
1858	if (lp->gen != lock->gen)
1859		return (DB_NOTFOUND);
1860
1861	if (new_locker == NULL) {
1862		__db_errx(env, "Locker does not exist");
1863		return (EINVAL);
1864	}
1865
1866	/* Remove the lock from its current locker. */
1867	if ((ret = __lock_freelock(lt,
1868	    lp, R_ADDR(&lt->reginfo, lp->holder), DB_LOCK_UNLINK)) != 0)
1869		return (ret);
1870
1871	/* Add lock to its new locker. */
1872	SH_LIST_INSERT_HEAD(&new_locker->heldby, lp, locker_links, __db_lock);
1873	new_locker->nlocks++;
1874	if (IS_WRITELOCK(lp->mode))
1875		new_locker->nwrites++;
1876	lp->holder = R_OFFSET(&lt->reginfo, new_locker);
1877
1878	return (0);
1879}
1880