1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994
8 *	The Regents of the University of California.  All rights reserved.
9 *
10 * This code is derived from software contributed to Berkeley by
11 * Margo Seltzer.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions
15 * are met:
16 * 1. Redistributions of source code must retain the above copyright
17 *    notice, this list of conditions and the following disclaimer.
18 * 2. Redistributions in binary form must reproduce the above copyright
19 *    notice, this list of conditions and the following disclaimer in the
20 *    documentation and/or other materials provided with the distribution.
21 * 3. Neither the name of the University nor the names of its contributors
22 *    may be used to endorse or promote products derived from this software
23 *    without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 *
37 * $Id: hash_dup.c,v 12.29 2008/01/08 20:58:33 bostic Exp $
38 */
39
40/*
41 * PACKAGE:  hashing
42 *
43 * DESCRIPTION:
44 *	Manipulation of duplicates for the hash package.
45 */
46
47#include "db_config.h"
48
49#include "db_int.h"
50#include "dbinc/db_page.h"
51#include "dbinc/hash.h"
52#include "dbinc/btree.h"
53#include "dbinc/mp.h"
54
55static int __hamc_chgpg __P((DBC *,
56    db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
57static int __ham_check_move __P((DBC *, u_int32_t));
58static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t));
59static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t));
60
61/*
62 * Called from hash_access to add a duplicate key. nval is the new
63 * value that we want to add.  The flags correspond to the flag values
64 * to cursor_put indicating where to add the new element.
65 * There are 4 cases.
66 * Case 1: The existing duplicate set already resides on a separate page.
67 *	   We return and let the common code handle this.
68 * Case 2: The element is small enough to just be added to the existing set.
69 * Case 3: The element is large enough to be a big item, so we're going to
70 *	   have to push the set onto a new page.
71 * Case 4: The element is large enough to push the duplicate set onto a
72 *	   separate page.
73 *
74 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *));
75 */
76int
77__ham_add_dup(dbc, nval, flags, pgnop)
78	DBC *dbc;
79	DBT *nval;
80	u_int32_t flags;
81	db_pgno_t *pgnop;
82{
83	DB *dbp;
84	DBT pval, tmp_val;
85	DB_MPOOLFILE *mpf;
86	ENV *env;
87	HASH_CURSOR *hcp;
88	u_int32_t add_bytes, new_size;
89	int cmp, ret;
90	u_int8_t *hk;
91
92	dbp = dbc->dbp;
93	env = dbp->env;
94	mpf = dbp->mpf;
95	hcp = (HASH_CURSOR *)dbc->internal;
96
97	DB_ASSERT(env, flags != DB_CURRENT);
98
99	add_bytes = nval->size +
100	    (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0);
101	add_bytes = DUP_SIZE(add_bytes);
102
103	if ((ret = __ham_check_move(dbc, add_bytes)) != 0)
104		return (ret);
105
106	/*
107	 * Check if resulting duplicate set is going to need to go
108	 * onto a separate duplicate page.  If so, convert the
109	 * duplicate set and add the new one.  After conversion,
110	 * hcp->dndx is the first free ndx or the index of the
111	 * current pointer into the duplicate set.
112	 */
113	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
114	/* Add the len bytes to the current singleton. */
115	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
116		add_bytes += DUP_SIZE(0);
117	new_size =
118	    LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) +
119	    add_bytes;
120
121	/*
122	 * We convert to off-page duplicates if the item is a big item,
123	 * the addition of the new item will make the set large, or
124	 * if there isn't enough room on this page to add the next item.
125	 */
126	if (HPAGE_PTYPE(hk) != H_OFFDUP &&
127	    (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) ||
128	    add_bytes > P_FREESPACE(dbp, hcp->page))) {
129
130		if ((ret = __ham_dup_convert(dbc)) != 0)
131			return (ret);
132		return (hcp->opd->am_put(hcp->opd,
133		    NULL, nval, flags, NULL));
134	}
135
136	/* There are two separate cases here: on page and off page. */
137	if (HPAGE_PTYPE(hk) != H_OFFDUP) {
138		if (HPAGE_PTYPE(hk) != H_DUPLICATE) {
139			pval.flags = 0;
140			pval.data = HKEYDATA_DATA(hk);
141			pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize,
142			    hcp->indx);
143			if ((ret = __ham_make_dup(env,
144			    &pval, &tmp_val, &dbc->my_rdata.data,
145			    &dbc->my_rdata.ulen)) != 0 || (ret =
146			    __ham_replpair(dbc, &tmp_val, 1)) != 0)
147				return (ret);
148			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
149			HPAGE_PTYPE(hk) = H_DUPLICATE;
150
151			/*
152			 * Update the cursor position since we now are in
153			 * duplicates.
154			 */
155			F_SET(hcp, H_ISDUP);
156			hcp->dup_off = 0;
157			hcp->dup_len = pval.size;
158			hcp->dup_tlen = DUP_SIZE(hcp->dup_len);
159		}
160
161		/* Now make the new entry a duplicate. */
162		if ((ret = __ham_make_dup(env, nval,
163		    &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
164			return (ret);
165
166		tmp_val.dlen = 0;
167		switch (flags) {			/* On page. */
168		case DB_KEYFIRST:
169		case DB_KEYLAST:
170		case DB_NODUPDATA:
171			if (dbp->dup_compare != NULL) {
172				__ham_dsearch(dbc,
173				    nval, &tmp_val.doff, &cmp, flags);
174
175				/* dup dups are not supported w/ sorted dups */
176				if (cmp == 0)
177					return (__db_duperr(dbp, flags));
178			} else {
179				hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
180				    dbp->pgsize, hcp->indx);
181				hcp->dup_len = nval->size;
182				F_SET(hcp, H_ISDUP);
183				if (flags == DB_KEYFIRST)
184					hcp->dup_off = tmp_val.doff = 0;
185				else
186					hcp->dup_off =
187					    tmp_val.doff = hcp->dup_tlen;
188			}
189			break;
190		case DB_BEFORE:
191			tmp_val.doff = hcp->dup_off;
192			break;
193		case DB_AFTER:
194			tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len);
195			break;
196		default:
197			return (__db_unknown_path(env, "__ham_add_dup"));
198		}
199
200		/* Add the duplicate. */
201		if ((ret = __memp_dirty(mpf, &hcp->page,
202		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
203		    (ret = __ham_replpair(dbc, &tmp_val, 0)) != 0)
204			return (ret);
205
206		/* Now, update the cursor if necessary. */
207		switch (flags) {
208		case DB_AFTER:
209			hcp->dup_off += DUP_SIZE(hcp->dup_len);
210			hcp->dup_len = nval->size;
211			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
212			break;
213		case DB_BEFORE:
214		case DB_KEYFIRST:
215		case DB_KEYLAST:
216		case DB_NODUPDATA:
217			hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size);
218			hcp->dup_len = nval->size;
219			break;
220		default:
221			return (__db_unknown_path(env, "__ham_add_dup"));
222		}
223		ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1);
224		return (ret);
225	}
226
227	/*
228	 * If we get here, then we're on duplicate pages; set pgnop and
229	 * return so the common code can handle it.
230	 */
231	memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
232	    sizeof(db_pgno_t));
233
234	return (ret);
235}
236
237/*
238 * Convert an on-page set of duplicates to an offpage set of duplicates.
239 *
240 * PUBLIC: int __ham_dup_convert __P((DBC *));
241 */
242int
243__ham_dup_convert(dbc)
244	DBC *dbc;
245{
246	BOVERFLOW bo;
247	DB *dbp;
248	DBC **hcs;
249	DBT dbt;
250	DB_LSN lsn;
251	DB_MPOOLFILE *mpf;
252	ENV *env;
253	HASH_CURSOR *hcp;
254	HOFFPAGE ho;
255	PAGE *dp;
256	db_indx_t i, len, off;
257	int c, ret, t_ret;
258	u_int8_t *p, *pend;
259
260	dbp = dbc->dbp;
261	env = dbp->env;
262	mpf = dbp->mpf;
263	hcp = (HASH_CURSOR *)dbc->internal;
264
265	/*
266	 * Create a new page for the duplicates.
267	 */
268	if ((ret = __db_new(dbc,
269	    dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, &dp)) != 0)
270		return (ret);
271	P_INIT(dp, dbp->pgsize,
272	    dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp));
273
274	/*
275	 * Get the list of cursors that may need to be updated.
276	 */
277	if ((ret = __ham_get_clist(dbp,
278	    PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0)
279		goto err;
280
281	/*
282	 * Now put the duplicates onto the new page.
283	 */
284	dbt.flags = 0;
285	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
286	case H_KEYDATA:
287		/* Simple case, one key on page; move it to dup page. */
288		dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
289		dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
290		ret = __db_pitem(dbc,
291		    dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt);
292		goto finish;
293	case H_OFFPAGE:
294		/* Simple case, one key on page; move it to dup page. */
295		memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)),
296		    HOFFPAGE_SIZE);
297		UMRW_SET(bo.unused1);
298		B_TSET(bo.type, ho.type);
299		UMRW_SET(bo.unused2);
300		bo.pgno = ho.pgno;
301		bo.tlen = ho.tlen;
302		dbt.size = BOVERFLOW_SIZE;
303		dbt.data = &bo;
304
305		ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL);
306finish:		if (ret == 0) {
307			/* Update any other cursors. */
308			if (hcs != NULL && DBC_LOGGING(dbc) &&
309			    IS_SUBTRANSACTION(dbc->txn)) {
310				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
311				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
312				    PGNO(dp), hcp->indx, 0)) != 0)
313					break;
314			}
315			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
316				if ((ret = __ham_dcursor(hcs[c],
317				    PGNO(dp), 0)) != 0)
318					break;
319		}
320		break;
321	case H_DUPLICATE:
322		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
323		pend = p +
324		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
325
326		/*
327		 * We need to maintain the duplicate cursor position.
328		 * Keep track of where we are in the duplicate set via
329		 * the offset, and when it matches the one in the cursor,
330		 * set the off-page duplicate cursor index to the current
331		 * index.
332		 */
333		for (off = 0, i = 0; p < pend; i++) {
334			memcpy(&len, p, sizeof(db_indx_t));
335			dbt.size = len;
336			p += sizeof(db_indx_t);
337			dbt.data = p;
338			p += len + sizeof(db_indx_t);
339			if ((ret = __db_pitem(dbc, dp,
340			    i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0)
341				break;
342
343			/* Update any other cursors */
344			if (hcs != NULL && DBC_LOGGING(dbc) &&
345			    IS_SUBTRANSACTION(dbc->txn)) {
346				if ((ret = __ham_chgpg_log(dbp, dbc->txn,
347				    &lsn, 0, DB_HAM_DUP, PGNO(hcp->page),
348				    PGNO(dp), hcp->indx, i)) != 0)
349					break;
350			}
351			for (c = 0; hcs != NULL && hcs[c] != NULL; c++)
352				if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off
353				    == off && (ret = __ham_dcursor(hcs[c],
354				    PGNO(dp), i)) != 0)
355					goto err;
356			off += len + 2 * sizeof(db_indx_t);
357		}
358		break;
359	default:
360		ret = __db_pgfmt(env, hcp->pgno);
361		break;
362	}
363
364	/*
365	 * Now attach this to the source page in place of the old duplicate
366	 * item.
367	 */
368	if (ret == 0)
369		ret = __memp_dirty(mpf,
370		    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0);
371
372	if (ret == 0)
373		ret = __ham_move_offpage(dbc, hcp->page,
374		    (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp));
375
376err:	if ((t_ret = __memp_fput(mpf,
377	    dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0)
378		ret = t_ret;
379
380	if (ret == 0)
381		hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0;
382
383	if (hcs != NULL)
384		__os_free(env, hcs);
385
386	return (ret);
387}
388
389/*
390 * __ham_make_dup
391 *
392 * Take a regular dbt and make it into a duplicate item with all the partial
393 * information set appropriately. If the incoming dbt is a partial, assume
394 * we are creating a new entry and make sure that we do any initial padding.
395 *
396 * PUBLIC: int __ham_make_dup __P((ENV *,
397 * PUBLIC:     const DBT *, DBT *d, void **, u_int32_t *));
398 */
399int
400__ham_make_dup(env, notdup, duplicate, bufp, sizep)
401	ENV *env;
402	const DBT *notdup;
403	DBT *duplicate;
404	void **bufp;
405	u_int32_t *sizep;
406{
407	db_indx_t tsize, item_size;
408	int ret;
409	u_int8_t *p;
410
411	item_size = (db_indx_t)notdup->size;
412	if (F_ISSET(notdup, DB_DBT_PARTIAL))
413		item_size += notdup->doff;
414
415	tsize = DUP_SIZE(item_size);
416	if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0)
417		return (ret);
418
419	duplicate->dlen = 0;
420	duplicate->flags = notdup->flags;
421	F_SET(duplicate, DB_DBT_PARTIAL);
422
423	p = duplicate->data;
424	memcpy(p, &item_size, sizeof(db_indx_t));
425	p += sizeof(db_indx_t);
426	if (F_ISSET(notdup, DB_DBT_PARTIAL)) {
427		memset(p, 0, notdup->doff);
428		p += notdup->doff;
429	}
430	memcpy(p, notdup->data, notdup->size);
431	p += notdup->size;
432	memcpy(p, &item_size, sizeof(db_indx_t));
433
434	duplicate->doff = 0;
435	duplicate->dlen = notdup->size;
436
437	return (0);
438}
439
440/*
441 * __ham_check_move --
442 *
443 * Check if we can do whatever we need to on this page.  If not,
444 * then we'll have to move the current element to a new page.
445 */
446static int
447__ham_check_move(dbc, add_len)
448	DBC *dbc;
449	u_int32_t add_len;
450{
451	DB *dbp;
452	DBT k, d;
453	DB_LSN new_lsn;
454	DB_MPOOLFILE *mpf;
455	HASH_CURSOR *hcp;
456	PAGE *next_pagep;
457	db_pgno_t next_pgno;
458	u_int32_t new_datalen, old_len, rectype;
459	db_indx_t new_indx;
460	u_int8_t *hk;
461	int key_type, match, ret, t_ret;
462
463	dbp = dbc->dbp;
464	mpf = dbp->mpf;
465	hcp = (HASH_CURSOR *)dbc->internal;
466
467	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
468
469	/*
470	 * If the item is already off page duplicates or an offpage item,
471	 * then we know we can do whatever we need to do in-place
472	 */
473	if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE)
474		return (0);
475
476	old_len =
477	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx));
478	new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len;
479	if (HPAGE_PTYPE(hk) != H_DUPLICATE)
480		new_datalen += DUP_SIZE(0);
481
482	/*
483	 * We need to add a new page under two conditions:
484	 * 1. The addition makes the total data length cross the BIG
485	 *    threshold and the OFFDUP structure won't fit on this page.
486	 * 2. The addition does not make the total data cross the
487	 *    threshold, but the new data won't fit on the page.
488	 * If neither of these is true, then we can return.
489	 */
490	if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE ||
491	    HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page)))
492		return (0);
493
494	if (!ISBIG(hcp, new_datalen) &&
495	    (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page))
496		return (0);
497
498	/*
499	 * If we get here, then we need to move the item to a new page.
500	 * Check if there are more pages in the chain.  We now need to
501	 * update new_datalen to include the size of both the key and
502	 * the data that we need to move.
503	 */
504
505	new_datalen = ISBIG(hcp, new_datalen) ?
506	    HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen);
507	new_datalen +=
508	    LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx));
509
510	next_pagep = NULL;
511	for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID;
512	    next_pgno = NEXT_PGNO(next_pagep)) {
513		if (next_pagep != NULL && (ret = __memp_fput(mpf,
514		    dbc->thread_info, next_pagep, dbc->priority)) != 0)
515			return (ret);
516
517		if ((ret = __memp_fget(mpf,
518		    &next_pgno, dbc->thread_info, dbc->txn,
519		    DB_MPOOL_CREATE, &next_pagep)) != 0)
520			return (ret);
521
522		if (P_FREESPACE(dbp, next_pagep) >= new_datalen)
523			break;
524	}
525
526	/* No more pages, add one. */
527	if ((ret = __memp_dirty(mpf,
528	    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
529		return (ret);
530
531	if (next_pagep == NULL && (ret = __ham_add_ovflpage(dbc,
532	    hcp->page, 0, &next_pagep)) != 0)
533		return (ret);
534
535	/* Add new page at the end of the chain. */
536	if ((ret = __memp_dirty(mpf,
537	    &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
538		(void)__memp_fput(mpf,
539		    dbc->thread_info, next_pagep, dbc->priority);
540		return (ret);
541	}
542
543	if (P_FREESPACE(dbp, next_pagep) < new_datalen && (ret =
544	    __ham_add_ovflpage(dbc, next_pagep, 1, &next_pagep)) != 0) {
545		(void)__memp_fput(mpf,
546		    dbc->thread_info, next_pagep, dbc->priority);
547		return (ret);
548	}
549
550	/* Copy the item to the new page. */
551	if (DBC_LOGGING(dbc)) {
552		rectype = PUTPAIR;
553		k.flags = 0;
554		d.flags = 0;
555		if (HPAGE_PTYPE(
556		    H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) {
557			rectype |= PAIR_KEYMASK;
558			k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx);
559			k.size = HOFFPAGE_SIZE;
560			key_type = H_OFFPAGE;
561		} else {
562			k.data =
563			    HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx));
564			k.size =
565			    LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx);
566			key_type = H_KEYDATA;
567		}
568
569		/* Resolve the insert index so it can be written to the log. */
570		if ((ret = __ham_getindex(dbc, next_pagep, &k,
571		    key_type, &match, &new_indx)) != 0)
572			return (ret);
573
574		if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
575			rectype |= PAIR_DATAMASK;
576			d.data = H_PAIRDATA(dbp, hcp->page, hcp->indx);
577			d.size = HOFFPAGE_SIZE;
578		} else {
579			if (HPAGE_PTYPE(H_PAIRDATA(dbp,
580			    hcp->page, hcp->indx)) == H_DUPLICATE)
581				rectype |= PAIR_DUPMASK;
582			d.data = HKEYDATA_DATA(
583			    H_PAIRDATA(dbp, hcp->page, hcp->indx));
584			d.size = LEN_HDATA(dbp, hcp->page,
585			    dbp->pgsize, hcp->indx);
586		}
587
588		if ((ret = __ham_insdel_log(dbp,
589		    dbc->txn, &new_lsn, 0, rectype, PGNO(next_pagep),
590		    (u_int32_t)new_indx, &LSN(next_pagep),
591		    &k, &d)) != 0) {
592			(void)__memp_fput(mpf,
593			    dbc->thread_info, next_pagep, dbc->priority);
594			return (ret);
595		}
596	} else {
597		LSN_NOT_LOGGED(new_lsn);
598		/*
599		 * Ensure that an invalid index is passed to __ham_copypair, so
600		 * it knows to resolve the index. Resolving the insert index
601		 * here would require creating a temporary DBT with the key,
602		 * and calling __ham_getindex. Let __ham_copypair do the
603		 * resolution using the final key DBT.
604		 */
605		new_indx = NDX_INVALID;
606	}
607
608	/* Move lsn onto page. */
609	if ((ret = __memp_dirty(mpf,
610	    &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) {
611		(void)__memp_fput(mpf,
612		    dbc->thread_info, next_pagep, dbc->priority);
613		return (ret);
614	}
615	LSN(next_pagep) = new_lsn;	/* Structure assignment. */
616
617	if ((ret = __ham_copypair(dbc, hcp->page,
618	    H_KEYINDEX(hcp->indx), next_pagep, &new_indx)) != 0)
619	    goto out;
620
621	/* Update all cursors that used to point to this item. */
622	if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx),
623	    PGNO(next_pagep), new_indx)) != 0)
624		goto out;
625
626	/* Now delete the pair from the current page. */
627	ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM);
628
629	/*
630	 * __ham_del_pair decremented nelem.  This is incorrect;  we
631	 * manually copied the element elsewhere, so the total number
632	 * of elements hasn't changed.  Increment it again.
633	 *
634	 * !!!
635	 * Note that we still have the metadata page pinned, and
636	 * __ham_del_pair dirtied it, so we don't need to set the dirty
637	 * flag again.
638	 */
639	if (!STD_LOCKING(dbc))
640		hcp->hdr->nelem++;
641
642out:	if ((t_ret = __memp_fput(mpf,
643	    dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
644		ret = t_ret;
645	hcp->page = next_pagep;
646	hcp->pgno = PGNO(hcp->page);
647	hcp->indx = new_indx;
648	F_SET(hcp, H_EXPAND);
649	F_CLR(hcp, H_DELETED);
650
651	return (ret);
652}
653
654/*
655 * __ham_move_offpage --
656 *	Replace an onpage set of duplicates with the OFFDUP structure
657 *	that references the duplicate page.
658 *
659 * XXX
660 * This is really just a special case of __onpage_replace; we should
661 * probably combine them.
662 *
663 */
664static int
665__ham_move_offpage(dbc, pagep, ndx, pgno)
666	DBC *dbc;
667	PAGE *pagep;
668	u_int32_t ndx;
669	db_pgno_t pgno;
670{
671	DB *dbp;
672	DBT new_dbt;
673	DBT old_dbt;
674	HOFFDUP od;
675	db_indx_t i, *inp;
676	int32_t difflen;
677	u_int8_t *src;
678	int ret;
679
680	dbp = dbc->dbp;
681	od.type = H_OFFDUP;
682	UMRW_SET(od.unused[0]);
683	UMRW_SET(od.unused[1]);
684	UMRW_SET(od.unused[2]);
685	od.pgno = pgno;
686	ret = 0;
687
688	if (DBC_LOGGING(dbc)) {
689		new_dbt.data = &od;
690		new_dbt.size = HOFFDUP_SIZE;
691		old_dbt.data = P_ENTRY(dbp, pagep, ndx);
692		old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx);
693		if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0,
694		    PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1,
695		    &old_dbt, &new_dbt, 0)) != 0)
696			return (ret);
697	} else
698		LSN_NOT_LOGGED(LSN(pagep));
699
700	/*
701	 * difflen is the difference in the lengths, and so may be negative.
702	 * We know that the difference between two unsigned lengths from a
703	 * database page will fit into an int32_t.
704	 */
705	difflen =
706	    (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) -
707	    (int32_t)HOFFDUP_SIZE;
708	if (difflen != 0) {
709		/* Copy data. */
710		inp = P_INP(dbp, pagep);
711		src = (u_int8_t *)(pagep) + HOFFSET(pagep);
712		memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep));
713		HOFFSET(pagep) += difflen;
714
715		/* Update index table. */
716		for (i = ndx; i < NUM_ENT(pagep); i++)
717			inp[i] += difflen;
718	}
719
720	/* Now copy the offdup entry onto the page. */
721	memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE);
722	return (ret);
723}
724
725/*
726 * __ham_dsearch:
727 *	Locate a particular duplicate in a duplicate set.  Make sure that
728 *	we exit with the cursor set appropriately.
729 *
730 * PUBLIC: void __ham_dsearch
731 * PUBLIC:     __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t));
732 */
733void
734__ham_dsearch(dbc, dbt, offp, cmpp, flags)
735	DBC *dbc;
736	DBT *dbt;
737	u_int32_t *offp, flags;
738	int *cmpp;
739{
740	DB *dbp;
741	DBT cur;
742	HASH_CURSOR *hcp;
743	db_indx_t i, len;
744	int (*func) __P((DB *, const DBT *, const DBT *));
745	u_int8_t *data;
746
747	dbp = dbc->dbp;
748	hcp = (HASH_CURSOR *)dbc->internal;
749	func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare;
750
751	i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0;
752	data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i;
753	hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
754	len = hcp->dup_len;
755	while (i < hcp->dup_tlen) {
756		memcpy(&len, data, sizeof(db_indx_t));
757		data += sizeof(db_indx_t);
758		DB_SET_DBT(cur, data, len);
759
760		/*
761		 * If we find an exact match, we're done.  If in a sorted
762		 * duplicate set and the item is larger than our test item,
763		 * we're done.  In the latter case, if permitting partial
764		 * matches, it's not a failure.
765		 */
766		*cmpp = func(dbp, dbt, &cur);
767		if (*cmpp == 0)
768			break;
769		if (*cmpp < 0 && dbp->dup_compare != NULL) {
770			if (flags == DB_GET_BOTH_RANGE)
771				*cmpp = 0;
772			break;
773		}
774
775		i += len + 2 * sizeof(db_indx_t);
776		data += len + sizeof(db_indx_t);
777	}
778
779	*offp = i;
780	hcp->dup_off = i;
781	hcp->dup_len = len;
782	F_SET(hcp, H_ISDUP);
783}
784
785/*
786 * __ham_dcursor --
787 *
788 *	Create an off page duplicate cursor for this cursor.
789 */
790static int
791__ham_dcursor(dbc, pgno, indx)
792	DBC *dbc;
793	db_pgno_t pgno;
794	u_int32_t indx;
795{
796	BTREE_CURSOR *dcp;
797	DB *dbp;
798	HASH_CURSOR *hcp;
799	int ret;
800
801	dbp = dbc->dbp;
802	hcp = (HASH_CURSOR *)dbc->internal;
803
804	if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0)
805		return (ret);
806
807	dcp = (BTREE_CURSOR *)hcp->opd->internal;
808	dcp->pgno = pgno;
809	dcp->indx = indx;
810
811	if (dbp->dup_compare == NULL) {
812		/*
813		 * Converting to off-page Recno trees is tricky.  The
814		 * record number for the cursor is the index + 1 (to
815		 * convert to 1-based record numbers).
816		 */
817		dcp->recno = indx + 1;
818	}
819
820	/*
821	 * Transfer the deleted flag from the top-level cursor to the
822	 * created one.
823	 */
824	if (F_ISSET(hcp, H_DELETED)) {
825		F_SET(dcp, C_DELETED);
826		F_CLR(hcp, H_DELETED);
827	}
828
829	return (0);
830}
831
832/*
833 * __hamc_chgpg --
834 *	Adjust the cursors after moving an item to a new page.  We only
835 *	move cursors that are pointing at this one item and are not
836 *	deleted;  since we only touch non-deleted cursors, and since
837 *	(by definition) no item existed at the pgno/indx we're moving the
838 *	item to, we're guaranteed that all the cursors we affect here or
839 *	on abort really do refer to this one item.
840 */
841static int
842__hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
843	DBC *dbc;
844	db_pgno_t old_pgno, new_pgno;
845	u_int32_t old_index, new_index;
846{
847	DB *dbp, *ldbp;
848	DBC *cp;
849	DB_LSN lsn;
850	DB_TXN *my_txn;
851	ENV *env;
852	HASH_CURSOR *hcp;
853	int found, ret;
854
855	dbp = dbc->dbp;
856	env = dbp->env;
857
858	my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
859
860	MUTEX_LOCK(env, env->mtx_dblist);
861	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
862	for (found = 0;
863	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
864	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
865		MUTEX_LOCK(env, dbp->mutex);
866		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
867			if (cp == dbc || cp->dbtype != DB_HASH)
868				continue;
869
870			hcp = (HASH_CURSOR *)cp->internal;
871
872			/*
873			 * If a cursor is deleted, it doesn't refer to this
874			 * item--it just happens to have the same indx, but
875			 * it points to a former neighbor.  Don't move it.
876			 */
877			if (F_ISSET(hcp, H_DELETED))
878				continue;
879
880			if (hcp->pgno == old_pgno &&
881			    !MVCC_SKIP_CURADJ(cp, old_pgno)) {
882				if (hcp->indx == old_index) {
883					hcp->pgno = new_pgno;
884					hcp->indx = new_index;
885				} else
886					continue;
887				if (my_txn != NULL && cp->txn != my_txn)
888					found = 1;
889			}
890		}
891		MUTEX_UNLOCK(env, dbp->mutex);
892	}
893	MUTEX_UNLOCK(env, env->mtx_dblist);
894
895	if (found != 0 && DBC_LOGGING(dbc)) {
896		if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, DB_HAM_CHGPG,
897		    old_pgno, new_pgno, old_index, new_index)) != 0)
898			return (ret);
899	}
900	return (0);
901}
902