1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1999,2008 Oracle.  All rights reserved.
5 *
6 * $Id: bt_compact.c,v 12.80 2008/05/13 04:40:52 ubell Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/txn.h"
18
19static int __bam_compact_dups __P((DBC *,
20     PAGE **, u_int32_t, int, DB_COMPACT *, int *));
21static int __bam_compact_int __P((DBC *,
22     DBT *, DBT *, u_int32_t, int *, DB_COMPACT *, int *));
23static int __bam_compact_isdone __P((DBC *, DBT *, PAGE *, int *));
24static int __bam_csearch __P((DBC *, DBT *, u_int32_t, int));
25static int __bam_merge __P((DBC *,
26     DBC *,  u_int32_t, DBT *, DB_COMPACT *,int *));
27static int __bam_merge_internal __P((DBC *, DBC *, int, DB_COMPACT *, int *));
28static int __bam_merge_pages __P((DBC *, DBC *, DB_COMPACT *));
29static int __bam_merge_records __P((DBC *, DBC*,  u_int32_t, DB_COMPACT *));
30static int __bam_truncate_internal_overflow __P((DBC *, PAGE *, DB_COMPACT *));
31static int __bam_truncate_overflow __P((DBC *,
32     db_pgno_t, db_pgno_t, DB_COMPACT *));
33static int __bam_truncate_page __P((DBC *, PAGE **, int));
34static int __bam_truncate_root_page __P((DBC *,
35     PAGE *, u_int32_t, DB_COMPACT *));
36
37#ifdef HAVE_FTRUNCATE
38static int __bam_free_freelist __P((DB *, DB_THREAD_INFO *, DB_TXN *));
39static int __bam_savekey __P((DBC *, int, DBT *));
40static int __bam_setup_freelist __P((DB *, db_pglist_t *, u_int32_t));
41static int __bam_truncate_internal __P((DB *,
42     DB_THREAD_INFO *, DB_TXN *, DB_COMPACT *));
43#endif
44
/*
 * SAVE_START --
 *	Snapshot the compaction statistics and the current start key before an
 *	iteration, so that RESTORE_START can roll them back if the iteration's
 *	transaction deadlocks and is retried.  Sets "ret" as a side effect.
 */
#define	SAVE_START							\
	do {								\
		save_data = *c_data;					\
		ret = __db_retcopy(env,				\
		     &save_start, current.data, current.size,		\
		     &save_start.data, &save_start.ulen);		\
	} while (0)
