/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
 *
 * $Id: lock_deadlock.c,v 12.33 2008/03/10 13:31:33 mjc Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/txn.h"

#define	ISSET_MAP(M, N)	((M)[(N) / 32] & (1 << ((N) % 32)))

#define	CLEAR_MAP(M, N) {						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		(M)[__i] = 0;						\
}

#define	SET_MAP(M, B)	((M)[(B) / 32] |= (1 << ((B) % 32)))
#define	CLR_MAP(M, B)	((M)[(B) / 32] &= ~((u_int)1 << ((B) % 32)))

#define	OR_MAP(D, S, N)	{						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		D[__i] |= S[__i];					\
}
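
/*
 * The waits-for graph is stored as a bitmap matrix: one row of n 32-bit
 * words per locker, where bit B set in row A means "locker A waits for
 * locker B".  A minimal sketch of how the macros above combine
 * (hypothetical values, for illustration only):
 *
 *	u_int32_t row[2];	row covers up to 64 lockers
 *	CLEAR_MAP(row, 2);	zero both words
 *	SET_MAP(row, 33);	sets bit 1 of row[1]
 *	ISSET_MAP(row, 33);	now evaluates non-zero
 *	CLR_MAP(row, 33);	clears that bit again
 */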
#define	BAD_KILLID	0xffffffff

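/*
 * Per-locker information gathered by a detector pass.  Detector ids are
 * assigned densely in __dd_build; last_lock, last_obj and last_ndx
 * record the locker's most recent waiting lock so that __dd_abort can
 * later find and abort it.
 */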
typedef struct {
	int		valid;
	int		self_wait;
	int		in_abort;
	u_int32_t	count;
	u_int32_t	id;
	roff_t		last_lock;
	roff_t		last_obj;
	u_int32_t	last_ndx;
	u_int32_t	last_locker_id;
	db_pgno_t	pgno;
} locker_info;

static int __dd_abort __P((ENV *, locker_info *, int *));
static int __dd_build __P((ENV *, u_int32_t,
	    u_int32_t **, u_int32_t *, u_int32_t *, locker_info **, int *));
static int __dd_find __P((ENV *,
	    u_int32_t *, locker_info *, u_int32_t, u_int32_t, u_int32_t ***));
static int __dd_isolder __P((u_int32_t, u_int32_t, u_int32_t, u_int32_t));
static int __dd_verify __P((locker_info *, u_int32_t *, u_int32_t *,
	    u_int32_t *, u_int32_t, u_int32_t, u_int32_t));

#ifdef DIAGNOSTIC
static void __dd_debug
	    __P((ENV *, locker_info *, u_int32_t *, u_int32_t, u_int32_t));
#endif

/*
 * __lock_detect_pp --
 *	ENV->lock_detect pre/post processing.
 *
 * PUBLIC: int __lock_detect_pp __P((DB_ENV *, u_int32_t, u_int32_t, int *));
 */
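/*
 * A minimal sketch of the application-level call that reaches this
 * function (assumes an already-opened DB_ENV handle named dbenv;
 * illustration only, not part of this file):
 *
 *	int ret, rejected;
 *	if ((ret = dbenv->lock_detect(dbenv,
 *	    0, DB_LOCK_DEFAULT, &rejected)) != 0)
 *		(void)dbenv->err(dbenv, ret, "DB_ENV->lock_detect");
 */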
int
__lock_detect_pp(dbenv, flags, atype, rejectp)
	DB_ENV *dbenv;
	u_int32_t flags, atype;
	int *rejectp;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lk_handle, "DB_ENV->lock_detect", DB_INIT_LOCK);

	/* Validate arguments. */
	if ((ret = __db_fchk(env, "DB_ENV->lock_detect", flags, 0)) != 0)
		return (ret);
	switch (atype) {
	case DB_LOCK_DEFAULT:
	case DB_LOCK_EXPIRE:
	case DB_LOCK_MAXLOCKS:
	case DB_LOCK_MAXWRITE:
	case DB_LOCK_MINLOCKS:
	case DB_LOCK_MINWRITE:
	case DB_LOCK_OLDEST:
	case DB_LOCK_RANDOM:
	case DB_LOCK_YOUNGEST:
		break;
	default:
		__db_errx(env,
	    "DB_ENV->lock_detect: unknown deadlock detection mode specified");
		return (EINVAL);
	}

	ENV_ENTER(env, ip);
	REPLICATION_WRAP(env, (__lock_detect(env, atype, rejectp)), 0, ret);
	ENV_LEAVE(env, ip);
	return (ret);
}

/*
 * __lock_detect --
 *	ENV->lock_detect.
 *
 * PUBLIC: int __lock_detect __P((ENV *, u_int32_t, int *));
 */
int
__lock_detect(env, atype, rejectp)
	ENV *env;
	u_int32_t atype;
	int *rejectp;
{
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	db_timespec now;
	locker_info *idmap;
	u_int32_t *bitmap, *copymap, **deadp, **deadlist, *tmpmap;
	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
	u_int32_t lock_max, txn_max;
	int ret, status;

	/*
	 * If this environment is a replication client, then we must use the
	 * MINWRITE detection discipline.
	 */
	if (IS_REP_CLIENT(env))
		atype = DB_LOCK_MINWRITE;

	copymap = tmpmap = NULL;
	deadlist = NULL;

	lt = env->lk_handle;
	if (rejectp != NULL)
		*rejectp = 0;

	/*
	 * Check if a detector run is necessary: make a pass only if the
	 * auto-detector would run.
	 */
	region = lt->reginfo.primary;

	timespecclear(&now);
	if (region->need_dd == 0 &&
	     (!timespecisset(&region->next_timeout) ||
	     !__lock_expired(env, &now, &region->next_timeout))) {
		return (0);
	}
	if (region->need_dd == 0)
		atype = DB_LOCK_EXPIRE;

	/* Reset need_dd, so we know we've run the detector. */
	region->need_dd = 0;

	/* Build the waits-for bitmap. */
	ret = __dd_build(env,
	    atype, &bitmap, &nlockers, &nalloc, &idmap, rejectp);
	lock_max = region->stat.st_cur_maxid;
	if (ret != 0 || atype == DB_LOCK_EXPIRE)
		return (ret);

	/* If there are no lockers, there are no deadlocks. */
	if (nlockers == 0)
		return (0);

#ifdef DIAGNOSTIC
	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_WAITSFOR))
		__dd_debug(env, idmap, bitmap, nlockers, nalloc);
#endif

	/* Now duplicate the bitmaps so we can verify deadlock participants. */
	if ((ret = __os_calloc(env, (size_t)nlockers,
	    sizeof(u_int32_t) * nalloc, &copymap)) != 0)
		goto err;
	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);

	if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
		goto err;

	/* Find a deadlock. */
	if ((ret =
	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadlist)) != 0)
		goto err;

	/*
	 * We need the cur_maxid from the txn region as well.  In order
	 * to avoid tricky synchronization between the lock and txn
	 * regions, we simply unlock the lock region and then lock the
	 * txn region.  This introduces a small window during which the
	 * transaction system could then wrap.  We're willing to return
	 * the wrong answer for "oldest" or "youngest" in those rare
	 * circumstances.
	 */
	if (TXN_ON(env)) {
		TXN_SYSTEM_LOCK(env);
		txn_max = ((DB_TXNREGION *)
		    env->tx_handle->reginfo.primary)->cur_maxid;
		TXN_SYSTEM_UNLOCK(env);
	} else
		txn_max = TXN_MAXIMUM;

	killid = BAD_KILLID;
	for (deadp = deadlist; *deadp != NULL; deadp++) {
		if (rejectp != NULL)
			++*rejectp;
		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
		limit = killid;

		/*
		 * There are cases in which our general algorithm will
		 * fail.  Returning 1 from __dd_verify indicates that the
		 * locker is not only involved in a deadlock, but that
		 * killing it will allow others to make forward progress.
		 * Unfortunately, there are cases where we need to abort
		 * someone, but killing them will not necessarily ensure
		 * forward progress (imagine N readers all trying to
		 * acquire a write lock).
		 * killid is only set to lockers that pass the __dd_verify
		 * test.  keeper will hold the best candidate even if it
		 * does not pass __dd_verify.  Once we fill in killid we no
		 * longer need a keeper, but we keep updating it anyway.
		 */

		keeper = idmap[killid].in_abort == 0 ? killid : BAD_KILLID;
		if (keeper == BAD_KILLID ||
		    __dd_verify(idmap, *deadp,
		    tmpmap, copymap, nlockers, nalloc, keeper) == 0)
			killid = BAD_KILLID;

		if (killid != BAD_KILLID &&
		    (atype == DB_LOCK_DEFAULT || atype == DB_LOCK_RANDOM))
			goto dokill;

		/*
		 * Start with the id that we know is deadlocked, then examine
		 * all other set bits to see if any is a better candidate for
		 * abortion and is genuinely part of the deadlock.  The
		 * definition of "best":
		 *	MAXLOCKS: maximum count
		 *	MAXWRITE: maximum write count
		 *	MINLOCKS: minimum count
		 *	MINWRITE: minimum write count
		 *	OLDEST: smallest id
		 *	YOUNGEST: largest id
		 */
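		/*
		 * For example (hypothetical ids): under DB_LOCK_OLDEST, if
		 * the current candidate has id 5 and another participant
		 * has id 3, __dd_isolder(5, 3, ...) is false, so the switch
		 * below falls through and id 3 becomes the new candidate;
		 * smaller ids are older, modulo wraparound.
		 */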
		for (i = (limit + 1) % nlockers;
		    i != limit;
		    i = (i + 1) % nlockers) {
			if (!ISSET_MAP(*deadp, i) || idmap[i].in_abort)
				continue;

			/*
			 * Determine if we have a verified candidate
			 * in killid; if not, compare with the
			 * non-verified candidate in keeper.
			 */
			if (killid == BAD_KILLID) {
				if (keeper == BAD_KILLID)
					goto use_next;
				else
					cid = keeper;
			} else
				cid = killid;

			switch (atype) {
			case DB_LOCK_OLDEST:
				if (__dd_isolder(idmap[cid].id,
				    idmap[i].id, lock_max, txn_max))
					continue;
				break;
			case DB_LOCK_YOUNGEST:
				if (__dd_isolder(idmap[i].id,
				    idmap[cid].id, lock_max, txn_max))
					continue;
				break;
			case DB_LOCK_MAXLOCKS:
			case DB_LOCK_MAXWRITE:
				if (idmap[i].count < idmap[cid].count)
					continue;
				break;
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MINWRITE:
				if (idmap[i].count > idmap[cid].count)
					continue;
				break;
			case DB_LOCK_DEFAULT:
			case DB_LOCK_RANDOM:
				goto dokill;

			default:
				killid = BAD_KILLID;
				ret = EINVAL;
				goto dokill;
			}

use_next:		keeper = i;
			if (__dd_verify(idmap, *deadp,
			    tmpmap, copymap, nlockers, nalloc, i))
				killid = i;
		}

dokill:		if (killid == BAD_KILLID) {
			if (keeper == BAD_KILLID)
				/*
				 * It's conceivable that under XA, the
				 * locker could have gone away.
				 */
				continue;
			else {
				/*
				 * Removing a single locker will not
				 * break the deadlock, so signal that
				 * detection should run again.
				 */
				region->need_dd = 1;
				killid = keeper;
			}
		}

		/* Kill the locker with lockid idmap[killid]. */
		if ((ret = __dd_abort(env, &idmap[killid], &status)) != 0)
			break;

		/*
		 * It's possible that the lock was already aborted; this isn't
		 * necessarily a problem, so do not treat it as an error.
		 */
		if (status != 0) {
			if (status != DB_ALREADY_ABORTED)
				__db_errx(env,
				    "warning: unable to abort locker %lx",
				    (u_long)idmap[killid].id);
		} else if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
			__db_msg(env,
			    "Aborting locker %lx", (u_long)idmap[killid].id);
	}
err:	if (copymap != NULL)
		__os_free(env, copymap);
	if (deadlist != NULL)
		__os_free(env, deadlist);
	if (tmpmap != NULL)
		__os_free(env, tmpmap);
	__os_free(env, bitmap);
	__os_free(env, idmap);

	return (ret);
}

/*
 * ========================================================================
 * Utilities
 */

#define	DD_INVALID_ID	((u_int32_t) -1)

/*
 * __dd_build --
 *	Build the lock dependency bit maps.
 * Notes on synchronization:
 *	LOCK_SYSTEM_LOCK is used to hold objects locked when we have
 *		a single partition.
 *	LOCK_LOCKERS is held while we are walking the lockers list and
 *		to single thread the use of lockerp->dd_id.
 *	LOCK_DD protects the DD list of objects.
 */

static int
__dd_build(env, atype, bmp, nlockers, allocp, idmap, rejectp)
	ENV *env;
	u_int32_t atype, **bmp, *nlockers, *allocp;
	locker_info **idmap;
	int *rejectp;
{
	struct __db_lock *lp;
	DB_LOCKER *lip, *lockerp, *child;
	DB_LOCKOBJ *op, *lo, *np;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	locker_info *id_array;
	db_timespec now, min_timeout;
	u_int32_t *bitmap, count, dd;
	u_int32_t *entryp, gen, id, indx, ndx, nentries, *tmpmap;
	u_int8_t *pptr;
	int is_first, ret;

	COMPQUIET(indx, 0);
	lt = env->lk_handle;
	region = lt->reginfo.primary;
	timespecclear(&now);
	timespecclear(&min_timeout);

	/*
	 * While we always check for expired timeouts, if we are called with
	 * DB_LOCK_EXPIRE, then we are only checking for timeouts (i.e., not
	 * doing deadlock detection at all).  If we aren't doing real deadlock
	 * detection, then we can skip a significant amount of the processing.
	 * In particular we do not build the conflict array, and our caller
	 * needs to expect this.
	 */
	LOCK_SYSTEM_LOCK(lt, region);
	if (atype == DB_LOCK_EXPIRE) {
skip:		LOCK_DD(env, region);
		op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
		for (; op != NULL; op = np) {
			indx = op->indx;
			gen = op->generation;
			UNLOCK_DD(env, region);
			OBJECT_LOCK_NDX(lt, region, indx);
			if (op->generation != gen) {
				OBJECT_UNLOCK(lt, region, indx);
				goto skip;
			}
			SH_TAILQ_FOREACH(lp, &op->waiters, links, __db_lock) {
				lockerp = (DB_LOCKER *)
				    R_ADDR(&lt->reginfo, lp->holder);
				if (lp->status == DB_LSTAT_WAITING) {
					if (__lock_expired(env,
					    &now, &lockerp->lk_expire)) {
						lp->status = DB_LSTAT_EXPIRED;
						MUTEX_UNLOCK(
						    env, lp->mtx_lock);
						if (rejectp != NULL)
							++*rejectp;
						continue;
					}
					if (!timespecisset(&min_timeout) ||
					    timespeccmp(&min_timeout,
					    &lockerp->lk_expire, >))
						min_timeout =
						    lockerp->lk_expire;
				}
			}
			LOCK_DD(env, region);
			np = SH_TAILQ_NEXT(op, dd_links, __db_lockobj);
			OBJECT_UNLOCK(lt, region, indx);
		}
		UNLOCK_DD(env, region);
		LOCK_SYSTEM_UNLOCK(lt, region);
		goto done;
	}

	/*
	 * Allocate after locking the region: we size the structures from
	 * the current locker count, so it must not change under us.
	 */
	LOCK_LOCKERS(env, region);
	count = region->stat.st_nlockers;
	if (count == 0) {
		UNLOCK_LOCKERS(env, region);
		LOCK_SYSTEM_UNLOCK(lt, region);
		*nlockers = 0;
		return (0);
	}

	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
		__db_msg(env, "%lu lockers", (u_long)count);

	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
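
	/*
	 * For example, with count == 100 lockers, DB_ALIGN(100, 32) == 128,
	 * so nentries == 4 words per row and the matrix allocated below is
	 * 100 rows of 4 u_int32_t's.
	 */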

	/* Allocate enough space for a count by count bitmap matrix. */
	if ((ret = __os_calloc(env, (size_t)count,
	    sizeof(u_int32_t) * nentries, &bitmap)) != 0) {
		UNLOCK_LOCKERS(env, region);
		LOCK_SYSTEM_UNLOCK(lt, region);
		return (ret);
	}

	if ((ret = __os_calloc(env,
	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
		UNLOCK_LOCKERS(env, region);
		LOCK_SYSTEM_UNLOCK(lt, region);
		__os_free(env, bitmap);
		return (ret);
	}

	if ((ret = __os_calloc(env,
	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
		UNLOCK_LOCKERS(env, region);
		LOCK_SYSTEM_UNLOCK(lt, region);
		__os_free(env, bitmap);
		__os_free(env, tmpmap);
		return (ret);
	}

	/*
	 * First we go through and assign each locker a deadlock detector id.
	 */
	id = 0;
	SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
		if (lip->master_locker == INVALID_ROFF) {
			DB_ASSERT(env, id < count);
			lip->dd_id = id++;
			id_array[lip->dd_id].id = lip->id;
			switch (atype) {
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MAXLOCKS:
				id_array[lip->dd_id].count = lip->nlocks;
				break;
			case DB_LOCK_MINWRITE:
			case DB_LOCK_MAXWRITE:
				id_array[lip->dd_id].count = lip->nwrites;
				break;
			default:
				break;
			}
		} else
			lip->dd_id = DD_INVALID_ID;
	}

	/*
	 * We need only consider objects that have waiters, so we use
	 * the list of objects with waiters (dd_objs) instead of traversing
	 * the entire hash table.  For each object, we traverse the waiters
	 * list and add an entry in the waitsfor matrix for each waiter/holder
	 * combination.  We don't want to lock from the DD mutex to the
	 * hash mutex, so we drop the deadlock mutex, acquire the hash mutex,
	 * and then check whether the object has changed.  Once we have the
	 * object locked, locks cannot be removed and lockers cannot go away.
	 */
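	/*
	 * Note on the "if (0)" below: the block is skipped on the first
	 * pass and exists only so the "again" label can zero the bitmap
	 * before the traversal restarts after an object changes state.
	 */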
	if (0) {
		/* If an object has changed state, start over. */
again:		memset(bitmap, 0, count * sizeof(u_int32_t) * nentries);
	}
	LOCK_DD(env, region);
	op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
	for (; op != NULL; op = np) {
		indx = op->indx;
		gen = op->generation;
		UNLOCK_DD(env, region);

		OBJECT_LOCK_NDX(lt, region, indx);
		if (gen != op->generation) {
			OBJECT_UNLOCK(lt, region, indx);
			goto again;
		}

		/*
		 * First we go through and create a bit map that
		 * represents all the holders of this object.
		 */

		CLEAR_MAP(tmpmap, nentries);
		SH_TAILQ_FOREACH(lp, &op->holders, links, __db_lock) {
			lockerp = (DB_LOCKER *)R_ADDR(&lt->reginfo, lp->holder);

			if (lockerp->dd_id == DD_INVALID_ID) {
				/*
				 * If the locker was not here when we started,
				 * then it was not deadlocked at that time.
				 */
				if (lockerp->master_locker == INVALID_ROFF)
					continue;
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				if (dd == DD_INVALID_ID)
					continue;
				lockerp->dd_id = dd;
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
					id_array[dd].count += lockerp->nlocks;
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
					id_array[dd].count += lockerp->nwrites;
					break;
				default:
					break;
				}

			} else
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the holder has already been aborted, then
			 * we should ignore it for now.
			 */
			if (lp->status == DB_LSTAT_HELD)
				SET_MAP(tmpmap, dd);
		}

		/*
		 * Next, for each waiter, we set its row in the matrix
		 * equal to the map of holders we set up above.
		 */
		for (is_first = 1,
		    lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
		    lp != NULL;
		    is_first = 0,
		    lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
			lockerp = (DB_LOCKER *)R_ADDR(&lt->reginfo, lp->holder);
			if (lp->status == DB_LSTAT_WAITING) {
				if (__lock_expired(env,
				    &now, &lockerp->lk_expire)) {
					lp->status = DB_LSTAT_EXPIRED;
					MUTEX_UNLOCK(env, lp->mtx_lock);
					if (rejectp != NULL)
						++*rejectp;
					continue;
				}
				if (!timespecisset(&min_timeout) ||
				    timespeccmp(
				    &min_timeout, &lockerp->lk_expire, >))
					min_timeout = lockerp->lk_expire;
			}

			if (lockerp->dd_id == DD_INVALID_ID) {
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				lockerp->dd_id = dd;
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
					id_array[dd].count += lockerp->nlocks;
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
					id_array[dd].count += lockerp->nwrites;
					break;
				default:
					break;
				}
			} else
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the transaction is pending abortion, then
			 * ignore it on this iteration.
			 */
			if (lp->status != DB_LSTAT_WAITING)
				continue;

			entryp = bitmap + (nentries * dd);
			OR_MAP(entryp, tmpmap, nentries);
			/*
			 * If this is the first waiter on the queue,
			 * then we remove the waitsfor relationship
			 * with itself.  However, if it's anywhere
			 * else on the queue, then we have to keep
			 * it and we have an automatic deadlock.
			 */
			if (is_first) {
				if (ISSET_MAP(entryp, dd))
					id_array[dd].self_wait = 1;
				CLR_MAP(entryp, dd);
			}
		}
		LOCK_DD(env, region);
		np = SH_TAILQ_NEXT(op, dd_links, __db_lockobj);
		OBJECT_UNLOCK(lt, region, indx);
	}
	UNLOCK_DD(env, region);

	/*
	 * Now for each locker, record its last lock and set abort status.
	 * We need to look at the heldby list carefully.  We have the LOCKERS
	 * locked so they cannot go away.  The lock at the head of the
	 * list can be removed by locking the object it points at.
	 * Since lock memory is not freed, we can look at a lock safely
	 * once we find it, but SH_LIST_FIRST is not atomic, so we check
	 * that the list has not gone empty during that macro.  We check
	 * abort status after building the bit maps so that we will not
	 * detect a blocked transaction without noting that it is already
	 * aborting.
	 */
	for (id = 0; id < count; id++) {
		if (!id_array[id].valid)
			continue;
		if ((ret = __lock_getlocker_int(lt,
		    id_array[id].id, 0, &lockerp)) != 0 || lockerp == NULL)
			continue;

		/*
		 * If this is a master transaction, try to
		 * find one of its children's locks first,
		 * as they are probably more recent.
		 */
		child = SH_LIST_FIRST(&lockerp->child_locker, __db_locker);
		if (child != NULL) {
			do {
c_retry:			lp = SH_LIST_FIRST(&child->heldby, __db_lock);
				if (SH_LIST_EMPTY(&child->heldby) || lp == NULL)
					goto c_next;

				if (F_ISSET(child, DB_LOCKER_INABORT))
					id_array[id].in_abort = 1;
				ndx = lp->indx;
				OBJECT_LOCK_NDX(lt, region, ndx);
				if (lp != SH_LIST_FIRST(
				    &child->heldby, __db_lock) ||
				    ndx != lp->indx) {
					OBJECT_UNLOCK(lt, region, ndx);
					goto c_retry;
				}

				if (lp != NULL &&
				    lp->status == DB_LSTAT_WAITING) {
					id_array[id].last_locker_id = child->id;
					goto get_lock;
				} else {
					OBJECT_UNLOCK(lt, region, ndx);
				}
c_next:				child = SH_LIST_NEXT(
				    child, child_link, __db_locker);
			} while (child != NULL);
		}

l_retry:	lp = SH_LIST_FIRST(&lockerp->heldby, __db_lock);
		if (!SH_LIST_EMPTY(&lockerp->heldby) && lp != NULL) {
			ndx = lp->indx;
			OBJECT_LOCK_NDX(lt, region, ndx);
			if (lp != SH_LIST_FIRST(&lockerp->heldby, __db_lock) ||
			    lp->indx != ndx) {
				OBJECT_UNLOCK(lt, region, ndx);
				goto l_retry;
			}
			id_array[id].last_locker_id = lockerp->id;
get_lock:		id_array[id].last_lock = R_OFFSET(&lt->reginfo, lp);
			id_array[id].last_obj = lp->obj;
			lo = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
			id_array[id].last_ndx = lo->indx;
			pptr = SH_DBT_PTR(&lo->lockobj);
			if (lo->lockobj.size >= sizeof(db_pgno_t))
				memcpy(&id_array[id].pgno,
				    pptr, sizeof(db_pgno_t));
			else
				id_array[id].pgno = 0;
			OBJECT_UNLOCK(lt, region, ndx);
		}
		if (F_ISSET(lockerp, DB_LOCKER_INABORT))
			id_array[id].in_abort = 1;
	}
	UNLOCK_LOCKERS(env, region);
	LOCK_SYSTEM_UNLOCK(lt, region);

	/*
	 * Now we can release everything except the bitmap matrix that we
	 * created.
	 */
	*nlockers = id;
	*idmap = id_array;
	*bmp = bitmap;
	*allocp = nentries;
	__os_free(env, tmpmap);
done:	if (timespecisset(&region->next_timeout))
		region->next_timeout = min_timeout;
	return (0);
}

static int
__dd_find(env, bmp, idmap, nlockers, nalloc, deadp)
	ENV *env;
	u_int32_t *bmp, nlockers, nalloc;
	locker_info *idmap;
	u_int32_t ***deadp;
{
	u_int32_t i, j, k, *mymap, *tmpmap, **retp;
	u_int ndead, ndeadalloc;
	int ret;

#undef	INITIAL_DEAD_ALLOC
#define	INITIAL_DEAD_ALLOC	8

	ndeadalloc = INITIAL_DEAD_ALLOC;
	ndead = 0;
	if ((ret = __os_malloc(env,
	    ndeadalloc * sizeof(u_int32_t *), &retp)) != 0)
		return (ret);

	/*
	 * For each locker, OR in the bits from the lockers on which that
	 * locker is waiting.
	 */
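	/*
	 * A worked illustration (hypothetical): if locker 0 waits for
	 * locker 1 and locker 1 waits for locker 0, ORing row 1 into
	 * row 0 sets bit 0 of row 0 -- locker 0 now transitively waits
	 * for itself, which is exactly the cycle test below.
	 */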
	for (mymap = bmp, i = 0; i < nlockers; i++, mymap += nalloc) {
		if (!idmap[i].valid)
			continue;
		for (j = 0; j < nlockers; j++) {
			if (!ISSET_MAP(mymap, j))
				continue;

			/* Find the map for this bit. */
			tmpmap = bmp + (nalloc * j);
			OR_MAP(mymap, tmpmap, nalloc);
			if (!ISSET_MAP(mymap, i))
				continue;

			/* Make sure we leave room for NULL. */
			if (ndead + 2 >= ndeadalloc) {
				ndeadalloc <<= 1;
				/*
				 * If the alloc fails, then simply return the
				 * deadlocks that we already have.
				 */
				if (__os_realloc(env,
				    ndeadalloc * sizeof(u_int32_t *),
				    &retp) != 0) {
					retp[ndead] = NULL;
					*deadp = retp;
					return (0);
				}
			}
			retp[ndead++] = mymap;

			/* Mark all participants in this deadlock invalid. */
			for (k = 0; k < nlockers; k++)
				if (ISSET_MAP(mymap, k))
					idmap[k].valid = 0;
			break;
		}
	}
	retp[ndead] = NULL;
	*deadp = retp;
	return (0);
}

static int
__dd_abort(env, info, statusp)
	ENV *env;
	locker_info *info;
	int *statusp;
{
	struct __db_lock *lockp;
	DB_LOCKER *lockerp;
	DB_LOCKOBJ *sh_obj;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	int ret;

	*statusp = 0;

	lt = env->lk_handle;
	region = lt->reginfo.primary;
	ret = 0;

	/* We must lock so this locker cannot go away while we abort it. */
	LOCK_SYSTEM_LOCK(lt, region);
	LOCK_LOCKERS(env, region);

	/*
	 * Get the locker.  If it's gone or was aborted while we were
	 * detecting, return that.
	 */
	if ((ret = __lock_getlocker_int(lt,
	    info->last_locker_id, 0, &lockerp)) != 0)
		goto err;
	if (lockerp == NULL || F_ISSET(lockerp, DB_LOCKER_INABORT)) {
		*statusp = DB_ALREADY_ABORTED;
		goto out;
	}

	/*
	 * Find the locker's last lock.  It is possible for this lock to have
	 * been freed, either through a timeout or another detector run.
	 * First lock the lock object so it is stable.
	 */
	OBJECT_LOCK_NDX(lt, region, info->last_ndx);
	if ((lockp = SH_LIST_FIRST(&lockerp->heldby, __db_lock)) == NULL) {
		*statusp = DB_ALREADY_ABORTED;
		goto done;
	}
	if (R_OFFSET(&lt->reginfo, lockp) != info->last_lock ||
	    lockp->holder != R_OFFSET(&lt->reginfo, lockerp) ||
	    lockp->obj != info->last_obj || lockp->status != DB_LSTAT_WAITING) {
		*statusp = DB_ALREADY_ABORTED;
		goto done;
	}

	sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);

	/* Abort the lock, take it off the list, and wake up its waiter. */
	lockp->status = DB_LSTAT_ABORTED;
	SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);

	/*
	 * Either the waiters list is now empty, in which case we remove
	 * it from dd_objs, or it is not empty, in which case we need to
	 * do promotion.
	 */
	if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
		LOCK_DD(env, region);
		SH_TAILQ_REMOVE(&region->dd_objs,
		    sh_obj, dd_links, __db_lockobj);
		UNLOCK_DD(env, region);
	} else
		ret = __lock_promote(lt, sh_obj, NULL, 0);
	MUTEX_UNLOCK(env, lockp->mtx_lock);

	STAT(region->stat.st_ndeadlocks++);
done:	OBJECT_UNLOCK(lt, region, info->last_ndx);
err:
out:	UNLOCK_LOCKERS(env, region);
	LOCK_SYSTEM_UNLOCK(lt, region);
	return (ret);
}

#ifdef DIAGNOSTIC
static void
__dd_debug(env, idmap, bitmap, nlockers, nalloc)
	ENV *env;
	locker_info *idmap;
	u_int32_t *bitmap, nlockers, nalloc;
{
	DB_MSGBUF mb;
	u_int32_t i, j, *mymap;

	DB_MSGBUF_INIT(&mb);

	__db_msg(env, "Waitsfor array\nWaiter:\tWaiting on:");
	for (mymap = bitmap, i = 0; i < nlockers; i++, mymap += nalloc) {
		if (!idmap[i].valid)
			continue;

		__db_msgadd(env, &mb,				/* Waiter. */
		    "%lx/%lu:\t", (u_long)idmap[i].id, (u_long)idmap[i].pgno);
		for (j = 0; j < nlockers; j++)
			if (ISSET_MAP(mymap, j))
				__db_msgadd(env,
				    &mb, " %lx", (u_long)idmap[j].id);
		__db_msgadd(env, &mb, " %lu", (u_long)idmap[i].last_lock);
		DB_MSGBUF_FLUSH(env, &mb);
	}
}
#endif

/*
 * Given a bitmap that contains a deadlock, verify that the bit
 * specified in the which parameter indicates a transaction that
 * is actually deadlocked.  Return 1 if really deadlocked, 0 otherwise.
 * deadmap  --  the array that identified the deadlock.
 * tmpmap   --  a temporary bit map into which we can OR things.
 * origmap  --  a copy of the initial bitmaps from the dd_build phase.
 * nlockers --  the number of actual lockers under consideration.
 * nalloc   --  the number of words allocated for the bitmap.
 * which    --  the locker in question.
 */
static int
__dd_verify(idmap, deadmap, tmpmap, origmap, nlockers, nalloc, which)
	locker_info *idmap;
	u_int32_t *deadmap, *tmpmap, *origmap;
	u_int32_t nlockers, nalloc, which;
{
	u_int32_t *tmap;
	u_int32_t j;
	int count;

	memset(tmpmap, 0, sizeof(u_int32_t) * nalloc);

	/*
	 * In order for "which" to be actively involved in
	 * the deadlock, removing it from the evaluation
	 * must remove the deadlock.  So, we OR together everyone
	 * except which; if all the participants still have their
	 * bits set, then the deadlock persists and which does
	 * not participate.  If the deadlock does not persist,
	 * then "which" does participate.
	 */
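	/*
	 * Hypothetical example: for a two-party deadlock between lockers
	 * A and B with which == A, the OR below covers only B's original
	 * row, which contains A's bit but not B's own.  B's bit is then
	 * missing from the result, the cycle is broken, and A is
	 * confirmed as a genuine participant.
	 */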
	count = 0;
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;

		/* Find the map for this bit. */
		tmap = origmap + (nalloc * j);

		/*
		 * We special case the first waiter who is also a holder, so
		 * we don't automatically call that a deadlock.  However, if
		 * it really is a deadlock, we need the bit set now so that
		 * we treat the first waiter like other waiters.
		 */
		if (idmap[j].self_wait)
			SET_MAP(tmap, j);
		OR_MAP(tmpmap, tmap, nalloc);
		count++;
	}

	if (count == 1)
		return (1);

	/*
	 * Now check the resulting map and see whether
	 * all participants still have their bit set.
	 */
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;
		if (!ISSET_MAP(tmpmap, j))
			return (1);
	}
	return (0);
}

/*
 * __dd_isolder --
 *	Figure out the relative age of two lockers.  We make all lockers
 *	older than all transactions, because that's how it has worked
 *	historically (lockers have the lower ids).
 */
static int
__dd_isolder(a, b, lock_max, txn_max)
	u_int32_t	a, b;
	u_int32_t	lock_max, txn_max;
{
	u_int32_t max;

	/* Check for comparing lock-id and txnid. */
	if (a <= DB_LOCK_MAXID && b > DB_LOCK_MAXID)
		return (1);
	if (b <= DB_LOCK_MAXID && a > DB_LOCK_MAXID)
		return (0);

	/* In the same space; figure out which one. */
	max = txn_max;
	if (a <= DB_LOCK_MAXID)
		max = lock_max;

	/*
	 * We can't get a 100% correct ordering, because we don't know
	 * where the current interval started and if there were older
	 * lockers outside the interval.  We do the best we can.
	 */

	/*
	 * Check for a wrapped case with ids above max.
	 */
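	/*
	 * For example (hypothetical ids), with max == 100: a == 150 was
	 * allocated before the id space wrapped and b == 3 after it, so
	 * a is older and we return 1.
	 */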
	if (a > max && b < max)
		return (1);
	if (b > max && a < max)
		return (0);

	return (a < b);
}