1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 *
6 * $Id: bt_cursor.c,v 12.34 2008/03/11 21:07:40 mbrey Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/mp.h"
16
17static int  __bam_bulk __P((DBC *, DBT *, u_int32_t));
18static int  __bamc_close __P((DBC *, db_pgno_t, int *));
19static int  __bamc_del __P((DBC *));
20static int  __bamc_destroy __P((DBC *));
21static int  __bamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
22static int  __bamc_getstack __P((DBC *));
23static int  __bamc_next __P((DBC *, int, int));
24static int  __bamc_physdel __P((DBC *));
25static int  __bamc_prev __P((DBC *));
26static int  __bamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
27static int  __bamc_search __P((DBC *,
28		db_pgno_t, const DBT *, u_int32_t, int *));
29static int  __bamc_writelock __P((DBC *));
30static int  __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t));
31static int  __bam_getbothc __P((DBC *, DBT *));
32static int  __bam_get_prev __P((DBC *));
33static int  __bam_isopd __P((DBC *, db_pgno_t *));
34
35/*
36 * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
37 * lock-couple the lock.
38 *
39 * !!!
40 * We have to handle both where we have a lock to lock-couple and where we
41 * don't -- we don't duplicate locks when we duplicate cursors if we are
42 * running in a transaction environment as there's no point if locks are
43 * never discarded.  This means that the cursor may or may not hold a lock.
44 * In the case where we are descending the tree we always want to unlock
45 * the held interior page so we use ACQUIRE_COUPLE.
46 */
47#undef	ACQUIRE
48#define	ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, ret) do {	\
49	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;				\
50	if ((pagep) != NULL) {						\
51		ret = __memp_fput(__mpf,				\
52		     (dbc)->thread_info, pagep, dbc->priority);		\
53		pagep = NULL;						\
54	} else								\
55		ret = 0;						\
56	if ((ret) == 0 && STD_LOCKING(dbc))				\
57		ret = __db_lget(					\
58		    dbc, LCK_COUPLE, lpgno, mode, flags, &(lock));	\
59	if ((ret) == 0)							\
60		ret = __memp_fget(__mpf, &(fpgno),			\
61		    (dbc)->thread_info, (dbc)->txn, 0, &(pagep));	\
62} while (0)
63
64/* Acquire a new page/lock for a cursor. */
65#undef	ACQUIRE_CUR
66#define	ACQUIRE_CUR(dbc, mode, p, flags, ret) do {			\
67	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;		\
68	if (p != __cp->pgno)						\
69		__cp->pgno = PGNO_INVALID;				\
70	ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, ret);	\
71	if ((ret) == 0) {						\
72		__cp->pgno = p;						\
73		__cp->lock_mode = (mode);				\
74	}								\
75} while (0)
76
77/*
78 * Acquire a write lock if we don't already have one.
79 *
80 * !!!
81 * See ACQUIRE macro on why we handle cursors that don't have locks.
82 */
83#undef	ACQUIRE_WRITE_LOCK
84#define	ACQUIRE_WRITE_LOCK(dbc, ret) do {				\
85	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;		\
86	ret = 0;							\
87	if (STD_LOCKING(dbc) &&						\
88	    __cp->lock_mode != DB_LOCK_WRITE &&				\
89	    ((ret) = __db_lget(dbc,					\
90	    LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0,			\
91	    __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0)		\
92		__cp->lock_mode = DB_LOCK_WRITE;			\
93} while (0)
94
95/* Discard the current page/lock for a cursor. */
96#undef	DISCARD_CUR
97#define	DISCARD_CUR(dbc, ret) do {					\
98	BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;		\
99	DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf;				\
100	int __t_ret;							\
101	if ((__cp->page) != NULL) {					\
102		__t_ret = __memp_fput(__mpf,				\
103		     (dbc)->thread_info, __cp->page, dbc->priority);\
104		__cp->page = NULL;					\
105	} else								\
106		__t_ret = 0;						\
107	if (__t_ret != 0 && (ret) == 0)					\
108		ret = __t_ret;						\
109	__t_ret = __TLPUT((dbc), __cp->lock);				\
110	if (__t_ret != 0 && (ret) == 0)					\
111		ret = __t_ret;						\
112	if ((ret) == 0 && !LOCK_ISSET(__cp->lock))			\
113		__cp->lock_mode = DB_LOCK_NG;				\
114} while (0)
115
116/* If on-page item is a deleted record. */
117#undef	IS_DELETED
118#define	IS_DELETED(dbp, page, indx)					\
119	B_DISSET(GET_BKEYDATA(dbp, page,				\
120	    (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
121#undef	IS_CUR_DELETED
122#define	IS_CUR_DELETED(dbc)						\
123	IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx)
124
125/*
126 * Test to see if two cursors could point to duplicates of the same key.
127 * In the case of off-page duplicates they are they same, as the cursors
128 * will be in the same off-page duplicate tree.  In the case of on-page
129 * duplicates, the key index offsets must be the same.  For the last test,
130 * as the original cursor may not have a valid page pointer, we use the
131 * current cursor's.
132 */
133#undef	IS_DUPLICATE
134#define	IS_DUPLICATE(dbc, i1, i2)					\
135	    (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] ==	\
136	     P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2])
137#undef	IS_CUR_DUPLICATE
138#define	IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)			\
139	(F_ISSET(dbc, DBC_OPD) ||					\
140	    (orig_pgno == (dbc)->internal->pgno &&			\
141	    IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
142
143/*
144 * __bamc_init --
145 *	Initialize the access private portion of a cursor
146 *
147 * PUBLIC: int __bamc_init __P((DBC *, DBTYPE));
148 */
149int
150__bamc_init(dbc, dbtype)
151	DBC *dbc;
152	DBTYPE dbtype;
153{
154	ENV *env;
155	int ret;
156
157	env = dbc->env;
158
159	/* Allocate/initialize the internal structure. */
160	if (dbc->internal == NULL && (ret =
161	    __os_calloc(env, 1, sizeof(BTREE_CURSOR), &dbc->internal)) != 0)
162		return (ret);
163
164	/* Initialize methods. */
165	dbc->close = dbc->c_close = __dbc_close_pp;
166	dbc->count = dbc->c_count = __dbc_count_pp;
167	dbc->del = dbc->c_del = __dbc_del_pp;
168	dbc->dup = dbc->c_dup = __dbc_dup_pp;
169	dbc->get = dbc->c_get = __dbc_get_pp;
170	dbc->pget = dbc->c_pget = __dbc_pget_pp;
171	dbc->put = dbc->c_put = __dbc_put_pp;
172	if (dbtype == DB_BTREE) {
173		dbc->am_bulk = __bam_bulk;
174		dbc->am_close = __bamc_close;
175		dbc->am_del = __bamc_del;
176		dbc->am_destroy = __bamc_destroy;
177		dbc->am_get = __bamc_get;
178		dbc->am_put = __bamc_put;
179		dbc->am_writelock = __bamc_writelock;
180	} else {
181		dbc->am_bulk = __bam_bulk;
182		dbc->am_close = __bamc_close;
183		dbc->am_del = __ramc_del;
184		dbc->am_destroy = __bamc_destroy;
185		dbc->am_get = __ramc_get;
186		dbc->am_put = __ramc_put;
187		dbc->am_writelock = __bamc_writelock;
188	}
189
190	return (0);
191}
192
193/*
194 * __bamc_refresh
195 *	Set things up properly for cursor re-use.
196 *
197 * PUBLIC: int __bamc_refresh __P((DBC *));
198 */
199int
200__bamc_refresh(dbc)
201	DBC *dbc;
202{
203	BTREE *t;
204	BTREE_CURSOR *cp;
205	DB *dbp;
206
207	dbp = dbc->dbp;
208	t = dbp->bt_internal;
209	cp = (BTREE_CURSOR *)dbc->internal;
210
211	/*
212	 * If our caller set the root page number, it's because the root was
213	 * known.  This is always the case for off page dup cursors.  Else,
214	 * pull it out of our internal information.
215	 */
216	if (cp->root == PGNO_INVALID)
217		cp->root = t->bt_root;
218
219	LOCK_INIT(cp->lock);
220	cp->lock_mode = DB_LOCK_NG;
221
222	if (cp->sp == NULL) {
223		cp->sp = cp->stack;
224		cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
225	}
226	BT_STK_CLR(cp);
227
228	/*
229	 * The btree leaf page data structures require that two key/data pairs
230	 * (or four items) fit on a page, but other than that there's no fixed
231	 * requirement.  The btree off-page duplicates only require two items,
232	 * to be exact, but requiring four for them as well seems reasonable.
233	 *
234	 * Recno uses the btree bt_ovflsize value -- it's close enough.
235	 */
236	cp->ovflsize = B_MINKEY_TO_OVFLSIZE(
237	    dbp,  F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize);
238
239	cp->recno = RECNO_OOB;
240	cp->order = INVALID_ORDER;
241	cp->flags = 0;
242
243	/* Initialize for record numbers. */
244	if (F_ISSET(dbc, DBC_OPD) ||
245	    dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) {
246		F_SET(cp, C_RECNUM);
247
248		/*
249		 * All btrees that support record numbers, optionally standard
250		 * recno trees, and all off-page duplicate recno trees have
251		 * mutable record numbers.
252		 */
253		if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
254		    F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER))
255			F_SET(cp, C_RENUMBER);
256	}
257
258	return (0);
259}
260
261/*
262 * __bamc_close --
263 *	Close down the cursor.
264 */
265static int
266__bamc_close(dbc, root_pgno, rmroot)
267	DBC *dbc;
268	db_pgno_t root_pgno;
269	int *rmroot;
270{
271	BTREE_CURSOR *cp, *cp_opd, *cp_c;
272	DB *dbp;
273	DBC *dbc_opd, *dbc_c;
274	DB_MPOOLFILE *mpf;
275	ENV *env;
276	PAGE *h;
277	int cdb_lock, count, ret;
278
279	dbp = dbc->dbp;
280	env = dbp->env;
281	mpf = dbp->mpf;
282	cp = (BTREE_CURSOR *)dbc->internal;
283	cp_opd = (dbc_opd = cp->opd) == NULL ?
284	    NULL : (BTREE_CURSOR *)dbc_opd->internal;
285	cdb_lock = ret = 0;
286
287	/*
288	 * There are 3 ways this function is called:
289	 *
290	 * 1. Closing a primary cursor: we get called with a pointer to a
291	 *    primary cursor that has a NULL opd field.  This happens when
292	 *    closing a btree/recno database cursor without an associated
293	 *    off-page duplicate tree.
294	 *
295	 * 2. Closing a primary and an off-page duplicate cursor stack: we
296	 *    get called with a pointer to the primary cursor which has a
297	 *    non-NULL opd field.  This happens when closing a btree cursor
298	 *    into database with an associated off-page btree/recno duplicate
299	 *    tree. (It can't be a primary recno database, recno databases
300	 *    don't support duplicates.)
301	 *
302	 * 3. Closing an off-page duplicate cursor stack: we get called with
303	 *    a pointer to the off-page duplicate cursor.  This happens when
304	 *    closing a non-btree database that has an associated off-page
305	 *    btree/recno duplicate tree or for a btree database when the
306	 *    opd tree is not empty (root_pgno == PGNO_INVALID).
307	 *
308	 * If either the primary or off-page duplicate cursor deleted a btree
309	 * key/data pair, check to see if the item is still referenced by a
310	 * different cursor.  If it is, confirm that cursor's delete flag is
311	 * set and leave it to that cursor to do the delete.
312	 *
313	 * NB: The test for == 0 below is correct.  Our caller already removed
314	 * our cursor argument from the active queue, we won't find it when we
315	 * search the queue in __bam_ca_delete().
316	 * NB: It can't be true that both the primary and off-page duplicate
317	 * cursors have deleted a btree key/data pair.  Either the primary
318	 * cursor may have deleted an item and there's no off-page duplicate
319	 * cursor, or there's an off-page duplicate cursor and it may have
320	 * deleted an item.
321	 *
322	 * Primary recno databases aren't an issue here.  Recno keys are either
323	 * deleted immediately or never deleted, and do not have to be handled
324	 * here.
325	 *
326	 * Off-page duplicate recno databases are an issue here, cases #2 and
327	 * #3 above can both be off-page recno databases.  The problem is the
328	 * same as the final problem for off-page duplicate btree databases.
329	 * If we no longer need the off-page duplicate tree, we want to remove
330	 * it.  For off-page duplicate btrees, we are done with the tree when
331	 * we delete the last item it contains, i.e., there can be no further
332	 * references to it when it's empty.  For off-page duplicate recnos,
333	 * we remove items from the tree as the application calls the remove
334	 * function, so we are done with the tree when we close the last cursor
335	 * that references it.
336	 *
337	 * We optionally take the root page number from our caller.  If the
338	 * primary database is a btree, we can get it ourselves because dbc
339	 * is the primary cursor.  If the primary database is not a btree,
340	 * the problem is that we may be dealing with a stack of pages.  The
341	 * cursor we're using to do the delete points at the bottom of that
342	 * stack and we need the top of the stack.
343	 */
344	if (F_ISSET(cp, C_DELETED)) {
345		dbc_c = dbc;
346		switch (dbc->dbtype) {
347		case DB_BTREE:				/* Case #1, #3. */
348			if ((ret = __bam_ca_delete(
349			    dbp, cp->pgno, cp->indx, 1, &count)) != 0)
350				goto err;
351			if (count == 0)
352				goto lock;
353			goto done;
354		case DB_RECNO:
355			if (!F_ISSET(dbc, DBC_OPD))	/* Case #1. */
356				goto done;
357							/* Case #3. */
358			if ((ret = __ram_ca_delete(dbp, cp->root, &count)) != 0)
359				goto err;
360			if (count == 0)
361				goto lock;
362			goto done;
363		case DB_HASH:
364		case DB_QUEUE:
365		case DB_UNKNOWN:
366		default:
367			ret = __db_unknown_type(
368			    env, "DbCursor.close", dbc->dbtype);
369			goto err;
370		}
371	}
372
373	if (dbc_opd == NULL)
374		goto done;
375
376	if (F_ISSET(cp_opd, C_DELETED)) {		/* Case #2. */
377		/*
378		 * We will not have been provided a root page number.  Acquire
379		 * one from the primary database.
380		 */
381		if ((ret = __memp_fget(mpf, &cp->pgno,
382		     dbc->thread_info, dbc->txn, 0, &h)) != 0)
383			goto err;
384		root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno;
385		if ((ret = __memp_fput(mpf,
386		     dbc->thread_info, h, dbc->priority)) != 0)
387			goto err;
388
389		dbc_c = dbc_opd;
390		switch (dbc_opd->dbtype) {
391		case DB_BTREE:
392			if ((ret = __bam_ca_delete(
393			    dbp, cp_opd->pgno, cp_opd->indx, 1, &count)) != 0)
394				goto err;
395			if (count == 0)
396				goto lock;
397			goto done;
398		case DB_RECNO:
399			if ((ret =
400			    __ram_ca_delete(dbp, cp_opd->root, &count)) != 0)
401				goto err;
402			if (count == 0)
403				goto lock;
404			goto done;
405		case DB_HASH:
406		case DB_QUEUE:
407		case DB_UNKNOWN:
408		default:
409			ret = __db_unknown_type(
410			   env, "DbCursor.close", dbc->dbtype);
411			goto err;
412		}
413	}
414	goto done;
415
416lock:	cp_c = (BTREE_CURSOR *)dbc_c->internal;
417
418	/*
419	 * If this is CDB, upgrade the lock if necessary.  While we acquired
420	 * the write lock to logically delete the record, we released it when
421	 * we returned from that call, and so may not be holding a write lock
422	 * at the moment.
423	 */
424	if (CDB_LOCKING(env)) {
425		if (F_ISSET(dbc, DBC_WRITECURSOR)) {
426			if ((ret = __lock_get(env,
427			    dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt,
428			    DB_LOCK_WRITE, &dbc->mylock)) != 0)
429				goto err;
430			cdb_lock = 1;
431		}
432		goto delete;
433	}
434
435	/*
436	 * The variable dbc_c has been initialized to reference the cursor in
437	 * which we're going to do the delete.  Initialize the cursor's lock
438	 * structures as necessary.
439	 *
440	 * First, we may not need to acquire any locks.  If we're in case #3,
441	 * that is, the primary database isn't a btree database, our caller
442	 * is responsible for acquiring any necessary locks before calling us.
443	 */
444	if (F_ISSET(dbc, DBC_OPD))
445		goto delete;
446
447	/*
448	 * Otherwise, acquire a write lock on the primary database's page.
449	 *
450	 * Lock the primary database page, regardless of whether we're deleting
451	 * an item on a primary database page or an off-page duplicates page.
452	 *
453	 * If the cursor that did the initial logical deletion (and had a write
454	 * lock) is not the same cursor doing the physical deletion (which may
455	 * have only ever had a read lock on the item), we need to upgrade to a
456	 * write lock.  The confusion comes as follows:
457	 *
458	 *	C1	created, acquires item read lock
459	 *	C2	dup C1, create C2, also has item read lock.
460	 *	C1	acquire write lock, delete item
461	 *	C1	close
462	 *	C2	close, needs a write lock to physically delete item.
463	 *
464	 * If we're in a TXN, we know that C2 will be able to acquire the write
465	 * lock, because no locker other than the one shared by C1 and C2 can
466	 * acquire a write lock -- the original write lock C1 acquired was never
467	 * discarded.
468	 *
469	 * If we're not in a TXN, it's nastier.  Other cursors might acquire
470	 * read locks on the item after C1 closed, discarding its write lock,
471	 * and such locks would prevent C2 from acquiring a read lock.  That's
472	 * OK, though, we'll simply wait until we can acquire a write lock, or
473	 * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
474	 *
475	 * There are similar scenarios with dirty reads, where the cursor may
476	 * have downgraded its write lock to a was-write lock.
477	 */
478	if (STD_LOCKING(dbc))
479		if ((ret = __db_lget(dbc,
480		    LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
481			goto err;
482
483delete:	/*
484	 * If the delete occurred in a Btree, we're going to look at the page
485	 * to see if the item has to be physically deleted.  Otherwise, we do
486	 * not need the actual page (and it may not even exist, it might have
487	 * been truncated from the file after an allocation aborted).
488	 *
489	 * Delete the on-page physical item referenced by the cursor.
490	 */
491	if (dbc_c->dbtype == DB_BTREE) {
492		if ((ret = __memp_fget(mpf, &cp_c->pgno, dbc->thread_info,
493		    dbc->txn, DB_MPOOL_DIRTY, &cp_c->page)) != 0)
494			goto err;
495		if ((ret = __bamc_physdel(dbc_c)) != 0)
496			goto err;
497	}
498
499	/*
500	 * If we're not working in an off-page duplicate tree, then we're
501	 * done.
502	 */
503	if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
504		goto done;
505
506	/*
507	 * We may have just deleted the last element in the off-page duplicate
508	 * tree, and closed the last cursor in the tree.  For an off-page btree
509	 * there are no other cursors in the tree by definition, if the tree is
510	 * empty.  For an off-page recno we know we have closed the last cursor
511	 * in the tree because the __ram_ca_delete call above returned 0 only
512	 * in that case.  So, if the off-page duplicate tree is empty at this
513	 * point, we want to remove it.
514	 */
515	if ((ret = __memp_fget(mpf, &root_pgno,
516	     dbc->thread_info, dbc->txn, 0, &h)) != 0)
517		goto err;
518	if (NUM_ENT(h) == 0) {
519		DISCARD_CUR(dbc_c, ret);
520		if (ret != 0)
521			goto err;
522		if ((ret = __db_free(dbc, h)) != 0)
523			goto err;
524	} else {
525		if ((ret = __memp_fput(mpf,
526		     dbc->thread_info, h, dbc->priority)) != 0)
527			goto err;
528		goto done;
529	}
530
531	/*
532	 * When removing the tree, we have to do one of two things.  If this is
533	 * case #2, that is, the primary tree is a btree, delete the key that's
534	 * associated with the tree from the btree leaf page.  We know we are
535	 * the only reference to it and we already have the correct lock.  We
536	 * detect this case because the cursor that was passed to us references
537	 * an off-page duplicate cursor.
538	 *
539	 * If this is case #3, that is, the primary tree isn't a btree, pass
540	 * the information back to our caller, it's their job to do cleanup on
541	 * the primary page.
542	 */
543	if (dbc_opd != NULL) {
544		if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info,
545		    dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0)
546			goto err;
547		if ((ret = __bamc_physdel(dbc)) != 0)
548			goto err;
549	} else
550		*rmroot = 1;
551err:
552done:	/*
553	 * Discard the page references and locks, and confirm that the stack
554	 * has been emptied.
555	 */
556	if (dbc_opd != NULL)
557		DISCARD_CUR(dbc_opd, ret);
558	DISCARD_CUR(dbc, ret);
559
560	/* Downgrade any CDB lock we acquired. */
561	if (cdb_lock)
562		(void)__lock_downgrade(env, &dbc->mylock, DB_LOCK_IWRITE, 0);
563
564	return (ret);
565}
566
567/*
568 * __bamc_destroy --
569 *	Close a single cursor -- internal version.
570 */
571static int
572__bamc_destroy(dbc)
573	DBC *dbc;
574{
575	BTREE_CURSOR *cp;
576	ENV *env;
577
578	cp = (BTREE_CURSOR *)dbc->internal;
579	env = dbc->env;
580
581	/* Discard the structures. */
582	if (cp->sp != cp->stack)
583		__os_free(env, cp->sp);
584	__os_free(env, cp);
585
586	return (0);
587}
588
589/*
590 * __bamc_count --
591 *	Return a count of on and off-page duplicates.
592 *
593 * PUBLIC: int __bamc_count __P((DBC *, db_recno_t *));
594 */
595int
596__bamc_count(dbc, recnop)
597	DBC *dbc;
598	db_recno_t *recnop;
599{
600	BTREE_CURSOR *cp;
601	DB *dbp;
602	DB_MPOOLFILE *mpf;
603	db_indx_t indx, top;
604	db_recno_t recno;
605	int ret;
606
607	dbp = dbc->dbp;
608	mpf = dbp->mpf;
609	cp = (BTREE_CURSOR *)dbc->internal;
610
611	/*
612	 * Called with the top-level cursor that may reference an off-page
613	 * duplicates tree.  We don't have to acquire any new locks, we have
614	 * to have a read lock to even get here.
615	 */
616	if (cp->opd == NULL) {
617		/*
618		 * On-page duplicates, get the page and count.
619		 */
620		if ((ret = __memp_fget(mpf, &cp->pgno,
621		    dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
622			return (ret);
623
624		/*
625		 * Move back to the beginning of the set of duplicates and
626		 * then count forward.
627		 */
628		for (indx = cp->indx;; indx -= P_INDX)
629			if (indx == 0 ||
630			    !IS_DUPLICATE(dbc, indx, indx - P_INDX))
631				break;
632		for (recno = 0,
633		    top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) {
634			if (!IS_DELETED(dbp, cp->page, indx))
635				++recno;
636			if (indx == top ||
637			    !IS_DUPLICATE(dbc, indx, indx + P_INDX))
638				break;
639		}
640	} else {
641		/*
642		 * Off-page duplicates tree, get the root page of the off-page
643		 * duplicate tree.
644		 */
645		if ((ret = __memp_fget(mpf, &cp->opd->internal->root,
646		    dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
647			return (ret);
648
649		/*
650		 * If the page is an internal page use the page's count as it's
651		 * up-to-date and reflects the status of cursors in the tree.
652		 * If the page is a leaf page for unsorted duplicates, use the
653		 * page's count as cursors don't mark items deleted on the page
654		 * and wait, cursor delete items immediately.
655		 * If the page is a leaf page for sorted duplicates, there may
656		 * be cursors on the page marking deleted items -- count.
657		 */
658		if (TYPE(cp->page) == P_LDUP)
659			for (recno = 0, indx = 0,
660			    top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) {
661				if (!IS_DELETED(dbp, cp->page, indx))
662					++recno;
663				if (indx == top)
664					break;
665			}
666		else
667			recno = RE_NREC(cp->page);
668	}
669
670	*recnop = recno;
671
672	ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
673	cp->page = NULL;
674
675	return (ret);
676}
677
678/*
679 * __bamc_del --
680 *	Delete using a cursor.
681 */
682static int
683__bamc_del(dbc)
684	DBC *dbc;
685{
686	BTREE_CURSOR *cp;
687	DB *dbp;
688	DB_MPOOLFILE *mpf;
689	int count, ret, t_ret;
690
691	dbp = dbc->dbp;
692	mpf = dbp->mpf;
693	cp = (BTREE_CURSOR *)dbc->internal;
694	ret = 0;
695
696	/* If the item was already deleted, return failure. */
697	if (F_ISSET(cp, C_DELETED))
698		return (DB_KEYEMPTY);
699
700	/*
701	 * This code is always called with a page lock but no page.
702	 */
703	DB_ASSERT(dbp->env, cp->page == NULL);
704
705	/*
706	 * We don't physically delete the record until the cursor moves, so
707	 * we have to have a long-lived write lock on the page instead of a
708	 * a long-lived read lock.  Note, we have to have a read lock to even
709	 * get here.
710	 *
711	 * If we're maintaining record numbers, we lock the entire tree, else
712	 * we lock the single page.
713	 */
714	if (F_ISSET(cp, C_RECNUM)) {
715		if ((ret = __bamc_getstack(dbc)) != 0)
716			goto err;
717		cp->page = cp->csp->page;
718	} else {
719		ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, 0, ret);
720		if (ret != 0)
721			goto err;
722	}
723
724	/* Mark the page dirty. */
725	if ((ret = __memp_dirty(mpf,
726	    &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
727		goto err;
728
729	/* Log the change. */
730	if (DBC_LOGGING(dbc)) {
731		if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0,
732		    PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
733			goto err;
734	} else
735		LSN_NOT_LOGGED(LSN(cp->page));
736
737	/* Set the intent-to-delete flag on the page. */
738	if (TYPE(cp->page) == P_LBTREE)
739		B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type);
740	else
741		B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type);
742
743err:	/*
744	 * If we've been successful so far and the tree has record numbers,
745	 * adjust the record counts.  Either way, release acquired page(s).
746	 */
747	if (F_ISSET(cp, C_RECNUM)) {
748		cp->csp->page = cp->page;
749		if (ret == 0)
750			ret = __bam_adjust(dbc, -1);
751		(void)__bam_stkrel(dbc, 0);
752	} else
753		if (cp->page != NULL &&
754		    (t_ret = __memp_fput(mpf, dbc->thread_info,
755		    cp->page, dbc->priority)) != 0 && ret == 0)
756			ret = t_ret;
757
758	cp->page = NULL;
759
760	/*
761	 * Update the cursors last, after all chance of recoverable failure
762	 * is past.
763	 */
764	if (ret == 0)
765		ret = __bam_ca_delete(dbp, cp->pgno, cp->indx, 1, &count);
766
767	return (ret);
768}
769
770/*
771 * __bamc_dup --
772 *	Duplicate a btree cursor, such that the new one holds appropriate
773 *	locks for the position of the original.
774 *
775 * PUBLIC: int __bamc_dup __P((DBC *, DBC *));
776 */
777int
778__bamc_dup(orig_dbc, new_dbc)
779	DBC *orig_dbc, *new_dbc;
780{
781	BTREE_CURSOR *orig, *new;
782
783	orig = (BTREE_CURSOR *)orig_dbc->internal;
784	new = (BTREE_CURSOR *)new_dbc->internal;
785
786	new->ovflsize = orig->ovflsize;
787	new->recno = orig->recno;
788	new->flags = orig->flags;
789
790	return (0);
791}
792
793/*
794 * __bamc_get --
795 *	Get using a cursor (btree).
796 */
797static int
798__bamc_get(dbc, key, data, flags, pgnop)
799	DBC *dbc;
800	DBT *key, *data;
801	u_int32_t flags;
802	db_pgno_t *pgnop;
803{
804	BTREE_CURSOR *cp;
805	DB *dbp;
806	DB_MPOOLFILE *mpf;
807	db_pgno_t orig_pgno;
808	db_indx_t orig_indx;
809	int exact, newopd, ret;
810
811	dbp = dbc->dbp;
812	mpf = dbp->mpf;
813	cp = (BTREE_CURSOR *)dbc->internal;
814	orig_pgno = cp->pgno;
815	orig_indx = cp->indx;
816
817	newopd = 0;
818	switch (flags) {
819	case DB_CURRENT:
820		/* It's not possible to return a deleted record. */
821		if (F_ISSET(cp, C_DELETED)) {
822			ret = DB_KEYEMPTY;
823			goto err;
824		}
825
826		/*
827		 * Acquire the current page.  We have at least a read-lock
828		 * already.  The caller may have set DB_RMW asking for a
829		 * write lock, but upgrading to a write lock has no better
830		 * chance of succeeding now instead of later, so don't try.
831		 */
832		if ((ret = __memp_fget(mpf, &cp->pgno,
833		    dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
834			goto err;
835		break;
836	case DB_FIRST:
837		newopd = 1;
838		if ((ret = __bamc_search(dbc,
839		     PGNO_INVALID, NULL, flags, &exact)) != 0)
840			goto err;
841		break;
842	case DB_GET_BOTH:
843	case DB_GET_BOTH_RANGE:
844		/*
845		 * There are two ways to get here based on DBcursor->get
846		 * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set:
847		 *
848		 * 1. Searching a sorted off-page duplicate tree: do a tree
849		 * search.
850		 *
851		 * 2. Searching btree: do a tree search.  If it returns a
852		 * reference to off-page duplicate tree, return immediately
853		 * and let our caller deal with it.  If the search doesn't
854		 * return a reference to off-page duplicate tree, continue
855		 * with an on-page search.
856		 */
857		if (F_ISSET(dbc, DBC_OPD)) {
858			if ((ret = __bamc_search(
859			    dbc, PGNO_INVALID, data, flags, &exact)) != 0)
860				goto err;
861			if (flags == DB_GET_BOTH) {
862				if (!exact) {
863					ret = DB_NOTFOUND;
864					goto err;
865				}
866				break;
867			}
868
869			/*
870			 * We didn't require an exact match, so the search may
871			 * may have returned an entry past the end of the page,
872			 * or we may be referencing a deleted record.  If so,
873			 * move to the next entry.
874			 */
875			if ((cp->indx == NUM_ENT(cp->page) ||
876			    IS_CUR_DELETED(dbc)) &&
877			    (ret = __bamc_next(dbc, 1, 0)) != 0)
878				goto err;
879		} else {
880			if ((ret = __bamc_search(
881			    dbc, PGNO_INVALID, key, flags, &exact)) != 0)
882				return (ret);
883			if (!exact) {
884				ret = DB_NOTFOUND;
885				goto err;
886			}
887
888			if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
889				newopd = 1;
890				break;
891			}
892			if ((ret =
893			    __bam_getboth_finddatum(dbc, data, flags)) != 0)
894				goto err;
895		}
896		break;
897	case DB_GET_BOTHC:
898		if ((ret = __bam_getbothc(dbc, data)) != 0)
899			goto err;
900		break;
901	case DB_LAST:
902		newopd = 1;
903		if ((ret = __bamc_search(dbc,
904		     PGNO_INVALID, NULL, flags, &exact)) != 0)
905			goto err;
906		break;
907	case DB_NEXT:
908		newopd = 1;
909		if (cp->pgno == PGNO_INVALID) {
910			if ((ret = __bamc_search(dbc,
911			     PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
912				goto err;
913		} else
914			if ((ret = __bamc_next(dbc, 1, 0)) != 0)
915				goto err;
916		break;
917	case DB_NEXT_DUP:
918		if ((ret = __bamc_next(dbc, 1, 0)) != 0)
919			goto err;
920		if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
921			ret = DB_NOTFOUND;
922			goto err;
923		}
924		break;
925	case DB_NEXT_NODUP:
926		newopd = 1;
927		if (cp->pgno == PGNO_INVALID) {
928			if ((ret = __bamc_search(dbc,
929			     PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0)
930				goto err;
931		} else
932			do {
933				if ((ret = __bamc_next(dbc, 1, 0)) != 0)
934					goto err;
935			} while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
936		break;
937	case DB_PREV:
938		newopd = 1;
939		if (cp->pgno == PGNO_INVALID) {
940			if ((ret = __bamc_search(dbc,
941			     PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
942				goto err;
943		} else
944			if ((ret = __bamc_prev(dbc)) != 0)
945				goto err;
946		break;
947	case DB_PREV_DUP:
948		if ((ret = __bamc_prev(dbc)) != 0)
949			goto err;
950		if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
951			ret = DB_NOTFOUND;
952			goto err;
953		}
954		break;
955	case DB_PREV_NODUP:
956		newopd = 1;
957		if (cp->pgno == PGNO_INVALID) {
958			if ((ret = __bamc_search(dbc,
959			     PGNO_INVALID, NULL, DB_LAST, &exact)) != 0)
960				goto err;
961		} else
962			do {
963				if ((ret = __bamc_prev(dbc)) != 0)
964					goto err;
965			} while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
966		break;
967	case DB_SET:
968	case DB_SET_RECNO:
969		newopd = 1;
970		if ((ret = __bamc_search(dbc,
971		    PGNO_INVALID, key, flags, &exact)) != 0)
972			goto err;
973		break;
974	case DB_SET_RANGE:
975		newopd = 1;
976		if ((ret = __bamc_search(dbc,
977		    PGNO_INVALID, key, flags, &exact)) != 0)
978			goto err;
979
980		/*
981		 * As we didn't require an exact match, the search function
982		 * may have returned an entry past the end of the page.  Or,
983		 * we may be referencing a deleted record.  If so, move to
984		 * the next entry.
985		 */
986		if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
987			if ((ret = __bamc_next(dbc, 0, 0)) != 0)
988				goto err;
989		break;
990	default:
991		ret = __db_unknown_flag(dbp->env, "__bamc_get", flags);
992		goto err;
993	}
994
995	/*
996	 * We may have moved to an off-page duplicate tree.  Return that
997	 * information to our caller.
998	 */
999	if (newopd && pgnop != NULL)
1000		(void)__bam_isopd(dbc, pgnop);
1001
1002err:	/*
1003	 * Regardless of whether we were successful or not, if the cursor
1004	 * moved, clear the delete flag, DBcursor->get never references a
1005	 * deleted key, if it moved at all.
1006	 */
1007	if (F_ISSET(cp, C_DELETED) &&
1008	    (cp->pgno != orig_pgno || cp->indx != orig_indx))
1009		F_CLR(cp, C_DELETED);
1010
1011	return (ret);
1012}
1013
1014static int
1015__bam_get_prev(dbc)
1016	DBC *dbc;
1017{
1018	BTREE_CURSOR *cp;
1019	DBT key, data;
1020	db_pgno_t pgno;
1021	int ret;
1022
1023	if ((ret = __bamc_prev(dbc)) != 0)
1024		return (ret);
1025
1026	if (__bam_isopd(dbc, &pgno)) {
1027		cp = (BTREE_CURSOR *)dbc->internal;
1028		if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0)
1029			return (ret);
1030		if ((ret = cp->opd->am_get(cp->opd,
1031		    &key, &data, DB_LAST, NULL)) != 0)
1032			return (ret);
1033	}
1034
1035	return (0);
1036}
1037
1038/*
1039 * __bam_bulk -- Return bulk data from a btree.
1040 */
1041static int
1042__bam_bulk(dbc, data, flags)
1043	DBC *dbc;
1044	DBT *data;
1045	u_int32_t flags;
1046{
1047	BKEYDATA *bk;
1048	BOVERFLOW *bo;
1049	BTREE_CURSOR *cp;
1050	PAGE *pg;
1051	db_indx_t *inp, indx, pg_keyoff;
1052	int32_t  *endp, key_off, *offp, *saveoffp;
1053	u_int8_t *dbuf, *dp, *np;
1054	u_int32_t key_size, pagesize, size, space;
1055	int adj, is_key, need_pg, next_key, no_dup, rec_key, ret;
1056
1057	ret = 0;
1058	key_off = 0;
1059	size = 0;
1060	pagesize = dbc->dbp->pgsize;
1061	cp = (BTREE_CURSOR *)dbc->internal;
1062
1063	/*
1064	 * dp tracks the beginning of the page in the buffer.
1065	 * np is the next place to copy things into the buffer.
1066	 * dbuf always stays at the beginning of the buffer.
1067	 */
1068	dbuf = data->data;
1069	np = dp = dbuf;
1070
1071	/* Keep track of space that is left.  There is a termination entry */
1072	space = data->ulen;
1073	space -= sizeof(*offp);
1074
1075	/* Build the offset/size table from the end up. */
1076	endp = (int32_t *)((u_int8_t *)dbuf + data->ulen);
1077	endp--;
1078	offp = endp;
1079
1080	key_size = 0;
1081
1082	/*
1083	 * Distinguish between BTREE and RECNO.
1084	 * There are no keys in RECNO.  If MULTIPLE_KEY is specified
1085	 * then we return the record numbers.
1086	 * is_key indicates that multiple btree keys are returned.
1087	 * rec_key is set if we are returning record numbers.
1088	 * next_key is set if we are going after the next key rather than dup.
1089	 */
1090	if (dbc->dbtype == DB_BTREE) {
1091		is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0;
1092		rec_key = 0;
1093		next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1094		adj = 2;
1095	} else {
1096		is_key = 0;
1097		rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1098		next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
1099		adj = 1;
1100	}
1101	no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
1102
1103next_pg:
1104	indx = cp->indx;
1105	pg = cp->page;
1106
1107	inp = P_INP(dbc->dbp, pg);
1108	/* The current page is not yet in the buffer. */
1109	need_pg = 1;
1110
1111	/*
1112	 * Keep track of the offset of the current key on the page.
1113	 * If we are returning keys, set it to 0 first so we force
1114	 * the copy of the key to the buffer.
1115	 */
1116	pg_keyoff = 0;
1117	if (is_key == 0)
1118		pg_keyoff = inp[indx];
1119
1120	do {
1121		if (IS_DELETED(dbc->dbp, pg, indx)) {
1122			if (dbc->dbtype != DB_RECNO)
1123				continue;
1124
1125			cp->recno++;
1126			/*
1127			 * If we are not returning recnos then we
1128			 * need to fill in every slot so the user
1129			 * can calculate the record numbers.
1130			 */
1131			if (rec_key != 0)
1132				continue;
1133
1134			space -= 2 * sizeof(*offp);
1135			/* Check if space as underflowed. */
1136			if (space > data->ulen)
1137				goto back_up;
1138
1139			/* Just mark the empty recno slots. */
1140			*offp-- = 0;
1141			*offp-- = 0;
1142			continue;
1143		}
1144
1145		/*
1146		 * Check to see if we have a new key.
1147		 * If so, then see if we need to put the
1148		 * key on the page.  If its already there
1149		 * then we just point to it.
1150		 */
1151		if (is_key && pg_keyoff != inp[indx]) {
1152			bk = GET_BKEYDATA(dbc->dbp, pg, indx);
1153			if (B_TYPE(bk->type) == B_OVERFLOW) {
1154				bo = (BOVERFLOW *)bk;
1155				size = key_size = bo->tlen;
1156				if (key_size > space)
1157					goto get_key_space;
1158				if ((ret = __bam_bulk_overflow(dbc,
1159				    bo->tlen, bo->pgno, np)) != 0)
1160					return (ret);
1161				space -= key_size;
1162				key_off = (int32_t)(np - dbuf);
1163				np += key_size;
1164			} else {
1165				if (need_pg) {
1166					dp = np;
1167					size = pagesize - HOFFSET(pg);
1168					if (space < size) {
1169get_key_space:
1170						/* Nothing added, then error. */
1171						if (offp == endp) {
1172							data->size = (u_int32_t)
1173							    DB_ALIGN(size +
1174							    pagesize, 1024);
1175							return
1176							    (DB_BUFFER_SMALL);
1177						}
1178						/*
1179						 * We need to back up to the
1180						 * last record put into the
1181						 * buffer so that it is
1182						 * CURRENT.
1183						 */
1184						if (indx != 0)
1185							indx -= P_INDX;
1186						else {
1187							if ((ret =
1188							    __bam_get_prev(
1189							    dbc)) != 0)
1190								return (ret);
1191							indx = cp->indx;
1192							pg = cp->page;
1193						}
1194						break;
1195					}
1196					/*
1197					 * Move the data part of the page
1198					 * to the buffer.
1199					 */
1200					memcpy(dp,
1201					   (u_int8_t *)pg + HOFFSET(pg), size);
1202					need_pg = 0;
1203					space -= size;
1204					np += size;
1205				}
1206				key_size = bk->len;
1207				key_off = (int32_t)((inp[indx] - HOFFSET(pg))
1208				    + (dp - dbuf) + SSZA(BKEYDATA, data));
1209				pg_keyoff = inp[indx];
1210			}
1211		}
1212
1213		/*
1214		 * Reserve space for the pointers and sizes.
1215		 * Either key/data pair or just for a data item.
1216		 */
1217		space -= (is_key ? 4 : 2) * sizeof(*offp);
1218		if (rec_key)
1219			space -= sizeof(*offp);
1220
1221		/* Check to see if space has underflowed. */
1222		if (space > data->ulen)
1223			goto back_up;
1224
1225		/*
1226		 * Determine if the next record is in the
1227		 * buffer already or if it needs to be copied in.
1228		 * If we have an off page dup, then copy as many
1229		 * as will fit into the buffer.
1230		 */
1231		bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1);
1232		if (B_TYPE(bk->type) == B_DUPLICATE) {
1233			bo = (BOVERFLOW *)bk;
1234			if (is_key) {
1235				*offp-- = (int32_t)key_off;
1236				*offp-- = (int32_t)key_size;
1237			}
1238			/*
1239			 * We pass the offset of the current key.
1240			 * On return we check to see if offp has
1241			 * moved to see if any data fit.
1242			 */
1243			saveoffp = offp;
1244			if ((ret = __bam_bulk_duplicates(dbc, bo->pgno,
1245			    dbuf, is_key ? offp + P_INDX : NULL,
1246			    &offp, &np, &space, no_dup)) != 0) {
1247				if (ret == DB_BUFFER_SMALL) {
1248					size = space;
1249					space = 0;
1250					/* If nothing was added, then error. */
1251					if (offp == saveoffp) {
1252						offp += 2;
1253						goto back_up;
1254					}
1255					goto get_space;
1256				}
1257				return (ret);
1258			}
1259		} else if (B_TYPE(bk->type) == B_OVERFLOW) {
1260			bo = (BOVERFLOW *)bk;
1261			size = bo->tlen;
1262			if (size > space)
1263				goto back_up;
1264			if ((ret =
1265			    __bam_bulk_overflow(dbc,
1266				bo->tlen, bo->pgno, np)) != 0)
1267				return (ret);
1268			space -= size;
1269			if (is_key) {
1270				*offp-- = (int32_t)key_off;
1271				*offp-- = (int32_t)key_size;
1272			} else if (rec_key)
1273				*offp-- = (int32_t)cp->recno;
1274			*offp-- = (int32_t)(np - dbuf);
1275			np += size;
1276			*offp-- = (int32_t)size;
1277		} else {
1278			if (need_pg) {
1279				dp = np;
1280				size = pagesize - HOFFSET(pg);
1281				if (space < size) {
1282back_up:
1283					/*
1284					 * Back up the index so that the
1285					 * last record in the buffer is CURRENT
1286					 */
1287					if (indx >= adj)
1288						indx -= adj;
1289					else {
1290						if ((ret =
1291						    __bam_get_prev(dbc)) != 0 &&
1292						    ret != DB_NOTFOUND)
1293							return (ret);
1294						indx = cp->indx;
1295						pg = cp->page;
1296					}
1297					if (dbc->dbtype == DB_RECNO)
1298						cp->recno--;
1299get_space:
1300					/*
1301					 * See if we put anything in the
1302					 * buffer or if we are doing a DBP->get
1303					 * did we get all of the data.
1304					 */
1305					if (offp >=
1306					    (is_key ? &endp[-1] : endp) ||
1307					    F_ISSET(dbc, DBC_TRANSIENT)) {
1308						data->size = (u_int32_t)
1309						    DB_ALIGN(size +
1310						    data->ulen - space, 1024);
1311						return (DB_BUFFER_SMALL);
1312					}
1313					break;
1314				}
1315				memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
1316				need_pg = 0;
1317				space -= size;
1318				np += size;
1319			}
1320			/*
1321			 * Add the offsets and sizes to the end of the buffer.
1322			 * First add the key info then the data info.
1323			 */
1324			if (is_key) {
1325				*offp-- = (int32_t)key_off;
1326				*offp-- = (int32_t)key_size;
1327			} else if (rec_key)
1328				*offp-- = (int32_t)cp->recno;
1329			*offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg))
1330			    + (dp - dbuf) + SSZA(BKEYDATA, data));
1331			*offp-- = bk->len;
1332		}
1333		if (dbc->dbtype == DB_RECNO)
1334			cp->recno++;
1335		else if (no_dup) {
1336			while (indx + adj < NUM_ENT(pg) &&
1337			    pg_keyoff == inp[indx + adj])
1338				indx += adj;
1339		}
1340	/*
1341	 * Stop when we either run off the page or we move to the next key and
1342	 * we are not returning multiple keys.
1343	 */
1344	} while ((indx += adj) < NUM_ENT(pg) &&
1345	    (next_key || pg_keyoff == inp[indx]));
1346
1347	/* If we are off the page then try to the next page. */
1348	if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
1349		cp->indx = indx;
1350		ret = __bamc_next(dbc, 0, 1);
1351		if (ret == 0)
1352			goto next_pg;
1353		if (ret != DB_NOTFOUND)
1354			return (ret);
1355	}
1356
1357	/*
1358	 * If we did a DBP->get we must error if we did not return
1359	 * all the data for the current key because there is
1360	 * no way to know if we did not get it all, nor any
1361	 * interface to fetch the balance.
1362	 */
1363
1364	if (ret == 0 && indx < pg->entries &&
1365	    F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) {
1366		data->size = (data->ulen - space) + size;
1367		return (DB_BUFFER_SMALL);
1368	}
1369	/*
1370	 * Must leave the index pointing at the last record fetched.
1371	 * If we are not fetching keys, we may have stepped to the
1372	 * next key.
1373	 */
1374	if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx])
1375		cp->indx = indx;
1376	else
1377		cp->indx = indx - P_INDX;
1378
1379	if (rec_key == 1)
1380		*offp = RECNO_OOB;
1381	else
1382		*offp = -1;
1383	return (0);
1384}
1385
1386/*
1387 * __bam_bulk_overflow --
1388 *	Dump overflow record into the buffer.
1389 *	The space requirements have already been checked.
1390 * PUBLIC: int __bam_bulk_overflow
1391 * PUBLIC:    __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *));
1392 */
1393int
1394__bam_bulk_overflow(dbc, len, pgno, dp)
1395	DBC *dbc;
1396	u_int32_t len;
1397	db_pgno_t pgno;
1398	u_int8_t *dp;
1399{
1400	DBT dbt;
1401
1402	memset(&dbt, 0, sizeof(dbt));
1403	F_SET(&dbt, DB_DBT_USERMEM);
1404	dbt.ulen = len;
1405	dbt.data = (void *)dp;
1406	return (__db_goff(dbc->dbp,
1407	    dbc->thread_info, dbc->txn, &dbt, len, pgno, NULL, NULL));
1408}
1409
1410/*
1411 * __bam_bulk_duplicates --
1412 *	Put as many off page duplicates as will fit into the buffer.
1413 * This routine will adjust the cursor to reflect the position in
1414 * the overflow tree.
1415 * PUBLIC: int __bam_bulk_duplicates __P((DBC *,
1416 * PUBLIC:       db_pgno_t, u_int8_t *, int32_t *,
1417 * PUBLIC:	 int32_t **, u_int8_t **, u_int32_t *, int));
1418 */
1419int
1420__bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup)
1421	DBC *dbc;
1422	db_pgno_t pgno;
1423	u_int8_t *dbuf;
1424	int32_t *keyoff, **offpp;
1425	u_int8_t **dpp;
1426	u_int32_t *spacep;
1427	int no_dup;
1428{
1429	BKEYDATA *bk;
1430	BOVERFLOW *bo;
1431	BTREE_CURSOR *cp;
1432	DB *dbp;
1433	DBC *opd;
1434	DBT key, data;
1435	PAGE *pg;
1436	db_indx_t indx, *inp;
1437	int32_t *offp;
1438	u_int32_t pagesize, size, space;
1439	u_int8_t *dp, *np;
1440	int first, need_pg, ret, t_ret;
1441
1442	ret = 0;
1443
1444	dbp = dbc->dbp;
1445	cp = (BTREE_CURSOR *)dbc->internal;
1446	opd = cp->opd;
1447
1448	if (opd == NULL) {
1449		if ((ret = __dbc_newopd(dbc, pgno, NULL, &opd)) != 0)
1450			return (ret);
1451		cp->opd = opd;
1452		if ((ret = opd->am_get(opd,
1453		    &key, &data, DB_FIRST, NULL)) != 0)
1454			goto close_opd;
1455	}
1456
1457	pagesize = opd->dbp->pgsize;
1458	cp = (BTREE_CURSOR *)opd->internal;
1459	space = *spacep;
1460	/* Get current offset slot. */
1461	offp = *offpp;
1462
1463	/*
1464	 * np is the next place to put data.
1465	 * dp is the beginning of the current page in the buffer.
1466	 */
1467	np = dp = *dpp;
1468	first = 1;
1469	indx = cp->indx;
1470
1471	do {
1472		/* Fetch the current record.  No initial move. */
1473		if ((ret = __bamc_next(opd, 0, 0)) != 0)
1474			break;
1475		pg = cp->page;
1476		indx = cp->indx;
1477		inp = P_INP(dbp, pg);
1478		/* We need to copy the page to the buffer. */
1479		need_pg = 1;
1480
1481		do {
1482			if (IS_DELETED(dbp, pg, indx))
1483				goto contin;
1484			bk = GET_BKEYDATA(dbp, pg, indx);
1485			space -= 2 * sizeof(*offp);
1486			/* Allocate space for key if needed. */
1487			if (first == 0 && keyoff != NULL)
1488				space -= 2 * sizeof(*offp);
1489
1490			/* Did space underflow? */
1491			if (space > *spacep) {
1492				ret = DB_BUFFER_SMALL;
1493				if (first == 1) {
1494					/* Get the absolute value. */
1495					space = -(int32_t)space;
1496					space = *spacep + space;
1497					if (need_pg)
1498						space += pagesize - HOFFSET(pg);
1499				}
1500				break;
1501			}
1502			if (B_TYPE(bk->type) == B_OVERFLOW) {
1503				bo = (BOVERFLOW *)bk;
1504				size = bo->tlen;
1505				if (size > space) {
1506					ret = DB_BUFFER_SMALL;
1507					space = *spacep + size;
1508					break;
1509				}
1510				if (first == 0 && keyoff != NULL) {
1511					*offp-- = keyoff[0];
1512					*offp-- = keyoff[-1];
1513				}
1514				if ((ret = __bam_bulk_overflow(dbc,
1515				    bo->tlen, bo->pgno, np)) != 0)
1516					return (ret);
1517				space -= size;
1518				*offp-- = (int32_t)(np - dbuf);
1519				np += size;
1520			} else {
1521				if (need_pg) {
1522					dp = np;
1523					size = pagesize - HOFFSET(pg);
1524					if (space < size) {
1525						ret = DB_BUFFER_SMALL;
1526						/* Return space required. */
1527						space = *spacep + size;
1528						break;
1529					}
1530					memcpy(dp,
1531					    (u_int8_t *)pg + HOFFSET(pg), size);
1532					need_pg = 0;
1533					space -= size;
1534					np += size;
1535				}
1536				if (first == 0 && keyoff != NULL) {
1537					*offp-- = keyoff[0];
1538					*offp-- = keyoff[-1];
1539				}
1540				size = bk->len;
1541				*offp-- = (int32_t)((inp[indx] - HOFFSET(pg))
1542				    + (dp - dbuf) + SSZA(BKEYDATA, data));
1543			}
1544			*offp-- = (int32_t)size;
1545			first = 0;
1546			if (no_dup)
1547				break;
1548contin:
1549			indx++;
1550			if (opd->dbtype == DB_RECNO)
1551				cp->recno++;
1552		} while (indx < NUM_ENT(pg));
1553		if (no_dup)
1554			break;
1555		cp->indx = indx;
1556
1557	} while (ret == 0);
1558
1559	/* Return the updated information. */
1560	*spacep = space;
1561	*offpp = offp;
1562	*dpp = np;
1563
1564	/*
1565	 * If we ran out of space back up the pointer.
1566	 * If we did not return any dups or reached the end, close the opd.
1567	 */
1568	if (ret == DB_BUFFER_SMALL) {
1569		if (opd->dbtype == DB_RECNO) {
1570			if (--cp->recno == 0)
1571				goto close_opd;
1572		} else if (indx != 0)
1573			cp->indx--;
1574		else {
1575			t_ret = __bamc_prev(opd);
1576			if (t_ret == DB_NOTFOUND)
1577				goto close_opd;
1578			if (t_ret != 0)
1579				ret = t_ret;
1580		}
1581	} else if (keyoff == NULL && ret == DB_NOTFOUND) {
1582		cp->indx--;
1583		if (opd->dbtype == DB_RECNO)
1584			--cp->recno;
1585	} else if (indx == 0 || ret == DB_NOTFOUND) {
1586close_opd:
1587		if (ret == DB_NOTFOUND)
1588			ret = 0;
1589		if ((t_ret = __dbc_close(opd)) != 0 && ret == 0)
1590			ret = t_ret;
1591		((BTREE_CURSOR *)dbc->internal)->opd = NULL;
1592	}
1593	if (ret == DB_NOTFOUND)
1594		ret = 0;
1595
1596	return (ret);
1597}
1598
1599/*
1600 * __bam_getbothc --
1601 *	Search for a matching data item on a join.
1602 */
1603static int
1604__bam_getbothc(dbc, data)
1605	DBC *dbc;
1606	DBT *data;
1607{
1608	BTREE_CURSOR *cp;
1609	DB *dbp;
1610	DB_MPOOLFILE *mpf;
1611	int cmp, exact, ret;
1612
1613	dbp = dbc->dbp;
1614	mpf = dbp->mpf;
1615	cp = (BTREE_CURSOR *)dbc->internal;
1616
1617	/*
1618	 * Acquire the current page.  We have at least a read-lock
1619	 * already.  The caller may have set DB_RMW asking for a
1620	 * write lock, but upgrading to a write lock has no better
1621	 * chance of succeeding now instead of later, so don't try.
1622	 */
1623	if ((ret = __memp_fget(mpf, &cp->pgno,
1624	     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
1625		return (ret);
1626
1627	/*
1628	 * An off-page duplicate cursor.  Search the remaining duplicates
1629	 * for one which matches (do a normal btree search, then verify
1630	 * that the retrieved record is greater than the original one).
1631	 */
1632	if (F_ISSET(dbc, DBC_OPD)) {
1633		/*
1634		 * Check to make sure the desired item comes strictly after
1635		 * the current position;  if it doesn't, return DB_NOTFOUND.
1636		 */
1637		if ((ret = __bam_cmp(dbp, dbc->thread_info,
1638		    dbc->txn, data, cp->page, cp->indx,
1639		    dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare,
1640		    &cmp)) != 0)
1641			return (ret);
1642
1643		if (cmp <= 0)
1644			return (DB_NOTFOUND);
1645
1646		/* Discard the current page, we're going to do a full search. */
1647		if ((ret = __memp_fput(mpf,
1648		     dbc->thread_info, cp->page, dbc->priority)) != 0)
1649			return (ret);
1650		cp->page = NULL;
1651
1652		return (__bamc_search(dbc,
1653		    PGNO_INVALID, data, DB_GET_BOTH, &exact));
1654	}
1655
1656	/*
1657	 * We're doing a DBC->get(DB_GET_BOTHC) and we're already searching
1658	 * a set of on-page duplicates (either sorted or unsorted).  Continue
1659	 * a linear search from after the current position.
1660	 *
1661	 * (Note that we could have just finished a "set" of one duplicate,
1662	 * i.e. not a duplicate at all, but the following check will always
1663	 * return DB_NOTFOUND in this case, which is the desired behavior.)
1664	 */
1665	if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1666	    !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1667		return (DB_NOTFOUND);
1668	cp->indx += P_INDX;
1669
1670	return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH));
1671}
1672
1673/*
1674 * __bam_getboth_finddatum --
1675 *	Find a matching on-page data item.
1676 */
1677static int
1678__bam_getboth_finddatum(dbc, data, flags)
1679	DBC *dbc;
1680	DBT *data;
1681	u_int32_t flags;
1682{
1683	BTREE_CURSOR *cp;
1684	DB *dbp;
1685	db_indx_t base, lim, top;
1686	int cmp, ret;
1687
1688	COMPQUIET(cmp, 0);
1689
1690	dbp = dbc->dbp;
1691	cp = (BTREE_CURSOR *)dbc->internal;
1692
1693	/*
1694	 * Called (sometimes indirectly) from DBC->get to search on-page data
1695	 * item(s) for a matching value.  If the original flag was DB_GET_BOTH
1696	 * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data
1697	 * item for the key.  If the original flag was DB_GET_BOTHC, the cursor
1698	 * argument is set to the first data item we can potentially return.
1699	 * In both cases, there may or may not be additional duplicate data
1700	 * items to search.
1701	 *
1702	 * If the duplicates are not sorted, do a linear search.
1703	 */
1704	if (dbp->dup_compare == NULL) {
1705		for (;; cp->indx += P_INDX) {
1706			if (!IS_CUR_DELETED(dbc) &&
1707			    (ret = __bam_cmp(dbp, dbc->thread_info,
1708			    dbc->txn, data, cp->page,
1709			    cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0)
1710				return (ret);
1711			if (cmp == 0)
1712				return (0);
1713
1714			if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1715			    !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
1716				break;
1717		}
1718		return (DB_NOTFOUND);
1719	}
1720
1721	/*
1722	 * If the duplicates are sorted, do a binary search.  The reason for
1723	 * this is that large pages and small key/data pairs result in large
1724	 * numbers of on-page duplicates before they get pushed off-page.
1725	 *
1726	 * Find the top and bottom of the duplicate set.  Binary search
1727	 * requires at least two items, don't loop if there's only one.
1728	 */
1729	for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX)
1730		if (!IS_DUPLICATE(dbc, cp->indx, top))
1731			break;
1732	if (base == (top - P_INDX)) {
1733		if  ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, data,
1734		    cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1735			return (ret);
1736		return (cmp == 0 ||
1737		    (cmp < 0 && flags == DB_GET_BOTH_RANGE) ? 0 : DB_NOTFOUND);
1738	}
1739
1740	for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
1741		cp->indx = base + ((lim >> 1) * P_INDX);
1742		if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, data,
1743		    cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1744			return (ret);
1745		if (cmp == 0) {
1746			/*
1747			 * XXX
1748			 * No duplicate duplicates in sorted duplicate sets,
1749			 * so there can be only one.
1750			 */
1751			if (!IS_CUR_DELETED(dbc))
1752				return (0);
1753			break;
1754		}
1755		if (cmp > 0) {
1756			base = cp->indx + P_INDX;
1757			--lim;
1758		}
1759	}
1760
1761	/* No match found; if we're looking for an exact match, we're done. */
1762	if (flags == DB_GET_BOTH)
1763		return (DB_NOTFOUND);
1764
1765	/*
1766	 * Base is the smallest index greater than the data item, may be zero
1767	 * or a last + O_INDX index, and may be deleted.  Find an undeleted
1768	 * item.
1769	 */
1770	cp->indx = base;
1771	while (cp->indx < top && IS_CUR_DELETED(dbc))
1772		cp->indx += P_INDX;
1773	return (cp->indx < top ? 0 : DB_NOTFOUND);
1774}
1775
1776/*
1777 * __bamc_put --
1778 *	Put using a cursor.
1779 */
1780static int
1781__bamc_put(dbc, key, data, flags, pgnop)
1782	DBC *dbc;
1783	DBT *key, *data;
1784	u_int32_t flags;
1785	db_pgno_t *pgnop;
1786{
1787	BTREE *t;
1788	BTREE_CURSOR *cp;
1789	DB *dbp;
1790	DBT dbt;
1791	DB_MPOOLFILE *mpf;
1792	db_pgno_t root_pgno;
1793	u_int32_t iiop;
1794	int cmp, exact, own, ret, stack;
1795	void *arg;
1796
1797	dbp = dbc->dbp;
1798	mpf = dbp->mpf;
1799	cp = (BTREE_CURSOR *)dbc->internal;
1800	root_pgno = cp->root;
1801
1802split:	ret = stack = 0;
1803	switch (flags) {
1804	case DB_CURRENT:
1805		if (F_ISSET(cp, C_DELETED))
1806			return (DB_NOTFOUND);
1807		/* FALLTHROUGH */
1808
1809	case DB_AFTER:
1810	case DB_BEFORE:
1811		iiop = flags;
1812		own = 1;
1813
1814		/* Acquire the current page with a write lock. */
1815		ACQUIRE_WRITE_LOCK(dbc, ret);
1816		if (ret != 0)
1817			goto err;
1818		if ((ret = __memp_fget(mpf, &cp->pgno,
1819		    dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
1820			goto err;
1821		break;
1822	case DB_KEYFIRST:
1823	case DB_KEYLAST:
1824	case DB_NODUPDATA:
1825	case DB_NOOVERWRITE:
1826		own = 0;
1827		/*
1828		 * Searching off-page, sorted duplicate tree: do a tree search
1829		 * for the correct item; __bamc_search returns the smallest
1830		 * slot greater than the key, use it.
1831		 *
1832		 * See comment below regarding where we can start the search.
1833		 */
1834		if (F_ISSET(dbc, DBC_OPD)) {
1835			if ((ret = __bamc_search(dbc,
1836			    F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno,
1837			    data, flags, &exact)) != 0)
1838				goto err;
1839			stack = 1;
1840
1841			/* Disallow "sorted" duplicate duplicates. */
1842			if (exact) {
1843				if (IS_DELETED(dbp, cp->page, cp->indx)) {
1844					iiop = DB_CURRENT;
1845					break;
1846				}
1847				ret = __db_duperr(dbp, flags);
1848				goto err;
1849			}
1850			iiop = DB_BEFORE;
1851			break;
1852		}
1853
1854		/*
1855		 * Searching a btree.
1856		 *
1857		 * If we've done a split, we can start the search from the
1858		 * parent of the split page, which __bam_split returned
1859		 * for us in root_pgno, unless we're in a Btree with record
1860		 * numbering.  In that case, we'll need the true root page
1861		 * in order to adjust the record count.
1862		 */
1863		if ((ret = __bamc_search(dbc,
1864		    F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key,
1865		    flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
1866		    DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
1867			goto err;
1868		stack = 1;
1869
1870		/*
1871		 * If we don't have an exact match, __bamc_search returned
1872		 * the smallest slot greater than the key, use it.
1873		 */
1874		if (!exact) {
1875			iiop = DB_KEYFIRST;
1876			break;
1877
1878		/*
1879		 * Check for NOOVERWRITE.  It is possible that there
1880		 * is a key with an empty duplicate page attached.
1881		 */
1882		} else if (flags == DB_NOOVERWRITE && !IS_CUR_DELETED(dbc)) {
1883			if (pgnop != NULL && __bam_isopd(dbc, pgnop))
1884				ret = __bam_opd_exists(dbc, *pgnop);
1885			else
1886				ret = DB_KEYEXIST;
1887			if (ret != 0)
1888				goto err;
1889		}
1890
1891		/*
1892		 * If duplicates aren't supported, replace the current item.
1893		 */
1894		if (!F_ISSET(dbp, DB_AM_DUP)) {
1895			iiop = DB_CURRENT;
1896			break;
1897		}
1898
1899		/*
1900		 * If we find a matching entry, it may be an off-page duplicate
1901		 * tree.  Return the page number to our caller, we need a new
1902		 * cursor.
1903		 */
1904		if (pgnop != NULL && __bam_isopd(dbc, pgnop))
1905			goto done;
1906
1907		/* If the duplicates aren't sorted, move to the right slot. */
1908		if (dbp->dup_compare == NULL) {
1909			if (flags == DB_KEYFIRST)
1910				iiop = DB_BEFORE;
1911			else
1912				for (;; cp->indx += P_INDX)
1913					if (cp->indx + P_INDX >=
1914					    NUM_ENT(cp->page) ||
1915					    !IS_DUPLICATE(dbc, cp->indx,
1916					    cp->indx + P_INDX)) {
1917						iiop = DB_AFTER;
1918						break;
1919					}
1920			break;
1921		}
1922
1923		/*
1924		 * We know that we're looking at the first of a set of sorted
1925		 * on-page duplicates.  Walk the list to find the right slot.
1926		 */
1927		for (;; cp->indx += P_INDX) {
1928			if ((ret = __bam_cmp(dbp, dbc->thread_info,
1929			    dbc->txn, data, cp->page,
1930			    cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
1931				goto err;
1932			if (cmp < 0) {
1933				iiop = DB_BEFORE;
1934				break;
1935			}
1936
1937			/* Disallow "sorted" duplicate duplicates. */
1938			if (cmp == 0) {
1939				if (IS_DELETED(dbp, cp->page, cp->indx)) {
1940					iiop = DB_CURRENT;
1941					break;
1942				}
1943				ret = __db_duperr(dbp, flags);
1944				goto err;
1945			}
1946
1947			if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
1948			    P_INP(dbp, ((PAGE *)cp->page))[cp->indx] !=
1949			    P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) {
1950				iiop = DB_AFTER;
1951				break;
1952			}
1953		}
1954		break;
1955	default:
1956		ret = __db_unknown_flag(dbp->env, "__bamc_put", flags);
1957		goto err;
1958	}
1959
1960	switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) {
1961	case 0:
1962		break;
1963	case DB_NEEDSPLIT:
1964		/*
1965		 * To split, we need a key for the page.  Either use the key
1966		 * argument or get a copy of the key from the page.
1967		 */
1968		if (flags == DB_AFTER ||
1969		    flags == DB_BEFORE || flags == DB_CURRENT) {
1970			memset(&dbt, 0, sizeof(DBT));
1971			if ((ret = __db_ret(dbp, dbc->thread_info,
1972			    dbc->txn, cp->page, 0,
1973			    &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
1974				goto err;
1975			arg = &dbt;
1976		} else
1977			arg = F_ISSET(dbc, DBC_OPD) ? data : key;
1978
1979		/*
1980		 * Discard any locks and pinned pages (the locks are discarded
1981		 * even if we're running with transactions, as they lock pages
1982		 * that we're sorry we ever acquired).  If stack is set and the
1983		 * cursor entries are valid, they point to the same entries as
1984		 * the stack, don't free them twice.
1985		 */
1986		if (stack)
1987			ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
1988		else
1989			DISCARD_CUR(dbc, ret);
1990		if (ret != 0)
1991			goto err;
1992
1993		/*
1994		 * SR [#6059]
1995		 * If we do not own a lock on the page any more, then clear the
1996		 * cursor so we don't point at it.  Even if we call __bam_stkrel
1997		 * above we still may have entered the routine with the cursor
1998		 * positioned to a particular record.  This is in the case
1999		 * where C_RECNUM is set.
2000		 */
2001		if (own == 0) {
2002			cp->pgno = PGNO_INVALID;
2003			cp->indx = 0;
2004		}
2005
2006		/* Split the tree. */
2007		if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0)
2008			return (ret);
2009
2010		goto split;
2011	default:
2012		goto err;
2013	}
2014
2015err:
2016done:	/*
2017	 * If we inserted a key into the first or last slot of the tree,
2018	 * remember where it was so we can do it more quickly next time.
2019	 * If the tree has record numbers, we need a complete stack so
2020	 * that we can adjust the record counts, so skipping the tree search
2021	 * isn't possible.  For subdatabases we need to be careful that the
2022	 * page does not move from one db to another, so we track its LSN.
2023	 *
2024	 * If there are duplicates and we are inserting into the last slot,
2025	 * the cursor will point _to_ the last item, not after it, which
2026	 * is why we subtract P_INDX below.
2027	 */
2028
2029	t = dbp->bt_internal;
2030	if (ret == 0 && TYPE(cp->page) == P_LBTREE &&
2031	    (flags == DB_KEYFIRST || flags == DB_KEYLAST) &&
2032	    !F_ISSET(cp, C_RECNUM) &&
2033	    (!F_ISSET(dbp, DB_AM_SUBDB) ||
2034	    (LOGGING_ON(dbp->env) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) &&
2035	    ((NEXT_PGNO(cp->page) == PGNO_INVALID &&
2036	    cp->indx >= NUM_ENT(cp->page) - P_INDX) ||
2037	    (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) {
2038		t->bt_lpgno = cp->pgno;
2039		if (F_ISSET(dbp, DB_AM_SUBDB))
2040			t->bt_llsn = LSN(cp->page);
2041	} else
2042		t->bt_lpgno = PGNO_INVALID;
2043	/*
2044	 * Discard any pages pinned in the tree and their locks, except for
2045	 * the leaf page.  Note, the leaf page participated in any stack we
2046	 * acquired, and so we have to adjust the stack as necessary.  If
2047	 * there was only a single page on the stack, we don't have to free
2048	 * further stack pages.
2049	 */
2050	if (stack && BT_STK_POP(cp) != NULL)
2051		(void)__bam_stkrel(dbc, 0);
2052
2053	/*
2054	 * Regardless of whether we were successful or not, clear the delete
2055	 * flag.  If we're successful, we either moved the cursor or the item
2056	 * is no longer deleted.  If we're not successful, then we're just a
2057	 * copy, no need to have the flag set.
2058	 *
2059	 * We may have instantiated off-page duplicate cursors during the put,
2060	 * so clear the deleted bit from the off-page duplicate cursor as well.
2061	 */
2062	F_CLR(cp, C_DELETED);
2063	if (cp->opd != NULL) {
2064		cp = (BTREE_CURSOR *)cp->opd->internal;
2065		F_CLR(cp, C_DELETED);
2066	}
2067
2068	return (ret);
2069}
2070
2071/*
2072 * __bamc_rget --
2073 *	Return the record number for a cursor.
2074 *
2075 * PUBLIC: int __bamc_rget __P((DBC *, DBT *));
2076 */
2077int
2078__bamc_rget(dbc, data)
2079	DBC *dbc;
2080	DBT *data;
2081{
2082	BTREE_CURSOR *cp;
2083	DB *dbp;
2084	DBT dbt;
2085	DB_MPOOLFILE *mpf;
2086	db_recno_t recno;
2087	int exact, ret, t_ret;
2088
2089	dbp = dbc->dbp;
2090	mpf = dbp->mpf;
2091	cp = (BTREE_CURSOR *)dbc->internal;
2092
2093	/*
2094	 * Get the page with the current item on it.
2095	 * Get a copy of the key.
2096	 * Release the page, making sure we don't release it twice.
2097	 */
2098	if ((ret = __memp_fget(mpf, &cp->pgno,
2099	     dbc->thread_info, dbc->txn, 0, &cp->page)) != 0)
2100		return (ret);
2101	memset(&dbt, 0, sizeof(DBT));
2102	if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, cp->page,
2103	    cp->indx, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2104		goto err;
2105	ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
2106	cp->page = NULL;
2107	if (ret != 0)
2108		return (ret);
2109
2110	if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt,
2111	    F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND,
2112	    1, &recno, &exact)) != 0)
2113		goto err;
2114
2115	ret = __db_retcopy(dbc->env, data,
2116	    &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen);
2117
2118	/* Release the stack. */
2119err:	if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0)
2120		ret = t_ret;
2121
2122	return (ret);
2123}
2124
2125/*
2126 * __bamc_writelock --
2127 *	Upgrade the cursor to a write lock.
2128 */
2129static int
2130__bamc_writelock(dbc)
2131	DBC *dbc;
2132{
2133	BTREE_CURSOR *cp;
2134	int ret;
2135
2136	cp = (BTREE_CURSOR *)dbc->internal;
2137
2138	if (cp->lock_mode == DB_LOCK_WRITE)
2139		return (0);
2140
2141	/*
2142	 * When writing to an off-page duplicate tree, we need to have the
2143	 * appropriate page in the primary tree locked.  The general DBC
2144	 * code calls us first with the primary cursor so we can acquire the
2145	 * appropriate lock.
2146	 */
2147	ACQUIRE_WRITE_LOCK(dbc, ret);
2148	return (ret);
2149}
2150
2151/*
2152 * __bamc_next --
2153 *	Move to the next record.
2154 */
2155static int
2156__bamc_next(dbc, initial_move, deleted_okay)
2157	DBC *dbc;
2158	int initial_move, deleted_okay;
2159{
2160	BTREE_CURSOR *cp;
2161	db_indx_t adjust;
2162	db_lockmode_t lock_mode;
2163	db_pgno_t pgno;
2164	int ret;
2165
2166	cp = (BTREE_CURSOR *)dbc->internal;
2167	ret = 0;
2168
2169	/*
2170	 * We're either moving through a page of duplicates or a btree leaf
2171	 * page.
2172	 *
2173	 * !!!
2174	 * This code handles empty pages and pages with only deleted entries.
2175	 */
2176	if (F_ISSET(dbc, DBC_OPD)) {
2177		adjust = O_INDX;
2178		lock_mode = DB_LOCK_NG;
2179	} else {
2180		adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2181		lock_mode =
2182		    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2183	}
2184	if (cp->page == NULL) {
2185		ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2186		if (ret != 0)
2187			return (ret);
2188	}
2189
2190	if (initial_move)
2191		cp->indx += adjust;
2192
2193	for (;;) {
2194		/*
2195		 * If at the end of the page, move to a subsequent page.
2196		 *
2197		 * !!!
2198		 * Check for >= NUM_ENT.  If the original search landed us on
2199		 * NUM_ENT, we may have incremented indx before the test.
2200		 */
2201		if (cp->indx >= NUM_ENT(cp->page)) {
2202			if ((pgno = NEXT_PGNO(cp->page)) == PGNO_INVALID)
2203				return (DB_NOTFOUND);
2204
2205			ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2206			if (ret != 0)
2207				return (ret);
2208			cp->indx = 0;
2209			continue;
2210		}
2211		if (!deleted_okay && IS_CUR_DELETED(dbc)) {
2212			cp->indx += adjust;
2213			continue;
2214		}
2215		break;
2216	}
2217	return (0);
2218}
2219
2220/*
2221 * __bamc_prev --
2222 *	Move to the previous record.
2223 */
2224static int
2225__bamc_prev(dbc)
2226	DBC *dbc;
2227{
2228	BTREE_CURSOR *cp;
2229	db_indx_t adjust;
2230	db_lockmode_t lock_mode;
2231	db_pgno_t pgno;
2232	int ret;
2233
2234	cp = (BTREE_CURSOR *)dbc->internal;
2235	ret = 0;
2236
2237	/*
2238	 * We're either moving through a page of duplicates or a btree leaf
2239	 * page.
2240	 *
2241	 * !!!
2242	 * This code handles empty pages and pages with only deleted entries.
2243	 */
2244	if (F_ISSET(dbc, DBC_OPD)) {
2245		adjust = O_INDX;
2246		lock_mode = DB_LOCK_NG;
2247	} else {
2248		adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
2249		lock_mode =
2250		    F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
2251	}
2252	if (cp->page == NULL) {
2253		ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret);
2254		if (ret != 0)
2255			return (ret);
2256	}
2257
2258	for (;;) {
2259		/* If at the beginning of the page, move to a previous one. */
2260		if (cp->indx == 0) {
2261			if ((pgno =
2262			    PREV_PGNO(cp->page)) == PGNO_INVALID)
2263				return (DB_NOTFOUND);
2264
2265			ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret);
2266			if (ret != 0)
2267				return (ret);
2268
2269			if ((cp->indx = NUM_ENT(cp->page)) == 0)
2270				continue;
2271		}
2272
2273		/* Ignore deleted records. */
2274		cp->indx -= adjust;
2275		if (IS_CUR_DELETED(dbc))
2276			continue;
2277
2278		break;
2279	}
2280	return (0);
2281}
2282
2283/*
2284 * __bamc_search --
2285 *	Move to a specified record.
2286 */
2287static int
2288__bamc_search(dbc, root_pgno, key, flags, exactp)
2289	DBC *dbc;
2290	db_pgno_t root_pgno;
2291	const DBT *key;
2292	u_int32_t flags;
2293	int *exactp;
2294{
2295	BTREE *t;
2296	BTREE_CURSOR *cp;
2297	DB *dbp;
2298	PAGE *h;
2299	db_indx_t indx, *inp;
2300	db_pgno_t bt_lpgno;
2301	db_recno_t recno;
2302	u_int32_t sflags;
2303	int cmp, ret, t_ret;
2304
2305	dbp = dbc->dbp;
2306	cp = (BTREE_CURSOR *)dbc->internal;
2307	t = dbp->bt_internal;
2308	ret = 0;
2309
2310	/*
2311	 * Find an entry in the database.  Discard any lock we currently hold,
2312	 * we're going to search the tree.
2313	 */
2314	DISCARD_CUR(dbc, ret);
2315	if (ret != 0)
2316		return (ret);
2317
2318	switch (flags) {
2319	case DB_FIRST:
2320		sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MIN;
2321		goto search;
2322	case DB_LAST:
2323		sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MAX;
2324		goto search;
2325	case DB_SET_RECNO:
2326		if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
2327			return (ret);
2328		sflags =
2329		    (F_ISSET(dbc, DBC_RMW) ?  SR_FIND_WR : SR_FIND) | SR_EXACT;
2330		if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
2331			return (ret);
2332		break;
2333	case DB_SET:
2334	case DB_GET_BOTH:
2335		sflags =
2336		    (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND) | SR_EXACT;
2337		goto search;
2338	case DB_GET_BOTH_RANGE:
2339		sflags = (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND);
2340		goto search;
2341	case DB_SET_RANGE:
2342		sflags =
2343		    (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_DUPFIRST;
2344		goto search;
2345	case DB_KEYFIRST:
2346	case DB_NOOVERWRITE:
2347		sflags = SR_KEYFIRST;
2348		goto fast_search;
2349	case DB_KEYLAST:
2350	case DB_NODUPDATA:
2351		sflags = SR_KEYLAST;
2352fast_search:	/*
2353		 * If the application has a history of inserting into the first
2354		 * or last pages of the database, we check those pages first to
2355		 * avoid doing a full search.
2356		 */
2357		if (F_ISSET(dbc, DBC_OPD))
2358			goto search;
2359
2360		/*
2361		 * !!!
2362		 * We do not mutex protect the t->bt_lpgno field, which means
2363		 * that it can only be used in an advisory manner.  If we find
2364		 * page we can use, great.  If we don't, we don't care, we do
2365		 * it the slow way instead.  Regardless, copy it into a local
2366		 * variable, otherwise we might acquire a lock for a page and
2367		 * then read a different page because it changed underfoot.
2368		 */
2369		bt_lpgno = t->bt_lpgno;
2370
2371		/*
2372		 * If the tree has no history of insertion, do it the slow way.
2373		 */
2374		if (bt_lpgno == PGNO_INVALID)
2375			goto search;
2376
2377		/*
2378		 * Lock and retrieve the page on which we last inserted.
2379		 *
2380		 * The page may not exist: if a transaction created the page
2381		 * and then aborted, the page might have been truncated from
2382		 * the end of the file.  We don't want to wait on the lock.
2383		 * The page may not even be relevant to this search.
2384		 */
2385		h = NULL;
2386		ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, DB_LOCK_NOWAIT, ret);
2387		if (ret != 0) {
2388			if (ret == DB_LOCK_DEADLOCK ||
2389			    ret == DB_LOCK_NOTGRANTED ||
2390			    ret == DB_PAGE_NOTFOUND)
2391				ret = 0;
2392			goto fast_miss;
2393		}
2394
2395		h = cp->page;
2396		inp = P_INP(dbp, h);
2397
2398		/*
2399		 * It's okay if the page type isn't right or it's empty, it
2400		 * just means that the world changed.
2401		 */
2402		if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
2403			goto fast_miss;
2404
2405		/* Verify that this page cannot have moved to another db. */
2406		if (F_ISSET(dbp, DB_AM_SUBDB) &&
2407		    LOG_COMPARE(&t->bt_llsn, &LSN(h)) != 0)
2408			goto fast_miss;
2409		/*
2410		 * What we do here is test to see if we're at the beginning or
2411		 * end of the tree and if the new item sorts before/after the
2412		 * first/last page entry.  We don't try and catch inserts into
2413		 * the middle of the tree (although we could, as long as there
2414		 * were two keys on the page and we saved both the index and
2415		 * the page number of the last insert).
2416		 */
2417		if (h->next_pgno == PGNO_INVALID) {
2418			indx = NUM_ENT(h) - P_INDX;
2419			if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn,
2420			    key, h, indx, t->bt_compare, &cmp)) != 0)
2421				goto fast_miss;
2422
2423			if (cmp < 0)
2424				goto try_begin;
2425			if (cmp > 0) {
2426				indx += P_INDX;
2427				goto fast_hit;
2428			}
2429
2430			/*
2431			 * Found a duplicate.  If doing DB_KEYLAST, we're at
2432			 * the correct position, otherwise, move to the first
2433			 * of the duplicates.  If we're looking at off-page
2434			 * duplicates, duplicate duplicates aren't permitted,
2435			 * so we're done.
2436			 */
2437			if (flags == DB_KEYLAST)
2438				goto fast_hit;
2439			for (;
2440			    indx > 0 && inp[indx - P_INDX] == inp[indx];
2441			    indx -= P_INDX)
2442				;
2443			goto fast_hit;
2444		}
2445try_begin:	if (h->prev_pgno == PGNO_INVALID) {
2446			indx = 0;
2447			if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn,
2448			    key, h, indx, t->bt_compare, &cmp)) != 0)
2449				goto fast_miss;
2450
2451			if (cmp > 0)
2452				goto fast_miss;
2453			if (cmp < 0)
2454				goto fast_hit;
2455
2456			/*
2457			 * Found a duplicate.  If doing DB_KEYFIRST, we're at
2458			 * the correct position, otherwise, move to the last
2459			 * of the duplicates.  If we're looking at off-page
2460			 * duplicates, duplicate duplicates aren't permitted,
2461			 * so we're done.
2462			 */
2463			if (flags == DB_KEYFIRST)
2464				goto fast_hit;
2465			for (;
2466			    indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
2467			    inp[indx] == inp[indx + P_INDX];
2468			    indx += P_INDX)
2469				;
2470			goto fast_hit;
2471		}
2472		goto fast_miss;
2473
2474fast_hit:	/* Set the exact match flag, we may have found a duplicate. */
2475		*exactp = cmp == 0;
2476
2477		/*
2478		 * Insert the entry in the stack.  (Our caller is likely to
2479		 * call __bam_stkrel() after our return.)
2480		 */
2481		BT_STK_CLR(cp);
2482		BT_STK_ENTER(dbp->env,
2483		    cp, h, indx, cp->lock, cp->lock_mode, ret);
2484		if (ret != 0)
2485			return (ret);
2486		break;
2487
2488fast_miss:	/*
2489		 * This was not the right page, so we do not need to retain
2490		 * the lock even in the presence of transactions.
2491		 *
2492		 * This is also an error path, so ret may have been set.
2493		 */
2494		DISCARD_CUR(dbc, ret);
2495		cp->pgno = PGNO_INVALID;
2496		if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
2497			ret = t_ret;
2498		if (ret != 0)
2499			return (ret);
2500
2501search:		if ((ret = __bam_search(dbc, root_pgno,
2502		    key, sflags, 1, NULL, exactp)) != 0)
2503			return (ret);
2504		break;
2505	default:
2506		return (__db_unknown_flag(dbp->env, "__bamc_search", flags));
2507	}
2508	/* Initialize the cursor from the stack. */
2509	cp->page = cp->csp->page;
2510	cp->pgno = cp->csp->page->pgno;
2511	cp->indx = cp->csp->indx;
2512	cp->lock = cp->csp->lock;
2513	cp->lock_mode = cp->csp->lock_mode;
2514
2515	/* If on an empty page or a deleted record, move to the next one. */
2516	if (flags == DB_FIRST &&
2517	    (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2518		if ((ret = __bamc_next(dbc, 0, 0)) != 0)
2519			return (ret);
2520	if (flags == DB_LAST &&
2521	    (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc)))
2522		if ((ret = __bamc_prev(dbc)) != 0)
2523			return (ret);
2524
2525	return (0);
2526}
2527
2528/*
2529 * __bamc_physdel --
2530 *	Physically remove an item from the page.
2531 */
2532static int
2533__bamc_physdel(dbc)
2534	DBC *dbc;
2535{
2536	BTREE_CURSOR *cp;
2537	DB *dbp;
2538	DBT key;
2539	int delete_page, empty_page, exact, ret;
2540
2541	dbp = dbc->dbp;
2542	memset(&key, 0, sizeof(DBT));
2543	cp = (BTREE_CURSOR *)dbc->internal;
2544	delete_page = empty_page = ret = 0;
2545
2546	/* If the page is going to be emptied, consider deleting it. */
2547	delete_page = empty_page =
2548	    NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
2549
2550	/*
2551	 * Check if the application turned off reverse splits.  Applications
2552	 * can't turn off reverse splits in off-page duplicate trees, that
2553	 * space will never be reused unless the exact same key is specified.
2554	 */
2555	if (delete_page &&
2556	    !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF))
2557		delete_page = 0;
2558
2559	/*
2560	 * We never delete the last leaf page.  (Not really true -- we delete
2561	 * the last leaf page of off-page duplicate trees, but that's handled
2562	 * by our caller, not down here.)
2563	 */
2564	if (delete_page && cp->pgno == cp->root)
2565		delete_page = 0;
2566
2567	/*
2568	 * To delete a leaf page other than an empty root page, we need a
2569	 * copy of a key from the page.  Use the 0th page index since it's
2570	 * the last key the page held.
2571	 *
2572	 * !!!
2573	 * Note that because __bamc_physdel is always called from a cursor
2574	 * close, it should be safe to use the cursor's own "my_rkey" memory
2575	 * to temporarily hold this key.  We shouldn't own any returned-data
2576	 * memory of interest--if we do, we're in trouble anyway.
2577	 */
2578	if (delete_page) {
2579		if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, cp->page,
2580		    0, &key, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2581			return (ret);
2582	}
2583
2584	/*
2585	 * Delete the items.  If page isn't empty, we adjust the cursors.
2586	 *
2587	 * !!!
2588	 * The following operations to delete a page may deadlock.  The easy
2589	 * scenario is if we're deleting an item because we're closing cursors
2590	 * because we've already deadlocked and want to call txn->abort.  If
2591	 * we fail due to deadlock, we'll leave a locked, possibly empty page
2592	 * in the tree, which won't be empty long because we'll undo the delete
2593	 * when we undo the transaction's modifications.
2594	 *
2595	 * !!!
2596	 * Delete the key item first, otherwise the on-page duplicate checks
2597	 * in __bam_ditem() won't work!
2598	 */
2599	if ((ret = __memp_dirty(dbp->mpf,
2600	    &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2601		return (ret);
2602	if (TYPE(cp->page) == P_LBTREE) {
2603		if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2604			return (ret);
2605		if (!empty_page)
2606			if ((ret = __bam_ca_di(dbc,
2607			    PGNO(cp->page), cp->indx, -1)) != 0)
2608				return (ret);
2609	}
2610	if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
2611		return (ret);
2612
2613	/* Clear the deleted flag, the item is gone. */
2614	F_CLR(cp, C_DELETED);
2615
2616	if (!empty_page)
2617		if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0)
2618			return (ret);
2619
2620	/*
2621	 * Need to downgrade write locks here or non-txn locks will get stuck.
2622	 */
2623	if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED)) {
2624		if ((ret = __TLPUT(dbc, cp->lock)) != 0)
2625			return (ret);
2626		cp->lock_mode = DB_LOCK_WWRITE;
2627	}
2628	/* If we're not going to try and delete the page, we're done. */
2629	if (!delete_page)
2630		return (0);
2631
2632	ret = __bam_search(dbc, PGNO_INVALID, &key, SR_DEL, 0, NULL, &exact);
2633
2634	/*
2635	 * If everything worked, delete the stack, otherwise, release the
2636	 * stack and page locks without further damage.
2637	 */
2638	if (ret == 0)
2639		DISCARD_CUR(dbc, ret);
2640	if (ret == 0)
2641		ret = __bam_dpages(dbc, 1, 0);
2642	else
2643		(void)__bam_stkrel(dbc, 0);
2644
2645	return (ret);
2646}
2647
2648/*
2649 * __bamc_getstack --
2650 *	Acquire a full stack for a cursor.
2651 */
2652static int
2653__bamc_getstack(dbc)
2654	DBC *dbc;
2655{
2656	BTREE_CURSOR *cp;
2657	DB *dbp;
2658	DBT dbt;
2659	DB_MPOOLFILE *mpf;
2660	PAGE *h;
2661	int exact, ret, t_ret;
2662
2663	dbp = dbc->dbp;
2664	mpf = dbp->mpf;
2665	cp = (BTREE_CURSOR *)dbc->internal;
2666
2667	/*
2668	 * Get the page with the current item on it.  The caller of this
2669	 * routine has to already hold a read lock on the page, so there
2670	 * is no additional lock to acquire.
2671	 */
2672	if ((ret = __memp_fget(mpf, &cp->pgno,
2673	     dbc->thread_info, dbc->txn, 0, &h)) != 0)
2674		return (ret);
2675
2676	/* Get a copy of a key from the page. */
2677	memset(&dbt, 0, sizeof(DBT));
2678	if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn,
2679	    h, 0, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
2680		goto err;
2681
2682	/* Get a write-locked stack for the page. */
2683	exact = 0;
2684	ret = __bam_search(dbc, PGNO_INVALID,
2685	    &dbt, SR_KEYFIRST, 1, NULL, &exact);
2686
2687err:	/* Discard the key and the page. */
2688	if ((t_ret = __memp_fput(mpf,
2689	     dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
2690		ret = t_ret;
2691
2692	return (ret);
2693}
2694
2695/*
2696 * __bam_isopd --
2697 *	Return if the cursor references an off-page duplicate tree via its
2698 *	page number.
2699 */
2700static int
2701__bam_isopd(dbc, pgnop)
2702	DBC *dbc;
2703	db_pgno_t *pgnop;
2704{
2705	BOVERFLOW *bo;
2706
2707	if (TYPE(dbc->internal->page) != P_LBTREE)
2708		return (0);
2709
2710	bo = GET_BOVERFLOW(dbc->dbp,
2711	    dbc->internal->page, dbc->internal->indx + O_INDX);
2712	if (B_TYPE(bo->type) == B_DUPLICATE) {
2713		*pgnop = bo->pgno;
2714		return (1);
2715	}
2716	return (0);
2717}
2718
2719/*
2720 * __bam_opd_exists --
2721 *	Return if the current position has any data.
2722 * PUBLIC: int  __bam_opd_exists __P((DBC *, db_pgno_t));
2723 */
2724int
2725__bam_opd_exists(dbc, pgno)
2726	DBC *dbc;
2727	db_pgno_t pgno;
2728{
2729	PAGE *h;
2730	int ret;
2731
2732	if ((ret = __memp_fget(dbc->dbp->mpf, &pgno,
2733	    dbc->thread_info, dbc->txn, 0, &h)) != 0)
2734		return (ret);
2735
2736	/*
2737	 * We always collapse OPD trees so we only need to check
2738	 * the number of entries on the root.  If there is a non-empty
2739	 * tree then there will be duplicates.
2740	 */
2741	if (NUM_ENT(h) == 0)
2742		ret = 0;
2743	else
2744		ret = DB_KEYEXIST;
2745
2746	(void)__memp_fput(dbc->dbp->mpf, dbc->thread_info, h, dbc->priority);
2747
2748	return (ret);
2749}
2750