/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * This code is derived from software contributed to Berkeley by
 * Mike Olson.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $Id$
 */
43
44#include "db_config.h"
45
46#include "db_int.h"
47#include "dbinc/db_page.h"
48#include "dbinc/btree.h"
49#include "dbinc/lock.h"
50#include "dbinc/mp.h"
51
/*
 * __bam_get_root --
 *	Fetch the root of a tree and see if we want to keep
 * it in the stack.
 *
 *	On success the root page is latched (and possibly locked) and has
 * been entered in the cursor's page stack; *stack is set nonzero if the
 * caller should keep building a full stack of interior pages.
 *
 * PUBLIC: int __bam_get_root __P((DBC *, db_pgno_t, int, u_int32_t, int *));
 */
int
__bam_get_root(dbc, pg, slevel, flags, stack)
	DBC *dbc;		/* Cursor performing the search. */
	db_pgno_t pg;		/* Page number of the tree's root. */
	int slevel;		/* Level at which the search will stop. */
	u_int32_t flags;	/* SR_* search flags (see btree.h). */
	int *stack;		/* Out: nonzero if keeping a page stack. */
{
	BTREE_CURSOR *cp;
	DB *dbp;
	DB_LOCK lock;
	DB_MPOOLFILE *mpf;
	PAGE *h;
	db_lockmode_t lock_mode;
	u_int32_t get_mode;
	int ret, t_ret;

	LOCK_INIT(lock);
	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	/*
	 * If write-locking pages, we need to know whether or not to acquire a
	 * write lock on a page before getting it.  This depends on how deep it
	 * is in tree, which we don't know until we acquire the root page.  So,
	 * if we need to lock the root page we may have to upgrade it later,
	 * because we won't get the correct lock initially.
	 *
	 * Retrieve the root page.
	 */
try_again:
	/*
	 * A full stack is kept only when the caller asked for one and
	 * record numbers are being maintained (DB_RECNO trees or btrees
	 * with the C_RECNUM flag), since those require locking the whole
	 * path from root to leaf.
	 */
	*stack = LF_ISSET(SR_STACK) &&
	      (dbc->dbtype == DB_RECNO || F_ISSET(cp, C_RECNUM));
	lock_mode = DB_LOCK_READ;
	if (*stack ||
	    LF_ISSET(SR_DEL) || (LF_ISSET(SR_NEXT) && LF_ISSET(SR_WRITE)))
		lock_mode = DB_LOCK_WRITE;
	/*
	 * Take a lock on the root up front when we know we will write it,
	 * when the cursor is running in down-rev (pre-latch) mode, or when
	 * record numbers force tree locking.  Otherwise we rely on the
	 * buffer latch alone and only lock later if needed.
	 */
	if ((lock_mode == DB_LOCK_WRITE || F_ISSET(dbc, DBC_DOWNREV) ||
	    dbc->dbtype == DB_RECNO || F_ISSET(cp, C_RECNUM))) {
lock_it:	if ((ret = __db_lget(dbc, 0, pg, lock_mode, 0, &lock)) != 0)
			return (ret);
	}

	/*
	 * Get the root.  If the root happens to be a leaf page then
	 * we are supposed to get a read lock on it before latching
	 * it.  So if we have not locked it do a try get first.
	 * If we can't get the root shared, then get a lock on it and
	 * then wait for the latch.
	 */
	if (lock_mode == DB_LOCK_WRITE)
		get_mode = DB_MPOOL_DIRTY;
	else if (LOCK_ISSET(lock) || !STD_LOCKING(dbc))
		get_mode = 0;
	else
		get_mode = DB_MPOOL_TRY;

	if ((ret = __memp_fget(mpf, &pg,
	    dbc->thread_info, dbc->txn, get_mode, &h)) != 0) {
		/*
		 * DB_MPOOL_TRY could not latch the page without blocking:
		 * go back, take the lock, and then wait for the latch.
		 */
		if (ret == DB_LOCK_NOTGRANTED)
			goto lock_it;
		/* Did not read it, so we can release the lock */
		(void)__LPUT(dbc, lock);
		return (ret);
	}

	/*
	 * Decide if we need to dirty and/or lock this page.
	 * We must not hold the latch while we get the lock.
	 */
	if (!*stack &&
	    ((LF_ISSET(SR_PARENT) && (u_int8_t)(slevel + 1) >= LEVEL(h)) ||
	    LEVEL(h) == LEAFLEVEL ||
	    (LF_ISSET(SR_START) && slevel == LEVEL(h)))) {
		*stack = 1;
		/* If we already have the write lock, we are done. */
		if (dbc->dbtype == DB_RECNO || F_ISSET(cp, C_RECNUM)) {
			if (lock_mode == DB_LOCK_WRITE)
				goto done;
			if ((ret = __LPUT(dbc, lock)) != 0)
				return (ret);
		}

		/*
		 * Now that we know what level the root is at, do we need a
		 * write lock?  If not and we got the lock before latching
		 * we are done.
		 */
		if (LEVEL(h) != LEAFLEVEL || LF_ISSET(SR_WRITE)) {
			lock_mode = DB_LOCK_WRITE;
			/* Drop the read lock if we got it above. */
			if ((ret = __LPUT(dbc, lock)) != 0)
				return (ret);
		} else if (LOCK_ISSET(lock))
			goto done;
		if (!STD_LOCKING(dbc)) {
			/* No locking subsystem: just dirty the buffer. */
			if (lock_mode != DB_LOCK_WRITE)
				goto done;
			if ((ret = __memp_dirty(mpf, &h, dbc->thread_info,
			    dbc->txn, dbc->priority, 0)) != 0) {
				if (h != NULL)
					(void)__memp_fput(mpf,
					    dbc->thread_info, h, dbc->priority);
				return (ret);
			}
		} else {
			/* Try to lock the page without waiting first. */
			if ((ret = __db_lget(dbc,
			    0, pg, lock_mode, DB_LOCK_NOWAIT, &lock)) == 0) {
				if (lock_mode == DB_LOCK_WRITE && (ret =
				    __memp_dirty(mpf, &h, dbc->thread_info,
				    dbc->txn, dbc->priority, 0)) != 0) {
					if (h != NULL)
						(void)__memp_fput(mpf,
						    dbc->thread_info, h,
						    dbc->priority);
					return (ret);
				}
				goto done;
			}

			/*
			 * The no-wait lock attempt blocked or deadlocked;
			 * drop the latch before waiting for the lock so we
			 * never wait on a lock while holding a latch.
			 */
			t_ret = __memp_fput(mpf,
			    dbc->thread_info, h, dbc->priority);

			if (ret == DB_LOCK_DEADLOCK ||
			    ret == DB_LOCK_NOTGRANTED)
				ret = 0;
			if (ret == 0)
				ret = t_ret;

			if (ret != 0)
				return (ret);

			if ((ret = __db_lget(dbc,
			     0, pg, lock_mode, 0, &lock)) != 0)
				return (ret);
			if ((ret = __memp_fget(mpf,
			     &pg, dbc->thread_info, dbc->txn,
			     lock_mode == DB_LOCK_WRITE ? DB_MPOOL_DIRTY : 0,
			     &h)) != 0) {
				/* Did not read it, release the lock */
				(void)__LPUT(dbc, lock);
				return (ret);
			}
		}
		/*
		 * While getting dirty or locked we need to drop the mutex
		 * so someone else could get in and split the root.
		 */
		if (!((LF_ISSET(SR_PARENT) &&
		    (u_int8_t)(slevel + 1) >= LEVEL(h)) ||
		    LEVEL(h) == LEAFLEVEL ||
		    (LF_ISSET(SR_START) && slevel == LEVEL(h)))) {
			/* Someone else split the root, start over. */
			ret = __memp_fput(mpf,
			    dbc->thread_info, h, dbc->priority);
			if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
				ret = t_ret;
			if (ret != 0)
				return (ret);
			goto try_again;
		}
	}

done:	BT_STK_ENTER(dbp->env, cp, h, 0, lock, lock_mode, ret);

	return (ret);
}
227
/*
 * __bam_search --
 *	Search a btree for a key.
 *
 *	Descends from the root, latch-coupling parent/child buffers, until
 * reaching the leaf level (or the level given by slevel when SR_START or
 * SR_PARENT is set).  On success the found page (and, for stacking modes,
 * the path of interior pages) is left in the cursor's page stack; *exactp
 * reports whether an exact key match was found.  If recnop is non-NULL the
 * record number of the found item is accumulated on the way down.
 *
 * PUBLIC: int __bam_search __P((DBC *, db_pgno_t,
 * PUBLIC:     const DBT *, u_int32_t, int, db_recno_t *, int *));
 */
