1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994, 1995, 1996
8 *	Keith Bostic.  All rights reserved.
9 */
10/*
11 * Copyright (c) 1990, 1993, 1994, 1995
12 *	The Regents of the University of California.  All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Mike Olson.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 *    notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 *    notice, this list of conditions and the following disclaimer in the
24 *    documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 *    may be used to endorse or promote products derived from this software
27 *    without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id: db_overflow.c,v 12.26 2008/03/12 20:32:32 mbrey Exp $
42 */
43
44#include "db_config.h"
45
46#include "db_int.h"
47#include "dbinc/db_page.h"
48#include "dbinc/db_am.h"
49#include "dbinc/mp.h"
50
51/*
52 * Big key/data code.
53 *
54 * Big key and data entries are stored on linked lists of pages.  The initial
55 * reference is a structure with the total length of the item and the page
56 * number where it begins.  Each entry in the linked list contains a pointer
57 * to the next page of data, and so on.
58 */
59
60/*
61 * __db_goff --
62 *	Get an offpage item.
63 *
64 * PUBLIC: int __db_goff __P((DB *, DB_THREAD_INFO *, DB_TXN *, DBT *,
65 * PUBLIC:     u_int32_t, db_pgno_t, void **, u_int32_t *));
66 */
67int
68__db_goff(dbp, ip, txn, dbt, tlen, pgno, bpp, bpsz)
69	DB *dbp;
70	DB_THREAD_INFO *ip;
71	DB_TXN *txn;
72	DBT *dbt;
73	u_int32_t tlen;
74	db_pgno_t pgno;
75	void **bpp;
76	u_int32_t *bpsz;
77{
78	DB_MPOOLFILE *mpf;
79	ENV *env;
80	PAGE *h;
81	db_indx_t bytes;
82	u_int32_t curoff, needed, start;
83	u_int8_t *p, *src;
84	int ret;
85
86	env = dbp->env;
87	mpf = dbp->mpf;
88
89	/*
90	 * Check if the buffer is big enough; if it is not and we are
91	 * allowed to malloc space, then we'll malloc it.  If we are
92	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
93	 * appropriately.
94	 */
95	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
96		start = dbt->doff;
97		if (start > tlen)
98			needed = 0;
99		else if (dbt->dlen > tlen - start)
100			needed = tlen - start;
101		else
102			needed = dbt->dlen;
103	} else {
104		start = 0;
105		needed = tlen;
106	}
107
108	if (F_ISSET(dbt, DB_DBT_USERCOPY))
109		goto skip_alloc;
110
111	/* Allocate any necessary memory. */
112	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
113		if (needed > dbt->ulen) {
114			dbt->size = needed;
115			return (DB_BUFFER_SMALL);
116		}
117	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
118		if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
119			return (ret);
120	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
121		if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
122			return (ret);
123	} else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
124		if ((ret = __os_realloc(env, needed, bpp)) != 0)
125			return (ret);
126		*bpsz = needed;
127		dbt->data = *bpp;
128	} else if (bpp != NULL)
129		dbt->data = *bpp;
130	else {
131		DB_ASSERT(env,
132		    F_ISSET(dbt,
133		    DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
134		    bpsz != NULL || bpp != NULL);
135		return (DB_BUFFER_SMALL);
136	}
137
138skip_alloc:
139	/*
140	 * Step through the linked list of pages, copying the data on each
141	 * one into the buffer.  Never copy more than the total data length.
142	 */
143	dbt->size = needed;
144	for (curoff = 0, p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
145		if ((ret = __memp_fget(mpf, &pgno, ip, txn, 0, &h)) != 0)
146			return (ret);
147		DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
148
149		/* Check if we need any bytes from this page. */
150		if (curoff + OV_LEN(h) >= start) {
151			bytes = OV_LEN(h);
152			src = (u_int8_t *)h + P_OVERHEAD(dbp);
153			if (start > curoff) {
154				src += start - curoff;
155				bytes -= start - curoff;
156			}
157			if (bytes > needed)
158				bytes = needed;
159			if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
160				/*
161				 * The offset into the DBT is the total size
162				 * less the amount of data still needed.  Care
163				 * needs to be taken if doing a partial copy
164				 * beginning at an offset other than 0.
165				 */
166				if ((ret = env->dbt_usercopy(
167				    dbt, dbt->size - needed,
168				    src, bytes, DB_USERCOPY_SETDATA)) != 0) {
169					(void)__memp_fput(mpf, ip,
170					     h, dbp->priority);
171					return (ret);
172				}
173			} else
174				memcpy(p, src, bytes);
175			p += bytes;
176			needed -= bytes;
177		}
178		curoff += OV_LEN(h);
179		pgno = h->next_pgno;
180		(void)__memp_fput(mpf, ip, h, dbp->priority);
181	}
182	return (0);
183}
184
185/*
186 * __db_poff --
187 *	Put an offpage item.
188 *
189 * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
190 */
191int
192__db_poff(dbc, dbt, pgnop)
193	DBC *dbc;
194	const DBT *dbt;
195	db_pgno_t *pgnop;
196{
197	DB *dbp;
198	DBT tmp_dbt;
199	DB_LSN new_lsn, null_lsn;
200	DB_MPOOLFILE *mpf;
201	PAGE *pagep, *lastp;
202	db_indx_t pagespace;
203	u_int32_t sz;
204	u_int8_t *p;
205	int ret, t_ret;
206
207	/*
208	 * Allocate pages and copy the key/data item into them.  Calculate the
209	 * number of bytes we get for pages we fill completely with a single
210	 * item.
211	 */
212	dbp = dbc->dbp;
213	mpf = dbp->mpf;
214	pagespace = P_MAXSPACE(dbp, dbp->pgsize);
215
216	ret = 0;
217	lastp = NULL;
218	for (p = dbt->data,
219	    sz = dbt->size; sz > 0; p += pagespace, sz -= pagespace) {
220		/*
221		 * Reduce pagespace so we terminate the loop correctly and
222		 * don't copy too much data.
223		 */
224		if (sz < pagespace)
225			pagespace = sz;
226
227		/*
228		 * Allocate and initialize a new page and copy all or part of
229		 * the item onto the page.  If sz is less than pagespace, we
230		 * have a partial record.
231		 */
232		if ((ret = __db_new(dbc, P_OVERFLOW, &pagep)) != 0)
233			break;
234		if (DBC_LOGGING(dbc)) {
235			tmp_dbt.data = p;
236			tmp_dbt.size = pagespace;
237			ZERO_LSN(null_lsn);
238			if ((ret = __db_big_log(dbp, dbc->txn,
239			    &new_lsn, 0, DB_ADD_BIG, PGNO(pagep),
240			    lastp ? PGNO(lastp) : PGNO_INVALID,
241			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
242			    lastp == NULL ? &null_lsn : &LSN(lastp),
243			    &null_lsn)) != 0) {
244				if (lastp != NULL)
245					(void)__memp_fput(mpf, dbc->thread_info,
246					     lastp, dbc->priority);
247				lastp = pagep;
248				break;
249			}
250		} else
251			LSN_NOT_LOGGED(new_lsn);
252
253		/* Move LSN onto page. */
254		if (lastp != NULL)
255			LSN(lastp) = new_lsn;
256		LSN(pagep) = new_lsn;
257
258		OV_LEN(pagep) = pagespace;
259		OV_REF(pagep) = 1;
260		memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
261
262		/*
263		 * If this is the first entry, update the user's info.
264		 * Otherwise, update the entry on the last page filled
265		 * in and release that page.
266		 */
267		if (lastp == NULL)
268			*pgnop = PGNO(pagep);
269		else {
270			lastp->next_pgno = PGNO(pagep);
271			pagep->prev_pgno = PGNO(lastp);
272			(void)__memp_fput(mpf,
273			    dbc->thread_info, lastp, dbc->priority);
274		}
275		lastp = pagep;
276	}
277	if (lastp != NULL && (t_ret = __memp_fput(mpf,
278	     dbc->thread_info, lastp, dbc->priority)) != 0 && ret == 0)
279		ret = t_ret;
280	return (ret);
281}
282
283/*
284 * __db_ovref --
285 *	Decrement the reference count on an overflow page.
286 *
287 * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
288 */
289int
290__db_ovref(dbc, pgno)
291	DBC *dbc;
292	db_pgno_t pgno;
293{
294	DB *dbp;
295	DB_MPOOLFILE *mpf;
296	PAGE *h;
297	int ret;
298
299	dbp = dbc->dbp;
300	mpf = dbp->mpf;
301
302	if ((ret = __memp_fget(mpf, &pgno,
303	     dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
304		return (ret);
305
306	if (DBC_LOGGING(dbc)) {
307		if ((ret = __db_ovref_log(dbp,
308		    dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
309			(void)__memp_fput(mpf,
310			     dbc->thread_info, h, dbc->priority);
311			return (ret);
312		}
313	} else
314		LSN_NOT_LOGGED(LSN(h));
315
316	/*
317	 * In BDB releases before 4.5, the overflow reference counts were
318	 * incremented when an overflow item was split onto an internal
319	 * page.  There was a lock race in that code, and rather than fix
320	 * the race, we changed BDB to copy overflow items when splitting
321	 * them onto internal pages.  The code to decrement reference
322	 * counts remains so databases already in the field continue to
323	 * work.
324	 */
325	--OV_REF(h);
326
327	return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
328}
329
330/*
331 * __db_doff --
332 *	Delete an offpage chain of overflow pages.
333 *
334 * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
335 */
336int
337__db_doff(dbc, pgno)
338	DBC *dbc;
339	db_pgno_t pgno;
340{
341	DB *dbp;
342	DBT tmp_dbt;
343	DB_LSN null_lsn;
344	DB_MPOOLFILE *mpf;
345	PAGE *pagep;
346	int ret;
347
348	dbp = dbc->dbp;
349	mpf = dbp->mpf;
350
351	do {
352		if ((ret = __memp_fget(mpf, &pgno,
353		     dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
354			return (ret);
355
356		DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
357		/*
358		 * If it's referenced by more than one key/data item,
359		 * decrement the reference count and return.
360		 */
361		if (OV_REF(pagep) > 1) {
362			(void)__memp_fput(mpf,
363			    dbc->thread_info, pagep, dbc->priority);
364			return (__db_ovref(dbc, pgno));
365		}
366
367		if ((ret = __memp_dirty(mpf, &pagep,
368		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
369			(void)__memp_fput(mpf,
370			    dbc->thread_info, pagep, dbc->priority);
371			return (ret);
372		}
373
374		if (DBC_LOGGING(dbc)) {
375			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
376			tmp_dbt.size = OV_LEN(pagep);
377			ZERO_LSN(null_lsn);
378			if ((ret = __db_big_log(dbp, dbc->txn,
379			    &LSN(pagep), 0, DB_REM_BIG,
380			    PGNO(pagep), PREV_PGNO(pagep),
381			    NEXT_PGNO(pagep), &tmp_dbt,
382			    &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
383				(void)__memp_fput(mpf,
384				    dbc->thread_info, pagep, dbc->priority);
385				return (ret);
386			}
387		} else
388			LSN_NOT_LOGGED(LSN(pagep));
389		pgno = pagep->next_pgno;
390		OV_LEN(pagep) = 0;
391		if ((ret = __db_free(dbc, pagep)) != 0)
392			return (ret);
393	} while (pgno != PGNO_INVALID);
394
395	return (0);
396}
397
398/*
399 * __db_moff --
400 *	Match on overflow pages.
401 *
402 * Given a starting page number and a key, return <0, 0, >0 to indicate if the
403 * key on the page is less than, equal to or greater than the key specified.
404 * We optimize this by doing chunk at a time comparison unless the user has
405 * specified a comparison function.  In this case, we need to materialize
406 * the entire object and call their comparison routine.
407 *
408 * __db_moff and __db_coff are generic functions useful in searching and
409 * ordering off page items. __db_moff matches an overflow DBT with an offpage
410 * item. __db_coff compares two offpage items for lexicographic sort order.
411 *
412 * PUBLIC: int __db_moff __P((DB *,
413 * PUBLIC:     DB_THREAD_INFO *, DB_TXN *, const DBT *, db_pgno_t,
414 * PUBLIC:     u_int32_t, int (*)(DB *, const DBT *, const DBT *), int *));
415 */
416int
417__db_moff(dbp, ip, txn, dbt, pgno, tlen, cmpfunc, cmpp)
418	DB *dbp;
419	DB_THREAD_INFO *ip;
420	DB_TXN *txn;
421	const DBT *dbt;
422	db_pgno_t pgno;
423	u_int32_t tlen;
424	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
425{
426	DBT local_dbt;
427	DB_MPOOLFILE *mpf;
428	PAGE *pagep;
429	void *buf;
430	u_int32_t bufsize, cmp_bytes, key_left;
431	u_int8_t *p1, *p2;
432	int ret;
433
434	mpf = dbp->mpf;
435
436	/*
437	 * If there is a user-specified comparison function, build a
438	 * contiguous copy of the key, and call it.
439	 */
440	if (cmpfunc != NULL) {
441		memset(&local_dbt, 0, sizeof(local_dbt));
442		buf = NULL;
443		bufsize = 0;
444
445		if ((ret = __db_goff(dbp, ip, txn,
446		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
447			return (ret);
448		/* Pass the key as the first argument */
449		*cmpp = cmpfunc(dbp, dbt, &local_dbt);
450		__os_free(dbp->env, buf);
451		return (0);
452	}
453
454	/* While there are both keys to compare. */
455	for (*cmpp = 0, p1 = dbt->data,
456	    key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
457		if ((ret = __memp_fget(mpf, &pgno, ip, txn, 0, &pagep)) != 0)
458			return (ret);
459
460		cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
461		tlen -= cmp_bytes;
462		key_left -= cmp_bytes;
463		for (p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
464		    cmp_bytes-- > 0; ++p1, ++p2)
465			if (*p1 != *p2) {
466				*cmpp = (long)*p1 - (long)*p2;
467				break;
468			}
469		pgno = NEXT_PGNO(pagep);
470		if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
471			return (ret);
472		if (*cmpp != 0)
473			return (0);
474	}
475	if (key_left > 0)		/* DBT is longer than the page key. */
476		*cmpp = 1;
477	else if (tlen > 0)		/* DBT is shorter than the page key. */
478		*cmpp = -1;
479	else
480		*cmpp = 0;
481
482	return (0);
483}
484
485/*
486 * __db_coff --
487 *	Match two offpage dbts.
488 *
489 * The DBTs must both refer to offpage items.
490 * The match happens a chunk (page) at a time unless a user defined comparison
491 * function exists. It is not possible to optimize this comparison away when
492 * a lexicographic sort order is required on mismatch.
493 *
494 * NOTE: For now this function only works for H_OFFPAGE type items. It would
495 * be simple to extend it for use with B_OVERFLOW type items. It would only
496 * require extracting the total length, and page number, dependent on the
497 * DBT type.
498 *
499 * PUBLIC: int __db_coff __P((DB *, DB_THREAD_INFO *, DB_TXN *, const DBT *,
500 * PUBLIC:     const DBT *, int (*)(DB *, const DBT *, const DBT *), int *));
501 */
502int
503__db_coff(dbp, ip, txn, dbt, match, cmpfunc, cmpp)
504	DB *dbp;
505	DB_THREAD_INFO *ip;
506	DB_TXN *txn;
507	const DBT *dbt, *match;
508	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
509{
510	DBT local_key, local_match;
511	DB_MPOOLFILE *mpf;
512	PAGE *dbt_pagep, *match_pagep;
513	db_pgno_t dbt_pgno, match_pgno;
514	u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
515	u_int32_t match_len, max_data, page_sz;
516	u_int8_t *p1, *p2;
517	int ret;
518	void *dbt_buf, *match_buf;
519
520	mpf = dbp->mpf;
521	page_sz = dbp->pgsize;
522	*cmpp = 0;
523	dbt_buf = match_buf = NULL;
524
525	DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
526	DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
527
528	/* Extract potentially unaligned length and pgno fields from DBTs */
529	memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
530	memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
531	memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
532	memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
533	max_data = (dbt_len < match_len ? dbt_len : match_len);
534
535	/*
536	 * If there is a custom comparator, fully resolve both DBTs.
537	 * Then call the users comparator.
538	 */
539	if (cmpfunc != NULL) {
540		memset(&local_key, 0, sizeof(local_key));
541		memset(&local_match, 0, sizeof(local_match));
542		dbt_buf = match_buf = NULL;
543		dbt_bufsz = match_bufsz = 0;
544
545		if ((ret = __db_goff(dbp, ip, txn, &local_key, dbt_len,
546		    dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
547			goto err1;
548		if ((ret = __db_goff(dbp, ip, txn, &local_match, match_len,
549		    match_pgno, &match_buf, &match_bufsz)) != 0)
550			goto err1;
551		/* The key needs to be the first argument for sort order */
552		*cmpp = cmpfunc(dbp, &local_key, &local_match);
553
554err1:		if (dbt_buf != NULL)
555			__os_free(dbp->env, dbt_buf);
556		if (match_buf != NULL)
557			__os_free(dbp->env, match_buf);
558		return (ret);
559	}
560
561	/* Match the offpage DBTs a page at a time. */
562	while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
563		if ((ret =
564		    __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
565			return (ret);
566		if ((ret =
567		    __memp_fget(mpf, &match_pgno,
568			ip, txn, 0, &match_pagep)) != 0) {
569			(void)__memp_fput(
570			    mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
571			return (ret);
572		}
573		cmp_bytes = page_sz < max_data ? page_sz : max_data;
574		for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
575		    p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
576		    cmp_bytes-- > 0; ++p1, ++p2)
577				if (*p1 != *p2) {
578					*cmpp = (long)*p1 - (long)*p2;
579					break;
580				}
581
582		dbt_pgno = NEXT_PGNO(dbt_pagep);
583		match_pgno = NEXT_PGNO(match_pagep);
584		max_data -= page_sz;
585		if ((ret = __memp_fput(mpf,
586		     ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
587			(void)__memp_fput(mpf,
588			    ip, match_pagep, DB_PRIORITY_UNCHANGED);
589			return (ret);
590		}
591		if ((ret = __memp_fput(mpf,
592		    ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
593			return (ret);
594		if (*cmpp != 0)
595			return (0);
596	}
597
598	/* If a lexicographic mismatch was found, then the result has already
599	 * been returned. If the DBTs matched, consider the lengths of the
600	 * items, and return appropriately.
601	 */
602	if (dbt_len > match_len) /* DBT is longer than the match key. */
603		*cmpp = 1;
604	else if (match_len > dbt_len) /* DBT is shorter than the match key. */
605		*cmpp = -1;
606	else
607		*cmpp = 0;
608
609	return (0);
610
611}
612