1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994
8 *	Margo Seltzer.  All rights reserved.
9 */
10/*
11 * Copyright (c) 1990, 1993, 1994
12 *	The Regents of the University of California.  All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Margo Seltzer.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 *    notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 *    notice, this list of conditions and the following disclaimer in the
24 *    documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 *    may be used to endorse or promote products derived from this software
27 *    without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id$
42 */
43
44#include "db_config.h"
45
46#include "db_int.h"
47#include "dbinc/db_page.h"
48#include "dbinc/btree.h"
49#include "dbinc/hash.h"
50#include "dbinc/lock.h"
51#include "dbinc/mp.h"
52#include "dbinc/partition.h"
53
/*
 * Prototypes for the file-local (static) hash routines declared in this
 * file; the cursor routines among them are installed as am_* callbacks
 * by __hamc_init.
 */
54static int  __ham_bulk __P((DBC *, DBT *, u_int32_t));
55static int  __hamc_close __P((DBC *, db_pgno_t, int *));
56static int  __hamc_del __P((DBC *, u_int32_t));
57static int  __hamc_destroy __P((DBC *));
58static int  __hamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
59static int  __hamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
60static int  __hamc_writelock __P((DBC *));
61static int  __ham_dup_return __P((DBC *, DBT *, u_int32_t));
62static int  __ham_expand_table __P((DBC *));
63static int  __ham_lookup __P((DBC *,
64		const DBT *, u_int32_t, db_lockmode_t, db_pgno_t *));
65
66/*
67 * __ham_quick_delete --
68 *	This function is called by __db_del when the appropriate conditions
69 *	are met, and it performs the delete in the optimized way.
70 *
71 * PUBLIC: int __ham_quick_delete __P((DBC *));
72 */
73int
74__ham_quick_delete(dbc)
75	DBC *dbc;
76{
77	int ret, t_ret;
78
79	/*
80	 * When performing a DB->del operation not involving secondary indices
81	 * and not removing an off-page duplicate tree, we can speed things up
82	 * substantially by removing the entire duplicate set, if any is
83	 * present, in one operation, rather than by conjuring up and deleting
84	 * each of the items individually.  (All are stored in one big HKEYDATA
85	 * structure.)  We don't bother to distinguish on-page duplicate sets
86	 * from single, non-dup items;  they're deleted in exactly the same way.
87	 *
88	 * The cursor should be set to the first item in the duplicate set, or
89	 * to the sole key/data pair when the key does not have a duplicate set,
90	 * before the function is called.
91	 *
92	 * We do not need to call CDB_LOCKING_INIT, __db_del calls here with
93	 * a write cursor.
94	 *
95	 * Assert we're initialized, but not to an off-page duplicate.
96	 * Assert we're not using secondary indices.
97	 */
98	DB_ASSERT(dbc->env, IS_INITIALIZED(dbc));
99	DB_ASSERT(dbc->env, dbc->internal->opd == NULL);
100	DB_ASSERT(dbc->env, !F_ISSET(dbc->dbp, DB_AM_SECONDARY));
101	DB_ASSERT(dbc->env, !DB_IS_PRIMARY(dbc->dbp));
102
103	if ((ret = __ham_get_meta(dbc)) != 0)
104		return (ret);
105
106	if ((ret = __hamc_writelock(dbc)) == 0)
107		ret = __ham_del_pair(dbc, 0);
108
109	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
110		ret = t_ret;
111
112	return (ret);
113}
114
115/* ****************** CURSORS ********************************** */
116/*
117 * __hamc_init --
118 *	Initialize the hash-specific portion of a cursor.
119 *
120 * PUBLIC: int __hamc_init __P((DBC *));
121 */
122int
123__hamc_init(dbc)
124	DBC *dbc;
125{
126	ENV *env;
127	HASH_CURSOR *new_curs;
128	int ret;
129
130	env = dbc->env;
131	if ((ret = __os_calloc(env,
132	    1, sizeof(struct cursor_t), &new_curs)) != 0)
133		return (ret);
134	if ((ret = __os_malloc(env,
135	    dbc->dbp->pgsize, &new_curs->split_buf)) != 0) {
136		__os_free(env, new_curs);
137		return (ret);
138	}
139
140	dbc->internal = (DBC_INTERNAL *) new_curs;
141	dbc->close = dbc->c_close = __dbc_close_pp;
142	dbc->cmp = __dbc_cmp_pp;
143	dbc->count = dbc->c_count = __dbc_count_pp;
144	dbc->del = dbc->c_del = __dbc_del_pp;
145	dbc->dup = dbc->c_dup = __dbc_dup_pp;
146	dbc->get = dbc->c_get = __dbc_get_pp;
147	dbc->pget = dbc->c_pget = __dbc_pget_pp;
148	dbc->put = dbc->c_put = __dbc_put_pp;
149	dbc->am_bulk = __ham_bulk;
150	dbc->am_close = __hamc_close;
151	dbc->am_del = __hamc_del;
152	dbc->am_destroy = __hamc_destroy;
153	dbc->am_get = __hamc_get;
154	dbc->am_put = __hamc_put;
155	dbc->am_writelock = __hamc_writelock;
156
157	return (__ham_item_init(dbc));
158}
159
160/*
161 * __hamc_close --
162 *	Close down the cursor from a single use.
163 */
164static int
165__hamc_close(dbc, root_pgno, rmroot)
166	DBC *dbc;
167	db_pgno_t root_pgno;
168	int *rmroot;
169{
170	DB_MPOOLFILE *mpf;
171	HASH_CURSOR *hcp;
172	HKEYDATA *dp;
173	db_lockmode_t lock_mode;
174	int doroot, gotmeta, ret, t_ret;
175
176	COMPQUIET(rmroot, 0);
177	mpf = dbc->dbp->mpf;
178	doroot = gotmeta = ret = 0;
179	hcp = (HASH_CURSOR *) dbc->internal;
180
181	/* Check for off page dups. */
182	if (dbc->internal->opd != NULL) {
183		if ((ret = __ham_get_meta(dbc)) != 0)
184			goto done;
185		gotmeta = 1;
186		lock_mode = DB_LOCK_READ;
187
188		/* To support dirty reads we must reget the write lock. */
189		if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED) &&
190		     F_ISSET((BTREE_CURSOR *)
191		     dbc->internal->opd->internal, C_DELETED))
192			lock_mode = DB_LOCK_WRITE;
193
194		if ((ret = __ham_get_cpage(dbc, lock_mode)) != 0)
195			goto out;
196		dp = (HKEYDATA *)H_PAIRDATA(dbc->dbp, hcp->page, hcp->indx);
197
198		/* If it's not a dup we aborted before we changed it. */
199		if (HPAGE_PTYPE(dp) == H_OFFDUP)
200			memcpy(&root_pgno,
201			    HOFFPAGE_PGNO(dp), sizeof(db_pgno_t));
202		else
203			root_pgno = PGNO_INVALID;
204
205		if ((ret =
206		    hcp->opd->am_close(hcp->opd, root_pgno, &doroot)) != 0)
207			goto out;
208		if (doroot != 0) {
209			if ((ret = __memp_dirty(mpf, &hcp->page,
210			    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
211				goto out;
212			if ((ret = __ham_del_pair(dbc, 0)) != 0)
213				goto out;
214		}
215	}
216
217out:	if (hcp->page != NULL && (t_ret = __memp_fput(mpf,
218	    dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
219		ret = t_ret;
220	if (gotmeta != 0 && (t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
221		ret = t_ret;
222
223done:	if ((t_ret = __ham_item_init(dbc)) != 0 && ret == 0)
224		ret = t_ret;
225	return (ret);
226}
227
228/*
229 * __hamc_destroy --
230 *	Cleanup the access method private part of a cursor.
231 */
232static int
233__hamc_destroy(dbc)
234	DBC *dbc;
235{
236	HASH_CURSOR *hcp;
237
238	hcp = (HASH_CURSOR *)dbc->internal;
239	if (hcp->split_buf != NULL)
240		__os_free(dbc->env, hcp->split_buf);
241	__os_free(dbc->env, hcp);
242
243	return (0);
244}
245
246/*
247 * __hamc_count --
248 *	Return a count of on-page duplicates.
249 *
250 * PUBLIC: int __hamc_count __P((DBC *, db_recno_t *));
251 */
252int
253__hamc_count(dbc, recnop)
254	DBC *dbc;
255	db_recno_t *recnop;
256{
257	DB *dbp;
258	DB_MPOOLFILE *mpf;
259	HASH_CURSOR *hcp;
260	db_indx_t len;
261	db_recno_t recno;
262	int ret, t_ret;
263	u_int8_t *p, *pend;
264
265	dbp = dbc->dbp;
266	mpf = dbp->mpf;
267	hcp = (HASH_CURSOR *)dbc->internal;
268
269	recno = 0;
270
271	if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
272		return (ret);
273	if (hcp->indx >= NUM_ENT(hcp->page)) {
274		*recnop = 0;
275		goto err;
276	}
277
278	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
279	case H_KEYDATA:
280	case H_OFFPAGE:
281		recno = 1;
282		break;
283	case H_DUPLICATE:
284		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
285		pend = p +
286		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
287		for (; p < pend; recno++) {
288			/* p may be odd, so copy rather than just dereffing */
289			memcpy(&len, p, sizeof(db_indx_t));
290			p += 2 * sizeof(db_indx_t) + len;
291		}
292
293		break;
294	default:
295		ret = __db_pgfmt(dbp->env, hcp->pgno);
296		goto err;
297	}
298
299	*recnop = recno;
300
301err:	if ((t_ret = __memp_fput(mpf,
302	     dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
303		ret = t_ret;
304	hcp->page = NULL;
305	return (ret);
306}
307
308/*
309 * __hamc_cmp --
310 *	Compare two hash cursors for equality.
311 *
312 * This function is only called with two cursors that point to the same item.
313 * It distinguishes two cases:
314 * * Cursors pointing to different items in the same on-page duplicate set.
315 * * Cursors pointing to the same item, with different DELETED flags.
316 *
317 * PUBLIC: int __hamc_cmp __P((DBC *, DBC *, int *));
318 */
319int
320__hamc_cmp(dbc, other_dbc, result)
321	DBC *dbc, *other_dbc;
322	int *result;
323{
324	ENV *env;
325	HASH_CURSOR *hcp, *ohcp;
326
327	env = dbc->env;
328	hcp = (HASH_CURSOR *)dbc->internal;
329	ohcp = (HASH_CURSOR *)other_dbc->internal;
330
331	DB_ASSERT (env, hcp->pgno == ohcp->pgno);
332	DB_ASSERT (env, hcp->indx == ohcp->indx);
333
334	/* Only compare the duplicate offsets if this is a duplicate item. */
335	if ((F_ISSET(hcp, H_ISDUP) && hcp->dup_off != ohcp->dup_off) ||
336	    F_ISSET(hcp, H_DELETED) != F_ISSET(ohcp, H_DELETED))
337		*result = 1;
338	else
339		*result = 0;
340	return (0);
341}
342
343static int
344__hamc_del(dbc, flags)
345	DBC *dbc;
346	u_int32_t flags;
347{
348	DB *dbp;
349	DBT repldbt;
350	DB_MPOOLFILE *mpf;
351	HASH_CURSOR *hcp;
352	int ret, t_ret;
353
354	COMPQUIET(flags, 0);
355	dbp = dbc->dbp;
356	mpf = dbp->mpf;
357	hcp = (HASH_CURSOR *)dbc->internal;
358
359	if (F_ISSET(hcp, H_DELETED))
360		return (DB_NOTFOUND);
361
362	if ((ret = __ham_get_meta(dbc)) != 0)
363		goto out;
364
365	if ((ret = __ham_get_cpage(dbc, DB_LOCK_WRITE)) != 0)
366		goto out;
367
368	/* Off-page duplicates. */
369	if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP)
370		goto out;
371
372	DB_ASSERT(dbp->env, IS_DIRTY(hcp->page));
373
374	if (F_ISSET(hcp, H_ISDUP)) { /* On-page duplicate. */
375		if (hcp->dup_off == 0 &&
376		    DUP_SIZE(hcp->dup_len) == LEN_HDATA(dbp, hcp->page,
377		    hcp->hdr->dbmeta.pagesize, hcp->indx))
378			ret = __ham_del_pair(dbc, 0);
379		else {
380			repldbt.flags = 0;
381			F_SET(&repldbt, DB_DBT_PARTIAL);
382			repldbt.doff = hcp->dup_off;
383			repldbt.dlen = DUP_SIZE(hcp->dup_len);
384			repldbt.size = 0;
385			repldbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
386			    hcp->indx));
387			if ((ret = __ham_replpair(dbc, &repldbt, 0)) == 0) {
388				hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
389				F_SET(hcp, H_DELETED);
390				/*
391				 * Clear any cached streaming information.
392				 */
393				hcp->stream_start_pgno = PGNO_INVALID;
394				ret = __hamc_update(dbc, DUP_SIZE(hcp->dup_len),
395				    DB_HAM_CURADJ_DEL, 1);
396			}
397		}
398	} else /* Not a duplicate */
399		ret = __ham_del_pair(dbc, 0);
400
401out:	if (hcp->page != NULL) {
402		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
403		    hcp->page, dbc->priority)) != 0 && ret == 0)
404			ret = t_ret;
405		hcp->page = NULL;
406	}
407	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
408		ret = t_ret;
409	return (ret);
410}
411
412/*
413 * __hamc_dup --
414 *	Duplicate a hash cursor, such that the new one holds appropriate
415 *	locks for the position of the original.
416 *
417 * PUBLIC: int __hamc_dup __P((DBC *, DBC *));
418 */
419int
420__hamc_dup(orig_dbc, new_dbc)
421	DBC *orig_dbc, *new_dbc;
422{
423	HASH_CURSOR *orig, *new;
424
425	orig = (HASH_CURSOR *)orig_dbc->internal;
426	new = (HASH_CURSOR *)new_dbc->internal;
427
428	new->bucket = orig->bucket;
429	new->lbucket = orig->lbucket;
430	new->dup_off = orig->dup_off;
431	new->dup_len = orig->dup_len;
432	new->dup_tlen = orig->dup_tlen;
433
434	if (F_ISSET(orig, H_DELETED))
435		F_SET(new, H_DELETED);
436	if (F_ISSET(orig, H_ISDUP))
437		F_SET(new, H_ISDUP);
438
439	return (0);
440}
441
442static int
443__hamc_get(dbc, key, data, flags, pgnop)
444	DBC *dbc;
445	DBT *key;
446	DBT *data;
447	u_int32_t flags;
448	db_pgno_t *pgnop;
449{
450	DB *dbp;
451	DB_MPOOLFILE *mpf;
452	ENV *env;
453	HASH_CURSOR *hcp;
454	db_lockmode_t lock_type;
455	int ret, t_ret;
456
457	hcp = (HASH_CURSOR *)dbc->internal;
458	dbp = dbc->dbp;
459	env = dbp->env;
460	mpf = dbp->mpf;
461
462	/* Clear OR'd in additional bits so we can check for flag equality. */
463	if (F_ISSET(dbc, DBC_RMW))
464		lock_type = DB_LOCK_WRITE;
465	else
466		lock_type = DB_LOCK_READ;
467
468	if ((ret = __ham_get_meta(dbc)) != 0)
469		return (ret);
470	hcp->seek_size = 0;
471
472	ret = 0;
473	switch (flags) {
474	case DB_PREV_DUP:
475		F_SET(hcp, H_DUPONLY);
476		goto prev;
477	case DB_PREV_NODUP:
478		F_SET(hcp, H_NEXT_NODUP);
479		/* FALLTHROUGH */
480	case DB_PREV:
481		if (IS_INITIALIZED(dbc)) {
482prev:			ret = __ham_item_prev(dbc, lock_type, pgnop);
483			break;
484		}
485		/* FALLTHROUGH */
486	case DB_LAST:
487		ret = __ham_item_last(dbc, lock_type, pgnop);
488		break;
489	case DB_NEXT_DUP:
490	case DB_GET_BOTHC:
491		/* cgetchk has already determined that the cursor is set. */
492		F_SET(hcp, H_DUPONLY);
493		goto next;
494	case DB_NEXT_NODUP:
495		F_SET(hcp, H_NEXT_NODUP);
496		/* FALLTHROUGH */
497	case DB_NEXT:
498		if (IS_INITIALIZED(dbc)) {
499next:			ret = __ham_item_next(dbc, lock_type, pgnop);
500			break;
501		}
502		/* FALLTHROUGH */
503	case DB_FIRST:
504		ret = __ham_item_first(dbc, lock_type, pgnop);
505		break;
506	case DB_SET:
507	case DB_SET_RANGE:
508	case DB_GET_BOTH:
509	case DB_GET_BOTH_RANGE:
510		ret = __ham_lookup(dbc, key, 0, lock_type, pgnop);
511		break;
512	case DB_CURRENT:
513		/* cgetchk has already determined that the cursor is set. */
514		if (F_ISSET(hcp, H_DELETED)) {
515			ret = DB_KEYEMPTY;
516			goto err;
517		}
518
519		ret = __ham_item(dbc, lock_type, pgnop);
520		break;
521	default:
522		ret = __db_unknown_flag(env, "__hamc_get", flags);
523		break;
524	}
525
526	/*
527	 * Must always enter this loop to do error handling and
528	 * check for big key/data pair.
529	 */
530	for (;;) {
531		if (ret != 0 && ret != DB_NOTFOUND)
532			goto err;
533		else if (F_ISSET(hcp, H_OK)) {
534			if (*pgnop == PGNO_INVALID)
535				ret = __ham_dup_return(dbc, data, flags);
536			break;
537		} else if (!F_ISSET(hcp, H_NOMORE)) {
538			__db_errx(env, "H_NOMORE returned to __hamc_get");
539			ret = EINVAL;
540			break;
541		}
542
543		/*
544		 * Ran out of entries in a bucket; change buckets.
545		 */
546		switch (flags) {
547		case DB_LAST:
548		case DB_PREV:
549		case DB_PREV_DUP:
550		case DB_PREV_NODUP:
551			ret = __memp_fput(mpf,
552			    dbc->thread_info, hcp->page, dbc->priority);
553			hcp->page = NULL;
554			if (hcp->bucket == 0) {
555				ret = DB_NOTFOUND;
556				hcp->pgno = PGNO_INVALID;
557				goto err;
558			}
559			F_CLR(hcp, H_ISDUP);
560			hcp->bucket--;
561			hcp->indx = NDX_INVALID;
562			hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
563			if (ret == 0)
564				ret = __ham_item_prev(dbc, lock_type, pgnop);
565			break;
566		case DB_FIRST:
567		case DB_NEXT:
568		case DB_NEXT_NODUP:
569			ret = __memp_fput(mpf,
570			    dbc->thread_info, hcp->page, dbc->priority);
571			hcp->page = NULL;
572			hcp->indx = NDX_INVALID;
573			hcp->bucket++;
574			F_CLR(hcp, H_ISDUP);
575			hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
576			if (hcp->bucket > hcp->hdr->max_bucket) {
577				ret = DB_NOTFOUND;
578				hcp->pgno = PGNO_INVALID;
579				goto err;
580			}
581			if (ret == 0)
582				ret = __ham_item_next(dbc, lock_type, pgnop);
583			break;
584		case DB_GET_BOTH:
585		case DB_GET_BOTHC:
586		case DB_GET_BOTH_RANGE:
587		case DB_NEXT_DUP:
588		case DB_SET:
589		case DB_SET_RANGE:
590			/* Key not found. */
591			ret = DB_NOTFOUND;
592			goto err;
593		case DB_CURRENT:
594			/*
595			 * This should only happen if you are doing deletes and
596			 * reading with concurrent threads and not doing proper
597			 * locking.  We return the same error code as we would
598			 * if the cursor were deleted.
599			 */
600			ret = DB_KEYEMPTY;
601			goto err;
602		default:
603			DB_ASSERT(env, 0);
604		}
605	}
606
607err:	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
608		ret = t_ret;
609
610	F_CLR(hcp, H_DUPONLY);
611	F_CLR(hcp, H_NEXT_NODUP);
612
613	return (ret);
614}
615
616/*
617 * __ham_bulk -- Return bulk data from a hash table.
 *
 *	Fills the caller's buffer (data->data, data->ulen bytes) starting
 *	from the cursor's current page and index.  Item bytes are appended
 *	from the front of the buffer, while a table of int32_t offset and
 *	length entries is built downward from the end of the buffer and is
 *	terminated with -1.
 *
 *	On-page items are returned by copying the bytes from HOFFSET(pg) to
 *	the end of the page into the buffer once per page and recording
 *	offsets into that copy.  H_OFFPAGE items are copied in with
 *	__bam_bulk_overflow and H_OFFDUP trees with __bam_bulk_duplicates.
 *
 *	flags:	DB_MULTIPLE_KEY also records each key's offset and length.
 *		The walk continues to further key/data pairs only when keys
 *		were requested and the operation is not DB_NEXT_DUP.
 *		DB_NEXT_NODUP stops after one element of a duplicate set.
 *
 *	Returns 0 on success (including the case where the buffer filled
 *	up after at least one entry was stored), DB_BUFFER_SMALL with
 *	data->size set to a DB_ALIGN(..., 1024) size when nothing could be
 *	stored or the cursor is DBC_TRANSIENT, or an error from one of the
 *	called routines.
618 */
619static int
620__ham_bulk(dbc, data, flags)
621	DBC *dbc;
622	DBT *data;
623	u_int32_t flags;
624{
625	DB *dbp;
626	DB_MPOOLFILE *mpf;
627	HASH_CURSOR *cp;
628	PAGE *pg;
629	db_indx_t dup_len, dup_off, dup_tlen, indx, *inp;
630	db_lockmode_t lock_mode;
631	db_pgno_t pgno;
632	int32_t *endp, *offp, *saveoff;
633	u_int32_t key_off, key_size, pagesize, size, space;
634	u_int8_t *dbuf, *dp, *hk, *np, *tmp;
635	int is_dup, is_key;
636	int need_pg, next_key, no_dup, ret, t_ret;
637
638	ret = 0;
639	key_off = 0;
640	dup_len = dup_off = dup_tlen = 0;
641	size = 0;
642	dbp = dbc->dbp;
643	pagesize = dbp->pgsize;
644	mpf = dbp->mpf;
645	cp = (HASH_CURSOR *)dbc->internal;
	/*
	 * is_key:   key offsets/lengths are recorded as well as data.
	 * next_key: keep walking to following key/data pairs.
	 * no_dup:   return only one element of each duplicate set.
	 */
646	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
647	next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
648	no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
649	dbuf = data->data;
650	np = dp = dbuf;
651
652	/* Keep track of space that is left.  There is a termination entry */
653	space = data->ulen;
654	space -= sizeof(*offp);
655
656	/* Build the offset/size table from the end up. */
657	endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
658	endp--;
659	offp = endp;
660
661	key_size = 0;
662	lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE: DB_LOCK_READ;
663
	/*
	 * Each pass through next_pg starts on the cursor's current page;
	 * need_pg is set so that the page contents are copied into the
	 * buffer the first time an on-page item is needed.
	 */
664next_pg:
665	need_pg = 1;
666	indx = cp->indx;
667	pg = cp->page;
668	inp = P_INP(dbp, pg);
669
670	do {
671		if (is_key) {
672			hk = H_PAIRKEY(dbp, pg, indx);
673			if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
674				memcpy(&key_size,
675				    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
676				memcpy(&pgno,
677				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
678				size = key_size;
679				if (key_size > space)
680					goto get_key_space;
681				if ((ret = __bam_bulk_overflow(
682				    dbc, key_size, pgno, np)) != 0)
683					return (ret);
684				space -= key_size;
685				key_off = (u_int32_t)(np - dbuf);
686				np += key_size;
687			} else {
688				if (need_pg) {
689					dp = np;
690					size = pagesize - HOFFSET(pg);
691					if (space < size) {
692get_key_space:
693						if (offp == endp) {
694							data->size = (u_int32_t)
695							    DB_ALIGN(size +
696							    pagesize, 1024);
697							return
698							    (DB_BUFFER_SMALL);
699						}
700						goto back_up;
701					}
702					memcpy(dp,
703					   (u_int8_t *)pg + HOFFSET(pg), size);
704					need_pg = 0;
705					space -= size;
706					np += size;
707				}
708				key_size = LEN_HKEY(dbp, pg, pagesize, indx);
709				key_off = ((inp[indx] - HOFFSET(pg)) +
710				    (u_int32_t)(dp - dbuf)) +
711				    SSZA(HKEYDATA, data);
712			}
713		}
714
715		hk = H_PAIRDATA(dbp, pg, indx);
716		switch (HPAGE_PTYPE(hk)) {
717		case H_DUPLICATE:
718		case H_KEYDATA:
719			if (need_pg) {
720				dp = np;
721				size = pagesize - HOFFSET(pg);
722				if (space < size) {
723back_up:
724					if (indx != 0) {
725						indx -= 2;
726						/* XXX
727						 * It's not clear that this is
728						 * the right way to fix this,
729						 * but here goes.
730						 * If we are backing up onto a
731						 * duplicate, then we need to
732						 * position ourselves at the
733						 * end of the duplicate set.
734						 * We probably need to make
735						 * this work for H_OFFDUP too.
736						 * It might be worth making a
737						 * dummy cursor and calling
738						 * __ham_item_prev.
739						 */
740						tmp = H_PAIRDATA(dbp, pg, indx);
741						if (HPAGE_PTYPE(tmp) ==
742						    H_DUPLICATE) {
743							dup_off = dup_tlen =
744							    LEN_HDATA(dbp, pg,
745							    pagesize, indx + 1);
746							memcpy(&dup_len,
747							    HKEYDATA_DATA(tmp),
748							    sizeof(db_indx_t));
749						} else  {
750							is_dup = 0;
751							dup_len = 0;
752							dup_off = 0;
753							dup_tlen = 0;
754							F_CLR(cp, H_ISDUP);
755						}
756						goto get_space;
757					}
758					/* indx == 0 */
759					cp->dup_len = dup_len;
760					cp->dup_off = dup_off;
761					cp->dup_tlen = dup_tlen;
762					if ((ret = __ham_item_prev(dbc,
763					    lock_mode, &pgno)) != 0) {
764						if (ret != DB_NOTFOUND)
765							return (ret);
766						if ((ret = __memp_fput(mpf,
767						    dbc->thread_info, cp->page,
768						    dbc->priority)) != 0)
769							return (ret);
770						cp->page = NULL;
771						if (cp->bucket == 0) {
772							cp->indx = indx =
773							    NDX_INVALID;
774							goto get_space;
775						}
776						if ((ret =
777						    __ham_get_meta(dbc)) != 0)
778							return (ret);
779
780						cp->bucket--;
781						cp->pgno = BUCKET_TO_PAGE(cp,
782						    cp->bucket);
783						cp->indx = NDX_INVALID;
784						if ((ret = __ham_release_meta(
785						    dbc)) != 0)
786							return (ret);
787						if ((ret = __ham_item_prev(dbc,
788						    lock_mode, &pgno)) != 0)
789							return (ret);
790					}
791					indx = cp->indx;
792get_space:
793					/*
794					 * See if we put any data in the buffer.
795					 */
796					if (offp >= endp ||
797					    F_ISSET(dbc, DBC_TRANSIENT)) {
798						data->size = (u_int32_t)
799						    DB_ALIGN(size +
800						    data->ulen - space, 1024);
801						return (DB_BUFFER_SMALL);
802					}
803					/*
804					 * Don't continue;  we're all out
805					 * of space, even though we're
806					 * returning success.
807					 */
808					next_key = 0;
809					break;
810				}
811				memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
812				need_pg = 0;
813				space -= size;
814				np += size;
815			}
816
817			/*
818			 * We're about to crack the offset(s) and length(s)
819			 * out of an H_KEYDATA or H_DUPLICATE item.
820			 * There are three cases:
821			 *   1. We were moved into a duplicate set by
822			 *	the standard hash cursor code.  Respect
823			 *	the dup_off and dup_tlen we were given.
824			 *   2. We stumbled upon a duplicate set while
825			 *	walking the page on our own.  We need to
826			 *	recognize it as a dup and set dup_off and
827			 *	dup_tlen.
828			 *   3. The current item is not a dup.
829			 */
830			if (F_ISSET(cp, H_ISDUP)) {
831				/* Case 1 */
832				is_dup = 1;
833				dup_len = cp->dup_len;
834				dup_off = cp->dup_off;
835				dup_tlen = cp->dup_tlen;
836			} else if (HPAGE_PTYPE(hk) == H_DUPLICATE) {
837				/* Case 2 */
838				is_dup = 1;
839				/*
840				 * If we run out of memory and bail,
841				 * make sure the fact we're in a dup set
842				 * isn't ignored later.
843				 */
844				F_SET(cp, H_ISDUP);
845				dup_off = 0;
846				memcpy(&dup_len,
847				    HKEYDATA_DATA(hk), sizeof(db_indx_t));
848				dup_tlen = LEN_HDATA(dbp, pg, pagesize, indx);
849			} else {
850				/* Case 3 */
851				is_dup = 0;
852				dup_len = 0;
853				dup_off = 0;
854				dup_tlen = 0;
855			}
856
857			do {
858				space -= (is_key ? 4 : 2) * sizeof(*offp);
859				size += (is_key ? 4 : 2) * sizeof(*offp);
860				/*
861				 * Since space is an unsigned, if we happen
862				 * to wrap, then this comparison will turn out
863				 * to be true.  XXX Wouldn't it be better to
864				 * simply check above that space is greater than
865				 * the value we're about to subtract???
866				 */
867				if (space > data->ulen) {
868					if (!is_dup || dup_off == 0)
869						goto back_up;
870					dup_off -= (db_indx_t)
871					    DUP_SIZE((u_int32_t)offp[1]);
872					goto get_space;
873				}
874				if (is_key) {
875					*offp-- = (int32_t)key_off;
876					*offp-- = (int32_t)key_size;
877				}
878				if (is_dup) {
879					*offp-- = (int32_t)(
880					    ((inp[indx + 1] - HOFFSET(pg)) +
881					    dp - dbuf) + SSZA(HKEYDATA, data) +
882					    dup_off + sizeof(db_indx_t));
883					memcpy(&dup_len,
884					    HKEYDATA_DATA(hk) + dup_off,
885					    sizeof(db_indx_t));
886					dup_off += DUP_SIZE(dup_len);
887					*offp-- = dup_len;
888				} else {
889					*offp-- = (int32_t)(
890					    ((inp[indx + 1] - HOFFSET(pg)) +
891					    dp - dbuf) + SSZA(HKEYDATA, data));
892					*offp-- = LEN_HDATA(dbp, pg,
893					    pagesize, indx);
894				}
895			} while (is_dup && dup_off < dup_tlen && no_dup == 0);
896			F_CLR(cp, H_ISDUP);
897			break;
898		case H_OFFDUP:
899			memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
900			space -= 2 * sizeof(*offp);
901			if (space > data->ulen)
902				goto back_up;
903
904			if (is_key) {
905				space -= 2 * sizeof(*offp);
906				if (space > data->ulen)
907					goto back_up;
908				*offp-- = (int32_t)key_off;
909				*offp-- = (int32_t)key_size;
910			}
911			saveoff = offp;
912			if ((ret = __bam_bulk_duplicates(dbc,
913			    pgno, dbuf, is_key ? offp + 2 : NULL,
914			    &offp, &np, &space, no_dup)) != 0) {
915				if (ret == DB_BUFFER_SMALL) {
916					size = space;
917					space = 0;
918					if (is_key && saveoff == offp) {
919						offp += 2;
920						goto back_up;
921					}
922					goto get_space;
923				}
924				return (ret);
925			}
926			break;
927		case H_OFFPAGE:
928			space -= (is_key ? 4 : 2) * sizeof(*offp);
929			if (space > data->ulen)
930				goto back_up;
931
932			memcpy(&size, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
933			memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
934			if (size > space)
935				goto back_up;
936
937			if ((ret =
938			    __bam_bulk_overflow(dbc, size, pgno, np)) != 0)
939				return (ret);
940
941			if (is_key) {
942				*offp-- = (int32_t)key_off;
943				*offp-- = (int32_t)key_size;
944			}
945
946			*offp-- = (int32_t)(np - dbuf);
947			*offp-- = (int32_t)size;
948
949			np += size;
950			space -= size;
951			break;
952		default:
953			/* Do nothing. */
954			break;
955		}
956	} while (next_key && (indx += 2) < NUM_ENT(pg));
957
	/* Save the position reached back into the cursor. */
958	cp->indx = indx;
959	cp->dup_len = dup_len;
960	cp->dup_off = dup_off;
961	cp->dup_tlen = dup_tlen;
962
963	/* If we are off the page then try to the next page. */
964	if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
965		if ((ret = __ham_item_next(dbc, lock_mode, &pgno)) == 0)
966			goto next_pg;
967		if (ret != DB_NOTFOUND)
968			return (ret);
969		if ((ret = __memp_fput(dbc->dbp->mpf,
970		    dbc->thread_info, cp->page, dbc->priority)) != 0)
971			return (ret);
972		cp->page = NULL;
973		if ((ret = __ham_get_meta(dbc)) != 0)
974			return (ret);
975
976		cp->bucket++;
977		if (cp->bucket > cp->hdr->max_bucket) {
978			/*
979			 * Restore cursor to its previous state.  We're past
980			 * the last item in the last bucket, so the next
981			 * DBC->get(DB_NEXT) will return DB_NOTFOUND.
982			 */
983			cp->bucket--;
984			ret = DB_NOTFOUND;
985		} else {
986			/*
987			 * Start on the next bucket.
988			 *
989			 * Note that if this new bucket happens to be empty,
990			 * but there's another non-empty bucket after it,
991			 * we'll return early.  This is a rare case, and we
992			 * don't guarantee any particular number of keys
993			 * returned on each call, so just let the next call
994			 * to bulk get move forward by yet another bucket.
995			 */
996			cp->pgno = BUCKET_TO_PAGE(cp, cp->bucket);
997			cp->indx = NDX_INVALID;
998			F_CLR(cp, H_ISDUP);
999			ret = __ham_item_next(dbc, lock_mode, &pgno);
1000		}
1001
1002		if ((t_ret = __ham_release_meta(dbc)) != 0)
1003			return (t_ret);
1004		if (ret == 0)
1005			goto next_pg;
1006		if (ret != DB_NOTFOUND)
1007			return (ret);
1008	}
	/* Terminate the offset/length table. */
1009	*offp = -1;
1010	return (0);
1011}
1012
1013static int
1014__hamc_put(dbc, key, data, flags, pgnop)
1015	DBC *dbc;
1016	DBT *key;
1017	DBT *data;
1018	u_int32_t flags;
1019	db_pgno_t *pgnop;
1020{
1021	DB *dbp;
1022	DBT tmp_val, *myval;
1023	DB_MPOOLFILE *mpf;
1024	HASH_CURSOR *hcp;
1025	u_int32_t nbytes;
1026	int ret, t_ret;
1027
1028	/*
1029	 * The compiler doesn't realize that we only use this when ret is
1030	 * equal to 0 and that if ret is equal to 0, that we must have set
1031	 * myval.  So, we initialize it here to shut the compiler up.
1032	 */
1033	COMPQUIET(myval, NULL);
1034
1035	dbp = dbc->dbp;
1036	mpf = dbp->mpf;
1037	hcp = (HASH_CURSOR *)dbc->internal;
1038
1039	if (F_ISSET(hcp, H_DELETED) && flags != DB_KEYFIRST &&
1040	    flags != DB_KEYLAST && flags != DB_OVERWRITE_DUP)
1041		return (DB_NOTFOUND);
1042
1043	if ((ret = __ham_get_meta(dbc)) != 0)
1044		goto err1;
1045
1046	switch (flags) {
1047	case DB_KEYLAST:
1048	case DB_KEYFIRST:
1049	case DB_NODUPDATA:
1050	case DB_NOOVERWRITE:
1051	case DB_OVERWRITE_DUP:
1052		nbytes = (ISBIG(hcp, key->size) ? HOFFPAGE_PSIZE :
1053		    HKEYDATA_PSIZE(key->size)) +
1054		    (ISBIG(hcp, data->size) ? HOFFPAGE_PSIZE :
1055		    HKEYDATA_PSIZE(data->size));
1056		if ((ret = __ham_lookup(dbc,
1057		    key, nbytes, DB_LOCK_WRITE, pgnop)) == DB_NOTFOUND) {
1058			if (hcp->seek_found_page != PGNO_INVALID &&
1059			    hcp->seek_found_page != hcp->pgno) {
1060				if ((ret = __memp_fput(mpf, dbc->thread_info,
1061				    hcp->page, dbc->priority)) != 0)
1062					goto err2;
1063				hcp->page = NULL;
1064				hcp->pgno = hcp->seek_found_page;
1065				hcp->indx = NDX_INVALID;
1066			}
1067
1068			if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) {
1069				/*
1070				 * A partial put, but the key does not exist
1071				 * and we are not beginning the write at 0.
1072				 * We must create a data item padded up to doff
1073				 * and then write the new bytes represented by
1074				 * val.
1075				 */
1076				if ((ret = __ham_init_dbt(dbp->env, &tmp_val,
1077				    data->size + data->doff,
1078				    &dbc->my_rdata.data,
1079				    &dbc->my_rdata.ulen)) != 0)
1080					goto err2;
1081
1082				memset(tmp_val.data, 0, data->doff);
1083				memcpy((u_int8_t *)tmp_val.data +
1084				    data->doff, data->data, data->size);
1085				myval = &tmp_val;
1086			} else
1087				myval = (DBT *)data;
1088
1089			ret = __ham_add_el(dbc, key, myval, H_KEYDATA);
1090			goto done;
1091		} else if (flags == DB_NOOVERWRITE &&
1092		    !F_ISSET(hcp, H_DELETED)) {
1093			if (*pgnop == PGNO_INVALID)
1094				ret = DB_KEYEXIST;
1095			else
1096				ret = __bam_opd_exists(dbc, *pgnop);
1097			if (ret != 0)
1098				goto done;
1099		}
1100		break;
1101	case DB_BEFORE:
1102	case DB_AFTER:
1103	case DB_CURRENT:
1104		ret = __ham_item(dbc, DB_LOCK_WRITE, pgnop);
1105		break;
1106	default:
1107		ret = __db_unknown_flag(dbp->env, "__hamc_put", flags);
1108		break;
1109	}
1110
1111	/*
1112	 * Invalidate any insert index found so they are not reused
1113	 * in future inserts.
1114	 */
1115	hcp->seek_found_page = PGNO_INVALID;
1116	hcp->seek_found_indx = NDX_INVALID;
1117
1118	if (*pgnop == PGNO_INVALID && ret == 0) {
1119		if ((ret = __memp_dirty(mpf, &hcp->page,
1120		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1121			goto done;
1122		if (flags == DB_CURRENT ||
1123		    (!(F_ISSET(dbp, DB_AM_DUP) || F_ISSET(key, DB_DBT_DUPOK)) &&
1124		    (flags == DB_KEYFIRST || flags == DB_KEYLAST ||
1125		    flags == DB_NODUPDATA || flags == DB_OVERWRITE_DUP)))
1126			ret = __ham_overwrite(dbc, data, flags);
1127		else
1128			ret = __ham_add_dup(dbc, data, flags, pgnop);
1129	}
1130
1131done:	if (hcp->page != NULL) {
1132		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1133		    hcp->page, dbc->priority)) != 0 && ret == 0)
1134			ret = t_ret;
1135		if (t_ret == 0)
1136			hcp->page = NULL;
1137	}
1138
1139	if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
1140		ret = __ham_expand_table(dbc);
1141		F_CLR(hcp, H_EXPAND);
1142		/* If we are out of space, ignore the error. */
1143		if (ret == ENOSPC && dbc->txn == NULL)
1144			ret = 0;
1145	}
1146
1147err2:	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
1148		ret = t_ret;
1149
1150err1:	return (ret);
1151}
1152
1153/********************************* UTILITIES ************************/
1154
/*
 * __ham_expand_table --
 *	Grow the hash table by one bucket (max_bucket + 1) and split the
 *	matching old bucket into it.
 *
 *	The cursor's meta-data page is dirtied first.  If the page for the
 *	new bucket already exists it is fetched so its LSN can be logged;
 *	otherwise (a new doubling whose spares[] entry is still PGNO_INVALID)
 *	the first page number of the new group is computed from last_pgno of
 *	the master meta-data page.  The split is logged when logging is
 *	enabled, the meta-data (spares, last_pgno, max_bucket, masks) is
 *	updated, and finally __ham_split_page is called to relocate records.
 *
 *	dbc	hash cursor; its internal HASH_CURSOR supplies the meta-data
 *
 *	Returns 0 on success, otherwise the first error reported by a helper.
 */
static int
__ham_expand_table(dbc)
	DBC *dbc;
{
	DB *dbp;
	DBMETA *mmeta;
	DB_LOCK metalock;
	DB_LSN lsn;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	PAGE *h;
	db_pgno_t pgno, mpgno;
	u_int32_t logn, newalloc, new_bucket, old_bucket;
	int got_meta, new_double, ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;
	if ((ret = __ham_dirty_meta(dbc, 0)) != 0)
		return (ret);

	/*
	 * By default the "master" meta-data page is this database's own
	 * meta-data page; it is only replaced below for sub-databases.
	 */
	LOCK_INIT(metalock);
	mmeta = (DBMETA *) hcp->hdr;
	mpgno = mmeta->pgno;
	h = NULL;
	newalloc = 0;
	got_meta = 0;

	/*
	 * If the split point is about to increase, make sure that we
	 * have enough extra pages.  The calculation here is weird.
	 * We'd like to do this after we've upped max_bucket, but it's
	 * too late then because we've logged the meta-data split.  What
	 * we'll do between then and now is increment max bucket and then
	 * see what the log of one greater than that is; here we have to
	 * look at the log of max + 2.  VERY NASTY STUFF.
	 *
	 * We figure out what we need to do, then we log it, then request
	 * the pages from mpool.  We don't want to fail after extending
	 * the file.
	 *
	 * If the page we are about to split into has already been allocated,
	 * then we simply need to get it to get its LSN.  If it hasn't yet
	 * been allocated, then we know its LSN is (0,0).
	 */

	/* old_bucket is the bucket whose records get split into new_bucket. */
	new_bucket = hcp->hdr->max_bucket + 1;
	old_bucket = new_bucket & hcp->hdr->low_mask;

	/* A new doubling starts when max_bucket has reached high_mask. */
	new_double = hcp->hdr->max_bucket == hcp->hdr->high_mask;
	logn = __db_log2(new_bucket);

	if (!new_double || hcp->hdr->spares[logn + 1] != PGNO_INVALID) {
		/* Page exists; get it so we can get its LSN */
		pgno = BUCKET_TO_PAGE(hcp, new_bucket);
		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn,
		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &h)) != 0)
			goto err;
		lsn = h->lsn;
	} else {
		/* Get the master meta-data page to do allocation. */
		if (F_ISSET(dbp, DB_AM_SUBDB)) {
			mpgno = PGNO_BASE_MD;
			if ((ret = __db_lget(dbc,
			   0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
				goto err;
			if ((ret = __memp_fget(mpf, &mpgno, dbc->thread_info,
			    dbc->txn, DB_MPOOL_DIRTY, &mmeta)) != 0)
				goto err;
			got_meta = 1;
		}
		/* The new group starts right after the current last page. */
		pgno = mmeta->last_pgno + 1;
		ZERO_LSN(lsn);
		newalloc = 1;
	}

	/* Log the meta-data split first. */
	if (DBC_LOGGING(dbc)) {
		/*
		 * We always log the page number of the first page of
		 * the allocation group.  However, the LSN that we log
		 * is either the LSN on the first page (if we did not
		 * do the actual allocation here) or the LSN on the last
		 * page of the unit (if we did do the allocation here).
		 */
		if ((ret = __ham_metagroup_log(dbp, dbc->txn,
		    &lsn, 0, hcp->hdr->max_bucket, mpgno, &mmeta->lsn,
		    hcp->hdr->dbmeta.pgno, &hcp->hdr->dbmeta.lsn,
		    pgno, &lsn, newalloc, mmeta->last_pgno)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(lsn);

	hcp->hdr->dbmeta.lsn = lsn;

	if (new_double && hcp->hdr->spares[logn + 1] == PGNO_INVALID) {
		/*
		 * We need to begin a new doubling and we have not allocated
		 * any pages yet.  Read the last page in and initialize it to
		 * make the allocation contiguous.  The pgno we calculated
		 * above is the first page allocated. The entry in spares is
		 * that page number minus any buckets already allocated (it
		 * simplifies bucket to page translation).  After we've set
		 * that, we calculate the last pgno.
		 */

		pgno += hcp->hdr->max_bucket;

		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn,
		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &h)) != 0)
			goto err;

		hcp->hdr->spares[logn + 1] =
		    (pgno - new_bucket) - hcp->hdr->max_bucket;
		mmeta->last_pgno = pgno;
		mmeta->lsn = lsn;

		P_INIT(h, dbp->pgsize,
		    pgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
	}

	/* Write out whatever page we ended up modifying. */
	h->lsn = lsn;
	if ((ret = __memp_fput(mpf, dbc->thread_info, h, dbc->priority)) != 0)
		goto err;
	h = NULL;

	/*
	 * Update the meta-data page of this hash database.
	 */
	hcp->hdr->max_bucket = new_bucket;
	if (new_double) {
		hcp->hdr->low_mask = hcp->hdr->high_mask;
		hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask;
	}

	/*
	 * Common exit, reached on success as well: put back the master
	 * meta-data page if it was fetched separately, release the meta-data
	 * lock, and put back any page still held.  The first error wins.
	 */
err:	if (got_meta)
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, mmeta, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
	if ((t_ret = __TLPUT(dbc, metalock)) != 0 && ret == 0)
			ret = t_ret;
	if (h != NULL)
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;

	/* Relocate records to the new bucket -- after releasing metapage. */
	if (ret == 0)
		ret = __ham_split_page(dbc, old_bucket, new_bucket);

	return (ret);
}
1311
1312/*
1313 * PUBLIC: u_int32_t __ham_call_hash __P((DBC *, u_int8_t *, u_int32_t));
1314 */
1315u_int32_t
1316__ham_call_hash(dbc, k, len)
1317	DBC *dbc;
1318	u_int8_t *k;
1319	u_int32_t len;
1320{
1321	DB *dbp;
1322	HASH *hashp;
1323	HASH_CURSOR *hcp;
1324	u_int32_t n, bucket;
1325
1326	dbp = dbc->dbp;
1327	hcp = (HASH_CURSOR *)dbc->internal;
1328	hashp = dbp->h_internal;
1329
1330	n = (u_int32_t)(hashp->h_hash(dbp, k, len));
1331
1332	bucket = n & hcp->hdr->high_mask;
1333	if (bucket > hcp->hdr->max_bucket)
1334		bucket = bucket & hcp->hdr->low_mask;
1335	return (bucket);
1336}
1337
/*
 * __ham_dup_return --
 *	Finish a get on the item the cursor references: set up on-page
 *	duplicate state when needed, perform the data comparison for the
 *	DB_GET_BOTH family of flags, and copy the data item (or the current
 *	duplicate within it) into val through __db_ret.
 *
 *	dbc	hash cursor positioned on an item (hcp->page / hcp->indx)
 *	val	data DBT; for DB_GET_BOTH* it holds the value to match, and on
 *		success its data/size are set and DB_DBT_ISSET is flagged
 *	flags	the get operation (DB_GET_BOTH, DB_LAST, DB_PREV, ...)
 *
 *	Returns 0 on success (also when there is nothing to copy),
 *	DB_NOTFOUND when the DB_GET_BOTH* comparison fails, or an error
 *	from __db_moff / __db_ret.
 */
static int
__ham_dup_return(dbc, val, flags)
	DBC *dbc;
	DBT *val;
	u_int32_t flags;
{
	DB *dbp;
	DBT *myval, tmp_val;
	HASH_CURSOR *hcp;
	PAGE *pp;
	db_indx_t ndx;
	db_pgno_t pgno;
	u_int32_t off, tlen;
	u_int8_t *hk, type;
	int cmp, ret;
	db_indx_t len;

	/* Check for duplicate and return the first one. */
	dbp = dbc->dbp;
	hcp = (HASH_CURSOR *)dbc->internal;
	ndx = H_DATAINDEX(hcp->indx);
	type = HPAGE_TYPE(dbp, hcp->page, ndx);
	pp = hcp->page;
	myval = val;

	/*
	 * There are 4 cases:
	 * 1. We are not in duplicate, simply return; the upper layer
	 *    will do the right thing.
	 * 2. We are looking at keys and stumbled onto a duplicate.
	 * 3. We are in the middle of a duplicate set. (ISDUP set)
	 * 4. We need to check for particular data match.
	 */

	/* We should never get here with off-page dups. */
	DB_ASSERT(dbp->env, type != H_OFFDUP);

	/* Case 1 */
	if (type != H_DUPLICATE && flags != DB_GET_BOTH &&
	    flags != DB_GET_BOTHC && flags != DB_GET_BOTH_RANGE)
		return (0);

	/*
	 * Here we check for the case where we just stumbled onto a
	 * duplicate.  In this case, we do initialization and then
	 * let the normal duplicate code handle it. (Case 2)
	 */
	if (!F_ISSET(hcp, H_ISDUP) && type == H_DUPLICATE) {
		F_SET(hcp, H_ISDUP);
		hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
		    hcp->hdr->dbmeta.pagesize, hcp->indx);
		hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
		if (flags == DB_LAST ||
		    flags == DB_PREV || flags == DB_PREV_NODUP) {
			/*
			 * Moving backwards: step through the duplicates
			 * until the end of the set, then back up one so
			 * dup_off references the last duplicate.
			 */
			hcp->dup_off = 0;
			do {
				memcpy(&len,
				    HKEYDATA_DATA(hk) + hcp->dup_off,
				    sizeof(db_indx_t));
				hcp->dup_off += DUP_SIZE(len);
			} while (hcp->dup_off < hcp->dup_tlen);
			hcp->dup_off -= DUP_SIZE(len);
		} else {
			/* Otherwise start at the first duplicate. */
			memcpy(&len,
			    HKEYDATA_DATA(hk), sizeof(db_indx_t));
			hcp->dup_off = 0;
		}
		hcp->dup_len = len;
	}

	/*
	 * If we are retrieving a specific key/data pair, then we
	 * may need to adjust the cursor before returning data.
	 * Case 4
	 */
	if (flags == DB_GET_BOTH ||
	    flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) {
		if (F_ISSET(hcp, H_ISDUP)) {
			/*
			 * If we're doing a join, search forward from the
			 * current position, not the beginning of the dup set.
			 */
			if (flags == DB_GET_BOTHC)
				F_SET(hcp, H_CONTINUE);

			__ham_dsearch(dbc, val, &off, &cmp, flags);

			/*
			 * This flag is set nowhere else and is safe to
			 * clear unconditionally.
			 */
			F_CLR(hcp, H_CONTINUE);
			hcp->dup_off = off;
		} else {
			/* Not a duplicate set: compare the single data item. */
			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
			if (((HKEYDATA *)hk)->type == H_OFFPAGE) {
				memcpy(&tlen,
				    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
				memcpy(&pgno,
				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
				if ((ret = __db_moff(dbc, val, pgno, tlen,
				    dbp->dup_compare, &cmp)) != 0)
					return (ret);
				/*
				 * The sign is flipped here; the on-page
				 * branch below compares (item, val).
				 * NOTE(review): presumably __db_moff compares
				 * in the opposite order -- confirm.
				 */
				cmp = -cmp;
			} else {
				/*
				 * We do not zero tmp_val since the comparison
				 * routines may only look at data and size.
				 */
				tmp_val.data = HKEYDATA_DATA(hk);
				tmp_val.size = LEN_HDATA(dbp, hcp->page,
				    dbp->pgsize, hcp->indx);
				cmp = dbp->dup_compare == NULL ?
				    __bam_defcmp(dbp, &tmp_val, val) :
				    dbp->dup_compare(dbp, &tmp_val, val);
			}

			/*
			 * For a range get on a sorted-duplicates database,
			 * an item comparing greater than val counts as a
			 * match.
			 */
			if (cmp > 0 && flags == DB_GET_BOTH_RANGE &&
			    F_ISSET(dbp, DB_AM_DUPSORT))
				cmp = 0;
		}

		if (cmp != 0)
			return (DB_NOTFOUND);
	}

	/*
	 * If we've already got the data for this value, or we're doing a bulk
	 * get, we don't want to return the data.
	 */
	if (F_ISSET(dbc, DBC_MULTIPLE | DBC_MULTIPLE_KEY) ||
	    F_ISSET(val, DB_DBT_ISSET))
		return (0);

	/*
	 * Now, everything is initialized, grab a duplicate if
	 * necessary.
	 */
	if (F_ISSET(hcp, H_ISDUP)) {	/* Case 3 */
		/*
		 * Copy the DBT in case we are retrieving into user
		 * memory and we need the parameters for it.  If the
		 * user requested a partial, then we need to adjust
		 * the user's parameters to get the partial of the
		 * duplicate which is itself a partial.
		 */
		memcpy(&tmp_val, val, sizeof(*val));

		if (F_ISSET(&tmp_val, DB_DBT_PARTIAL)) {
			/*
			 * Take the user's length unless it would go
			 * beyond the end of the duplicate.
			 */
			if (tmp_val.doff > hcp->dup_len)
				tmp_val.dlen = 0;
			else if (tmp_val.dlen + tmp_val.doff > hcp->dup_len)
				tmp_val.dlen = hcp->dup_len - tmp_val.doff;

		} else {
			/* No partial requested: select the whole duplicate. */
			F_SET(&tmp_val, DB_DBT_PARTIAL);
			tmp_val.dlen = hcp->dup_len;
			tmp_val.doff = 0;
		}

		/*
		 * Set offset to the appropriate place within the
		 * current duplicate -- need to take into account
		 * both the dup_off and the current duplicate's
		 * length.
		 */
		tmp_val.doff += hcp->dup_off + sizeof(db_indx_t);

		myval = &tmp_val;
	}

	/*
	 * Finally, if we had a duplicate, pp, ndx, and myval should be
	 * set appropriately.
	 */
	if ((ret = __db_ret(dbc, pp, ndx, myval,
	    &dbc->rdata->data, &dbc->rdata->ulen)) != 0) {
		/* Report the size __db_ret left in myval back to the caller. */
		if (ret == DB_BUFFER_SMALL)
			val->size = myval->size;
		return (ret);
	}

	/*
	 * In case we sent a temporary off to db_ret, set the real
	 * return values.
	 */
	val->data = myval->data;
	val->size = myval->size;

	F_SET(val, DB_DBT_ISSET);

	return (0);
}
1539
/*
 * __ham_overwrite --
 *	Overwrite the record the cursor references with nval.
 *
 *	Three situations are handled:
 *	- the cursor is inside an on-page duplicate set (H_ISDUP): the
 *	  current duplicate is rebuilt (partial put) or replaced (full put);
 *	  if the resulting set would be too big (ISBIG) it is converted with
 *	  __ham_dup_convert and the put is handed to the off-page cursor;
 *	- a full put on a non-duplicate item: the whole existing data item
 *	  is replaced;
 *	- a partial put on a non-duplicate item: nval is passed through
 *	  unchanged.
 *
 *	dbc	hash cursor positioned on the item to overwrite
 *	nval	new data (may have DB_DBT_PARTIAL set)
 *	flags	put flags; used for the off-page put and for __db_duperr
 *
 *	Returns 0 on success; EINVAL or the result of __db_duperr when the
 *	new data compares unequal to the existing duplicate under the
 *	dup_compare function; otherwise an error from a helper.
 *
 * PUBLIC: int  __ham_overwrite __P((DBC *, DBT *, u_int32_t));
 */
int
__ham_overwrite(dbc, nval, flags)
	DBC *dbc;
	DBT *nval;
	u_int32_t flags;
{
	DB *dbp;
	DBT *myval, tmp_val, tmp_val2;
	ENV *env;
	HASH_CURSOR *hcp;
	void *newrec;
	u_int8_t *hk, *p;
	u_int32_t len, nondup_size;
	db_indx_t newsize;
	int ret;

	dbp = dbc->dbp;
	env = dbp->env;
	hcp = (HASH_CURSOR *)dbc->internal;
	if (F_ISSET(hcp, H_ISDUP)) {
		/*
		 * This is an overwrite of a duplicate. We should never
		 * be off-page at this point.
		 */
		DB_ASSERT(env, hcp->opd == NULL);
		/* On page dups */
		if (F_ISSET(nval, DB_DBT_PARTIAL)) {
			/*
			 * We're going to have to get the current item, then
			 * construct the record, do any padding and do a
			 * replace.
			 */
			memset(&tmp_val, 0, sizeof(tmp_val));
			if ((ret =
			    __ham_dup_return(dbc, &tmp_val, DB_CURRENT)) != 0)
				return (ret);

			/* Figure out new size. */
			nondup_size = tmp_val.size;
			newsize = nondup_size;

			/*
			 * Three cases:
			 * 1. strictly append (may need to allocate space
			 *	for pad bytes; really gross).
			 * 2. overwrite some and append.
			 * 3. strictly overwrite.
			 */
			if (nval->doff > nondup_size)
				newsize +=
				    ((nval->doff - nondup_size) + nval->size);
			else if (nval->doff + nval->dlen > nondup_size)
				newsize += nval->size -
				    (nondup_size - nval->doff);
			else
				newsize += nval->size - nval->dlen;

			/*
			 * Make sure that the new size doesn't put us over
			 * the onpage duplicate size in which case we need
			 * to convert to off-page duplicates.
			 */
			if (ISBIG(hcp,
			    (hcp->dup_tlen - nondup_size) + newsize)) {
				if ((ret = __ham_dup_convert(dbc)) != 0)
					return (ret);
				return (hcp->opd->am_put(hcp->opd,
				    NULL, nval, flags, NULL));
			}

			/*
			 * newrec holds the rebuilt duplicate, laid out as
			 * <size><data><size>; it is freed on every path
			 * below.
			 */
			if ((ret = __os_malloc(dbp->env,
			    DUP_SIZE(newsize), &newrec)) != 0)
				return (ret);
			memset(&tmp_val2, 0, sizeof(tmp_val2));
			F_SET(&tmp_val2, DB_DBT_PARTIAL);

			/* Construct the record. */
			p = newrec;
			/* Initial size. */
			memcpy(p, &newsize, sizeof(db_indx_t));
			p += sizeof(db_indx_t);

			/* First part of original record. */
			len = nval->doff > tmp_val.size
			    ? tmp_val.size : nval->doff;
			memcpy(p, tmp_val.data, len);
			p += len;

			if (nval->doff > tmp_val.size) {
				/* Padding */
				memset(p, 0, nval->doff - tmp_val.size);
				p += nval->doff - tmp_val.size;
			}

			/* New bytes */
			memcpy(p, nval->data, nval->size);
			p += nval->size;

			/* End of original record (if there is any) */
			if (nval->doff + nval->dlen < tmp_val.size) {
				len = (tmp_val.size - nval->doff) - nval->dlen;
				memcpy(p, (u_int8_t *)tmp_val.data +
				    nval->doff + nval->dlen, len);
				p += len;
			}

			/* Final size. */
			memcpy(p, &newsize, sizeof(db_indx_t));

			/*
			 * Make sure that the caller isn't corrupting
			 * the sort order.
			 */
			if (dbp->dup_compare != NULL) {
				/* Compare old data with the rebuilt data. */
				tmp_val2.data =
				    (u_int8_t *)newrec + sizeof(db_indx_t);
				tmp_val2.size = newsize;
				if (dbp->dup_compare(
				    dbp, &tmp_val, &tmp_val2) != 0) {
					__os_free(env, newrec);
					return (__db_duperr(dbp, flags));
				}
			}

			/*
			 * Replace the whole current duplicate (including its
			 * two length fields) with the rebuilt record.
			 */
			tmp_val2.data = newrec;
			tmp_val2.size = DUP_SIZE(newsize);
			tmp_val2.doff = hcp->dup_off;
			tmp_val2.dlen = DUP_SIZE(hcp->dup_len);

			ret = __ham_replpair(dbc, &tmp_val2, 0);
			__os_free(env, newrec);

			/* Update cursor */
			if (ret != 0)
				return (ret);

			if (newsize > nondup_size) {
				if ((ret = __hamc_update(dbc,
				    (newsize - nondup_size),
				    DB_HAM_CURADJ_ADDMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen += (newsize - nondup_size);
			} else {
				if ((ret = __hamc_update(dbc,
				    (nondup_size - newsize),
				    DB_HAM_CURADJ_DELMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen -= (nondup_size - newsize);
			}
			hcp->dup_len = newsize;
			return (0);
		} else {
			/* Check whether we need to convert to off page. */
			if (ISBIG(hcp,
			    (hcp->dup_tlen - hcp->dup_len) + nval->size)) {
				if ((ret = __ham_dup_convert(dbc)) != 0)
					return (ret);
				return (hcp->opd->am_put(hcp->opd,
				    NULL, nval, flags, NULL));
			}

			/* Make sure we maintain sort order. */
			if (dbp->dup_compare != NULL) {
				/* Point tmp_val2 at the current duplicate. */
				tmp_val2.data =
				    HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
				    hcp->indx)) + hcp->dup_off +
				    sizeof(db_indx_t);
				tmp_val2.size = hcp->dup_len;
				if (dbp->dup_compare(
				    dbp, nval, &tmp_val2) != 0) {
					__db_errx(env,
			"Existing data sorts differently from put data");
					return (EINVAL);
				}
			}
			/* Overwriting a complete duplicate. */
			if ((ret =
			    __ham_make_dup(dbp->env, nval, &tmp_val,
			    &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
				return (ret);
			/* Now fix what we are replacing. */
			tmp_val.doff = hcp->dup_off;
			tmp_val.dlen = DUP_SIZE(hcp->dup_len);

			/*
			 * Update cursor.  Note that in this branch the
			 * cursor adjustment happens before the replace,
			 * which is done by the __ham_replpair call at the
			 * end of the function.
			 */
			if (nval->size > hcp->dup_len) {
				if ((ret = __hamc_update(dbc,
				    (nval->size - hcp->dup_len),
				    DB_HAM_CURADJ_ADDMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen += (nval->size - hcp->dup_len);
			} else {
				if ((ret = __hamc_update(dbc,
				    (hcp->dup_len - nval->size),
				    DB_HAM_CURADJ_DELMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen -= (hcp->dup_len - nval->size);
			}
			hcp->dup_len = (db_indx_t)nval->size;
		}
		myval = &tmp_val;
	} else if (!F_ISSET(nval, DB_DBT_PARTIAL)) {
		/*
		 * Put/overwrite: express the full put as a partial put that
		 * replaces the entire existing data item (doff 0, dlen set
		 * to the item's current length).
		 */
		memcpy(&tmp_val, nval, sizeof(*nval));
		F_SET(&tmp_val, DB_DBT_PARTIAL);
		tmp_val.doff = 0;
		hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
		if (HPAGE_PTYPE(hk) == H_OFFPAGE)
			memcpy(&tmp_val.dlen,
			    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
		else
			tmp_val.dlen = LEN_HDATA(dbp, hcp->page,
			    hcp->hdr->dbmeta.pagesize, hcp->indx);
		myval = &tmp_val;
	} else
		/* Regular partial put */
		myval = nval;

	return (__ham_replpair(dbc, myval, 0));
}
1765
1766/*
1767 * Given a key and a cursor, sets the cursor to the page/ndx on which
1768 * the key resides.  If the key is found, the cursor H_OK flag is set
1769 * and the pagep, bndx, pgno (dpagep, dndx, dpgno) fields are set.
1770 * If the key is not found, the H_OK flag is not set.  If the sought
1771 * field is non-0, the pagep, bndx, pgno (dpagep, dndx, dpgno) fields
1772 * are set indicating where an add might take place.  If it is 0,
1773 * none of the cursor pointer field are valid.
1774 */
1775static int
1776__ham_lookup(dbc, key, sought, mode, pgnop)
1777	DBC *dbc;
1778	const DBT *key;
1779	u_int32_t sought;
1780	db_lockmode_t mode;
1781	db_pgno_t *pgnop;
1782{
1783	DB *dbp;
1784	HASH_CURSOR *hcp;
1785	db_pgno_t next_pgno;
1786	int match, ret;
1787	u_int8_t *dk;
1788
1789	dbp = dbc->dbp;
1790	hcp = (HASH_CURSOR *)dbc->internal;
1791
1792	/*
1793	 * Set up cursor so that we're looking for space to add an item
1794	 * as we cycle through the pages looking for the key.
1795	 */
1796	if ((ret = __ham_item_reset(dbc)) != 0)
1797		return (ret);
1798	hcp->seek_size = sought;
1799
1800	hcp->bucket = __ham_call_hash(dbc, (u_int8_t *)key->data, key->size);
1801	hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
1802	/* look though all pages in the bucket for the key */
1803	if ((ret = __ham_get_cpage(dbc, mode)) != 0)
1804		return (ret);
1805
1806	*pgnop = PGNO_INVALID;
1807	if (hcp->indx == NDX_INVALID) {
1808		hcp->indx = 0;
1809		F_CLR(hcp, H_ISDUP);
1810	}
1811	while (hcp->pgno != PGNO_INVALID) {
1812		/* Are we looking for space to insert an item. */
1813		if (hcp->seek_size != 0 &&
1814		    hcp->seek_found_page == PGNO_INVALID &&
1815		    hcp->seek_size < P_FREESPACE(dbp, hcp->page)) {
1816			hcp->seek_found_page = hcp->pgno;
1817			hcp->seek_found_indx = NDX_INVALID;
1818		    }
1819
1820		if ((ret = __ham_getindex(dbc, hcp->page, key,
1821		    H_KEYDATA, &match, &hcp->indx)) != 0)
1822			return (ret);
1823
1824		/*
1825		 * If this is the first page in the bucket with space for
1826		 * inserting the requested item. Store the insert index to
1827		 * save having to look it up again later.
1828		 */
1829		if (hcp->seek_found_page == hcp->pgno)
1830		    hcp->seek_found_indx = hcp->indx;
1831
1832		if (match == 0) {
1833			F_SET(hcp, H_OK);
1834			dk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
1835			if (HPAGE_PTYPE(dk) == H_OFFDUP)
1836				memcpy(pgnop, HOFFDUP_PGNO(dk),
1837				    sizeof(db_pgno_t));
1838			return (0);
1839		}
1840
1841		/* move the cursor to the next page. */
1842		if (NEXT_PGNO(hcp->page) == PGNO_INVALID)
1843			break;
1844		next_pgno = NEXT_PGNO(hcp->page);
1845		hcp->indx = 0;
1846		if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0)
1847			return (ret);
1848	}
1849	F_SET(hcp, H_NOMORE);
1850	return (DB_NOTFOUND);
1851}
1852
1853/*
1854 * __ham_init_dbt --
1855 *	Initialize a dbt using some possibly already allocated storage
1856 *	for items.
1857 *
1858 * PUBLIC: int __ham_init_dbt __P((ENV *,
1859 * PUBLIC:     DBT *, u_int32_t, void **, u_int32_t *));
1860 */
1861int
1862__ham_init_dbt(env, dbt, size, bufp, sizep)
1863	ENV *env;
1864	DBT *dbt;
1865	u_int32_t size;
1866	void **bufp;
1867	u_int32_t *sizep;
1868{
1869	int ret;
1870
1871	memset(dbt, 0, sizeof(*dbt));
1872	if (*sizep < size) {
1873		if ((ret = __os_realloc(env, size, bufp)) != 0) {
1874			*sizep = 0;
1875			return (ret);
1876		}
1877		*sizep = size;
1878	}
1879	dbt->data = *bufp;
1880	dbt->size = size;
1881	return (0);
1882}
1883
1884/*
1885 * Adjust the cursor after an insert or delete.  The cursor passed is
1886 * the one that was operated upon; we just need to check any of the
1887 * others.
1888 *
1889 * len indicates the length of the item added/deleted
1890 * add indicates if the item indicated by the cursor has just been
1891 * added (add == 1) or deleted (add == 0).
1892 * dup indicates if the addition occurred into a duplicate set.
1893 *
1894 * PUBLIC: int __hamc_update
1895 * PUBLIC:    __P((DBC *, u_int32_t, db_ham_curadj, int));
1896 */
1897int
1898__hamc_update(dbc, len, operation, is_dup)
1899	DBC *dbc;
1900	u_int32_t len;
1901	db_ham_curadj operation;
1902	int is_dup;
1903{
1904	DB *dbp, *ldbp;
1905	DBC *cp;
1906	DB_LSN lsn;
1907	DB_TXN *my_txn;
1908	ENV *env;
1909	HASH_CURSOR *hcp, *lcp;
1910	int found, ret, was_mod, was_add;
1911	u_int32_t order;
1912
1913	dbp = dbc->dbp;
1914	env = dbp->env;
1915	hcp = (HASH_CURSOR *)dbc->internal;
1916
1917	/*
1918	 * Adjustment will only be logged if this is a subtransaction.
1919	 * Only subtransactions can abort and effect their parent
1920	 * transactions cursors.
1921	 */
1922
1923	my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
1924	found = 0;
1925
1926	MUTEX_LOCK(env, env->mtx_dblist);
1927
1928	switch (operation) {
1929	case DB_HAM_CURADJ_DEL:
1930		was_mod = 0;
1931		was_add = 0;
1932		break;
1933	case DB_HAM_CURADJ_ADD:
1934		was_mod = 0;
1935		was_add = 1;
1936		break;
1937	case DB_HAM_CURADJ_DELMOD:
1938		was_mod = 1;
1939		was_add = 0;
1940		break;
1941	case DB_HAM_CURADJ_ADDMOD:
1942		was_mod = 1;
1943		was_add = 1;
1944		break;
1945	default:
1946		return (EINVAL);
1947	}
1948
1949	/*
1950	 * Calculate the order of this deleted record.
1951	 * This will be one greater than any cursor that is pointing
1952	 * at this record and already marked as deleted.
1953	 */
1954	order = 0;
1955	if (was_add == 0) {
1956		FIND_FIRST_DB_MATCH(env, dbp, ldbp);
1957		for (order = 1;
1958		    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
1959		    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
1960			MUTEX_LOCK(env, dbp->mutex);
1961			TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
1962				if (cp == dbc || cp->dbtype != DB_HASH)
1963					continue;
1964				lcp = (HASH_CURSOR *)cp->internal;
1965				if (F_ISSET(lcp, H_DELETED) &&
1966				    hcp->pgno == lcp->pgno &&
1967				    hcp->indx == lcp->indx &&
1968				    order <= lcp->order &&
1969				    (!is_dup || hcp->dup_off == lcp->dup_off) &&
1970				    !MVCC_SKIP_CURADJ(cp, lcp->pgno))
1971					order = lcp->order + 1;
1972			}
1973			MUTEX_UNLOCK(env, dbp->mutex);
1974		}
1975		hcp->order = order;
1976	}
1977
1978	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
1979	for (;
1980	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
1981	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
1982		MUTEX_LOCK(env, dbp->mutex);
1983		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
1984			if (cp == dbc || cp->dbtype != DB_HASH)
1985				continue;
1986
1987			lcp = (HASH_CURSOR *)cp->internal;
1988
1989			if (lcp->pgno != hcp->pgno ||
1990			    lcp->indx == NDX_INVALID ||
1991			    MVCC_SKIP_CURADJ(cp, lcp->pgno))
1992				continue;
1993
1994			/*
1995			 * We're about to move things out from under this
1996			 * cursor.  Clear any cached streaming information.
1997			 */
1998			lcp->stream_start_pgno = PGNO_INVALID;
1999
2000			if (my_txn != NULL && cp->txn != my_txn)
2001				found = 1;
2002
2003			if (!is_dup) {
2004				if (was_add == 1) {
2005					/*
2006					 * This routine is not called to add
2007					 * non-dup records which are always put
2008					 * at the end.  It is only called from
2009					 * recovery in this case and the
2010					 * cursor will be marked deleted.
2011					 * We are "undeleting" so unmark all
2012					 * cursors with the same order.
2013					 */
2014					if (lcp->indx == hcp->indx &&
2015					    F_ISSET(lcp, H_DELETED)) {
2016						if (lcp->order == hcp->order)
2017							F_CLR(lcp, H_DELETED);
2018						else if (lcp->order >
2019						    hcp->order) {
2020
2021						/*
2022						 * If we've moved this cursor's
2023						 * index, split its order
2024						 * number--i.e., decrement it by
2025						 * enough so that the lowest
2026						 * cursor moved has order 1.
2027						 * cp_arg->order is the split
2028						 * point, so decrement by it.
2029						 */
2030							lcp->order -=
2031							    hcp->order;
2032							lcp->indx += 2;
2033						}
2034					} else if (lcp->indx >= hcp->indx)
2035						lcp->indx += 2;
2036				} else {
2037					if (lcp->indx > hcp->indx) {
2038						lcp->indx -= 2;
2039						if (lcp->indx == hcp->indx &&
2040						    F_ISSET(lcp, H_DELETED))
2041							lcp->order += order;
2042					} else if (lcp->indx == hcp->indx &&
2043					    !F_ISSET(lcp, H_DELETED)) {
2044						F_SET(lcp, H_DELETED);
2045						F_CLR(lcp, H_ISDUP);
2046						lcp->order = order;
2047					}
2048				}
2049			} else if (lcp->indx == hcp->indx) {
2050				/*
2051				 * Handle duplicates.  This routine is only
2052				 * called for on page dups. Off page dups are
2053				 * handled by btree/rtree code.
2054				 */
2055				if (was_add == 1) {
2056					lcp->dup_tlen += len;
2057					if (lcp->dup_off == hcp->dup_off &&
2058					    F_ISSET(hcp, H_DELETED) &&
2059					    F_ISSET(lcp, H_DELETED)) {
2060						/* Abort of a delete. */
2061						if (lcp->order == hcp->order)
2062							F_CLR(lcp, H_DELETED);
2063						else if (lcp->order >
2064						    hcp->order) {
2065							lcp->order -=
2066							    (hcp->order -1);
2067							lcp->dup_off += len;
2068						}
2069					} else if (lcp->dup_off >
2070					    hcp->dup_off || (!was_mod &&
2071					    lcp->dup_off == hcp->dup_off))
2072						lcp->dup_off += len;
2073				} else {
2074					lcp->dup_tlen -= len;
2075					if (lcp->dup_off > hcp->dup_off) {
2076						lcp->dup_off -= len;
2077						if (lcp->dup_off ==
2078						    hcp->dup_off &&
2079						    F_ISSET(lcp, H_DELETED))
2080							lcp->order += order;
2081					} else if (!was_mod &&
2082					    lcp->dup_off == hcp->dup_off &&
2083					    !F_ISSET(lcp, H_DELETED)) {
2084						F_SET(lcp, H_DELETED);
2085						lcp->order = order;
2086					}
2087				}
2088			}
2089		}
2090		MUTEX_UNLOCK(env, dbp->mutex);
2091	}
2092	MUTEX_UNLOCK(env, env->mtx_dblist);
2093
2094	if (found != 0 && DBC_LOGGING(dbc)) {
2095		if ((ret = __ham_curadj_log(dbp, my_txn, &lsn, 0, hcp->pgno,
2096		    hcp->indx, len, hcp->dup_off, (int)operation, is_dup,
2097		    order)) != 0)
2098			return (ret);
2099	}
2100
2101	return (0);
2102}
2103
2104/*
2105 * __ham_get_clist --
2106 *
2107 * Get a list of cursors either on a particular bucket or on a particular
2108 * page and index combination.  The former is so that we can update
2109 * cursors on a split.  The latter is so we can update cursors when we
2110 * move items off page.
2111 *
2112 * PUBLIC: int __ham_get_clist __P((DB *, db_pgno_t, u_int32_t, DBC ***));
2113 */
2114int
2115__ham_get_clist(dbp, pgno, indx, listp)
2116	DB *dbp;
2117	db_pgno_t pgno;
2118	u_int32_t indx;
2119	DBC ***listp;
2120{
2121	DB *ldbp;
2122	DBC *dbc;
2123	ENV *env;
2124	u_int nalloc, nused;
2125	int ret;
2126
2127	*listp = NULL;
2128	env = dbp->env;
2129	nalloc = nused = 0;
2130
2131	/*
2132	 * Assume that finding anything is the exception, so optimize for
2133	 * the case where there aren't any.
2134	 */
2135	MUTEX_LOCK(env, env->mtx_dblist);
2136	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
2137	for (;
2138	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
2139	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
2140		MUTEX_LOCK(env, dbp->mutex);
2141		TAILQ_FOREACH(dbc, &ldbp->active_queue, links)
2142			/*
2143			 * We match if dbc->pgno matches the specified
2144			 * pgno, and if either the dbc->indx matches
2145			 * or we weren't given an index.
2146			 */
2147			if (dbc->internal->pgno == pgno &&
2148			    (indx == NDX_INVALID ||
2149			    dbc->internal->indx == indx) &&
2150			    !MVCC_SKIP_CURADJ(dbc, pgno)) {
2151				if (nused >= nalloc) {
2152					nalloc += 10;
2153					if ((ret = __os_realloc(dbp->env,
2154					    nalloc * sizeof(HASH_CURSOR *),
2155					    listp)) != 0)
2156						goto err;
2157				}
2158				(*listp)[nused++] = dbc;
2159			}
2160
2161		MUTEX_UNLOCK(dbp->env, dbp->mutex);
2162	}
2163	MUTEX_UNLOCK(env, env->mtx_dblist);
2164
2165	if (listp != NULL) {
2166		if (nused >= nalloc) {
2167			nalloc++;
2168			if ((ret = __os_realloc(dbp->env,
2169			    nalloc * sizeof(HASH_CURSOR *), listp)) != 0)
2170				return (ret);
2171		}
2172		(*listp)[nused] = NULL;
2173	}
2174	return (0);
2175err:
2176	MUTEX_UNLOCK(dbp->env, dbp->mutex);
2177	MUTEX_UNLOCK(env, env->mtx_dblist);
2178	return (ret);
2179}
2180
2181static int
2182__hamc_writelock(dbc)
2183	DBC *dbc;
2184{
2185	DB_LOCK tmp_lock;
2186	HASH_CURSOR *hcp;
2187	int ret;
2188
2189	/*
2190	 * All we need do is acquire the lock and let the off-page
2191	 * dup tree do its thing.
2192	 */
2193	if (!STD_LOCKING(dbc))
2194		return (0);
2195
2196	hcp = (HASH_CURSOR *)dbc->internal;
2197	ret = 0;
2198	if ((!LOCK_ISSET(hcp->lock) || hcp->lock_mode != DB_LOCK_WRITE)) {
2199		tmp_lock = hcp->lock;
2200		if ((ret = __ham_lock_bucket(dbc, DB_LOCK_WRITE)) == 0 &&
2201		    tmp_lock.mode != DB_LOCK_WWRITE)
2202			ret = __LPUT(dbc, tmp_lock);
2203	}
2204	return (ret);
2205}
2206