int
__bam_search(dbc, root_pgno, key, flags, slevel, recnop, exactp)
	DBC *dbc;
	db_pgno_t root_pgno;
	const DBT *key;
	u_int32_t flags;
	int slevel, *exactp;
	db_recno_t *recnop;
{
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DB_LOCK lock, saved_lock;
	DB_MPOOLFILE *mpf;
	ENV *env;
	PAGE *h, *parent_h;
	db_indx_t base, i, indx, *inp, lim;
	db_lockmode_t lock_mode;
	db_pgno_t pg, saved_pg;	/* saved_pg/saved_level/saved_lock remember */
	db_recno_t recno;	/* a lock taken after dropping latches, so a */
	int adjust, cmp, deloffset, ret, set_stack, stack, t_ret; /* retry  */
	int getlock, was_next;	/* can reuse it instead of re-locking.      */
	int (*func) __P((DB *, const DBT *, const DBT *));
	u_int32_t get_mode, wait;
	u_int8_t level, saved_level;

	dbp = dbc->dbp;
	env = dbp->env;
	mpf = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	h = NULL;
	parent_h = NULL;
	t = dbp->bt_internal;
	recno = 0;
	t_ret = 0;

	BT_STK_CLR(cp);
	LOCK_INIT(saved_lock);
	LOCK_INIT(lock);
	was_next = LF_ISSET(SR_NEXT);
	wait = DB_LOCK_NOWAIT;

	/*
	 * There are several ways we search a btree.  The flags argument
	 * specifies if we're acquiring read or write latches, if we position
	 * to the first or last item in a set of duplicates, if we return
	 * deleted items, and if we are latching pairs of pages.  In addition,
	 * if we're modifying record numbers, we have to latch the entire tree
	 * regardless.  See btree.h for more details.
	 */

	if (root_pgno == PGNO_INVALID)
		root_pgno = cp->root;
	saved_pg = root_pgno;
	saved_level = MAXBTREELEVEL;
	/* (Re)start the descent; the root comes back latched in the stack. */
retry:	if ((ret = __bam_get_root(dbc, root_pgno, slevel, flags, &stack)) != 0)
		goto err;
	lock_mode = cp->csp->lock_mode;
	get_mode = lock_mode == DB_LOCK_WRITE ? DB_MPOOL_DIRTY : 0;
	h = cp->csp->page;
	pg = PGNO(h);
	lock = cp->csp->lock;
	set_stack = stack;
	/*
	 * Determine if we need to lock interior nodes.
	 * If we have record numbers we always lock.  Otherwise we only
	 * need to do this if we are write locking and we are returning
	 * a stack of nodes.  SR_NEXT will eventually get a stack and
	 * release the locks above that level.
	 */
	if (F_ISSET(dbc, DBC_DOWNREV)) {
		getlock = 1;
		wait = 0;
	} else
		getlock = F_ISSET(cp, C_RECNUM) ||
		   (lock_mode == DB_LOCK_WRITE &&
		   (stack || LF_ISSET(SR_NEXT | SR_DEL)));

	/*
	 * If we are asked a level that is above the root,
	 * just return the root.  This can happen if the tree
	 * collapses while we are trying to lock the root.
	 */
	if (!LF_ISSET(SR_START) && LEVEL(h) < slevel)
		goto done;

	BT_STK_CLR(cp);

	/* Choose a comparison function. */
	func = F_ISSET(dbc, DBC_OPD) ?
	    (dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare) :
	    t->bt_compare;

	/* Descend one tree level per iteration. */
	for (;;) {
		if (TYPE(h) == P_LBTREE)
			adjust = P_INDX;
		else {
			/*
			 * It is possible to catch an internal page as a change
			 * is being backed out.  Its leaf pages will be locked
			 * but we must be sure we get to one.  If the page
			 * is not populated enough lock it.
			 */
			if (TYPE(h) != P_LDUP && NUM_ENT(h) == 0) {
				getlock = 1;
				level = LEVEL(h) + 1;
				if ((ret = __memp_fput(mpf, dbc->thread_info,
				     h, dbc->priority)) != 0)
					goto err;
				goto lock_next;
			}
			adjust = O_INDX;
		}
		inp = P_INP(dbp, h);
		/*
		 * SR_MIN/SR_MAX position to the first/last item on the page
		 * rather than comparing keys.
		 */
		if (LF_ISSET(SR_MIN | SR_MAX)) {
			if (LF_ISSET(SR_MIN) || NUM_ENT(h) == 0)
				indx = 0;
			else if (TYPE(h) == P_LBTREE)
				indx = NUM_ENT(h) - 2;
			else
				indx = NUM_ENT(h) - 1;

			if (LEVEL(h) == LEAFLEVEL ||
			     (!LF_ISSET(SR_START) && LEVEL(h) == slevel)) {
				if (LF_ISSET(SR_NEXT))
					goto get_next;
				goto found;
			}
			goto next;
		}
		/*
		 * Do a binary search on the current page.  If we're searching
		 * a Btree leaf page, we have to walk the indices in groups of
		 * two.  If we're searching an internal page or a off-page dup
		 * page, they're an index per page item.  If we find an exact
		 * match on a leaf page, we're done.
		 */
		DB_BINARY_SEARCH_FOR(base, lim, NUM_ENT(h), adjust) {
			DB_BINARY_SEARCH_INCR(indx, base, lim, adjust);
			if ((ret = __bam_cmp(dbc, key, h, indx,
			    func, &cmp)) != 0)
				goto err;
			if (cmp == 0) {
				if (LEVEL(h) == LEAFLEVEL ||
				    (!LF_ISSET(SR_START) &&
				    LEVEL(h) == slevel)) {
					if (LF_ISSET(SR_NEXT))
						goto get_next;
					goto found;
				}
				goto next;
			}
			if (cmp > 0)
				DB_BINARY_SEARCH_SHIFT_BASE(indx, base,
				    lim, adjust);
		}

		/*
		 * No match found.  Base is the smallest index greater than
		 * key and may be zero or a last + O_INDX index.
		 *
		 * If it's a leaf page or the stopping point,
		 * return base as the "found" value.
		 * Delete only deletes exact matches.
		 */
		if (LEVEL(h) == LEAFLEVEL ||
		    (!LF_ISSET(SR_START) && LEVEL(h) == slevel)) {
			*exactp = 0;

			if (LF_ISSET(SR_EXACT)) {
				ret = DB_NOTFOUND;
				goto err;
			}

			/* SR_STK_ONLY: record page/index numbers only. */
			if (LF_ISSET(SR_STK_ONLY)) {
				BT_STK_NUM(env, cp, h, base, ret);
				if ((t_ret =
				    __LPUT(dbc, lock)) != 0 && ret == 0)
					ret = t_ret;
				if ((t_ret = __memp_fput(mpf, dbc->thread_info,
				     h, dbc->priority)) != 0 && ret == 0)
					ret = t_ret;
				h = NULL;
				if (ret != 0)
					goto err;
				goto done;
			}
			if (LF_ISSET(SR_NEXT)) {
get_next:			/*
				 * The caller could have asked for a NEXT
				 * at the root if the tree recently collapsed.
				 */
				if (PGNO(h) == root_pgno) {
					ret = DB_NOTFOUND;
					goto err;
				}

				indx = cp->sp->indx + 1;
				if (indx == NUM_ENT(cp->sp->page)) {
					ret = DB_NOTFOUND;
					cp->csp++;
					goto err;
				}
				/*
				 * If we want both the key page and the next
				 * page, push the key page on the stack
				 * otherwise save the root of the subtree
				 * and drop the rest of the subtree.
				 * Search down again starting at the
				 * next child of the root of this subtree.
				 */
				LF_SET(SR_MIN);
				LF_CLR(SR_NEXT);
				set_stack = stack = 1;
				if (LF_ISSET(SR_BOTH)) {
					cp->csp++;
					BT_STK_PUSH(env,
					    cp, h, indx, lock, lock_mode, ret);
					if (ret != 0)
						goto err;
					LOCK_INIT(lock);
					h = cp->sp->page;
					pg = GET_BINTERNAL(dbp, h, indx)->pgno;
					level = LEVEL(h);
					h = NULL;
					goto lock_next;
				} else {
					if ((ret = __LPUT(dbc, lock)) != 0)
						goto err;
					if ((ret = __memp_fput(mpf,
					    dbc->thread_info,
					    h, dbc->priority)) != 0)
						goto err;
					h = cp->sp->page;
					cp->sp->page = NULL;
					lock = cp->sp->lock;
					LOCK_INIT(cp->sp->lock);
					if ((ret = __bam_stkrel(dbc,
					    STK_NOLOCK)) != 0)
						goto err;
					goto next;
				}
			}

			/*
			 * !!!
			 * Possibly returning a deleted record -- DB_SET_RANGE,
			 * DB_KEYFIRST and DB_KEYLAST don't require an exact
			 * match, and we don't want to walk multiple pages here
			 * to find an undeleted record.  This is handled by the
			 * calling routine.
			 */
			if (LF_ISSET(SR_DEL) && cp->csp == cp->sp)
				cp->csp++;
			BT_STK_ENTER(env, cp, h, base, lock, lock_mode, ret);
			if (ret != 0)
				goto err;
			goto done;
		}

		/*
		 * If it's not a leaf page, record the internal page (which is
		 * a parent page for the key).  Decrement the base by 1 if it's
		 * non-zero so that if a split later occurs, the inserted page
		 * will be to the right of the saved page.
		 */
		indx = base > 0 ? base - O_INDX : base;

		/*
		 * If we're trying to calculate the record number, sum up
		 * all the record numbers on this page up to the indx point.
		 */
next:		if (recnop != NULL)
			for (i = 0; i < indx; ++i)
				recno += GET_BINTERNAL(dbp, h, i)->nrecs;

		pg = GET_BINTERNAL(dbp, h, indx)->pgno;
		level = LEVEL(h);

		/* See if we are at the level to start stacking. */
		if (LF_ISSET(SR_START) && slevel == level)
			set_stack = stack = 1;

		if (LF_ISSET(SR_STK_ONLY)) {
			if (slevel == LEVEL(h)) {
				BT_STK_NUM(env, cp, h, indx, ret);
				if ((t_ret = __memp_fput(mpf, dbc->thread_info,
				    h, dbc->priority)) != 0 && ret == 0)
					ret = t_ret;
				h = NULL;
				if (ret != 0)
					goto err;
				goto done;
			}
			BT_STK_NUMPUSH(env, cp, h, indx, ret);
			(void)__memp_fput(mpf,
			    dbc->thread_info, h, dbc->priority);
			h = NULL;
		} else if (stack) {
			/* Return if this is the lowest page wanted. */
			if (LF_ISSET(SR_PARENT) && slevel == level) {
				BT_STK_ENTER(env,
				    cp, h, indx, lock, lock_mode, ret);
				if (ret != 0)
					goto err;
				goto done;
			}
			if (LF_ISSET(SR_DEL) && NUM_ENT(h) > 1) {
				/*
				 * There was a page with a singleton pointer
				 * to a non-empty subtree.
				 */
				cp->csp--;
				if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
					goto err;
				set_stack = stack = 0;
				goto do_del;
			}
			BT_STK_PUSH(env,
			    cp, h, indx, lock, lock_mode, ret);
			if (ret != 0)
				goto err;

			LOCK_INIT(lock);
			get_mode = DB_MPOOL_DIRTY;
			lock_mode = DB_LOCK_WRITE;
			goto lock_next;
		} else {
			/*
			 * Decide if we want to return a reference to the next
			 * page in the return stack.  If so, latch it and don't
			 * unlatch it.  We will want to stack things on the
			 * next iteration.  The stack variable cannot be
			 * set until we leave this clause. If we are locking
			 * then we must lock this level before getting the page.
			 */
			if ((LF_ISSET(SR_PARENT) &&
			    (u_int8_t)(slevel + 1) >= (level - 1)) ||
			    (level - 1) == LEAFLEVEL)
				set_stack = 1;

			/*
			 * Check for a normal search.  If so, we need to
			 * latch couple the parent/child buffers.
			 */
			if (!LF_ISSET(SR_DEL | SR_NEXT)) {
				parent_h = h;
				goto lock_next;
			}

			/*
			 * Returning a subtree.  See if we have hit the start
			 * point if so save the parent and set stack.
			 * Otherwise free the parent and temporarily
			 * save this one.
			 * For SR_DEL we need to find a page with 1 entry.
			 * For SR_NEXT we want to find the minimal subtree
			 * that contains the key and the next page.
			 * We save pages as long as we are at the right
			 * edge of the subtree.  When we leave the right
			 * edge, then drop the subtree.
			 */

			if ((LF_ISSET(SR_DEL) && NUM_ENT(h) == 1)) {
				/*
				 * We are pushing the things on the stack,
				 * set the stack variable now to indicate this
				 * has happened.
				 */
				stack = set_stack = 1;
				LF_SET(SR_WRITE);
				/* Push the parent. */
				cp->csp++;
				/* Push this node. */
				BT_STK_PUSH(env, cp, h,
				     indx, lock, DB_LOCK_NG, ret);
				if (ret != 0)
					goto err;
				LOCK_INIT(lock);
			} else {
			/*
			 * See if we want to save the tree so far.
			 * If we are looking for the next key,
			 * then we must save this node if we are
			 * at the end of the page.  If not then
			 * discard anything we have saved so far.
			 * For delete only keep one node until
			 * we find a singleton.
			 */
do_del:				if (cp->csp->page != NULL) {
					if (LF_ISSET(SR_NEXT) &&
					     indx == NUM_ENT(h) - 1)
						cp->csp++;
					else if ((ret =
					    __bam_stkrel(dbc, STK_NOLOCK)) != 0)
						goto err;
				}
				/* Save this node. */
				BT_STK_ENTER(env, cp,
				    h, indx, lock, lock_mode, ret);
				if (ret != 0)
					goto err;
				LOCK_INIT(lock);
			}

			/*
			 * At lock_next the current page has been released or
			 * saved on the stack, so h must be NULL while we
			 * acquire the next level's lock.
			 */
lock_next:		h = NULL;

			if (set_stack && LF_ISSET(SR_WRITE)) {
				lock_mode = DB_LOCK_WRITE;
				get_mode = DB_MPOOL_DIRTY;
				getlock = 1;
			}
			/*
			 * If we are retrying and we are back at the same
			 * page then we already have it locked.  If we are
			 * at a different page we want to lock couple and
			 * release that lock.
			 */
			if (level - 1 == saved_level) {
				if ((ret = __LPUT(dbc, lock)) != 0)
					goto err;
				lock = saved_lock;
				LOCK_INIT(saved_lock);
				saved_level = MAXBTREELEVEL;
				if (pg == saved_pg)
					goto skip_lock;
			}
			if ((getlock || level - 1 == LEAFLEVEL) &&
			    (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
			    pg, lock_mode, wait, &lock)) != 0) {
				/*
				 * If we are doing DEL or NEXT then we
				 * have an extra level saved in the stack,
				 * push it so it will get freed.
				 */
				if (LF_ISSET(SR_DEL | SR_NEXT) && !stack)
					cp->csp++;
				/*
				 * If we fail, discard the lock we held.
				 * This is ok because we will either search
				 * again or exit without actually looking
				 * at the data.
				 */
				if ((t_ret = __LPUT(dbc, lock)) != 0 &&
				    ret == 0)
					ret = t_ret;
				/*
				 * If we blocked at a different level release
				 * the previous saved lock.
				 */
				if ((t_ret = __LPUT(dbc, saved_lock)) != 0 &&
				    ret == 0)
					ret = t_ret;
				if (wait == 0 || (ret != DB_LOCK_NOTGRANTED &&
				     ret != DB_LOCK_DEADLOCK))
					goto err;

				/* Release the parent if we are holding it. */
				if (parent_h != NULL &&
				    (ret = __memp_fput(mpf, dbc->thread_info,
				    parent_h, dbc->priority)) != 0)
					goto err;
				parent_h = NULL;

				BT_STK_POP(cp);
				if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
					goto err;
				/* Wait for the lock with no latches held. */
				if ((ret = __db_lget(dbc,
				    0, pg, lock_mode, 0, &saved_lock)) != 0)
					goto err;
				/*
				 * A very strange case: if this page was
				 * freed while we wait then we cannot hold
				 * the lock on it while we reget the root
				 * latch because allocation is one place
				 * we lock while holding a latch.
				 * No one can have a free page locked, so
				 * check for that case.  We do this by
				 * checking the level, since it will be 0
				 * if free and we might as well see if this
				 * page moved and drop the lock in that case.
				 */
				if ((ret = __memp_fget(mpf, &pg,
				     dbc->thread_info,
				     dbc->txn, get_mode, &h)) != 0 &&
				     ret != DB_PAGE_NOTFOUND)
					goto err;

				if (ret != 0 || LEVEL(h) != level - 1) {
					ret = __LPUT(dbc, saved_lock);
					if (ret != 0)
						goto err;
					pg = root_pgno;
					saved_level = MAXBTREELEVEL;
				}
				if (h != NULL && (ret = __memp_fput(mpf,
				    dbc->thread_info, h, dbc->priority)) != 0)
					goto err;
				h = NULL;

				if (was_next) {
					LF_CLR(SR_MIN);
					LF_SET(SR_NEXT);
				}
				/*
				 * We have the lock but we dropped the
				 * latch so we need to search again. If
				 * we get back to the same page then all
				 * is good, otherwise we need to try to
				 * lock the new page.
				 */
				saved_pg = pg;
				saved_level = level - 1;
				goto retry;
			}
skip_lock:		stack = set_stack;
		}
		/* Get the child page. */
		if ((ret = __memp_fget(mpf, &pg,
		     dbc->thread_info, dbc->txn, get_mode, &h)) != 0)
			goto err;
		/* Release the parent. */
		if (parent_h != NULL && (ret = __memp_fput(mpf,
		    dbc->thread_info, parent_h, dbc->priority)) != 0)
			goto err;
		parent_h = NULL;
	}
	/* NOTREACHED */

