1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1999-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/txn.h"
18
19static int __bam_compact_dups __P((DBC *,
20     PAGE **, u_int32_t, int, DB_COMPACT *, int *));
21static int __bam_compact_int __P((DBC *,
22     DBT *, DBT *, u_int32_t, int *, DB_COMPACT *, int *));
23static int __bam_compact_isdone __P((DBC *, DBT *, PAGE *, int *));
24static int __bam_csearch __P((DBC *, DBT *, u_int32_t, int));
25static int __bam_lock_tree __P((DBC *, EPG *, EPG *csp, u_int32_t, u_int32_t));
26static int __bam_lock_subtree __P((DBC *, PAGE *, u_int32_t, u_int32_t));
27static int __bam_merge __P((DBC *,
28     DBC *,  u_int32_t, DBT *, DB_COMPACT *,int *));
29static int __bam_merge_internal __P((DBC *, DBC *, int, DB_COMPACT *, int *));
30static int __bam_merge_pages __P((DBC *, DBC *, DB_COMPACT *));
31static int __bam_merge_records __P((DBC *, DBC*,  u_int32_t, DB_COMPACT *));
32static int __bam_truncate_internal_overflow __P((DBC *, PAGE *, DB_COMPACT *));
33static int __bam_truncate_overflow __P((DBC *,
34     db_pgno_t, PAGE **, DB_COMPACT *));
35static int __bam_truncate_page __P((DBC *, PAGE **, PAGE *, int));
36static int __bam_truncate_root_page __P((DBC *,
37     PAGE *, u_int32_t, DB_COMPACT *));
38
39#ifdef HAVE_FTRUNCATE
40static int __bam_free_freelist __P((DB *, DB_THREAD_INFO *, DB_TXN *));
41static int __bam_savekey __P((DBC *, int, DBT *));
42static int __bam_setup_freelist __P((DB *, db_pglist_t *, u_int32_t));
43static int __bam_truncate_internal __P((DB *,
44     DB_THREAD_INFO *, DB_TXN *, DB_COMPACT *));
45#endif
46
/*
 * Remember where this iteration of the compaction loop began: snapshot
 * the statistics and copy the current start key, so that if the local
 * transaction deadlocks and is aborted we can retry from the same spot.
 * Sets "ret" as a side effect; callers must check it.
 */
#define	SAVE_START							\
	do {								\
		save_data = *c_data;					\
		ret = __db_retcopy(env,				\
		     &save_start, current.data, current.size,		\
		     &save_start.data, &save_start.ulen);		\
	} while (0)
54
/*
 * Undo the effects of an aborted iteration: restore the statistics
 * fields and the start key saved by SAVE_START so the iteration can be
 * retried.  Only restore those things that are negated by aborting the
 * transaction.  We don't restore the number of deadlocks, for example,
 * since those events really happened.
 * Sets "ret" as a side effect; callers must check it.
 */

#define	RESTORE_START							\
	do {								\
		c_data->compact_pages_free =				\
		      save_data.compact_pages_free;			\
		c_data->compact_levels = save_data.compact_levels;	\
		c_data->compact_truncate = save_data.compact_truncate;	\
		ret = __db_retcopy(env, &current,			\
		     save_start.data, save_start.size,			\
		     &current.data, &current.ulen);			\
	} while (0)
70
/*
 * __bam_compact -- compact a btree.
 *
 * PUBLIC: int __bam_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *,
 * PUBLIC:     DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
 */
