1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994
8 *	The Regents of the University of California.  All rights reserved.
9 *
10 * This code is derived from software contributed to Berkeley by
11 * Margo Seltzer.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions
15 * are met:
16 * 1. Redistributions of source code must retain the above copyright
17 *    notice, this list of conditions and the following disclaimer.
18 * 2. Redistributions in binary form must reproduce the above copyright
19 *    notice, this list of conditions and the following disclaimer in the
20 *    documentation and/or other materials provided with the distribution.
21 * 3. Neither the name of the University nor the names of its contributors
22 *    may be used to endorse or promote products derived from this software
23 *    without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 *
37 * $Id$
38 */
39
40/*
41 * PACKAGE:  hashing
42 *
43 * DESCRIPTION:
44 *	Manipulation of duplicates for the hash package.
45 */
46
47#include "db_config.h"
48
49#include "db_int.h"
50#include "dbinc/db_page.h"
51#include "dbinc/hash.h"
52#include "dbinc/btree.h"
53#include "dbinc/mp.h"
54
55static int __hamc_chgpg __P((DBC *,
56    db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
57static int __ham_check_move __P((DBC *, u_int32_t));
58static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
59static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
60
61/*
62 * Called from hash_access to add a duplicate key. nval is the new
63 * value that we want to add.  The flags correspond to the flag values
64 * to cursor_put indicating where to add the new element.
65 * There are 4 cases.
66 * Case 1: The existing duplicate set already resides on a separate page.
67 *	   We return and let the common code handle this.
68 * Case 2: The element is small enough to just be added to the existing set.
69 * Case 3: The element is large enough to be a big item, so we're going to
70 *	   have to push the set onto a new page.
71 * Case 4: The element is large enough to push the duplicate set onto a
72 *	   separate page.
73 *
74 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
75 */
76int
77__ham_add_dup(dbc, nval, flags, pgnop)
78	DBC *dbc;
79	DBT *nval;
80	u_int32_t flags;
81	db_pgno_t *pgnop;
82{
83	DB *dbp;
84	DBT pval, tmp_val;
85	DB_MPOOLFILE *mpf;
86	ENV *env;
87	HASH_CURSOR *hcp;
88	u_int32_t add_bytes, new_size;
89	int cmp, ret;
90	u_int8_t *hk;
91
92	dbp = dbc->dbp;
93	env = dbp->env;
94	mpf = dbp->mpf;
95	hcp = (HASH_CURSOR *)dbc->internal;
96
97	DB_ASSERT(env, flags != DB_CURRENT);
98
99	add_bytes = nval->size +
100	    (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
101	add_bytes = DUP_SIZE(add_bytes);
102
103	if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
104		return (ret);
105
106	/*
107	 * Check if resulting duplicate set is going to need to go
108	 * onto a separate duplicate page.  If so, convert the
109	 * duplicate set and add the new one.  After conversion,
110	 * hcp->dndx is the first free ndx or the index of the
111	 * current pointer into the duplicate set.
112	 */
113	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
114	/* Add the len bytes to the current singleton. */
115	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
116		add_bytes += DUP_SIZE(0);
117	new_size =
118	    LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
119	    add_bytes;
120
121	/*
122	 * We convert to off-page duplicates if the item is a big item,
123	 * the addition of the new item will make the set large, or
124	 * if there isn't enough room on this page to add the next item.
125	 */
126	if (HPAGE_PTYPE(hk) != H_OFFDUP &&
127	    (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
128	    add_bytes > P_FREESPACE(dbp, hcp->page))) {
129
130		if ((ret = __ham_dup_convert(dbc)) != 0)
131			return (ret);
132		return (hcp->opd->am_put(hcp->opd,
133		    NULL, nval, flags, NULL));
134	}
135
136	/* There are two separate cases here: on page and off page. */
137	if (HPAGE_PTYPE(hk) != H_OFFDUP) {
138		if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
139			pval.flags = 0;
140			pval.data = HKEYDATA_DATA(hk);
141			pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
142			    hcp->indx);
143			if ((ret = __ham_make_dup(env,
144			    &pval, &tmp_val, &dbc->my_rdata.data,
145			    &dbc->my_rdata.ulen)) != 0 || (ret =
146			    __ham_replpair(dbc, &tmp_val, 1)) != 0)
147				return (ret);
148			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
149			HPAGE_PTYPE(hk) = H_DUPLICATE;
150
151			/*
152			 * Update the cursor position since we now are in
153			 * duplicates.
154			 */
155			F_SET(hcp, H_ISDUP);
156			hcp->dup_off = 0;
157			hcp->dup_len = pval.size;
158			hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
159		}
160
161		/* Now make the new entry a duplicate. */
162		if ((ret = __ham_make_dup(env, nval,
163		    &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
164			return (ret);
165
166		tmp_val.dlen = 0;
167		switch (flags) {			/* On page. */
168		case DB_KEYFIRST:
169		case DB_KEYLAST:
170		case DB_NODUPDATA:
171		case DB_OVERWRITE_DUP:
172			if (dbp->dup_compare != NULL) {
173				__ham_dsearch(dbc,
174				    nval, &tmp_val.doff, &cmp, flags);
175
176				/*
177				 * Duplicate duplicates are not supported w/
178				 * sorted dups.  We can either overwrite or
179				 * return DB_KEYEXIST.
180				 */
181				if (cmp == 0) {
182					if (flags == DB_OVERWRITE_DUP)
183						return (__ham_overwrite(dbc,
184						    nval, flags));
185					return (__db_duperr(dbp, flags));
186				}
187			} else {
188				hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
189				    dbp->pgsize, hcp->indx);
190				hcp->dup_len = nval->size;
191				F_SET(hcp, H_ISDUP);
192				if (flags == DB_KEYFIRST)
193					hcp->dup_off = tmp_val.doff = 0;
194				else
195					hcp->dup_off =
196					    tmp_val.doff = hcp->dup_tlen;
197			}
198			break;
199		case DB_BEFORE:
200			tmp_val.doff = hcp->dup_off;
201			break;
202		case DB_AFTER:
203			tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
204			break;
205		default:
206			return (__db_unknown_path(env, "__ham_add_dup"));
207		}
208
209		/* Add the duplicate. */
210		if ((ret = __memp_dirty(mpf, &hcp->page,
211		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
212		    (ret = __ham_replpair(dbc, &tmp_val, 0)) != 0)
213			return (ret);
214
215		/* Now, update the cursor if necessary. */
216		switch (flags) {
217		case DB_AFTER:
218			hcp->dup_off += DUP_SIZE(hcp->dup_len);
219			hcp->dup_len = nval->size;
220			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
221			break;
222		case DB_BEFORE:
223		case DB_KEYFIRST:
224		case DB_KEYLAST:
225		case DB_NODUPDATA:
226			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
227			hcp->dup_len = nval->size;
228			break;
229		default:
230			return (__db_unknown_path(env, "__ham_add_dup"));
231		}
232		ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1);
233		return (ret);
234	}
235
236	/*
237	 * If we get here, then we're on duplicate pages; set pgnop and
238	 * return so the common code can handle it.
239	 */
240	memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
241	    sizeof(db_pgno_t));
242
243	return (ret);
244}
245
246/*
247 * Convert an on-page set of duplicates to an offpage set of duplicates.
248 *
249 * PUBLIC: int __ham_dup_convert __P((DBC *));
250 */
251int
252__ham_dup_convert(dbc)
253	DBC *dbc;
254{
255	BOVERFLOW bo;
256	DB *dbp;
257	DBC **hcs;
258	DBT dbt;
259	DB_LSN lsn;
260	DB_MPOOLFILE *mpf;
261	ENV *env;
262	HASH_CURSOR *hcp;
263	HOFFPAGE ho;
264	PAGE *dp;
265	db_indx_t i, len, off;
266	int c, ret, t_ret;
267	u_int8_t *p, *pend;
268
269	dbp = dbc->dbp;
270	env = dbp->env;
271	mpf = dbp->mpf;
272	hcp = (HASH_CURSOR *)dbc->internal;
273
274	/*
275	 * Create a new page for the duplicates.
276	 */
277	if ((ret = __db_new(dbc,
278	    dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, NULL, &dp)) != 0)
279		return (ret);
280	P_INIT(dp, dbp->pgsize,
281	    dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
282
283	/*
284	 * Get the list of cursors that may need to be updated.
285	 */
286	if ((ret = __ham_get_clist(dbp,
287	    PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
288		goto err;
289
290	/*
291	 * Now put the duplicates onto the new page.
292	 */
293	dbt.flags = 0;
294	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
295	case H_KEYDATA:
296		/* Simple case, one key on page; move it to dup page. */
297		dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
298		dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
299		ret = __db_pitem(dbc,
300		    dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
301		goto finish;
302	case H_OFFPAGE:
303		/* Simple case, one key on page; move it to dup page. */
304		memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
305		    HOFFPAGE_SIZE);
306		UMRW_SET(bo.unused1);
307		B_TSET(bo.type, ho.type);
308		UMRW_SET(bo.unused2);
309		bo.pgno = ho.pgno;
310		bo.tlen = ho.tlen;
311		dbt.size = BOVERFLOW_SIZE;
312		dbt.data = &bo;
313
314		ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
315finish:		if (ret == 0) {
316			/* Update any other cursors. */
317			if (hcs != NULL && DBC_LOGGING(dbc) &&
318			    IS_SUBTRANSACTION(dbc->txn)) {
319				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
320				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
321				    PGNO(dp), hcp->indx, 0)) != 0)
322					break;
323			}
324			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
325				if ((ret = __ham_dcursor(hcs[c],
326				    PGNO(dp), 0)) != 0)
327					break;
328		}
329		break;
330	case H_DUPLICATE:
331		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
332		pend = p +
333		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
334
335		/*
336		 * We need to maintain the duplicate cursor position.
337		 * Keep track of where we are in the duplicate set via
338		 * the offset, and when it matches the one in the cursor,
339		 * set the off-page duplicate cursor index to the current
340		 * index.
341		 */
342		for (off = 0, i = 0; p < pend; i++) {
343			memcpy(&len, p, sizeof(db_indx_t));
344			dbt.size = len;
345			p += sizeof(db_indx_t);
346			dbt.data = p;
347			p += len + sizeof(db_indx_t);
348			if ((ret = __db_pitem(dbc, dp,
349			    i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
350				break;
351
352			/* Update any other cursors */
353			if (hcs != NULL && DBC_LOGGING(dbc) &&
354			    IS_SUBTRANSACTION(dbc->txn)) {
355				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
356				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
357				    PGNO(dp), hcp->indx, i)) != 0)
358					break;
359			}
360			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
361				if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
362				    == off && (ret = __ham_dcursor(hcs[c],
363				    PGNO(dp), i)) != 0)
364					goto err;
365			off += len + 2 * sizeof(db_indx_t);
366		}
367		break;
368	default:
369		ret = __db_pgfmt(env, hcp->pgno);
370		break;
371	}
372
373	/*
374	 * Now attach this to the source page in place of the old duplicate
375	 * item.
376	 */
377	if (ret == 0)
378		ret = __memp_dirty(mpf,
379		    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0);
380
381	if (ret == 0)
382		ret = __ham_move_offpage(dbc, hcp->page,
383		    (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));
384
385err:	if ((t_ret = __memp_fput(mpf,
386	    dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
387		ret = t_ret;
388
389	if (ret == 0)
390		hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;
391
392	if (hcs != NULL)
393		__os_free(env, hcs);
394
395	return (ret);
396}
397
398/*
399 * __ham_make_dup
400 *
401 * Take a regular dbt and make it into a duplicate item with all the partial
402 * information set appropriately. If the incoming dbt is a partial, assume
403 * we are creating a new entry and make sure that we do any initial padding.
404 *
405 * PUBLIC: int __ham_make_dup __P((ENV *,
406 * PUBLIC:     const DBT *, DBT *d, void **, u_int32_t *));
407 */
408int
409__ham_make_dup(env, notdup, duplicate, bufp, sizep)
410	ENV *env;
411	const DBT *notdup;
412	DBT *duplicate;
413	void **bufp;
414	u_int32_t *sizep;
415{
416	db_indx_t tsize, item_size;
417	int ret;
418	u_int8_t *p;
419
420	item_size = (db_indx_t)notdup->size;
421	if (F_ISSET(notdup, DB_DBT_PARTIAL))
422		item_size += notdup->doff;
423
424	tsize = DUP_SIZE(item_size);
425	if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0)
426		return (ret);
427
428	duplicate->dlen = 0;
429	duplicate->flags = notdup->flags;
430	F_SET(duplicate, DB_DBT_PARTIAL);
431
432	p = duplicate->data;
433	memcpy(p, &item_size, sizeof(db_indx_t));
434	p += sizeof(db_indx_t);
435	if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
436		memset(p, 0, notdup->doff);
437		p += notdup->doff;
438	}
439	memcpy(p, notdup->data, notdup->size);
440	p += notdup->size;
441	memcpy(p, &item_size, sizeof(db_indx_t));
442
443	duplicate->doff = 0;
444	duplicate->dlen = notdup->size;
445
446	return (0);
447}
448
449/*
450 * __ham_check_move --
451 *
452 * Check if we can do whatever we need to on this page.  If not,
453 * then we'll have to move the current element to a new page.
454 */
455static int
456__ham_check_move(dbc, add_len)
457	DBC *dbc;
458	u_int32_t add_len;
459{
460	DB *dbp;
461	DBT k, d;
462	DB_LSN new_lsn;
463	DB_MPOOLFILE *mpf;
464	HASH_CURSOR *hcp;
465	PAGE *next_pagep;
466	db_pgno_t next_pgno;
467	u_int32_t new_datalen, old_len, rectype;
468	db_indx_t new_indx;
469	u_int8_t *hk;
470	int key_type, match, ret, t_ret;
471
472	dbp = dbc->dbp;
473	mpf = dbp->mpf;
474	hcp = (HASH_CURSOR *)dbc->internal;
475
476	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
477
478	/*
479	 * If the item is already off page duplicates or an offpage item,
480	 * then we know we can do whatever we need to do in-place
481	 */
482	if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
483		return (0);
484
485	old_len =
486	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
487	new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
488	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
489		new_datalen += DUP_SIZE(0);
490
491	/*
492	 * We need to add a new page under two conditions:
493	 * 1. The addition makes the total data length cross the BIG
494	 *    threshold and the OFFDUP structure won't fit on this page.
495	 * 2. The addition does not make the total data cross the
496	 *    threshold, but the new data won't fit on the page.
497	 * If neither of these is true, then we can return.
498	 */
499	if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
500	    HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
501		return (0);
502
503	if (!ISBIG(hcp, new_datalen) &&
504	    (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
505		return (0);
506
507	/*
508	 * If we get here, then we need to move the item to a new page.
509	 * Check if there are more pages in the chain.  We now need to
510	 * update new_datalen to include the size of both the key and
511	 * the data that we need to move.
512	 */
513
514	new_datalen = ISBIG(hcp, new_datalen) ?
515	    HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
516	new_datalen +=
517	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
518
519	next_pagep = NULL;
520	for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
521	    next_pgno = NEXT_PGNO(next_pagep)) {
522		if (next_pagep != NULL && (ret = __memp_fput(mpf,
523		    dbc->thread_info, next_pagep, dbc->priority)) != 0)
524			return (ret);
525
526		if ((ret = __memp_fget(mpf,
527		    &next_pgno, dbc->thread_info, dbc->txn,
528		    DB_MPOOL_CREATE, &next_pagep)) != 0)
529			return (ret);
530
531		if (P_FREESPACE(dbp, next_pagep) >= new_datalen)
532			break;
533	}
534
535	/* No more pages, add one. */
536	if ((ret = __memp_dirty(mpf,
537	    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
538		return (ret);
539
540	if (next_pagep == NULL && (ret = __ham_add_ovflpage(dbc,
541	    hcp->page, 0, &next_pagep)) != 0)
542		return (ret);
543
544	/* Add new page at the end of the chain. */
545	if ((ret = __memp_dirty(mpf,
546	    &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
547		(void)__memp_fput(mpf,
548		    dbc->thread_info, next_pagep, dbc->priority);
549		return (ret);
550	}
551
552	if (P_FREESPACE(dbp, next_pagep) < new_datalen && (ret =
553	    __ham_add_ovflpage(dbc, next_pagep, 1, &next_pagep)) != 0) {
554		(void)__memp_fput(mpf,
555		    dbc->thread_info, next_pagep, dbc->priority);
556		return (ret);
557	}
558
559	/* Copy the item to the new page. */
560	if (DBC_LOGGING(dbc)) {
561		rectype = PUTPAIR;
562		k.flags = 0;
563		d.flags = 0;
564		if (HPAGE_PTYPE(
565		    H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
566			rectype |= PAIR_KEYMASK;
567			k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
568			k.size = HOFFPAGE_SIZE;
569			key_type = H_OFFPAGE;
570		} else {
571			k.data =
572			    HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
573			k.size =
574			    LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
575			key_type = H_KEYDATA;
576		}
577
578		/* Resolve the insert index so it can be written to the log. */
579		if ((ret = __ham_getindex(dbc, next_pagep, &k,
580		    key_type, &match, &new_indx)) != 0)
581			return (ret);
582
583		if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
584			rectype |= PAIR_DATAMASK;
585			d.data = H_PAIRDATA(dbp, hcp->page, hcp->indx);
586			d.size = HOFFPAGE_SIZE;
587		} else {
588			if (HPAGE_PTYPE(H_PAIRDATA(dbp,
589			    hcp->page, hcp->indx)) == H_DUPLICATE)
590				rectype |= PAIR_DUPMASK;
591			d.data = HKEYDATA_DATA(
592			    H_PAIRDATA(dbp, hcp->page, hcp->indx));
593			d.size = LEN_HDATA(dbp, hcp->page,
594			    dbp->pgsize, hcp->indx);
595		}
596
597		if ((ret = __ham_insdel_log(dbp,
598		    dbc->txn, &new_lsn, 0, rectype, PGNO(next_pagep),
599		    (u_int32_t)new_indx, &LSN(next_pagep),
600		    &k, &d)) != 0) {
601			(void)__memp_fput(mpf,
602			    dbc->thread_info, next_pagep, dbc->priority);
603			return (ret);
604		}
605	} else {
606		LSN_NOT_LOGGED(new_lsn);
607		/*
608		 * Ensure that an invalid index is passed to __ham_copypair, so
609		 * it knows to resolve the index. Resolving the insert index
610		 * here would require creating a temporary DBT with the key,
611		 * and calling __ham_getindex. Let __ham_copypair do the
612		 * resolution using the final key DBT.
613		 */
614		new_indx = NDX_INVALID;
615	}
616
617	/* Move lsn onto page. */
618	if ((ret = __memp_dirty(mpf,
619	    &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
620		(void)__memp_fput(mpf,
621		    dbc->thread_info, next_pagep, dbc->priority);
622		return (ret);
623	}
624	LSN(next_pagep) = new_lsn;	/* Structure assignment. */
625
626	if ((ret = __ham_copypair(dbc, hcp->page,
627	    H_KEYINDEX(hcp->indx), next_pagep, &new_indx)) != 0)
628	    goto out;
629
630	/* Update all cursors that used to point to this item. */
631	if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
632	    PGNO(next_pagep), new_indx)) != 0)
633		goto out;
634
635	/* Now delete the pair from the current page. */
636	ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM);
637
638	/*
639	 * __ham_del_pair decremented nelem.  This is incorrect;  we
640	 * manually copied the element elsewhere, so the total number
641	 * of elements hasn't changed.  Increment it again.
642	 *
643	 * !!!
644	 * Note that we still have the metadata page pinned, and
645	 * __ham_del_pair dirtied it, so we don't need to set the dirty
646	 * flag again.
647	 */
648	if (!STD_LOCKING(dbc))
649		hcp->hdr->nelem++;
650
651out:	if ((t_ret = __memp_fput(mpf,
652	    dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
653		ret = t_ret;
654	hcp->page = next_pagep;
655	hcp->pgno = PGNO(hcp->page);
656	hcp->indx = new_indx;
657	F_SET(hcp, H_EXPAND);
658	F_CLR(hcp, H_DELETED);
659
660	return (ret);
661}
662
663/*
664 * __ham_move_offpage --
665 *	Replace an onpage set of duplicates with the OFFDUP structure
666 *	that references the duplicate page.
667 *
668 * XXX
669 * This is really just a special case of __onpage_replace; we should
670 * probably combine them.
671 *
672 */
673static int
674__ham_move_offpage(dbc, pagep, ndx, pgno)
675	DBC *dbc;
676	PAGE *pagep;
677	u_int32_t ndx;
678	db_pgno_t pgno;
679{
680	DB *dbp;
681	DBT new_dbt;
682	DBT old_dbt;
683	HOFFDUP od;
684	db_indx_t i, *inp;
685	int32_t difflen;
686	u_int8_t *src;
687	int ret;
688
689	dbp = dbc->dbp;
690	od.type = H_OFFDUP;
691	UMRW_SET(od.unused[0]);
692	UMRW_SET(od.unused[1]);
693	UMRW_SET(od.unused[2]);
694	od.pgno = pgno;
695	ret = 0;
696
697	if (DBC_LOGGING(dbc)) {
698		new_dbt.data = &od;
699		new_dbt.size = HOFFDUP_SIZE;
700		old_dbt.data = P_ENTRY(dbp, pagep, ndx);
701		old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
702		if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
703		    PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
704		    &old_dbt, &new_dbt, 0)) != 0)
705			return (ret);
706	} else
707		LSN_NOT_LOGGED(LSN(pagep));
708
709	/*
710	 * difflen is the difference in the lengths, and so may be negative.
711	 * We know that the difference between two unsigned lengths from a
712	 * database page will fit into an int32_t.
713	 */
714	difflen =
715	    (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
716	    (int32_t)HOFFDUP_SIZE;
717	if (difflen != 0) {
718		/* Copy data. */
719		inp = P_INP(dbp, pagep);
720		src = (u_int8_t *)(pagep) + HOFFSET(pagep);
721		memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
722		HOFFSET(pagep) += difflen;
723
724		/* Update index table. */
725		for (i = ndx; i < NUM_ENT(pagep); i++)
726			inp[i] += difflen;
727	}
728
729	/* Now copy the offdup entry onto the page. */
730	memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
731	return (ret);
732}
733
734/*
735 * __ham_dsearch:
736 *	Locate a particular duplicate in a duplicate set.  Make sure that
737 *	we exit with the cursor set appropriately.
738 *
739 * PUBLIC: void __ham_dsearch
740 * PUBLIC:     __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
741 */
742void
743__ham_dsearch(dbc, dbt, offp, cmpp, flags)
744	DBC *dbc;
745	DBT *dbt;
746	u_int32_t *offp, flags;
747	int *cmpp;
748{
749	DB *dbp;
750	DBT cur;
751	HASH_CURSOR *hcp;
752	db_indx_t i, len;
753	int (*func) __P((DB *, const DBT *, const DBT *));
754	u_int8_t *data;
755
756	dbp = dbc->dbp;
757	hcp = (HASH_CURSOR *)dbc->internal;
758	func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare;
759
760	i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
761	data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
762	hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
763	len = hcp->dup_len;
764	while (i < hcp->dup_tlen) {
765		memcpy(&len, data, sizeof(db_indx_t));
766		data += sizeof(db_indx_t);
767		DB_SET_DBT(cur, data, len);
768
769		/*
770		 * If we find an exact match, we're done.  If in a sorted
771		 * duplicate set and the item is larger than our test item,
772		 * we're done.  In the latter case, if permitting partial
773		 * matches, it's not a failure.
774		 */
775		*cmpp = func(dbp, dbt, &cur);
776		if (*cmpp == 0)
777			break;
778		if (*cmpp < 0 && dbp->dup_compare != NULL) {
779			if (flags == DB_GET_BOTH_RANGE)
780				*cmpp = 0;
781			break;
782		}
783
784		i += len + 2 * sizeof(db_indx_t);
785		data += len + sizeof(db_indx_t);
786	}
787
788	*offp = i;
789	hcp->dup_off = i;
790	hcp->dup_len = len;
791	F_SET(hcp, H_ISDUP);
792}
793
794/*
795 * __ham_dcursor --
796 *
797 *	Create an off page duplicate cursor for this cursor.
798 */
799static int
800__ham_dcursor(dbc, pgno, indx)
801	DBC *dbc;
802	db_pgno_t pgno;
803	u_int32_t indx;
804{
805	BTREE_CURSOR *dcp;
806	DB *dbp;
807	HASH_CURSOR *hcp;
808	int ret;
809
810	dbp = dbc->dbp;
811	hcp = (HASH_CURSOR *)dbc->internal;
812
813	if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
814		return (ret);
815
816	dcp = (BTREE_CURSOR *)hcp->opd->internal;
817	dcp->pgno = pgno;
818	dcp->indx = indx;
819
820	if (dbp->dup_compare == NULL) {
821		/*
822		 * Converting to off-page Recno trees is tricky.  The
823		 * record number for the cursor is the index + 1 (to
824		 * convert to 1-based record numbers).
825		 */
826		dcp->recno = indx + 1;
827	}
828
829	/*
830	 * Transfer the deleted flag from the top-level cursor to the
831	 * created one.
832	 */
833	if (F_ISSET(hcp, H_DELETED)) {
834		F_SET(dcp, C_DELETED);
835		F_CLR(hcp, H_DELETED);
836	}
837
838	return (0);
839}
840
841/*
842 * __hamc_chgpg --
843 *	Adjust the cursors after moving an item to a new page.  We only
844 *	move cursors that are pointing at this one item and are not
845 *	deleted;  since we only touch non-deleted cursors, and since
846 *	(by definition) no item existed at the pgno/indx we're moving the
847 *	item to, we're guaranteed that all the cursors we affect here or
848 *	on abort really do refer to this one item.
849 */
850static int
851__hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
852	DBC *dbc;
853	db_pgno_t old_pgno, new_pgno;
854	u_int32_t old_index, new_index;
855{
856	DB *dbp, *ldbp;
857	DBC *cp;
858	DB_LSN lsn;
859	DB_TXN *my_txn;
860	ENV *env;
861	HASH_CURSOR *hcp;
862	int found, ret;
863
864	dbp = dbc->dbp;
865	env = dbp->env;
866
867	my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
868
869	MUTEX_LOCK(env, env->mtx_dblist);
870	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
871	for (found = 0;
872	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
873	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
874		MUTEX_LOCK(env, dbp->mutex);
875		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
876			if (cp == dbc || cp->dbtype != DB_HASH)
877				continue;
878
879			hcp = (HASH_CURSOR *)cp->internal;
880
881			/*
882			 * If a cursor is deleted, it doesn't refer to this
883			 * item--it just happens to have the same indx, but
884			 * it points to a former neighbor.  Don't move it.
885			 */
886			if (F_ISSET(hcp, H_DELETED))
887				continue;
888
889			if (hcp->pgno == old_pgno &&
890			    hcp->indx == old_index &&
891			    !MVCC_SKIP_CURADJ(cp, old_pgno)) {
892				hcp->pgno = new_pgno;
893				hcp->indx = new_index;
894				if (my_txn != NULL && cp->txn != my_txn)
895					found = 1;
896			}
897		}
898		MUTEX_UNLOCK(env, dbp->mutex);
899	}
900	MUTEX_UNLOCK(env, env->mtx_dblist);
901
902	if (found != 0 && DBC_LOGGING(dbc)) {
903		if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, DB_HAM_CHGPG,
904		    old_pgno, new_pgno, old_index, new_index)) != 0)
905			return (ret);
906	}
907	return (0);
908}
909