1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994, 1995, 1996
8 *	Keith Bostic.  All rights reserved.
9 */
10/*
11 * Copyright (c) 1990, 1993, 1994, 1995
12 *	The Regents of the University of California.  All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Mike Olson.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 *    notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 *    notice, this list of conditions and the following disclaimer in the
24 *    documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 *    may be used to endorse or promote products derived from this software
27 *    without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id$
42 */
43
44#include "db_config.h"
45
46#include "db_int.h"
47#include "dbinc/db_page.h"
48#include "dbinc/db_am.h"
49#include "dbinc/mp.h"
50
/*
 * Big key/data code.
 *
 * Big key and data entries are stored on linked lists of overflow pages.
 * The initial reference is a structure holding the total length of the item
 * and the page number where it begins.  Each page in the linked list
 * contains a pointer to the next page of data, and so on.
 */
59
60/*
61 * __db_goff --
62 *	Get an offpage item.
63 *
64 * PUBLIC: int __db_goff __P((DBC *,
65 * PUBLIC:     DBT *, u_int32_t, db_pgno_t, void **, u_int32_t *));
66 */
67int
68__db_goff(dbc, dbt, tlen, pgno, bpp, bpsz)
69	DBC *dbc;
70	DBT *dbt;
71	u_int32_t tlen;
72	db_pgno_t pgno;
73	void **bpp;
74	u_int32_t *bpsz;
75{
76	DB *dbp;
77	DB_MPOOLFILE *mpf;
78	DB_TXN *txn;
79	DBC_INTERNAL *cp;
80	ENV *env;
81	PAGE *h;
82	DB_THREAD_INFO *ip;
83	db_indx_t bytes;
84	u_int32_t curoff, needed, start;
85	u_int8_t *p, *src;
86	int ret;
87
88	dbp = dbc->dbp;
89	cp = dbc->internal;
90	env = dbp->env;
91	ip = dbc->thread_info;
92	mpf = dbp->mpf;
93	txn = dbc->txn;
94
95	/*
96	 * Check if the buffer is big enough; if it is not and we are
97	 * allowed to malloc space, then we'll malloc it.  If we are
98	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
99	 * appropriately.
100	 */
101	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
102		start = dbt->doff;
103		if (start > tlen)
104			needed = 0;
105		else if (dbt->dlen > tlen - start)
106			needed = tlen - start;
107		else
108			needed = dbt->dlen;
109	} else {
110		start = 0;
111		needed = tlen;
112	}
113
114	/*
115	 * If the caller has not requested any data, return success. This
116	 * "early-out" also avoids setting up the streaming optimization when
117	 * no page would be retrieved. If it were removed, the streaming code
118	 * should only initialize when needed is not 0.
119	 */
120	if (needed == 0) {
121		dbt->size = 0;
122		return (0);
123	}
124
125	if (F_ISSET(dbt, DB_DBT_USERCOPY))
126		goto skip_alloc;
127
128	/* Allocate any necessary memory. */
129	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
130		if (needed > dbt->ulen) {
131			dbt->size = needed;
132			return (DB_BUFFER_SMALL);
133		}
134	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
135		if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0)
136			return (ret);
137	} else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
138		if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0)
139			return (ret);
140	} else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) {
141		if ((ret = __os_realloc(env, needed, bpp)) != 0)
142			return (ret);
143		*bpsz = needed;
144		dbt->data = *bpp;
145	} else if (bpp != NULL)
146		dbt->data = *bpp;
147	else {
148		DB_ASSERT(env,
149		    F_ISSET(dbt,
150		    DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) ||
151		    bpsz != NULL || bpp != NULL);
152		return (DB_BUFFER_SMALL);
153	}
154
155skip_alloc:
156	/* Set up a start page in the overflow chain if streaming. */
157	if (cp->stream_start_pgno != PGNO_INVALID &&
158	    pgno == cp->stream_start_pgno && start >= cp->stream_off &&
159	    start < cp->stream_off + P_MAXSPACE(dbp, dbp->pgsize)) {
160		pgno = cp->stream_curr_pgno;
161		curoff = cp->stream_off;
162	} else {
163		cp->stream_start_pgno = cp->stream_curr_pgno = pgno;
164		cp->stream_off = curoff = 0;
165	}
166
167	/*
168	 * Step through the linked list of pages, copying the data on each
169	 * one into the buffer.  Never copy more than the total data length.
170	 */
171	dbt->size = needed;
172	for (p = dbt->data; pgno != PGNO_INVALID && needed > 0;) {
173		if ((ret = __memp_fget(mpf,
174		    &pgno, ip, txn, 0, &h)) != 0)
175			return (ret);
176		DB_ASSERT(env, TYPE(h) == P_OVERFLOW);
177
178		/* Check if we need any bytes from this page. */
179		if (curoff + OV_LEN(h) >= start) {
180			bytes = OV_LEN(h);
181			src = (u_int8_t *)h + P_OVERHEAD(dbp);
182			if (start > curoff) {
183				src += start - curoff;
184				bytes -= start - curoff;
185			}
186			if (bytes > needed)
187				bytes = needed;
188			if (F_ISSET(dbt, DB_DBT_USERCOPY)) {
189				/*
190				 * The offset into the DBT is the total size
191				 * less the amount of data still needed.  Care
192				 * needs to be taken if doing a partial copy
193				 * beginning at an offset other than 0.
194				 */
195				if ((ret = env->dbt_usercopy(
196				    dbt, dbt->size - needed,
197				    src, bytes, DB_USERCOPY_SETDATA)) != 0) {
198					(void)__memp_fput(mpf,
199					    ip, h, dbp->priority);
200					return (ret);
201				}
202			} else
203				memcpy(p, src, bytes);
204			p += bytes;
205			needed -= bytes;
206		}
207		cp->stream_off = curoff;
208		curoff += OV_LEN(h);
209		cp->stream_curr_pgno = pgno;
210		pgno = h->next_pgno;
211		(void)__memp_fput(mpf, ip, h, dbp->priority);
212	}
213
214	return (0);
215}
216
217/*
218 * __db_poff --
219 *	Put an offpage item.
220 *
221 * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *));
222 */
223int
224__db_poff(dbc, dbt, pgnop)
225	DBC *dbc;
226	const DBT *dbt;
227	db_pgno_t *pgnop;
228{
229	DB *dbp;
230	DBT tmp_dbt;
231	DB_LSN null_lsn;
232	DB_MPOOLFILE *mpf;
233	PAGE *pagep, *lastp;
234	db_indx_t pagespace;
235	db_pgno_t pgno;
236	u_int32_t space, sz, tlen;
237	u_int8_t *p;
238	int ret, t_ret;
239
240	/*
241	 * Allocate pages and copy the key/data item into them.  Calculate the
242	 * number of bytes we get for pages we fill completely with a single
243	 * item.
244	 */
245	dbp = dbc->dbp;
246	lastp = NULL;
247	mpf = dbp->mpf;
248	pagespace = P_MAXSPACE(dbp, dbp->pgsize);
249	p = dbt->data;
250	sz = dbt->size;
251
252	/*
253	 * Check whether we are streaming at the end of the overflow item.
254	 * If so, the last pgno and offset will be cached in the cursor.
255	 */
256	if (F_ISSET(dbt, DB_DBT_STREAMING)) {
257		tlen = dbt->size - dbt->dlen;
258		pgno = dbc->internal->stream_curr_pgno;
259		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info,
260		    dbc->txn, DB_MPOOL_DIRTY, &lastp)) != 0)
261			return (ret);
262
263		/*
264		 * Calculate how much we can write on the last page of the
265		 * overflow item.
266		 */
267		DB_ASSERT(dbp->env,
268		    OV_LEN(lastp) == (tlen - dbc->internal->stream_off));
269		space = pagespace - OV_LEN(lastp);
270
271		/* Only copy as much data as we have. */
272		if (space > dbt->dlen)
273			space = dbt->dlen;
274
275		if (DBC_LOGGING(dbc)) {
276			tmp_dbt.data = dbt->data;
277			tmp_dbt.size = space;
278			ZERO_LSN(null_lsn);
279			if ((ret = __db_big_log(dbp, dbc->txn,
280			    &LSN(lastp), 0, DB_APPEND_BIG, pgno,
281			    PGNO_INVALID, PGNO_INVALID, &tmp_dbt,
282			    &LSN(lastp), &null_lsn, &null_lsn)) != 0)
283				goto err;
284		} else
285			LSN_NOT_LOGGED(LSN(lastp));
286
287		memcpy((u_int8_t *)lastp + P_OVERHEAD(dbp) + OV_LEN(lastp),
288		    dbt->data, space);
289		OV_LEN(lastp) += space;
290		sz -= space + dbt->doff;
291		p += space;
292		*pgnop = dbc->internal->stream_start_pgno;
293	}
294
295	ret = 0;
296	for (; sz > 0; p += pagespace, sz -= pagespace) {
297		/*
298		 * Reduce pagespace so we terminate the loop correctly and
299		 * don't copy too much data.
300		 */
301		if (sz < pagespace)
302			pagespace = sz;
303
304		/*
305		 * Allocate and initialize a new page and copy all or part of
306		 * the item onto the page.  If sz is less than pagespace, we
307		 * have a partial record.
308		 */
309		if ((ret = __db_new(dbc, P_OVERFLOW, NULL, &pagep)) != 0)
310			break;
311		if (DBC_LOGGING(dbc)) {
312			tmp_dbt.data = p;
313			tmp_dbt.size = pagespace;
314			ZERO_LSN(null_lsn);
315			if ((ret = __db_big_log(dbp, dbc->txn,
316			    &LSN(pagep), 0, DB_ADD_BIG, PGNO(pagep),
317			    lastp ? PGNO(lastp) : PGNO_INVALID,
318			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
319			    lastp == NULL ? &null_lsn : &LSN(lastp),
320			    &null_lsn)) != 0) {
321				(void)__memp_fput(mpf, dbc->thread_info,
322				    pagep, dbc->priority);
323				goto err;
324			}
325		} else
326			LSN_NOT_LOGGED(LSN(pagep));
327
328		/* Move LSN onto page. */
329		if (lastp != NULL)
330			LSN(lastp) = LSN(pagep);
331
332		OV_LEN(pagep) = pagespace;
333		OV_REF(pagep) = 1;
334		memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace);
335
336		/*
337		 * If this is the first entry, update the user's info and
338		 * initialize the cursor to allow for streaming of subsequent
339		 * updates.  Otherwise, update the entry on the last page
340		 * filled in and release that page.
341		 */
342		if (lastp == NULL) {
343			*pgnop = PGNO(pagep);
344			dbc->internal->stream_start_pgno =
345			    dbc->internal->stream_curr_pgno = *pgnop;
346			dbc->internal->stream_off = 0;
347		} else {
348			lastp->next_pgno = PGNO(pagep);
349			pagep->prev_pgno = PGNO(lastp);
350			if ((ret = __memp_fput(mpf,
351			    dbc->thread_info, lastp, dbc->priority)) != 0) {
352				lastp = NULL;
353				goto err;
354			}
355		}
356		lastp = pagep;
357	}
358err:	if (lastp != NULL) {
359		if (ret == 0) {
360			dbc->internal->stream_curr_pgno = PGNO(lastp);
361			dbc->internal->stream_off = dbt->size - OV_LEN(lastp);
362		}
363
364		if ((t_ret = __memp_fput(mpf, dbc->thread_info, lastp,
365		    dbc->priority)) != 0 && ret == 0)
366			ret = t_ret;
367	}
368	return (ret);
369}
370
371/*
372 * __db_ovref --
373 *	Decrement the reference count on an overflow page.
374 *
375 * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t));
376 */
377int
378__db_ovref(dbc, pgno)
379	DBC *dbc;
380	db_pgno_t pgno;
381{
382	DB *dbp;
383	DB_MPOOLFILE *mpf;
384	PAGE *h;
385	int ret;
386
387	dbp = dbc->dbp;
388	mpf = dbp->mpf;
389
390	if ((ret = __memp_fget(mpf, &pgno,
391	     dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0)
392		return (ret);
393
394	if (DBC_LOGGING(dbc)) {
395		if ((ret = __db_ovref_log(dbp,
396		    dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) {
397			(void)__memp_fput(mpf,
398			     dbc->thread_info, h, dbc->priority);
399			return (ret);
400		}
401	} else
402		LSN_NOT_LOGGED(LSN(h));
403
404	/*
405	 * In BDB releases before 4.5, the overflow reference counts were
406	 * incremented when an overflow item was split onto an internal
407	 * page.  There was a lock race in that code, and rather than fix
408	 * the race, we changed BDB to copy overflow items when splitting
409	 * them onto internal pages.  The code to decrement reference
410	 * counts remains so databases already in the field continue to
411	 * work.
412	 */
413	--OV_REF(h);
414
415	return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority));
416}
417
418/*
419 * __db_doff --
420 *	Delete an offpage chain of overflow pages.
421 *
422 * PUBLIC: int __db_doff __P((DBC *, db_pgno_t));
423 */
424int
425__db_doff(dbc, pgno)
426	DBC *dbc;
427	db_pgno_t pgno;
428{
429	DB *dbp;
430	DBT tmp_dbt;
431	DB_LSN null_lsn;
432	DB_MPOOLFILE *mpf;
433	PAGE *pagep;
434	int ret;
435
436	dbp = dbc->dbp;
437	mpf = dbp->mpf;
438
439	do {
440		if ((ret = __memp_fget(mpf, &pgno,
441		     dbc->thread_info, dbc->txn, 0, &pagep)) != 0)
442			return (ret);
443
444		DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW);
445		/*
446		 * If it's referenced by more than one key/data item,
447		 * decrement the reference count and return.
448		 */
449		if (OV_REF(pagep) > 1) {
450			(void)__memp_fput(mpf,
451			    dbc->thread_info, pagep, dbc->priority);
452			return (__db_ovref(dbc, pgno));
453		}
454
455		if ((ret = __memp_dirty(mpf, &pagep,
456		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
457			if (pagep != NULL)
458				(void)__memp_fput(mpf,
459				    dbc->thread_info, pagep, dbc->priority);
460			return (ret);
461		}
462
463		if (DBC_LOGGING(dbc)) {
464			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp);
465			tmp_dbt.size = OV_LEN(pagep);
466			ZERO_LSN(null_lsn);
467			if ((ret = __db_big_log(dbp, dbc->txn,
468			    &LSN(pagep), 0, DB_REM_BIG,
469			    PGNO(pagep), PREV_PGNO(pagep),
470			    NEXT_PGNO(pagep), &tmp_dbt,
471			    &LSN(pagep), &null_lsn, &null_lsn)) != 0) {
472				(void)__memp_fput(mpf,
473				    dbc->thread_info, pagep, dbc->priority);
474				return (ret);
475			}
476		} else
477			LSN_NOT_LOGGED(LSN(pagep));
478		pgno = pagep->next_pgno;
479		OV_LEN(pagep) = 0;
480		if ((ret = __db_free(dbc, pagep)) != 0)
481			return (ret);
482	} while (pgno != PGNO_INVALID);
483
484	return (0);
485}
486
487/*
488 * __db_moff --
489 *	Match on overflow pages.
490 *
491 * Given a starting page number and a key, return <0, 0, >0 to indicate if the
492 * key on the page is less than, equal to or greater than the key specified.
493 * We optimize this by doing chunk at a time comparison unless the user has
494 * specified a comparison function.  In this case, we need to materialize
495 * the entire object and call their comparison routine.
496 *
497 * __db_moff and __db_coff are generic functions useful in searching and
498 * ordering off page items. __db_moff matches an overflow DBT with an offpage
499 * item. __db_coff compares two offpage items for lexicographic sort order.
500 *
501 * PUBLIC: int __db_moff __P((DBC *, const DBT *, db_pgno_t, u_int32_t,
502 * PUBLIC:     int (*)(DB *, const DBT *, const DBT *), int *));
503 */
504int
505__db_moff(dbc, dbt, pgno, tlen, cmpfunc, cmpp)
506	DBC *dbc;
507	const DBT *dbt;
508	db_pgno_t pgno;
509	u_int32_t tlen;
510	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
511{
512	DB *dbp;
513	DBT local_dbt;
514	DB_MPOOLFILE *mpf;
515	DB_THREAD_INFO *ip;
516	PAGE *pagep;
517	void *buf;
518	u_int32_t bufsize, cmp_bytes, key_left;
519	u_int8_t *p1, *p2;
520	int ret;
521
522	dbp = dbc->dbp;
523	ip = dbc->thread_info;
524	mpf = dbp->mpf;
525
526	/*
527	 * If there is a user-specified comparison function, build a
528	 * contiguous copy of the key, and call it.
529	 */
530	if (cmpfunc != NULL) {
531		memset(&local_dbt, 0, sizeof(local_dbt));
532		buf = NULL;
533		bufsize = 0;
534
535		if ((ret = __db_goff(dbc,
536		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
537			return (ret);
538		/* Pass the key as the first argument */
539		*cmpp = cmpfunc(dbp, dbt, &local_dbt);
540		__os_free(dbp->env, buf);
541		return (0);
542	}
543
544	/* While there are both keys to compare. */
545	for (*cmpp = 0, p1 = dbt->data,
546	    key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
547		if ((ret =
548		    __memp_fget(mpf, &pgno, ip, dbc->txn, 0, &pagep)) != 0)
549			return (ret);
550
551		cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
552		tlen -= cmp_bytes;
553		key_left -= cmp_bytes;
554		for (p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp);
555		    cmp_bytes-- > 0; ++p1, ++p2)
556			if (*p1 != *p2) {
557				*cmpp = (long)*p1 - (long)*p2;
558				break;
559			}
560		pgno = NEXT_PGNO(pagep);
561		if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0)
562			return (ret);
563		if (*cmpp != 0)
564			return (0);
565	}
566	if (key_left > 0)		/* DBT is longer than the page key. */
567		*cmpp = 1;
568	else if (tlen > 0)		/* DBT is shorter than the page key. */
569		*cmpp = -1;
570	else
571		*cmpp = 0;
572
573	return (0);
574}
575
576/*
577 * __db_coff --
578 *	Match two offpage dbts.
579 *
580 * The DBTs must both refer to offpage items.
581 * The match happens a chunk (page) at a time unless a user defined comparison
582 * function exists. It is not possible to optimize this comparison away when
583 * a lexicographic sort order is required on mismatch.
584 *
585 * NOTE: For now this function only works for H_OFFPAGE type items. It would
586 * be simple to extend it for use with B_OVERFLOW type items. It would only
587 * require extracting the total length, and page number, dependent on the
588 * DBT type.
589 *
590 * PUBLIC: int __db_coff __P((DBC *, const DBT *, const DBT *,
591 * PUBLIC:     int (*)(DB *, const DBT *, const DBT *), int *));
592 */
593int
594__db_coff(dbc, dbt, match, cmpfunc, cmpp)
595	DBC *dbc;
596	const DBT *dbt, *match;
597	int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp;
598{
599	DB *dbp;
600	DB_THREAD_INFO *ip;
601	DB_MPOOLFILE *mpf;
602	DB_TXN *txn;
603	DBT local_key, local_match;
604	PAGE *dbt_pagep, *match_pagep;
605	db_pgno_t dbt_pgno, match_pgno;
606	u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz;
607	u_int32_t match_len, max_data, page_sz;
608	u_int8_t *p1, *p2;
609	int ret;
610	void *dbt_buf, *match_buf;
611
612	dbp = dbc->dbp;
613	ip = dbc->thread_info;
614	txn = dbc->txn;
615	mpf = dbp->mpf;
616	page_sz = dbp->pgsize;
617	*cmpp = 0;
618	dbt_buf = match_buf = NULL;
619
620	DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE);
621	DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE);
622
623	/* Extract potentially unaligned length and pgno fields from DBTs */
624	memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t));
625	memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t));
626	memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t));
627	memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t));
628	max_data = (dbt_len < match_len ? dbt_len : match_len);
629
630	/*
631	 * If there is a custom comparator, fully resolve both DBTs.
632	 * Then call the users comparator.
633	 */
634	if (cmpfunc != NULL) {
635		memset(&local_key, 0, sizeof(local_key));
636		memset(&local_match, 0, sizeof(local_match));
637		dbt_buf = match_buf = NULL;
638		dbt_bufsz = match_bufsz = 0;
639
640		if ((ret = __db_goff(dbc, &local_key, dbt_len,
641		    dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0)
642			goto err1;
643		if ((ret = __db_goff(dbc, &local_match, match_len,
644		    match_pgno, &match_buf, &match_bufsz)) != 0)
645			goto err1;
646		/* The key needs to be the first argument for sort order */
647		*cmpp = cmpfunc(dbp, &local_key, &local_match);
648
649err1:		if (dbt_buf != NULL)
650			__os_free(dbp->env, dbt_buf);
651		if (match_buf != NULL)
652			__os_free(dbp->env, match_buf);
653		return (ret);
654	}
655
656	/* Match the offpage DBTs a page at a time. */
657	while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) {
658		if ((ret =
659		    __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0)
660			return (ret);
661		if ((ret =
662		    __memp_fget(mpf, &match_pgno,
663			ip, txn, 0, &match_pagep)) != 0) {
664			(void)__memp_fput(
665			    mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED);
666			return (ret);
667		}
668		cmp_bytes = page_sz < max_data ? page_sz : max_data;
669		for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp),
670		    p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp);
671		    cmp_bytes-- > 0; ++p1, ++p2)
672				if (*p1 != *p2) {
673					*cmpp = (long)*p1 - (long)*p2;
674					break;
675				}
676
677		dbt_pgno = NEXT_PGNO(dbt_pagep);
678		match_pgno = NEXT_PGNO(match_pagep);
679		max_data -= page_sz;
680		if ((ret = __memp_fput(mpf,
681		     ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) {
682			(void)__memp_fput(mpf,
683			    ip, match_pagep, DB_PRIORITY_UNCHANGED);
684			return (ret);
685		}
686		if ((ret = __memp_fput(mpf,
687		    ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0)
688			return (ret);
689		if (*cmpp != 0)
690			return (0);
691	}
692
693	/* If a lexicographic mismatch was found, then the result has already
694	 * been returned. If the DBTs matched, consider the lengths of the
695	 * items, and return appropriately.
696	 */
697	if (dbt_len > match_len) /* DBT is longer than the match key. */
698		*cmpp = 1;
699	else if (match_len > dbt_len) /* DBT is shorter than the match key. */
700		*cmpp = -1;
701	else
702		*cmpp = 0;
703
704	return (0);
705
706}
707