1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 *
6 * $Id: db_upg_opd.c,v 12.11 2008/01/08 20:58:10 bostic Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14
15static int __db_build_bi __P((DB *, DB_FH *, PAGE *, PAGE *, u_int32_t, int *));
16static int __db_build_ri __P((DB *, DB_FH *, PAGE *, PAGE *, u_int32_t, int *));
17static int __db_up_ovref __P((DB *, DB_FH *, db_pgno_t));
18
19#define	GET_PAGE(dbp, fhp, pgno, page) {				\
20	if ((ret = __os_seek(						\
21	    dbp->env, fhp, pgno, (dbp)->pgsize, 0)) != 0)		\
22		goto err;						\
23	if ((ret = __os_read(dbp->env,				\
24	    fhp, page, (dbp)->pgsize, &n)) != 0)			\
25		goto err;						\
26}
27#define	PUT_PAGE(dbp, fhp, pgno, page) {				\
28	if ((ret = __os_seek(						\
29	    dbp->env, fhp, pgno, (dbp)->pgsize, 0)) != 0)		\
30		goto err;						\
31	if ((ret = __os_write(dbp->env,				\
32	    fhp, page, (dbp)->pgsize, &n)) != 0)			\
33		goto err;						\
34}
35
36/*
37 * __db_31_offdup --
38 *	Convert 3.0 off-page duplicates to 3.1 off-page duplicates.
39 *
40 * PUBLIC: int __db_31_offdup __P((DB *, char *, DB_FH *, int, db_pgno_t *));
41 */
42int
43__db_31_offdup(dbp, real_name, fhp, sorted, pgnop)
44	DB *dbp;
45	char *real_name;
46	DB_FH *fhp;
47	int sorted;
48	db_pgno_t *pgnop;
49{
50	PAGE *ipage, *page;
51	db_indx_t indx;
52	db_pgno_t cur_cnt, i, next_cnt, pgno, *pgno_cur, pgno_last;
53	db_pgno_t *pgno_next, pgno_max, *tmp;
54	db_recno_t nrecs;
55	size_t n;
56	int level, nomem, ret;
57
58	ipage = page = NULL;
59	pgno_cur = pgno_next = NULL;
60
61	/* Allocate room to hold a page. */
62	if ((ret = __os_malloc(dbp->env, dbp->pgsize, &page)) != 0)
63		goto err;
64
65	/*
66	 * Walk the chain of 3.0 off-page duplicates.  Each one is converted
67	 * in place to a 3.1 off-page duplicate page.  If the duplicates are
68	 * sorted, they are converted to a Btree leaf page, otherwise to a
69	 * Recno leaf page.
70	 */
71	for (nrecs = 0, cur_cnt = pgno_max = 0,
72	    pgno = *pgnop; pgno != PGNO_INVALID;) {
73		if (pgno_max == cur_cnt) {
74			pgno_max += 20;
75			if ((ret = __os_realloc(dbp->env, pgno_max *
76			    sizeof(db_pgno_t), &pgno_cur)) != 0)
77				goto err;
78		}
79		pgno_cur[cur_cnt++] = pgno;
80
81		GET_PAGE(dbp, fhp, pgno, page);
82		nrecs += NUM_ENT(page);
83		LEVEL(page) = LEAFLEVEL;
84		TYPE(page) = sorted ? P_LDUP : P_LRECNO;
85		/*
86		 * !!!
87		 * DB didn't zero the LSNs on off-page duplicates pages.
88		 */
89		ZERO_LSN(LSN(page));
90		PUT_PAGE(dbp, fhp, pgno, page);
91
92		pgno = NEXT_PGNO(page);
93	}
94
95	/* If we only have a single page, it's easy. */
96	if (cur_cnt <= 1)
97		goto done;
98
99	/*
100	 * pgno_cur is the list of pages we just converted.  We're
101	 * going to walk that list, but we'll need to create a new
102	 * list while we do so.
103	 */
104	if ((ret = __os_malloc(dbp->env,
105	    cur_cnt * sizeof(db_pgno_t), &pgno_next)) != 0)
106		goto err;
107
108	/* Figure out where we can start allocating new pages. */
109	if ((ret = __db_lastpgno(dbp, real_name, fhp, &pgno_last)) != 0)
110		goto err;
111
112	/* Allocate room for an internal page. */
113	if ((ret = __os_malloc(dbp->env, dbp->pgsize, &ipage)) != 0)
114		goto err;
115	PGNO(ipage) = PGNO_INVALID;
116
117	/*
118	 * Repeatedly walk the list of pages, building internal pages, until
119	 * there's only one page at a level.
120	 */
121	for (level = LEAFLEVEL + 1; cur_cnt > 1; ++level) {
122		for (indx = 0, i = next_cnt = 0; i < cur_cnt;) {
123			if (indx == 0) {
124				P_INIT(ipage, dbp->pgsize, pgno_last,
125				    PGNO_INVALID, PGNO_INVALID,
126				    level, sorted ? P_IBTREE : P_IRECNO);
127				ZERO_LSN(LSN(ipage));
128
129				pgno_next[next_cnt++] = pgno_last++;
130			}
131
132			GET_PAGE(dbp, fhp, pgno_cur[i], page);
133
134			/*
135			 * If the duplicates are sorted, put the first item on
136			 * the lower-level page onto a Btree internal page. If
137			 * the duplicates are not sorted, create an internal
138			 * Recno structure on the page.  If either case doesn't
139			 * fit, push out the current page and start a new one.
140			 */
141			nomem = 0;
142			if (sorted) {
143				if ((ret = __db_build_bi(
144				    dbp, fhp, ipage, page, indx, &nomem)) != 0)
145					goto err;
146			} else
147				if ((ret = __db_build_ri(
148				    dbp, fhp, ipage, page, indx, &nomem)) != 0)
149					goto err;
150			if (nomem) {
151				indx = 0;
152				PUT_PAGE(dbp, fhp, PGNO(ipage), ipage);
153			} else {
154				++indx;
155				++NUM_ENT(ipage);
156				++i;
157			}
158		}
159
160		/*
161		 * Push out the last internal page.  Set the top-level record
162		 * count if we've reached the top.
163		 */
164		if (next_cnt == 1)
165			RE_NREC_SET(ipage, nrecs);
166		PUT_PAGE(dbp, fhp, PGNO(ipage), ipage);
167
168		/* Swap the current and next page number arrays. */
169		cur_cnt = next_cnt;
170		tmp = pgno_cur;
171		pgno_cur = pgno_next;
172		pgno_next = tmp;
173	}
174
175done:	*pgnop = pgno_cur[0];
176
177err:	if (pgno_cur != NULL)
178		__os_free(dbp->env, pgno_cur);
179	if (pgno_next != NULL)
180		__os_free(dbp->env, pgno_next);
181	if (ipage != NULL)
182		__os_free(dbp->env, ipage);
183	if (page != NULL)
184		__os_free(dbp->env, page);
185
186	return (ret);
187}
188
189/*
190 * __db_build_bi --
191 *	Build a BINTERNAL entry for a parent page.
192 */
193static int
194__db_build_bi(dbp, fhp, ipage, page, indx, nomemp)
195	DB *dbp;
196	DB_FH *fhp;
197	PAGE *ipage, *page;
198	u_int32_t indx;
199	int *nomemp;
200{
201	BINTERNAL bi, *child_bi;
202	BKEYDATA *child_bk;
203	u_int8_t *p;
204	int ret;
205	db_indx_t *inp;
206
207	inp = P_INP(dbp, ipage);
208	switch (TYPE(page)) {
209	case P_IBTREE:
210		child_bi = GET_BINTERNAL(dbp, page, 0);
211		if (P_FREESPACE(dbp, ipage) < BINTERNAL_PSIZE(child_bi->len)) {
212			*nomemp = 1;
213			return (0);
214		}
215		inp[indx] =
216		    HOFFSET(ipage) -= BINTERNAL_SIZE(child_bi->len);
217		p = P_ENTRY(dbp, ipage, indx);
218
219		bi.len = child_bi->len;
220		B_TSET(bi.type, child_bi->type);
221		bi.pgno = PGNO(page);
222		bi.nrecs = __bam_total(dbp, page);
223		memcpy(p, &bi, SSZA(BINTERNAL, data));
224		p += SSZA(BINTERNAL, data);
225		memcpy(p, child_bi->data, child_bi->len);
226
227		/* Increment the overflow ref count. */
228		if (B_TYPE(child_bi->type) == B_OVERFLOW)
229			if ((ret = __db_up_ovref(dbp, fhp,
230			    ((BOVERFLOW *)(child_bi->data))->pgno)) != 0)
231				return (ret);
232		break;
233	case P_LDUP:
234		child_bk = GET_BKEYDATA(dbp, page, 0);
235		switch (B_TYPE(child_bk->type)) {
236		case B_KEYDATA:
237			if (P_FREESPACE(dbp, ipage) <
238			    BINTERNAL_PSIZE(child_bk->len)) {
239				*nomemp = 1;
240				return (0);
241			}
242			inp[indx] =
243			    HOFFSET(ipage) -= BINTERNAL_SIZE(child_bk->len);
244			p = P_ENTRY(dbp, ipage, indx);
245
246			bi.len = child_bk->len;
247			B_TSET(bi.type, child_bk->type);
248			bi.pgno = PGNO(page);
249			bi.nrecs = __bam_total(dbp, page);
250			memcpy(p, &bi, SSZA(BINTERNAL, data));
251			p += SSZA(BINTERNAL, data);
252			memcpy(p, child_bk->data, child_bk->len);
253			break;
254		case B_OVERFLOW:
255			if (P_FREESPACE(dbp, ipage) <
256			    BINTERNAL_PSIZE(BOVERFLOW_SIZE)) {
257				*nomemp = 1;
258				return (0);
259			}
260			inp[indx] =
261			    HOFFSET(ipage) -= BINTERNAL_SIZE(BOVERFLOW_SIZE);
262			p = P_ENTRY(dbp, ipage, indx);
263
264			bi.len = BOVERFLOW_SIZE;
265			B_TSET(bi.type, child_bk->type);
266			bi.pgno = PGNO(page);
267			bi.nrecs = __bam_total(dbp, page);
268			memcpy(p, &bi, SSZA(BINTERNAL, data));
269			p += SSZA(BINTERNAL, data);
270			memcpy(p, child_bk, BOVERFLOW_SIZE);
271
272			/* Increment the overflow ref count. */
273			if ((ret = __db_up_ovref(dbp, fhp,
274			    ((BOVERFLOW *)child_bk)->pgno)) != 0)
275				return (ret);
276			break;
277		default:
278			return (__db_pgfmt(dbp->env, PGNO(page)));
279		}
280		break;
281	default:
282		return (__db_pgfmt(dbp->env, PGNO(page)));
283	}
284
285	return (0);
286}
287
288/*
289 * __db_build_ri --
290 *	Build a RINTERNAL entry for an internal parent page.
291 */
292static int
293__db_build_ri(dbp, fhp, ipage, page, indx, nomemp)
294	DB *dbp;
295	DB_FH *fhp;
296	PAGE *ipage, *page;
297	u_int32_t indx;
298	int *nomemp;
299{
300	RINTERNAL ri;
301	db_indx_t *inp;
302
303	COMPQUIET(fhp, NULL);
304	inp = P_INP(dbp, ipage);
305	if (P_FREESPACE(dbp, ipage) < RINTERNAL_PSIZE) {
306		*nomemp = 1;
307		return (0);
308	}
309
310	ri.pgno = PGNO(page);
311	ri.nrecs = __bam_total(dbp, page);
312	inp[indx] = HOFFSET(ipage) -= RINTERNAL_SIZE;
313	memcpy(P_ENTRY(dbp, ipage, indx), &ri, RINTERNAL_SIZE);
314
315	return (0);
316}
317
318/*
319 * __db_up_ovref --
320 *	Increment/decrement the reference count on an overflow page.
321 */
322static int
323__db_up_ovref(dbp, fhp, pgno)
324	DB *dbp;
325	DB_FH *fhp;
326	db_pgno_t pgno;
327{
328	PAGE *page;
329	size_t n;
330	int ret;
331
332	/* Allocate room to hold a page. */
333	if ((ret = __os_malloc(dbp->env, dbp->pgsize, &page)) != 0)
334		return (ret);
335
336	GET_PAGE(dbp, fhp, pgno, page);
337	++OV_REF(page);
338	PUT_PAGE(dbp, fhp, pgno, page);
339
340err:	__os_free(dbp->env, page);
341
342	return (ret);
343}
344