77int
78__bam_compact(dbp, ip, txn, start, stop, c_data, flags, end)
79	DB *dbp;
80	DB_THREAD_INFO *ip;
81	DB_TXN *txn;
82	DBT *start, *stop;
83	DB_COMPACT *c_data;
84	u_int32_t flags;
85	DBT *end;
86{
87	DBC *dbc;
88	DBT current, save_start;
89	DB_COMPACT save_data;
90	ENV *env;
91	u_int32_t factor, retry;
92	int deadlock, have_freelist, isdone, ret, span, t_ret, txn_local;
93
94#ifdef HAVE_FTRUNCATE
95	db_pglist_t *list;
96	db_pgno_t last_pgno;
97	u_int32_t nelems, truncated;
98#endif
99
100	env = dbp->env;
101
102	memset(&current, 0, sizeof(current));
103	memset(&save_start, 0, sizeof(save_start));
104	dbc = NULL;
105	factor = 0;
106	have_freelist = deadlock = isdone = ret = span = 0;
107	ret = retry = 0;
108
109#ifdef HAVE_FTRUNCATE
110	list = NULL;
111	last_pgno = 0;
112	nelems = truncated = 0;
113#endif
114
115	/*
116	 * We pass "current" to the internal routine, indicating where that
117	 * routine should begin its work and expecting that it will return to
118	 * us the last key that it processed.
119	 */
120	if (start != NULL && (ret = __db_retcopy(env,
121	     &current, start->data, start->size,
122	     &current.data, &current.ulen)) != 0)
123		return (ret);
124
125	if (IS_DB_AUTO_COMMIT(dbp, txn))
126		txn_local = 1;
127	else
128		txn_local = 0;
129	if (!LF_ISSET(DB_FREE_SPACE | DB_FREELIST_ONLY))
130		goto no_free;
131	if (LF_ISSET(DB_FREELIST_ONLY))
132		LF_SET(DB_FREE_SPACE);
133
134#ifdef HAVE_FTRUNCATE
135	/* Sort the freelist and set up the in-memory list representation. */
136	if (txn_local && (ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
137		goto err;
138
139	if ((ret = __db_free_truncate(dbp, ip,
140	     txn, flags, c_data, &list, &nelems, &last_pgno)) != 0) {
141		LF_CLR(DB_FREE_SPACE);
142		goto terr;
143	}
144
145	/* If the freelist is empty and we are not filling, get out. */
146	if (nelems == 0 && LF_ISSET(DB_FREELIST_ONLY)) {
147		ret = 0;
148		LF_CLR(DB_FREE_SPACE);
149		goto terr;
150	}
151	if ((ret = __bam_setup_freelist(dbp, list, nelems)) != 0) {
152		/* Someone else owns the free list. */
153		if (ret == EBUSY)
154			ret = 0;
155	}
156	if (ret == 0)
157		have_freelist = 1;
158
159	/* Commit the txn and release the meta page lock. */
160terr:	if (txn_local) {
161		if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
162			ret = t_ret;
163		txn = NULL;
164	}
165	if (ret != 0)
166		goto err;
167
168	/* Save the number truncated so far, we will add what we get below. */
169	truncated = c_data->compact_pages_truncated;
170	if (LF_ISSET(DB_FREELIST_ONLY))
171		goto done;
172#endif
173
174	/*
175	 * We want factor to be the target number of free bytes on each page,
176	 * so we know when to stop adding items to a page.   Make sure to
177	 * subtract the page overhead when computing this target.  This can
178	 * result in a 1-2% error on the smallest page.
179	 * First figure out how many bytes we should use:
180	 */
181no_free:
182	factor = dbp->pgsize - SIZEOF_PAGE;
183	if (c_data->compact_fillpercent != 0) {
184		factor *= c_data->compact_fillpercent;
185		factor /= 100;
186	}
187	/* Now convert to the number of free bytes to target. */
188	factor = (dbp->pgsize - SIZEOF_PAGE) - factor;
189
190	if (c_data->compact_pages == 0)
191		c_data->compact_pages = DB_MAX_PAGES;
192
193	do {
194		deadlock = 0;
195
196		SAVE_START;
197		if (ret != 0)
198			break;
199
200		if (txn_local) {
201			if ((ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
202				break;
203
204			if (c_data->compact_timeout != 0 &&
205			    (ret = __txn_set_timeout(txn,
206			    c_data->compact_timeout, DB_SET_LOCK_TIMEOUT)) != 0)
207				goto err;
208		}
209
210		if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
211			goto err;
212
213		if ((ret = __bam_compact_int(dbc, &current, stop, factor,
214		     &span, c_data, &isdone)) ==
215		     DB_LOCK_DEADLOCK && txn_local) {
216			/*
217			 * We retry on deadlock.  Cancel the statistics
218			 * and reset the start point to before this
219			 * iteration.
220			 */
221			deadlock = 1;
222			c_data->compact_deadlock++;
223			RESTORE_START;
224		}
225		/*
226		 * If we could not get a lock while holding an internal
227		 * node latched, commit the current local transaction otherwise
228		 * report a deadlock.
229		 */
230		if (ret == DB_LOCK_NOTGRANTED) {
231			if (txn_local || retry++ < 5)
232				ret = 0;
233			else
234				ret = DB_LOCK_DEADLOCK;
235		} else
236			retry = 0;
237
238		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
239			ret = t_ret;
240
241err:		if (txn_local && txn != NULL) {
242			if (ret == 0 && deadlock == 0)
243				ret = __txn_commit(txn, DB_TXN_NOSYNC);
244			else if ((t_ret = __txn_abort(txn)) != 0 && ret == 0)
245				ret = t_ret;
246			txn = NULL;
247		}
248	} while (ret == 0 && !isdone);
249
250	if (ret == 0 && end != NULL)
251		ret = __db_retcopy(env, end, current.data, current.size,
252		    &end->data, &end->ulen);
253	if (current.data != NULL)
254		__os_free(env, current.data);
255	if (save_start.data != NULL)
256		__os_free(env, save_start.data);
257
258#ifdef HAVE_FTRUNCATE
259	/*
260	 * Finish up truncation work.  If there are pages left in the free
261	 * list then search the internal nodes of the tree as we may have
262	 * missed some while walking the leaf nodes.  Then calculate how
263	 * many pages we have truncated and release the in-memory free list.
264	 */
265done:	if (LF_ISSET(DB_FREE_SPACE)) {
266		DBMETA *meta;
267		db_pgno_t pgno;
268
269		pgno = PGNO_BASE_MD;
270		isdone = 1;
271		if (ret == 0 && !LF_ISSET(DB_FREELIST_ONLY) && (t_ret =
272		    __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
273			isdone = meta->free == PGNO_INVALID;
274			ret = __memp_fput(dbp->mpf, ip, meta, dbp->priority);
275		}
276
277		if (!isdone)
278			ret = __bam_truncate_internal(dbp, ip, txn, c_data);
279
280		/* Clean up the free list. */
281		if (list != NULL)
282			__os_free(env, list);
283
284		if ((t_ret =
285		    __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
286			c_data->compact_pages_truncated =
287			    truncated + last_pgno - meta->last_pgno;
288			if ((t_ret = __memp_fput(dbp->mpf, ip,
289			    meta, dbp->priority)) != 0 && ret == 0)
290				ret = t_ret;
291		} else if (ret == 0)
292			ret = t_ret;
293
294		if (have_freelist && (t_ret =
295		    __bam_free_freelist(dbp, ip, txn)) != 0 && ret == 0)
296			t_ret = ret;
297	}
298#endif
299
300	return (ret);
301}
302
/*
 * __bam_csearch -- isolate search code for bam_compact.
 * This routine hides the differences between searching
 * a BTREE and a RECNO from the rest of the code.
 *
 * The CS_* values below are the compaction-level search modes; they are
 * translated into SR_* search flags by __bam_csearch.
 */
#define	CS_READ	0	/* We are just reading. */
#define	CS_PARENT	1	/* We want the parent too, write lock. */
#define	CS_NEXT		2	/* Get the next page. */
#define	CS_NEXT_WRITE	3	/* Get the next page and write lock. */
#define	CS_DEL		4	/* Get a stack to delete a page. */
#define	CS_START	5	/* Starting level for stack, write lock. */
#define	CS_NEXT_BOTH	6	/* Get this page and the next, write lock. */
/* CS_GETRECNO is a modifier bit OR'd into the above, not a mode itself. */
#define	CS_GETRECNO     0x80	/* Extract record number from start. */
316
317static int
318__bam_csearch(dbc, start, sflag, level)
319	DBC *dbc;
320	DBT *start;
321	u_int32_t sflag;
322	int level;
323{
324	BTREE_CURSOR *cp;
325	int not_used, ret;
326
327	cp = (BTREE_CURSOR *)dbc->internal;
328
329	if (dbc->dbtype == DB_RECNO) {
330		/* If GETRECNO is not set the cp->recno is what we want. */
331		if (FLD_ISSET(sflag, CS_GETRECNO)) {
332			if (start == NULL || start->size == 0)
333				cp->recno = 1;
334			else if ((ret =
335			     __ram_getno(dbc, start, &cp->recno, 0)) != 0)
336				return (ret);
337			FLD_CLR(sflag, CS_GETRECNO);
338		}
339		switch (sflag) {
340		case CS_READ:
341			sflag = SR_READ;
342			break;
343		case CS_NEXT:
344			sflag = SR_PARENT | SR_READ;
345			break;
346		case CS_START:
347			level = LEAFLEVEL;
348			/* FALLTHROUGH */
349		case CS_DEL:
350		case CS_NEXT_WRITE:
351			sflag = SR_STACK;
352			break;
353		case CS_NEXT_BOTH:
354			sflag = SR_BOTH | SR_NEXT | SR_WRITE;
355			break;
356		case CS_PARENT:
357			sflag = SR_PARENT | SR_WRITE;
358			break;
359		default:
360			return (__env_panic(dbc->env, EINVAL));
361		}
362		if ((ret = __bam_rsearch(dbc,
363		     &cp->recno, sflag, level, &not_used)) != 0)
364			return (ret);
365		/* Reset the cursor's recno to the beginning of the page. */
366		cp->recno -= cp->csp->indx;
367	} else {
368		FLD_CLR(sflag, CS_GETRECNO);
369		switch (sflag) {
370		case CS_READ:
371			sflag = SR_READ | SR_DUPFIRST;
372			break;
373		case CS_DEL:
374			sflag = SR_DEL;
375			break;
376		case CS_NEXT:
377			sflag = SR_NEXT;
378			break;
379		case CS_NEXT_WRITE:
380			sflag = SR_NEXT | SR_WRITE;
381			break;
382		case CS_NEXT_BOTH:
383			sflag = SR_BOTH | SR_NEXT | SR_WRITE;
384			break;
385		case CS_START:
386			sflag = SR_START | SR_WRITE;
387			break;
388		case CS_PARENT:
389			sflag = SR_PARENT | SR_WRITE;
390			break;
391		default:
392			return (__env_panic(dbc->env, EINVAL));
393		}
394		if (start == NULL || start->size == 0)
395			FLD_SET(sflag, SR_MIN);
396
397		if ((ret = __bam_search(dbc,
398		     cp->root, start, sflag, level, NULL, &not_used)) != 0)
399			return (ret);
400	}
401
402	return (0);
403}
404
405/*
406 * __bam_compact_int -- internal compaction routine.
407 *	Called either with a cursor on the main database
408 * or a cursor initialized to the root of an off page duplicate
409 * tree.
410 */
411static int
412__bam_compact_int(dbc, start, stop, factor, spanp, c_data, donep)
413	DBC *dbc;
414	DBT *start, *stop;
415	u_int32_t factor;
416	int *spanp;
417	DB_COMPACT *c_data;
418	int *donep;
419{
420	BTREE_CURSOR *cp, *ncp;
421	DB *dbp;
422	DBC *ndbc;
423	DB_LOCK metalock, next_lock, nnext_lock, prev_lock, saved_lock;
424	DB_MPOOLFILE *dbmp;
425	ENV *env;
426	EPG *epg;
427	PAGE *pg, *ppg, *npg;
428	db_pgno_t metapgno, npgno, nnext_pgno;
429	db_pgno_t pgno, prev_pgno, ppgno, saved_pgno;
430	db_recno_t next_recno;
431	u_int32_t sflag, pgs_free;
432	int check_dups, check_trunc, clear_root, isdone;
433	int merged, nentry, next_p, pgs_done, ret, t_ret, tdone;
434
435#ifdef	DEBUG
436#define	CTRACE(dbc, location, t, start, f) do {				\
437		DBT __trace;						\
438		DB_SET_DBT(__trace, t, strlen(t));			\
439		DEBUG_LWRITE(						\
440		    dbc, (dbc)->txn, location, &__trace, start, f)	\
441	} while (0)
442#define	PTRACE(dbc, location, p, start, f) do {				\
443		char __buf[32];						\
444		(void)snprintf(__buf,					\
445		    sizeof(__buf), "pgno: %lu", (u_long)p);		\
446		CTRACE(dbc, location, __buf, start, f);			\
447	} while (0)
448#else
449#define	CTRACE(dbc, location, t, start, f)
450#define	PTRACE(dbc, location, p, start, f)
451#endif
452
453	ndbc = NULL;
454	pg = NULL;
455	npg = NULL;
456
457	isdone = 0;
458	tdone = 0;
459	pgs_done = 0;
460	next_recno = 0;
461	next_p = 0;
462	clear_root = 0;
463	metapgno = PGNO_BASE_MD;
464	LOCK_INIT(next_lock);
465	LOCK_INIT(nnext_lock);
466	LOCK_INIT(saved_lock);
467	LOCK_INIT(metalock);
468	LOCK_INIT(prev_lock);
469	check_trunc = c_data->compact_truncate != PGNO_INVALID;
470	check_dups = (!F_ISSET(dbc, DBC_OPD) &&
471	     F_ISSET(dbc->dbp, DB_AM_DUP)) || check_trunc;
472
473	dbp = dbc->dbp;
474	env = dbp->env;
475	dbmp = dbp->mpf;
476	cp = (BTREE_CURSOR *)dbc->internal;
477	pgs_free = c_data->compact_pages_free;
478
479	/* Search down the tree for the starting point. */
480	if ((ret = __bam_csearch(dbc,
481	    start, CS_READ | CS_GETRECNO, LEAFLEVEL)) != 0) {
482		/* Its not an error to compact an empty db. */
483		if (ret == DB_NOTFOUND)
484			ret = 0;
485		isdone = 1;
486		goto err;
487	}
488
489	/*
490	 * Get the first leaf page. The loop below will change pg so
491	 * we clear the stack reference so we don't put a a page twice.
492	 */
493	pg = cp->csp->page;
494	cp->csp->page = NULL;
495	next_recno = cp->recno;
496next:	/*
497	 * This is the start of the main compaction loop.  There are 3
498	 * parts to the process:
499	 * 1) Walk the leaf pages of the tree looking for a page to
500	 *	process.  We do this with read locks.  Save the
501	 *	key from the page and release it.
502	 * 2) Set up a cursor stack which will write lock the page
503	 *	and enough of its ancestors to get the job done.
504	 *	This could go to the root if we might delete a subtree
505	 *	or we have record numbers to update.
506	 * 3) Loop fetching pages after the above page and move enough
507	 *	data to fill it.
508	 * We exit the loop if we are at the end of the leaf pages, are
509	 * about to lock a new subtree (we span) or on error.
510	 */
511
512	/* Walk the pages looking for something to fill up. */
513	while ((npgno = NEXT_PGNO(pg)) != PGNO_INVALID) {
514		c_data->compact_pages_examine++;
515		PTRACE(dbc, "Next", PGNO(pg), start, 0);
516
517		/* If we have fetched the next page, get the new key. */
518		if (next_p == 1 &&
519		    dbc->dbtype != DB_RECNO && NUM_ENT(pg) != 0) {
520			if ((ret = __db_ret(dbc, pg, 0, start,
521			    &start->data, &start->ulen)) != 0)
522				goto err;
523		}
524		next_recno += NUM_ENT(pg);
525		if (P_FREESPACE(dbp, pg) > factor ||
526		     (check_trunc && PGNO(pg) > c_data->compact_truncate))
527			break;
528		if (stop != NULL && stop->size > 0) {
529			if ((ret = __bam_compact_isdone(dbc,
530			    stop, pg, &isdone)) != 0)
531				goto err;
532			if (isdone)
533				goto done;
534		}
535
536		/*
537		 * The page does not need more data or to be swapped,
538		 * check to see if we want to look at possible duplicate
539		 * trees or overflow records and the move on to the next page.
540		 */
541		cp->recno += NUM_ENT(pg);
542		next_p = 1;
543		tdone = pgs_done;
544		PTRACE(dbc, "Dups", PGNO(pg), start, 0);
545		if (check_dups && (ret = __bam_compact_dups(
546		     dbc, &pg, factor, 0, c_data, &pgs_done)) != 0)
547			goto err;
548		npgno = NEXT_PGNO(pg);
549		if ((ret = __memp_fput(dbmp,
550		     dbc->thread_info, pg, dbc->priority)) != 0)
551			goto err;
552		pg = NULL;
553		/*
554		 * If we don't do anything we don't need to hold
555		 * the lock on the previous page, so couple always.
556		 */
557		if ((ret = __db_lget(dbc,
558		    tdone == pgs_done ? LCK_COUPLE_ALWAYS : LCK_COUPLE,
559		    npgno, DB_LOCK_READ, 0, &cp->csp->lock)) != 0)
560			goto err;
561		if ((ret = __memp_fget(dbmp, &npgno,
562		     dbc->thread_info, dbc->txn, 0, &pg)) != 0)
563			goto err;
564	}
565
566	/*
567	 * When we get here we have 3 cases:
568	 * 1) We've reached the end of the leaf linked list and are done.
569	 * 2) A page whose freespace exceeds our target and therefore needs
570	 *	to have data added to it.
571	 * 3) A page that doesn't have too much free space but needs to be
572	 *	checked for truncation.
573	 * In both cases 2 and 3, we need that page's first key or record
574	 * number.  We may already have it, if not get it here.
575	 */
576	if ((nentry = NUM_ENT(pg)) != 0) {
577		next_p = 0;
578		/* Get a copy of the first recno on the page. */
579		if (dbc->dbtype == DB_RECNO) {
580			if ((ret = __db_retcopy(dbp->env, start,
581			     &cp->recno, sizeof(cp->recno),
582			     &start->data, &start->ulen)) != 0)
583				goto err;
584		} else if (start->size == 0 && (ret = __db_ret(dbc,
585		    pg, 0, start, &start->data, &start->ulen)) != 0)
586			goto err;
587
588		if (npgno == PGNO_INVALID) {
589			/* End of the tree, check its duplicates and exit. */
590			PTRACE(dbc, "GoDone", PGNO(pg), start, 0);
591			if (check_dups && (ret = __bam_compact_dups(dbc,
592			   &pg, factor, 0, c_data, &pgs_done)) != 0)
593				goto err;
594			c_data->compact_pages_examine++;
595			isdone = 1;
596			goto done;
597		}
598	}
599
600	/* Release the page so we don't deadlock getting its parent. */
601	if ((ret = __memp_fput(dbmp, dbc->thread_info, pg, dbc->priority)) != 0)
602		goto err;
603	if ((ret = __LPUT(dbc, cp->csp->lock)) != 0)
604		goto err;
605	BT_STK_CLR(cp);
606	pg = NULL;
607	saved_pgno = PGNO_INVALID;
608	prev_pgno = PGNO_INVALID;
609	nnext_pgno = PGNO_INVALID;
610
611	/*
612	 * We must lock the metadata page first because we cannot block
613	 * while holding interior nodes of the tree pinned.
614	 */
615
616	if (!LOCK_ISSET(metalock) && pgs_free == c_data->compact_pages_free &&
617	    (ret = __db_lget(dbc,
618	    LCK_ALWAYS, metapgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
619		goto err;
620
621	/*
622	 * Setup the cursor stack. There are 3 cases:
623	 * 1) the page is empty and will be deleted: nentry == 0.
624	 * 2) the next page has the same parent: *spanp == 0.
625	 * 3) the next page has a different parent: *spanp == 1.
626	 *
627	 * We now need to search the tree again, getting a write lock
628	 * on the page we are going to merge or delete.  We do this by
629	 * searching down the tree and locking as much of the subtree
630	 * above the page as needed.  In the case of a delete we will
631	 * find the maximal subtree that can be deleted. In the case
632	 * of merge if the current page and the next page are siblings
633	 * with the same parent then we only need to lock the parent.
634	 * Otherwise *span will be set and we need to search to find the
635	 * lowest common ancestor.  Dbc will be set to contain the subtree
636	 * containing the page to be merged or deleted. Ndbc will contain
637	 * the minimal subtree containing that page and its next sibling.
638	 * In all cases for DB_RECNO we simplify things and get the whole
639	 * tree if we need more than a single parent.
640	 * The tree can collapse while we don't have it locked, so the
641	 * page we are looking for may be gone.  If so we are at
642	 * the right most end of the leaf pages and are done.
643	 */
644
645retry:	pg = NULL;
646	if (npg != NULL && (ret = __memp_fput(dbmp,
647	     dbc->thread_info, npg, dbc->priority)) != 0)
648		goto err;
649	npg = NULL;
650	if (ndbc != NULL) {
651		ncp = (BTREE_CURSOR *)ndbc->internal;
652		if (clear_root == 1) {
653			ncp->sp->page = NULL;
654			LOCK_INIT(ncp->sp->lock);
655		}
656		if ((ret = __bam_stkrel(ndbc, 0)) != 0)
657			goto err;
658	}
659	clear_root = 0;
660	/* Case 1 -- page is empty. */
661	if (nentry == 0) {
662		CTRACE(dbc, "Empty", "", start, 0);
663		if (next_p == 1)
664			sflag = CS_NEXT_WRITE;
665		else
666			sflag = CS_DEL;
667		if ((ret = __bam_csearch(dbc, start, sflag, LEAFLEVEL)) != 0) {
668			isdone = 1;
669			if (ret == DB_NOTFOUND)
670				ret = 0;
671			goto err;
672		}
673
674		pg = cp->csp->page;
675		/* Check to see if the page is still empty. */
676		if (NUM_ENT(pg) != 0)
677			npgno = PGNO(pg);
678		else {
679			npgno = NEXT_PGNO(pg);
680			/* If this is now the root, we are very done. */
681			if (PGNO(pg) == cp->root)
682				isdone = 1;
683			else {
684				if (npgno != PGNO_INVALID) {
685					TRY_LOCK(dbc, npgno, saved_pgno,
686					    next_lock, DB_LOCK_WRITE, retry);
687					if (ret != 0)
688						goto err;
689				}
690				if (PREV_PGNO(pg) != PGNO_INVALID) {
691					TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno,
692					    prev_lock, DB_LOCK_WRITE, retry);
693					if (ret != 0)
694						goto err;
695				}
696				if ((ret =
697				    __bam_dpages(dbc, 0, BTD_RELINK)) != 0)
698					goto err;
699				c_data->compact_pages_free++;
700				if ((ret = __TLPUT(dbc, prev_lock)) != 0)
701					goto err;
702				LOCK_INIT(prev_lock);
703				if ((ret = __TLPUT(dbc, next_lock)) != 0)
704					goto err;
705				LOCK_INIT(next_lock);
706				goto next_no_release;
707			}
708		}
709		goto next_page;
710	}
711
712	/* case 3 -- different parents. */
713	if (*spanp) {
714		CTRACE(dbc, "Span", "", start, 0);
715		/*
716		 * Search the tree looking for the page containing and
717		 * the next page after the current key.
718		 * The stack will be rooted at the page that spans
719		 * the current and next pages. The two subtrees
720		 * are returned below that.  For BTREE the current
721		 * page subtreee will be first while for RECNO the
722		 * next page subtree will be first
723		 */
724		if (ndbc == NULL && (ret = __dbc_dup(dbc, &ndbc, 0)) != 0)
725			goto err;
726		ncp = (BTREE_CURSOR *)ndbc->internal;
727
728		ncp->recno = cp->recno;
729		cp->recno = next_recno;
730
731		if ((ret = __bam_csearch(dbc, start, CS_NEXT_BOTH, 0)) != 0) {
732			if (ret == DB_NOTFOUND) {
733				isdone = 1;
734				ret = 0;
735			}
736			goto err;
737		}
738
739		/*
740		 * Find the top of the stack for the second subtree.
741		 */
742		for (epg = cp->csp - 1; epg > cp->sp; epg--)
743			if (LEVEL(epg->page) == LEAFLEVEL)
744				break;
745		DB_ASSERT(env, epg != cp->sp);
746
747		/*
748		 * Copy the root. We will have two instances of the
749		 * same page, be careful not to free both.
750		 */
751		BT_STK_PUSH(env, ncp, cp->sp->page, cp->sp->indx,
752		     cp->sp->lock, cp->sp->lock_mode, ret);
753		if (ret != 0)
754			goto err;
755		clear_root = 1;
756
757		/* Copy the stack containing the next page. */
758		for (epg++; epg <= cp->csp; epg++) {
759			BT_STK_PUSH(env, ncp, epg->page, epg->indx,
760			     epg->lock, epg->lock_mode, ret);
761			if (ret != 0)
762				goto err;
763		}
764		/* adjust the stack pointer to remove these items. */
765		ncp->csp--;
766		cp->csp -= ncp->csp - ncp->sp;
767
768		/*
769		 * If this is RECNO then we want to swap the stacks.
770		 */
771		if (dbp->type == DB_RECNO) {
772			ndbc->internal = (DBC_INTERNAL *)cp;
773			dbc->internal = (DBC_INTERNAL *)ncp;
774			cp = ncp;
775			ncp = (BTREE_CURSOR *)ndbc->internal;
776			cp->sp->indx--;
777		} else
778			ncp->sp->indx++;
779
780		DB_ASSERT(env,
781		    NEXT_PGNO(cp->csp->page) == PGNO(ncp->csp->page));
782		pg = cp->csp->page;
783
784		/*
785		 * The page may have emptied while we waited for the
786		 * lock or the record we are looking for may have
787		 * moved.
788		 * Reset npgno so we re-get this page when we go back
789		 * to the top.
790		 */
791		if (NUM_ENT(pg) == 0 ||
792		     (dbc->dbtype == DB_RECNO &&
793		     NEXT_PGNO(cp->csp->page) != PGNO(ncp->csp->page))) {
794			npgno = PGNO(pg);
795			*spanp = 0;
796			goto next_page;
797		}
798
799		if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
800			if (PREV_PGNO(pg) != PGNO_INVALID) {
801				TRY_LOCK2(dbc, ndbc, PREV_PGNO(pg), prev_pgno,
802				    prev_lock, DB_LOCK_WRITE, retry);
803				if (ret != 0)
804					goto err1;
805			}
806			pgs_done++;
807			/* Get a fresh low numbered page. */
808			if ((ret = __bam_truncate_page(dbc,
809			    &pg, ncp->csp->page, 1)) != 0)
810				goto err1;
811			if ((ret = __TLPUT(dbc, prev_lock)) != 0)
812				goto err;
813			LOCK_INIT(prev_lock);
814		}
815		*spanp = 0;
816		PTRACE(dbc, "SDups", PGNO(ncp->csp->page), start, 0);
817		if (check_dups && (ret = __bam_compact_dups(ndbc,
818		     &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
819			goto err1;
820
821		/* Check to see if the tree collapsed. */
822		if (PGNO(ncp->csp->page) == ncp->root)
823			goto done;
824
825		pg = cp->csp->page;
826		npgno = NEXT_PGNO(pg);
827		PTRACE(dbc, "SDups", PGNO(pg), start, 0);
828		if (check_dups && (ret =
829		     __bam_compact_dups(dbc, &cp->csp->page,
830		     factor, 1, c_data, &pgs_done)) != 0)
831			goto err1;
832
833		/*
834		 * We may have dropped our locks, check again
835		 * to see if we still need to fill this page and
836		 * we are in a spanning situation.
837		 */
838
839		if (P_FREESPACE(dbp, pg) <= factor ||
840		     cp->csp[-1].indx != NUM_ENT(cp->csp[-1].page) - 1)
841			goto next_page;
842
843		/*
844		 * Try to move things into a single parent.
845		 */
846		merged = 0;
847		for (epg = cp->sp; epg != cp->csp; epg++) {
848			PTRACE(dbc, "PMerge", PGNO(epg->page), start, 0);
849			if ((ret = __bam_merge_internal(dbc,
850			    ndbc, LEVEL(epg->page), c_data, &merged)) != 0)
851				break;
852			if (merged)
853				break;
854		}
855
856		if (ret != 0 && ret != DB_LOCK_NOTGRANTED)
857			goto err1;
858		/*
859		 * If we merged the parent, then we no longer span.
860		 * Otherwise if we tried to merge the parent but would
861		 * block on one of the other leaf pages try again.
862		 * If we did not merge any records of the parent,
863		 * exit to commit any local transactions and try again.
864		 */
865		if (merged || ret == DB_LOCK_NOTGRANTED) {
866			if (merged)
867				pgs_done++;
868			else
869				goto done;
870			if (cp->csp->page == NULL)
871				goto deleted;
872			npgno = PGNO(pg);
873			next_recno = cp->recno;
874			goto next_page;
875		}
876		PTRACE(dbc, "SMerge", PGNO(cp->csp->page), start, 0);
877
878		/* if we remove the next page, then we need its next locked */
879		npgno = NEXT_PGNO(ncp->csp->page);
880		if (npgno != PGNO_INVALID) {
881			TRY_LOCK2(dbc, ndbc, npgno,
882			    nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry);
883			if (ret != 0)
884				goto err1;
885		}
886		if ((ret = __bam_merge(dbc,
887		     ndbc, factor, stop, c_data, &isdone)) != 0)
888			goto err1;
889		pgs_done++;
890		/*
891		 * __bam_merge could have freed our stack if it
892		 * deleted a page possibly collapsing the tree.
893		 */
894		if (cp->csp->page == NULL)
895			goto deleted;
896		cp->recno += NUM_ENT(pg);
897
898		if ((ret = __TLPUT(dbc, nnext_lock)) != 0)
899			goto err1;
900		LOCK_INIT(nnext_lock);
901
902		/* If we did not bump to the next page something did not fit. */
903		if (npgno != NEXT_PGNO(pg)) {
904			npgno = NEXT_PGNO(pg);
905			goto next_page;
906		}
907	} else {
908		/* Case 2 -- same parents. */
909		CTRACE(dbc, "Sib", "", start, 0);
910		if ((ret =
911		    __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL)) != 0) {
912			if (ret == DB_NOTFOUND) {
913				isdone = 1;
914				ret = 0;
915			}
916			goto err;
917		}
918
919		pg = cp->csp->page;
920		DB_ASSERT(env, IS_DIRTY(pg));
921		DB_ASSERT(env,
922		    PGNO(pg) == cp->root || IS_DIRTY(cp->csp[-1].page));
923
924		/* We now have a write lock, recheck the page. */
925		if ((nentry = NUM_ENT(pg)) == 0) {
926			npgno = PGNO(pg);
927			goto next_page;
928		}
929
930		/* Check duplicate trees, we have a write lock on the page. */
931		PTRACE(dbc, "SibDup", PGNO(pg), start, 0);
932		if (check_dups && (ret =
933		     __bam_compact_dups(dbc, &cp->csp->page,
934		     factor, 1, c_data, &pgs_done)) != 0)
935			goto err1;
936		pg = cp->csp->page;
937		npgno = NEXT_PGNO(pg);
938
939		/* Check to see if the tree collapsed. */
940		if (PGNO(pg) == cp->root)
941			goto err1;
942		DB_ASSERT(env, cp->csp - cp->sp == 1);
943
944		/* After re-locking check to see if we still need to fill. */
945		if (P_FREESPACE(dbp, pg) <= factor) {
946			if (check_trunc &&
947			    PGNO(pg) > c_data->compact_truncate) {
948				if (PREV_PGNO(pg) != PGNO_INVALID) {
949					TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno,
950					    prev_lock, DB_LOCK_WRITE, retry);
951					if (ret != 0)
952						goto err1;
953				}
954				if (npgno != PGNO_INVALID) {
955					TRY_LOCK(dbc, npgno, saved_pgno,
956					    next_lock, DB_LOCK_WRITE, retry);
957					if (ret != 0)
958						goto err1;
959				}
960				pgs_done++;
961				/* Get a fresh low numbered page. */
962				if ((ret = __bam_truncate_page(dbc,
963				    &pg, NULL, 1)) != 0)
964					goto err1;
965				if ((ret = __TLPUT(dbc, prev_lock)) != 0)
966					goto err1;
967				LOCK_INIT(prev_lock);
968				if ((ret = __TLPUT(dbc, next_lock)) != 0)
969					goto err1;
970				LOCK_INIT(next_lock);
971			}
972			goto next_page;
973		}
974
975		/* If they have the same parent, just dup the cursor */
976		if (ndbc != NULL && (ret = __dbc_close(ndbc)) != 0)
977			goto err1;
978		if ((ret = __dbc_dup(dbc, &ndbc, DB_POSITION)) != 0)
979			goto err1;
980		ncp = (BTREE_CURSOR *)ndbc->internal;
981
982		/*
983		 * ncp->recno needs to have the recno of the next page.
984		 * Bump it by the number of records on the current page.
985		 */
986		ncp->recno += NUM_ENT(pg);
987	}
988
989	pgno = PGNO(cp->csp->page);
990	ppgno = PGNO(cp->csp[-1].page);
991	/* Fetch pages until we fill this one. */
992	while (!isdone && npgno != PGNO_INVALID &&
993	     P_FREESPACE(dbp, pg) > factor && c_data->compact_pages != 0) {
994		/*
995		 * merging may have to free the parent page, if it does,
996		 * refetch it but do it decending the tree.
997		 */
998		epg = &cp->csp[-1];
999		if ((ppg = epg->page) == NULL) {
1000			if ((ret = __memp_fput(dbmp, dbc->thread_info,
1001			     cp->csp->page, dbc->priority)) != 0)
1002				goto err1;
1003			pg = NULL;
1004			if ((ret = __memp_fget(dbmp, &ppgno, dbc->thread_info,
1005			    dbc->txn, DB_MPOOL_DIRTY, &ppg)) != 0)
1006				goto err1;
1007			if ((ret = __memp_fget(dbmp, &pgno, dbc->thread_info,
1008			    dbc->txn, DB_MPOOL_DIRTY, &pg)) != 0)
1009				goto err1;
1010			epg->page = ppg;
1011			cp->csp->page = pg;
1012		}
1013
1014		/*
1015		 * If our current position is the last one on a parent
1016		 * page, then we are about to merge across different
1017		 * internal nodes.  Thus, we need to lock higher up
1018		 * in the tree.  We will exit the routine and commit
1019		 * what we have done so far.  Set spanp so we know
1020		 * we are in this case when we come back.
1021		 */
1022		if (epg->indx == NUM_ENT(ppg) - 1) {
1023			*spanp = 1;
1024			npgno = PGNO(pg);
1025			next_recno = cp->recno;
1026			epg->page = ppg;
1027			goto next_page;
1028		}
1029
1030		/* Lock and get the next page. */
1031		TRY_LOCK(dbc, npgno,
1032		    saved_pgno, saved_lock, DB_LOCK_WRITE, retry);
1033		if (ret != 0)
1034			goto err1;
1035		if ((ret = __LPUT(dbc, ncp->lock)) != 0)
1036			goto err1;
1037		ncp->lock = saved_lock;
1038		LOCK_INIT(saved_lock);
1039		saved_pgno = PGNO_INVALID;
1040
1041		if ((ret = __memp_fget(dbmp, &npgno,
1042		    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &npg)) != 0)
1043			goto err1;
1044
1045		if (check_trunc &&
1046		    PGNO(pg) > c_data->compact_truncate) {
1047			if (PREV_PGNO(pg) != PGNO_INVALID) {
1048				TRY_LOCK(dbc, PREV_PGNO(pg),
1049				    prev_pgno, prev_lock, DB_LOCK_WRITE, retry);
1050				if (ret != 0)
1051					goto err1;
1052			}
1053			pgs_done++;
1054			/* Get a fresh low numbered page. */
1055			if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0)
1056				goto err1;
1057			if ((ret = __TLPUT(dbc, prev_lock)) != 0)
1058				goto err1;
1059			LOCK_INIT(prev_lock);
1060			pgno = PGNO(pg);
1061		}
1062		c_data->compact_pages_examine++;
1063
1064		PTRACE(dbc, "MDups", PGNO(npg), start, 0);
1065		if (check_dups && (ret = __bam_compact_dups(ndbc,
1066		     &npg, factor, 1, c_data, &pgs_done)) != 0)
1067			goto err1;
1068
1069		npgno = NEXT_PGNO(npg);
1070		if (npgno != PGNO_INVALID) {
1071			TRY_LOCK(dbc, npgno,
1072			    nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry);
1073			if (ret != 0)
1074				goto err1;
1075		}
1076
1077		/* copy the common parent to the stack. */
1078		BT_STK_PUSH(env, ncp, ppg,
1079		     epg->indx + 1, epg->lock, epg->lock_mode, ret);
1080		if (ret != 0)
1081			goto err1;
1082
1083		/* Put the page on the stack. */
1084		BT_STK_ENTER(env, ncp, npg, 0, ncp->lock, DB_LOCK_WRITE, ret);
1085
1086		LOCK_INIT(ncp->lock);
1087		npg = NULL;
1088
1089		/*
1090		 * Merge the pages.  This will either free the next
1091		 * page or just update its parent pointer.
1092		 */
1093		PTRACE(dbc, "Merge", PGNO(cp->csp->page), start, 0);
1094		if ((ret = __bam_merge(dbc,
1095		     ndbc, factor, stop, c_data, &isdone)) != 0)
1096			goto err1;
1097
1098		pgs_done++;
1099
1100		if ((ret = __TLPUT(dbc, nnext_lock)) != 0)
1101			goto err1;
1102		LOCK_INIT(nnext_lock);
1103
1104		/*
1105		 * __bam_merge could have freed our stack if it
1106		 * deleted a page possibly collapsing the tree.
1107		 */
1108		if (cp->csp->page == NULL)
1109			goto deleted;
1110		/* If we did not bump to the next page something did not fit. */
1111		if (npgno != NEXT_PGNO(pg))
1112			break;
1113	}
1114
1115	/* Bottom of the main loop.  Move to the next page. */
1116	npgno = NEXT_PGNO(pg);
1117	cp->recno += NUM_ENT(pg);
1118	next_recno = cp->recno;
1119
1120next_page:
1121	if (ndbc != NULL) {
1122		ncp = (BTREE_CURSOR *)ndbc->internal;
1123		if (ncp->sp->page == cp->sp->page) {
1124			ncp->sp->page = NULL;
1125			LOCK_INIT(ncp->sp->lock);
1126		}
1127		if ((ret = __bam_stkrel(ndbc,
1128		     pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
1129			goto err;
1130	}
1131	/*
1132	 * Unlatch the tree before trying to lock the next page.  We must
1133	 * unlatch to avoid a latch deadlock but we want to hold the
1134	 * lock on the parent node so this leaf cannot be unlinked.
1135	 */
1136	pg = NULL;
1137	if ((ret = __bam_stkrel(dbc, STK_PGONLY)) != 0)
1138		goto err;
1139	if ((ret = __db_lget(dbc, 0, npgno, DB_LOCK_READ, 0, &next_lock)) != 0)
1140		goto err;
1141	if ((ret = __bam_stkrel(dbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
1142		goto err;
1143	if ((ret = __TLPUT(dbc, saved_lock)) != 0)
1144		goto err;
1145	if ((ret = __TLPUT(dbc, prev_lock)) != 0)
1146		goto err;
1147
1148next_no_release:
1149	pg = NULL;
1150
1151	if (npgno == PGNO_INVALID || c_data->compact_pages  == 0)
1152		isdone = 1;
1153	if (!isdone) {
1154		/*
1155		 * If we are at the end of this parent commit the
1156		 * transaction so we don't tie things up.
1157		 */
1158		if (pgs_done != 0 && *spanp) {
1159deleted:		if (((ret = __bam_stkrel(ndbc, 0)) != 0 ||
1160			     (ret = __dbc_close(ndbc)) != 0))
1161				goto err;
1162			*donep = 0;
1163			return (0);
1164		}
1165
1166		/* Reget the next page to look at. */
1167		cp->recno = next_recno;
1168		if ((ret = __memp_fget(dbmp, &npgno,
1169		    dbc->thread_info, dbc->txn, 0, &pg)) != 0)
1170			goto err;
1171		cp->csp->lock = next_lock;
1172		LOCK_INIT(next_lock);
1173		next_p = 1;
1174		/* If we did not do anything we can drop the metalock. */
1175		if (pgs_done == 0 && (ret = __LPUT(dbc, metalock)) != 0)
1176			goto err;
1177		goto next;
1178	}
1179
1180done:
1181	if (0) {
1182		/*
1183		 * We come here if pg came from cp->csp->page and could
1184		 * have already been fput.
1185		 */
1186err1:		pg = NULL;
1187	}
1188err:	/*
1189	 * Don't release locks (STK_PGONLY)if we had an error, we could reveal
1190	 * a bad tree to a dirty reader.  Wait till the abort to free the locks.
1191	 */
1192	sflag = STK_CLRDBC;
1193	if (dbc->txn != NULL && ret != 0)
1194		sflag |= STK_PGONLY;
1195	if (ndbc != NULL) {
1196		ncp = (BTREE_CURSOR *)ndbc->internal;
1197		if (npg == ncp->csp->page)
1198			npg = NULL;
1199		if (ncp->sp->page == cp->sp->page) {
1200			ncp->sp->page = NULL;
1201			LOCK_INIT(ncp->sp->lock);
1202		}
1203		if ((t_ret = __bam_stkrel(ndbc, sflag)) != 0 && ret == 0)
1204			ret = t_ret;
1205		else if ((t_ret = __dbc_close(ndbc)) != 0 && ret == 0)
1206			ret = t_ret;
1207	}
1208	if (pg == cp->csp->page)
1209		pg = NULL;
1210	if ((t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
1211		ret = t_ret;
1212
1213	if ((t_ret = __TLPUT(dbc, metalock)) != 0 && ret == 0)
1214		ret = t_ret;
1215
1216	if (pg != NULL && (t_ret =
1217	     __memp_fput(dbmp,
1218		  dbc->thread_info, pg, dbc->priority) != 0) && ret == 0)
1219		ret = t_ret;
1220	if (npg != NULL && (t_ret =
1221	     __memp_fput(dbmp,
1222		  dbc->thread_info, npg, dbc->priority) != 0) && ret == 0)
1223		ret = t_ret;
1224
1225	*donep = isdone;
1226
1227	return (ret);
1228}
1229
1230/*
1231 * __bam_merge -- do actual merging of leaf pages.
1232 */
1233static int
1234__bam_merge(dbc, ndbc, factor, stop, c_data, donep)
1235	DBC *dbc, *ndbc;
1236	u_int32_t factor;
1237	DBT *stop;
1238	DB_COMPACT *c_data;
1239	int *donep;
1240{
1241	BTREE_CURSOR *cp, *ncp;
1242	DB *dbp;
1243	PAGE *pg, *npg;
1244	db_indx_t nent;
1245	int ret;
1246
1247	DB_ASSERT(NULL, dbc != NULL);
1248	DB_ASSERT(NULL, ndbc != NULL);
1249	dbp = dbc->dbp;
1250	cp = (BTREE_CURSOR *)dbc->internal;
1251	ncp = (BTREE_CURSOR *)ndbc->internal;
1252	pg = cp->csp->page;
1253	npg = ncp->csp->page;
1254
1255	nent = NUM_ENT(npg);
1256
1257	/* If the page is empty just throw it away. */
1258	if (nent == 0)
1259		goto free_page;
1260
1261	/* Find if the stopping point is on this page. */
1262	if (stop != NULL && stop->size != 0) {
1263		if ((ret = __bam_compact_isdone(dbc, stop, npg, donep)) != 0)
1264			return (ret);
1265		if (*donep)
1266			return (0);
1267	}
1268
1269	/*
1270	 * If there is too much data then just move records one at a time.
1271	 * Otherwise copy the data space over and fix up the index table.
1272	 * If we are on the left most child we will effect our parent's
1273	 * index entry so we call merge_records to figure out key sizes.
1274	 */
1275	if ((dbc->dbtype == DB_BTREE &&
1276	    ncp->csp[-1].indx == 0 && ncp->csp[-1].entries != 1) ||
1277	    (int)(P_FREESPACE(dbp, pg) -
1278	    ((dbp->pgsize - P_OVERHEAD(dbp)) -
1279	    P_FREESPACE(dbp, npg))) < (int)factor)
1280		ret = __bam_merge_records(dbc, ndbc, factor, c_data);
1281	else
1282free_page:	ret = __bam_merge_pages(dbc, ndbc, c_data);
1283
1284	return (ret);
1285}
1286
/*
 * __bam_merge_records -- move as many records as will fit from the next
 *	leaf page (on ndbc's stack) to the previous leaf page (on dbc's
 *	stack), keeping duplicate groups intact, then update the next
 *	page's parent key to reflect its new first record.
 */
static int
__bam_merge_records(dbc, ndbc, factor, c_data)
	DBC *dbc, *ndbc;
	u_int32_t factor;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BKEYDATA *bk, *tmp_bk;
	BTREE *t;
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT a, b, data, hdr;
	ENV *env;
	EPG *epg;
	PAGE *pg, *npg;
	db_indx_t adj, indx, nent, *ninp, pind;
	int32_t adjust;
	u_int32_t freespace, nksize, pfree, size;
	int first_dup, is_dup, next_dup, n_ok, ret;
	size_t (*func) __P((DB *, const DBT *, const DBT *));

	dbp = dbc->dbp;
	env = dbp->env;
	t = dbp->bt_internal;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	/* pind is where moved records will be inserted on pg. */
	pind = NUM_ENT(pg);
	n_ok = 0;
	adjust = 0;
	ret = 0;
	nent = NUM_ENT(npg);

	DB_ASSERT(env, nent != 0);

	/* See if we want to swap out this page for a lower-numbered one. */
	if (c_data->compact_truncate != PGNO_INVALID &&
	     PGNO(npg) > c_data->compact_truncate) {
		/* Get a fresh low numbered page. */
		if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
			goto err;
	}

	ninp = P_INP(dbp, npg);

	/*
	 * pg is the page that is being filled, it is in the stack in cp.
	 * npg is the next page, it is in the stack in ncp.
	 */
	freespace = P_FREESPACE(dbp, pg);

	/* Leaf btree pages hold key/data pairs; dup leaf pages, one item. */
	adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * Loop through the records and find the stopping point.
	 */
	for (indx = 0; indx < nent; indx += adj)  {
		bk = GET_BKEYDATA(dbp, npg, indx);

		/* Size of the key. */
		size = BITEM_PSIZE(bk);

		/* Size of the data. */
		if (TYPE(pg) == P_LBTREE)
			size += BITEM_PSIZE(GET_BKEYDATA(dbp, npg, indx + 1));
		/*
		 * If we are at a duplicate set, skip ahead to see and
		 * get the total size for the group.
		 */
		n_ok = adj;
		if (TYPE(pg) == P_LBTREE &&
		     indx < nent - adj &&
		     ninp[indx] == ninp[indx + adj]) {
			do {
				/* Size of index for key reference. */
				size += sizeof(db_indx_t);
				n_ok++;
				/* Size of data item. */
				size += BITEM_PSIZE(
				    GET_BKEYDATA(dbp, npg, indx + n_ok));
				n_ok++;
			} while (indx + n_ok < nent &&
			    ninp[indx] == ninp[indx + n_ok]);
		}
		/* If the next set will not fit on the page we are done. */
		if (freespace < size)
			break;

		/*
		 * Otherwise figure out if we are past the goal and if
		 * adding this set will put us closer to the goal than
		 * we are now.
		 */
		if ((freespace - size) < factor) {
			if (freespace - factor > factor - (freespace - size))
				indx += n_ok;
			break;
		}
		freespace -= size;
		indx += n_ok - adj;
	}

	/* If we have hit the first record then there is nothing we can move. */
	if (indx == 0)
		goto done;
	if (TYPE(pg) != P_LBTREE && TYPE(pg) != P_LDUP) {
		if (indx == nent)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		goto no_check;
	}
	/*
	 * We need to update npg's parent key.  Avoid creating a new key
	 * that will be too big. Get what space will be available on the
	 * parents. Then if there will not be room for this key, see if
	 * prefix compression will make it work, if not backup till we
	 * find something that will.  (Needless to say, this is a very
	 * unlikely event.)  If we are deleting this page then we will
	 * need to propagate the next key to our grand parents, so we
	 * see if that will fit.
	 */
	pfree = dbp->pgsize;
	for (epg = &ncp->csp[-1]; epg >= ncp->sp; epg--)
		if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
			bi = GET_BINTERNAL(dbp, epg->page, epg->indx);
			/* Add back in the key we will be deleting. */
			freespace += BINTERNAL_PSIZE(bi->len);
			if (freespace < pfree)
				pfree = freespace;
			if (epg->indx != 0)
				break;
		}

	/*
	 * If we are at the end, we will delete this page.  We need to
	 * check the next parent key only if we are the leftmost page and
	 * will therefore have to propagate the key up the tree.
	 */
	if (indx == nent) {
		if (ncp->csp[-1].indx != 0 || ncp->csp[-1].entries == 1 ||
		     BINTERNAL_PSIZE(GET_BINTERNAL(dbp,
		     ncp->csp[-1].page, 1)->len) <= pfree)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		indx -= adj;
	}
	bk = GET_BKEYDATA(dbp, npg, indx);
	/* Pick a prefix-compression function only if the key won't fit. */
	if (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (F_ISSET(dbc, DBC_OPD)) {
			if (dbp->dup_compare == __bam_defcmp)
				func = __bam_defpfx;
			else
				func = NULL;
		} else
			func = t->bt_prefix;
	} else
		func = NULL;

	/* Skip to the beginning of a duplicate set. */
	while (indx != 0 && ninp[indx] == ninp[indx - adj])
		indx -= adj;

	/* Back up until we find a (possibly compressed) key that fits. */
	while (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (B_TYPE(bk->type) != B_KEYDATA)
			goto noprefix;
		/*
		 * Figure out if we can truncate this key.
		 * Code borrowed from bt_split.c
		 */
		if (func == NULL)
			goto noprefix;
		tmp_bk = GET_BKEYDATA(dbp, npg, indx - adj);
		if (B_TYPE(tmp_bk->type) != B_KEYDATA)
			goto noprefix;
		memset(&a, 0, sizeof(a));
		a.size = tmp_bk->len;
		a.data = tmp_bk->data;
		memset(&b, 0, sizeof(b));
		b.size = bk->len;
		b.data = bk->data;
		nksize = (u_int32_t)func(dbp, &a, &b);
		if (BINTERNAL_PSIZE(nksize) < pfree)
			break;
noprefix:
		/* Skip to the beginning of a duplicate set. */
		do {
			indx -= adj;
		} while (indx != 0 &&  ninp[indx] == ninp[indx - adj]);

		bk = GET_BKEYDATA(dbp, npg, indx);
	}

	/*
	 * indx references the first record that will not move to the previous
	 * page.  If it is 0 then we could not find a key that would fit in
	 * the parent that would permit us to move any records.
	 */
	if (indx == 0)
		goto done;
	DB_ASSERT(env, indx <= nent);

	/* Loop through the records and move them from npg to pg. */
no_check: is_dup = first_dup = next_dup = 0;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(env, IS_DIRTY(pg));
	DB_ASSERT(env, IS_DIRTY(npg));
	ninp = P_INP(dbp, npg);
	do {
		bk = GET_BKEYDATA(dbp, npg, 0);
		/* Figure out if we are in a duplicate group or not. */
		if ((NUM_ENT(npg) % 2) == 0) {
			if (NUM_ENT(npg) > 2 && ninp[0] == ninp[2]) {
				if (!is_dup) {
					first_dup = 1;
					is_dup = 1;
				} else
					first_dup = 0;

				next_dup = 1;
			} else if (next_dup) {
				is_dup = 1;
				first_dup = 0;
				next_dup = 0;
			} else
				is_dup = 0;
		}

		if (is_dup && !first_dup && (pind % 2) == 0) {
			/* Duplicate key: reuse the key already on pg. */
			if ((ret = __bam_adjindx(dbc,
			     pg, pind, pind - P_INDX, 1)) != 0)
				goto err;
			if (!next_dup)
				is_dup = 0;
		} else switch (B_TYPE(bk->type)) {
		case B_KEYDATA:
			hdr.data = bk;
			hdr.size = SSZA(BKEYDATA, data);
			data.size = bk->len;
			data.data = bk->data;
			if ((ret = __db_pitem(dbc, pg, pind,
			     BKEYDATA_SIZE(bk->len), &hdr, &data)) != 0)
				goto err;
			break;
		case B_OVERFLOW:
		case B_DUPLICATE:
			data.size = BOVERFLOW_SIZE;
			data.data = bk;
			if ((ret = __db_pitem(dbc, pg, pind,
			     BOVERFLOW_SIZE, &data, NULL)) != 0)
				goto err;
			break;
		default:
			__db_errx(env,
			    "Unknown record format, page %lu, indx 0",
			    (u_long)PGNO(pg));
			ret = EINVAL;
			goto err;
		}
		pind++;
		/* Remove the item from npg; shared dup keys only shift. */
		if (next_dup && (NUM_ENT(npg) % 2) == 0) {
			if ((ret = __bam_adjindx(ndbc,
			     npg, 0, O_INDX, 0)) != 0)
				goto err;
		} else {
			if ((ret = __db_ditem(ndbc,
			     npg, 0, BITEM_SIZE(bk))) != 0)
				goto err;
		}
		adjust++;
	} while (--indx != 0);

	DB_ASSERT(env, NUM_ENT(npg) != 0);

	/* Fix up record counts in the internal pages of both stacks. */
	if (adjust != 0 &&
	     (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))) {
		if (TYPE(pg) == P_LBTREE)
			adjust /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -adjust)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, adjust)) != 0)
			goto err;
	}

	/* Update parent with new key. */
	if (ndbc->dbtype == DB_BTREE &&
	    (ret = __bam_pupdate(ndbc, pg)) != 0)
		goto err;

done:	if (cp->sp->page == ncp->sp->page) {
		cp->sp->page = NULL;
		LOCK_INIT(cp->sp->lock);
	}
	ret = __bam_stkrel(ndbc, STK_CLRDBC);

err:	return (ret);
}
1585
/*
 * __bam_merge_pages -- bulk copy every record from the next leaf page
 *	onto the current page, then free the (now empty) next page,
 *	noting a tree collapse if __bam_dpages reduces the tree height.
 */
static int
__bam_merge_pages(dbc, ndbc, c_data)
	DBC *dbc, *ndbc;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr;
	DB_MPOOLFILE *dbmp;
	PAGE *pg, *npg;
	db_indx_t nent, *ninp, *pinp;
	db_pgno_t ppgno;
	u_int8_t *bp;
	u_int32_t len;
	int i, level, ret;

	COMPQUIET(ppgno, PGNO_INVALID);
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	nent = NUM_ENT(npg);

	/* If the page is empty just throw it away. */
	if (nent == 0)
		goto free_page;

	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(dbp->env, IS_DIRTY(pg));
	DB_ASSERT(dbp->env, IS_DIRTY(npg));
	DB_ASSERT(dbp->env, nent == NUM_ENT(npg));

	/* Bulk copy the data to the new page. */
	len = dbp->pgsize - HOFFSET(npg);
	if (DBC_LOGGING(dbc)) {
		/* Log the page image (header + data area) being moved. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = npg;
		hdr.size = LOFFSET(dbp, npg);
		memset(&data, 0, sizeof(data));
		data.data = (u_int8_t *)npg + HOFFSET(npg);
		data.size = len;
		if ((ret = __bam_merge_log(dbp,
		     dbc->txn, &LSN(pg), 0, PGNO(pg),
		     &LSN(pg), PGNO(npg), &LSN(npg), &hdr, &data, 0)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));
	LSN(npg) = LSN(pg);
	bp = (u_int8_t *)pg + HOFFSET(pg) - len;
	memcpy(bp, (u_int8_t *)npg + HOFFSET(npg), len);

	/* Copy index table offset by what was there already. */
	pinp = P_INP(dbp, pg) + NUM_ENT(pg);
	ninp = P_INP(dbp, npg);
	for (i = 0; i < NUM_ENT(npg); i++)
		*pinp++ = *ninp++ - (dbp->pgsize - HOFFSET(pg));
	HOFFSET(pg) -= len;
	NUM_ENT(pg) += i;

	/* npg is now logically empty. */
	NUM_ENT(npg) = 0;
	HOFFSET(npg) += len;

	if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD)) {
		/*
		 * There are two cases here regarding the stack.
		 * Either we have two two-level stacks but only ndbc
		 * references the parent page or we have a multilevel
		 * stack and only ndbc has an entry for the spanning
		 * page.
		 */
		if (TYPE(pg) == P_LBTREE)
			i /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -i)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, i)) != 0)
			goto err;
	}

free_page:
	/*
	 * __bam_dpages may decide to collapse the tree.
	 * This can happen if we have the root and there
	 * are exactly 2 pointers left in it.
	 * If it can collapse the tree we must free the other
	 * stack since it will no longer be valid.  This
	 * must be done beforehand because we cannot
	 * hold a page pinned if it might be truncated.
	 */
	if ((ret = __bam_relink(dbc,
	    ncp->csp->page, cp->csp->page, PGNO_INVALID)) != 0)
		goto err;
	/* Drop the duplicate reference to the sub tree root. */
	cp->sp->page = NULL;
	LOCK_INIT(cp->sp->lock);
	if (PGNO(ncp->sp->page) == ncp->root &&
	    NUM_ENT(ncp->sp->page) == 2) {
		/* A collapse is possible; remember the root's level. */
		if ((ret = __bam_stkrel(dbc, STK_CLRDBC | STK_PGONLY)) != 0)
			goto err;
		level = LEVEL(ncp->sp->page);
		ppgno = PGNO(ncp->csp[-1].page);
	} else
		level = 0;
	if (c_data->compact_truncate > PGNO(npg))
		c_data->compact_truncate--;
	if ((ret = __bam_dpages(ndbc,
	    0, ndbc->dbtype == DB_RECNO ? 0 : BTD_UPDATE)) != 0)
		goto err;
	npg = NULL;
	c_data->compact_pages_free++;
	c_data->compact_pages--;
	if (level != 0) {
		/* Reread the root to see if the tree actually lost a level. */
		if ((ret = __memp_fget(dbmp, &ncp->root,
		    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
			goto err;
		if (level == LEVEL(npg))
			level = 0;
		if ((ret = __memp_fput(dbmp,
		     dbc->thread_info, npg, dbc->priority)) != 0)
			goto err;
		npg = NULL;
		if (level != 0) {
			c_data->compact_levels++;
			c_data->compact_pages_free++;
			if (c_data->compact_truncate > ppgno)
				c_data->compact_truncate--;
			if (c_data->compact_pages != 0)
				c_data->compact_pages--;
		}
	}

err:	return (ret);
}
1723
1724/*
1725 * __bam_merge_internal --
1726 *	Merge internal nodes of the tree.
1727 */
1728static int
1729__bam_merge_internal(dbc, ndbc, level, c_data, merged)
1730	DBC *dbc, *ndbc;
1731	int level;
1732	DB_COMPACT *c_data;
1733	int *merged;
1734{
1735	BINTERNAL bi, *bip, *fip;
1736	BTREE_CURSOR *cp, *ncp;
1737	DB *dbp;
1738	DBT data, hdr;
1739	DB_MPOOLFILE *dbmp;
1740	EPG *epg, *save_csp, *nsave_csp;
1741	PAGE *pg, *npg;
1742	RINTERNAL *rk;
1743	db_indx_t first, indx, pind;
1744	db_pgno_t ppgno;
1745	int32_t nrecs, trecs;
1746	u_int16_t size;
1747	u_int32_t freespace, pfree;
1748	int ret;
1749
1750	COMPQUIET(bip, NULL);
1751	COMPQUIET(ppgno, PGNO_INVALID);
1752	DB_ASSERT(NULL, dbc != NULL);
1753	DB_ASSERT(NULL, ndbc != NULL);
1754
1755	/*
1756	 * ndbc will contain the the dominating parent of the subtree.
1757	 * dbc will have the tree containing the left child.
1758	 *
1759	 * The stacks descend to the leaf level.
1760	 * If this is a recno tree then both stacks will start at the root.
1761	 */
1762	dbp = dbc->dbp;
1763	dbmp = dbp->mpf;
1764	cp = (BTREE_CURSOR *)dbc->internal;
1765	ncp = (BTREE_CURSOR *)ndbc->internal;
1766	*merged = 0;
1767	ret = 0;
1768
1769	/*
1770	 * Set the stacks to the level requested.
1771	 * Save the old value to restore when we exit.
1772	 */
1773	save_csp = cp->csp;
1774	cp->csp = &cp->csp[-level + 1];
1775	pg = cp->csp->page;
1776	pind = NUM_ENT(pg);
1777
1778	nsave_csp = ncp->csp;
1779	ncp->csp = &ncp->csp[-level + 1];
1780	npg = ncp->csp->page;
1781	indx = NUM_ENT(npg);
1782
1783	/*
1784	 * The caller may have two stacks that include common ancestors, we
1785	 * check here for convenience.
1786	 */
1787	if (npg == pg)
1788		goto done;
1789
1790	if (TYPE(pg) == P_IBTREE) {
1791		/*
1792		 * Check for overflow keys on both pages while we have
1793		 * them locked.
1794		 */
1795		 if ((ret =
1796		      __bam_truncate_internal_overflow(dbc, pg, c_data)) != 0)
1797			goto err;
1798		 if ((ret =
1799		      __bam_truncate_internal_overflow(dbc, npg, c_data)) != 0)
1800			goto err;
1801	}
1802
1803	/*
1804	 * If we are about to move data off the left most page of an
1805	 * internal node we will need to update its parents, make sure there
1806	 * will be room for the new key on all the parents in the stack.
1807	 * If not, move less data.
1808	 */
1809	fip = NULL;
1810	if (TYPE(pg) == P_IBTREE) {
1811		/* See where we run out of space. */
1812		freespace = P_FREESPACE(dbp, pg);
1813		/*
1814		 * The leftmost key of an internal page is not accurate.
1815		 * Go up the tree to find a non-leftmost parent.
1816		 */
1817		epg = ncp->csp;
1818		while (--epg >= ncp->sp && epg->indx == 0)
1819			continue;
1820		fip = bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1821		epg = ncp->csp;
1822
1823		for (indx = 0;;) {
1824			size = BINTERNAL_PSIZE(bip->len);
1825			if (size > freespace)
1826				break;
1827			freespace -= size;
1828			if (++indx >= NUM_ENT(npg))
1829				break;
1830			bip = GET_BINTERNAL(dbp, npg, indx);
1831		}
1832
1833		/* See if we are deleting the page and we are not left most. */
1834		if (indx == NUM_ENT(npg) && epg[-1].indx != 0)
1835			goto fits;
1836
1837		pfree = dbp->pgsize;
1838		for (epg--; epg >= ncp->sp; epg--)
1839			if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
1840				bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1841				/* Add back in the key we will be deleting. */
1842				freespace += BINTERNAL_PSIZE(bip->len);
1843				if (freespace < pfree)
1844					pfree = freespace;
1845				if (epg->indx != 0)
1846					break;
1847			}
1848		epg = ncp->csp;
1849
1850		/* If we are at the end of the page we will delete it. */
1851		if (indx == NUM_ENT(npg)) {
1852			if (NUM_ENT(epg[-1].page) == 1)
1853				goto fits;
1854			bip =
1855			     GET_BINTERNAL(dbp, epg[-1].page, epg[-1].indx + 1);
1856		} else
1857			bip = GET_BINTERNAL(dbp, npg, indx);
1858
1859		/* Back up until we have a key that fits. */
1860		while (indx != 0 && BINTERNAL_PSIZE(bip->len) > pfree) {
1861			indx--;
1862			bip = GET_BINTERNAL(dbp, npg, indx);
1863		}
1864		if (indx == 0)
1865			goto done;
1866	}
1867
1868fits:	memset(&bi, 0, sizeof(bi));
1869	memset(&hdr, 0, sizeof(hdr));
1870	memset(&data, 0, sizeof(data));
1871	trecs = 0;
1872
1873	/*
1874	 * Copy data between internal nodes till one is full
1875	 * or the other is empty.
1876	 */
1877	first = 0;
1878	nrecs = 0;
1879	do {
1880		if (dbc->dbtype == DB_BTREE) {
1881			bip = GET_BINTERNAL(dbp, npg, 0);
1882			size = fip == NULL ?
1883			     BINTERNAL_SIZE(bip->len) :
1884			     BINTERNAL_SIZE(fip->len);
1885			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1886				break;
1887
1888			if (fip == NULL) {
1889				data.size = bip->len;
1890				data.data = bip->data;
1891			} else {
1892				data.size = fip->len;
1893				data.data = fip->data;
1894			}
1895			bi.len = data.size;
1896			B_TSET(bi.type, bip->type);
1897			bi.pgno = bip->pgno;
1898			bi.nrecs = bip->nrecs;
1899			hdr.data = &bi;
1900			hdr.size = SSZA(BINTERNAL, data);
1901			if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))
1902				nrecs = (int32_t)bip->nrecs;
1903		} else {
1904			rk = GET_RINTERNAL(dbp, npg, 0);
1905			size = RINTERNAL_SIZE;
1906			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1907				break;
1908
1909			hdr.data = rk;
1910			hdr.size = size;
1911			nrecs = (int32_t)rk->nrecs;
1912		}
1913		/*
1914		 * Try to lock the subtree leaf records without waiting.
1915		 * We must lock the subtree below the record we are merging
1916		 * and the one after it since that is were a search will wind
1917		 * up if it has already looked at our parent.  After the first
1918		 * move we have the current subtree already locked.
1919		 * If we merged any records then we will revisit this
1920		 * node when we merge its leaves.  If not we will return
1921		 * NOTGRANTED and our caller will do a retry.  We only
1922		 * need to do this if we are in a transation. If not then
1923		 * we cannot abort and things will be hosed up on error
1924		 * anyway.
1925		 */
1926		if (dbc->txn != NULL && (ret = __bam_lock_tree(ndbc,
1927		    ncp->csp, nsave_csp, first,
1928		    NUM_ENT(ncp->csp->page) == 1 ? 1 : 2)) != 0) {
1929			if (ret != DB_LOCK_NOTGRANTED)
1930				goto err;
1931			break;
1932		}
1933		first = 1;
1934		if ((ret = __db_pitem(dbc, pg, pind, size, &hdr, &data)) != 0)
1935			goto err;
1936		pind++;
1937		if (fip != NULL) {
1938			/* reset size to be for the record being deleted. */
1939			size = BINTERNAL_SIZE(bip->len);
1940			fip = NULL;
1941		}
1942		if ((ret = __db_ditem(ndbc, npg, 0, size)) != 0)
1943			goto err;
1944		*merged = 1;
1945		trecs += nrecs;
1946	} while (--indx != 0);
1947
1948	if (!*merged)
1949		goto done;
1950
1951	if (trecs != 0) {
1952		cp->csp--;
1953		ret = __bam_adjust(dbc, trecs);
1954		if (ret != 0)
1955			goto err;
1956		cp->csp++;
1957		ncp->csp--;
1958		if ((ret = __bam_adjust(ndbc, -trecs)) != 0)
1959			goto err;
1960		ncp->csp++;
1961	}
1962
1963	/*
1964	 * Either we emptied the page or we need to update its
1965	 * parent to reflect the first page we now point to.
1966	 * First get rid of the bottom of the stack,
1967	 * bam_dpages will clear the stack.  Maintain transactional
1968	 * locks on the leaf pages to protect changes at this level.
1969	 */
1970	do {
1971		if ((ret = __memp_fput(dbmp, dbc->thread_info,
1972		    nsave_csp->page, dbc->priority)) != 0)
1973			goto err;
1974		nsave_csp->page = NULL;
1975		if ((ret = __TLPUT(dbc, nsave_csp->lock)) != 0)
1976			goto err;
1977		LOCK_INIT(nsave_csp->lock);
1978		nsave_csp--;
1979	} while (nsave_csp != ncp->csp);
1980
1981	if (NUM_ENT(npg) == 0)  {
1982		/*
1983		 * __bam_dpages may decide to collapse the tree
1984		 * so we need to free our other stack.  The tree
1985		 * will change in hight and our stack will nolonger
1986		 * be valid.
1987		 */
1988		cp->csp = save_csp;
1989		cp->sp->page = NULL;
1990		LOCK_INIT(cp->sp->lock);
1991		if (PGNO(ncp->sp->page) == ncp->root &&
1992		    NUM_ENT(ncp->sp->page) == 2) {
1993			if ((ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0)
1994				goto err;
1995			level = LEVEL(ncp->sp->page);
1996			ppgno = PGNO(ncp->csp[-1].page);
1997		} else
1998			level = 0;
1999
2000		if (c_data->compact_truncate > PGNO(npg))
2001			c_data->compact_truncate--;
2002		ret = __bam_dpages(ndbc,
2003		     0, ndbc->dbtype == DB_RECNO ?
2004		     BTD_RELINK : BTD_UPDATE | BTD_RELINK);
2005		c_data->compact_pages_free++;
2006		if (ret == 0 && level != 0) {
2007			if ((ret = __memp_fget(dbmp, &ncp->root,
2008			    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
2009				goto err;
2010			if (level == LEVEL(npg))
2011				level = 0;
2012			if ((ret = __memp_fput(dbmp,
2013			    dbc->thread_info, npg, dbc->priority)) != 0)
2014				goto err;
2015			npg = NULL;
2016			if (level != 0) {
2017				c_data->compact_levels++;
2018				c_data->compact_pages_free++;
2019				if (c_data->compact_truncate > ppgno)
2020					c_data->compact_truncate--;
2021				if (c_data->compact_pages != 0)
2022					c_data->compact_pages--;
2023			}
2024		}
2025	} else {
2026		ret = __bam_pupdate(ndbc, npg);
2027
2028		if (NUM_ENT(npg) != 0 &&
2029		    c_data->compact_truncate != PGNO_INVALID &&
2030		    PGNO(npg) > c_data->compact_truncate &&
2031		    ncp->csp != ncp->sp) {
2032			if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
2033				goto err;
2034		}
2035		if (c_data->compact_truncate != PGNO_INVALID &&
2036		     PGNO(pg) > c_data->compact_truncate && cp->csp != cp->sp) {
2037			if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0)
2038				goto err;
2039		}
2040	}
2041	cp->csp = save_csp;
2042
2043	return (ret);
2044
2045done:
2046err:	cp->csp = save_csp;
2047	ncp->csp = nsave_csp;
2048
2049	return (ret);
2050}
2051
2052/*
2053 * __bam_compact_dups -- try to compress off page dup trees.
2054 * We may or may not have a write lock on this page.
2055 */
2056static int
2057__bam_compact_dups(dbc, ppg, factor, have_lock, c_data, donep)
2058	DBC *dbc;
2059	PAGE **ppg;
2060	u_int32_t factor;
2061	int have_lock;
2062	DB_COMPACT *c_data;
2063	int *donep;
2064{
2065	BOVERFLOW *bo;
2066	BTREE_CURSOR *cp;
2067	DB *dbp;
2068	DBC *opd;
2069	DBT start;
2070	DB_MPOOLFILE *dbmp;
2071	ENV *env;
2072	PAGE *dpg, *pg;
2073	db_indx_t i;
2074	db_pgno_t pgno;
2075	int isdone, level, ret, span, t_ret;
2076
2077	span = 0;
2078	ret = 0;
2079	opd = NULL;
2080
2081	DB_ASSERT(NULL, dbc != NULL);
2082	dbp = dbc->dbp;
2083	env = dbp->env;
2084	dbmp = dbp->mpf;
2085	cp = (BTREE_CURSOR *)dbc->internal;
2086	pg = *ppg;
2087
2088	for (i = 0; i <  NUM_ENT(pg); i++) {
2089		bo = GET_BOVERFLOW(dbp, pg, i);
2090		if (B_TYPE(bo->type) == B_KEYDATA)
2091			continue;
2092		c_data->compact_pages_examine++;
2093		if (bo->pgno > c_data->compact_truncate) {
2094			(*donep)++;
2095			if (!have_lock) {
2096				/*
2097				 * The caller should have the page at
2098				 * least read locked.  Drop the buffer
2099				 * and get the write lock.
2100				 */
2101				pgno = PGNO(pg);
2102				if ((ret = __memp_fput(dbmp, dbc->thread_info,
2103				     pg, dbc->priority)) != 0)
2104					goto err;
2105				*ppg = NULL;
2106				if ((ret = __db_lget(dbc, 0, pgno,
2107				     DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
2108					goto err;
2109				have_lock = 1;
2110				if ((ret = __memp_fget(dbmp, &pgno,
2111				    dbc->thread_info,
2112				    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2113					goto err;
2114				pg = *ppg;
2115			}
2116			if ((ret =
2117			     __bam_truncate_root_page(dbc, pg, i, c_data)) != 0)
2118				goto err;
2119			/* Just in case it should move.  Could it? */
2120			bo = GET_BOVERFLOW(dbp, pg, i);
2121		}
2122
2123		if (B_TYPE(bo->type) == B_OVERFLOW) {
2124			if ((ret = __bam_truncate_overflow(dbc,
2125			    bo->pgno, have_lock ? NULL : ppg, c_data)) != 0)
2126				goto err;
2127			(*donep)++;
2128			continue;
2129		}
2130		/*
2131		 * Take a peek at the root.  If it's a leaf then
2132		 * there is no tree here, avoid all the trouble.
2133		 */
2134		if ((ret = __memp_fget(dbmp, &bo->pgno,
2135		     dbc->thread_info, dbc->txn, 0, &dpg)) != 0)
2136			goto err;
2137
2138		level = dpg->level;
2139		if ((ret = __memp_fput(dbmp,
2140		     dbc->thread_info, dpg, dbc->priority)) != 0)
2141			goto err;
2142		if (level == LEAFLEVEL)
2143			continue;
2144		if ((ret = __dbc_newopd(dbc, bo->pgno, NULL, &opd)) != 0)
2145			return (ret);
2146		if (!have_lock) {
2147			/*
2148			 * The caller should have the page at
2149			 * least read locked.  Drop the buffer
2150			 * and get the write lock.
2151			 */
2152			pgno = PGNO(pg);
2153			if ((ret = __memp_fput(dbmp, dbc->thread_info,
2154			     pg, dbc->priority)) != 0)
2155				goto err;
2156			*ppg = NULL;
2157			if ((ret = __db_lget(dbc, 0, pgno,
2158			     DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
2159				goto err;
2160			have_lock = 1;
2161			if ((ret = __memp_fget(dbmp, &pgno,
2162			    dbc->thread_info,
2163			    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2164				goto err;
2165			pg = *ppg;
2166		}
2167		(*donep)++;
2168		memset(&start, 0, sizeof(start));
2169		do {
2170			if ((ret = __bam_compact_int(opd, &start,
2171			     NULL, factor, &span, c_data, &isdone)) != 0)
2172				break;
2173		} while (!isdone);
2174
2175		if (start.data != NULL)
2176			__os_free(env, start.data);
2177
2178		if (ret != 0)
2179			goto err;
2180
2181		ret = __dbc_close(opd);
2182		opd = NULL;
2183		if (ret != 0)
2184			goto err;
2185	}
2186
2187err:	if (opd != NULL && (t_ret = __dbc_close(opd)) != 0 && ret == 0)
2188		ret = t_ret;
2189	return (ret);
2190}
2191
2192/*
2193 * __bam_truncate_page -- swap a page with a lower numbered page.
2194 *	The cusor has a stack which includes at least the
2195 * immediate parent of this page.
2196 */
2197static int
2198__bam_truncate_page(dbc, pgp, opg, update_parent)
2199	DBC *dbc;
2200	PAGE **pgp, *opg;
2201	int update_parent;
2202{
2203	BTREE_CURSOR *cp;
2204	DB *dbp;
2205	DBT data, hdr;
2206	DB_LSN lsn;
2207	DB_LOCK lock;
2208	EPG *epg;
2209	PAGE *newpage;
2210	db_pgno_t newpgno, oldpgno, *pgnop;
2211	int ret;
2212
2213	DB_ASSERT(NULL, dbc != NULL);
2214	dbp = dbc->dbp;
2215	LOCK_INIT(lock);
2216
2217	/*
2218	 * We want to free a page that lives in the part of the file that
2219	 * can be truncated, so we're going to move it onto a free page
2220	 * that is in the part of the file that need not be truncated.
2221	 * Since the freelist is ordered now, we can simply call __db_new
2222	 * which will grab the first element off the freelist; we know this
2223	 * is the lowest numbered free page.
2224	 */
2225	if ((ret = __db_new(dbc, P_DONTEXTEND | TYPE(*pgp),
2226	     TYPE(*pgp) == P_LBTREE ? &lock : NULL, &newpage)) != 0)
2227		return (ret);
2228
2229	/*
2230	 * If newpage is null then __db_new would have had to allocate
2231	 * a new page from the filesystem, so there is no reason
2232	 * to continue this action.
2233	 */
2234	if (newpage == NULL)
2235		return (0);
2236
2237	/*
2238	 * It is possible that a higher page is allocated if other threads
2239	 * are allocating at the same time, if so, just put it back.
2240	 */
2241	if (PGNO(newpage) > PGNO(*pgp)) {
2242		/* Its unfortunate but you can't just free a new overflow. */
2243		if (TYPE(newpage) == P_OVERFLOW)
2244			OV_LEN(newpage) = 0;
2245		if ((ret = __LPUT(dbc, lock)) != 0)
2246			return (ret);
2247		return (__db_free(dbc, newpage));
2248	}
2249
2250	/* Log if necessary. */
2251	if (DBC_LOGGING(dbc)) {
2252		memset(&hdr, 0, sizeof(hdr));
2253		hdr.data = *pgp;
2254		hdr.size = P_OVERHEAD(dbp);
2255		memset(&data, 0, sizeof(data));
2256		if (TYPE(*pgp) == P_OVERFLOW) {
2257			data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp);
2258			data.size = OV_LEN(*pgp);
2259		} else {
2260			data.data = (u_int8_t *)*pgp + HOFFSET(*pgp);
2261			data.size = dbp->pgsize - HOFFSET(*pgp);
2262			hdr.size += NUM_ENT(*pgp) * sizeof(db_indx_t);
2263		}
2264		if ((ret = __bam_merge_log(dbp, dbc->txn,
2265		      &LSN(newpage), 0, PGNO(newpage), &LSN(newpage),
2266		      PGNO(*pgp), &LSN(*pgp), &hdr, &data, 1)) != 0)
2267			goto err;
2268	} else
2269		LSN_NOT_LOGGED(LSN(newpage));
2270
2271	oldpgno = PGNO(*pgp);
2272	newpgno = PGNO(newpage);
2273	lsn = LSN(newpage);
2274	memcpy(newpage, *pgp, dbp->pgsize);
2275	PGNO(newpage) = newpgno;
2276	LSN(newpage) = lsn;
2277
2278	/* Empty the old page. */
2279	if ((ret = __memp_dirty(dbp->mpf,
2280	    pgp, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2281		goto err;
2282	if (TYPE(*pgp) == P_OVERFLOW)
2283		OV_LEN(*pgp) = 0;
2284	else {
2285		HOFFSET(*pgp) = dbp->pgsize;
2286		NUM_ENT(*pgp) = 0;
2287	}
2288	LSN(*pgp) = lsn;
2289
2290	/* Update siblings. */
2291	switch (TYPE(newpage)) {
2292	case P_OVERFLOW:
2293	case P_LBTREE:
2294	case P_LRECNO:
2295	case P_LDUP:
2296		if (NEXT_PGNO(newpage) == PGNO_INVALID &&
2297		    PREV_PGNO(newpage) == PGNO_INVALID)
2298			break;
2299		if ((ret = __bam_relink(dbc, *pgp, opg, PGNO(newpage))) != 0)
2300			goto err;
2301		break;
2302	default:
2303		break;
2304	}
2305	cp = (BTREE_CURSOR *)dbc->internal;
2306
2307	/*
2308	 * Now, if we free this page, it will get truncated, when we free
2309	 * all the pages after it in the file.
2310	 */
2311	ret = __db_free(dbc, *pgp);
2312	/* db_free always puts the page. */
2313	*pgp = newpage;
2314
2315	if (ret != 0)
2316		return (ret);
2317
2318	if (!update_parent)
2319		goto done;
2320
2321	/* Update the parent. */
2322	epg = &cp->csp[-1];
2323
2324	switch (TYPE(epg->page)) {
2325	case P_IBTREE:
2326		pgnop = &GET_BINTERNAL(dbp, epg->page, epg->indx)->pgno;
2327		break;
2328	case P_IRECNO:
2329		pgnop = &GET_RINTERNAL(dbp, epg->page, epg->indx)->pgno;
2330		break;
2331	default:
2332		pgnop = &GET_BOVERFLOW(dbp, epg->page, epg->indx)->pgno;
2333		break;
2334	}
2335	DB_ASSERT(dbp->env, oldpgno == *pgnop);
2336	if (DBC_LOGGING(dbc)) {
2337		if ((ret = __bam_pgno_log(dbp, dbc->txn, &LSN(epg->page),
2338		    0, PGNO(epg->page), &LSN(epg->page), (u_int32_t)epg->indx,
2339		    *pgnop, PGNO(newpage))) != 0)
2340			return (ret);
2341	} else
2342		LSN_NOT_LOGGED(LSN(epg->page));
2343
2344	*pgnop = PGNO(newpage);
2345	cp->csp->page = newpage;
2346	if ((ret = __TLPUT(dbc, lock)) != 0)
2347		return (ret);
2348
2349done:	return (0);
2350
2351err:	(void)__memp_fput(dbp->mpf, dbc->thread_info, newpage, dbc->priority);
2352	(void)__TLPUT(dbc, lock);
2353	return (ret);
2354}
2355
2356/*
2357 * __bam_truncate_overflow -- find overflow pages to truncate.
2358 *	Walk the pages of an overflow chain and swap out
2359 * high numbered pages.  We are passed the first page
2360 * but only deal with the second and subsequent pages.
2361 */
2362
2363static int
2364__bam_truncate_overflow(dbc, pgno, ppg, c_data)
2365	DBC *dbc;
2366	db_pgno_t pgno;
2367	PAGE **ppg;
2368	DB_COMPACT *c_data;
2369{
2370	DB *dbp;
2371	DB_LOCK lock;
2372	PAGE *page;
2373	db_pgno_t ppgno;
2374	int have_lock, ret, t_ret;
2375
2376	dbp = dbc->dbp;
2377	page = NULL;
2378	LOCK_INIT(lock);
2379	have_lock = ppg == NULL;
2380
2381	if ((ret = __memp_fget(dbp->mpf, &pgno,
2382	     dbc->thread_info, dbc->txn, 0, &page)) != 0)
2383		return (ret);
2384
2385	while ((pgno = NEXT_PGNO(page)) != PGNO_INVALID) {
2386		if ((ret = __memp_fput(dbp->mpf,
2387		     dbc->thread_info, page, dbc->priority)) != 0)
2388			return (ret);
2389		if ((ret = __memp_fget(dbp->mpf, &pgno,
2390		    dbc->thread_info, dbc->txn, 0, &page)) != 0)
2391			return (ret);
2392		if (pgno <= c_data->compact_truncate)
2393			continue;
2394		if (have_lock == 0) {
2395			ppgno = PGNO(*ppg);
2396			if ((ret = __memp_fput(dbp->mpf, dbc->thread_info,
2397			     *ppg, dbc->priority)) != 0)
2398				goto err;
2399			*ppg = NULL;
2400			if ((ret = __db_lget(dbc, 0, ppgno,
2401			     DB_LOCK_WRITE, 0, &lock)) != 0)
2402				goto err;
2403			if ((ret = __memp_fget(dbp->mpf, &ppgno,
2404			    dbc->thread_info,
2405			    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
2406				goto err;
2407			have_lock = 1;
2408		}
2409		if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
2410			break;
2411	}
2412
2413err:	if (page != NULL &&
2414	    (t_ret = __memp_fput( dbp->mpf,
2415	    dbc->thread_info, page, dbc->priority)) != 0 && ret == 0)
2416		ret = t_ret;
2417	if ((t_ret = __TLPUT(dbc, lock)) != 0 && ret == 0)
2418		ret = t_ret;
2419	return (ret);
2420}
2421
2422/*
2423 * __bam_truncate_root_page -- swap a page which is
2424 *    the root of an off page dup tree or the head of an overflow.
2425 * The page is reference by the pg/indx passed in.
2426 */
2427static int
2428__bam_truncate_root_page(dbc, pg, indx, c_data)
2429	DBC *dbc;
2430	PAGE *pg;
2431	u_int32_t indx;
2432	DB_COMPACT *c_data;
2433{
2434	BINTERNAL *bi;
2435	BOVERFLOW *bo;
2436	DB *dbp;
2437	DBT orig;
2438	PAGE *page;
2439	db_pgno_t newpgno, *pgnop;
2440	int ret, t_ret;
2441
2442	COMPQUIET(c_data, NULL);
2443	COMPQUIET(bo, NULL);
2444	COMPQUIET(newpgno, PGNO_INVALID);
2445	dbp = dbc->dbp;
2446	page = NULL;
2447	if (TYPE(pg) == P_IBTREE) {
2448		bi = GET_BINTERNAL(dbp, pg, indx);
2449		if (B_TYPE(bi->type) == B_OVERFLOW) {
2450			bo = (BOVERFLOW *)(bi->data);
2451			pgnop = &bo->pgno;
2452		} else
2453			pgnop = &bi->pgno;
2454	} else {
2455		bo = GET_BOVERFLOW(dbp, pg, indx);
2456		pgnop = &bo->pgno;
2457	}
2458
2459	DB_ASSERT(dbp->env, IS_DIRTY(pg));
2460
2461	if ((ret = __memp_fget(dbp->mpf, pgnop,
2462	     dbc->thread_info, dbc->txn, 0, &page)) != 0)
2463		goto err;
2464
2465	/*
2466	 * If this is a multiply reference overflow key, then we will just
2467	 * copy it and decrement the reference count.  This is part of a
2468	 * fix to get rid of multiple references.
2469	 */
2470	if (TYPE(page) == P_OVERFLOW && OV_REF(page) > 1) {
2471		if ((ret = __db_ovref(dbc, bo->pgno)) != 0)
2472			goto err;
2473		memset(&orig, 0, sizeof(orig));
2474		if ((ret = __db_goff(dbc, &orig, bo->tlen, bo->pgno,
2475		    &orig.data, &orig.size)) == 0)
2476			ret = __db_poff(dbc, &orig, &newpgno);
2477		if (orig.data != NULL)
2478			__os_free(dbp->env, orig.data);
2479		if (ret != 0)
2480			goto err;
2481	} else {
2482		if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
2483			goto err;
2484		newpgno = PGNO(page);
2485		/* If we could not allocate from the free list, give up.*/
2486		if (newpgno == *pgnop)
2487			goto err;
2488	}
2489
2490	/* Update the reference. */
2491	if (DBC_LOGGING(dbc)) {
2492		if ((ret = __bam_pgno_log(dbp,
2493		     dbc->txn, &LSN(pg), 0, PGNO(pg),
2494		     &LSN(pg), (u_int32_t)indx, *pgnop, newpgno)) != 0)
2495			goto err;
2496	} else
2497		LSN_NOT_LOGGED(LSN(pg));
2498
2499	*pgnop = newpgno;
2500
2501err:	if (page != NULL && (t_ret =
2502	      __memp_fput(dbp->mpf, dbc->thread_info,
2503		  page, dbc->priority)) != 0 && ret == 0)
2504		ret = t_ret;
2505	return (ret);
2506}
2507
2508/*
2509 * -- bam_truncate_internal_overflow -- find overflow keys
2510 *	on internal pages and if they have high page
2511 * numbers swap them with lower pages and truncate them.
2512 * Note that if there are overflow keys in the internal
2513 * nodes they will get copied adding pages to the database.
2514 */
2515static int
2516__bam_truncate_internal_overflow(dbc, page, c_data)
2517	DBC *dbc;
2518	PAGE *page;
2519	DB_COMPACT *c_data;
2520{
2521	BINTERNAL *bi;
2522	BOVERFLOW *bo;
2523	db_indx_t indx;
2524	int ret;
2525
2526	COMPQUIET(bo, NULL);
2527	ret = 0;
2528	for (indx = 0; indx < NUM_ENT(page); indx++) {
2529		bi = GET_BINTERNAL(dbc->dbp, page, indx);
2530		if (B_TYPE(bi->type) != B_OVERFLOW)
2531			continue;
2532		bo = (BOVERFLOW *)(bi->data);
2533		if (bo->pgno > c_data->compact_truncate && (ret =
2534		     __bam_truncate_root_page(dbc, page, indx, c_data)) != 0)
2535			break;
2536		if ((ret = __bam_truncate_overflow(
2537		     dbc, bo->pgno, NULL, c_data)) != 0)
2538			break;
2539	}
2540	return (ret);
2541}
2542
2543/*
2544 * __bam_compact_isdone ---
2545 *
2546 * Check to see if the stop key specified by the caller is on the
2547 * current page, in which case we are done compacting.
2548 */
2549static int
2550__bam_compact_isdone(dbc, stop, pg, isdone)
2551	DBC *dbc;
2552	DBT *stop;
2553	PAGE *pg;
2554	int *isdone;
2555{
2556	db_recno_t recno;
2557	BTREE *t;
2558	BTREE_CURSOR *cp;
2559	int cmp, ret;
2560
2561	*isdone = 0;
2562	cp = (BTREE_CURSOR *)dbc->internal;
2563	t = dbc->dbp->bt_internal;
2564
2565	if (dbc->dbtype == DB_RECNO) {
2566		if ((ret = __ram_getno(dbc, stop, &recno, 0)) != 0)
2567			return (ret);
2568		*isdone = cp->recno > recno;
2569	} else {
2570		DB_ASSERT(dbc->dbp->env, TYPE(pg) == P_LBTREE);
2571		if ((ret = __bam_cmp(dbc, stop, pg, 0,
2572		    t->bt_compare, &cmp)) != 0)
2573			return (ret);
2574
2575		*isdone = cmp <= 0;
2576	}
2577	return (0);
2578}
2579
2580/*
2581 * Lock the subtrees from the top of the stack.
2582 *	The 0'th child may be in the stack and locked otherwise iterate
2583 * through the records by calling __bam_lock_subtree.
2584 */
2585static int
2586__bam_lock_tree(dbc, sp, csp, start, stop)
2587	DBC *dbc;
2588	EPG *sp, *csp;
2589	u_int32_t start, stop;
2590{
2591	PAGE *cpage;
2592	db_pgno_t pgno;
2593	int ret;
2594
2595	if (dbc->dbtype == DB_RECNO)
2596		pgno = GET_RINTERNAL(dbc->dbp, sp->page, 0)->pgno;
2597	else
2598		pgno = GET_BINTERNAL(dbc->dbp, sp->page, 0)->pgno;
2599	cpage = (sp + 1)->page;
2600	/*
2601	 * First recurse down the left most sub tree if it is in the cursor
2602	 * stack.  We already have these pages latched and locked if its a
2603	 * leaf.
2604	 */
2605	if (start == 0 && sp + 1 != csp && pgno == PGNO(cpage) &&
2606	    (ret = __bam_lock_tree(dbc, sp + 1, csp, 0, NUM_ENT(cpage))) != 0)
2607		return (ret);
2608
2609	/*
2610	 * Then recurse on the other records on the page if needed.
2611	 * If the page is in the stack then its already locked or
2612	 * was processed above.
2613	 */
2614	if (start == 0 && pgno == PGNO(cpage))
2615		start = 1;
2616
2617	if (start == stop)
2618		return (0);
2619	return (__bam_lock_subtree(dbc, sp->page, start, stop));
2620
2621}
2622
2623/*
2624 * Lock the subtree from the current node.
2625 */
2626static int
2627__bam_lock_subtree(dbc, page, indx, stop)
2628	DBC *dbc;
2629	PAGE *page;
2630	u_int32_t indx, stop;
2631{
2632	DB *dbp;
2633	DB_LOCK lock;
2634	PAGE *cpage;
2635	db_pgno_t pgno;
2636	int ret, t_ret;
2637
2638	dbp = dbc->dbp;
2639
2640	for (; indx < stop; indx++) {
2641		if (dbc->dbtype == DB_RECNO)
2642			pgno = GET_RINTERNAL(dbc->dbp, page, indx)->pgno;
2643		else
2644			pgno = GET_BINTERNAL(dbc->dbp, page, indx)->pgno;
2645		if (LEVEL(page) - 1 == LEAFLEVEL) {
2646			if ((ret = __db_lget(dbc, 0, pgno,
2647			     DB_LOCK_WRITE, DB_LOCK_NOWAIT, &lock)) != 0) {
2648				if (ret == DB_LOCK_DEADLOCK)
2649					return (DB_LOCK_NOTGRANTED);
2650				return (ret);
2651			}
2652		} else {
2653			if ((ret = __memp_fget(dbp->mpf, &pgno,
2654			     dbc->thread_info, dbc->txn, 0, &cpage)) != 0)
2655				return (ret);
2656			ret = __bam_lock_subtree(dbc, cpage, 0, NUM_ENT(cpage));
2657			if ((t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
2658			    cpage, dbc->priority)) != 0 && ret == 0)
2659				ret = t_ret;
2660			if (ret != 0)
2661				return (ret);
2662		}
2663	}
2664	return (0);
2665}
2666
2667#ifdef HAVE_FTRUNCATE
2668/*
2669 * __bam_savekey -- save the key from an internal page.
2670 *  We need to save information so that we can
2671 * fetch then next internal node of the tree.  This means
2672 * we need the btree key on this current page, or the
2673 * next record number.
2674 */
2675static int
2676__bam_savekey(dbc, next, start)
2677	DBC *dbc;
2678	int next;
2679	DBT *start;
2680{
2681	BINTERNAL *bi;
2682	BKEYDATA *bk;
2683	BOVERFLOW *bo;
2684	BTREE_CURSOR *cp;
2685	DB *dbp;
2686	DB_LOCK lock;
2687	ENV *env;
2688	PAGE *pg;
2689	RINTERNAL *ri;
2690	db_indx_t indx, top;
2691	db_pgno_t pgno, saved_pgno;
2692	int ret, t_ret;
2693	u_int32_t len;
2694	u_int8_t *data;
2695	int level;
2696
2697	dbp = dbc->dbp;
2698	env = dbp->env;
2699	cp = (BTREE_CURSOR *)dbc->internal;
2700	pg = cp->csp->page;
2701	ret = 0;
2702
2703	if (dbc->dbtype == DB_RECNO) {
2704		if (next)
2705			for (indx = 0, top = NUM_ENT(pg); indx != top; indx++) {
2706				ri = GET_RINTERNAL(dbp, pg, indx);
2707				cp->recno += ri->nrecs;
2708			}
2709		return (__db_retcopy(env, start, &cp->recno,
2710		     sizeof(cp->recno), &start->data, &start->ulen));
2711
2712	}
2713
2714	bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
2715	data = bi->data;
2716	len = bi->len;
2717	LOCK_INIT(lock);
2718	saved_pgno = PGNO_INVALID;
2719	/* If there is single record on the page it may have an empty key. */
2720	while (len == 0) {
2721		/*
2722		 * We should not have an empty data page, since we just
2723		 * compacted things, check anyway and punt.
2724		 */
2725		if (NUM_ENT(pg) == 0)
2726			goto no_key;
2727		pgno = bi->pgno;
2728		level = LEVEL(pg);
2729		if (pg != cp->csp->page &&
2730		    (ret = __memp_fput(dbp->mpf,
2731			 dbc->thread_info, pg, dbc->priority)) != 0) {
2732			pg = NULL;
2733			goto err;
2734		}
2735		if (level - 1 == LEAFLEVEL) {
2736			TRY_LOCK(dbc, pgno, saved_pgno,
2737			    lock, DB_LOCK_READ, retry);
2738			if (ret != 0)
2739				goto err;
2740		}
2741		if ((ret = __memp_fget(dbp->mpf, &pgno,
2742		     dbc->thread_info, dbc->txn, 0, &pg)) != 0)
2743			goto err;
2744
2745		/*
2746		 * At the data level use the last key to try and avoid the
2747		 * possibility that the user has a zero length key, if they
2748		 * do, we punt.
2749		 */
2750		if (pg->level == LEAFLEVEL) {
2751			bk = GET_BKEYDATA(dbp, pg, NUM_ENT(pg) - 2);
2752			data = bk->data;
2753			len = bk->len;
2754			if (len == 0) {
2755no_key:				__db_errx(env,
2756				    "Compact cannot handle zero length key");
2757				ret = DB_NOTFOUND;
2758				goto err;
2759			}
2760		} else {
2761			bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
2762			data = bi->data;
2763			len = bi->len;
2764		}
2765	}
2766	if (B_TYPE(bi->type) == B_OVERFLOW) {
2767		bo = (BOVERFLOW *)(data);
2768		ret = __db_goff(dbc, start, bo->tlen, bo->pgno,
2769		    &start->data, &start->ulen);
2770	}
2771	else
2772		ret = __db_retcopy(env,
2773		     start, data, len,  &start->data, &start->ulen);
2774
2775err:	if (pg != NULL && pg != cp->csp->page &&
2776	    (t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
2777		 pg, dbc->priority)) != 0 && ret == 0)
2778		ret = t_ret;
2779	return (ret);
2780
2781retry:	return (DB_LOCK_NOTGRANTED);
2782}
2783
2784/*
2785 * bam_truncate_internal --
2786 *	Find high numbered pages in the internal nodes of a tree and
2787 *	swap them.
2788 */
2789static int
2790__bam_truncate_internal(dbp, ip, txn, c_data)
2791	DB *dbp;
2792	DB_THREAD_INFO *ip;
2793	DB_TXN *txn;
2794	DB_COMPACT *c_data;
2795{
2796	BTREE_CURSOR *cp;
2797	DBC *dbc;
2798	DBT start;
2799	DB_LOCK meta_lock;
2800	PAGE *pg;
2801	db_pgno_t pgno;
2802	u_int32_t sflag;
2803	int level, local_txn, ret, t_ret;
2804
2805	dbc = NULL;
2806	memset(&start, 0, sizeof(start));
2807
2808	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
2809		local_txn = 1;
2810		txn = NULL;
2811	} else
2812		local_txn = 0;
2813
2814	level = LEAFLEVEL + 1;
2815	sflag = CS_READ | CS_GETRECNO;
2816	LOCK_INIT(meta_lock);
2817
2818new_txn:
2819	if (local_txn &&
2820	    (ret = __txn_begin(dbp->env, ip, NULL, &txn, 0)) != 0)
2821		goto err;
2822
2823	if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
2824		goto err;
2825	cp = (BTREE_CURSOR *)dbc->internal;
2826
2827	/*
2828	 * If the the root is a leaf we have nothing to do.
2829	 * Searching an empty RECNO tree will return NOTFOUND below and loop.
2830	 */
2831	if ((ret = __memp_fget(dbp->mpf, &cp->root, ip, txn, 0, &pg)) != 0)
2832		goto err;
2833	if (LEVEL(pg) == LEAFLEVEL) {
2834		ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority);
2835		goto err;
2836	}
2837	if ((ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority)) != 0)
2838		goto err;
2839
2840	pgno = PGNO_INVALID;
2841	do {
2842		if ((ret = __bam_csearch(dbc, &start, sflag, level)) != 0) {
2843			/* No more at this level, go up one. */
2844			if (ret == DB_NOTFOUND) {
2845				level++;
2846				if (start.data != NULL)
2847					__os_free(dbp->env, start.data);
2848				memset(&start, 0, sizeof(start));
2849				sflag = CS_READ | CS_GETRECNO;
2850				continue;
2851			}
2852			goto err;
2853		}
2854		c_data->compact_pages_examine++;
2855
2856		pg = cp->csp->page;
2857		pgno = PGNO(pg);
2858
2859		sflag = CS_NEXT | CS_GETRECNO;
2860		/* Grab info about the page and drop the stack. */
2861		if (pgno != cp->root && (ret = __bam_savekey(dbc,
2862		    pgno <= c_data->compact_truncate, &start)) != 0) {
2863		    	if (ret == DB_LOCK_NOTGRANTED)
2864				continue;
2865			goto err;
2866		}
2867
2868		if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
2869			goto err;
2870		if (pgno == cp->root)
2871			break;
2872
2873		if (pgno <= c_data->compact_truncate)
2874			continue;
2875
2876		/* Get the meta page lock before latching interior nodes. */
2877		if (!LOCK_ISSET(meta_lock) && (ret = __db_lget(dbc,
2878		     0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &meta_lock)) != 0)
2879			goto err;
2880
2881		/* Reget the page with a write latch, and its parent too. */
2882		if ((ret = __bam_csearch(dbc,
2883		    &start, CS_PARENT | CS_GETRECNO, level)) != 0) {
2884			if (ret == DB_NOTFOUND) {
2885				ret = 0;
2886			}
2887			goto err;
2888		}
2889		pg = cp->csp->page;
2890		pgno = PGNO(pg);
2891
2892		if (pgno > c_data->compact_truncate) {
2893			if ((ret = __bam_truncate_page(dbc, &pg, NULL, 1)) != 0)
2894				goto err;
2895			if (pgno == PGNO(pg)) {
2896				/* We could not allocate.  Give up. */
2897				pgno = cp->root;
2898			}
2899		}
2900
2901		if ((ret = __bam_stkrel(dbc,
2902		     pgno > c_data->compact_truncate ? 0 : STK_NOLOCK)) != 0)
2903			goto err;
2904
2905		/* We are locking subtrees, so drop the write locks asap. */
2906		if (local_txn && pgno > c_data->compact_truncate)
2907			break;
2908	} while (pgno != cp->root);
2909
2910	if ((ret = __dbc_close(dbc)) != 0)
2911		goto err;
2912	dbc = NULL;
2913	if (local_txn) {
2914		if ((ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0)
2915			goto err;
2916		txn = NULL;
2917		LOCK_INIT(meta_lock);
2918	}
2919	if (pgno != ((BTREE *)dbp->bt_internal)->bt_root)
2920		goto new_txn;
2921
2922err:	if (txn != NULL && ret != 0)
2923		sflag = STK_PGONLY;
2924	else
2925		sflag = 0;
2926	if (txn == NULL)
2927		if ((t_ret = __LPUT(dbc, meta_lock)) != 0 && ret == 0)
2928			ret = t_ret;
2929	if (dbc != NULL && (t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
2930		ret = t_ret;
2931	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2932		ret = t_ret;
2933	if (local_txn &&
2934	    txn != NULL && (t_ret = __txn_abort(txn)) != 0 && ret == 0)
2935		ret = t_ret;
2936	if (start.data != NULL)
2937		__os_free(dbp->env, start.data);
2938	return (ret);
2939}
2940
2941static int
2942__bam_setup_freelist(dbp, list, nelems)
2943	DB *dbp;
2944	db_pglist_t *list;
2945	u_int32_t nelems;
2946{
2947	DB_MPOOLFILE *mpf;
2948	db_pgno_t *plist;
2949	int ret;
2950
2951	mpf = dbp->mpf;
2952
2953	if ((ret = __memp_alloc_freelist(mpf, nelems, &plist)) != 0)
2954		return (ret);
2955
2956	while (nelems-- != 0)
2957		*plist++ = list++->pgno;
2958
2959	return (0);
2960}
2961
2962static int
2963__bam_free_freelist(dbp, ip, txn)
2964	DB *dbp;
2965	DB_THREAD_INFO *ip;
2966	DB_TXN *txn;
2967{
2968	DBC *dbc;
2969	DB_LOCK lock;
2970	int auto_commit, ret, t_ret;
2971
2972	LOCK_INIT(lock);
2973	auto_commit = ret = 0;
2974
2975	/*
2976	 * If we are not in a transaction then we need to get
2977	 * a lock on the meta page, otherwise we should already
2978	 * have the lock.
2979	 */
2980
2981	dbc = NULL;
2982	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
2983		/*
2984		 * We must not timeout the lock or we will not free the list.
2985		 * We ignore errors from txn_begin as there is little that
2986		 * the application can do with the error and we want to
2987		 * get the lock and free the list if at all possible.
2988		 */
2989		if (__txn_begin(dbp->env, ip, NULL, &txn, 0) == 0) {
2990			(void)__lock_set_timeout(dbp->env,
2991			    txn->locker, 0, DB_SET_TXN_TIMEOUT);
2992			(void)__lock_set_timeout(dbp->env,
2993			    txn->locker, 0, DB_SET_LOCK_TIMEOUT);
2994			auto_commit = 1;
2995		}
2996		/* Get a cursor so we can call __db_lget. */
2997		if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
2998			return (ret);
2999
3000		if ((ret = __db_lget(dbc,
3001		     0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &lock)) != 0)
3002			goto err;
3003	}
3004
3005	ret = __memp_free_freelist(dbp->mpf);
3006
3007err:	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
3008		ret = t_ret;
3009
3010	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
3011		ret = t_ret;
3012
3013	if (auto_commit && __txn_abort(txn) != 0 && ret == 0)
3014		ret = t_ret;
3015
3016	return (ret);
3017}
3018#endif
3019