found:	*exactp = 1;

	/*
	 * If we got here, we know that we have a Btree leaf or off-page
	 * duplicates page.  If it's a Btree leaf page, we have to handle
	 * on-page duplicates.
	 *
	 * If there are duplicates, go to the first/last one.  This is
	 * safe because we know that we're not going to leave the page,
	 * all duplicate sets that are not on overflow pages exist on a
	 * single leaf page.
	 */
	if (TYPE(h) == P_LBTREE && NUM_ENT(h) > P_INDX) {
		if (LF_ISSET(SR_DUPLAST))
			while (indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
			    inp[indx] == inp[indx + P_INDX])
				indx += P_INDX;
		else if (LF_ISSET(SR_DUPFIRST))
			while (indx > 0 &&
			    inp[indx] == inp[indx - P_INDX])
				indx -= P_INDX;
	}

	/*
	 * Now check if we are allowed to return deleted items; if not, then
	 * find the next (or previous) non-deleted duplicate entry.  (We do
	 * not move from the original found key on the basis of the SR_DELNO
	 * flag.)
	 */
	DB_ASSERT(env, recnop == NULL || LF_ISSET(SR_DELNO));
	if (LF_ISSET(SR_DELNO)) {
		deloffset = TYPE(h) == P_LBTREE ? O_INDX : 0;
		if (LF_ISSET(SR_DUPLAST))
			while (B_DISSET(GET_BKEYDATA(dbp,
			    h, indx + deloffset)->type) && indx > 0 &&
			    inp[indx] == inp[indx - adjust])
				indx -= adjust;
		else
			while (B_DISSET(GET_BKEYDATA(dbp,
			    h, indx + deloffset)->type) &&
			    indx < (db_indx_t)(NUM_ENT(h) - adjust) &&
			    inp[indx] == inp[indx + adjust])
				indx += adjust;

		/*
		 * If we weren't able to find a non-deleted duplicate, return
		 * DB_NOTFOUND.
		 */
		if (B_DISSET(GET_BKEYDATA(dbp, h, indx + deloffset)->type)) {
			ret = DB_NOTFOUND;
			goto err;
		}

		/*
		 * Increment the record counter to point to the found element.
		 * Ignore any deleted key/data pairs.  There doesn't need to
		 * be any correction for duplicates, as Btree doesn't support
		 * duplicates and record numbers in the same tree.
		 */
		if (recnop != NULL) {
			DB_ASSERT(env, TYPE(h) == P_LBTREE);

			for (i = 0; i < indx; i += P_INDX)
				if (!B_DISSET(
				    GET_BKEYDATA(dbp, h, i + O_INDX)->type))
					++recno;

			/* Correct the number for a 0-base. */
			*recnop = recno + 1;
		}
	}

	if (LF_ISSET(SR_STK_ONLY)) {
		BT_STK_NUM(env, cp, h, indx, ret);
		if ((t_ret = __memp_fput(mpf,
		     dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
	} else {
		if (LF_ISSET(SR_DEL) && cp->csp == cp->sp)
			cp->csp++;
		BT_STK_ENTER(env, cp, h, indx, lock, lock_mode, ret);
	}
	if (ret != 0)
		goto err;

	cp->csp->lock = lock;
	DB_ASSERT(env, parent_h == NULL);

done:	if ((ret = __LPUT(dbc, saved_lock)) != 0)
		return (ret);

	return (0);

err:	if (ret == 0)
		ret = t_ret;
	if (h != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (parent_h != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, parent_h, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	/* Keep any not-found page locked for serializability. */
	if ((t_ret = __TLPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;

	(void)__LPUT(dbc, saved_lock);

	BT_STK_POP(cp);
	(void)__bam_stkrel(dbc, 0);

	return (ret);
}
877
878/*
879 * __bam_stkrel --
880 *	Release all pages currently held in the stack.
881 *
882 * PUBLIC: int __bam_stkrel __P((DBC *, u_int32_t));
883 */
884int
885__bam_stkrel(dbc, flags)
886	DBC *dbc;
887	u_int32_t flags;
888{
889	BTREE_CURSOR *cp;
890	DB *dbp;
891	DB_MPOOLFILE *mpf;
892	EPG *epg;
893	int ret, t_ret;
894
895	DB_ASSERT(NULL, dbc != NULL);
896	dbp = dbc->dbp;
897	mpf = dbp->mpf;
898	cp = (BTREE_CURSOR *)dbc->internal;
899
900	/*
901	 * Release inner pages first.
902	 *
903	 * The caller must be sure that setting STK_NOLOCK will not effect
904	 * either serializability or recoverability.
905	 */
906	for (ret = 0, epg = cp->sp; epg <= cp->csp; ++epg) {
907		if (epg->page != NULL) {
908			if (LF_ISSET(STK_CLRDBC) && cp->page == epg->page) {
909				cp->page = NULL;
910				LOCK_INIT(cp->lock);
911			}
912			if ((t_ret = __memp_fput(mpf, dbc->thread_info,
913			     epg->page, dbc->priority)) != 0 && ret == 0)
914				ret = t_ret;
915			epg->page = NULL;
916		}
917		/*
918		 * We set this if we need to release our pins,
919		 * but are not logically ready to have the pages
920		 * visible.
921		 */
922		if (LF_ISSET(STK_PGONLY))
923			continue;
924		if (LF_ISSET(STK_NOLOCK)) {
925			if ((t_ret = __LPUT(dbc, epg->lock)) != 0 && ret == 0)
926				ret = t_ret;
927		} else
928			if ((t_ret = __TLPUT(dbc, epg->lock)) != 0 && ret == 0)
929				ret = t_ret;
930	}
931
932	/* Clear the stack, all pages have been released. */
933	if (!LF_ISSET(STK_PGONLY))
934		BT_STK_CLR(cp);
935
936	return (ret);
937}
938
939/*
940 * __bam_stkgrow --
941 *	Grow the stack.
942 *
943 * PUBLIC: int __bam_stkgrow __P((ENV *, BTREE_CURSOR *));
944 */
945int
946__bam_stkgrow(env, cp)
947	ENV *env;
948	BTREE_CURSOR *cp;
949{
950	EPG *p;
951	size_t entries;
952	int ret;
953
954	entries = cp->esp - cp->sp;
955
956	if ((ret = __os_calloc(env, entries * 2, sizeof(EPG), &p)) != 0)
957		return (ret);
958	memcpy(p, cp->sp, entries * sizeof(EPG));
959	if (cp->sp != cp->stack)
960		__os_free(env, cp->sp);
961	cp->sp = p;
962	cp->csp = p + entries;
963	cp->esp = p + entries * 2;
964	return (0);
965}
966