/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994
 *	Margo Seltzer.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994
 *	The Regents of the University of California.  All rights reserved.
 *
 * This code is derived from software contributed to Berkeley by
 * Margo Seltzer.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $Id: hash.c,v 12.46 2008/01/08 20:58:33 bostic Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"

static int  __ham_bulk __P((DBC *, DBT *, u_int32_t));
static int  __hamc_close __P((DBC *, db_pgno_t, int *));
static int  __hamc_del __P((DBC *));
static int  __hamc_destroy __P((DBC *));
static int  __hamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int  __hamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
static int  __hamc_writelock __P((DBC *));
static int  __ham_dup_return __P((DBC *, DBT *, u_int32_t));
static int  __ham_expand_table __P((DBC *));
static int  __ham_lookup __P((DBC *,
		const DBT *, u_int32_t, db_lockmode_t, db_pgno_t *));
static int  __ham_overwrite __P((DBC *, DBT *, u_int32_t));

/*
 * __ham_quick_delete --
 *	Called by __db_del when no secondary indices are involved and no
 *	off-page duplicate tree must be removed; performs the delete in
 *	one optimized operation.
 *
 * PUBLIC: int __ham_quick_delete __P((DBC *));
 */
int
__ham_quick_delete(dbc)
	DBC *dbc;
{
	int ret, t_ret;

	/*
	 * When performing a DB->del operation that does not involve secondary
	 * indices and is not removing an off-page duplicate tree, we can speed
	 * things up substantially by removing the entire duplicate set, if any
	 * is present, in one operation, rather than by conjuring up and
	 * deleting each of the items individually.  (All are stored in one big
	 * HKEYDATA structure.)  We don't bother to distinguish on-page
	 * duplicate sets from single, non-dup items; they're deleted in
	 * exactly the same way.
	 *
	 * The cursor should be set to the first item in the duplicate set, or
	 * to the sole key/data pair when the key does not have a duplicate
	 * set, before this function is called.
	 *
	 * We do not need to call CDB_LOCKING_INIT; __db_del calls here with
	 * a write cursor.
	 *
	 * Assert we're initialized, but not to an off-page duplicate.
	 * Assert we're not using secondary indices.
	 */
	DB_ASSERT(dbc->env, IS_INITIALIZED(dbc));
	DB_ASSERT(dbc->env, dbc->internal->opd == NULL);
	DB_ASSERT(dbc->env, !F_ISSET(dbc->dbp, DB_AM_SECONDARY));
	DB_ASSERT(dbc->env,
	    LIST_FIRST(&dbc->dbp->s_secondaries) == NULL);

	if ((ret = __ham_get_meta(dbc)) != 0)
		return (ret);

	if ((ret = __hamc_writelock(dbc)) == 0)
		ret = __ham_del_pair(dbc, 0);

	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
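
/*
 * A rough sketch of the on-page duplicate layout this relies on: the
 * entire duplicate set lives in a single H_DUPLICATE data item whose
 * bytes are a sequence of elements, each bracketed by its own length,
 * roughly
 *
 *	len0 data0 len0 len1 data1 len1 ...
 *
 * (each length a db_indx_t), which is why the whole set can be dropped
 * with a single __ham_del_pair call and why __hamc_count below advances
 * through it in steps of 2 * sizeof(db_indx_t) + len.
 */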

/* ****************** CURSORS ********************************** */
/*
 * __hamc_init --
 *	Initialize the hash-specific portion of a cursor.
 *
 * PUBLIC: int __hamc_init __P((DBC *));
 */
int
__hamc_init(dbc)
	DBC *dbc;
{
	ENV *env;
	HASH_CURSOR *new_curs;
	int ret;

	env = dbc->env;
	if ((ret = __os_calloc(env,
	    1, sizeof(struct cursor_t), &new_curs)) != 0)
		return (ret);
	if ((ret = __os_malloc(env,
	    dbc->dbp->pgsize, &new_curs->split_buf)) != 0) {
		__os_free(env, new_curs);
		return (ret);
	}

	dbc->internal = (DBC_INTERNAL *) new_curs;
	dbc->close = dbc->c_close = __dbc_close_pp;
	dbc->count = dbc->c_count = __dbc_count_pp;
	dbc->del = dbc->c_del = __dbc_del_pp;
	dbc->dup = dbc->c_dup = __dbc_dup_pp;
	dbc->get = dbc->c_get = __dbc_get_pp;
	dbc->pget = dbc->c_pget = __dbc_pget_pp;
	dbc->put = dbc->c_put = __dbc_put_pp;
	dbc->am_bulk = __ham_bulk;
	dbc->am_close = __hamc_close;
	dbc->am_del = __hamc_del;
	dbc->am_destroy = __hamc_destroy;
	dbc->am_get = __hamc_get;
	dbc->am_put = __hamc_put;
	dbc->am_writelock = __hamc_writelock;

	return (__ham_item_init(dbc));
}

/*
 * __hamc_close --
 *	Close down the cursor from a single use.
 */
static int
__hamc_close(dbc, root_pgno, rmroot)
	DBC *dbc;
	db_pgno_t root_pgno;
	int *rmroot;
{
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	HKEYDATA *dp;
	db_lockmode_t lock_mode;
	int doroot, gotmeta, ret, t_ret;

	COMPQUIET(rmroot, 0);
	mpf = dbc->dbp->mpf;
	doroot = gotmeta = ret = 0;
	hcp = (HASH_CURSOR *) dbc->internal;

	/* Check for off page dups. */
	if (dbc->internal->opd != NULL) {
		if ((ret = __ham_get_meta(dbc)) != 0)
			goto done;
		gotmeta = 1;
		lock_mode = DB_LOCK_READ;

		/* To support dirty reads we must reget the write lock. */
		if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED) &&
		     F_ISSET((BTREE_CURSOR *)
		     dbc->internal->opd->internal, C_DELETED))
			lock_mode = DB_LOCK_WRITE;

		if ((ret = __ham_get_cpage(dbc, lock_mode)) != 0)
			goto out;
		dp = (HKEYDATA *)H_PAIRDATA(dbc->dbp, hcp->page, hcp->indx);

		/* If it's not a dup we aborted before we changed it. */
		if (HPAGE_PTYPE(dp) == H_OFFDUP)
			memcpy(&root_pgno,
			    HOFFPAGE_PGNO(dp), sizeof(db_pgno_t));
		else
			root_pgno = PGNO_INVALID;

		if ((ret =
		    hcp->opd->am_close(hcp->opd, root_pgno, &doroot)) != 0)
			goto out;
		if (doroot != 0) {
			if ((ret = __memp_dirty(mpf, &hcp->page,
			    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
				goto out;
			if ((ret = __ham_del_pair(dbc, 0)) != 0)
				goto out;
		}
	}

out:	if (hcp->page != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (gotmeta != 0 && (t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
		ret = t_ret;

done:	if ((t_ret = __ham_item_init(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __hamc_destroy --
 *	Cleanup the access method private part of a cursor.
 */
static int
__hamc_destroy(dbc)
	DBC *dbc;
{
	HASH_CURSOR *hcp;

	hcp = (HASH_CURSOR *)dbc->internal;
	if (hcp->split_buf != NULL)
		__os_free(dbc->env, hcp->split_buf);
	__os_free(dbc->env, hcp);

	return (0);
}

/*
 * __hamc_count --
 *	Return a count of on-page duplicates.
 *
 * PUBLIC: int __hamc_count __P((DBC *, db_recno_t *));
 */
int
__hamc_count(dbc, recnop)
	DBC *dbc;
	db_recno_t *recnop;
{
	DB *dbp;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	db_indx_t len;
	db_recno_t recno;
	int ret, t_ret;
	u_int8_t *p, *pend;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;

	recno = 0;

	if ((ret = __ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
		return (ret);
	if (hcp->indx >= NUM_ENT(hcp->page)) {
		*recnop = 0;
		goto err;
	}

	switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) {
	case H_KEYDATA:
	case H_OFFPAGE:
		recno = 1;
		break;
	case H_DUPLICATE:
		p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
		pend = p +
		    LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx);
		for (; p < pend; recno++) {
			/*
			 * p may not be aligned, so copy rather than simply
			 * dereferencing it.
			 */
			memcpy(&len, p, sizeof(db_indx_t));
			p += 2 * sizeof(db_indx_t) + len;
		}

		break;
	default:
		ret = __db_pgfmt(dbp->env, hcp->pgno);
		goto err;
	}

	*recnop = recno;

err:	if ((t_ret = __memp_fput(mpf,
	     dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	hcp->page = NULL;
	return (ret);
}

static int
__hamc_del(dbc)
	DBC *dbc;
{
	DB *dbp;
	DBT repldbt;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;

	if (F_ISSET(hcp, H_DELETED))
		return (DB_NOTFOUND);

	if ((ret = __ham_get_meta(dbc)) != 0)
		goto out;

	if ((ret = __ham_get_cpage(dbc, DB_LOCK_WRITE)) != 0)
		goto out;

	/* Off-page duplicates. */
	if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP)
		goto out;

	if ((ret = __memp_dirty(mpf,
	    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto out;

	if (F_ISSET(hcp, H_ISDUP)) { /* On-page duplicate. */
		if (hcp->dup_off == 0 &&
		    DUP_SIZE(hcp->dup_len) == LEN_HDATA(dbp, hcp->page,
		    hcp->hdr->dbmeta.pagesize, hcp->indx))
			ret = __ham_del_pair(dbc, 0);
		else {
			repldbt.flags = 0;
			F_SET(&repldbt, DB_DBT_PARTIAL);
			repldbt.doff = hcp->dup_off;
			repldbt.dlen = DUP_SIZE(hcp->dup_len);
			repldbt.size = 0;
			repldbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
			    hcp->indx));
			if ((ret = __ham_replpair(dbc, &repldbt, 0)) == 0) {
				hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
				F_SET(hcp, H_DELETED);
				ret = __hamc_update(dbc, DUP_SIZE(hcp->dup_len),
				    DB_HAM_CURADJ_DEL, 1);
			}
		}
	} else /* Not a duplicate */
		ret = __ham_del_pair(dbc, 0);

out:	if (hcp->page != NULL) {
		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
		    hcp->page, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
		hcp->page = NULL;
	}
	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __hamc_dup --
 *	Duplicate a hash cursor, such that the new one holds appropriate
 *	locks for the position of the original.
 *
 * PUBLIC: int __hamc_dup __P((DBC *, DBC *));
 */
int
__hamc_dup(orig_dbc, new_dbc)
	DBC *orig_dbc, *new_dbc;
{
	HASH_CURSOR *orig, *new;

	orig = (HASH_CURSOR *)orig_dbc->internal;
	new = (HASH_CURSOR *)new_dbc->internal;

	new->bucket = orig->bucket;
	new->lbucket = orig->lbucket;
	new->dup_off = orig->dup_off;
	new->dup_len = orig->dup_len;
	new->dup_tlen = orig->dup_tlen;

	if (F_ISSET(orig, H_DELETED))
		F_SET(new, H_DELETED);
	if (F_ISSET(orig, H_ISDUP))
		F_SET(new, H_ISDUP);

	return (0);
}

static int
__hamc_get(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key;
	DBT *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DB_MPOOLFILE *mpf;
	ENV *env;
	HASH_CURSOR *hcp;
	db_lockmode_t lock_type;
	int ret, t_ret;

	hcp = (HASH_CURSOR *)dbc->internal;
	dbp = dbc->dbp;
	env = dbp->env;
	mpf = dbp->mpf;

	/* Clear OR'd in additional bits so we can check for flag equality. */
	if (F_ISSET(dbc, DBC_RMW))
		lock_type = DB_LOCK_WRITE;
	else
		lock_type = DB_LOCK_READ;

	if ((ret = __ham_get_meta(dbc)) != 0)
		return (ret);
	hcp->seek_size = 0;

	ret = 0;
	switch (flags) {
	case DB_PREV_DUP:
		F_SET(hcp, H_DUPONLY);
		goto prev;
	case DB_PREV_NODUP:
		F_SET(hcp, H_NEXT_NODUP);
		/* FALLTHROUGH */
	case DB_PREV:
		if (IS_INITIALIZED(dbc)) {
prev:			ret = __ham_item_prev(dbc, lock_type, pgnop);
			break;
		}
		/* FALLTHROUGH */
	case DB_LAST:
		ret = __ham_item_last(dbc, lock_type, pgnop);
		break;
	case DB_NEXT_DUP:
	case DB_GET_BOTHC:
		/* cgetchk has already determined that the cursor is set. */
		F_SET(hcp, H_DUPONLY);
		goto next;
	case DB_NEXT_NODUP:
		F_SET(hcp, H_NEXT_NODUP);
		/* FALLTHROUGH */
	case DB_NEXT:
		if (IS_INITIALIZED(dbc)) {
next:			ret = __ham_item_next(dbc, lock_type, pgnop);
			break;
		}
		/* FALLTHROUGH */
	case DB_FIRST:
		ret = __ham_item_first(dbc, lock_type, pgnop);
		break;
	case DB_SET:
	case DB_SET_RANGE:
	case DB_GET_BOTH:
	case DB_GET_BOTH_RANGE:
		ret = __ham_lookup(dbc, key, 0, lock_type, pgnop);
		break;
	case DB_CURRENT:
		/* cgetchk has already determined that the cursor is set. */
		if (F_ISSET(hcp, H_DELETED)) {
			ret = DB_KEYEMPTY;
			goto err;
		}

		ret = __ham_item(dbc, lock_type, pgnop);
		break;
	default:
		ret = __db_unknown_flag(env, "__hamc_get", flags);
		break;
	}

	/*
	 * Must always enter this loop to do error handling and
	 * check for big key/data pair.
	 */
	for (;;) {
		if (ret != 0 && ret != DB_NOTFOUND)
			goto err;
		else if (F_ISSET(hcp, H_OK)) {
			if (*pgnop == PGNO_INVALID)
				ret = __ham_dup_return(dbc, data, flags);
			break;
		} else if (!F_ISSET(hcp, H_NOMORE)) {
			__db_errx(env, "H_NOMORE returned to __hamc_get");
			ret = EINVAL;
			break;
		}

		/*
		 * Ran out of entries in a bucket; change buckets.
		 */
		switch (flags) {
		case DB_LAST:
		case DB_PREV:
		case DB_PREV_DUP:
		case DB_PREV_NODUP:
			ret = __memp_fput(mpf,
			    dbc->thread_info, hcp->page, dbc->priority);
			hcp->page = NULL;
			if (hcp->bucket == 0) {
				ret = DB_NOTFOUND;
				hcp->pgno = PGNO_INVALID;
				goto err;
			}
			F_CLR(hcp, H_ISDUP);
			hcp->bucket--;
			hcp->indx = NDX_INVALID;
			hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
			if (ret == 0)
				ret = __ham_item_prev(dbc, lock_type, pgnop);
			break;
		case DB_FIRST:
		case DB_NEXT:
		case DB_NEXT_NODUP:
			ret = __memp_fput(mpf,
			    dbc->thread_info, hcp->page, dbc->priority);
			hcp->page = NULL;
			hcp->indx = NDX_INVALID;
			hcp->bucket++;
			F_CLR(hcp, H_ISDUP);
			hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
			if (hcp->bucket > hcp->hdr->max_bucket) {
				ret = DB_NOTFOUND;
				hcp->pgno = PGNO_INVALID;
				goto err;
			}
			if (ret == 0)
				ret = __ham_item_next(dbc, lock_type, pgnop);
			break;
		case DB_GET_BOTH:
		case DB_GET_BOTHC:
		case DB_GET_BOTH_RANGE:
		case DB_NEXT_DUP:
		case DB_SET:
		case DB_SET_RANGE:
			/* Key not found. */
			ret = DB_NOTFOUND;
			goto err;
		case DB_CURRENT:
			/*
			 * This should only happen if you are doing deletes and
			 * reading with concurrent threads and not doing proper
			 * locking.  We return the same error code as we would
			 * if the cursor were deleted.
			 */
			ret = DB_KEYEMPTY;
			goto err;
		default:
			DB_ASSERT(env, 0);
		}
	}

err:	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
		ret = t_ret;

	F_CLR(hcp, H_DUPONLY);
	F_CLR(hcp, H_NEXT_NODUP);

	return (ret);
}

/*
 * __ham_bulk -- Return bulk data from a hash table.
 */
static int
__ham_bulk(dbc, data, flags)
	DBC *dbc;
	DBT *data;
	u_int32_t flags;
{
	DB *dbp;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *cp;
	PAGE *pg;
	db_indx_t dup_len, dup_off, dup_tlen, indx, *inp;
	db_lockmode_t lock_mode;
	db_pgno_t pgno;
	int32_t *endp, *offp, *saveoff;
	u_int32_t key_off, key_size, pagesize, size, space;
	u_int8_t *dbuf, *dp, *hk, *np, *tmp;
	int is_dup, is_key;
	int need_pg, next_key, no_dup, ret, t_ret;

	ret = 0;
	key_off = 0;
	dup_len = dup_off = dup_tlen = 0;
	size = 0;
	dbp = dbc->dbp;
	pagesize = dbp->pgsize;
	mpf = dbp->mpf;
	cp = (HASH_CURSOR *)dbc->internal;
	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
	next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP;
	no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP;
	dbuf = data->data;
	np = dp = dbuf;

	/* Keep track of space that is left.  There is a termination entry. */
	space = data->ulen;
	space -= sizeof(*offp);

	/* Build the offset/size table from the end up. */
	endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
	endp--;
	offp = endp;

	key_size = 0;
	lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE: DB_LOCK_READ;

next_pg:
	need_pg = 1;
	indx = cp->indx;
	pg = cp->page;
	inp = P_INP(dbp, pg);

	do {
		if (is_key) {
			hk = H_PAIRKEY(dbp, pg, indx);
			if (HPAGE_PTYPE(hk) == H_OFFPAGE) {
				memcpy(&key_size,
				    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
				memcpy(&pgno,
				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
				size = key_size;
				if (key_size > space)
					goto get_key_space;
				if ((ret = __bam_bulk_overflow(
				    dbc, key_size, pgno, np)) != 0)
					return (ret);
				space -= key_size;
				key_off = (u_int32_t)(np - dbuf);
				np += key_size;
			} else {
				if (need_pg) {
					dp = np;
					size = pagesize - HOFFSET(pg);
					if (space < size) {
get_key_space:
						if (offp == endp) {
							data->size = (u_int32_t)
							    DB_ALIGN(size +
							    pagesize, 1024);
							return
							    (DB_BUFFER_SMALL);
						}
						goto back_up;
					}
					memcpy(dp,
					   (u_int8_t *)pg + HOFFSET(pg), size);
					need_pg = 0;
					space -= size;
					np += size;
				}
				key_size = LEN_HKEY(dbp, pg, pagesize, indx);
				key_off = ((inp[indx] - HOFFSET(pg)) +
				    (u_int32_t)(dp - dbuf)) +
				    SSZA(HKEYDATA, data);
			}
		}

		hk = H_PAIRDATA(dbp, pg, indx);
		switch (HPAGE_PTYPE(hk)) {
		case H_DUPLICATE:
		case H_KEYDATA:
			if (need_pg) {
				dp = np;
				size = pagesize - HOFFSET(pg);
				if (space < size) {
back_up:
					if (indx != 0) {
						indx -= 2;
						/* XXX
						 * It's not clear that this is
						 * the right way to fix this,
						 * but here goes.
						 * If we are backing up onto a
						 * duplicate, then we need to
						 * position ourselves at the
						 * end of the duplicate set.
						 * We probably need to make
						 * this work for H_OFFDUP too.
						 * It might be worth making a
						 * dummy cursor and calling
						 * __ham_item_prev.
						 */
						tmp = H_PAIRDATA(dbp, pg, indx);
						if (HPAGE_PTYPE(tmp) ==
						    H_DUPLICATE) {
							dup_off = dup_tlen =
							    LEN_HDATA(dbp, pg,
							    pagesize, indx + 1);
							memcpy(&dup_len,
							    HKEYDATA_DATA(tmp),
							    sizeof(db_indx_t));
						} else  {
							is_dup = 0;
							dup_len = 0;
							dup_off = 0;
							dup_tlen = 0;
							F_CLR(cp, H_ISDUP);
						}
						goto get_space;
					}
					/* indx == 0 */
					cp->dup_len = dup_len;
					cp->dup_off = dup_off;
					cp->dup_tlen = dup_tlen;
					if ((ret = __ham_item_prev(dbc,
					    lock_mode, &pgno)) != 0) {
						if (ret != DB_NOTFOUND)
							return (ret);
						if ((ret = __memp_fput(mpf,
						    dbc->thread_info, cp->page,
						    dbc->priority)) != 0)
							return (ret);
						cp->page = NULL;
						if (cp->bucket == 0) {
							cp->indx = indx =
							    NDX_INVALID;
							goto get_space;
						}
						if ((ret =
						    __ham_get_meta(dbc)) != 0)
							return (ret);

						cp->bucket--;
						cp->pgno = BUCKET_TO_PAGE(cp,
						    cp->bucket);
						cp->indx = NDX_INVALID;
						if ((ret = __ham_release_meta(
						    dbc)) != 0)
							return (ret);
						if ((ret = __ham_item_prev(dbc,
						    lock_mode, &pgno)) != 0)
							return (ret);
					}
					indx = cp->indx;
get_space:
					/*
					 * See if we put any data in the buffer.
					 */
					if (offp >= endp ||
					    F_ISSET(dbc, DBC_TRANSIENT)) {
						data->size = (u_int32_t)
						    DB_ALIGN(size +
						    data->ulen - space, 1024);
						return (DB_BUFFER_SMALL);
					}
					/*
					 * Don't continue;  we're all out
					 * of space, even though we're
					 * returning success.
					 */
					next_key = 0;
					break;
				}
				memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size);
				need_pg = 0;
				space -= size;
				np += size;
			}

			/*
			 * We're about to crack the offset(s) and length(s)
			 * out of an H_KEYDATA or H_DUPLICATE item.
			 * There are three cases:
			 *   1. We were moved into a duplicate set by
			 *	the standard hash cursor code.  Respect
			 *	the dup_off and dup_tlen we were given.
			 *   2. We stumbled upon a duplicate set while
			 *	walking the page on our own.  We need to
			 *	recognize it as a dup and set dup_off and
			 *	dup_tlen.
			 *   3. The current item is not a dup.
			 */
			if (F_ISSET(cp, H_ISDUP)) {
				/* Case 1 */
				is_dup = 1;
				dup_len = cp->dup_len;
				dup_off = cp->dup_off;
				dup_tlen = cp->dup_tlen;
			} else if (HPAGE_PTYPE(hk) == H_DUPLICATE) {
				/* Case 2 */
				is_dup = 1;
				/*
				 * If we run out of memory and bail,
				 * make sure the fact we're in a dup set
				 * isn't ignored later.
				 */
				F_SET(cp, H_ISDUP);
				dup_off = 0;
				memcpy(&dup_len,
				    HKEYDATA_DATA(hk), sizeof(db_indx_t));
				dup_tlen = LEN_HDATA(dbp, pg, pagesize, indx);
			} else {
				/* Case 3 */
				is_dup = 0;
				dup_len = 0;
				dup_off = 0;
				dup_tlen = 0;
			}

			do {
				space -= (is_key ? 4 : 2) * sizeof(*offp);
				size += (is_key ? 4 : 2) * sizeof(*offp);
				/*
				 * Since space is an unsigned, if we happen
				 * to wrap, then this comparison will turn out
				 * to be true.  XXX Wouldn't it be better to
				 * simply check above that space is greater than
				 * the value we're about to subtract???
				 */
				if (space > data->ulen) {
					if (!is_dup || dup_off == 0)
						goto back_up;
					dup_off -= (db_indx_t)
					    DUP_SIZE((u_int32_t)offp[1]);
					goto get_space;
				}
				if (is_key) {
					*offp-- = (int32_t)key_off;
					*offp-- = (int32_t)key_size;
				}
				if (is_dup) {
					*offp-- = (int32_t)(
					    ((inp[indx + 1] - HOFFSET(pg)) +
					    dp - dbuf) + SSZA(HKEYDATA, data) +
					    dup_off + sizeof(db_indx_t));
					memcpy(&dup_len,
					    HKEYDATA_DATA(hk) + dup_off,
					    sizeof(db_indx_t));
					dup_off += DUP_SIZE(dup_len);
					*offp-- = dup_len;
				} else {
					*offp-- = (int32_t)(
					    ((inp[indx + 1] - HOFFSET(pg)) +
					    dp - dbuf) + SSZA(HKEYDATA, data));
					*offp-- = LEN_HDATA(dbp, pg,
					    pagesize, indx);
				}
			} while (is_dup && dup_off < dup_tlen && no_dup == 0);
			F_CLR(cp, H_ISDUP);
			break;
		case H_OFFDUP:
			memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
			space -= 2 * sizeof(*offp);
			if (space > data->ulen)
				goto back_up;

			if (is_key) {
				space -= 2 * sizeof(*offp);
				if (space > data->ulen)
					goto back_up;
				*offp-- = (int32_t)key_off;
				*offp-- = (int32_t)key_size;
			}
			saveoff = offp;
			if ((ret = __bam_bulk_duplicates(dbc,
			    pgno, dbuf, is_key ? offp + 2 : NULL,
			    &offp, &np, &space, no_dup)) != 0) {
				if (ret == DB_BUFFER_SMALL) {
					size = space;
					space = 0;
					if (is_key && saveoff == offp) {
						offp += 2;
						goto back_up;
					}
					goto get_space;
				}
				return (ret);
			}
			break;
		case H_OFFPAGE:
			space -= (is_key ? 4 : 2) * sizeof(*offp);
			if (space > data->ulen)
				goto back_up;

			memcpy(&size, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
			memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
			if (size > space)
				goto back_up;

			if ((ret =
			    __bam_bulk_overflow(dbc, size, pgno, np)) != 0)
				return (ret);

			if (is_key) {
				*offp-- = (int32_t)key_off;
				*offp-- = (int32_t)key_size;
			}

			*offp-- = (int32_t)(np - dbuf);
			*offp-- = (int32_t)size;

			np += size;
			space -= size;
			break;
		default:
			/* Do nothing. */
			break;
		}
	} while (next_key && (indx += 2) < NUM_ENT(pg));

	cp->indx = indx;
	cp->dup_len = dup_len;
	cp->dup_off = dup_off;
	cp->dup_tlen = dup_tlen;

	/* If we are off the page, then try the next page. */
	if (ret == 0 && next_key && indx >= NUM_ENT(pg)) {
		if ((ret = __ham_item_next(dbc, lock_mode, &pgno)) == 0)
			goto next_pg;
		if (ret != DB_NOTFOUND)
			return (ret);
		if ((ret = __memp_fput(dbc->dbp->mpf,
		    dbc->thread_info, cp->page, dbc->priority)) != 0)
			return (ret);
		cp->page = NULL;
		if ((ret = __ham_get_meta(dbc)) != 0)
			return (ret);

		cp->bucket++;
		if (cp->bucket > cp->hdr->max_bucket) {
			/*
			 * Restore cursor to its previous state.  We're past
			 * the last item in the last bucket, so the next
			 * DBC->get(DB_NEXT) will return DB_NOTFOUND.
			 */
			cp->bucket--;
			ret = DB_NOTFOUND;
		} else {
			/*
			 * Start on the next bucket.
			 *
			 * Note that if this new bucket happens to be empty,
			 * but there's another non-empty bucket after it,
			 * we'll return early.  This is a rare case, and we
			 * don't guarantee any particular number of keys
			 * returned on each call, so just let the next call
			 * to bulk get move forward by yet another bucket.
			 */
			cp->pgno = BUCKET_TO_PAGE(cp, cp->bucket);
			cp->indx = NDX_INVALID;
			F_CLR(cp, H_ISDUP);
			ret = __ham_item_next(dbc, lock_mode, &pgno);
		}

		if ((t_ret = __ham_release_meta(dbc)) != 0)
			return (t_ret);
		if (ret == 0)
			goto next_pg;
		if (ret != DB_NOTFOUND)
			return (ret);
	}
	*offp = -1;
	return (0);
}
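
/*
 * A minimal consumption sketch, assuming the usual bulk-buffer macros from
 * db.h: the buffer filled above carries data from the front and a table of
 * int32_t offset/length pairs from the back, terminated by -1, which is the
 * layout DB_MULTIPLE_INIT and DB_MULTIPLE_KEY_NEXT expect.  An application
 * call might look roughly like this ("bulk", "kp", "klen", "dp", "dlen" and
 * "handle_pair" are placeholder names):
 *
 *	void *p, *kp, *dp;
 *	u_int32_t klen, dlen;
 *
 *	for (DB_MULTIPLE_INIT(p, &bulk);;) {
 *		DB_MULTIPLE_KEY_NEXT(p, &bulk, kp, klen, dp, dlen);
 *		if (p == NULL)
 *			break;
 *		handle_pair(kp, klen, dp, dlen);
 *	}
 */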

static int
__hamc_put(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key;
	DBT *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	DB *dbp;
	DBT tmp_val, *myval;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	u_int32_t nbytes;
	int ret, t_ret;

	/*
	 * The compiler doesn't realize that we only use this when ret is
	 * equal to 0 and that if ret is equal to 0, that we must have set
	 * myval.  So, we initialize it here to shut the compiler up.
	 */
	COMPQUIET(myval, NULL);

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;

	if (F_ISSET(hcp, H_DELETED) &&
	    flags != DB_KEYFIRST && flags != DB_KEYLAST)
		return (DB_NOTFOUND);

	if ((ret = __ham_get_meta(dbc)) != 0)
		goto err1;

	switch (flags) {
	case DB_KEYLAST:
	case DB_KEYFIRST:
	case DB_NODUPDATA:
	case DB_NOOVERWRITE:
		nbytes = (ISBIG(hcp, key->size) ? HOFFPAGE_PSIZE :
		    HKEYDATA_PSIZE(key->size)) +
		    (ISBIG(hcp, data->size) ? HOFFPAGE_PSIZE :
		    HKEYDATA_PSIZE(data->size));
		if ((ret = __ham_lookup(dbc,
		    key, nbytes, DB_LOCK_WRITE, pgnop)) == DB_NOTFOUND) {
			if (hcp->seek_found_page != PGNO_INVALID &&
			    hcp->seek_found_page != hcp->pgno) {
				if ((ret = __memp_fput(mpf, dbc->thread_info,
				    hcp->page, dbc->priority)) != 0)
					goto err2;
				hcp->page = NULL;
				hcp->pgno = hcp->seek_found_page;
				hcp->indx = NDX_INVALID;
			}

			if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) {
				/*
				 * A partial put, but the key does not exist
				 * and we are not beginning the write at 0.
				 * We must create a data item padded up to doff
				 * and then write the new bytes represented by
				 * val.
				 */
				if ((ret = __ham_init_dbt(dbp->env, &tmp_val,
				    data->size + data->doff,
				    &dbc->my_rdata.data,
				    &dbc->my_rdata.ulen)) != 0)
					goto err2;

				memset(tmp_val.data, 0, data->doff);
				memcpy((u_int8_t *)tmp_val.data +
				    data->doff, data->data, data->size);
				myval = &tmp_val;
			} else
				myval = (DBT *)data;

			ret = __ham_add_el(dbc, key, myval, H_KEYDATA);
			goto done;
		} else if (flags == DB_NOOVERWRITE &&
		    !F_ISSET(hcp, H_DELETED)) {
			if (*pgnop == PGNO_INVALID)
				ret = DB_KEYEXIST;
			else
				ret = __bam_opd_exists(dbc, *pgnop);
			if (ret != 0)
				goto done;
		}
		break;
	case DB_BEFORE:
	case DB_AFTER:
	case DB_CURRENT:
		ret = __ham_item(dbc, DB_LOCK_WRITE, pgnop);
		break;
	default:
		ret = __db_unknown_flag(dbp->env, "__hamc_put", flags);
		break;
	}

	/*
	 * Invalidate any insert index found so that it is not reused
	 * by a future insert.
	 */
	hcp->seek_found_page = PGNO_INVALID;
	hcp->seek_found_indx = NDX_INVALID;

	if (*pgnop == PGNO_INVALID && ret == 0) {
		if ((ret = __memp_dirty(mpf, &hcp->page,
		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
			goto done;
		if (flags == DB_CURRENT ||
		    ((flags == DB_KEYFIRST ||
		    flags == DB_KEYLAST || flags == DB_NODUPDATA) &&
		    !(F_ISSET(dbp, DB_AM_DUP) || F_ISSET(key, DB_DBT_DUPOK))))
			ret = __ham_overwrite(dbc, data, flags);
		else
			ret = __ham_add_dup(dbc, data, flags, pgnop);
	}

done:	if (hcp->page != NULL) {
		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
		    hcp->page, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
		if (t_ret == 0)
			hcp->page = NULL;
	}

	if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
		ret = __ham_expand_table(dbc);
		F_CLR(hcp, H_EXPAND);
		/* If we are out of space, ignore the error. */
		if (ret == ENOSPC && dbc->txn == NULL)
			ret = 0;
	}

err2:	if ((t_ret = __ham_release_meta(dbc)) != 0 && ret == 0)
		ret = t_ret;

err1:	return (ret);
}

/********************************* UTILITIES ************************/

/*
 * __ham_expand_table --
 *	Add the next bucket to the table, extending the file if necessary
 *	and splitting the old bucket's items into the new one.
 */
static int
__ham_expand_table(dbc)
	DBC *dbc;
{
	DB *dbp;
	DBMETA *mmeta;
	DB_LOCK metalock;
	DB_LSN lsn;
	DB_MPOOLFILE *mpf;
	HASH_CURSOR *hcp;
	PAGE *h;
	db_pgno_t pgno, mpgno;
	u_int32_t logn, newalloc, new_bucket, old_bucket;
	int got_meta, new_double, ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	hcp = (HASH_CURSOR *)dbc->internal;
	if ((ret = __ham_dirty_meta(dbc, 0)) != 0)
		return (ret);

	LOCK_INIT(metalock);
	mmeta = (DBMETA *) hcp->hdr;
	mpgno = mmeta->pgno;
	h = NULL;
	newalloc = 0;
	got_meta = 0;

	/*
	 * If the split point is about to increase, make sure that we
	 * have enough extra pages.  The calculation here is weird.
	 * We'd like to do this after we've upped max_bucket, but it's
	 * too late then because we've logged the meta-data split.  What
	 * we'll do between then and now is increment max bucket and then
	 * see what the log of one greater than that is; here we have to
	 * look at the log of max + 2.  VERY NASTY STUFF.
	 *
	 * We figure out what we need to do, then we log it, then request
	 * the pages from mpool.  We don't want to fail after extending
	 * the file.
	 *
	 * If the page we are about to split into has already been allocated,
	 * then we simply need to fetch it to get its LSN.  If it hasn't yet
	 * been allocated, then we know its LSN is (0,0).
	 */

	new_bucket = hcp->hdr->max_bucket + 1;
	old_bucket = new_bucket & hcp->hdr->low_mask;

	new_double = hcp->hdr->max_bucket == hcp->hdr->high_mask;
	logn = __db_log2(new_bucket);

	if (!new_double || hcp->hdr->spares[logn + 1] != PGNO_INVALID) {
		/* Page exists; get it so we can get its LSN. */
		pgno = BUCKET_TO_PAGE(hcp, new_bucket);
		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn,
		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &h)) != 0)
			goto err;
		lsn = h->lsn;
	} else {
		/* Get the master meta-data page to do allocation. */
		if (F_ISSET(dbp, DB_AM_SUBDB)) {
			mpgno = PGNO_BASE_MD;
			if ((ret = __db_lget(dbc,
			   0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
				goto err;
			if ((ret = __memp_fget(mpf, &mpgno, dbc->thread_info,
			    dbc->txn, DB_MPOOL_DIRTY, &mmeta)) != 0)
				goto err;
			got_meta = 1;
		}
		pgno = mmeta->last_pgno + 1;
		ZERO_LSN(lsn);
		newalloc = 1;
	}

	/* Log the meta-data split first. */
	if (DBC_LOGGING(dbc)) {
		/*
		 * We always log the page number of the first page of
		 * the allocation group.  However, the LSN that we log
		 * is either the LSN on the first page (if we did not
		 * do the actual allocation here) or the LSN on the last
		 * page of the unit (if we did do the allocation here).
		 */
		if ((ret = __ham_metagroup_log(dbp, dbc->txn,
		    &lsn, 0, hcp->hdr->max_bucket, mpgno, &mmeta->lsn,
		    hcp->hdr->dbmeta.pgno, &hcp->hdr->dbmeta.lsn,
		    pgno, &lsn, newalloc, mmeta->last_pgno)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(lsn);

	hcp->hdr->dbmeta.lsn = lsn;

	if (new_double && hcp->hdr->spares[logn + 1] == PGNO_INVALID) {
		/*
		 * We need to begin a new doubling and we have not allocated
		 * any pages yet.  Read the last page in and initialize it to
		 * make the allocation contiguous.  The pgno we calculated
		 * above is the first page allocated.  The entry in spares is
		 * that page number minus any buckets already allocated (it
		 * simplifies bucket-to-page translation).  After we've set
		 * that, we calculate the last pgno.
		 */

		pgno += hcp->hdr->max_bucket;

		if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn,
		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &h)) != 0)
			goto err;

		hcp->hdr->spares[logn + 1] =
		    (pgno - new_bucket) - hcp->hdr->max_bucket;
		mmeta->last_pgno = pgno;
		mmeta->lsn = lsn;

		P_INIT(h, dbp->pgsize,
		    pgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
	}

	/* Write out whatever page we ended up modifying. */
	h->lsn = lsn;
	if ((ret = __memp_fput(mpf, dbc->thread_info, h, dbc->priority)) != 0)
		goto err;
	h = NULL;

	/*
	 * Update the meta-data page of this hash database.
	 */
	hcp->hdr->max_bucket = new_bucket;
	if (new_double) {
		hcp->hdr->low_mask = hcp->hdr->high_mask;
		hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask;
	}

	/* Relocate records to the new bucket. */
	ret = __ham_split_page(dbc, old_bucket, new_bucket);

err:	if (got_meta)
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, mmeta, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
	if ((t_ret = __TLPUT(dbc, metalock)) != 0 && ret == 0)
		ret = t_ret;
	if (h != NULL)
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, h, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;

	return (ret);
}
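
/*
 * A worked example of the doubling arithmetic above, assuming the table
 * currently has max_bucket == 7, low_mask == 0x3 and high_mask == 0x7:
 * new_bucket becomes 8, new_double is true (7 == high_mask), and
 * old_bucket == (8 & 0x3) == 0, so bucket 0's items are redistributed
 * between buckets 0 and 8.  After the split the masks advance to
 * low_mask == 0x7 and high_mask == (8 | 0x7) == 0xf, ready for buckets
 * 9 through 15.
 */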

/*
 * __ham_call_hash --
 *	Call the hash function on the given key and convert the result
 *	into a bucket number.
 *
 * PUBLIC: u_int32_t __ham_call_hash __P((DBC *, u_int8_t *, u_int32_t));
 */
u_int32_t
__ham_call_hash(dbc, k, len)
	DBC *dbc;
	u_int8_t *k;
	u_int32_t len;
{
	DB *dbp;
	HASH *hashp;
	HASH_CURSOR *hcp;
	u_int32_t n, bucket;

	dbp = dbc->dbp;
	hcp = (HASH_CURSOR *)dbc->internal;
	hashp = dbp->h_internal;

	n = (u_int32_t)(hashp->h_hash(dbp, k, len));

	bucket = n & hcp->hdr->high_mask;
	if (bucket > hcp->hdr->max_bucket)
		bucket = bucket & hcp->hdr->low_mask;
	return (bucket);
}
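
/*
 * A worked example of the two-mask selection above: with max_bucket == 5,
 * low_mask == 0x3 and high_mask == 0x7, a hash value of 22 gives
 * (22 & 0x7) == 6, which is beyond max_bucket, so the key falls back to
 * bucket (6 & 0x3) == 2; a hash value of 12 gives (12 & 0x7) == 4, a
 * valid bucket that is used directly.  Bucket 2 is exactly the bucket
 * that will later split to create bucket 6, so the item remains findable
 * after that split as well.
 */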

/*
 * Check for duplicates, and call __db_ret appropriately.  Release
 * everything held by the cursor.
 */
static int
__ham_dup_return(dbc, val, flags)
	DBC *dbc;
	DBT *val;
	u_int32_t flags;
{
	DB *dbp;
	DBT *myval, tmp_val;
	HASH_CURSOR *hcp;
	PAGE *pp;
	db_indx_t ndx;
	db_pgno_t pgno;
	u_int32_t off, tlen;
	u_int8_t *hk, type;
	int cmp, ret;
	db_indx_t len;

	/* Check for duplicate and return the first one. */
	dbp = dbc->dbp;
	hcp = (HASH_CURSOR *)dbc->internal;
	ndx = H_DATAINDEX(hcp->indx);
	type = HPAGE_TYPE(dbp, hcp->page, ndx);
	pp = hcp->page;
	myval = val;

	/*
	 * There are 4 cases:
	 * 1. We are not in a duplicate; simply return, and the upper layer
	 *    will do the right thing.
	 * 2. We are looking at keys and stumbled onto a duplicate.
	 * 3. We are in the middle of a duplicate set. (ISDUP set)
	 * 4. We need to check for a particular data match.
	 */

	/* We should never get here with off-page dups. */
	DB_ASSERT(dbp->env, type != H_OFFDUP);

	/* Case 1 */
	if (type != H_DUPLICATE && flags != DB_GET_BOTH &&
	    flags != DB_GET_BOTHC && flags != DB_GET_BOTH_RANGE)
		return (0);

	/*
	 * Here we check for the case where we just stumbled onto a
	 * duplicate.  In this case, we do initialization and then
	 * let the normal duplicate code handle it. (Case 2)
	 */
	if (!F_ISSET(hcp, H_ISDUP) && type == H_DUPLICATE) {
		F_SET(hcp, H_ISDUP);
		hcp->dup_tlen = LEN_HDATA(dbp, hcp->page,
		    hcp->hdr->dbmeta.pagesize, hcp->indx);
		hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
		if (flags == DB_LAST ||
		    flags == DB_PREV || flags == DB_PREV_NODUP) {
			hcp->dup_off = 0;
			do {
				memcpy(&len,
				    HKEYDATA_DATA(hk) + hcp->dup_off,
				    sizeof(db_indx_t));
				hcp->dup_off += DUP_SIZE(len);
			} while (hcp->dup_off < hcp->dup_tlen);
			hcp->dup_off -= DUP_SIZE(len);
		} else {
			memcpy(&len,
			    HKEYDATA_DATA(hk), sizeof(db_indx_t));
			hcp->dup_off = 0;
		}
		hcp->dup_len = len;
	}

	/*
	 * If we are retrieving a specific key/data pair, then we
	 * may need to adjust the cursor before returning data.
	 * Case 4
	 */
	if (flags == DB_GET_BOTH ||
	    flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) {
		if (F_ISSET(hcp, H_ISDUP)) {
			/*
			 * If we're doing a join, search forward from the
			 * current position, not the beginning of the dup set.
			 */
			if (flags == DB_GET_BOTHC)
				F_SET(hcp, H_CONTINUE);

			__ham_dsearch(dbc, val, &off, &cmp, flags);

			/*
			 * This flag is set nowhere else and is safe to
			 * clear unconditionally.
			 */
			F_CLR(hcp, H_CONTINUE);
			hcp->dup_off = off;
		} else {
			hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
			if (((HKEYDATA *)hk)->type == H_OFFPAGE) {
				memcpy(&tlen,
				    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
				memcpy(&pgno,
				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
				if ((ret = __db_moff(dbp, dbc->thread_info,
				    dbc->txn, val,
				    pgno, tlen, dbp->dup_compare, &cmp)) != 0)
					return (ret);
			} else {
				/*
				 * We do not zero tmp_val since the comparison
				 * routines may only look at data and size.
				 */
				tmp_val.data = HKEYDATA_DATA(hk);
				tmp_val.size = LEN_HDATA(dbp, hcp->page,
				    dbp->pgsize, hcp->indx);
				cmp = dbp->dup_compare == NULL ?
				    __bam_defcmp(dbp, &tmp_val, val) :
				    dbp->dup_compare(dbp, &tmp_val, val);
			}
		}

		if (cmp != 0)
			return (DB_NOTFOUND);
	}

	/*
	 * If we've already got the data for this value, or we're doing a bulk
	 * get, we don't want to return the data.
	 */
	if (F_ISSET(dbc, DBC_MULTIPLE | DBC_MULTIPLE_KEY) ||
	    F_ISSET(val, DB_DBT_ISSET))
		return (0);

	/*
	 * Now, everything is initialized, grab a duplicate if
	 * necessary.
	 */
	if (F_ISSET(hcp, H_ISDUP)) {	/* Case 3 */
		/*
		 * Copy the DBT in case we are retrieving into user
		 * memory and we need the parameters for it.  If the
		 * user requested a partial, then we need to adjust
		 * the user's parameters to get the partial of the
		 * duplicate which is itself a partial.
		 */
		memcpy(&tmp_val, val, sizeof(*val));

		if (F_ISSET(&tmp_val, DB_DBT_PARTIAL)) {
			/*
			 * Take the user's length unless it would go
			 * beyond the end of the duplicate.
			 */
			if (tmp_val.doff > hcp->dup_len)
				tmp_val.dlen = 0;
			else if (tmp_val.dlen + tmp_val.doff > hcp->dup_len)
				tmp_val.dlen = hcp->dup_len - tmp_val.doff;

		} else {
			F_SET(&tmp_val, DB_DBT_PARTIAL);
			tmp_val.dlen = hcp->dup_len;
			tmp_val.doff = 0;
		}

		/*
		 * Set offset to the appropriate place within the
		 * current duplicate -- need to take into account
		 * both the dup_off and the current duplicate's
		 * length.
		 */
		tmp_val.doff += hcp->dup_off + sizeof(db_indx_t);

		myval = &tmp_val;
	}

	/*
	 * Finally, if we had a duplicate, pp, ndx, and myval should be
	 * set appropriately.
	 */
	if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn,
	    pp, ndx, myval, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
		return (ret);

	/*
	 * In case we sent a temporary off to db_ret, set the real
	 * return values.
	 */
	val->data = myval->data;
	val->size = myval->size;

	F_SET(val, DB_DBT_ISSET);

	return (0);
}
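
/*
 * A worked example of the partial adjustment above, assuming a 2-byte
 * db_indx_t: with the cursor on a duplicate whose element starts at
 * dup_off == 20 and whose data length is dup_len == 10, a plain get is
 * converted to a partial with dlen == 10 and doff == 0 + 20 + 2 == 22,
 * i.e. exactly the element's bytes inside the composite H_DUPLICATE
 * item, skipping its leading length.  A caller-supplied partial of
 * doff == 4, dlen == 100 is first clamped to dlen == 6 and then shifted
 * to doff == 26.
 */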

static int
__ham_overwrite(dbc, nval, flags)
	DBC *dbc;
	DBT *nval;
	u_int32_t flags;
{
	DB *dbp;
	DBT *myval, tmp_val, tmp_val2;
	ENV *env;
	HASH_CURSOR *hcp;
	void *newrec;
	u_int8_t *hk, *p;
	u_int32_t len, nondup_size;
	db_indx_t newsize;
	int ret;

	dbp = dbc->dbp;
	env = dbp->env;
	hcp = (HASH_CURSOR *)dbc->internal;
	if (F_ISSET(hcp, H_ISDUP)) {
		/*
		 * This is an overwrite of a duplicate. We should never
		 * be off-page at this point.
		 */
		DB_ASSERT(env, hcp->opd == NULL);
		/* On page dups */
		if (F_ISSET(nval, DB_DBT_PARTIAL)) {
			/*
			 * We're going to have to get the current item, then
			 * construct the record, do any padding and do a
			 * replace.
			 */
			memset(&tmp_val, 0, sizeof(tmp_val));
			if ((ret =
			    __ham_dup_return(dbc, &tmp_val, DB_CURRENT)) != 0)
				return (ret);

			/* Figure out new size. */
			nondup_size = tmp_val.size;
			newsize = nondup_size;

			/*
			 * Three cases:
			 * 1. strictly append (may need to allocate space
			 *	for pad bytes; really gross).
			 * 2. overwrite some and append.
			 * 3. strictly overwrite.
			 */
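			/*
			 * A small worked example of the size computation
			 * below, with nondup_size == 10:
			 * case 1, doff == 15, size == 4:
			 *	newsize == 10 + (15 - 10) + 4 == 19
			 *	(five zero pad bytes precede the new data);
			 * case 2, doff == 8, dlen == 5, size == 6:
			 *	newsize == 10 + 6 - (10 - 8) == 14;
			 * case 3, doff == 2, dlen == 3, size == 5:
			 *	newsize == 10 + 5 - 3 == 12.
			 */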
			if (nval->doff > nondup_size)
				newsize +=
				    ((nval->doff - nondup_size) + nval->size);
			else if (nval->doff + nval->dlen > nondup_size)
				newsize += nval->size -
				    (nondup_size - nval->doff);
			else
				newsize += nval->size - nval->dlen;

			/*
			 * Make sure that the new size doesn't put us over
			 * the onpage duplicate size in which case we need
			 * to convert to off-page duplicates.
			 */
			if (ISBIG(hcp,
			    (hcp->dup_tlen - nondup_size) + newsize)) {
				if ((ret = __ham_dup_convert(dbc)) != 0)
					return (ret);
				return (hcp->opd->am_put(hcp->opd,
				    NULL, nval, flags, NULL));
			}

			if ((ret = __os_malloc(dbp->env,
			    DUP_SIZE(newsize), &newrec)) != 0)
				return (ret);
			memset(&tmp_val2, 0, sizeof(tmp_val2));
			F_SET(&tmp_val2, DB_DBT_PARTIAL);

			/* Construct the record. */
			p = newrec;
			/* Initial size. */
			memcpy(p, &newsize, sizeof(db_indx_t));
			p += sizeof(db_indx_t);

			/* First part of original record. */
			len = nval->doff > tmp_val.size
			    ? tmp_val.size : nval->doff;
			memcpy(p, tmp_val.data, len);
			p += len;

			if (nval->doff > tmp_val.size) {
				/* Padding */
				memset(p, 0, nval->doff - tmp_val.size);
				p += nval->doff - tmp_val.size;
			}

			/* New bytes */
			memcpy(p, nval->data, nval->size);
			p += nval->size;

			/* End of original record (if there is any) */
			if (nval->doff + nval->dlen < tmp_val.size) {
				len = (tmp_val.size - nval->doff) - nval->dlen;
				memcpy(p, (u_int8_t *)tmp_val.data +
				    nval->doff + nval->dlen, len);
				p += len;
			}

			/* Final size. */
			memcpy(p, &newsize, sizeof(db_indx_t));

			/*
			 * Make sure that the caller isn't corrupting
			 * the sort order.
			 */
			if (dbp->dup_compare != NULL) {
				tmp_val2.data =
				    (u_int8_t *)newrec + sizeof(db_indx_t);
				tmp_val2.size = newsize;
				if (dbp->dup_compare(
				    dbp, &tmp_val, &tmp_val2) != 0) {
					__os_free(env, newrec);
					return (__db_duperr(dbp, flags));
				}
			}

			tmp_val2.data = newrec;
			tmp_val2.size = DUP_SIZE(newsize);
			tmp_val2.doff = hcp->dup_off;
			tmp_val2.dlen = DUP_SIZE(hcp->dup_len);

			ret = __ham_replpair(dbc, &tmp_val2, 0);
			__os_free(env, newrec);

			/* Update cursor */
			if (ret != 0)
				return (ret);

			if (newsize > nondup_size) {
				if ((ret = __hamc_update(dbc,
				    (newsize - nondup_size),
				    DB_HAM_CURADJ_ADDMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen += (newsize - nondup_size);
			} else {
				if ((ret = __hamc_update(dbc,
				    (nondup_size - newsize),
				    DB_HAM_CURADJ_DELMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen -= (nondup_size - newsize);
			}
			hcp->dup_len = newsize;
			return (0);
		} else {
			/* Check whether we need to convert to off page. */
			if (ISBIG(hcp,
			    (hcp->dup_tlen - hcp->dup_len) + nval->size)) {
				if ((ret = __ham_dup_convert(dbc)) != 0)
					return (ret);
				return (hcp->opd->am_put(hcp->opd,
				    NULL, nval, flags, NULL));
			}

			/* Make sure we maintain sort order. */
			if (dbp->dup_compare != NULL) {
				tmp_val2.data =
				    HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page,
				    hcp->indx)) + hcp->dup_off +
				    sizeof(db_indx_t);
				tmp_val2.size = hcp->dup_len;
				if (dbp->dup_compare(
				    dbp, nval, &tmp_val2) != 0) {
					__db_errx(env,
			"Existing data sorts differently from put data");
					return (EINVAL);
				}
			}
			/* Overwriting a complete duplicate. */
			if ((ret =
			    __ham_make_dup(dbp->env, nval, &tmp_val,
			    &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0)
				return (ret);
			/* Now fix what we are replacing. */
			tmp_val.doff = hcp->dup_off;
			tmp_val.dlen = DUP_SIZE(hcp->dup_len);

			/* Update cursor */
			if (nval->size > hcp->dup_len) {
				if ((ret = __hamc_update(dbc,
				    (nval->size - hcp->dup_len),
				    DB_HAM_CURADJ_ADDMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen += (nval->size - hcp->dup_len);
			} else {
				if ((ret = __hamc_update(dbc,
				    (hcp->dup_len - nval->size),
				    DB_HAM_CURADJ_DELMOD, 1)) != 0)
					return (ret);
				hcp->dup_tlen -= (hcp->dup_len - nval->size);
			}
			hcp->dup_len = (db_indx_t)nval->size;
		}
		myval = &tmp_val;
	} else if (!F_ISSET(nval, DB_DBT_PARTIAL)) {
		/* Put/overwrite */
		memcpy(&tmp_val, nval, sizeof(*nval));
		F_SET(&tmp_val, DB_DBT_PARTIAL);
		tmp_val.doff = 0;
		hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
		if (HPAGE_PTYPE(hk) == H_OFFPAGE)
			memcpy(&tmp_val.dlen,
			    HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
		else
			tmp_val.dlen = LEN_HDATA(dbp, hcp->page,
			    hcp->hdr->dbmeta.pagesize, hcp->indx);
		myval = &tmp_val;
	} else
		/* Regular partial put */
		myval = nval;

	return (__ham_replpair(dbc, myval, 0));
}
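
/*
 * An illustrative note on the non-duplicate path above: a plain
 * (non-partial) overwrite is turned into a partial put that covers the
 * whole existing record -- for example, if the stored data item is 500
 * bytes, tmp_val is given doff == 0 and dlen == 500 -- so __ham_replpair
 * only ever sees byte-range replacements.
 */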

/*
 * Given a key and a cursor, sets the cursor to the page/ndx on which
 * the key resides.  If the key is found, the cursor H_OK flag is set
 * and the pagep, bndx, pgno (dpagep, dndx, dpgno) fields are set.
 * If the key is not found, the H_OK flag is not set.  If the sought
 * field is non-0, the pagep, bndx, pgno (dpagep, dndx, dpgno) fields
 * are set indicating where an add might take place.  If it is 0,
 * none of the cursor pointer fields are valid.
 */
static int
__ham_lookup(dbc, key, sought, mode, pgnop)
	DBC *dbc;
	const DBT *key;
	u_int32_t sought;
	db_lockmode_t mode;
	db_pgno_t *pgnop;
{
	DB *dbp;
	HASH_CURSOR *hcp;
	db_pgno_t next_pgno;
	int match, ret;
	u_int8_t *dk;

	dbp = dbc->dbp;
	hcp = (HASH_CURSOR *)dbc->internal;

	/*
	 * Set up cursor so that we're looking for space to add an item
	 * as we cycle through the pages looking for the key.
	 */
	if ((ret = __ham_item_reset(dbc)) != 0)
		return (ret);
	hcp->seek_size = sought;

	hcp->bucket = __ham_call_hash(dbc, (u_int8_t *)key->data, key->size);
	hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
	/* Look through all pages in the bucket for the key. */
	if ((ret = __ham_get_cpage(dbc, mode)) != 0)
		return (ret);

	*pgnop = PGNO_INVALID;
	if (hcp->indx == NDX_INVALID) {
		hcp->indx = 0;
		F_CLR(hcp, H_ISDUP);
	}
	while (hcp->pgno != PGNO_INVALID) {
		/* Are we looking for space to insert an item? */
		if (hcp->seek_size != 0 &&
		    hcp->seek_found_page == PGNO_INVALID &&
		    hcp->seek_size < P_FREESPACE(dbp, hcp->page)) {
			hcp->seek_found_page = hcp->pgno;
			hcp->seek_found_indx = NDX_INVALID;
		}

		if ((ret = __ham_getindex(dbc, hcp->page, key,
		    H_KEYDATA, &match, &hcp->indx)) != 0)
			return (ret);

		/*
		 * If this is the first page in the bucket with space for
		 * inserting the requested item, store the insert index to
		 * save having to look it up again later.
		 */
		if (hcp->seek_found_page == hcp->pgno)
			hcp->seek_found_indx = hcp->indx;

		if (match == 0) {
			F_SET(hcp, H_OK);
			dk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
			if (HPAGE_PTYPE(dk) == H_OFFDUP)
				memcpy(pgnop, HOFFDUP_PGNO(dk),
				    sizeof(db_pgno_t));
			return (0);
		}

		/* Move the cursor to the next page. */
		if (NEXT_PGNO(hcp->page) == PGNO_INVALID)
			break;
		next_pgno = NEXT_PGNO(hcp->page);
		hcp->indx = 0;
		if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0)
			return (ret);
	}
	F_SET(hcp, H_NOMORE);
	return (DB_NOTFOUND);
}

/*
 * __ham_init_dbt --
 *	Initialize a dbt using some possibly already allocated storage
 *	for items.
 *
 * PUBLIC: int __ham_init_dbt __P((ENV *,
 * PUBLIC:     DBT *, u_int32_t, void **, u_int32_t *));
 */
int
__ham_init_dbt(env, dbt, size, bufp, sizep)
	ENV *env;
	DBT *dbt;
	u_int32_t size;
	void **bufp;
	u_int32_t *sizep;
{
	int ret;

	memset(dbt, 0, sizeof(*dbt));
	if (*sizep < size) {
		if ((ret = __os_realloc(env, size, bufp)) != 0) {
			*sizep = 0;
			return (ret);
		}
		*sizep = size;
	}
	dbt->data = *bufp;
	dbt->size = size;
	return (0);
}

/*
 * Adjust the cursors after an insert or delete.  The cursor passed is
 * the one that was operated upon; we just need to check any of the
 * others.
 *
 * len indicates the length of the item added/deleted.
 * add indicates if the item indicated by the cursor has just been
 * added (add == 1) or deleted (add == 0).
 * dup indicates if the addition occurred into a duplicate set.
 *
 * PUBLIC: int __hamc_update
 * PUBLIC:    __P((DBC *, u_int32_t, db_ham_curadj, int));
 */
int
__hamc_update(dbc, len, operation, is_dup)
	DBC *dbc;
	u_int32_t len;
	db_ham_curadj operation;
	int is_dup;
{
	DB *dbp, *ldbp;
	DBC *cp;
	DB_LSN lsn;
	DB_TXN *my_txn;
	ENV *env;
	HASH_CURSOR *hcp, *lcp;
	int found, ret, was_mod, was_add;
	u_int32_t order;

	dbp = dbc->dbp;
	env = dbp->env;
	hcp = (HASH_CURSOR *)dbc->internal;

	/*
	 * The adjustment will only be logged if this is a subtransaction.
	 * Only subtransactions can abort and affect their parent
	 * transactions' cursors.
	 */

	my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
	found = 0;

	MUTEX_LOCK(env, env->mtx_dblist);

	switch (operation) {
	case DB_HAM_CURADJ_DEL:
		was_mod = 0;
		was_add = 0;
		break;
	case DB_HAM_CURADJ_ADD:
		was_mod = 0;
		was_add = 1;
		break;
	case DB_HAM_CURADJ_DELMOD:
		was_mod = 1;
		was_add = 0;
		break;
	case DB_HAM_CURADJ_ADDMOD:
		was_mod = 1;
		was_add = 1;
		break;
	default:
		return (EINVAL);
	}

	/*
	 * Calculate the order of this deleted record.
	 * This will be one greater than any cursor that is pointing
	 * at this record and already marked as deleted.
	 */
	order = 0;
	if (was_add == 0) {
		FIND_FIRST_DB_MATCH(env, dbp, ldbp);
		for (order = 1;
		    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
		    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
			MUTEX_LOCK(env, dbp->mutex);
			TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
				if (cp == dbc || cp->dbtype != DB_HASH)
					continue;
				lcp = (HASH_CURSOR *)cp->internal;
				if (F_ISSET(lcp, H_DELETED) &&
				    hcp->pgno == lcp->pgno &&
				    hcp->indx == lcp->indx &&
				    order <= lcp->order &&
				    (!is_dup || hcp->dup_off == lcp->dup_off) &&
				    !MVCC_SKIP_CURADJ(cp, lcp->pgno))
					order = lcp->order + 1;
			}
			MUTEX_UNLOCK(env, dbp->mutex);
		}
		hcp->order = order;
	}

	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
	for (;
	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
		MUTEX_LOCK(env, dbp->mutex);
		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
			if (cp == dbc || cp->dbtype != DB_HASH)
				continue;

			lcp = (HASH_CURSOR *)cp->internal;

			if (lcp->pgno != hcp->pgno ||
			    lcp->indx == NDX_INVALID ||
			    MVCC_SKIP_CURADJ(cp, lcp->pgno))
				continue;

			if (my_txn != NULL && cp->txn != my_txn)
				found = 1;

			if (!is_dup) {
				if (was_add == 1) {
					/*
					 * This routine is not called to add
					 * non-dup records which are always put
					 * at the end.  It is only called from
					 * recovery in this case and the
					 * cursor will be marked deleted.
					 * We are "undeleting" so unmark all
					 * cursors with the same order.
					 */
					if (lcp->indx == hcp->indx &&
					    F_ISSET(lcp, H_DELETED)) {
						if (lcp->order == hcp->order)
							F_CLR(lcp, H_DELETED);
						else if (lcp->order >
						    hcp->order) {

						/*
						 * If we've moved this cursor's
						 * index, split its order
						 * number--i.e., decrement it by
						 * enough so that the lowest
						 * cursor moved has order 1.
						 * hcp->order is the split
						 * point, so decrement by it.
						 */
							lcp->order -=
							    hcp->order;
							lcp->indx += 2;
						}
					} else if (lcp->indx >= hcp->indx)
						lcp->indx += 2;
				} else {
					if (lcp->indx > hcp->indx) {
						lcp->indx -= 2;
						if (lcp->indx == hcp->indx &&
						    F_ISSET(lcp, H_DELETED))
							lcp->order += order;
					} else if (lcp->indx == hcp->indx &&
					    !F_ISSET(lcp, H_DELETED)) {
						F_SET(lcp, H_DELETED);
						F_CLR(lcp, H_ISDUP);
						lcp->order = order;
					}
				}
			} else if (lcp->indx == hcp->indx) {
				/*
				 * Handle duplicates.  This routine is only
				 * called for on page dups. Off page dups are
				 * handled by btree/rtree code.
				 */
				if (was_add == 1) {
					lcp->dup_tlen += len;
					if (lcp->dup_off == hcp->dup_off &&
					    F_ISSET(hcp, H_DELETED) &&
					    F_ISSET(lcp, H_DELETED)) {
						/* Abort of a delete. */
						if (lcp->order == hcp->order)
							F_CLR(lcp, H_DELETED);
						else if (lcp->order >
						    hcp->order) {
							lcp->order -=
							    (hcp->order - 1);
							lcp->dup_off += len;
						}
					} else if (lcp->dup_off >
					    hcp->dup_off || (!was_mod &&
					    lcp->dup_off == hcp->dup_off))
						lcp->dup_off += len;
				} else {
					lcp->dup_tlen -= len;
					if (lcp->dup_off > hcp->dup_off) {
						lcp->dup_off -= len;
						if (lcp->dup_off ==
						    hcp->dup_off &&
						    F_ISSET(lcp, H_DELETED))
							lcp->order += order;
					} else if (!was_mod &&
					    lcp->dup_off == hcp->dup_off &&
					    !F_ISSET(lcp, H_DELETED)) {
						F_SET(lcp, H_DELETED);
						lcp->order = order;
					}
				}
			}
		}
		MUTEX_UNLOCK(env, dbp->mutex);
	}
	MUTEX_UNLOCK(env, env->mtx_dblist);

	if (found != 0 && DBC_LOGGING(dbc)) {
		if ((ret = __ham_curadj_log(dbp, my_txn, &lsn, 0, hcp->pgno,
		    hcp->indx, len, hcp->dup_off, (int)operation, is_dup,
		    order)) != 0)
			return (ret);
	}

	return (0);
}
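
/*
 * An illustrative scenario for the duplicate adjustments above: suppose
 * cursors C1 and C2 sit on the same on-page duplicate set and C2 is
 * positioned at a later element (C2's dup_off is greater than C1's).
 * When C1's element is deleted (was_add == 0, is_dup == 1, with len the
 * number of bytes removed), the loop shrinks C2's dup_tlen by len and
 * slides C2's dup_off back by len, so C2 continues to address the same
 * element within the now-smaller set.
 */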

/*
 * __ham_get_clist --
 *
 * Get a list of cursors either on a particular bucket or on a particular
 * page and index combination.  The former is so that we can update
 * cursors on a split.  The latter is so we can update cursors when we
 * move items off page.
 *
 * PUBLIC: int __ham_get_clist __P((DB *, db_pgno_t, u_int32_t, DBC ***));
 */
int
__ham_get_clist(dbp, pgno, indx, listp)
	DB *dbp;
	db_pgno_t pgno;
	u_int32_t indx;
	DBC ***listp;
{
	DB *ldbp;
	DBC *cp;
	ENV *env;
	u_int nalloc, nused;
	int ret;

	*listp = NULL;
	env = dbp->env;
	nalloc = nused = 0;

	/*
	 * Assume that finding anything is the exception, so optimize for
	 * the case where there aren't any.
	 */
	MUTEX_LOCK(env, env->mtx_dblist);
	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
	for (;
	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
		MUTEX_LOCK(env, dbp->mutex);
		TAILQ_FOREACH(cp, &ldbp->active_queue, links)
			/*
			 * We match if cp->pgno matches the specified
			 * pgno, and if either the cp->indx matches
			 * or we weren't given an index.
			 */
			if (cp->internal->pgno == pgno &&
			    (indx == NDX_INVALID ||
			    cp->internal->indx == indx) &&
			    !MVCC_SKIP_CURADJ(cp, pgno)) {
				if (nused >= nalloc) {
					nalloc += 10;
					if ((ret = __os_realloc(dbp->env,
					    nalloc * sizeof(HASH_CURSOR *),
					    listp)) != 0)
						goto err;
				}
				(*listp)[nused++] = cp;
			}

		MUTEX_UNLOCK(dbp->env, dbp->mutex);
	}
	MUTEX_UNLOCK(env, env->mtx_dblist);

	if (listp != NULL) {
		if (nused >= nalloc) {
			nalloc++;
			if ((ret = __os_realloc(dbp->env,
			    nalloc * sizeof(HASH_CURSOR *), listp)) != 0)
				return (ret);
		}
		(*listp)[nused] = NULL;
	}
	return (0);
err:
	MUTEX_UNLOCK(dbp->env, dbp->mutex);
	MUTEX_UNLOCK(env, env->mtx_dblist);
	return (ret);
}

static int
__hamc_writelock(dbc)
	DBC *dbc;
{
	DB_LOCK tmp_lock;
	HASH_CURSOR *hcp;
	int ret;

	/*
	 * All we need do is acquire the lock and let the off-page
	 * dup tree do its thing.
	 */
	if (!STD_LOCKING(dbc))
		return (0);

	hcp = (HASH_CURSOR *)dbc->internal;
	ret = 0;
	if ((!LOCK_ISSET(hcp->lock) || hcp->lock_mode != DB_LOCK_WRITE)) {
		tmp_lock = hcp->lock;
		if ((ret = __ham_lock_bucket(dbc, DB_LOCK_WRITE)) == 0 &&
		    tmp_lock.mode != DB_LOCK_WWRITE)
			ret = __LPUT(dbc, tmp_lock);
	}
	return (ret);
}