/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
 *
 * $Id: lock_deadlock.c,v 12.33 2008/03/10 13:31:33 mjc Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/txn.h"

#define	ISSET_MAP(M, N)	((M)[(N) / 32] & (1 << ((N) % 32)))

#define	CLEAR_MAP(M, N) {						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		(M)[__i] = 0;						\
}

#define	SET_MAP(M, B)	((M)[(B) / 32] |= (1 << ((B) % 32)))
#define	CLR_MAP(M, B)	((M)[(B) / 32] &= ~((u_int)1 << ((B) % 32)))

#define	OR_MAP(D, S, N)	{						\
	u_int32_t __i;							\
	for (__i = 0; __i < (N); __i++)					\
		D[__i] |= S[__i];					\
}
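
/*
 * Each locker's row in the waits-for matrix is an array of 32-bit words;
 * bit B lives in word B / 32 at bit position B % 32.  For example, bit 37
 * is bit 5 of word 1.  ISSET_MAP, SET_MAP and CLR_MAP operate on a single
 * bit; CLEAR_MAP and OR_MAP operate on an N-word row.
 */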
#define	BAD_KILLID	0xffffffff

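/*
 * Snapshot of a single locker, built by __dd_build and used by the rest of
 * the detector: the locker's id, a lock or write count (for the MINLOCKS,
 * MAXLOCKS, MINWRITE and MAXWRITE policies), its last waiting lock and the
 * object/page that lock covers, and whether the locker is already aborting.
 */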
typedef struct {
	int		valid;
	int		self_wait;
	int		in_abort;
	u_int32_t	count;
	u_int32_t	id;
	roff_t		last_lock;
	roff_t		last_obj;
	u_int32_t	last_ndx;
	u_int32_t	last_locker_id;
	db_pgno_t	pgno;
} locker_info;

static int __dd_abort __P((ENV *, locker_info *, int *));
static int __dd_build __P((ENV *, u_int32_t,
	    u_int32_t **, u_int32_t *, u_int32_t *, locker_info **, int *));
static int __dd_find __P((ENV *,
	    u_int32_t *, locker_info *, u_int32_t, u_int32_t, u_int32_t ***));
static int __dd_isolder __P((u_int32_t, u_int32_t, u_int32_t, u_int32_t));
static int __dd_verify __P((locker_info *, u_int32_t *, u_int32_t *,
	    u_int32_t *, u_int32_t, u_int32_t, u_int32_t));

#ifdef DIAGNOSTIC
static void __dd_debug
	    __P((ENV *, locker_info *, u_int32_t *, u_int32_t, u_int32_t));
#endif

/*
 * __lock_detect_pp --
 *	ENV->lock_detect pre/post processing.
 *
 * PUBLIC: int __lock_detect_pp __P((DB_ENV *, u_int32_t, u_int32_t, int *));
 */
int
__lock_detect_pp(dbenv, flags, atype, rejectp)
	DB_ENV *dbenv;
	u_int32_t flags, atype;
	int *rejectp;
{
	DB_THREAD_INFO *ip;
	ENV *env;
	int ret;

	env = dbenv->env;

	ENV_REQUIRES_CONFIG(env,
	    env->lk_handle, "DB_ENV->lock_detect", DB_INIT_LOCK);

	/* Validate arguments. */
	if ((ret = __db_fchk(env, "DB_ENV->lock_detect", flags, 0)) != 0)
		return (ret);
	switch (atype) {
	case DB_LOCK_DEFAULT:
	case DB_LOCK_EXPIRE:
	case DB_LOCK_MAXLOCKS:
	case DB_LOCK_MAXWRITE:
	case DB_LOCK_MINLOCKS:
	case DB_LOCK_MINWRITE:
	case DB_LOCK_OLDEST:
	case DB_LOCK_RANDOM:
	case DB_LOCK_YOUNGEST:
		break;
	default:
		__db_errx(env,
	    "DB_ENV->lock_detect: unknown deadlock detection mode specified");
		return (EINVAL);
	}

	ENV_ENTER(env, ip);
	REPLICATION_WRAP(env, (__lock_detect(env, atype, rejectp)), 0, ret);
	ENV_LEAVE(env, ip);
	return (ret);
}
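
/*
 * A minimal application-level sketch (public API, not part of this file):
 *
 *	int rejected;
 *	if ((ret = dbenv->lock_detect(dbenv,
 *	    0, DB_LOCK_DEFAULT, &rejected)) == 0 && rejected > 0)
 *		printf("rejected %d lock request(s)\n", rejected);
 *
 * The flags argument must be 0; atype selects which locker in a cycle is
 * aborted.
 */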

/*
 * __lock_detect --
 *	ENV->lock_detect.
 *
 * PUBLIC: int __lock_detect __P((ENV *, u_int32_t, int *));
 */
int
__lock_detect(env, atype, rejectp)
	ENV *env;
	u_int32_t atype;
	int *rejectp;
{
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	db_timespec now;
	locker_info *idmap;
	u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
	u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
	u_int32_t lock_max, txn_max;
	int ret, status;

	/*
	 * If this environment is a replication client, then we must use the
	 * MINWRITE detection discipline.
	 */
	if (IS_REP_CLIENT(env))
		atype = DB_LOCK_MINWRITE;

	free_me = NULL;

	lt = env->lk_handle;
	if (rejectp != NULL)
		*rejectp = 0;

	/* Check if a detector run is necessary. */

	/* Make a pass only if auto-detect would run. */
	region = lt->reginfo.primary;

	timespecclear(&now);
	if (region->need_dd == 0 &&
	     (!timespecisset(&region->next_timeout) ||
	     !__lock_expired(env, &now, &region->next_timeout))) {
		return (0);
	}
	if (region->need_dd == 0)
		atype = DB_LOCK_EXPIRE;

	/* Reset need_dd, so we know we've run the detector. */
	region->need_dd = 0;

	/* Build the waits-for bitmap. */
	ret = __dd_build(env,
	    atype, &bitmap, &nlockers, &nalloc, &idmap, rejectp);
	lock_max = region->stat.st_cur_maxid;
	if (ret != 0 || atype == DB_LOCK_EXPIRE)
		return (ret);

	/* If there are no lockers, there are no deadlocks. */
	if (nlockers == 0)
		return (0);

#ifdef DIAGNOSTIC
	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_WAITSFOR))
		__dd_debug(env, idmap, bitmap, nlockers, nalloc);
#endif

	/* Now duplicate the bitmaps so we can verify deadlock participants. */
	if ((ret = __os_calloc(env, (size_t)nlockers,
	    sizeof(u_int32_t) * nalloc, &copymap)) != 0)
		goto err;
	memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
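
	/*
	 * Note: __dd_find ORs the transitive closure into the rows of bitmap
	 * while searching for cycles, so copymap preserves the original
	 * waits-for edges for __dd_verify, and tmpmap below is a single-row
	 * scratch map.
	 */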

	if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
		goto err1;

	/* Find a deadlock. */
	if ((ret =
	    __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
		goto err2;

	/*
	 * We need the cur_maxid from the txn region as well.  In order
	 * to avoid tricky synchronization between the lock and txn
	 * regions, we simply unlock the lock region and then lock the
	 * txn region.  This introduces a small window during which the
	 * transaction system could then wrap.  We're willing to return
	 * the wrong answer for "oldest" or "youngest" in those rare
	 * circumstances.
	 */
	if (TXN_ON(env)) {
		TXN_SYSTEM_LOCK(env);
		txn_max = ((DB_TXNREGION *)
		    env->tx_handle->reginfo.primary)->cur_maxid;
		TXN_SYSTEM_UNLOCK(env);
	} else
		txn_max = TXN_MAXIMUM;

	killid = BAD_KILLID;
	free_me = deadp;
	for (; *deadp != NULL; deadp++) {
		if (rejectp != NULL)
			++*rejectp;
		killid = (u_int32_t)(*deadp - bitmap) / nalloc;
		limit = killid;

		/*
		 * There are cases in which our general algorithm will
		 * fail.  Returning 1 from __dd_verify indicates that the
		 * particular locker is not only involved in a deadlock,
		 * but that killing it will allow others to make forward
		 * progress.  Unfortunately, there are cases where we need
		 * to abort some locker, but killing it will not necessarily
		 * ensure forward progress (imagine N readers all trying to
		 * acquire a write lock).
		 * killid is only set to lockers that pass the __dd_verify
		 * test.  keeper will hold the best candidate even if it does
		 * not pass __dd_verify.  Once we fill in killid then we do
		 * not need a keeper, but we keep updating it anyway.
		 */

		keeper = idmap[killid].in_abort == 0 ? killid : BAD_KILLID;
		if (keeper == BAD_KILLID ||
		    __dd_verify(idmap, *deadp,
		    tmpmap, copymap, nlockers, nalloc, keeper) == 0)
			killid = BAD_KILLID;

		if (killid != BAD_KILLID &&
		    (atype == DB_LOCK_DEFAULT || atype == DB_LOCK_RANDOM))
			goto dokill;

		/*
		 * Start with the id that we know is deadlocked, then examine
		 * all other set bits and see if any is a better candidate
		 * to abort and is genuinely part of the deadlock.
		 * The definition of "best":
		 *	MAXLOCKS: maximum count
		 *	MAXWRITE: maximum write count
		 *	MINLOCKS: minimum count
		 *	MINWRITE: minimum write count
		 *	OLDEST: smallest id
		 *	YOUNGEST: largest id
		 */
		for (i = (limit + 1) % nlockers;
		    i != limit;
		    i = (i + 1) % nlockers) {
			if (!ISSET_MAP(*deadp, i) || idmap[i].in_abort)
				continue;

			/*
			 * Determine if we have a verified candidate
			 * in killid; if not, compare with the
			 * non-verified candidate in keeper.
			 */
			if (killid == BAD_KILLID) {
				if (keeper == BAD_KILLID)
					goto use_next;
				else
					cid = keeper;
			} else
				cid = killid;

			switch (atype) {
			case DB_LOCK_OLDEST:
				if (__dd_isolder(idmap[cid].id,
				    idmap[i].id, lock_max, txn_max))
					continue;
				break;
			case DB_LOCK_YOUNGEST:
				if (__dd_isolder(idmap[i].id,
				    idmap[cid].id, lock_max, txn_max))
					continue;
				break;
			case DB_LOCK_MAXLOCKS:
				if (idmap[i].count < idmap[cid].count)
					continue;
				break;
			case DB_LOCK_MAXWRITE:
				if (idmap[i].count < idmap[cid].count)
					continue;
				break;
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MINWRITE:
				if (idmap[i].count > idmap[cid].count)
					continue;
				break;
			case DB_LOCK_DEFAULT:
			case DB_LOCK_RANDOM:
				goto dokill;

			default:
				killid = BAD_KILLID;
				ret = EINVAL;
				goto dokill;
			}

use_next:		keeper = i;
			if (__dd_verify(idmap, *deadp,
			    tmpmap, copymap, nlockers, nalloc, i))
				killid = i;
		}

dokill:		if (killid == BAD_KILLID) {
			if (keeper == BAD_KILLID)
				/*
				 * It's conceivable that under XA, the
				 * locker could have gone away.
				 */
				continue;
			else {
				/*
				 * Removing a single locker will not
				 * break the deadlock; signal that
				 * detection should run again.
				 */
				region->need_dd = 1;
				killid = keeper;
			}
		}

		/* Kill the locker with lockid idmap[killid]. */
		if ((ret = __dd_abort(env, &idmap[killid], &status)) != 0)
			break;

		/*
		 * It's possible that the lock was already aborted; this isn't
		 * necessarily a problem, so do not treat it as an error.
		 */
		if (status != 0) {
			if (status != DB_ALREADY_ABORTED)
				__db_errx(env,
				    "warning: unable to abort locker %lx",
				    (u_long)idmap[killid].id);
		} else if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
			__db_msg(env,
			    "Aborting locker %lx", (u_long)idmap[killid].id);
	}
err2:	__os_free(env, tmpmap);
err1:	__os_free(env, copymap);

err:	if (free_me != NULL)
		__os_free(env, free_me);
	__os_free(env, bitmap);
	__os_free(env, idmap);

	return (ret);
}

/*
 * ========================================================================
 * Utilities
 */

#define	DD_INVALID_ID	((u_int32_t) -1)

static int
__dd_build(env, atype, bmp, nlockers, allocp, idmap, rejectp)
	ENV *env;
	u_int32_t atype, **bmp, *nlockers, *allocp;
	locker_info **idmap;
	int *rejectp;
{
	struct __db_lock *lp;
	DB_LOCKER *lip, *lockerp, *child;
	DB_LOCKOBJ *op, *lo, *np;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	locker_info *id_array;
	db_timespec now, min_timeout;
	u_int32_t *bitmap, count, dd;
	u_int32_t *entryp, gen, id, indx, ndx, nentries, *tmpmap;
	u_int8_t *pptr;
	int is_first, ret;

	COMPQUIET(indx, 0);
	lt = env->lk_handle;
	region = lt->reginfo.primary;
	timespecclear(&now);
	timespecclear(&min_timeout);

	/*
	 * While we always check for expired timeouts, if we are called with
	 * DB_LOCK_EXPIRE, then we are only checking for timeouts (i.e., not
	 * doing deadlock detection at all).  If we aren't doing real deadlock
	 * detection, then we can skip a significant amount of the processing.
	 * In particular, we do not build the conflict array and our caller
	 * needs to expect this.
	 */
	if (atype == DB_LOCK_EXPIRE) {
skip:		LOCK_DD(env, region);
		op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
		for (; op != NULL; op = np) {
			indx = op->indx;
			gen = op->generation;
			UNLOCK_DD(env, region);
			OBJECT_LOCK_NDX(lt, region, indx);
			if (op->generation != gen) {
				OBJECT_UNLOCK(lt, region, indx);
				goto skip;
			}
			SH_TAILQ_FOREACH(lp, &op->waiters, links, __db_lock) {
				lockerp = (DB_LOCKER *)
				    R_ADDR(&lt->reginfo, lp->holder);
				if (lp->status == DB_LSTAT_WAITING) {
					if (__lock_expired(env,
					    &now, &lockerp->lk_expire)) {
						lp->status = DB_LSTAT_EXPIRED;
						MUTEX_UNLOCK(
						    env, lp->mtx_lock);
						if (rejectp != NULL)
							++*rejectp;
						continue;
					}
					if (!timespecisset(&min_timeout) ||
					    timespeccmp(&min_timeout,
					    &lockerp->lk_expire, >))
						min_timeout =
						    lockerp->lk_expire;
				}
			}
			LOCK_DD(env, region);
			np = SH_TAILQ_NEXT(op, dd_links, __db_lockobj);
			OBJECT_UNLOCK(lt, region, indx);
		}
		UNLOCK_DD(env, region);
		goto done;
	}
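
	/*
	 * On the DB_LOCK_EXPIRE path above we only time out waiters: an
	 * expired lock is marked DB_LSTAT_EXPIRED and its mutex is unlocked,
	 * which (as elsewhere in the lock subsystem) lets the waiting thread
	 * wake up and see that its request timed out.  We also track the
	 * earliest remaining expiration in min_timeout for "done" below.
	 */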

	/*
	 * We'll check how many lockers there are, add a few more in for
	 * good measure and then allocate all the structures.  Then we'll
	 * verify that we have enough room when we go back in and get the
	 * mutex the second time.
	 */
retry:	count = region->stat.st_nlockers;
	if (count == 0) {
		*nlockers = 0;
		return (0);
	}

	if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
		__db_msg(env, "%lu lockers", (u_long)count);

	count += 20;
	nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
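
	/*
	 * nentries is the number of 32-bit words in one row of the matrix:
	 * count rounded up to a multiple of 32, divided by 32.  For example,
	 * with 45 lockers, count becomes 65 after the padding above, and
	 * each row is DB_ALIGN(65, 32) / 32 = 96 / 32 = 3 words.
	 */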

	/*
	 * Allocate enough space for a count by count bitmap matrix.
	 *
	 * XXX
	 * We could probably save the mallocs between iterations by simply
	 * reallocing when count has grown by too much.
	 */
	if ((ret = __os_calloc(env, (size_t)count,
	    sizeof(u_int32_t) * nentries, &bitmap)) != 0)
		return (ret);

	if ((ret = __os_calloc(env,
	    sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
		__os_free(env, bitmap);
		return (ret);
	}

	if ((ret = __os_calloc(env,
	    (size_t)count, sizeof(locker_info), &id_array)) != 0) {
		__os_free(env, bitmap);
		__os_free(env, tmpmap);
		return (ret);
	}

	/*
	 * Now go back in and actually fill in the matrix.
	 */
	if (region->stat.st_nlockers > count) {
		__os_free(env, bitmap);
		__os_free(env, tmpmap);
		__os_free(env, id_array);
		goto retry;
	}

	/*
	 * First we go through and assign each locker a deadlock detector id.
	 */
	id = 0;
	LOCK_LOCKERS(env, region);
	SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
		if (lip->master_locker == INVALID_ROFF) {
			lip->dd_id = id++;
			id_array[lip->dd_id].id = lip->id;
			switch (atype) {
			case DB_LOCK_MINLOCKS:
			case DB_LOCK_MAXLOCKS:
				id_array[lip->dd_id].count = lip->nlocks;
				break;
			case DB_LOCK_MINWRITE:
			case DB_LOCK_MAXWRITE:
				id_array[lip->dd_id].count = lip->nwrites;
				break;
			default:
				break;
			}
		} else
			lip->dd_id = DD_INVALID_ID;

	}
	UNLOCK_LOCKERS(env, region);

	/*
	 * We only need to consider objects that have waiters, so we use
	 * the list of objects with waiters (dd_objs) instead of traversing
	 * the entire hash table.  For each object, we traverse the waiters
	 * list and add an entry in the waitsfor matrix for each waiter/holder
	 * combination.  We don't want to acquire the hash mutex while holding
	 * the DD mutex, so we drop the deadlock mutex and get the hash mutex,
	 * then check to see if the object has changed.  Once we have the
	 * object locked, locks cannot be removed and lockers cannot go away.
	 */
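	/*
	 * The "if (0)" wrapper below makes the "again:" label reachable only
	 * via goto: when an object's generation changes under us, we jump
	 * back here, re-zero the whole matrix and rebuild it from the start
	 * of dd_objs.
	 */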
	if (0) {
		/* If an object has changed state, start over. */
again:		memset(bitmap, 0, count * sizeof(u_int32_t) * nentries);
	}
	LOCK_DD(env, region);
	op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
	for (; op != NULL; op = np) {
		indx = op->indx;
		gen = op->generation;
		UNLOCK_DD(env, region);

		OBJECT_LOCK_NDX(lt, region, indx);
		if (gen != op->generation) {
			OBJECT_UNLOCK(lt, region, indx);
			goto again;
		}

		/*
		 * First we go through and create a bit map that
		 * represents all the holders of this object.
		 */

		CLEAR_MAP(tmpmap, nentries);
		SH_TAILQ_FOREACH(lp, &op->holders, links, __db_lock) {
			lockerp = (DB_LOCKER *)R_ADDR(&lt->reginfo, lp->holder);

			if (lockerp->dd_id == DD_INVALID_ID) {
				/*
				 * If the locker was not here when we started,
				 * then it was not deadlocked at that time.
				 */
				if (lockerp->master_locker == INVALID_ROFF)
					continue;
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				if (dd == DD_INVALID_ID)
					continue;
				lockerp->dd_id = dd;
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
					id_array[dd].count += lockerp->nlocks;
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
					id_array[dd].count += lockerp->nwrites;
					break;
				default:
					break;
				}

			} else
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the holder has already been aborted, then
			 * we should ignore it for now.
			 */
			if (lp->status == DB_LSTAT_HELD)
				SET_MAP(tmpmap, dd);
		}

		/*
		 * Next, for each waiter, we set its row in the matrix
		 * equal to the map of holders we set up above.
		 */
		for (is_first = 1,
		    lp = SH_TAILQ_FIRST(&op->waiters, __db_lock);
		    lp != NULL;
		    is_first = 0,
		    lp = SH_TAILQ_NEXT(lp, links, __db_lock)) {
			lockerp = (DB_LOCKER *)R_ADDR(&lt->reginfo, lp->holder);
			if (lp->status == DB_LSTAT_WAITING) {
				if (__lock_expired(env,
				    &now, &lockerp->lk_expire)) {
					lp->status = DB_LSTAT_EXPIRED;
					MUTEX_UNLOCK(env, lp->mtx_lock);
					if (rejectp != NULL)
						++*rejectp;
					continue;
				}
				if (!timespecisset(&min_timeout) ||
				    timespeccmp(
				    &min_timeout, &lockerp->lk_expire, >))
					min_timeout = lockerp->lk_expire;
			}

			if (lockerp->dd_id == DD_INVALID_ID) {
				dd = ((DB_LOCKER *)R_ADDR(&lt->reginfo,
				    lockerp->master_locker))->dd_id;
				lockerp->dd_id = dd;
				switch (atype) {
				case DB_LOCK_MINLOCKS:
				case DB_LOCK_MAXLOCKS:
					id_array[dd].count += lockerp->nlocks;
					break;
				case DB_LOCK_MINWRITE:
				case DB_LOCK_MAXWRITE:
					id_array[dd].count += lockerp->nwrites;
					break;
				default:
					break;
				}
			} else
				dd = lockerp->dd_id;
			id_array[dd].valid = 1;

			/*
			 * If the transaction is pending abortion, then
			 * ignore it on this iteration.
			 */
			if (lp->status != DB_LSTAT_WAITING)
				continue;

			entryp = bitmap + (nentries * dd);
			OR_MAP(entryp, tmpmap, nentries);
			/*
			 * If this is the first waiter on the queue,
			 * then we remove the locker's waits-for
			 * relationship with itself.  However, if it's
			 * anywhere else on the queue, then we have to
			 * keep it and we have an automatic deadlock.
			 */
			if (is_first) {
				if (ISSET_MAP(entryp, dd))
					id_array[dd].self_wait = 1;
				CLR_MAP(entryp, dd);
			}
		}
		LOCK_DD(env, region);
		np = SH_TAILQ_NEXT(op, dd_links, __db_lockobj);
		OBJECT_UNLOCK(lt, region, indx);
	}
	UNLOCK_DD(env, region);

	/*
	 * Now for each locker, record its last lock and set abort status.
	 * We need to look at the heldby list carefully.  We have the LOCKERS
	 * locked so they cannot go away.  The lock at the head of the
	 * list can be removed by locking the object it points at.
	 * Since lock memory is not freed, we can look at a lock safely, but
	 * SH_LIST_FIRST is not atomic, so we check that the list has not
	 * gone empty while we read it.  We check abort status after building
	 * the bit maps so that we will not detect a blocked transaction
	 * without noting that it is already aborting.
	 */
	LOCK_LOCKERS(env, region);
	for (id = 0; id < count; id++) {
		if (!id_array[id].valid)
			continue;
		if ((ret = __lock_getlocker_int(lt,
		    id_array[id].id, 0, &lockerp)) != 0 || lockerp == NULL)
			continue;

		/*
		 * If this is a master transaction, try to
		 * find one of its children's locks first,
		 * as they are probably more recent.
		 */
		child = SH_LIST_FIRST(&lockerp->child_locker, __db_locker);
		if (child != NULL) {
			do {
c_retry:			lp = SH_LIST_FIRST(&child->heldby, __db_lock);
				if (SH_LIST_EMPTY(&child->heldby) || lp == NULL)
					goto c_next;

				if (F_ISSET(child, DB_LOCKER_INABORT))
					id_array[id].in_abort = 1;
				ndx = lp->indx;
				OBJECT_LOCK_NDX(lt, region, ndx);
				if (lp != SH_LIST_FIRST(
				    &child->heldby, __db_lock) ||
				    ndx != lp->indx) {
					OBJECT_UNLOCK(lt, region, ndx);
					goto c_retry;
				}

				if (lp != NULL &&
				    lp->status == DB_LSTAT_WAITING) {
					id_array[id].last_locker_id = child->id;
					goto get_lock;
				} else {
					OBJECT_UNLOCK(lt, region, ndx);
				}
c_next:				child = SH_LIST_NEXT(
				    child, child_link, __db_locker);
			} while (child != NULL);
		}

l_retry:	lp = SH_LIST_FIRST(&lockerp->heldby, __db_lock);
		if (!SH_LIST_EMPTY(&lockerp->heldby) && lp != NULL) {
			ndx = lp->indx;
			OBJECT_LOCK_NDX(lt, region, ndx);
			if (lp != SH_LIST_FIRST(&lockerp->heldby, __db_lock) ||
			    lp->indx != ndx) {
				OBJECT_UNLOCK(lt, region, ndx);
				goto l_retry;
			}
			id_array[id].last_locker_id = lockerp->id;
get_lock:		id_array[id].last_lock = R_OFFSET(&lt->reginfo, lp);
			id_array[id].last_obj = lp->obj;
			lo = (DB_LOCKOBJ *)((u_int8_t *)lp + lp->obj);
			id_array[id].last_ndx = lo->indx;
			pptr = SH_DBT_PTR(&lo->lockobj);
			if (lo->lockobj.size >= sizeof(db_pgno_t))
				memcpy(&id_array[id].pgno,
				    pptr, sizeof(db_pgno_t));
			else
				id_array[id].pgno = 0;
			OBJECT_UNLOCK(lt, region, ndx);
		}
		if (F_ISSET(lockerp, DB_LOCKER_INABORT))
			id_array[id].in_abort = 1;
	}
	UNLOCK_LOCKERS(env, region);

	/*
	 * Now we can release everything except the bitmap matrix that we
	 * created.
	 */
	*nlockers = id;
	*idmap = id_array;
	*bmp = bitmap;
	*allocp = nentries;
	__os_free(env, tmpmap);
done:	if (timespecisset(&region->next_timeout))
		region->next_timeout = min_timeout;
	return (0);
}

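/*
 * __dd_find --
 *	Walk the waits-for matrix looking for cycles.  For each valid locker
 *	i we OR into row i the rows of all lockers that i waits for; if bit i
 *	itself ever becomes set, locker i is on a cycle.  For example, if A
 *	waits for B and B waits for A, OR-ing B's row into A's row sets A's
 *	own bit.  Each returned row identifies one deadlock, and its
 *	participants are marked invalid so the same cycle is not reported
 *	again from another member.  The returned array is NULL-terminated.
 */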
static int
__dd_find(env, bmp, idmap, nlockers, nalloc, deadp)
	ENV *env;
	u_int32_t *bmp, nlockers, nalloc;
	locker_info *idmap;
	u_int32_t ***deadp;
{
	u_int32_t i, j, k, *mymap, *tmpmap, **retp;
	u_int ndead, ndeadalloc;
	int ret;

#undef	INITIAL_DEAD_ALLOC
#define	INITIAL_DEAD_ALLOC	8

	ndeadalloc = INITIAL_DEAD_ALLOC;
	ndead = 0;
	if ((ret = __os_malloc(env,
	    ndeadalloc * sizeof(u_int32_t *), &retp)) != 0)
		return (ret);

	/*
	 * For each locker, OR in the bits from the lockers on which that
	 * locker is waiting.
	 */
	for (mymap = bmp, i = 0; i < nlockers; i++, mymap += nalloc) {
		if (!idmap[i].valid)
			continue;
		for (j = 0; j < nlockers; j++) {
			if (!ISSET_MAP(mymap, j))
				continue;

			/* Find the map for this bit. */
			tmpmap = bmp + (nalloc * j);
			OR_MAP(mymap, tmpmap, nalloc);
			if (!ISSET_MAP(mymap, i))
				continue;

			/* Make sure we leave room for NULL. */
			if (ndead + 2 >= ndeadalloc) {
				ndeadalloc <<= 1;
				/*
				 * If the alloc fails, then simply return the
				 * deadlocks that we already have.
				 */
				if (__os_realloc(env,
				    ndeadalloc * sizeof(u_int32_t *),
				    &retp) != 0) {
					retp[ndead] = NULL;
					*deadp = retp;
					return (0);
				}
			}
			retp[ndead++] = mymap;

			/* Mark all participants in this deadlock invalid. */
			for (k = 0; k < nlockers; k++)
				if (ISSET_MAP(mymap, k))
					idmap[k].valid = 0;
			break;
		}
	}
	retp[ndead] = NULL;
	*deadp = retp;
	return (0);
}

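/*
 * __dd_abort --
 *	Abort the locker's last waiting lock as recorded in the locker_info
 *	snapshot.  If the locker or that lock is already gone, or the locker
 *	is already aborting, set *statusp to DB_ALREADY_ABORTED and succeed.
 */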
static int
__dd_abort(env, info, statusp)
	ENV *env;
	locker_info *info;
	int *statusp;
{
	struct __db_lock *lockp;
	DB_LOCKER *lockerp;
	DB_LOCKOBJ *sh_obj;
	DB_LOCKREGION *region;
	DB_LOCKTAB *lt;
	int ret;

	*statusp = 0;

	lt = env->lk_handle;
	region = lt->reginfo.primary;
	ret = 0;

	/* We must lock so this locker cannot go away while we abort it. */
	LOCK_LOCKERS(env, region);

	/*
	 * Get the locker.  If it's gone or was aborted while we were
	 * detecting, return that.
	 */
	if ((ret = __lock_getlocker_int(lt,
	    info->last_locker_id, 0, &lockerp)) != 0)
		goto err;
	if (lockerp == NULL || F_ISSET(lockerp, DB_LOCKER_INABORT)) {
		*statusp = DB_ALREADY_ABORTED;
		goto out;
	}

	/*
	 * Find the locker's last lock.  It is possible for this lock to have
	 * been freed, either through a timeout or another detector run.
	 * First lock the lock object so it is stable.
	 */

	OBJECT_LOCK_NDX(lt, region, info->last_ndx);
	if ((lockp = SH_LIST_FIRST(&lockerp->heldby, __db_lock)) == NULL) {
		*statusp = DB_ALREADY_ABORTED;
		goto done;
	}
	if (R_OFFSET(&lt->reginfo, lockp) != info->last_lock ||
	    lockp->holder != R_OFFSET(&lt->reginfo, lockerp) ||
	    lockp->obj != info->last_obj || lockp->status != DB_LSTAT_WAITING) {
		*statusp = DB_ALREADY_ABORTED;
		goto done;
	}

	sh_obj = (DB_LOCKOBJ *)((u_int8_t *)lockp + lockp->obj);

	/* Abort the lock, take it off the list, and wake up the waiter. */
	lockp->status = DB_LSTAT_ABORTED;
	SH_TAILQ_REMOVE(&sh_obj->waiters, lockp, links, __db_lock);

	/*
	 * Either the waiters list is now empty, in which case we remove
	 * it from dd_objs, or it is not empty, in which case we need to
	 * do promotion.
	 */
	if (SH_TAILQ_FIRST(&sh_obj->waiters, __db_lock) == NULL) {
		LOCK_DD(env, region);
		SH_TAILQ_REMOVE(&region->dd_objs,
		    sh_obj, dd_links, __db_lockobj);
		UNLOCK_DD(env, region);
	} else
		ret = __lock_promote(lt, sh_obj, NULL, 0);
	MUTEX_UNLOCK(env, lockp->mtx_lock);

	STAT(region->stat.st_ndeadlocks++);
done:	OBJECT_UNLOCK(lt, region, info->last_ndx);
err:
out:	UNLOCK_LOCKERS(env, region);
	return (ret);
}

#ifdef DIAGNOSTIC
static void
__dd_debug(env, idmap, bitmap, nlockers, nalloc)
	ENV *env;
	locker_info *idmap;
	u_int32_t *bitmap, nlockers, nalloc;
{
	DB_MSGBUF mb;
	u_int32_t i, j, *mymap;

	DB_MSGBUF_INIT(&mb);

	__db_msg(env, "Waitsfor array\nWaiter:\tWaiting on:");
	for (mymap = bitmap, i = 0; i < nlockers; i++, mymap += nalloc) {
		if (!idmap[i].valid)
			continue;

		__db_msgadd(env, &mb,				/* Waiter. */
		    "%lx/%lu:\t", (u_long)idmap[i].id, (u_long)idmap[i].pgno);
		for (j = 0; j < nlockers; j++)
			if (ISSET_MAP(mymap, j))
				__db_msgadd(env,
				    &mb, " %lx", (u_long)idmap[j].id);
		__db_msgadd(env, &mb, " %lu", (u_long)idmap[i].last_lock);
		DB_MSGBUF_FLUSH(env, &mb);
	}
}
#endif

/*
 * Given a bitmap that contains a deadlock, verify that the bit
 * specified in the which parameter indicates a transaction that
 * is actually deadlocked.  Return 1 if really deadlocked, 0 otherwise.
 * deadmap  --  the array that identified the deadlock.
 * tmpmap   --  a temporary bit map into which we can OR things.
 * origmap  --  a copy of the initial bitmaps from the dd_build phase.
 * nlockers --  the number of actual lockers under consideration.
 * nalloc   --  the number of words allocated for the bitmap.
 * which    --  the locker in question.
 */
static int
__dd_verify(idmap, deadmap, tmpmap, origmap, nlockers, nalloc, which)
	locker_info *idmap;
	u_int32_t *deadmap, *tmpmap, *origmap;
	u_int32_t nlockers, nalloc, which;
{
	u_int32_t *tmap;
	u_int32_t j;
	int count;

	memset(tmpmap, 0, sizeof(u_int32_t) * nalloc);

	/*
	 * In order for "which" to be actively involved in
	 * the deadlock, removing it from the evaluation
	 * must remove the deadlock.  So, we OR together everyone
	 * except which; if all the participants still have their
	 * bits set, then the deadlock persists and which does
	 * not participate.  If the deadlock does not persist,
	 * then "which" does participate.
	 */
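	/*
	 * For example, in the three-way cycle A -> B -> C -> A, checking
	 * which = B ORs together A's and C's original rows, giving {B, A};
	 * C's bit is missing from that union, so the cycle cannot persist
	 * without B and we return 1.
	 */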
	count = 0;
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;

		/* Find the map for this bit. */
		tmap = origmap + (nalloc * j);

		/*
		 * We special case the first waiter who is also a holder, so
		 * we don't automatically call that a deadlock.  However, if
		 * it really is a deadlock, we need the bit set now so that
		 * we treat the first waiter like other waiters.
		 */
		if (idmap[j].self_wait)
			SET_MAP(tmap, j);
		OR_MAP(tmpmap, tmap, nalloc);
		count++;
	}

	if (count == 1)
		return (1);

	/*
	 * Now check the resulting map and see whether
	 * all participants still have their bit set.
	 */
	for (j = 0; j < nlockers; j++) {
		if (!ISSET_MAP(deadmap, j) || j == which)
			continue;
		if (!ISSET_MAP(tmpmap, j))
			return (1);
	}
	return (0);
}

/*
 * __dd_isolder --
 *
 * Figure out the relative age of two lockers.  We make all lockers
 * older than all transactions, because that is how it has worked
 * historically (lockers have lower ids).
 */
static int
__dd_isolder(a, b, lock_max, txn_max)
	u_int32_t	a, b;
	u_int32_t	lock_max, txn_max;
{
	u_int32_t max;

	/* Check for comparing lock-id and txnid. */
	if (a <= DB_LOCK_MAXID && b > DB_LOCK_MAXID)
		return (1);
	if (b <= DB_LOCK_MAXID && a > DB_LOCK_MAXID)
		return (0);

	/* In the same space; figure out which one. */
	max = txn_max;
	if (a <= DB_LOCK_MAXID)
		max = lock_max;

	/*
	 * We can't get a 100% correct ordering, because we don't know
	 * where the current interval started and if there were older
	 * lockers outside the interval.  We do the best we can.
	 */

	/*
	 * Check for a wrapped case with ids above max.
	 */
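	/*
	 * For example, if the id space wrapped recently and max is 100,
	 * an id of 150 was handed out before the wrap and so is older
	 * than an id of 20 handed out after it, even though 150 > 20.
	 */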
	if (a > max && b < max)
		return (1);
	if (b > max && a < max)
		return (0);

	return (a < b);
}