52
53/*
54 * Only restore those things that are negated by aborting the
55 * transaction.  We don't restore the number of deadlocks, for example.
56 */
57
58#define	RESTORE_START							\
59	do {								\
60		c_data->compact_pages_free =				\
61		      save_data.compact_pages_free;			\
62		c_data->compact_levels = save_data.compact_levels;	\
63		c_data->compact_truncate = save_data.compact_truncate;	\
64		ret = __db_retcopy(env, &current,			\
65		     save_start.data, save_start.size,			\
66		     &current.data, &current.ulen);			\
67	} while (0)
68/*
69 * __bam_compact -- compact a btree.
70 *
71 * PUBLIC: int __bam_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *,
72 * PUBLIC:     DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *));
73 */
74int
75__bam_compact(dbp, ip, txn, start, stop, c_data, flags, end)
76	DB *dbp;
77	DB_THREAD_INFO *ip;
78	DB_TXN *txn;
79	DBT *start, *stop;
80	DB_COMPACT *c_data;
81	u_int32_t flags;
82	DBT *end;
83{
84	DBC *dbc;
85	DBT current, save_start;
86	DB_COMPACT save_data;
87	ENV *env;
88	u_int32_t factor;
89	int deadlock, isdone, ret, span, t_ret, txn_local;
90
91#ifdef HAVE_FTRUNCATE
92	db_pglist_t *list;
93	db_pgno_t last_pgno;
94	u_int32_t nelems, truncated;
95#endif
96
97	env = dbp->env;
98
99	memset(&current, 0, sizeof(current));
100	memset(&save_start, 0, sizeof(save_start));
101	dbc = NULL;
102	factor = 0;
103	deadlock = isdone = ret = span = 0;
104
105#ifdef HAVE_FTRUNCATE
106	list = NULL;
107	last_pgno = 0;
108	nelems = truncated = 0;
109#endif
110
111	/*
112	 * We pass "current" to the internal routine, indicating where that
113	 * routine should begin its work and expecting that it will return to
114	 * us the last key that it processed.
115	 */
116	if (start != NULL && (ret = __db_retcopy(env,
117	     &current, start->data, start->size,
118	     &current.data, &current.ulen)) != 0)
119		return (ret);
120
121	if (IS_DB_AUTO_COMMIT(dbp, txn))
122		txn_local = 1;
123	else
124		txn_local = 0;
125	if (!LF_ISSET(DB_FREE_SPACE | DB_FREELIST_ONLY))
126		goto no_free;
127	if (LF_ISSET(DB_FREELIST_ONLY))
128		LF_SET(DB_FREE_SPACE);
129
130#ifdef HAVE_FTRUNCATE
131	/* Sort the freelist and set up the in-memory list representation. */
132	if (txn_local && (ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
133		goto err;
134
135	if ((ret = __db_free_truncate(dbp, ip,
136	     txn, flags, c_data, &list, &nelems, &last_pgno)) != 0) {
137		LF_CLR(DB_FREE_SPACE);
138		goto terr;
139	}
140
141	/* If the freelist is empty and we are not filling, get out. */
142	if (nelems == 0 && LF_ISSET(DB_FREELIST_ONLY)) {
143		ret = 0;
144		LF_CLR(DB_FREE_SPACE);
145		goto terr;
146	}
147	if ((ret = __bam_setup_freelist(dbp, list, nelems)) != 0) {
148		/* Someone else owns the free list. */
149		if (ret == EBUSY)
150			ret = 0;
151	}
152
153	/* Commit the txn and release the meta page lock. */
154terr:	if (txn_local) {
155		if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0)
156			ret = t_ret;
157		txn = NULL;
158	}
159	if (ret != 0)
160		goto err;
161
162	/* Save the number truncated so far, we will add what we get below. */
163	truncated = c_data->compact_pages_truncated;
164	if (LF_ISSET(DB_FREELIST_ONLY))
165		goto done;
166#endif
167
168	/*
169	 * We want factor to be the target number of free bytes on each page,
170	 * so we know when to stop adding items to a page.   Make sure to
171	 * subtract the page overhead when computing this target.  This can
172	 * result in a 1-2% error on the smallest page.
173	 * First figure out how many bytes we should use:
174	 */
175no_free:
176	factor = dbp->pgsize - SIZEOF_PAGE;
177	if (c_data->compact_fillpercent != 0) {
178		factor *= c_data->compact_fillpercent;
179		factor /= 100;
180	}
181	/* Now convert to the number of free bytes to target. */
182	factor = (dbp->pgsize - SIZEOF_PAGE) - factor;
183
184	if (c_data->compact_pages == 0)
185		c_data->compact_pages = DB_MAX_PAGES;
186
187	do {
188		deadlock = 0;
189
190		SAVE_START;
191		if (ret != 0)
192			break;
193
194		if (txn_local) {
195			if ((ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0)
196				break;
197
198			if (c_data->compact_timeout != 0 &&
199			    (ret = __txn_set_timeout(txn,
200			    c_data->compact_timeout, DB_SET_LOCK_TIMEOUT)) != 0)
201				goto err;
202		}
203
204		if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
205			goto err;
206
207		if ((ret = __bam_compact_int(dbc, &current, stop, factor,
208		     &span, c_data, &isdone)) ==
209		     DB_LOCK_DEADLOCK && txn_local) {
210			/*
211			 * We retry on deadlock.  Cancel the statistics
212			 * and reset the start point to before this
213			 * iteration.
214			 */
215			deadlock = 1;
216			c_data->compact_deadlock++;
217			RESTORE_START;
218		}
219
220		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
221			ret = t_ret;
222
223err:		if (txn_local && txn != NULL) {
224			if (ret == 0 && deadlock == 0)
225				ret = __txn_commit(txn, DB_TXN_NOSYNC);
226			else if ((t_ret = __txn_abort(txn)) != 0 && ret == 0)
227				ret = t_ret;
228			txn = NULL;
229		}
230	} while (ret == 0 && !isdone);
231
232	if (ret == 0 && end != NULL)
233		ret = __db_retcopy(env, end, current.data, current.size,
234		    &end->data, &end->ulen);
235	if (current.data != NULL)
236		__os_free(env, current.data);
237	if (save_start.data != NULL)
238		__os_free(env, save_start.data);
239
240#ifdef HAVE_FTRUNCATE
241	/*
242	 * Finish up truncation work.  If there are pages left in the free
243	 * list then search the internal nodes of the tree as we may have
244	 * missed some while walking the leaf nodes.  Then calculate how
245	 * many pages we have truncated and release the in-memory free list.
246	 */
247done:	if (LF_ISSET(DB_FREE_SPACE)) {
248		DBMETA *meta;
249		db_pgno_t pgno;
250
251		pgno = PGNO_BASE_MD;
252		isdone = 1;
253		if (ret == 0 && !LF_ISSET(DB_FREELIST_ONLY) && (t_ret =
254		    __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
255			isdone = meta->free == PGNO_INVALID;
256			ret = __memp_fput(dbp->mpf, ip, meta, dbp->priority);
257		}
258
259		if (!isdone)
260			ret = __bam_truncate_internal(dbp, ip, txn, c_data);
261
262		/* Clean up the free list. */
263		if (list != NULL)
264			__os_free(env, list);
265
266		if ((t_ret =
267		    __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) {
268			c_data->compact_pages_truncated =
269			    truncated + last_pgno - meta->last_pgno;
270			if ((t_ret = __memp_fput(dbp->mpf, ip,
271			    meta, dbp->priority)) != 0 && ret == 0)
272				ret = t_ret;
273		} else if (ret == 0)
274			ret = t_ret;
275
276		if ((t_ret =
277		    __bam_free_freelist(dbp, ip, txn)) != 0 && ret == 0)
278			t_ret = ret;
279	}
280#endif
281
282	return (ret);
283}
284
285/*
286 * __bam_csearch -- isolate search code for bam_compact.
287 * This routine hides the differences between searching
288 * a BTREE and a RECNO from the rest of the code.
289 */
290#define	CS_READ	0	/* We are just reading. */
291#define	CS_PARENT	1	/* We want the parent too, write lock. */
292#define	CS_NEXT		2	/* Get the next page. */
293#define	CS_NEXT_WRITE	3	/* Get the next page and write lock. */
294#define	CS_DEL		4	/* Get a stack to delete a page. */
295#define	CS_START	5	/* Starting level for stack, write lock. */
296#define	CS_GETRECNO     0x80	/* Extract record number from start. */
297
static int
__bam_csearch(dbc, start, sflag, level)
	DBC *dbc;		/* Cursor used for the search. */
	DBT *start;		/* Key (or recno) to search for; may be NULL/empty. */
	u_int32_t sflag;	/* One of the CS_* modes, optionally | CS_GETRECNO. */
	int level;		/* Tree level at which the search should stop. */
{
	BTREE_CURSOR *cp;
	int not_used, ret;

	cp = (BTREE_CURSOR *)dbc->internal;

	if (dbc->dbtype == DB_RECNO) {
		/* If GETRECNO is not set the cp->recno is what we want. */
		if (FLD_ISSET(sflag, CS_GETRECNO)) {
			/* An empty start key means start at the first record. */
			if (start == NULL || start->size == 0)
				cp->recno = 1;
			else if ((ret =
			     __ram_getno(dbc, start, &cp->recno, 0)) != 0)
				return (ret);
			FLD_CLR(sflag, CS_GETRECNO);
		}
		/* Map the CS_* request onto __bam_rsearch SR_* flags. */
		switch (sflag) {
		case CS_READ:
			sflag = SR_READ;
			break;
		case CS_NEXT:
			sflag = SR_PARENT | SR_READ;
			break;
		case CS_START:
			level = LEAFLEVEL;
			/* FALLTHROUGH */
		case CS_DEL:
		case CS_NEXT_WRITE:
			sflag = SR_STACK;
			break;
		case CS_PARENT:
			sflag = SR_PARENT | SR_WRITE;
			break;
		default:
			/* Unknown mode: the cursor state is unusable. */
			return (__env_panic(dbc->env, EINVAL));
		}
		if ((ret = __bam_rsearch(dbc,
		     &cp->recno, sflag, level, &not_used)) != 0)
			return (ret);
		/* Reset the cursor's recno to the beginning of the page. */
		cp->recno -= cp->csp->indx;
	} else {
		/* BTREE: record numbers are meaningless, drop the flag. */
		FLD_CLR(sflag, CS_GETRECNO);
		/* Map the CS_* request onto __bam_search SR_* flags. */
		switch (sflag) {
		case CS_READ:
			sflag = SR_READ | SR_DUPFIRST;
			break;
		case CS_DEL:
			sflag = SR_DEL;
			break;
		case CS_NEXT:
			sflag = SR_NEXT;
			break;
		case CS_NEXT_WRITE:
			sflag = SR_NEXT | SR_WRITE;
			break;
		case CS_START:
			sflag = SR_START | SR_WRITE;
			break;
		case CS_PARENT:
			sflag = SR_PARENT | SR_WRITE;
			break;
		default:
			return (__env_panic(dbc->env, EINVAL));
		}
		/* An empty start key means start at the leftmost leaf. */
		if (start == NULL || start->size == 0)
			FLD_SET(sflag, SR_MIN);

		if ((ret = __bam_search(dbc,
		     cp->root, start, sflag, level, NULL, &not_used)) != 0)
			return (ret);
	}

	return (0);
}
379
380/*
381 * __bam_compact_int -- internal compaction routine.
382 *	Called either with a cursor on the main database
383 * or a cursor initialized to the root of an off page duplicate
384 * tree.
385 */
386static int
387__bam_compact_int(dbc, start, stop, factor, spanp, c_data, donep)
388	DBC *dbc;
389	DBT *start, *stop;
390	u_int32_t factor;
391	int *spanp;
392	DB_COMPACT *c_data;
393	int *donep;
394{
395	BTREE_CURSOR *cp, *ncp;
396	DB *dbp;
397	DBC *ndbc;
398	DB_LOCK nolock;
399	DB_MPOOLFILE *dbmp;
400	ENV *env;
401	EPG *epg;
402	PAGE *pg, *ppg, *npg;
403	db_pgno_t npgno;
404	db_recno_t next_recno;
405	u_int32_t sflag;
406	int check_dups, check_trunc, isdone, level;
407	int merged, nentry, next_p, pgs_done, ret, t_ret, tdone;
408
409#ifdef	DEBUG
410#define	CTRACE(dbc, location, t, start, f) do {				\
411		DBT __trace;						\
412		DB_SET_DBT(__trace, t, strlen(t));			\
413		DEBUG_LWRITE(						\
414		    dbc, (dbc)->txn, location, &__trace, start, f)	\
415	} while (0)
416#define	PTRACE(dbc, location, p, start, f) do {				\
417		char __buf[32];						\
418		(void)snprintf(__buf,					\
419		    sizeof(__buf), "pgno: %lu", (u_long)p);		\
420		CTRACE(dbc, location, __buf, start, f);			\
421	} while (0)
422#else
423#define	CTRACE(dbc, location, t, start, f)
424#define	PTRACE(dbc, location, p, start, f)
425#endif
426
427	ndbc = NULL;
428	pg = NULL;
429	npg = NULL;
430	isdone = 0;
431	tdone = 0;
432	pgs_done = 0;
433	next_recno = 0;
434	next_p = 0;
435	LOCK_INIT(nolock);
436	check_trunc = c_data->compact_truncate != PGNO_INVALID;
437	check_dups = (!F_ISSET(dbc, DBC_OPD) &&
438	     F_ISSET(dbc->dbp, DB_AM_DUP)) || check_trunc;
439
440	dbp = dbc->dbp;
441	env = dbp->env;
442	dbmp = dbp->mpf;
443	cp = (BTREE_CURSOR *)dbc->internal;
444
445	/* Search down the tree for the starting point. */
446	if ((ret = __bam_csearch(dbc,
447	    start, CS_READ | CS_GETRECNO, LEAFLEVEL)) != 0) {
448		/* Its not an error to compact an empty db. */
449		if (ret == DB_NOTFOUND)
450			ret = 0;
451		isdone = 1;
452		goto err;
453	}
454
455	/*
456	 * Get the first leaf page. The loop below will change pg so
457	 * we clear the stack reference so we don't put a a page twice.
458	 */
459	pg = cp->csp->page;
460	cp->csp->page = NULL;
461	next_recno = cp->recno;
462next:	/*
463	 * This is the start of the main compaction loop.  There are 3
464	 * parts to the process:
465	 * 1) Walk the leaf pages of the tree looking for a page to
466	 *	process.  We do this with read locks.  Save the
467	 *	key from the page and release it.
468	 * 2) Set up a cursor stack which will write lock the page
469	 *	and enough of its ancestors to get the job done.
470	 *	This could go to the root if we might delete a subtree
471	 *	or we have record numbers to update.
472	 * 3) Loop fetching pages after the above page and move enough
473	 *	data to fill it.
474	 * We exit the loop if we are at the end of the leaf pages, are
475	 * about to lock a new subtree (we span) or on error.
476	 */
477
478	/* Walk the pages looking for something to fill up. */
479	while ((npgno = NEXT_PGNO(pg)) != PGNO_INVALID) {
480		c_data->compact_pages_examine++;
481		PTRACE(dbc, "Next", PGNO(pg), start, 0);
482
483		/* If we have fetched the next page, get the new key. */
484		if (next_p == 1 &&
485		    dbc->dbtype != DB_RECNO && NUM_ENT(pg) != 0) {
486			if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, pg,
487			     0, start, &start->data, &start->ulen)) != 0)
488				goto err;
489		}
490		next_recno += NUM_ENT(pg);
491		if (P_FREESPACE(dbp, pg) > factor ||
492		     (check_trunc && PGNO(pg) > c_data->compact_truncate))
493			break;
494		if (stop != NULL && stop->size > 0) {
495			if ((ret = __bam_compact_isdone(dbc,
496			    stop, pg, &isdone)) != 0)
497				goto err;
498			if (isdone)
499				goto done;
500		}
501
502		/*
503		 * The page does not need more data or to be swapped,
504		 * check to see if we want to look at possible duplicate
505		 * trees or overflow records and the move on to the next page.
506		 */
507		cp->recno += NUM_ENT(pg);
508		next_p = 1;
509		tdone = pgs_done;
510		PTRACE(dbc, "Dups", PGNO(pg), start, 0);
511		if (check_dups && (ret = __bam_compact_dups(
512		     dbc, &pg, factor, 0, c_data, &pgs_done)) != 0)
513			goto err;
514		npgno = NEXT_PGNO(pg);
515		if ((ret = __memp_fput(dbmp,
516		     dbc->thread_info, pg, dbc->priority)) != 0)
517			goto err;
518		pg = NULL;
519		/*
520		 * If we don't do anything we don't need to hold
521		 * the lock on the previous page, so couple always.
522		 */
523		if ((ret = __db_lget(dbc,
524		    tdone == pgs_done ? LCK_COUPLE_ALWAYS : LCK_COUPLE,
525		    npgno, DB_LOCK_READ, 0, &cp->csp->lock)) != 0)
526			goto err;
527		if ((ret = __memp_fget(dbmp, &npgno,
528		     dbc->thread_info, dbc->txn, 0, &pg)) != 0)
529			goto err;
530	}
531
532	/*
533	 * When we get here we have 3 cases:
534	 * 1) We've reached the end of the leaf linked list and are done.
535	 * 2) A page whose freespace exceeds our target and therefore needs
536	 *	to have data added to it.
537	 * 3) A page that doesn't have too much free space but needs to be
538	 *	checked for truncation.
539	 * In both cases 2 and 3, we need that page's first key or record
540	 * number.  We may already have it, if not get it here.
541	 */
542	if ((nentry = NUM_ENT(pg)) != 0) {
543		next_p = 0;
544		/* Get a copy of the first recno on the page. */
545		if (dbc->dbtype == DB_RECNO) {
546			if ((ret = __db_retcopy(dbp->env, start,
547			     &cp->recno, sizeof(cp->recno),
548			     &start->data, &start->ulen)) != 0)
549				goto err;
550		} else if (start->size == 0 &&
551		     (ret = __db_ret(dbp, dbc->thread_info, dbc->txn, pg,
552		     0, start, &start->data, &start->ulen)) != 0)
553			goto err;
554
555		if (npgno == PGNO_INVALID) {
556			/* End of the tree, check its duplicates and exit. */
557			PTRACE(dbc, "GoDone", PGNO(pg), start, 0);
558			if (check_dups && (ret = __bam_compact_dups(dbc,
559			   &pg, factor, 0, c_data, &pgs_done)) != 0)
560				goto err;
561			c_data->compact_pages_examine++;
562			isdone = 1;
563			goto done;
564		}
565	}
566
567	/* Release the page so we don't deadlock getting its parent. */
568	if ((ret = __memp_fput(dbmp, dbc->thread_info, pg, dbc->priority)) != 0)
569		goto err;
570	if ((ret = __LPUT(dbc, cp->csp->lock)) != 0)
571		goto err;
572	BT_STK_CLR(cp);
573	pg = NULL;
574
575	/*
576	 * Setup the cursor stack. There are 3 cases:
577	 * 1) the page is empty and will be deleted: nentry == 0.
578	 * 2) the next page has the same parent: *spanp == 0.
579	 * 3) the next page has a different parent: *spanp == 1.
580	 *
581	 * We now need to search the tree again, getting a write lock
582	 * on the page we are going to merge or delete.  We do this by
583	 * searching down the tree and locking as much of the subtree
584	 * above the page as needed.  In the case of a delete we will
585	 * find the maximal subtree that can be deleted. In the case
586	 * of merge if the current page and the next page are siblings
587	 * with the same parent then we only need to lock the parent.
588	 * Otherwise *span will be set and we need to search to find the
589	 * lowest common ancestor.  Dbc will be set to contain the subtree
590	 * containing the page to be merged or deleted. Ndbc will contain
591	 * the minimal subtree containing that page and its next sibling.
592	 * In all cases for DB_RECNO we simplify things and get the whole
593	 * tree if we need more than a single parent.
594	 */
595
596	/* Case 1 -- page is empty. */
597	if (nentry == 0) {
598		CTRACE(dbc, "Empty", "", start, 0);
599		if (next_p == 1)
600			sflag = CS_NEXT_WRITE;
601		else
602			sflag = CS_DEL;
603		if ((ret = __bam_csearch(dbc, start, sflag, LEAFLEVEL)) != 0)
604			goto err;
605
606		pg = cp->csp->page;
607		/* Check to see if the page is still empty. */
608		if (NUM_ENT(pg) != 0)
609			npgno = PGNO(pg);
610		else {
611			npgno = NEXT_PGNO(pg);
612			/* If this is now the root, we are very done. */
613			if (PGNO(pg) == cp->root)
614				isdone = 1;
615			else {
616				if ((ret = __bam_dpages(dbc, 0, 0)) != 0)
617					goto err;
618				c_data->compact_pages_free++;
619				goto next_no_release;
620			}
621		}
622		goto next_page;
623	}
624
625	/* case 3 -- different parents. */
626	if (*spanp) {
627		CTRACE(dbc, "Span", "", start, 0);
628		if (ndbc == NULL && (ret = __dbc_dup(dbc, &ndbc, 0)) != 0)
629			goto err;
630		ncp = (BTREE_CURSOR *)ndbc->internal;
631		ncp->recno = next_recno;
632		/*
633		 * Search the tree looking for the next page after the
634		 * current key.  For RECNO get the whole stack.
635		 * For BTREE the return will contain the stack that
636		 * dominates both the current and next pages.
637		 */
638		if ((ret = __bam_csearch(ndbc, start, CS_NEXT_WRITE, 0)) != 0)
639			goto err;
640
641		/*
642		 * We need the stacks to be the same height
643		 * so that we can merge parents.
644		 */
645		level = LEVEL(ncp->sp->page);
646		sflag = CS_START;
647		if ((ret = __bam_csearch(dbc, start, sflag, level)) != 0)
648			goto err;
649		pg = cp->csp->page;
650
651		*spanp = 0;
652		/*
653		 * The page may have emptied while we waited for the
654		 * lock or the record we are looking for may have
655		 * moved.
656		 * Reset npgno so we re-get this page when we go back
657		 * to the top.
658		 */
659		if (NUM_ENT(pg) == 0 ||
660		     (dbc->dbtype == DB_RECNO &&
661		     NEXT_PGNO(cp->csp->page) != PGNO(ncp->csp->page))) {
662			npgno = PGNO(pg);
663			goto next_page;
664		}
665
666		if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
667			pgs_done++;
668			/* Get a fresh low numbered page. */
669			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
670				goto err1;
671		}
672		if ((ret = __memp_dirty(dbp->mpf, &ncp->csp->page,
673		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
674			goto err1;
675		PTRACE(dbc, "SDups", PGNO(ncp->csp->page), start, 0);
676		if (check_dups && (ret = __bam_compact_dups(ndbc,
677		     &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
678			goto err1;
679
680		/* Check to see if the tree collapsed. */
681		if (PGNO(ncp->csp->page) == ncp->root)
682			goto done;
683
684		if ((ret = __memp_dirty(dbp->mpf, &cp->csp->page,
685		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
686			goto err1;
687		pg = cp->csp->page;
688		npgno = NEXT_PGNO(pg);
689		PTRACE(dbc, "SDups", PGNO(pg), start, 0);
690		if (check_dups && (ret =
691		     __bam_compact_dups(dbc, &cp->csp->page,
692		     factor, 1, c_data, &pgs_done)) != 0)
693			goto err1;
694
695		/*
696		 * We may have dropped our locks, check again
697		 * to see if we still need to fill this page and
698		 * we are in a spanning situation.
699		 */
700
701		if (P_FREESPACE(dbp, pg) <= factor ||
702		     cp->csp[-1].indx != NUM_ENT(cp->csp[-1].page) - 1)
703			goto next_page;
704
705		/*
706		 * Try to move things into a single parent.
707		 */
708		merged = 0;
709		for (epg = cp->sp; epg != cp->csp; epg++) {
710			if (PGNO(epg->page) == cp->root)
711				continue;
712			PTRACE(dbc, "PMerge", PGNO(epg->page), start, 0);
713			if ((ret = __bam_merge_internal(dbc,
714			       ndbc, LEVEL(epg->page), c_data, &merged)) != 0)
715				goto err1;
716			if (merged)
717				break;
718		}
719
720		/* If we merged the parent, then we nolonger span. */
721		if (merged) {
722			pgs_done++;
723			if (cp->csp->page == NULL)
724				goto deleted;
725			npgno = PGNO(pg);
726			goto next_page;
727		}
728		PTRACE(dbc, "SMerge", PGNO(cp->csp->page), start, 0);
729		npgno = NEXT_PGNO(ncp->csp->page);
730		if ((ret = __bam_merge(dbc,
731		     ndbc, factor, stop, c_data, &isdone)) != 0)
732			goto err1;
733		pgs_done++;
734		/*
735		 * __bam_merge could have freed our stack if it
736		 * deleted a page possibly collapsing the tree.
737		 */
738		if (cp->csp->page == NULL)
739			goto deleted;
740		cp->recno += NUM_ENT(pg);
741
742		/* If we did not bump to the next page something did not fit. */
743		if (npgno != NEXT_PGNO(pg)) {
744			npgno = NEXT_PGNO(pg);
745			goto next_page;
746		}
747	} else {
748		/* Case 2 -- same parents. */
749		CTRACE(dbc, "Sib", "", start, 0);
750		if ((ret =
751		    __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL)) != 0)
752			goto err;
753
754		pg = cp->csp->page;
755		npgno = PGNO(pg);
756
757		/* We now have a write lock, recheck the page. */
758		if ((nentry = NUM_ENT(pg)) == 0)
759			goto next_page;
760
761		if ((ret = __memp_dirty(dbp->mpf, &cp->csp->page,
762		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
763			goto err1;
764		pg = cp->csp->page;
765
766		npgno = NEXT_PGNO(pg);
767
768		/* Check duplicate trees, we have a write lock on the page. */
769		PTRACE(dbc, "SibDup", PGNO(pg), start, 0);
770		if (check_dups && (ret =
771		     __bam_compact_dups(dbc, &cp->csp->page,
772		     factor, 1, c_data, &pgs_done)) != 0)
773			goto err1;
774		pg = cp->csp->page;
775
776		/* Check to see if the tree collapsed. */
777		if (PGNO(pg) == cp->root)
778			goto err1;
779		DB_ASSERT(env, cp->csp - cp->sp == 1);
780
781		if (check_trunc && PGNO(pg) > c_data->compact_truncate) {
782			pgs_done++;
783			/* Get a fresh low numbered page. */
784			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
785				goto err1;
786		}
787
788		/* After re-locking check to see if we still need to fill. */
789		if (P_FREESPACE(dbp, pg) <= factor)
790			goto next_page;
791
792		/* If they have the same parent, just dup the cursor */
793		if (ndbc != NULL && (ret = __dbc_close(ndbc)) != 0)
794			goto err1;
795		if ((ret = __dbc_dup(dbc, &ndbc, DB_POSITION)) != 0)
796			goto err1;
797		ncp = (BTREE_CURSOR *)ndbc->internal;
798
799		/*
800		 * ncp->recno needs to have the recno of the next page.
801		 * Bump it by the number of records on the current page.
802		 */
803		ncp->recno += NUM_ENT(pg);
804	}
805
806	/* Fetch pages until we fill this one. */
807	while (!isdone && npgno != PGNO_INVALID &&
808	     P_FREESPACE(dbp, pg) > factor && c_data->compact_pages != 0) {
809		/*
810		 * If our current position is the last one on a parent
811		 * page, then we are about to merge across different
812		 * internal nodes.  Thus, we need to lock higher up
813		 * in the tree.  We will exit the routine and commit
814		 * what we have done so far.  Set spanp so we know
815		 * we are in this case when we come back.
816		 */
817		if (cp->csp[-1].indx == NUM_ENT(cp->csp[-1].page) - 1) {
818			*spanp = 1;
819			npgno = PGNO(pg);
820			next_recno = cp->recno;
821			goto next_page;
822		}
823
824		/* Lock and get the next page. */
825		if ((ret = __db_lget(dbc, LCK_COUPLE,
826		     npgno, DB_LOCK_WRITE, 0, &ncp->lock)) != 0)
827			goto err1;
828		if ((ret = __memp_fget(dbmp, &npgno,
829		    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &npg)) != 0)
830			goto err1;
831
832		/* Fix up the next page cursor with its parent node. */
833		if ((ret = __memp_fget(dbmp, &PGNO(cp->csp[-1].page),
834		    dbc->thread_info, dbc->txn, 0, &ppg)) != 0)
835			goto err1;
836		BT_STK_PUSH(env, ncp, ppg,
837		     cp->csp[-1].indx + 1, nolock, DB_LOCK_NG, ret);
838		if (ret != 0)
839			goto err1;
840
841		/* Put the page on the stack. */
842		BT_STK_ENTER(env, ncp, npg, 0, ncp->lock, DB_LOCK_WRITE, ret);
843
844		LOCK_INIT(ncp->lock);
845		npg = NULL;
846
847		c_data->compact_pages_examine++;
848
849		PTRACE(dbc, "MDups", PGNO(ncp->csp->page), start, 0);
850		if (check_dups && (ret = __bam_compact_dups(ndbc,
851		     &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0)
852			goto err1;
853
854		npgno = NEXT_PGNO(ncp->csp->page);
855		/*
856		 * Merge the pages.  This will either free the next
857		 * page or just update its parent pointer.
858		 */
859		PTRACE(dbc, "Merge", PGNO(cp->csp->page), start, 0);
860		if ((ret = __bam_merge(dbc,
861		     ndbc, factor, stop, c_data, &isdone)) != 0)
862			goto err1;
863
864		pgs_done++;
865
866		/*
867		 * __bam_merge could have freed our stack if it
868		 * deleted a page possibly collapsing the tree.
869		 */
870		if (cp->csp->page == NULL)
871			goto deleted;
872		/* If we did not bump to the next page something did not fit. */
873		if (npgno != NEXT_PGNO(pg))
874			break;
875	}
876
877	/* Bottom of the main loop.  Move to the next page. */
878	npgno = NEXT_PGNO(pg);
879	cp->recno += NUM_ENT(pg);
880	next_recno = cp->recno;
881
882next_page:
883	if ((ret = __bam_stkrel(dbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
884		goto err1;
885	if (ndbc != NULL &&
886	     (ret = __bam_stkrel(ndbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0)
887		goto err1;
888
889next_no_release:
890	pg = NULL;
891
892	if (npgno == PGNO_INVALID || c_data->compact_pages  == 0)
893		isdone = 1;
894	if (!isdone) {
895		/*
896		 * If we are at the end of this parent commit the
897		 * transaction so we don't tie things up.
898		 */
899		if (pgs_done != 0 && *spanp) {
900deleted:		if (((ret = __bam_stkrel(ndbc, 0)) != 0 ||
901			     (ret = __dbc_close(ndbc)) != 0))
902				goto err;
903			*donep = 0;
904			return (0);
905		}
906
907		/* Reget the next page to look at. */
908		cp->recno = next_recno;
909		if ((ret = __db_lget(dbc,
910		    pgs_done ? LCK_COUPLE_ALWAYS : LCK_COUPLE,
911		    npgno, DB_LOCK_READ, 0, &cp->csp->lock)) != 0 ||
912		    (ret = __memp_fget(dbmp, &npgno,
913		    dbc->thread_info, dbc->txn, 0, &pg)) != 0)
914			goto err;
915		next_p = 1;
916		goto next;
917	}
918
919done:
920	if (0) {
921		/* We come here if pg is the same as cp->csp->page. */
922err1:		pg = NULL;
923	}
924err:	/*
925	 * Don't release locks (STK_PGONLY)if we had an error, we could reveal
926	 * a bad tree to a dirty reader.  Wait till the abort to free the locks.
927	 */
928	sflag = STK_CLRDBC;
929	if (dbc->txn != NULL && ret != 0)
930		sflag |= STK_PGONLY;
931	if (dbc != NULL &&
932	    (t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
933		ret = t_ret;
934	if (ndbc != NULL) {
935		if ((t_ret = __bam_stkrel(ndbc, sflag)) != 0 && ret == 0)
936			ret = t_ret;
937		else if ((t_ret = __dbc_close(ndbc)) != 0 && ret == 0)
938			ret = t_ret;
939	}
940
941	if (pg != NULL && (t_ret =
942	     __memp_fput(dbmp,
943		  dbc->thread_info, pg, dbc->priority) != 0) && ret == 0)
944		ret = t_ret;
945	if (npg != NULL && (t_ret =
946	     __memp_fput(dbmp,
947		  dbc->thread_info, npg, dbc->priority) != 0) && ret == 0)
948		ret = t_ret;
949
950	*donep = isdone;
951
952	return (ret);
953}
954
955/*
956 * __bam_merge -- do actual merging of leaf pages.
957 */
958static int
959__bam_merge(dbc, ndbc, factor, stop, c_data, donep)
960	DBC *dbc, *ndbc;
961	u_int32_t factor;
962	DBT *stop;
963	DB_COMPACT *c_data;
964	int *donep;
965{
966	BTREE_CURSOR *cp, *ncp;
967	DB *dbp;
968	PAGE *pg, *npg;
969	db_indx_t nent;
970	int ret;
971
972	dbp = dbc->dbp;
973	cp = (BTREE_CURSOR *)dbc->internal;
974	ncp = (BTREE_CURSOR *)ndbc->internal;
975	pg = cp->csp->page;
976	npg = ncp->csp->page;
977
978	nent = NUM_ENT(npg);
979
980	/* If the page is empty just throw it away. */
981	if (nent == 0)
982		goto free_page;
983
984	/* Find if the stopping point is on this page. */
985	if (stop != NULL && stop->size != 0) {
986		if ((ret = __bam_compact_isdone(dbc, stop, npg, donep)) != 0)
987			return (ret);
988		if (*donep)
989			return (0);
990	}
991
992	/*
993	 * If there is too much data then just move records one at a time.
994	 * Otherwise copy the data space over and fix up the index table.
995	 * If we are on the left most child we will effect our parent's
996	 * index entry so we call merge_records to figure out key sizes.
997	 */
998	if ((dbc->dbtype == DB_BTREE &&
999	    ncp->csp[-1].indx == 0 && ncp->csp[-1].entries != 1) ||
1000	    (int)(P_FREESPACE(dbp, pg) -
1001	    ((dbp->pgsize - P_OVERHEAD(dbp)) -
1002	    P_FREESPACE(dbp, npg))) < (int)factor)
1003		ret = __bam_merge_records(dbc, ndbc, factor, c_data);
1004	else
1005free_page:	ret = __bam_merge_pages(dbc, ndbc, c_data);
1006
1007	return (ret);
1008}
1009
/*
 * __bam_merge_records --
 *	Move as many records as will fit from the right-hand sibling page
 * (ndbc's current page, npg) onto the left-hand page (dbc's current page,
 * pg), one item at a time.  Duplicate sets are kept together, and before
 * moving anything we verify that the key which will become npg's new
 * parent key can actually fit on every internal page in the stack,
 * backing up the stopping point (possibly with prefix compression) if it
 * cannot.
 *
 *	dbc/ndbc: cursors whose stacks are positioned on the two siblings.
 *	factor:	 target amount of free space to leave on the filled page.
 *	c_data:	 compaction statistics/state, updated as pages move.
 *
 * Returns 0 on success or a Berkeley DB error code.
 */
static int
__bam_merge_records(dbc, ndbc, factor, c_data)
	DBC *dbc, *ndbc;
	u_int32_t factor;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BKEYDATA *bk, *tmp_bk;
	BTREE *t;
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT a, b, data, hdr;
	ENV *env;
	EPG *epg;
	PAGE *pg, *npg;
	db_indx_t adj, indx, nent, *ninp, pind;
	int32_t adjust;
	u_int32_t freespace, nksize, pfree, size;
	int first_dup, is_dup, next_dup, n_ok, ret;
	size_t (*func) __P((DB *, const DBT *, const DBT *));

	dbp = dbc->dbp;
	env = dbp->env;
	t = dbp->bt_internal;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	/* New items are appended after the existing entries on pg. */
	pind = NUM_ENT(pg);
	n_ok = 0;
	adjust = 0;
	ret = 0;
	nent = NUM_ENT(npg);

	DB_ASSERT(env, nent != 0);

	/* See if we want to swap out this page. */
	if (c_data->compact_truncate != PGNO_INVALID &&
	     PGNO(npg) > c_data->compact_truncate) {
		/* Get a fresh low numbered page. */
		if ((ret = __bam_truncate_page(ndbc, &npg, 1)) != 0)
			goto err;
	}

	ninp = P_INP(dbp, npg);

	/*
	 * pg is the page that is being filled, it is in the stack in cp.
	 * npg is the next page, it is in the stack in ncp.
	 */
	freespace = P_FREESPACE(dbp, pg);

	/* Leaf btree pages carry key+data pairs; duplicate pages carry one. */
	adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * Loop through the records and find the stopping point.
	 */
	for (indx = 0; indx < nent; indx += adj)  {
		bk = GET_BKEYDATA(dbp, npg, indx);

		/* Size of the key. */
		size = BITEM_PSIZE(bk);

		/* Size of the data. */
		if (TYPE(pg) == P_LBTREE)
			size += BITEM_PSIZE(GET_BKEYDATA(dbp, npg, indx + 1));
		/*
		 * If we are at a duplicate set, skip ahead and
		 * get the total size for the group.  On-page duplicates
		 * share one key, so each extra member adds one index slot
		 * plus its data item.
		 */
		n_ok = adj;
		if (TYPE(pg) == P_LBTREE &&
		     indx < nent - adj &&
		     ninp[indx] == ninp[indx + adj]) {
			do {
				/* Size of index for key reference. */
				size += sizeof(db_indx_t);
				n_ok++;
				/* Size of data item. */
				size += BITEM_PSIZE(
				    GET_BKEYDATA(dbp, npg, indx + n_ok));
				n_ok++;
			} while (indx + n_ok < nent &&
			    ninp[indx] == ninp[indx + n_ok]);
		}
		/* if the next set will not fit on the page we are done. */
		if (freespace < size)
			break;

		/*
		 * Otherwise figure out if we are past the goal and if
		 * adding this set will put us closer to the goal than
		 * we are now.
		 */
		if ((freespace - size) < factor) {
			if (freespace - factor > factor - (freespace - size))
				indx += n_ok;
			break;
		}
		freespace -= size;
		indx += n_ok - adj;
	}

	/* If we have hit the first record then there is nothing we can move. */
	if (indx == 0)
		goto done;
	if (TYPE(pg) != P_LBTREE && TYPE(pg) != P_LDUP) {
		/* Non-keyed leaf pages need no parent-key fit check. */
		if (indx == nent)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		goto no_check;
	}
	/*
	 * We need to update npg's parent key.  Avoid creating a new key
	 * that will be too big. Get what space will be available on the
	 * parents. Then if there will not be room for this key, see if
	 * prefix compression will make it work, if not backup till we
	 * find something that will.  (Needless to say, this is a very
	 * unlikely event.)  If we are deleting this page then we will
	 * need to propagate the next key to our grand parents, so we
	 * see if that will fit.
	 */
	/* pfree = the tightest free space on any parent we must update. */
	pfree = dbp->pgsize;
	for (epg = &ncp->csp[-1]; epg >= ncp->sp; epg--)
		if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
			bi = GET_BINTERNAL(dbp, epg->page, epg->indx);
			/* Add back in the key we will be deleting. */
			freespace += BINTERNAL_PSIZE(bi->len);
			if (freespace < pfree)
				pfree = freespace;
			/* Propagation stops at the first non-leftmost entry. */
			if (epg->indx != 0)
				break;
		}

	/*
	 * If we are at the end, we will delete this page.  We need to
	 * check the next parent key only if we are the leftmost page and
	 * will therefore have to propagate the key up the tree.
	 */
	if (indx == nent) {
		if (ncp->csp[-1].indx != 0 || ncp->csp[-1].entries == 1 ||
		     BINTERNAL_PSIZE(GET_BINTERNAL(dbp,
		     ncp->csp[-1].page, 1)->len) <= pfree)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		/* The propagated key will not fit; move fewer records. */
		indx -= adj;
	}
	bk = GET_BKEYDATA(dbp, npg, indx);
	/* Select a prefix function only if the key as-is will not fit. */
	if (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (F_ISSET(dbc, DBC_OPD)) {
			if (dbp->dup_compare == __bam_defcmp)
				func = __bam_defpfx;
			else
				func = NULL;
		} else
			func = t->bt_prefix;
	} else
		func = NULL;

	/* Skip to the beginning of a duplicate set. */
	while (indx != 0 && ninp[indx] == ninp[indx - adj])
		indx -= adj;

	/* Back up until some (possibly prefix-compressed) key fits. */
	while (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (B_TYPE(bk->type) != B_KEYDATA)
			goto noprefix;
		/*
		 * Figure out if we can truncate this key.
		 * Code borrowed from bt_split.c
		 */
		if (func == NULL)
			goto noprefix;
		tmp_bk = GET_BKEYDATA(dbp, npg, indx - adj);
		if (B_TYPE(tmp_bk->type) != B_KEYDATA)
			goto noprefix;
		memset(&a, 0, sizeof(a));
		a.size = tmp_bk->len;
		a.data = tmp_bk->data;
		memset(&b, 0, sizeof(b));
		b.size = bk->len;
		b.data = bk->data;
		nksize = (u_int32_t)func(dbp, &a, &b);
		if (BINTERNAL_PSIZE(nksize) < pfree)
			break;
noprefix:
		/* Skip to the beginning of a duplicate set. */
		do {
			indx -= adj;
		} while (indx != 0 &&  ninp[indx] == ninp[indx - adj]);

		bk = GET_BKEYDATA(dbp, npg, indx);
	}

	/*
	 * indx references the first record that will not move to the previous
	 * page.  If it is 0 then we could not find a key that would fit in
	 * the parent that would permit us to move any records.
	 */
	if (indx == 0)
		goto done;
	DB_ASSERT(env, indx <= nent);

	/* Loop through the records and move them from npg to pg. */
no_check: is_dup = first_dup = next_dup = 0;
	/* Dirty both leaves; the pages may move in cache, so re-fetch. */
	if ((ret = __memp_dirty(dbp->mpf, &cp->csp->page,
	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __memp_dirty(dbp->mpf, &ncp->csp->page,
	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	ninp = P_INP(dbp, npg);
	do {
		/* Always consume the first remaining item on npg. */
		bk = GET_BKEYDATA(dbp, npg, 0);
		/* Figure out if we are in a duplicate group or not. */
		if ((NUM_ENT(npg) % 2) == 0) {
			if (NUM_ENT(npg) > 2 && ninp[0] == ninp[2]) {
				if (!is_dup) {
					first_dup = 1;
					is_dup = 1;
				} else
					first_dup = 0;

				next_dup = 1;
			} else if (next_dup) {
				/* Last member of the duplicate group. */
				is_dup = 1;
				first_dup = 0;
				next_dup = 0;
			} else
				is_dup = 0;
		}

		if (is_dup && !first_dup && (pind % 2) == 0) {
			/* Duplicate key. */
			if ((ret = __bam_adjindx(dbc,
			     pg, pind, pind - P_INDX, 1)) != 0)
				goto err;
			if (!next_dup)
				is_dup = 0;
		} else switch (B_TYPE(bk->type)) {
		case B_KEYDATA:
			hdr.data = bk;
			hdr.size = SSZA(BKEYDATA, data);
			data.size = bk->len;
			data.data = bk->data;
			if ((ret = __db_pitem(dbc, pg, pind,
			     BKEYDATA_SIZE(bk->len), &hdr, &data)) != 0)
				goto err;
			break;
		case B_OVERFLOW:
		case B_DUPLICATE:
			data.size = BOVERFLOW_SIZE;
			data.data = bk;
			if ((ret = __db_pitem(dbc, pg, pind,
			     BOVERFLOW_SIZE, &data, NULL)) != 0)
				goto err;
			break;
		default:
			__db_errx(env,
			    "Unknown record format, page %lu, indx 0",
			    (u_long)PGNO(pg));
			ret = EINVAL;
			goto err;
		}
		pind++;
		if (next_dup && (NUM_ENT(npg) % 2) == 0) {
			/* Shared key stays on npg; just drop the index slot. */
			if ((ret = __bam_adjindx(ndbc,
			     npg, 0, O_INDX, 0)) != 0)
				goto err;
		} else {
			if ((ret = __db_ditem(ndbc,
			     npg, 0, BITEM_SIZE(bk))) != 0)
				goto err;
		}
		adjust++;
	} while (--indx != 0);

	DB_ASSERT(env, NUM_ENT(npg) != 0);

	/* Fix up the record counts in the parents of both subtrees. */
	if (adjust != 0 &&
	     (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))) {
		DB_ASSERT(env, cp->csp - cp->sp == ncp->csp - ncp->sp);
		if (TYPE(pg) == P_LBTREE)
			adjust /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -adjust)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, adjust)) != 0)
			goto err;
	}

	/* Update parent with new key. */
	if (ndbc->dbtype == DB_BTREE &&
	    (ret = __bam_pupdate(ndbc, pg)) != 0)
		goto err;

done:	ret = __bam_stkrel(ndbc, STK_CLRDBC);

err:	return (ret);
}
1308
/*
 * __bam_merge_pages --
 *	Bulk-move the entire contents of the right-hand sibling page onto
 * the left-hand page (or, if the right page is already empty, skip the
 * copy), then free the emptied page, updating record counts, compaction
 * statistics and handling any resulting collapse of the tree.
 *
 *	dbc/ndbc: cursors whose stacks are positioned on the two siblings.
 *	c_data:	 compaction statistics/state.
 *
 * Returns 0 on success or a Berkeley DB error code.
 */
static int
__bam_merge_pages(dbc, ndbc, c_data)
	DBC *dbc, *ndbc;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr;
	DB_MPOOLFILE *dbmp;
	PAGE *pg, *npg;
	db_indx_t nent, *ninp, *pinp;
	db_pgno_t ppgno;
	u_int8_t *bp;
	u_int32_t len;
	int i, level, ret;

	COMPQUIET(ppgno, PGNO_INVALID);
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	nent = NUM_ENT(npg);

	/* If the page is empty just throw it away. */
	if (nent == 0)
		goto free_page;

	/* Dirty both pages; they may move in cache, so re-fetch. */
	if ((ret = __memp_dirty(dbp->mpf, &cp->csp->page,
	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __memp_dirty(dbp->mpf, &ncp->csp->page,
	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(dbp->env, nent == NUM_ENT(npg));

	/* Bulk copy the data to the new page. */
	len = dbp->pgsize - HOFFSET(npg);
	if (DBC_LOGGING(dbc)) {
		/* Log the moved image: page header + item data region. */
		hdr.data = npg;
		hdr.size = LOFFSET(dbp, npg);
		data.data = (u_int8_t *)npg + HOFFSET(npg);
		data.size = len;
		if ((ret = __bam_merge_log(dbp,
		     dbc->txn, &LSN(pg), 0, PGNO(pg),
		     &LSN(pg), PGNO(npg), &LSN(npg), &hdr, &data, 0)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));
	LSN(npg) = LSN(pg);
	bp = (u_int8_t *)pg + HOFFSET(pg) - len;
	memcpy(bp, (u_int8_t *)npg + HOFFSET(npg), len);

	/* Copy index table offset by what was there already. */
	pinp = P_INP(dbp, pg) + NUM_ENT(pg);
	ninp = P_INP(dbp, npg);
	for (i = 0; i < NUM_ENT(npg); i++)
		*pinp++ = *ninp++ - (dbp->pgsize - HOFFSET(pg));
	HOFFSET(pg) -= len;
	NUM_ENT(pg) += i;

	/* The right page is now logically empty. */
	NUM_ENT(npg) = 0;
	HOFFSET(npg) += len;

	/* Fix up the record counts in the parents of both subtrees. */
	if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD)) {
		DB_ASSERT(dbp->env, cp->csp - cp->sp == ncp->csp - ncp->sp);
		if (TYPE(pg) == P_LBTREE)
			i /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -i)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, i)) != 0)
			goto err;
	}

free_page:
	/*
	 * __bam_dpages may decide to collapse the tree.
	 * This can happen if we have the root and there
	 * are exactly 2 pointers left in it.
	 * If it can collapse the tree we must free the other
	 * stack since it will no longer be valid.  This
	 * must be done before hand because we cannot
	 * hold a page pinned if it might be truncated.
	 */
	if (PGNO(ncp->sp->page) == ncp->root &&
	    NUM_ENT(ncp->sp->page) == 2) {
		if ((ret = __bam_stkrel(dbc, STK_CLRDBC | STK_PGONLY)) != 0)
			goto err;
		/* Remember the old height so we can detect a collapse. */
		level = LEVEL(ncp->sp->page);
		ppgno = PGNO(ncp->csp[-1].page);
	} else
		level = 0;
	if (c_data->compact_truncate > PGNO(npg))
		c_data->compact_truncate--;
	if ((ret = __bam_dpages(ndbc,
	    0, ndbc->dbtype == DB_RECNO ? 0 : 1)) != 0)
		goto err;
	npg = NULL;
	c_data->compact_pages_free++;
	c_data->compact_pages--;
	if (level != 0) {
		/* Re-fetch the root to see if the tree lost a level. */
		if ((ret = __memp_fget(dbmp, &ncp->root,
		    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
			goto err;
		if (level == LEVEL(npg))
			level = 0;
		if ((ret = __memp_fput(dbmp,
		     dbc->thread_info, npg, dbc->priority)) != 0)
			goto err;
		npg = NULL;
		if (level != 0) {
			/* The tree collapsed; account for the freed level. */
			c_data->compact_levels++;
			c_data->compact_pages_free++;
			if (c_data->compact_truncate > ppgno)
				c_data->compact_truncate--;
			if (c_data->compact_pages != 0)
				c_data->compact_pages--;
		}
	}

err:	return (ret);
}
1435
1436/*
1437 * __bam_merge_internal --
1438 *	Merge internal nodes of the tree.
1439 */
1440static int
1441__bam_merge_internal(dbc, ndbc, level, c_data, merged)
1442	DBC *dbc, *ndbc;
1443	int level;
1444	DB_COMPACT *c_data;
1445	int *merged;
1446{
1447	BINTERNAL bi, *bip, *fip;
1448	BTREE_CURSOR *cp, *ncp;
1449	DB *dbp;
1450	DBT data, hdr;
1451	DB_MPOOLFILE *dbmp;
1452	EPG *epg, *save_csp, *nsave_csp;
1453	PAGE *pg, *npg;
1454	RINTERNAL *rk;
1455	db_indx_t indx, pind;
1456	db_pgno_t ppgno;
1457	int32_t trecs;
1458	u_int16_t size;
1459	u_int32_t freespace, pfree;
1460	int ret;
1461
1462	COMPQUIET(bip, NULL);
1463	COMPQUIET(ppgno, PGNO_INVALID);
1464
1465	/*
1466	 * ndbc will contain the the dominating parent of the subtree.
1467	 * dbc will have the tree containing the left child.
1468	 *
1469	 * The stacks descend to the leaf level.
1470	 * If this is a recno tree then both stacks will start at the root.
1471	 */
1472	dbp = dbc->dbp;
1473	dbmp = dbp->mpf;
1474	cp = (BTREE_CURSOR *)dbc->internal;
1475	ncp = (BTREE_CURSOR *)ndbc->internal;
1476	*merged = 0;
1477	ret = 0;
1478
1479	/*
1480	 * Set the stacks to the level requested.
1481	 * Save the old value to restore when we exit.
1482	 */
1483	save_csp = cp->csp;
1484	cp->csp = &cp->csp[-level + 1];
1485	pg = cp->csp->page;
1486	pind = NUM_ENT(pg);
1487
1488	nsave_csp = ncp->csp;
1489	ncp->csp = &ncp->csp[-level + 1];
1490	npg = ncp->csp->page;
1491	indx = NUM_ENT(npg);
1492
1493	/*
1494	 * The caller may have two stacks that include common ancestors, we
1495	 * check here for convenience.
1496	 */
1497	if (npg == pg)
1498		goto done;
1499
1500	if ((ret = __memp_dirty(dbmp, &cp->csp->page,
1501	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
1502	    (ret = __memp_dirty(dbmp, &ncp->csp->page,
1503	    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1504		goto err;
1505	pg = cp->csp->page;
1506	npg = ncp->csp->page;
1507
1508	if (TYPE(pg) == P_IBTREE) {
1509		/*
1510		 * Check for overflow keys on both pages while we have
1511		 * them locked.
1512		 */
1513		 if ((ret =
1514		      __bam_truncate_internal_overflow(dbc, pg, c_data)) != 0)
1515			goto err;
1516		 if ((ret =
1517		      __bam_truncate_internal_overflow(dbc, npg, c_data)) != 0)
1518			goto err;
1519	}
1520
1521	/*
1522	 * If we are about to move data off the left most page of an
1523	 * internal node we will need to update its parents, make sure there
1524	 * will be room for the new key on all the parents in the stack.
1525	 * If not, move less data.
1526	 */
1527	fip = NULL;
1528	if (TYPE(pg) == P_IBTREE) {
1529		/* See where we run out of space. */
1530		freespace = P_FREESPACE(dbp, pg);
1531		/*
1532		 * The leftmost key of an internal page is not accurate.
1533		 * Go up the tree to find a non-leftmost parent.
1534		 */
1535		epg = ncp->csp;
1536		while (--epg >= ncp->sp && epg->indx == 0)
1537			continue;
1538		fip = bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1539		epg = ncp->csp;
1540
1541		for (indx = 0;;) {
1542			size = BINTERNAL_PSIZE(bip->len);
1543			if (size > freespace)
1544				break;
1545			freespace -= size;
1546			if (++indx >= NUM_ENT(npg))
1547				break;
1548			bip = GET_BINTERNAL(dbp, npg, indx);
1549		}
1550
1551		/* See if we are deleting the page and we are not left most. */
1552		if (indx == NUM_ENT(npg) && epg[-1].indx != 0)
1553			goto fits;
1554
1555		pfree = dbp->pgsize;
1556		for (epg--; epg >= ncp->sp; epg--)
1557			if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
1558				bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
1559				/* Add back in the key we will be deleting. */
1560				freespace += BINTERNAL_PSIZE(bip->len);
1561				if (freespace < pfree)
1562					pfree = freespace;
1563				if (epg->indx != 0)
1564					break;
1565			}
1566		epg = ncp->csp;
1567
1568		/* If we are at the end of the page we will delete it. */
1569		if (indx == NUM_ENT(npg)) {
1570			if (NUM_ENT(epg[-1].page) == 1)
1571				goto fits;
1572			bip =
1573			     GET_BINTERNAL(dbp, epg[-1].page, epg[-1].indx + 1);
1574		} else
1575			bip = GET_BINTERNAL(dbp, npg, indx);
1576
1577		/* Back up until we have a key that fits. */
1578		while (indx != 0 && BINTERNAL_PSIZE(bip->len) > pfree) {
1579			indx--;
1580			bip = GET_BINTERNAL(dbp, npg, indx);
1581		}
1582		if (indx == 0)
1583			goto done;
1584	}
1585
1586fits:	memset(&bi, 0, sizeof(bi));
1587	memset(&hdr, 0, sizeof(hdr));
1588	memset(&data, 0, sizeof(data));
1589	trecs = 0;
1590
1591	/*
1592	 * Copy data between internal nodes till one is full
1593	 * or the other is empty.
1594	 */
1595	do {
1596		if (dbc->dbtype == DB_BTREE) {
1597			bip = GET_BINTERNAL(dbp, npg, 0);
1598			size = fip == NULL ?
1599			     BINTERNAL_SIZE(bip->len) :
1600			     BINTERNAL_SIZE(fip->len);
1601			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1602				break;
1603
1604			if (fip == NULL) {
1605				data.size = bip->len;
1606				data.data = bip->data;
1607			} else {
1608				data.size = fip->len;
1609				data.data = fip->data;
1610			}
1611			bi.len = data.size;
1612			B_TSET(bi.type, bip->type);
1613			bi.pgno = bip->pgno;
1614			bi.nrecs = bip->nrecs;
1615			hdr.data = &bi;
1616			hdr.size = SSZA(BINTERNAL, data);
1617			if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))
1618				trecs += (int32_t)bip->nrecs;
1619		} else {
1620			rk = GET_RINTERNAL(dbp, npg, 0);
1621			size = RINTERNAL_SIZE;
1622			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
1623				break;
1624
1625			hdr.data = rk;
1626			hdr.size = size;
1627			trecs += (int32_t)rk->nrecs;
1628		}
1629		if ((ret = __db_pitem(dbc, pg, pind, size, &hdr, &data)) != 0)
1630			goto err;
1631		pind++;
1632		if (fip != NULL) {
1633			/* reset size to be for the record being deleted. */
1634			size = BINTERNAL_SIZE(bip->len);
1635			fip = NULL;
1636		}
1637		if ((ret = __db_ditem(ndbc, npg, 0, size)) != 0)
1638			goto err;
1639		*merged = 1;
1640	} while (--indx != 0);
1641
1642	if (c_data->compact_truncate != PGNO_INVALID &&
1643	     PGNO(pg) > c_data->compact_truncate && cp->csp != cp->sp) {
1644		if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
1645			goto err;
1646	}
1647
1648	if (NUM_ENT(npg) != 0 && c_data->compact_truncate != PGNO_INVALID &&
1649	    PGNO(npg) > c_data->compact_truncate && ncp->csp != ncp->sp) {
1650		if ((ret = __bam_truncate_page(ndbc, &npg, 1)) != 0)
1651			goto err;
1652	}
1653
1654	if (!*merged)
1655		goto done;
1656
1657	if (trecs != 0) {
1658		DB_ASSERT(dbp->env, cp->csp - cp->sp == ncp->csp - ncp->sp);
1659		cp->csp--;
1660		if ((ret = __bam_adjust(dbc, trecs)) != 0)
1661			goto err;
1662
1663		ncp->csp--;
1664		if ((ret = __bam_adjust(ndbc, -trecs)) != 0)
1665			goto err;
1666		ncp->csp++;
1667	}
1668	cp->csp = save_csp;
1669
1670	/*
1671	 * Either we emptied the page or we need to update its
1672	 * parent to reflect the first page we now point to.
1673	 * First get rid of the bottom of the stack,
1674	 * bam_dpages will clear the stack.  We can drop
1675	 * the locks on those pages as we have not done
1676	 * anything to them.
1677	 */
1678	do {
1679		if ((ret = __memp_fput(dbmp, dbc->thread_info,
1680		    nsave_csp->page, dbc->priority)) != 0)
1681			goto err;
1682		if ((ret = __LPUT(dbc, nsave_csp->lock)) != 0)
1683			goto err;
1684		nsave_csp--;
1685	} while (nsave_csp != ncp->csp);
1686
1687	if (NUM_ENT(npg) == 0)  {
1688		/*
1689		 * __bam_dpages may decide to collapse the tree
1690		 * so we need to free our other stack.  The tree
1691		 * will change in hight and our stack will nolonger
1692		 * be valid.
1693		 */
1694		if (PGNO(ncp->sp->page) == ncp->root &&
1695		    NUM_ENT(ncp->sp->page) == 2) {
1696			if ((ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0)
1697				goto err;
1698			level = LEVEL(ncp->sp->page);
1699			ppgno = PGNO(ncp->csp[-1].page);
1700		} else
1701			level = 0;
1702
1703		if (c_data->compact_truncate > PGNO(npg))
1704			c_data->compact_truncate--;
1705		ret = __bam_dpages(ndbc,
1706		     0, ndbc->dbtype == DB_RECNO ? 0 : 1);
1707		c_data->compact_pages_free++;
1708		if (ret == 0 && level != 0) {
1709			if ((ret = __memp_fget(dbmp, &ncp->root,
1710			    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
1711				goto err;
1712			if (level == LEVEL(npg))
1713				level = 0;
1714			if ((ret = __memp_fput(dbmp,
1715			    dbc->thread_info, npg, dbc->priority)) != 0)
1716				goto err;
1717			npg = NULL;
1718			if (level != 0) {
1719				c_data->compact_levels++;
1720				c_data->compact_pages_free++;
1721				if (c_data->compact_truncate > ppgno)
1722					c_data->compact_truncate--;
1723				if (c_data->compact_pages != 0)
1724					c_data->compact_pages--;
1725			}
1726		}
1727	} else
1728		ret = __bam_pupdate(ndbc, npg);
1729	return (ret);
1730
1731done:
1732err:	cp->csp = save_csp;
1733	ncp->csp = nsave_csp;
1734
1735	return (ret);
1736}
1737
1738/*
1739 * __bam_compact_dups -- try to compress off page dup trees.
1740 * We may or may not have a write lock on this page.
1741 */
1742static int
1743__bam_compact_dups(dbc, ppg, factor, have_lock, c_data, donep)
1744	DBC *dbc;
1745	PAGE **ppg;
1746	u_int32_t factor;
1747	int have_lock;
1748	DB_COMPACT *c_data;
1749	int *donep;
1750{
1751	BOVERFLOW *bo;
1752	BTREE_CURSOR *cp;
1753	DB *dbp;
1754	DBC *opd;
1755	DBT start;
1756	DB_MPOOLFILE *dbmp;
1757	ENV *env;
1758	PAGE *dpg, *pg;
1759	db_indx_t i;
1760	int isdone, level, ret, span, t_ret;
1761
1762	span = 0;
1763	ret = 0;
1764	opd = NULL;
1765
1766	dbp = dbc->dbp;
1767	env = dbp->env;
1768	dbmp = dbp->mpf;
1769	cp = (BTREE_CURSOR *)dbc->internal;
1770	pg = *ppg;
1771
1772	for (i = 0; i <  NUM_ENT(pg); i++) {
1773		bo = GET_BOVERFLOW(dbp, pg, i);
1774		if (B_TYPE(bo->type) == B_KEYDATA)
1775			continue;
1776		c_data->compact_pages_examine++;
1777		if (bo->pgno > c_data->compact_truncate) {
1778			(*donep)++;
1779			if (!have_lock) {
1780				if ((ret = __db_lget(dbc, 0, PGNO(pg),
1781				     DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
1782					goto err;
1783				have_lock = 1;
1784				if ((ret = __memp_dirty(dbp->mpf, ppg,
1785				    dbc->thread_info,
1786				    dbc->txn, dbc->priority, 0)) != 0)
1787					goto err;
1788				pg = *ppg;
1789			}
1790			if ((ret =
1791			     __bam_truncate_root_page(dbc, pg, i, c_data)) != 0)
1792				goto err;
1793			/* Just in case it should move.  Could it? */
1794			bo = GET_BOVERFLOW(dbp, pg, i);
1795		}
1796
1797		if (B_TYPE(bo->type) == B_OVERFLOW) {
1798			if ((ret = __bam_truncate_overflow(dbc, bo->pgno,
1799			     have_lock ? PGNO_INVALID : PGNO(pg), c_data)) != 0)
1800				goto err;
1801			(*donep)++;
1802			continue;
1803		}
1804		/*
1805		 * Take a peek at the root.  If it's a leaf then
1806		 * there is no tree here, avoid all the trouble.
1807		 */
1808		if ((ret = __memp_fget(dbmp, &bo->pgno,
1809		     dbc->thread_info, dbc->txn, 0, &dpg)) != 0)
1810			goto err;
1811
1812		level = dpg->level;
1813		if ((ret = __memp_fput(dbmp,
1814		     dbc->thread_info, dpg, dbc->priority)) != 0)
1815			goto err;
1816		if (level == LEAFLEVEL)
1817			continue;
1818		if ((ret = __dbc_newopd(dbc, bo->pgno, NULL, &opd)) != 0)
1819			return (ret);
1820		if (!have_lock) {
1821			if ((ret = __db_lget(dbc, 0,
1822			     PGNO(pg), DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
1823				goto err;
1824			have_lock = 1;
1825			if ((ret = __memp_dirty(dbp->mpf, ppg,
1826			    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1827				goto err;
1828			pg = *ppg;
1829		}
1830		(*donep)++;
1831		memset(&start, 0, sizeof(start));
1832		do {
1833			if ((ret = __bam_compact_int(opd, &start,
1834			     NULL, factor, &span, c_data, &isdone)) != 0)
1835				break;
1836		} while (!isdone);
1837
1838		if (start.data != NULL)
1839			__os_free(env, start.data);
1840
1841		if (ret != 0)
1842			goto err;
1843
1844		ret = __dbc_close(opd);
1845		opd = NULL;
1846		if (ret != 0)
1847			goto err;
1848	}
1849
1850err:	if (opd != NULL && (t_ret = __dbc_close(opd)) != 0 && ret == 0)
1851		ret = t_ret;
1852	return (ret);
1853}
1854
1855/*
1856 * __bam_truncate_page -- swap a page with a lower numbered page.
1857 *	The cusor has a stack which includes at least the
1858 * immediate parent of this page.
1859 */
1860static int
1861__bam_truncate_page(dbc, pgp, update_parent)
1862	DBC *dbc;
1863	PAGE **pgp;
1864	int update_parent;
1865{
1866	BTREE_CURSOR *cp;
1867	DB *dbp;
1868	DBT data, hdr;
1869	DB_LSN lsn;
1870	EPG *epg;
1871	PAGE *newpage;
1872	db_pgno_t newpgno, *pgnop;
1873	int ret;
1874
1875	dbp = dbc->dbp;
1876
1877	/*
1878	 * We want to free a page that lives in the part of the file that
1879	 * can be truncated, so we're going to move it onto a free page
1880	 * that is in the part of the file that need not be truncated.
1881	 * Since the freelist is ordered now, we can simply call __db_new
1882	 * which will grab the first element off the freelist; we know this
1883	 * is the lowest numbered free page.
1884	 */
1885	if ((ret = __db_new(dbc, P_DONTEXTEND | TYPE(*pgp), &newpage)) != 0)
1886		return (ret);
1887
1888	/*
1889	 * If newpage is null then __db_new would have had to allocate
1890	 * a new page from the filesystem, so there is no reason
1891	 * to continue this action.
1892	 */
1893	if (newpage == NULL)
1894		return (0);
1895
1896	/*
1897	 * It is possible that a higher page is allocated if other threads
1898	 * are allocating at the same time, if so, just put it back.
1899	 */
1900	if (PGNO(newpage) > PGNO(*pgp)) {
1901		/* Its unfortunate but you can't just free a new overflow. */
1902		if (TYPE(newpage) == P_OVERFLOW)
1903			OV_LEN(newpage) = 0;
1904		return (__db_free(dbc, newpage));
1905	}
1906
1907	if ((ret = __memp_dirty(dbp->mpf,
1908	    &newpage, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1909		goto err;
1910
1911	/* Log if necessary. */
1912	if (DBC_LOGGING(dbc)) {
1913		hdr.data = *pgp;
1914		hdr.size = P_OVERHEAD(dbp);
1915		if (TYPE(*pgp) == P_OVERFLOW) {
1916			data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp);
1917			data.size = OV_LEN(*pgp);
1918		} else {
1919			data.data = (u_int8_t *)*pgp + HOFFSET(*pgp);
1920			data.size = dbp->pgsize - HOFFSET(*pgp);
1921			hdr.size += NUM_ENT(*pgp) * sizeof(db_indx_t);
1922		}
1923		if ((ret = __bam_merge_log(dbp, dbc->txn,
1924		      &LSN(newpage), 0, PGNO(newpage), &LSN(newpage),
1925		      PGNO(*pgp), &LSN(*pgp), &hdr, &data, 1)) != 0)
1926			goto err;
1927	} else
1928		LSN_NOT_LOGGED(LSN(newpage));
1929
1930	newpgno = PGNO(newpage);
1931	lsn = LSN(newpage);
1932	memcpy(newpage, *pgp, dbp->pgsize);
1933	PGNO(newpage) = newpgno;
1934	LSN(newpage) = lsn;
1935
1936	/* Empty the old page. */
1937	if ((ret = __memp_dirty(dbp->mpf,
1938	    pgp, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1939		goto err;
1940	if (TYPE(*pgp) == P_OVERFLOW)
1941		OV_LEN(*pgp) = 0;
1942	else {
1943		HOFFSET(*pgp) = dbp->pgsize;
1944		NUM_ENT(*pgp) = 0;
1945	}
1946	LSN(*pgp) = lsn;
1947
1948	/* Update siblings. */
1949	switch (TYPE(newpage)) {
1950	case P_OVERFLOW:
1951	case P_LBTREE:
1952	case P_LRECNO:
1953	case P_LDUP:
1954		if (NEXT_PGNO(newpage) == PGNO_INVALID &&
1955		    PREV_PGNO(newpage) == PGNO_INVALID)
1956			break;
1957		if ((ret = __bam_relink(dbc, *pgp, PGNO(newpage))) != 0)
1958			goto err;
1959		break;
1960	default:
1961		break;
1962	}
1963	cp = (BTREE_CURSOR *)dbc->internal;
1964
1965	/*
1966	 * Now, if we free this page, it will get truncated, when we free
1967	 * all the pages after it in the file.
1968	 */
1969	ret = __db_free(dbc, *pgp);
1970	/* db_free always puts the page. */
1971	*pgp = newpage;
1972
1973	if (ret != 0)
1974		return (ret);
1975
1976	if (!update_parent)
1977		goto done;
1978
1979	/* Update the parent. */
1980	epg = &cp->csp[-1];
1981	if ((ret = __memp_dirty(dbp->mpf,
1982	    &epg->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1983		return (ret);
1984
1985	switch (TYPE(epg->page)) {
1986	case P_IBTREE:
1987		pgnop = &GET_BINTERNAL(dbp, epg->page, epg->indx)->pgno;
1988		break;
1989	case P_IRECNO:
1990		pgnop = &GET_RINTERNAL(dbp, epg->page, epg->indx)->pgno;
1991		break;
1992	default:
1993		pgnop = &GET_BOVERFLOW(dbp, epg->page, epg->indx)->pgno;
1994		break;
1995	}
1996	if (DBC_LOGGING(dbc)) {
1997		if ((ret = __bam_pgno_log(dbp, dbc->txn, &LSN(epg->page),
1998		    0, PGNO(epg->page), &LSN(epg->page), (u_int32_t)epg->indx,
1999		    *pgnop, PGNO(newpage))) != 0)
2000			return (ret);
2001	} else
2002		LSN_NOT_LOGGED(LSN(epg->page));
2003
2004	*pgnop = PGNO(newpage);
2005	cp->csp->page = newpage;
2006
2007done:	return (0);
2008
2009err:	(void)__memp_fput(dbp->mpf, dbc->thread_info, newpage, dbc->priority);
2010	return (ret);
2011}
2012
2013/*
2014 * __bam_truncate_overflow -- find overflow pages to truncate.
2015 *	Walk the pages of an overflow chain and swap out
2016 * high numbered pages.  We are passed the first page
2017 * but only deal with the second and subsequent pages.
2018 */
2019
2020static int
2021__bam_truncate_overflow(dbc, pgno, pg_lock, c_data)
2022	DBC *dbc;
2023	db_pgno_t pgno;
2024	db_pgno_t pg_lock;
2025	DB_COMPACT *c_data;
2026{
2027	DB *dbp;
2028	DB_LOCK lock;
2029	PAGE *page;
2030	int ret, t_ret;
2031
2032	dbp = dbc->dbp;
2033	page = NULL;
2034	LOCK_INIT(lock);
2035
2036	if ((ret = __memp_fget(dbp->mpf, &pgno,
2037	     dbc->thread_info, dbc->txn, 0, &page)) != 0)
2038		return (ret);
2039
2040	while ((pgno = NEXT_PGNO(page)) != PGNO_INVALID) {
2041		if ((ret = __memp_fput(dbp->mpf,
2042		     dbc->thread_info, page, dbc->priority)) != 0)
2043			return (ret);
2044		if ((ret = __memp_fget(dbp->mpf, &pgno,
2045		    dbc->thread_info, dbc->txn, 0, &page)) != 0)
2046			return (ret);
2047		if (pgno <= c_data->compact_truncate)
2048			continue;
2049		if (pg_lock != PGNO_INVALID) {
2050			if ((ret = __db_lget(dbc,
2051			     0, pg_lock, DB_LOCK_WRITE, 0, &lock)) != 0)
2052				break;
2053			pg_lock = PGNO_INVALID;
2054		}
2055		if ((ret = __bam_truncate_page(dbc, &page, 0)) != 0)
2056			break;
2057	}
2058
2059	if (page != NULL &&
2060	    (t_ret = __memp_fput( dbp->mpf,
2061	    dbc->thread_info, page, dbc->priority)) != 0 && ret == 0)
2062		ret = t_ret;
2063	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
2064		ret = t_ret;
2065	return (ret);
2066}
2067
2068/*
2069 * __bam_truncate_root_page -- swap a page which is
2070 *    the root of an off page dup tree or the head of an overflow.
2071 * The page is reference by the pg/indx passed in.
2072 */
2073static int
2074__bam_truncate_root_page(dbc, pg, indx, c_data)
2075	DBC *dbc;
2076	PAGE *pg;
2077	u_int32_t indx;
2078	DB_COMPACT *c_data;
2079{
2080	BINTERNAL *bi;
2081	BOVERFLOW *bo;
2082	DB *dbp;
2083	DBT orig;
2084	PAGE *page;
2085	db_pgno_t newpgno, *pgnop;
2086	int ret, t_ret;
2087
2088	COMPQUIET(c_data, NULL);
2089	COMPQUIET(bo, NULL);
2090	COMPQUIET(newpgno, PGNO_INVALID);
2091	dbp = dbc->dbp;
2092	page = NULL;
2093	if (TYPE(pg) == P_IBTREE) {
2094		bi = GET_BINTERNAL(dbp, pg, indx);
2095		if (B_TYPE(bi->type) == B_OVERFLOW) {
2096			bo = (BOVERFLOW *)(bi->data);
2097			pgnop = &bo->pgno;
2098		} else
2099			pgnop = &bi->pgno;
2100	} else {
2101		bo = GET_BOVERFLOW(dbp, pg, indx);
2102		pgnop = &bo->pgno;
2103	}
2104
2105	DB_ASSERT(dbp->env, IS_DIRTY(pg));
2106
2107	if ((ret = __memp_fget(dbp->mpf, pgnop,
2108	     dbc->thread_info, dbc->txn, 0, &page)) != 0)
2109		goto err;
2110
2111	/*
2112	 * If this is a multiply reference overflow key, then we will just
2113	 * copy it and decrement the reference count.  This is part of a
2114	 * fix to get rid of multiple references.
2115	 */
2116	if (TYPE(page) == P_OVERFLOW && OV_REF(page) > 1) {
2117		if ((ret = __db_ovref(dbc, bo->pgno)) != 0)
2118			goto err;
2119		memset(&orig, 0, sizeof(orig));
2120		if ((ret = __db_goff(dbp, dbc->thread_info, dbc->txn, &orig,
2121		    bo->tlen, bo->pgno, &orig.data, &orig.size)) == 0)
2122			ret = __db_poff(dbc, &orig, &newpgno);
2123		if (orig.data != NULL)
2124			__os_free(dbp->env, orig.data);
2125		if (ret != 0)
2126			goto err;
2127	} else {
2128		if ((ret = __bam_truncate_page(dbc, &page, 0)) != 0)
2129			goto err;
2130		newpgno = PGNO(page);
2131		/* If we could not allocate from the free list, give up.*/
2132		if (newpgno == *pgnop)
2133			goto err;
2134	}
2135
2136	/* Update the reference. */
2137	if (DBC_LOGGING(dbc)) {
2138		if ((ret = __bam_pgno_log(dbp,
2139		     dbc->txn, &LSN(pg), 0, PGNO(pg),
2140		     &LSN(pg), (u_int32_t)indx, *pgnop, newpgno)) != 0)
2141			goto err;
2142	} else
2143		LSN_NOT_LOGGED(LSN(pg));
2144
2145	*pgnop = newpgno;
2146
2147err:	if (page != NULL && (t_ret =
2148	      __memp_fput(dbp->mpf, dbc->thread_info,
2149		  page, dbc->priority)) != 0 && ret == 0)
2150		ret = t_ret;
2151	return (ret);
2152}
2153
2154/*
2155 * -- bam_truncate_internal_overflow -- find overflow keys
2156 *	on internal pages and if they have high page
2157 * numbers swap them with lower pages and truncate them.
2158 * Note that if there are overflow keys in the internal
2159 * nodes they will get copied adding pages to the database.
2160 */
2161static int
2162__bam_truncate_internal_overflow(dbc, page, c_data)
2163	DBC *dbc;
2164	PAGE *page;
2165	DB_COMPACT *c_data;
2166{
2167	BINTERNAL *bi;
2168	BOVERFLOW *bo;
2169	db_indx_t indx;
2170	int ret;
2171
2172	COMPQUIET(bo, NULL);
2173	ret = 0;
2174	for (indx = 0; indx < NUM_ENT(page); indx++) {
2175		bi = GET_BINTERNAL(dbc->dbp, page, indx);
2176		if (B_TYPE(bi->type) != B_OVERFLOW)
2177			continue;
2178		bo = (BOVERFLOW *)(bi->data);
2179		if (bo->pgno > c_data->compact_truncate && (ret =
2180		     __bam_truncate_root_page(dbc, page, indx, c_data)) != 0)
2181			break;
2182		if ((ret = __bam_truncate_overflow(
2183		     dbc, bo->pgno, PGNO_INVALID, c_data)) != 0)
2184			break;
2185	}
2186	return (ret);
2187}
2188
2189/*
2190 * __bam_compact_isdone ---
2191 *
2192 * Check to see if the stop key specified by the caller is on the
2193 * current page, in which case we are done compacting.
2194 */
2195static int
2196__bam_compact_isdone(dbc, stop, pg, isdone)
2197	DBC *dbc;
2198	DBT *stop;
2199	PAGE *pg;
2200	int *isdone;
2201{
2202	db_recno_t recno;
2203	BTREE *t;
2204	BTREE_CURSOR *cp;
2205	int cmp, ret;
2206
2207	*isdone = 0;
2208	cp = (BTREE_CURSOR *)dbc->internal;
2209	t = dbc->dbp->bt_internal;
2210
2211	if (dbc->dbtype == DB_RECNO) {
2212		if ((ret = __ram_getno(dbc, stop, &recno, 0)) != 0)
2213			return (ret);
2214		*isdone = cp->recno > recno;
2215	} else {
2216		DB_ASSERT(dbc->dbp->env, TYPE(pg) == P_LBTREE);
2217		if ((ret = __bam_cmp(dbc->dbp, dbc->thread_info, dbc->txn,
2218		    stop, pg, 0, t->bt_compare, &cmp)) != 0)
2219			return (ret);
2220
2221		*isdone = cmp <= 0;
2222	}
2223	return (0);
2224}
2225
2226#ifdef HAVE_FTRUNCATE
2227/*
2228 * __bam_savekey -- save the key from an internal page.
2229 *  We need to save information so that we can
 * fetch the next internal node of the tree.  This means
2231 * we need the btree key on this current page, or the
2232 * next record number.
2233 */
static int
__bam_savekey(dbc, next, start)
	DBC *dbc;
	int next;
	DBT *start;
{
	BINTERNAL *bi;
	BKEYDATA *bk;
	BOVERFLOW *bo;
	BTREE_CURSOR *cp;
	DB *dbp;
	ENV *env;
	PAGE *pg;
	RINTERNAL *ri;
	db_indx_t indx, top;
	db_pgno_t pgno;
	int ret, t_ret;
	u_int32_t len;
	u_int8_t *data;

	dbp = dbc->dbp;
	env = dbp->env;
	cp = (BTREE_CURSOR *)dbc->internal;
	pg = cp->csp->page;

	/*
	 * Recno: the "key" is a record number.  If the caller wants the
	 * next page's position, first advance past every record counted
	 * on this internal page.
	 */
	if (dbc->dbtype == DB_RECNO) {
		if (next)
			for (indx = 0, top = NUM_ENT(pg); indx != top; indx++) {
				ri = GET_RINTERNAL(dbp, pg, indx);
				cp->recno += ri->nrecs;
			}
		return (__db_retcopy(env, start, &cp->recno,
		     sizeof(cp->recno), &start->data, &start->ulen));

	}
	/* Btree: start from the last key on this internal page. */
	bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
	data = bi->data;
	len = bi->len;
	/* If there is single record on the page it may have an empty key. */
	while (len == 0) {
		/*
		 * We should not have an empty data page, since we just
		 * compacted things, check anyway and punt.
		 */
		if (NUM_ENT(pg) == 0)
			goto no_key;
		/* Descend toward the leaf level looking for a real key. */
		pgno = bi->pgno;
		if (pg != cp->csp->page &&
		    (ret = __memp_fput(dbp->mpf,
		    dbc->thread_info, pg, dbc->priority)) != 0) {
			/* NULL out pg so the err path doesn't put it twice. */
			pg = NULL;
			goto err;
		}
		if ((ret = __memp_fget(dbp->mpf, &pgno,
		     dbc->thread_info, dbc->txn, 0, &pg)) != 0)
			goto err;

		/*
		 * At the data level use the last key to try and avoid the
		 * possibility that the user has a zero length key, if they
		 * do, we punt.
		 */
		if (pg->level == LEAFLEVEL) {
			/* Leaf pages hold key/data pairs; NUM_ENT - 2 is
			 * the key of the last pair on the page. */
			bk = GET_BKEYDATA(dbp, pg, NUM_ENT(pg) - 2);
			data = bk->data;
			len = bk->len;
			if (len == 0) {
no_key:				__db_errx(env,
				    "Compact cannot handle zero length key");
				ret = DB_NOTFOUND;
				goto err;
			}
		} else {
			bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1);
			data = bi->data;
			len = bi->len;
		}
	}
	/* Copy the key out: fetch overflow items, copy plain ones. */
	if (B_TYPE(bi->type) == B_OVERFLOW) {
		bo = (BOVERFLOW *)(data);
		ret = __db_goff(dbp, dbc->thread_info, dbc->txn, start,
		     bo->tlen, bo->pgno, &start->data, &start->ulen);
	}
	else
		ret = __db_retcopy(env,
		     start, data, len,  &start->data, &start->ulen);

	/* Release any page we fetched beyond the one on the cursor stack. */
err:	if (pg != NULL && pg != cp->csp->page &&
	    (t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
		 pg, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
2327
2328/*
2329 * bam_truncate_internal --
2330 *	Find high numbered pages in the internal nodes of a tree and
2331 *	swap them.
2332 */
static int
__bam_truncate_internal(dbp, ip, txn, c_data)
	DB *dbp;
	DB_THREAD_INFO *ip;
	DB_TXN *txn;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp;
	DBC *dbc;
	DBT start;
	PAGE *pg;
	db_pgno_t pgno;
	u_int32_t sflag;
	int level, local_txn, ret, t_ret;

	dbc = NULL;
	memset(&start, 0, sizeof(start));

	/* Under auto-commit, run each subtree move in its own txn. */
	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
		local_txn = 1;
		txn = NULL;
	} else
		local_txn = 0;

	/* Sweep the tree one level at a time, starting above the leaves. */
	level = LEAFLEVEL + 1;
	sflag = CS_READ | CS_GETRECNO;

new_txn:
	if (local_txn &&
	     (ret = __txn_begin(dbp->env, ip, NULL, &txn, 0)) != 0)
		goto err;

	if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
		goto err;
	cp = (BTREE_CURSOR *)dbc->internal;

	pgno = PGNO_INVALID;
	do {
		/* Position on the page at "level" keyed by "start". */
		if ((ret = __bam_csearch(dbc, &start, sflag, level)) != 0) {
			/* No more at this level, go up one. */
			if (ret == DB_NOTFOUND) {
				level++;
				if (start.data != NULL)
					__os_free(dbp->env, start.data);
				memset(&start, 0, sizeof(start));
				sflag = CS_READ | CS_GETRECNO;
				continue;
			}
			goto err;
		}
		c_data->compact_pages_examine++;

		pg = cp->csp->page;
		pgno = PGNO(pg);

		sflag = CS_NEXT | CS_GETRECNO;
		/* Grab info about the page and drop the stack. */
		if (pgno != cp->root && (ret = __bam_savekey(dbc,
		    pgno <= c_data->compact_truncate, &start)) != 0)
			goto err;

		if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0)
			goto err;
		if (pgno == cp->root)
			break;

		/* Pages at or below the truncation point stay put. */
		if (pgno <= c_data->compact_truncate)
			continue;

		/* Reget the page with a write lock, and its parent too. */
		if ((ret = __bam_csearch(dbc,
		    &start, CS_PARENT | CS_GETRECNO, level)) != 0)
			goto err;
		pg = cp->csp->page;
		pgno = PGNO(pg);

		/* The page may have moved since we dropped the lock. */
		if (pgno > c_data->compact_truncate) {
			if ((ret = __bam_truncate_page(dbc, &pg, 1)) != 0)
				goto err;
		}
		if ((ret = __bam_stkrel(dbc,
		     pgno > c_data->compact_truncate ? 0 : STK_NOLOCK)) != 0)
			goto err;

		/* We are locking subtrees, so drop the write locks asap. */
		if (local_txn && pgno > c_data->compact_truncate)
			break;
	} while (pgno != cp->root);

	if ((ret = __dbc_close(dbc)) != 0)
		goto err;
	dbc = NULL;
	if (local_txn) {
		if ((ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0)
			goto err;
		txn = NULL;
	}
	/* Keep going until the sweep has climbed back up to the root. */
	if (pgno != ((BTREE *)dbp->bt_internal)->bt_root)
		goto new_txn;

	/* NOTE: sflag is reused here as the stack-release flag. */
err:	if (txn != NULL && ret != 0)
		sflag = STK_PGONLY;
	else
		sflag = 0;
	if (dbc != NULL && (t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0)
		ret = t_ret;
	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	if (local_txn &&
	    txn != NULL && (t_ret = __txn_abort(txn)) != 0 && ret == 0)
		ret = t_ret;
	if (start.data != NULL)
		__os_free(dbp->env, start.data);
	return (ret);
}
2448
2449static int
2450__bam_setup_freelist(dbp, list, nelems)
2451	DB *dbp;
2452	db_pglist_t *list;
2453	u_int32_t nelems;
2454{
2455	DB_MPOOLFILE *mpf;
2456	db_pgno_t *plist;
2457	int ret;
2458
2459	mpf = dbp->mpf;
2460
2461	if ((ret = __memp_alloc_freelist(mpf, nelems, &plist)) != 0)
2462		return (ret);
2463
2464	while (nelems-- != 0)
2465		*plist++ = list++->pgno;
2466
2467	return (0);
2468}
2469
2470static int
2471__bam_free_freelist(dbp, ip, txn)
2472	DB *dbp;
2473	DB_THREAD_INFO *ip;
2474	DB_TXN *txn;
2475{
2476	DBC *dbc;
2477	DB_LOCK lock;
2478	int auto_commit, ret, t_ret;
2479
2480	LOCK_INIT(lock);
2481	auto_commit = ret = 0;
2482
2483	/*
2484	 * If we are not in a transaction then we need to get
2485	 * a lock on the meta page, otherwise we should already
2486	 * have the lock.
2487	 */
2488
2489	dbc = NULL;
2490	if (IS_DB_AUTO_COMMIT(dbp, txn)) {
2491		/*
2492		 * We must not timeout the lock or we will not free the list.
2493		 * We ignore errors from txn_begin as there is little that
2494		 * the application can do with the error and we want to
2495		 * get the lock and free the list if at all possible.
2496		 */
2497		if (__txn_begin(dbp->env, ip, NULL, &txn, 0) == 0) {
2498			(void)__lock_set_timeout(dbp->env,
2499			    txn->locker, 0, DB_SET_TXN_TIMEOUT);
2500			(void)__lock_set_timeout(dbp->env,
2501			    txn->locker, 0, DB_SET_LOCK_TIMEOUT);
2502			auto_commit = 1;
2503		}
2504		/* Get a cursor so we can call __db_lget. */
2505		if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0)
2506			return (ret);
2507
2508		if ((ret = __db_lget(dbc,
2509		     0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &lock)) != 0)
2510			goto err;
2511	}
2512
2513	ret = __memp_free_freelist(dbp->mpf);
2514
2515err:	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
2516		ret = t_ret;
2517
2518	if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0)
2519		ret = t_ret;
2520
2521	if (auto_commit && __txn_abort(txn) != 0 && ret == 0)
2522		ret = t_ret;
2523
2524	return (ret);
2525}
2526#endif
2527