1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 */
6/*
7 * Copyright (c) 1990, 1993, 1994
8 *	Margo Seltzer.  All rights reserved.
9 */
10/*
11 * Copyright (c) 1990, 1993, 1994
12 *	The Regents of the University of California.  All rights reserved.
13 *
14 * This code is derived from software contributed to Berkeley by
15 * Margo Seltzer.
16 *
17 * Redistribution and use in source and binary forms, with or without
18 * modification, are permitted provided that the following conditions
19 * are met:
20 * 1. Redistributions of source code must retain the above copyright
21 *    notice, this list of conditions and the following disclaimer.
22 * 2. Redistributions in binary form must reproduce the above copyright
23 *    notice, this list of conditions and the following disclaimer in the
24 *    documentation and/or other materials provided with the distribution.
25 * 3. Neither the name of the University nor the names of its contributors
26 *    may be used to endorse or promote products derived from this software
27 *    without specific prior written permission.
28 *
29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
32 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * SUCH DAMAGE.
40 *
41 * $Id: hash_page.c,v 12.53 2008/05/07 12:27:34 bschmeck Exp $
42 */
43
44/*
45 * PACKAGE:  hashing
46 *
47 * DESCRIPTION:
48 *	Page manipulation for hashing package.
49 */
50
51#include "db_config.h"
52
53#include "db_int.h"
54#include "dbinc/db_page.h"
55#include "dbinc/hash.h"
56#include "dbinc/lock.h"
57#include "dbinc/mp.h"
58
/* Prototypes for functions that are private to this file. */
static int __hamc_delpg
    __P((DBC *, db_pgno_t, db_pgno_t, u_int32_t, db_ham_mode, u_int32_t *));
/* Binary search of a sorted (P_HASH) page for a key. */
static int __ham_getindex_sorted
    __P((DBC *, PAGE *, const DBT *, int, int *, db_indx_t *));
/* Linear search of an unsorted (P_HASH_UNSORTED) page for a key. */
static int __ham_getindex_unsorted
    __P((DBC *, PAGE *, const DBT *, int *, db_indx_t *));
/* Logs a page sort, invalidates the cursor's saved seek slot, then sorts. */
static int __ham_sort_page_cursor __P((DBC *, PAGE *));
66
67/*
68 * PUBLIC: int __ham_item __P((DBC *, db_lockmode_t, db_pgno_t *));
69 */
70int
71__ham_item(dbc, mode, pgnop)
72	DBC *dbc;
73	db_lockmode_t mode;
74	db_pgno_t *pgnop;
75{
76	DB *dbp;
77	HASH_CURSOR *hcp;
78	db_pgno_t next_pgno;
79	int ret;
80
81	dbp = dbc->dbp;
82	hcp = (HASH_CURSOR *)dbc->internal;
83
84	if (F_ISSET(hcp, H_DELETED)) {
85		__db_errx(dbp->env, "Attempt to return a deleted item");
86		return (EINVAL);
87	}
88	F_CLR(hcp, H_OK | H_NOMORE);
89
90	/* Check if we need to get a page for this cursor. */
91	if ((ret = __ham_get_cpage(dbc, mode)) != 0)
92		return (ret);
93
94recheck:
95	/* Check if we are looking for space in which to insert an item. */
96	if (hcp->seek_size != 0 && hcp->seek_found_page == PGNO_INVALID &&
97	    hcp->seek_size < P_FREESPACE(dbp, hcp->page)) {
98		hcp->seek_found_page = hcp->pgno;
99		hcp->seek_found_indx = NDX_INVALID;
100	}
101
102	/* Check for off-page duplicates. */
103	if (hcp->indx < NUM_ENT(hcp->page) &&
104	    HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP) {
105		memcpy(pgnop,
106		    HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
107		    sizeof(db_pgno_t));
108		F_SET(hcp, H_OK);
109		return (0);
110	}
111
112	/* Check if we need to go on to the next page. */
113	if (F_ISSET(hcp, H_ISDUP))
114		/*
115		 * ISDUP is set, and offset is at the beginning of the datum.
116		 * We need to grab the length of the datum, then set the datum
117		 * pointer to be the beginning of the datum.
118		 */
119		memcpy(&hcp->dup_len,
120		    HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) +
121		    hcp->dup_off, sizeof(db_indx_t));
122
123	if (hcp->indx >= (db_indx_t)NUM_ENT(hcp->page)) {
124		/* Fetch next page. */
125		if (NEXT_PGNO(hcp->page) == PGNO_INVALID) {
126			F_SET(hcp, H_NOMORE);
127			return (DB_NOTFOUND);
128		}
129		next_pgno = NEXT_PGNO(hcp->page);
130		hcp->indx = 0;
131		if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0)
132			return (ret);
133		goto recheck;
134	}
135
136	F_SET(hcp, H_OK);
137	return (0);
138}
139
140/*
141 * PUBLIC: int __ham_item_reset __P((DBC *));
142 */
143int
144__ham_item_reset(dbc)
145	DBC *dbc;
146{
147	DB *dbp;
148	DB_MPOOLFILE *mpf;
149	HASH_CURSOR *hcp;
150	int ret, t_ret;
151
152	dbp = dbc->dbp;
153	mpf = dbp->mpf;
154	hcp = (HASH_CURSOR *)dbc->internal;
155
156	ret = 0;
157	if (hcp->page != NULL)
158		ret = __memp_fput(mpf,
159		    dbc->thread_info, hcp->page, dbc->priority);
160
161	if ((t_ret = __ham_item_init(dbc)) != 0 && ret == 0)
162		ret = t_ret;
163
164	return (ret);
165}
166
167/*
168 * PUBLIC: int __ham_item_init __P((DBC *));
169 */
170int
171__ham_item_init(dbc)
172	DBC *dbc;
173{
174	HASH_CURSOR *hcp;
175	int ret;
176
177	hcp = (HASH_CURSOR *)dbc->internal;
178
179	/*
180	 * If this cursor still holds any locks, we must release them if
181	 * we are not running with transactions.
182	 */
183	ret = __TLPUT(dbc, hcp->lock);
184
185	/*
186	 * The following fields must *not* be initialized here because they
187	 * may have meaning across inits.
188	 *	hlock, hdr, split_buf, stats
189	 */
190	hcp->bucket = BUCKET_INVALID;
191	hcp->lbucket = BUCKET_INVALID;
192	LOCK_INIT(hcp->lock);
193	hcp->lock_mode = DB_LOCK_NG;
194	hcp->dup_off = 0;
195	hcp->dup_len = 0;
196	hcp->dup_tlen = 0;
197	hcp->seek_size = 0;
198	hcp->seek_found_page = PGNO_INVALID;
199	hcp->seek_found_indx = NDX_INVALID;
200	hcp->flags = 0;
201
202	hcp->pgno = PGNO_INVALID;
203	hcp->indx = NDX_INVALID;
204	hcp->page = NULL;
205
206	return (ret);
207}
208
209/*
210 * Returns the last item in a bucket.
211 *
212 * PUBLIC: int __ham_item_last __P((DBC *, db_lockmode_t, db_pgno_t *));
213 */
214int
215__ham_item_last(dbc, mode, pgnop)
216	DBC *dbc;
217	db_lockmode_t mode;
218	db_pgno_t *pgnop;
219{
220	HASH_CURSOR *hcp;
221	int ret;
222
223	hcp = (HASH_CURSOR *)dbc->internal;
224	if ((ret = __ham_item_reset(dbc)) != 0)
225		return (ret);
226
227	hcp->bucket = hcp->hdr->max_bucket;
228	hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
229	F_SET(hcp, H_OK);
230	return (__ham_item_prev(dbc, mode, pgnop));
231}
232
233/*
234 * PUBLIC: int __ham_item_first __P((DBC *, db_lockmode_t, db_pgno_t *));
235 */
236int
237__ham_item_first(dbc, mode, pgnop)
238	DBC *dbc;
239	db_lockmode_t mode;
240	db_pgno_t *pgnop;
241{
242	HASH_CURSOR *hcp;
243	int ret;
244
245	hcp = (HASH_CURSOR *)dbc->internal;
246	if ((ret = __ham_item_reset(dbc)) != 0)
247		return (ret);
248	F_SET(hcp, H_OK);
249	hcp->bucket = 0;
250	hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
251	return (__ham_item_next(dbc, mode, pgnop));
252}
253
254/*
255 * __ham_item_prev --
256 *	Returns a pointer to key/data pair on a page.  In the case of
257 *	bigkeys, just returns the page number and index of the bigkey
258 *	pointer pair.
259 *
260 * PUBLIC: int __ham_item_prev __P((DBC *, db_lockmode_t, db_pgno_t *));
261 */
262int
263__ham_item_prev(dbc, mode, pgnop)
264	DBC *dbc;
265	db_lockmode_t mode;
266	db_pgno_t *pgnop;
267{
268	DB *dbp;
269	HASH_CURSOR *hcp;
270	db_pgno_t next_pgno;
271	int ret;
272
273	hcp = (HASH_CURSOR *)dbc->internal;
274	dbp = dbc->dbp;
275
276	/*
277	 * There are 5 cases for backing up in a hash file.
278	 * Case 1: In the middle of a page, no duplicates, just dec the index.
279	 * Case 2: In the middle of a duplicate set, back up one.
280	 * Case 3: At the beginning of a duplicate set, get out of set and
281	 *	back up to next key.
282	 * Case 4: At the beginning of a page; go to previous page.
283	 * Case 5: At the beginning of a bucket; go to prev bucket.
284	 */
285	F_CLR(hcp, H_OK | H_NOMORE | H_DELETED);
286
287	if ((ret = __ham_get_cpage(dbc, mode)) != 0)
288		return (ret);
289
290	/*
291	 * First handle the duplicates.  Either you'll get the key here
292	 * or you'll exit the duplicate set and drop into the code below
293	 * to handle backing up through keys.
294	 */
295	if (!F_ISSET(hcp, H_NEXT_NODUP) && F_ISSET(hcp, H_ISDUP)) {
296		if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) ==
297		    H_OFFDUP) {
298			memcpy(pgnop,
299			    HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)),
300			    sizeof(db_pgno_t));
301			F_SET(hcp, H_OK);
302			return (0);
303		}
304
305		/* Duplicates are on-page. */
306		if (hcp->dup_off != 0) {
307			memcpy(&hcp->dup_len, HKEYDATA_DATA(
308			    H_PAIRDATA(dbp, hcp->page, hcp->indx))
309			    + hcp->dup_off - sizeof(db_indx_t),
310			    sizeof(db_indx_t));
311			hcp->dup_off -=
312			    DUP_SIZE(hcp->dup_len);
313			return (__ham_item(dbc, mode, pgnop));
314		}
315	}
316
317	/*
318	 * If we get here, we are not in a duplicate set, and just need
319	 * to back up the cursor.  There are still three cases:
320	 * midpage, beginning of page, beginning of bucket.
321	 */
322
323	if (F_ISSET(hcp, H_DUPONLY)) {
324		F_CLR(hcp, H_OK);
325		F_SET(hcp, H_NOMORE);
326		return (0);
327	} else
328		/*
329		 * We are no longer in a dup set;  flag this so the dup code
330		 * will reinitialize should we stumble upon another one.
331		 */
332		F_CLR(hcp, H_ISDUP);
333
334	if (hcp->indx == 0) {		/* Beginning of page. */
335		hcp->pgno = PREV_PGNO(hcp->page);
336		if (hcp->pgno == PGNO_INVALID) {
337			/* Beginning of bucket. */
338			F_SET(hcp, H_NOMORE);
339			return (DB_NOTFOUND);
340		} else if ((ret =
341		    __ham_next_cpage(dbc, hcp->pgno)) != 0)
342			return (ret);
343		else
344			hcp->indx = NUM_ENT(hcp->page);
345	}
346
347	/*
348	 * Either we've got the cursor set up to be decremented, or we
349	 * have to find the end of a bucket.
350	 */
351	if (hcp->indx == NDX_INVALID) {
352		DB_ASSERT(dbp->env, hcp->page != NULL);
353
354		hcp->indx = NUM_ENT(hcp->page);
355		for (next_pgno = NEXT_PGNO(hcp->page);
356		    next_pgno != PGNO_INVALID;
357		    next_pgno = NEXT_PGNO(hcp->page)) {
358			if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0)
359				return (ret);
360			hcp->indx = NUM_ENT(hcp->page);
361		}
362
363		if (hcp->indx == 0) {
364			/* Bucket was empty. */
365			F_SET(hcp, H_NOMORE);
366			return (DB_NOTFOUND);
367		}
368	}
369
370	hcp->indx -= 2;
371
372	return (__ham_item(dbc, mode, pgnop));
373}
374
375/*
376 * Sets the cursor to the next key/data pair on a page.
377 *
378 * PUBLIC: int __ham_item_next __P((DBC *, db_lockmode_t, db_pgno_t *));
379 */
380int
381__ham_item_next(dbc, mode, pgnop)
382	DBC *dbc;
383	db_lockmode_t mode;
384	db_pgno_t *pgnop;
385{
386	HASH_CURSOR *hcp;
387	int ret;
388
389	hcp = (HASH_CURSOR *)dbc->internal;
390
391	if ((ret = __ham_get_cpage(dbc, mode)) != 0)
392		return (ret);
393
394	/*
395	 * Deleted on-page duplicates are a weird case. If we delete the last
396	 * one, then our cursor is at the very end of a duplicate set and
397	 * we actually need to go on to the next key.
398	 */
399	if (F_ISSET(hcp, H_DELETED)) {
400		if (hcp->indx != NDX_INVALID &&
401		    F_ISSET(hcp, H_ISDUP) &&
402		    HPAGE_TYPE(dbc->dbp, hcp->page, H_DATAINDEX(hcp->indx))
403			== H_DUPLICATE && hcp->dup_tlen == hcp->dup_off) {
404			if (F_ISSET(hcp, H_DUPONLY)) {
405				F_CLR(hcp, H_OK);
406				F_SET(hcp, H_NOMORE);
407				return (0);
408			} else {
409				F_CLR(hcp, H_ISDUP);
410				hcp->indx += 2;
411			}
412		} else if (!F_ISSET(hcp, H_ISDUP) && F_ISSET(hcp, H_DUPONLY)) {
413			F_CLR(hcp, H_OK);
414			F_SET(hcp, H_NOMORE);
415			return (0);
416		} else if (F_ISSET(hcp, H_ISDUP) &&
417		    F_ISSET(hcp, H_NEXT_NODUP)) {
418			F_CLR(hcp, H_ISDUP);
419			hcp->indx += 2;
420		}
421		F_CLR(hcp, H_DELETED);
422	} else if (hcp->indx == NDX_INVALID) {
423		hcp->indx = 0;
424		F_CLR(hcp, H_ISDUP);
425	} else if (F_ISSET(hcp, H_NEXT_NODUP)) {
426		hcp->indx += 2;
427		F_CLR(hcp, H_ISDUP);
428	} else if (F_ISSET(hcp, H_ISDUP) && hcp->dup_tlen != 0) {
429		if (hcp->dup_off + DUP_SIZE(hcp->dup_len) >=
430		    hcp->dup_tlen && F_ISSET(hcp, H_DUPONLY)) {
431			F_CLR(hcp, H_OK);
432			F_SET(hcp, H_NOMORE);
433			return (0);
434		}
435		hcp->dup_off += DUP_SIZE(hcp->dup_len);
436		if (hcp->dup_off >= hcp->dup_tlen) {
437			F_CLR(hcp, H_ISDUP);
438			hcp->indx += 2;
439		}
440	} else if (F_ISSET(hcp, H_DUPONLY)) {
441		F_CLR(hcp, H_OK);
442		F_SET(hcp, H_NOMORE);
443		return (0);
444	} else {
445		hcp->indx += 2;
446		F_CLR(hcp, H_ISDUP);
447	}
448
449	return (__ham_item(dbc, mode, pgnop));
450}
451
452/*
453 * __ham_insertpair --
454 *
455 * Used for adding a pair of elements to a sorted page. We are guaranteed that
456 * the pair will fit on this page.
457 *
458 * If an index is provided, then use it, otherwise lookup the index using
459 * __ham_getindex. This saves a getindex call when inserting using a cursor.
460 *
461 * We're overloading the meaning of the H_OFFPAGE type here, which is a little
462 * bit sleazy. When we recover deletes, we have the entire entry instead of
463 * having only the DBT, so we'll pass type H_OFFPAGE to mean "copy the whole
464 * entry" as opposed to constructing an H_KEYDATA around it. In the recovery
465 * case it is assumed that a valid index is passed in, since a lookup using
466 * the overloaded H_OFFPAGE key will be incorrect.
467 *
468 * PUBLIC: int __ham_insertpair __P((DBC *,
469 * PUBLIC:     PAGE *p, db_indx_t *indxp, const DBT *, const DBT *, int, int));
470 */
471int
472__ham_insertpair(dbc, p, indxp, key_dbt, data_dbt, key_type, data_type)
473	DBC *dbc;
474	PAGE *p;
475	db_indx_t *indxp;
476	const DBT *key_dbt, *data_dbt;
477	int key_type, data_type;
478{
479	DB *dbp;
480	u_int16_t n, indx;
481	db_indx_t *inp;
482	u_int32_t ksize, dsize, increase, distance;
483	u_int8_t *offset;
484	int i, match, ret;
485
486	dbp = dbc->dbp;
487	n = NUM_ENT(p);
488	inp = P_INP(dbp, p);
489	ksize = (key_type == H_OFFPAGE) ?
490	    key_dbt->size : HKEYDATA_SIZE(key_dbt->size);
491	dsize = (data_type == H_OFFPAGE) ?
492	    data_dbt->size : HKEYDATA_SIZE(data_dbt->size);
493	increase = ksize + dsize;
494
495	if (indxp != NULL && *indxp != NDX_INVALID)
496		indx = *indxp;
497	else {
498		if ((ret = __ham_getindex(dbc, p, key_dbt,
499		    key_type, &match, &indx)) != 0)
500			return (ret);
501		/* Save the index for the caller */
502		if (indxp != NULL)
503			*indxp = indx;
504		/* It is an error to insert a duplicate key */
505		DB_ASSERT(dbp->env, match != 0);
506	}
507
508	/* Special case if the page is empty or inserting at end of page.*/
509	if (n == 0 || indx == n) {
510		inp[indx] = HOFFSET(p) - ksize;
511		inp[indx+1] = HOFFSET(p) - increase;
512	} else {
513		/*
514		 * Shuffle the data elements.
515		 *
516		 * For example, inserting an element that sorts between items
517		 * 2 and 3 on a page:
518		 * The copy starts from the beginning of the second item.
519		 *
520		 * ---------------------------
521		 * |pgheader..
522		 * |__________________________
523		 * ||1|2|3|4|...
524		 * |--------------------------
525		 * |
526		 * |__________________________
527		 * |              ...|4|3|2|1|
528		 * |--------------------------
529		 * ---------------------------
530		 *
531		 * Becomes:
532		 *
533		 * ---------------------------
534		 * |pgheader..
535		 * |__________________________
536		 * ||1|2|2a|3|4|...
537		 * |--------------------------
538		 * |
539		 * |__________________________
540		 * |           ...|4|3|2a|2|1|
541		 * |--------------------------
542		 * ---------------------------
543		 *
544		 * Index's 3,4 etc move down the page.
545		 * The data for 3,4,etc moves up the page by sizeof(2a)
546		 * The index pointers in 3,4 etc are updated to point at the
547		 * relocated data.
548		 * It is necessary to move the data (not just adjust the index)
549		 * since the hash format uses consecutive data items to
550		 * dynamically calculate the item size.
551		 * An item in this example is a key/data pair.
552		 */
553		offset = (u_int8_t *)p + HOFFSET(p);
554		if (indx == 0)
555			distance = dbp->pgsize - HOFFSET(p);
556		else
557			distance = (u_int32_t)
558			    (P_ENTRY(dbp, p, indx - 1) - offset);
559		memmove(offset - increase, offset, distance);
560
561		/* Shuffle the index array */
562		memmove(&inp[indx + 2], &inp[indx],
563		    (n - indx) * sizeof(db_indx_t));
564
565		/* update the index array */
566		for (i = indx + 2; i < n + 2; i++)
567			inp[i] -= increase;
568
569		/* set the new index elements. */
570		inp[indx] = (HOFFSET(p) - increase) + distance + dsize;
571		inp[indx + 1] = (HOFFSET(p) - increase) + distance;
572	}
573
574	HOFFSET(p) -= increase;
575	/* insert the new elements */
576	if (key_type == H_OFFPAGE)
577		memcpy(P_ENTRY(dbp, p, indx), key_dbt->data, key_dbt->size);
578	else
579		PUT_HKEYDATA(P_ENTRY(dbp, p, indx), key_dbt->data,
580		    key_dbt->size, key_type);
581	if (data_type == H_OFFPAGE)
582		memcpy(P_ENTRY(dbp, p, indx+1), data_dbt->data,
583		    data_dbt->size);
584	else
585		PUT_HKEYDATA(P_ENTRY(dbp, p, indx+1), data_dbt->data,
586		    data_dbt->size, data_type);
587	NUM_ENT(p) += 2;
588
589	/*
590	 * If debugging a sorted hash page problem, this is a good place to
591	 * insert a call to __ham_verify_sorted_page.
592	 * It used to be called when diagnostic mode was enabled, but that
593	 * causes problems in recovery if a custom comparator was used.
594	 */
595	return (0);
596}
597
598/*
599 * __hame_getindex --
600 *
601 * The key_type parameter overloads the entry type to allow for comparison of
602 * a key DBT that contains off-page data. A key that is not of type H_OFFPAGE
603 * might contain data larger than the page size, since this routine can be
604 * called with user-provided DBTs.
605 *
606 * PUBLIC: int __ham_getindex __P((DBC *,
607 * PUBLIC:     PAGE *, const DBT *, int, int *, db_indx_t *));
608 */
609int
610__ham_getindex(dbc, p, key, key_type, match, indx)
611	DBC *dbc;
612	PAGE *p;
613	const DBT *key;
614	int key_type, *match;
615	db_indx_t *indx;
616{
617	/* Since all entries are key/data pairs. */
618	DB_ASSERT(dbc->env, NUM_ENT(p)%2 == 0 );
619
620	/* Support pre 4.6 unsorted hash pages. */
621	if (p->type == P_HASH_UNSORTED)
622		return (__ham_getindex_unsorted(dbc, p, key, match, indx));
623	else
624		return (__ham_getindex_sorted(dbc,
625		    p, key, key_type, match, indx));
626}
627
/*
 * Smaller of two values.  Each argument can be evaluated more than once, so
 * do not pass expressions with side effects.
 */
#undef	min
#define	min(a, b) (((b) > (a)) ? (a) : (b))
630
631/*
632 * Perform a linear search of an unsorted (pre 4.6 format) hash page.
633 *
634 * This routine is never used to generate an index for insertion, because any
635 * unsorted page is sorted before we insert.
636 *
637 * Returns 0 if an exact match is found, with indx set to requested elem.
638 * Returns 1 if the item did not exist, indx is set to the last element on the
639 * page.
640 */
641static int
642__ham_getindex_unsorted(dbc, p, key, match, indx)
643	DBC *dbc;
644	PAGE *p;
645	const DBT *key;
646	int *match;
647	db_indx_t *indx;
648{
649	DB *dbp;
650	DBT pg_dbt;
651	DB_THREAD_INFO *ip;
652	DB_TXN *txn;
653	HASH *t;
654	db_pgno_t pgno;
655	int i, n, res, ret;
656	u_int32_t tlen;
657	u_int8_t *hk;
658
659	dbp = dbc->dbp;
660	txn = dbc->txn;
661	ip = dbc->thread_info;
662	n = NUM_ENT(p);
663	t = dbp->h_internal;
664	res = 1;
665
666	/* Do a linear search over the page looking for an exact match */
667	for (i = 0; i < n; i+=2) {
668		hk = H_PAIRKEY(dbp, p, i);
669		switch (HPAGE_PTYPE(hk)) {
670		case H_OFFPAGE:
671			/* extract item length from possibly unaligned DBT */
672			memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
673			if (tlen == key->size) {
674				memcpy(&pgno,
675				    HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
676				if ((ret = __db_moff(dbp, ip, txn, key,
677				     pgno, tlen, t->h_compare, &res)) != 0)
678					return (ret);
679			}
680			break;
681		case H_KEYDATA:
682			if (t->h_compare != NULL) {
683				DB_INIT_DBT(pg_dbt,
684				    HKEYDATA_DATA(hk), key->size);
685				if (t->h_compare(
686				    dbp, key, &pg_dbt) != 0)
687					break;
688			} else if (key->size ==
689			    LEN_HKEY(dbp, p, dbp->pgsize, i))
690				res = memcmp(key->data, HKEYDATA_DATA(hk),
691				    key->size);
692			break;
693		case H_DUPLICATE:
694		case H_OFFDUP:
695			/*
696			 * These are errors because keys are never duplicated.
697			 */
698			 /* FALLTHROUGH */
699		default:
700			return (__db_pgfmt(dbp->env, PGNO(p)));
701		}
702		if (res == 0)
703			break;
704	}
705	*indx = i;
706	*match = (res == 0 ? 0 : 1);
707	return (0);
708}
709
710/*
711 * Perform a binary search of a sorted hash page for a key.
712 * Return 0 if an exact match is found, with indx set to requested elem.
713 * Return 1 if the item did not exist, indx will be set to the first element
714 * greater than the requested item.
715 */
716static int
717__ham_getindex_sorted(dbc, p, key, key_type, match, indxp)
718	DBC *dbc;
719	PAGE *p;
720	const DBT *key;
721	int key_type, *match;
722	db_indx_t *indxp;
723{
724	DB *dbp;
725	DBT tmp_dbt;
726	DB_THREAD_INFO *ip;
727	DB_TXN *txn;
728	HASH *t;
729	HOFFPAGE *offp;
730	db_indx_t indx;
731	db_pgno_t off_pgno, koff_pgno;
732	u_int32_t base, itemlen, lim, off_len;
733	u_int8_t *entry;
734	int res, ret;
735	void *data;
736
737	dbp = dbc->dbp;
738	txn = dbc->txn;
739	ip = dbc->thread_info;
740	DB_ASSERT(dbp->env, p->type == P_HASH );
741
742	t = dbp->h_internal;
743	/* Initialize so the return params are correct for empty pages. */
744	res = indx = 0;
745
746	/* Do a binary search for the element. */
747	DB_BINARY_SEARCH_FOR(base, lim, p, 2) {
748		DB_BINARY_SEARCH_INCR(indx, base, lim, 2);
749		data = HKEYDATA_DATA(H_PAIRKEY(dbp, p, indx));
750		/*
751		 * There are 4 cases here:
752		 *  1) Off page key, off page match
753		 *  2) Off page key, on page match
754		 *  3) On page key, off page match
755		 *  4) On page key, on page match
756		 */
757		entry = P_ENTRY(dbp, p, indx);
758		if (*entry == H_OFFPAGE) {
759			offp = (HOFFPAGE*)P_ENTRY(dbp, p, indx);
760			(void)__ua_memcpy(&itemlen, HOFFPAGE_TLEN(offp),
761			    sizeof(u_int32_t));
762			if (key_type == H_OFFPAGE) {
763				/*
764				 * Case 1.
765				 *
766				 * If both key and cmp DBTs refer to different
767				 * offpage items, it is necessary to compare
768				 * the content of the entries, in order to be
769				 * able to maintain a valid lexicographic sort
770				 * order.
771				 */
772				(void)__ua_memcpy(&koff_pgno,
773				    HOFFPAGE_PGNO(key->data),
774				    sizeof(db_pgno_t));
775				(void)__ua_memcpy(&off_pgno, HOFFPAGE_PGNO(offp),
776				    sizeof(db_pgno_t));
777				if (koff_pgno == off_pgno)
778					res = 0;
779				else {
780					memset(&tmp_dbt, 0, sizeof(tmp_dbt));
781					tmp_dbt.size = HOFFPAGE_SIZE;
782					tmp_dbt.data = offp;
783					if ((ret = __db_coff(dbp, ip, txn, key,
784					    &tmp_dbt, t->h_compare, &res))
785					    != 0)
786						return (ret);
787				}
788			} else {
789				/* Case 2 */
790				(void)__ua_memcpy(&off_pgno,
791				    HOFFPAGE_PGNO(offp), sizeof(db_pgno_t));
792				if ((ret = __db_moff(dbp, ip, txn, key,
793				    off_pgno, itemlen, t->h_compare, &res))
794				    != 0)
795					return (ret);
796			}
797		} else {
798			itemlen = LEN_HKEYDATA(dbp, p, dbp->pgsize, indx);
799			if (key_type == H_OFFPAGE) {
800				/* Case 3 */
801				tmp_dbt.data = data;
802				tmp_dbt.size = itemlen;
803				offp = (HOFFPAGE *)key->data;
804				(void)__ua_memcpy(&off_pgno,
805				    HOFFPAGE_PGNO(offp), sizeof(db_pgno_t));
806				(void)__ua_memcpy(&off_len, HOFFPAGE_TLEN(offp),
807				    sizeof(u_int32_t));
808				if ((ret = __db_moff(dbp, ip, txn, &tmp_dbt,
809				    off_pgno, off_len, t->h_compare,
810				    &res)) != 0)
811					return (ret);
812				/*
813				 * Since we switched the key/match parameters
814				 * in the __db_moff call, the result needs to
815				 * be inverted.
816				 */
817				res = -res;
818			} else if (t->h_compare != NULL) {
819				/* Case 4, with a user comparison func */
820				DB_INIT_DBT(tmp_dbt, data, itemlen);
821				res = t->h_compare(dbp, key, &tmp_dbt);
822			} else {
823				/* Case 4, without a user comparison func */
824				if ((res = memcmp(key->data, data,
825				    min(key->size, itemlen))) == 0)
826					res = itemlen > key->size ? 1 :
827					    (itemlen < key->size ? -1 : 0);
828			}
829		}
830		if (res == 0) {
831			/* Found a match */
832			*indxp = indx;
833			*match = 0;
834			return (0);
835		} else if (res > 0)
836			DB_BINARY_SEARCH_SHIFT_BASE(indx, base, lim, 2);
837	}
838	/*
839	 * If no match was found, and the comparison indicates that the
840	 * closest match was lexicographically less than the input key adjust
841	 * the insertion index to be after the index of the closest match.
842	 */
843	if (res > 0)
844	    indx += 2;
845	*indxp = indx;
846	*match = 1;
847	return (0);
848}
849
850/*
851 * PUBLIC: int __ham_verify_sorted_page __P((DB *,
852 * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, PAGE *));
853 *
854 * The__ham_verify_sorted_page function is used to determine the correctness
855 * of sorted hash pages. The checks are used by verification, they are
856 * implemented in the hash code because they are also useful debugging aids.
857 */
858int
859__ham_verify_sorted_page (dbp, ip, txn, p)
860	DB *dbp;
861	DB_THREAD_INFO *ip;
862	DB_TXN *txn;
863	PAGE *p;
864{
865	DBT prev_dbt, curr_dbt;
866	ENV *env;
867	HASH *t;
868	db_pgno_t tpgno;
869	u_int32_t curr_len, prev_len, tlen;
870	u_int16_t *indxp;
871	db_indx_t i, n;
872	int res, ret;
873	char *prev, *curr;
874
875	/* Validate that next, prev pointers are OK */
876	n = NUM_ENT(p);
877	DB_ASSERT(dbp->env, n%2 == 0 );
878
879	env = dbp->env;
880	t = dbp->h_internal;
881
882	/* Disable verification if a custom comparator is supplied */
883	if (t->h_compare != NULL)
884	    return (0);
885
886	/* Iterate through page, ensuring order */
887	prev = (char *)HKEYDATA_DATA(H_PAIRKEY(dbp, p, 0));
888	prev_len = LEN_HKEYDATA(dbp, p, dbp->pgsize, 0);
889	for (i = 2; i < n; i+=2) {
890		curr = (char *)HKEYDATA_DATA(H_PAIRKEY(dbp, p, i));
891		curr_len = LEN_HKEYDATA(dbp, p, dbp->pgsize, i);
892
893		if (HPAGE_TYPE(dbp, p, i-2) == H_OFFPAGE &&
894		    HPAGE_TYPE(dbp, p, i) == H_OFFPAGE) {
895			memset(&prev_dbt, 0, sizeof(prev_dbt));
896			memset(&curr_dbt, 0, sizeof(curr_dbt));
897			prev_dbt.size = curr_dbt.size = HOFFPAGE_SIZE;
898			prev_dbt.data = H_PAIRKEY(dbp, p, i-2);
899			curr_dbt.data = H_PAIRKEY(dbp, p, i);
900			if ((ret = __db_coff(dbp, ip, txn,
901			    &prev_dbt, &curr_dbt, t->h_compare, &res)) != 0)
902				return (ret);
903		} else if (HPAGE_TYPE(dbp, p, i-2) == H_OFFPAGE) {
904			memset(&curr_dbt, 0, sizeof(curr_dbt));
905			curr_dbt.size = curr_len;
906			curr_dbt.data = H_PAIRKEY(dbp, p, i);
907			memcpy(&tlen, HOFFPAGE_TLEN(H_PAIRKEY(dbp, p, i-2)),
908			    sizeof(u_int32_t));
909			memcpy(&tpgno, HOFFPAGE_PGNO(H_PAIRKEY(dbp, p, i-2)),
910			    sizeof(db_pgno_t));
911			if ((ret = __db_moff(dbp, ip, txn,
912			    &curr_dbt, tpgno, tlen, t->h_compare, &res)) != 0)
913				return (ret);
914		} else if (HPAGE_TYPE(dbp, p, i) == H_OFFPAGE) {
915			memset(&prev_dbt, 0, sizeof(prev_dbt));
916			prev_dbt.size = prev_len;
917			prev_dbt.data = H_PAIRKEY(dbp, p, i);
918			memcpy(&tlen, HOFFPAGE_TLEN(H_PAIRKEY(dbp, p, i)),
919			    sizeof(u_int32_t));
920			memcpy(&tpgno, HOFFPAGE_PGNO(H_PAIRKEY(dbp, p, i)),
921			    sizeof(db_pgno_t));
922			if ((ret = __db_moff(dbp, ip, txn,
923			    &prev_dbt, tpgno, tlen, t->h_compare, &res)) != 0)
924				return (ret);
925		} else
926			res = memcmp(prev, curr, min(curr_len, prev_len));
927
928		if (res == 0 && curr_len > prev_len)
929			res = 1;
930		else if (res == 0 && curr_len < prev_len)
931			res = -1;
932
933		if (res >= 0) {
934			__db_msg(env, "key1: %s, key2: %s, len: %lu\n",
935			    (char *)prev, (char *)curr,
936			    (u_long)min(curr_len, prev_len));
937			__db_msg(env, "curroffset %lu\n", (u_long)i);
938			__db_msg(env, "indexes: ");
939			for (i = 0; i < n; i++) {
940				indxp = P_INP(dbp, p) + i;
941				__db_msg(env, "%04X, ", *indxp);
942			}
943			__db_msg(env, "\n");
944#ifdef HAVE_STATISTICS
945			if ((ret = __db_prpage(dbp, p, DB_PR_PAGE)) != 0)
946				return (ret);
947#endif
948			DB_ASSERT(dbp->env, res < 0);
949		}
950
951		prev = curr;
952		prev_len = curr_len;
953	}
954	return (0);
955}
956
957/*
958 * A wrapper for the __ham_sort_page function. Implements logging and cursor
959 * adjustments associated with sorting a page outside of recovery/upgrade.
960 */
961static int
962__ham_sort_page_cursor(dbc, page)
963	DBC *dbc;
964	PAGE *page;
965{
966	DB *dbp;
967	DBT page_dbt;
968	DB_LSN new_lsn;
969	HASH_CURSOR *hcp;
970	int ret;
971
972	dbp = dbc->dbp;
973	hcp = (HASH_CURSOR *)dbc->internal;
974
975	if (DBC_LOGGING(dbc)) {
976		page_dbt.size = dbp->pgsize;
977		page_dbt.data = page;
978		if ((ret = __ham_splitdata_log(dbp, dbc->txn,
979		    &new_lsn, 0, SORTPAGE, PGNO(page),
980		    &page_dbt, &LSN(page))) != 0)
981			return (ret);
982	} else
983		LSN_NOT_LOGGED(new_lsn);
984	/* Move lsn onto page. */
985	LSN(page) = new_lsn;	/* Structure assignment. */
986
987	/*
988	 * Invalidate the saved index, it needs to be retrieved
989	 * again once the page is sorted.
990	 */
991	hcp->seek_found_indx = NDX_INVALID;
992	hcp->seek_found_page = PGNO_INVALID;
993
994	return (__ham_sort_page(dbc, &hcp->split_buf, page));
995}
996
997/*
998 * PUBLIC: int __ham_sort_page __P((DBC *,  PAGE **, PAGE *));
999 *
1000 * Convert a page from P_HASH_UNSORTED into the sorted format P_HASH.
1001 *
1002 * All locking and logging is carried out be the caller. A user buffer can
1003 * optionally be passed in to save allocating a page size buffer for sorting.
1004 * This is allows callers to re-use the buffer pre-allocated for page splits
1005 * in the hash cursor. The buffer is optional since no cursor exists when in
1006 * the recovery or upgrade code paths.
1007 */
1008int
1009__ham_sort_page(dbc, tmp_buf, page)
1010	DBC *dbc;
1011	PAGE **tmp_buf;
1012	PAGE *page;
1013{
1014	DB *dbp;
1015	PAGE *temp_pagep;
1016	db_indx_t i;
1017	int ret;
1018
1019	dbp = dbc->dbp;
1020	DB_ASSERT(dbp->env, page->type == P_HASH_UNSORTED);
1021
1022	ret = 0;
1023	if (tmp_buf != NULL)
1024		temp_pagep = *tmp_buf;
1025	else if ((ret = __os_malloc(dbp->env, dbp->pgsize, &temp_pagep)) != 0)
1026	    return (ret);
1027
1028	memcpy(temp_pagep, page, dbp->pgsize);
1029
1030	/* Re-initialize the page. */
1031	P_INIT(page, dbp->pgsize,
1032	    page->pgno, page->prev_pgno, page->next_pgno, 0, P_HASH);
1033
1034	for (i = 0; i < NUM_ENT(temp_pagep); i += 2)
1035		if ((ret =
1036		    __ham_copypair(dbc, temp_pagep, i, page, NULL)) != 0)
1037			break;
1038
1039	if (tmp_buf == NULL)
1040		__os_free(dbp->env, temp_pagep);
1041
1042	return (ret);
1043}
1044
1045/*
1046 * PUBLIC: int __ham_del_pair __P((DBC *, int));
1047 */
1048int
1049__ham_del_pair(dbc, flags)
1050	DBC *dbc;
1051	int flags;
1052{
1053	DB *dbp;
1054	DBT data_dbt, key_dbt;
1055	DB_LSN new_lsn, *n_lsn, tmp_lsn;
1056	DB_MPOOLFILE *mpf;
1057	HASH_CURSOR *hcp;
1058	PAGE *n_pagep, *nn_pagep, *p, *p_pagep;
1059	db_ham_mode op;
1060	db_indx_t ndx;
1061	db_pgno_t chg_pgno, pgno, tmp_pgno;
1062	u_int32_t order;
1063	int ret, t_ret;
1064
1065	dbp = dbc->dbp;
1066	mpf = dbp->mpf;
1067	hcp = (HASH_CURSOR *)dbc->internal;
1068	n_pagep = p_pagep = nn_pagep = NULL;
1069	ndx = hcp->indx;
1070
1071	if (hcp->page == NULL &&
1072	    (ret = __memp_fget(mpf, &hcp->pgno, dbc->thread_info, dbc->txn,
1073	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &hcp->page)) != 0)
1074		return (ret);
1075	p = hcp->page;
1076
1077	/*
1078	 * We optimize for the normal case which is when neither the key nor
1079	 * the data are large.  In this case, we write a single log record
1080	 * and do the delete.  If either is large, we'll call __big_delete
1081	 * to remove the big item and then update the page to remove the
1082	 * entry referring to the big item.
1083	 */
1084	if (HPAGE_PTYPE(H_PAIRKEY(dbp, p, ndx)) == H_OFFPAGE) {
1085		memcpy(&pgno, HOFFPAGE_PGNO(P_ENTRY(dbp, p, H_KEYINDEX(ndx))),
1086		    sizeof(db_pgno_t));
1087		ret = __db_doff(dbc, pgno);
1088	} else
1089		ret = 0;
1090
1091	if (ret == 0)
1092		switch (HPAGE_PTYPE(H_PAIRDATA(dbp, p, ndx))) {
1093		case H_OFFPAGE:
1094			memcpy(&pgno,
1095			    HOFFPAGE_PGNO(P_ENTRY(dbp, p, H_DATAINDEX(ndx))),
1096			    sizeof(db_pgno_t));
1097			ret = __db_doff(dbc, pgno);
1098			break;
1099		case H_OFFDUP:
1100		case H_DUPLICATE:
1101			/*
1102			 * If we delete a pair that is/was a duplicate, then
1103			 * we had better clear the flag so that we update the
1104			 * cursor appropriately.
1105			 */
1106			F_CLR(hcp, H_ISDUP);
1107			break;
1108		default:
1109			/* No-op */
1110			break;
1111		}
1112
1113	if (ret)
1114		return (ret);
1115
1116	/* Now log the delete off this page. */
1117	if (DBC_LOGGING(dbc)) {
1118		key_dbt.data = P_ENTRY(dbp, p, H_KEYINDEX(ndx));
1119		key_dbt.size = LEN_HITEM(dbp, p, dbp->pgsize, H_KEYINDEX(ndx));
1120		data_dbt.data = P_ENTRY(dbp, p, H_DATAINDEX(ndx));
1121		data_dbt.size =
1122		    LEN_HITEM(dbp, p, dbp->pgsize, H_DATAINDEX(ndx));
1123
1124		if ((ret = __ham_insdel_log(dbp,
1125		    dbc->txn, &new_lsn, 0, DELPAIR, PGNO(p), (u_int32_t)ndx,
1126		    &LSN(p), &key_dbt, &data_dbt)) != 0)
1127			return (ret);
1128	} else
1129		LSN_NOT_LOGGED(new_lsn);
1130
1131	/* Move lsn onto page. */
1132	LSN(p) = new_lsn;
1133	/* Do the delete. */
1134	__ham_dpair(dbp, p, ndx);
1135
1136	/*
1137	 * Mark item deleted so that we don't try to return it, and
1138	 * so that we update the cursor correctly on the next call
1139	 * to next.
1140	 */
1141	F_SET(hcp, H_DELETED);
1142	F_CLR(hcp, H_OK);
1143
1144	/*
1145	 * If we are locking, we will not maintain this, because it is
1146	 * a hot spot.
1147	 *
1148	 * XXX
1149	 * Perhaps we can retain incremental numbers and apply them later.
1150	 */
1151	if (!STD_LOCKING(dbc)) {
1152		if ((ret = __ham_dirty_meta(dbc, 0)) != 0)
1153			return (ret);
1154		--hcp->hdr->nelem;
1155	}
1156
1157	/* The HAM_DEL_NO_CURSOR flag implies HAM_DEL_NO_RECLAIM. */
1158	if (LF_ISSET(HAM_DEL_NO_CURSOR))
1159		return (0);
1160	/*
1161	 * Update cursors that are on the page where the delete happened.
1162	 */
1163	if ((ret = __hamc_update(dbc, 0, DB_HAM_CURADJ_DEL, 0)) != 0)
1164		return (ret);
1165
1166	/*
1167	 * If we need to reclaim the page, then check if the page is empty.
1168	 * There are two cases.  If it's empty and it's not the first page
1169	 * in the bucket (i.e., the bucket page) then we can simply remove
1170	 * it. If it is the first chain in the bucket, then we need to copy
1171	 * the second page into it and remove the second page.
1172	 * If its the only page in the bucket we leave it alone.
1173	 */
1174	if (LF_ISSET(HAM_DEL_NO_RECLAIM) ||
1175	    NUM_ENT(p) != 0 ||
1176	    (PREV_PGNO(p) == PGNO_INVALID && NEXT_PGNO(p) == PGNO_INVALID))
1177		return (0);
1178
1179	if (PREV_PGNO(p) == PGNO_INVALID) {
1180		/*
1181		 * First page in chain is empty and we know that there
1182		 * are more pages in the chain.
1183		 */
1184		if ((ret = __memp_fget(mpf,
1185		    &NEXT_PGNO(p), dbc->thread_info, dbc->txn,
1186		    DB_MPOOL_DIRTY, &n_pagep)) != 0)
1187			return (ret);
1188
1189		if (NEXT_PGNO(n_pagep) != PGNO_INVALID &&
1190		    (ret = __memp_fget(mpf, &NEXT_PGNO(n_pagep),
1191		    dbc->thread_info, dbc->txn,
1192		    DB_MPOOL_DIRTY, &nn_pagep)) != 0)
1193			goto err;
1194
1195		if (DBC_LOGGING(dbc)) {
1196			key_dbt.data = n_pagep;
1197			key_dbt.size = dbp->pgsize;
1198			if ((ret = __ham_copypage_log(dbp,
1199			    dbc->txn, &new_lsn, 0, PGNO(p),
1200			    &LSN(p), PGNO(n_pagep), &LSN(n_pagep),
1201			    NEXT_PGNO(n_pagep),
1202			    nn_pagep == NULL ? NULL : &LSN(nn_pagep),
1203			    &key_dbt)) != 0)
1204				goto err;
1205		} else
1206			LSN_NOT_LOGGED(new_lsn);
1207
1208		/* Move lsn onto page. */
1209		LSN(p) = new_lsn;	/* Structure assignment. */
1210		LSN(n_pagep) = new_lsn;
1211		if (NEXT_PGNO(n_pagep) != PGNO_INVALID)
1212			LSN(nn_pagep) = new_lsn;
1213
1214		if (nn_pagep != NULL) {
1215			PREV_PGNO(nn_pagep) = PGNO(p);
1216			if ((ret = __memp_fput(mpf,
1217			    dbc->thread_info, nn_pagep, dbc->priority)) != 0) {
1218				nn_pagep = NULL;
1219				goto err;
1220			}
1221		}
1222
1223		tmp_pgno = PGNO(p);
1224		tmp_lsn = LSN(p);
1225		memcpy(p, n_pagep, dbp->pgsize);
1226		PGNO(p) = tmp_pgno;
1227		LSN(p) = tmp_lsn;
1228		PREV_PGNO(p) = PGNO_INVALID;
1229
1230		/*
1231		 * Update cursors to reflect the fact that records
1232		 * on the second page have moved to the first page.
1233		 */
1234		if ((ret = __hamc_delpg(dbc, PGNO(n_pagep),
1235		    PGNO(p), 0, DB_HAM_DELFIRSTPG, &order)) != 0)
1236			goto err;
1237
1238		/*
1239		 * Update the cursor to reflect its new position.
1240		 */
1241		hcp->indx = 0;
1242		hcp->pgno = PGNO(p);
1243		hcp->order += order;
1244
1245		if ((ret = __db_free(dbc, n_pagep)) != 0) {
1246			n_pagep = NULL;
1247			goto err;
1248		}
1249	} else {
1250		if ((ret = __memp_fget(mpf,
1251		    &PREV_PGNO(p), dbc->thread_info, dbc->txn,
1252		    DB_MPOOL_DIRTY, &p_pagep)) != 0)
1253			goto err;
1254
1255		if (NEXT_PGNO(p) != PGNO_INVALID) {
1256			if ((ret = __memp_fget(mpf, &NEXT_PGNO(p),
1257			dbc->thread_info, dbc->txn,
1258			    DB_MPOOL_DIRTY, &n_pagep)) != 0)
1259				goto err;
1260			n_lsn = &LSN(n_pagep);
1261		} else {
1262			n_pagep = NULL;
1263			n_lsn = NULL;
1264		}
1265
1266		NEXT_PGNO(p_pagep) = NEXT_PGNO(p);
1267		if (n_pagep != NULL)
1268			PREV_PGNO(n_pagep) = PGNO(p_pagep);
1269
1270		if (DBC_LOGGING(dbc)) {
1271			if ((ret = __ham_newpage_log(dbp, dbc->txn,
1272			    &new_lsn, 0, DELOVFL, PREV_PGNO(p), &LSN(p_pagep),
1273			    PGNO(p), &LSN(p), NEXT_PGNO(p), n_lsn)) != 0)
1274				goto err;
1275		} else
1276			LSN_NOT_LOGGED(new_lsn);
1277
1278		/* Move lsn onto page. */
1279		LSN(p_pagep) = new_lsn;	/* Structure assignment. */
1280		if (n_pagep)
1281			LSN(n_pagep) = new_lsn;
1282		LSN(p) = new_lsn;
1283
1284		if (NEXT_PGNO(p) == PGNO_INVALID) {
1285			/*
1286			 * There is no next page; put the cursor on the
1287			 * previous page as if we'd deleted the last item
1288			 * on that page, with index after the last valid
1289			 * entry.
1290			 *
1291			 * The deleted flag was set up above.
1292			 */
1293			hcp->pgno = PGNO(p_pagep);
1294			hcp->indx = NUM_ENT(p_pagep);
1295			op = DB_HAM_DELLASTPG;
1296		} else {
1297			/*
1298			 * There is a next page, so put the cursor at
1299			 * the beginning of it.
1300			 */
1301			hcp->pgno = NEXT_PGNO(p);
1302			hcp->indx = 0;
1303			op = DB_HAM_DELMIDPG;
1304		}
1305
1306		/*
1307		 * Since we are about to delete the cursor page and we have
1308		 * just moved the cursor, we need to make sure that the
1309		 * old page pointer isn't left hanging around in the cursor.
1310		 */
1311		hcp->page = NULL;
1312		chg_pgno = PGNO(p);
1313		ret = __db_free(dbc, p);
1314		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1315		       p_pagep, dbc->priority)) != 0 && ret == 0)
1316			ret = t_ret;
1317		if (n_pagep != NULL && (t_ret = __memp_fput(mpf,
1318		    dbc->thread_info, n_pagep, dbc->priority)) != 0 && ret == 0)
1319			ret = t_ret;
1320		if (ret != 0)
1321			return (ret);
1322		if ((ret = __hamc_delpg(dbc,
1323		    chg_pgno, hcp->pgno, hcp->indx, op, &order)) != 0)
1324			return (ret);
1325		hcp->order += order;
1326	}
1327	return (ret);
1328
1329err:	/* Clean up any pages. */
1330	if (n_pagep != NULL)
1331		(void)__memp_fput(mpf,
1332		    dbc->thread_info, n_pagep, dbc->priority);
1333	if (nn_pagep != NULL)
1334		(void)__memp_fput(mpf,
1335		    dbc->thread_info, nn_pagep, dbc->priority);
1336	if (p_pagep != NULL)
1337		(void)__memp_fput(mpf,
1338		    dbc->thread_info, p_pagep, dbc->priority);
1339	return (ret);
1340}
1341
1342/*
1343 * __ham_replpair --
1344 *	Given the key data indicated by the cursor, replace part/all of it
1345 *	according to the fields in the dbt.
1346 *
1347 * PUBLIC: int __ham_replpair __P((DBC *, DBT *, u_int32_t));
1348 */
1349int
1350__ham_replpair(dbc, dbt, make_dup)
1351	DBC *dbc;
1352	DBT *dbt;
1353	u_int32_t make_dup;
1354{
1355	DB *dbp;
1356	DBC **carray, *dbc_n;
1357	DBT old_dbt, tdata, tmp, *new_dbt;
1358	DB_LSN	new_lsn;
1359	ENV *env;
1360	HASH_CURSOR *hcp, *cp;
1361	db_indx_t orig_indx;
1362	db_pgno_t orig_pgno;
1363	u_int32_t change;
1364	u_int32_t dup_flag, len, memsize, newlen;
1365	int beyond_eor, is_big, is_plus, ret, type, i, found, t_ret;
1366	u_int8_t *beg, *dest, *end, *hk, *src;
1367	void *memp;
1368
1369	/*
1370	 * Items that were already offpage (ISBIG) were handled before
1371	 * we get in here.  So, we need only handle cases where the old
1372	 * key is on a regular page.  That leaves us 6 cases:
1373	 * 1. Original data onpage; new data is smaller
1374	 * 2. Original data onpage; new data is the same size
1375	 * 3. Original data onpage; new data is bigger, but not ISBIG,
1376	 *    fits on page
1377	 * 4. Original data onpage; new data is bigger, but not ISBIG,
1378	 *    does not fit on page
1379	 * 5. Original data onpage; New data is an off-page item.
1380	 * 6. Original data was offpage; new item is smaller.
1381	 *
1382	 * Cases 1-3 are essentially the same (and should be the common case).
1383	 * We handle 4-6 as delete and add.
1384	 */
1385	dbp = dbc->dbp;
1386	env = dbp->env;
1387	hcp = (HASH_CURSOR *)dbc->internal;
1388	found = 0;
1389	dbc_n = memp = NULL;
1390	carray = NULL;
1391
1392	/*
1393	 * We need to compute the number of bytes that we are adding or
1394	 * removing from the entry.  Normally, we can simply subtract
1395	 * the number of bytes we are replacing (dbt->dlen) from the
1396	 * number of bytes we are inserting (dbt->size).  However, if
1397	 * we are doing a partial put off the end of a record, then this
1398	 * formula doesn't work, because we are essentially adding
1399	 * new bytes.
1400	 */
1401	if (dbt->size > dbt->dlen) {
1402		change = dbt->size - dbt->dlen;
1403		is_plus = 1;
1404	} else {
1405		change = dbt->dlen - dbt->size;
1406		is_plus = 0;
1407	}
1408
1409	hk = H_PAIRDATA(dbp, hcp->page, hcp->indx);
1410	is_big = HPAGE_PTYPE(hk) == H_OFFPAGE;
1411
1412	if (is_big)
1413		memcpy(&len, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
1414	else
1415		len = LEN_HKEYDATA(dbp, hcp->page,
1416		    dbp->pgsize, H_DATAINDEX(hcp->indx));
1417
1418	beyond_eor = dbt->doff + dbt->dlen > len;
1419	if (beyond_eor) {
1420		/*
1421		 * The change is beyond the end of record.  If change
1422		 * is a positive number, we can simply add the extension
1423		 * to it.  However, if change is negative, then we need
1424		 * to figure out if the extension is larger than the
1425		 * negative change.
1426		 */
1427		if (is_plus)
1428			change += dbt->doff + dbt->dlen - len;
1429		else if (dbt->doff + dbt->dlen - len > change) {
1430			/* Extension bigger than change */
1431			is_plus = 1;
1432			change = (dbt->doff + dbt->dlen - len) - change;
1433		} else /* Extension is smaller than change. */
1434			change -= (dbt->doff + dbt->dlen - len);
1435	}
1436
1437	newlen = (is_plus ? len + change : len - change);
1438	if (ISBIG(hcp, newlen) ||
1439	    (is_plus && change > P_FREESPACE(dbp, hcp->page)) ||
1440	    beyond_eor || is_big) {
1441		/*
1442		 * If we are in cases 4 or 5 then is_plus will be true.
1443		 * If we don't have a transaction then we cannot roll back,
1444		 * make sure there is enough room for the new page.
1445		 */
1446		if (is_plus && dbc->txn == NULL &&
1447		    dbp->mpf->mfp->maxpgno != 0 &&
1448		    dbp->mpf->mfp->maxpgno == dbp->mpf->mfp->last_pgno)
1449			return (__db_space_err(dbp));
1450		/*
1451		 * Cases 4-6 -- two subcases.
1452		 * A. This is not really a partial operation, but an overwrite.
1453		 *    Simple del and add works.
1454		 * B. This is a partial and we need to construct the data that
1455		 *    we are really inserting (yuck).
1456		 * In both cases, we need to grab the key off the page (in
1457		 * some cases we could do this outside of this routine; for
1458		 * cleanliness we do it here.  If you happen to be on a big
1459		 * key, this could be a performance hit).
1460		 */
1461		memset(&tmp, 0, sizeof(tmp));
1462		if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn,
1463		    hcp->page, H_KEYINDEX(hcp->indx), &tmp,
1464		    &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0)
1465			return (ret);
1466
1467		/*
1468		 * In cases 4-6, a delete and insert works, but we need to
1469		 * track and update any cursors pointing to the item being
1470		 * moved.
1471		 */
1472		orig_pgno = PGNO(hcp->page);
1473		orig_indx = hcp->indx;
1474		if ((ret = __ham_get_clist(dbp,
1475		    orig_pgno, orig_indx, &carray)) != 0)
1476			goto err;
1477
1478		/* Preserve duplicate info. */
1479		dup_flag = F_ISSET(hcp, H_ISDUP);
1480		if (dbt->doff == 0 && dbt->dlen == len) {
1481			type = (dup_flag ? H_DUPLICATE : H_KEYDATA);
1482			new_dbt = dbt;
1483		} else {					/* Case B */
1484			type = HPAGE_PTYPE(hk) != H_OFFPAGE ?
1485			    HPAGE_PTYPE(hk) : H_KEYDATA;
1486			memset(&tdata, 0, sizeof(tdata));
1487			memsize = 0;
1488			if ((ret = __db_ret(dbp, dbc->thread_info,
1489			    dbc->txn, hcp->page, H_DATAINDEX(hcp->indx),
1490			    &tdata, &memp, &memsize)) != 0)
1491				goto err;
1492
1493			/* Now shift old data around to make room for new. */
1494			if (is_plus) {
1495				if ((ret = __os_realloc(env,
1496				    tdata.size + change, &tdata.data)) != 0)
1497					return (ret);
1498				memp = tdata.data;
1499				memsize = tdata.size + change;
1500				memset((u_int8_t *)tdata.data + tdata.size,
1501				    0, change);
1502			}
1503			end = (u_int8_t *)tdata.data + tdata.size;
1504
1505			src = (u_int8_t *)tdata.data + dbt->doff + dbt->dlen;
1506			if (src < end && tdata.size > dbt->doff + dbt->dlen) {
1507				len = tdata.size - (dbt->doff + dbt->dlen);
1508				if (is_plus)
1509					dest = src + change;
1510				else
1511					dest = src - change;
1512				memmove(dest, src, len);
1513			}
1514			memcpy((u_int8_t *)tdata.data + dbt->doff,
1515			    dbt->data, dbt->size);
1516			if (is_plus)
1517				tdata.size += change;
1518			else
1519				tdata.size -= change;
1520			new_dbt = &tdata;
1521		}
1522		if ((ret = __ham_del_pair(dbc, HAM_DEL_NO_CURSOR)) != 0)
1523			goto err;
1524		/*
1525		 * Save the state of the cursor after the delete, so that we
1526		 * can adjust any cursors impacted by the delete. Don't just
1527		 * update the cursors now, to avoid ambiguity in reversing the
1528		 * adjustments during abort.
1529		 */
1530		if ((ret = __dbc_dup(dbc, &dbc_n, DB_POSITION)) != 0)
1531			goto err;
1532		if ((ret = __ham_add_el(dbc, &tmp, new_dbt, type)) != 0)
1533			goto err;
1534		F_SET(hcp, dup_flag);
1535
1536		/*
1537		 * If the delete/insert pair caused the item to be moved
1538		 * to another location (which is possible for duplicate sets
1539		 * that are moved onto another page in the bucket), then update
1540		 * any impacted cursors.
1541		 */
1542		if (((HASH_CURSOR*)dbc_n->internal)->pgno != hcp->pgno ||
1543		    ((HASH_CURSOR*)dbc_n->internal)->indx != hcp->indx) {
1544			/*
1545			 * Set any cursors pointing to items in the moved
1546			 * duplicate set to the destination location and reset
1547			 * the deleted flag. This can't be done earlier, since
1548			 * the insert location is not computed until the actual
1549			 * __ham_add_el call is made.
1550			 */
1551			if (carray != NULL) {
1552				for (i = 0; carray[i] != NULL; i++) {
1553					cp = (HASH_CURSOR*)carray[i]->internal;
1554					cp->pgno = hcp->pgno;
1555					cp->indx = hcp->indx;
1556					F_CLR(cp, H_DELETED);
1557					found = 1;
1558				}
1559				/*
1560				 * Only log the update once, since the recovery
1561				 * code iterates through all open cursors and
1562				 * applies the change to all matching cursors.
1563				 */
1564				if (found && DBC_LOGGING(dbc) &&
1565				    IS_SUBTRANSACTION(dbc->txn)) {
1566					if ((ret =
1567					    __ham_chgpg_log(dbp,
1568					    dbc->txn, &new_lsn, 0,
1569					    DB_HAM_CHGPG, orig_pgno, hcp->pgno,
1570					    orig_indx, hcp->indx)) != 0)
1571						goto err;
1572				}
1573			}
1574			/*
1575			 * Update any cursors impacted by the delete. Do this
1576			 * after chgpg log so that recovery does not re-bump
1577			 * cursors pointing to the deleted item.
1578			 */
1579			ret = __hamc_update(dbc_n, 0, DB_HAM_CURADJ_DEL, 0);
1580		}
1581
1582err:		if (dbc_n != NULL && (t_ret = __dbc_close(dbc_n)) != 0 &&
1583		    ret == 0)
1584			ret = t_ret;
1585		if (carray != NULL)
1586			__os_free(env, carray);
1587		if (memp != NULL)
1588			__os_free(env, memp);
1589		return (ret);
1590	}
1591
1592	/*
1593	 * Set up pointer into existing data. Do it before the log
1594	 * message so we can use it inside of the log setup.
1595	 */
1596	beg = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx));
1597	beg += dbt->doff;
1598
1599	/*
1600	 * If we are going to have to move bytes at all, figure out
1601	 * all the parameters here.  Then log the call before moving
1602	 * anything around.
1603	 */
1604	if (DBC_LOGGING(dbc)) {
1605		old_dbt.data = beg;
1606		old_dbt.size = dbt->dlen;
1607		if ((ret = __ham_replace_log(dbp,
1608		    dbc->txn, &new_lsn, 0, PGNO(hcp->page),
1609		    (u_int32_t)H_DATAINDEX(hcp->indx), &LSN(hcp->page),
1610		    (int32_t)dbt->doff, &old_dbt, dbt, make_dup)) != 0)
1611			return (ret);
1612	} else
1613		LSN_NOT_LOGGED(new_lsn);
1614
1615	LSN(hcp->page) = new_lsn;	/* Structure assignment. */
1616
1617	__ham_onpage_replace(dbp, hcp->page, (u_int32_t)H_DATAINDEX(hcp->indx),
1618	    (int32_t)dbt->doff, change, is_plus, dbt);
1619
1620	return (0);
1621}
1622
1623/*
1624 * Replace data on a page with new data, possibly growing or shrinking what's
1625 * there.  This is called on two different occasions. On one (from replpair)
1626 * we are interested in changing only the data.  On the other (from recovery)
1627 * we are replacing the entire data (header and all) with a new element.  In
1628 * the latter case, the off argument is negative.
1629 * pagep: the page that we're changing
1630 * ndx: page index of the element that is growing/shrinking.
1631 * off: Offset at which we are beginning the replacement.
1632 * change: the number of bytes (+ or -) that the element is growing/shrinking.
1633 * dbt: the new data that gets written at beg.
1634 *
1635 * PUBLIC: void __ham_onpage_replace __P((DB *, PAGE *, u_int32_t,
1636 * PUBLIC:     int32_t, u_int32_t,  int, DBT *));
1637 */
1638void
1639__ham_onpage_replace(dbp, pagep, ndx, off, change, is_plus, dbt)
1640	DB *dbp;
1641	PAGE *pagep;
1642	u_int32_t ndx;
1643	int32_t off;
1644	u_int32_t change;
1645	int is_plus;
1646	DBT *dbt;
1647{
1648	db_indx_t i, *inp;
1649	int32_t len;
1650	size_t pgsize;
1651	u_int8_t *src, *dest;
1652	int zero_me;
1653
1654	pgsize = dbp->pgsize;
1655	inp = P_INP(dbp, pagep);
1656	if (change != 0) {
1657		zero_me = 0;
1658		src = (u_int8_t *)(pagep) + HOFFSET(pagep);
1659		if (off < 0)
1660			len = inp[ndx] - HOFFSET(pagep);
1661		else if ((u_int32_t)off >=
1662		    LEN_HKEYDATA(dbp, pagep, pgsize, ndx)) {
1663			len = (int32_t)(HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx))
1664			    + LEN_HKEYDATA(dbp, pagep, pgsize, ndx) - src);
1665			zero_me = 1;
1666		} else
1667			len = (int32_t)(
1668			    (HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx)) + off) -
1669			    src);
1670		if (is_plus)
1671			dest = src - change;
1672		else
1673			dest = src + change;
1674		memmove(dest, src, (size_t)len);
1675		if (zero_me)
1676			memset(dest + len, 0, change);
1677
1678		/* Now update the indices. */
1679		for (i = ndx; i < NUM_ENT(pagep); i++) {
1680			if (is_plus)
1681				inp[i] -= change;
1682			else
1683				inp[i] += change;
1684		}
1685		if (is_plus)
1686			HOFFSET(pagep) -= change;
1687		else
1688			HOFFSET(pagep) += change;
1689	}
1690	if (off >= 0)
1691		memcpy(HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx)) + off,
1692		    dbt->data, dbt->size);
1693	else
1694		memcpy(P_ENTRY(dbp, pagep, ndx), dbt->data, dbt->size);
1695}
1696
1697/*
1698 * PUBLIC: int __ham_split_page __P((DBC *, u_int32_t, u_int32_t));
1699 */
1700int
1701__ham_split_page(dbc, obucket, nbucket)
1702	DBC *dbc;
1703	u_int32_t obucket, nbucket;
1704{
1705	DB *dbp;
1706	DBC **carray, *tmp_dbc;
1707	DBT key, page_dbt;
1708	DB_LOCK block;
1709	DB_LSN new_lsn;
1710	DB_MPOOLFILE *mpf;
1711	ENV *env;
1712	HASH_CURSOR *hcp, *cp;
1713	PAGE **pp, *old_pagep, *temp_pagep, *new_pagep;
1714	db_indx_t n, dest_indx;
1715	db_pgno_t bucket_pgno, npgno, next_pgno;
1716	u_int32_t big_len, len;
1717	int found, i, ret, t_ret;
1718	void *big_buf;
1719
1720	dbp = dbc->dbp;
1721	carray = NULL;
1722	env = dbp->env;
1723	mpf = dbp->mpf;
1724	hcp = (HASH_CURSOR *)dbc->internal;
1725	temp_pagep = old_pagep = new_pagep = NULL;
1726	npgno = PGNO_INVALID;
1727	LOCK_INIT(block);
1728
1729	bucket_pgno = BUCKET_TO_PAGE(hcp, obucket);
1730	if ((ret = __db_lget(dbc,
1731	    0, bucket_pgno, DB_LOCK_WRITE, 0, &block)) != 0)
1732		goto err;
1733	if ((ret = __memp_fget(mpf, &bucket_pgno, dbc->thread_info, dbc->txn,
1734	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &old_pagep)) != 0)
1735		goto err;
1736
1737	/* Sort any unsorted pages before doing a hash split. */
1738	if (old_pagep->type == P_HASH_UNSORTED)
1739		if ((ret = __ham_sort_page_cursor(dbc, old_pagep)) != 0)
1740			return (ret);
1741
1742	/* Properly initialize the new bucket page. */
1743	npgno = BUCKET_TO_PAGE(hcp, nbucket);
1744	if ((ret = __memp_fget(mpf, &npgno, dbc->thread_info, dbc->txn,
1745	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &new_pagep)) != 0)
1746		goto err;
1747	P_INIT(new_pagep,
1748	    dbp->pgsize, npgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
1749
1750	temp_pagep = hcp->split_buf;
1751	memcpy(temp_pagep, old_pagep, dbp->pgsize);
1752
1753	if (DBC_LOGGING(dbc)) {
1754		page_dbt.size = dbp->pgsize;
1755		page_dbt.data = old_pagep;
1756		if ((ret = __ham_splitdata_log(dbp,
1757		    dbc->txn, &new_lsn, 0, SPLITOLD,
1758		    PGNO(old_pagep), &page_dbt, &LSN(old_pagep))) != 0)
1759			goto err;
1760	} else
1761		LSN_NOT_LOGGED(new_lsn);
1762
1763	LSN(old_pagep) = new_lsn;	/* Structure assignment. */
1764
1765	P_INIT(old_pagep, dbp->pgsize, PGNO(old_pagep), PGNO_INVALID,
1766	    PGNO_INVALID, 0, P_HASH);
1767
1768	big_len = 0;
1769	big_buf = NULL;
1770	memset(&key, 0, sizeof(key));
1771	while (temp_pagep != NULL) {
1772		if ((ret = __ham_get_clist(dbp,
1773		    PGNO(temp_pagep), NDX_INVALID, &carray)) != 0)
1774			goto err;
1775
1776		for (n = 0; n < (db_indx_t)NUM_ENT(temp_pagep); n += 2) {
1777			if ((ret = __db_ret(dbp, dbc->thread_info,
1778			    dbc->txn, temp_pagep, H_KEYINDEX(n),
1779			    &key, &big_buf, &big_len)) != 0)
1780				goto err;
1781
1782			if (__ham_call_hash(dbc, key.data, key.size) == obucket)
1783				pp = &old_pagep;
1784			else
1785				pp = &new_pagep;
1786
1787			/*
1788			 * Figure out how many bytes we need on the new
1789			 * page to store the key/data pair.
1790			 */
1791			len = LEN_HITEM(dbp, temp_pagep, dbp->pgsize,
1792			    H_DATAINDEX(n)) +
1793			    LEN_HITEM(dbp, temp_pagep, dbp->pgsize,
1794			    H_KEYINDEX(n)) +
1795			    2 * sizeof(db_indx_t);
1796
1797			if (P_FREESPACE(dbp, *pp) < len) {
1798				if (DBC_LOGGING(dbc)) {
1799					page_dbt.size = dbp->pgsize;
1800					page_dbt.data = *pp;
1801					if ((ret = __ham_splitdata_log(dbp,
1802					    dbc->txn, &new_lsn, 0,
1803					    SPLITNEW, PGNO(*pp), &page_dbt,
1804					    &LSN(*pp))) != 0)
1805						goto err;
1806				} else
1807					LSN_NOT_LOGGED(new_lsn);
1808				LSN(*pp) = new_lsn;
1809				if ((ret =
1810				    __ham_add_ovflpage(dbc, *pp, 1, pp)) != 0)
1811					goto err;
1812			}
1813
1814			dest_indx = NDX_INVALID;
1815			if ((ret = __ham_copypair(dbc, temp_pagep,
1816			    H_KEYINDEX(n), *pp, &dest_indx)) != 0)
1817			    goto err;
1818
1819			/*
1820			 * Update any cursors that were pointing to items
1821			 * shuffled because of this insert.
1822			 * Use __hamc_update, since the cursor adjustments are
1823			 * the same as those required for an insert. The
1824			 * overhead of creating a cursor is worthwhile to save
1825			 * replicating the adjustment functionality.
1826			 * Adjusting shuffled cursors needs to be done prior to
1827			 * adjusting any cursors that were pointing to the
1828			 * moved item.
1829			 * All pages in a bucket are sorted, but the items are
1830			 * not sorted across pages within a bucket. This means
1831			 * that splitting the first page in a bucket into two
1832			 * new buckets won't require any cursor shuffling,
1833			 * since all inserts will be appends. Splitting of the
1834			 * second etc page from the initial bucket could
1835			 * cause an item to be inserted at any location on a
1836			 * page (since items already inserted from page 1 of
1837			 * the initial bucket may overlap), so only adjust
1838			 * cursors for the second etc pages within a bucket.
1839			 */
1840			if (PGNO(temp_pagep) != bucket_pgno) {
1841				if ((ret = __db_cursor_int(dbp,
1842				    dbc->thread_info, dbc->txn, dbp->type,
1843				    PGNO_INVALID, 0, DB_LOCK_INVALIDID,
1844				    &tmp_dbc)) != 0)
1845					goto err;
1846				hcp = (HASH_CURSOR*)tmp_dbc->internal;
1847				hcp->pgno = PGNO(*pp);
1848				hcp->indx = dest_indx;
1849				hcp->dup_off = 0;
1850				hcp->order = 0;
1851				if ((ret = __hamc_update(
1852				    tmp_dbc, len, DB_HAM_CURADJ_ADD, 0)) != 0)
1853					goto err;
1854				if ((ret = __dbc_close(tmp_dbc)) != 0)
1855					goto err;
1856			}
1857			/* Update any cursors pointing at the moved item. */
1858			if (carray != NULL) {
1859				found = 0;
1860				for (i = 0; carray[i] != NULL; i++) {
1861					cp =
1862					    (HASH_CURSOR *)carray[i]->internal;
1863					if (cp->pgno == PGNO(temp_pagep) &&
1864					    cp->indx == n) {
1865						cp->pgno = PGNO(*pp);
1866						cp->indx = dest_indx;
1867						found = 1;
1868					}
1869				}
1870				/*
1871				 * Only log the update once, since the recovery
1872				 * code iterates through all open cursors and
1873				 * applies the change to all matching cursors.
1874				 */
1875				if (found && DBC_LOGGING(dbc) &&
1876				    IS_SUBTRANSACTION(dbc->txn)) {
1877					if ((ret =
1878					    __ham_chgpg_log(dbp,
1879					    dbc->txn, &new_lsn, 0,
1880					    DB_HAM_SPLIT, PGNO(temp_pagep),
1881					    PGNO(*pp), n, dest_indx)) != 0)
1882						goto err;
1883				}
1884			}
1885		}
1886		next_pgno = NEXT_PGNO(temp_pagep);
1887
1888		/* Clear temp_page; if it's a link overflow page, free it. */
1889		if (PGNO(temp_pagep) != bucket_pgno && (ret =
1890		    __db_free(dbc, temp_pagep)) != 0) {
1891			temp_pagep = NULL;
1892			goto err;
1893		}
1894
1895		if (next_pgno == PGNO_INVALID)
1896			temp_pagep = NULL;
1897		else if ((ret = __memp_fget(mpf,
1898		    &next_pgno, dbc->thread_info, dbc->txn,
1899		    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &temp_pagep)) != 0)
1900			goto err;
1901
1902		if (temp_pagep != NULL) {
1903			if (DBC_LOGGING(dbc)) {
1904				page_dbt.size = dbp->pgsize;
1905				page_dbt.data = temp_pagep;
1906				if ((ret = __ham_splitdata_log(dbp,
1907				    dbc->txn, &new_lsn, 0,
1908				    SPLITOLD, PGNO(temp_pagep),
1909				    &page_dbt, &LSN(temp_pagep))) != 0)
1910					goto err;
1911			} else
1912				LSN_NOT_LOGGED(new_lsn);
1913			LSN(temp_pagep) = new_lsn;
1914		}
1915
1916		if (carray != NULL)	/* We never knew its size. */
1917			__os_free(env, carray);
1918		carray = NULL;
1919	}
1920	if (big_buf != NULL)
1921		__os_free(env, big_buf);
1922
1923	/*
1924	 * If the original bucket spanned multiple pages, then we've got
1925	 * a pointer to a page that used to be on the bucket chain.  It
1926	 * should be deleted.
1927	 */
1928	if (temp_pagep != NULL && PGNO(temp_pagep) != bucket_pgno &&
1929	    (ret = __db_free(dbc, temp_pagep)) != 0) {
1930		temp_pagep = NULL;
1931		goto err;
1932	}
1933
1934	/*
1935	 * Write new buckets out.
1936	 */
1937	if (DBC_LOGGING(dbc)) {
1938		page_dbt.size = dbp->pgsize;
1939		page_dbt.data = old_pagep;
1940		if ((ret = __ham_splitdata_log(dbp, dbc->txn,
1941		    &new_lsn, 0, SPLITNEW, PGNO(old_pagep), &page_dbt,
1942		    &LSN(old_pagep))) != 0)
1943			goto err;
1944		LSN(old_pagep) = new_lsn;
1945
1946		page_dbt.data = new_pagep;
1947		if ((ret = __ham_splitdata_log(dbp, dbc->txn, &new_lsn, 0,
1948		    SPLITNEW, PGNO(new_pagep), &page_dbt,
1949		    &LSN(new_pagep))) != 0)
1950			goto err;
1951		LSN(new_pagep) = new_lsn;
1952	} else {
1953		LSN_NOT_LOGGED(LSN(old_pagep));
1954		LSN_NOT_LOGGED(LSN(new_pagep));
1955	}
1956
1957	ret = __memp_fput(mpf, dbc->thread_info, old_pagep, dbc->priority);
1958	if ((t_ret = __memp_fput(mpf,
1959	    dbc->thread_info, new_pagep, dbc->priority)) != 0 && ret == 0)
1960		ret = t_ret;
1961
1962	if (0) {
1963err:		if (old_pagep != NULL)
1964			(void)__memp_fput(mpf,
1965			    dbc->thread_info, old_pagep, dbc->priority);
1966		if (new_pagep != NULL) {
1967			P_INIT(new_pagep, dbp->pgsize,
1968			     npgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH);
1969			(void)__memp_fput(mpf,
1970			    dbc->thread_info, new_pagep, dbc->priority);
1971		}
1972		if (temp_pagep != NULL && PGNO(temp_pagep) != bucket_pgno)
1973			(void)__memp_fput(mpf,
1974			    dbc->thread_info, temp_pagep, dbc->priority);
1975	}
1976	if ((t_ret = __TLPUT(dbc, block)) != 0 && ret == 0)
1977		ret = t_ret;
1978	if (carray != NULL)		/* We never knew its size. */
1979		__os_free(env, carray);
1980	return (ret);
1981}
1982
1983/*
1984 * Add the given pair to the page.  The page in question may already be
1985 * held (i.e. it was already gotten).  If it is, then the page is passed
1986 * in via the pagep parameter.  On return, pagep will contain the page
1987 * to which we just added something.  This allows us to link overflow
1988 * pages and return the new page having correctly put the last page.
1989 *
1990 * PUBLIC: int __ham_add_el __P((DBC *, const DBT *, const DBT *, int));
1991 */
1992int
1993__ham_add_el(dbc, key, val, type)
1994	DBC *dbc;
1995	const DBT *key, *val;
1996	int type;
1997{
1998	const DBT *pkey, *pdata;
1999	DB *dbp;
2000	DBT key_dbt, data_dbt;
2001	DB_LSN new_lsn;
2002	DB_MPOOLFILE *mpf;
2003	HASH_CURSOR *hcp;
2004	HOFFPAGE doff, koff;
2005	db_pgno_t next_pgno, pgno;
2006	u_int32_t data_size, key_size;
2007	u_int32_t pages, pagespace, pairsize, rectype;
2008	int do_expand, data_type, is_keybig, is_databig, key_type, match, ret;
2009
2010	dbp = dbc->dbp;
2011	mpf = dbp->mpf;
2012	hcp = (HASH_CURSOR *)dbc->internal;
2013	do_expand = 0;
2014
2015	pgno = hcp->seek_found_page != PGNO_INVALID ?
2016	    hcp->seek_found_page : hcp->pgno;
2017	if (hcp->page == NULL && (ret = __memp_fget(mpf, &pgno,
2018	    dbc->thread_info, dbc->txn, DB_MPOOL_CREATE, &hcp->page)) != 0)
2019		return (ret);
2020
2021	key_size = HKEYDATA_PSIZE(key->size);
2022	data_size = HKEYDATA_PSIZE(val->size);
2023	is_keybig = ISBIG(hcp, key->size);
2024	is_databig = ISBIG(hcp, val->size);
2025	if (is_keybig)
2026		key_size = HOFFPAGE_PSIZE;
2027	if (is_databig)
2028		data_size = HOFFPAGE_PSIZE;
2029
2030	pairsize = key_size + data_size;
2031
2032	/* Advance to first page in chain with room for item. */
2033	while (H_NUMPAIRS(hcp->page) && NEXT_PGNO(hcp->page) != PGNO_INVALID) {
2034		/*
2035		 * This may not be the end of the chain, but the pair may fit
2036		 * anyway.  Check if it's a bigpair that fits or a regular
2037		 * pair that fits.
2038		 */
2039		if (P_FREESPACE(dbp, hcp->page) >= pairsize)
2040			break;
2041		next_pgno = NEXT_PGNO(hcp->page);
2042		if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0)
2043			return (ret);
2044	}
2045
2046	/*
2047	 * Check if we need to allocate a new page.
2048	 */
2049	if (P_FREESPACE(dbp, hcp->page) < pairsize) {
2050		do_expand = 1;
2051		if ((ret = __memp_dirty(mpf, &hcp->page,
2052		    dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2053			return (ret);
2054		if ((ret = __ham_add_ovflpage(dbc,
2055		    (PAGE *)hcp->page, 1, (PAGE **)&hcp->page)) != 0)
2056			return (ret);
2057		hcp->pgno = PGNO(hcp->page);
2058	}
2059
2060	/*
2061	 * If we don't have a transaction then make sure we will not
2062	 * run out of file space before updating the key or data.
2063	 */
2064	if (dbc->txn == NULL &&
2065	    dbp->mpf->mfp->maxpgno != 0 && (is_keybig || is_databig)) {
2066		pagespace = P_MAXSPACE(dbp, dbp->pgsize);
2067		pages = 0;
2068		if (is_databig)
2069			pages = ((data_size - 1) / pagespace) + 1;
2070		if (is_keybig) {
2071			pages += ((key->size - 1) / pagespace) + 1;
2072			if (pages >
2073			    (dbp->mpf->mfp->maxpgno - dbp->mpf->mfp->last_pgno))
2074				return (__db_space_err(dbp));
2075		}
2076	}
2077
2078	if ((ret = __memp_dirty(mpf,
2079	    &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
2080		return (ret);
2081
2082	/*
2083	 * Update cursor.
2084	 */
2085	hcp->indx = hcp->seek_found_indx;
2086	F_CLR(hcp, H_DELETED);
2087	if (is_keybig) {
2088		koff.type = H_OFFPAGE;
2089		UMRW_SET(koff.unused[0]);
2090		UMRW_SET(koff.unused[1]);
2091		UMRW_SET(koff.unused[2]);
2092		if ((ret = __db_poff(dbc, key, &koff.pgno)) != 0)
2093			return (ret);
2094		koff.tlen = key->size;
2095		key_dbt.data = &koff;
2096		key_dbt.size = sizeof(koff);
2097		pkey = &key_dbt;
2098		key_type = H_OFFPAGE;
2099	} else {
2100		pkey = key;
2101		key_type = H_KEYDATA;
2102	}
2103
2104	if (is_databig) {
2105		doff.type = H_OFFPAGE;
2106		UMRW_SET(doff.unused[0]);
2107		UMRW_SET(doff.unused[1]);
2108		UMRW_SET(doff.unused[2]);
2109		if ((ret = __db_poff(dbc, val, &doff.pgno)) != 0)
2110			return (ret);
2111		doff.tlen = val->size;
2112		data_dbt.data = &doff;
2113		data_dbt.size = sizeof(doff);
2114		pdata = &data_dbt;
2115		data_type = H_OFFPAGE;
2116	} else {
2117		pdata = val;
2118		data_type = type;
2119	}
2120
2121	/* Sort any unsorted pages before doing the insert. */
2122	if (((PAGE *)hcp->page)->type == P_HASH_UNSORTED)
2123		if ((ret = __ham_sort_page_cursor(dbc, hcp->page)) != 0)
2124			return (ret);
2125
2126	/*
2127	 * If inserting on the page found initially, then use the saved index.
2128	 * If inserting on a different page resolve the index now so it can be
2129	 * logged.
2130	 * The page might be different, if P_FREESPACE constraint failed (due
2131	 * to a partial put that increases the data size).
2132	 */
2133	if (PGNO(hcp->page) != hcp->seek_found_page) {
2134	    if ((ret = __ham_getindex(dbc, hcp->page, pkey,
2135		    key_type, &match, &hcp->seek_found_indx)) != 0)
2136			return (ret);
2137	    hcp->seek_found_page = PGNO(hcp->page);
2138
2139	    DB_ASSERT(dbp->env, hcp->seek_found_indx <= NUM_ENT(hcp->page));
2140	}
2141
2142	if (DBC_LOGGING(dbc)) {
2143		rectype = PUTPAIR;
2144		if (is_databig)
2145			rectype |= PAIR_DATAMASK;
2146		if (is_keybig)
2147			rectype |= PAIR_KEYMASK;
2148		if (type == H_DUPLICATE)
2149			rectype |= PAIR_DUPMASK;
2150
2151		if ((ret = __ham_insdel_log(dbp, dbc->txn, &new_lsn, 0,
2152		    rectype, PGNO(hcp->page), (u_int32_t)hcp->seek_found_indx,
2153		    &LSN(hcp->page), pkey, pdata)) != 0)
2154			return (ret);
2155	} else
2156		LSN_NOT_LOGGED(new_lsn);
2157
2158	/* Move lsn onto page. */
2159	LSN(hcp->page) = new_lsn;	/* Structure assignment. */
2160
2161	if ((ret = __ham_insertpair(dbc, hcp->page,
2162	    &hcp->seek_found_indx, pkey, pdata, key_type, data_type)) != 0)
2163		return (ret);
2164
2165	/*
2166	 * Adjust any cursors that were pointing at items whose indices were
2167	 * shuffled due to the insert.
2168	 */
2169	if ((ret = __hamc_update(dbc, pairsize, DB_HAM_CURADJ_ADD, 0)) != 0)
2170		return (ret);
2171
2172	/*
2173	 * For splits, we are going to update item_info's page number
2174	 * field, so that we can easily return to the same page the
2175	 * next time we come in here.  For other operations, this doesn't
2176	 * matter, since this is the last thing that happens before we return
2177	 * to the user program.
2178	 */
2179	hcp->pgno = PGNO(hcp->page);
2180	/*
2181	 * When moving an item from one page in a bucket to another, due to an
2182	 * expanding on page duplicate set, or a partial put that increases the
2183	 * size of an item. The destination index needs to be saved so that the
2184	 * __ham_replpair code can update any cursors impacted by the move. For
2185	 * other operations, this does not matter, since this is the last thing
2186	 * that happens before we return to the user program.
2187	 */
2188	hcp->indx = hcp->seek_found_indx;
2189
2190	/*
2191	 * XXX
2192	 * Maybe keep incremental numbers here.
2193	 */
2194	if (!STD_LOCKING(dbc)) {
2195		if ((ret = __ham_dirty_meta(dbc, 0)) != 0)
2196			return (ret);
2197		hcp->hdr->nelem++;
2198	}
2199
2200	if (do_expand || (hcp->hdr->ffactor != 0 &&
2201	    (u_int32_t)H_NUMPAIRS(hcp->page) > hcp->hdr->ffactor))
2202		F_SET(hcp, H_EXPAND);
2203	return (0);
2204}
2205
2206/*
2207 * Special insert pair call -- copies a key/data pair from one page to
2208 * another.  Works for all types of hash entries (H_OFFPAGE, H_KEYDATA,
2209 * H_DUPLICATE, H_OFFDUP).  Since we log splits at a high level, we
2210 * do not need to do any logging here.
2211 *
2212 * dest_indx is an optional parameter, it serves several purposes:
2213 * * ignored if NULL
2214 * * Used as an insert index if non-null and not NDX_INVALID
2215 * * Populated with the insert index if non-null and NDX_INVALID
2216 *
2217 * PUBLIC: int __ham_copypair __P((DBC *, PAGE *, u_int32_t,
2218 * PUBLIC:     PAGE *, db_indx_t *));
2219 */
2220int
2221__ham_copypair(dbc, src_page, src_ndx, dest_page, dest_indx)
2222	DBC *dbc;
2223	PAGE *src_page;
2224	u_int32_t src_ndx;
2225	PAGE *dest_page;
2226	db_indx_t *dest_indx;
2227{
2228	DB *dbp;
2229	DBT tkey, tdata;
2230	db_indx_t kindx, dindx;
2231	int ktype, dtype, ret;
2232
2233	dbp = dbc->dbp;
2234	ret = 0;
2235	memset(&tkey, 0, sizeof(tkey));
2236	memset(&tdata, 0, sizeof(tdata));
2237
2238	ktype = HPAGE_TYPE(dbp, src_page, H_KEYINDEX(src_ndx));
2239	dtype = HPAGE_TYPE(dbp, src_page, H_DATAINDEX(src_ndx));
2240	kindx = H_KEYINDEX(src_ndx);
2241	dindx = H_DATAINDEX(src_ndx);
2242	if (ktype == H_OFFPAGE) {
2243		tkey.data = P_ENTRY(dbp, src_page, kindx);
2244		tkey.size = LEN_HITEM(dbp, src_page, dbp->pgsize, kindx);
2245	} else {
2246		tkey.data = HKEYDATA_DATA(P_ENTRY(dbp, src_page, kindx));
2247		tkey.size = LEN_HKEYDATA(dbp, src_page, dbp->pgsize, kindx);
2248	}
2249	if (dtype == H_OFFPAGE) {
2250		tdata.data = P_ENTRY(dbp, src_page, dindx);
2251		tdata.size = LEN_HITEM(dbp, src_page, dbp->pgsize, dindx);
2252	} else {
2253		tdata.data = HKEYDATA_DATA(P_ENTRY(dbp, src_page, dindx));
2254		tdata.size = LEN_HKEYDATA(dbp, src_page, dbp->pgsize, dindx);
2255	}
2256	if ((ret = __ham_insertpair(dbc, dest_page, dest_indx,
2257	    &tkey, &tdata, ktype, dtype)) != 0)
2258	    return (ret);
2259
2260	return (ret);
2261}
2262
2263/*
2264 * __ham_add_ovflpage --
2265 *
2266 * Returns:
2267 *	0 on success: pp points to new page; !0 on error, pp not valid.
2268 *
2269 * PUBLIC: int __ham_add_ovflpage __P((DBC *, PAGE *, int, PAGE **));
2270 */
2271int
2272__ham_add_ovflpage(dbc, pagep, release, pp)
2273	DBC *dbc;
2274	PAGE *pagep;
2275	int release;
2276	PAGE **pp;
2277{
2278	DB *dbp;
2279	DB_LSN new_lsn;
2280	DB_MPOOLFILE *mpf;
2281	PAGE *new_pagep;
2282	int ret;
2283
2284	dbp = dbc->dbp;
2285	mpf = dbp->mpf;
2286
2287	DB_ASSERT(dbp->env, IS_DIRTY(pagep));
2288
2289	if ((ret = __db_new(dbc, P_HASH, &new_pagep)) != 0)
2290		return (ret);
2291
2292	if (DBC_LOGGING(dbc)) {
2293		if ((ret = __ham_newpage_log(dbp, dbc->txn, &new_lsn, 0,
2294		    PUTOVFL, PGNO(pagep), &LSN(pagep), PGNO(new_pagep),
2295		    &LSN(new_pagep), PGNO_INVALID, NULL)) != 0) {
2296			(void)__memp_fput(mpf,
2297			    dbc->thread_info, pagep, dbc->priority);
2298			return (ret);
2299		}
2300	} else
2301		LSN_NOT_LOGGED(new_lsn);
2302
2303	/* Move lsn onto page. */
2304	LSN(pagep) = LSN(new_pagep) = new_lsn;
2305	NEXT_PGNO(pagep) = PGNO(new_pagep);
2306
2307	PREV_PGNO(new_pagep) = PGNO(pagep);
2308
2309	if (release)
2310		ret = __memp_fput(mpf, dbc->thread_info, pagep, dbc->priority);
2311
2312	*pp = new_pagep;
2313	return (ret);
2314}
2315
2316/*
2317 * PUBLIC: int __ham_get_cpage __P((DBC *, db_lockmode_t));
2318 */
2319int
2320__ham_get_cpage(dbc, mode)
2321	DBC *dbc;
2322	db_lockmode_t mode;
2323{
2324	DB *dbp;
2325	DB_LOCK tmp_lock;
2326	DB_MPOOLFILE *mpf;
2327	HASH_CURSOR *hcp;
2328	int ret;
2329
2330	dbp = dbc->dbp;
2331	mpf = dbp->mpf;
2332	hcp = (HASH_CURSOR *)dbc->internal;
2333	ret = 0;
2334
2335	/*
2336	 * There are four cases with respect to buckets and locks.
2337	 * 1. If there is no lock held, then if we are locking, we should
2338	 *    get the lock.
2339	 * 2. If there is a lock held, it's for the current bucket, and it's
2340	 *    for the right mode, we don't need to do anything.
2341	 * 3. If there is a lock held for the current bucket but it's not
2342	 *    strong enough, we need to upgrade.
2343	 * 4. If there is a lock, but it's for a different bucket, then we need
2344	 *    to release the existing lock and get a new lock.
2345	 */
2346	LOCK_INIT(tmp_lock);
2347	if (STD_LOCKING(dbc)) {
2348		if (hcp->lbucket != hcp->bucket) {	/* Case 4 */
2349			if ((ret = __TLPUT(dbc, hcp->lock)) != 0)
2350				return (ret);
2351			LOCK_INIT(hcp->lock);
2352		}
2353
2354		/*
2355		 * See if we have the right lock.  If we are doing
2356		 * dirty reads we assume the write lock has been downgraded.
2357		 */
2358		if ((LOCK_ISSET(hcp->lock) &&
2359		    ((hcp->lock_mode == DB_LOCK_READ ||
2360		    F_ISSET(dbp, DB_AM_READ_UNCOMMITTED)) &&
2361		    mode == DB_LOCK_WRITE))) {
2362			/* Case 3. */
2363			tmp_lock = hcp->lock;
2364			LOCK_INIT(hcp->lock);
2365		}
2366
2367		/* Acquire the lock. */
2368		if (!LOCK_ISSET(hcp->lock))
2369			/* Cases 1, 3, and 4. */
2370			if ((ret = __ham_lock_bucket(dbc, mode)) != 0)
2371				return (ret);
2372
2373		if (ret == 0) {
2374			hcp->lock_mode = mode;
2375			hcp->lbucket = hcp->bucket;
2376			/* Case 3: release the original lock. */
2377			if ((ret = __ENV_LPUT(dbp->env, tmp_lock)) != 0)
2378				return (ret);
2379		} else if (LOCK_ISSET(tmp_lock))
2380			hcp->lock = tmp_lock;
2381	}
2382
2383	if (ret == 0 && hcp->page == NULL) {
2384		if (hcp->pgno == PGNO_INVALID)
2385			hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
2386		if ((ret = __memp_fget(mpf,
2387		    &hcp->pgno, dbc->thread_info, dbc->txn,
2388		    DB_MPOOL_CREATE, &hcp->page)) != 0)
2389			return (ret);
2390	}
2391	return (0);
2392}
2393
2394/*
2395 * Get a new page at the cursor, putting the last page if necessary.
2396 * If the flag is set to H_ISDUP, then we are talking about the
2397 * duplicate page, not the main page.
2398 *
2399 * PUBLIC: int __ham_next_cpage __P((DBC *, db_pgno_t));
2400 */
2401int
2402__ham_next_cpage(dbc, pgno)
2403	DBC *dbc;
2404	db_pgno_t pgno;
2405{
2406	DB *dbp;
2407	DB_MPOOLFILE *mpf;
2408	HASH_CURSOR *hcp;
2409	PAGE *p;
2410	int ret;
2411
2412	dbp = dbc->dbp;
2413	mpf = dbp->mpf;
2414	hcp = (HASH_CURSOR *)dbc->internal;
2415
2416	if (hcp->page != NULL && (ret = __memp_fput(mpf,
2417	    dbc->thread_info, hcp->page, dbc->priority)) != 0)
2418		return (ret);
2419	hcp->page = NULL;
2420
2421	if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn,
2422	    DB_MPOOL_CREATE, &p)) != 0)
2423		return (ret);
2424
2425	hcp->page = p;
2426	hcp->pgno = pgno;
2427	hcp->indx = 0;
2428
2429	return (0);
2430}
2431
2432/*
2433 * __ham_lock_bucket --
2434 *	Get the lock on a particular bucket.
2435 *
2436 * PUBLIC: int __ham_lock_bucket __P((DBC *, db_lockmode_t));
2437 */
2438int
2439__ham_lock_bucket(dbc, mode)
2440	DBC *dbc;
2441	db_lockmode_t mode;
2442{
2443	HASH_CURSOR *hcp;
2444	db_pgno_t pgno;
2445	int gotmeta, ret;
2446
2447	hcp = (HASH_CURSOR *)dbc->internal;
2448	gotmeta = hcp->hdr == NULL ? 1 : 0;
2449	if (gotmeta)
2450		if ((ret = __ham_get_meta(dbc)) != 0)
2451			return (ret);
2452	pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
2453	if (gotmeta)
2454		if ((ret = __ham_release_meta(dbc)) != 0)
2455			return (ret);
2456
2457	ret = __db_lget(dbc, 0, pgno, mode, 0, &hcp->lock);
2458
2459	hcp->lock_mode = mode;
2460	return (ret);
2461}
2462
2463/*
2464 * __ham_dpair --
2465 *	Delete a pair on a page, paying no attention to what the pair
2466 *	represents.  The caller is responsible for freeing up duplicates
2467 *	or offpage entries that might be referenced by this pair.
2468 *
2469 *	Recovery assumes that this may be called without the metadata
2470 *	page pinned.
2471 *
2472 * PUBLIC: void __ham_dpair __P((DB *, PAGE *, u_int32_t));
2473 */
2474void
2475__ham_dpair(dbp, p, indx)
2476	DB *dbp;
2477	PAGE *p;
2478	u_int32_t indx;
2479{
2480	db_indx_t delta, n, *inp;
2481	u_int8_t *dest, *src;
2482
2483	inp = P_INP(dbp, p);
2484	/*
2485	 * Compute "delta", the amount we have to shift all of the
2486	 * offsets.  To find the delta, we just need to calculate
2487	 * the size of the pair of elements we are removing.
2488	 */
2489	delta = H_PAIRSIZE(dbp, p, dbp->pgsize, indx);
2490
2491	/*
2492	 * The hard case: we want to remove something other than
2493	 * the last item on the page.  We need to shift data and
2494	 * offsets down.
2495	 */
2496	if ((db_indx_t)indx != NUM_ENT(p) - 2) {
2497		/*
2498		 * Move the data: src is the first occupied byte on
2499		 * the page. (Length is delta.)
2500		 */
2501		src = (u_int8_t *)p + HOFFSET(p);
2502
2503		/*
2504		 * Destination is delta bytes beyond src.  This might
2505		 * be an overlapping copy, so we have to use memmove.
2506		 */
2507		dest = src + delta;
2508		memmove(dest, src, inp[H_DATAINDEX(indx)] - HOFFSET(p));
2509	}
2510
2511	/* Adjust page metadata. */
2512	HOFFSET(p) = HOFFSET(p) + delta;
2513	NUM_ENT(p) = NUM_ENT(p) - 2;
2514
2515	/* Adjust the offsets. */
2516	for (n = (db_indx_t)indx; n < (db_indx_t)(NUM_ENT(p)); n++)
2517		inp[n] = inp[n + 2] + delta;
2518
2519}
2520
2521/*
2522 * __hamc_delpg --
2523 *
2524 * Adjust the cursors after we've emptied a page in a bucket, taking
2525 * care that when we move cursors pointing to deleted items, their
2526 * orders don't collide with the orders of cursors on the page we move
2527 * them to (since after this function is called, cursors with the same
2528 * index on the two pages will be otherwise indistinguishable--they'll
2529 * all have pgno new_pgno).  There are three cases:
2530 *
2531 *	1) The emptied page is the first page in the bucket.  In this
2532 *	case, we've copied all the items from the second page into the
2533 *	first page, so the first page is new_pgno and the second page is
2534 *	old_pgno.  new_pgno is empty, but can have deleted cursors
2535 *	pointing at indx 0, so we need to be careful of the orders
2536 *	there.  This is DB_HAM_DELFIRSTPG.
2537 *
2538 *	2) The page is somewhere in the middle of a bucket.  Our caller
2539 *	can just delete such a page, so it's old_pgno.  old_pgno is
2540 *	empty, but may have deleted cursors pointing at indx 0, so we
2541 *	need to be careful of indx 0 when we move those cursors to
2542 *	new_pgno.  This is DB_HAM_DELMIDPG.
2543 *
2544 *	3) The page is the last in a bucket.  Again the empty page is
2545 *	old_pgno, and again it should only have cursors that are deleted
2546 *	and at indx == 0.  This time, though, there's no next page to
2547 *	move them to, so we set them to indx == num_ent on the previous
2548 *	page--and indx == num_ent is the index whose cursors we need to
2549 *	be careful of.  This is DB_HAM_DELLASTPG.
2550 */
2551static int
2552__hamc_delpg(dbc, old_pgno, new_pgno, num_ent, op, orderp)
2553	DBC *dbc;
2554	db_pgno_t old_pgno, new_pgno;
2555	u_int32_t num_ent;
2556	db_ham_mode op;
2557	u_int32_t *orderp;
2558{
2559	DB *dbp, *ldbp;
2560	DBC *cp;
2561	DB_LSN lsn;
2562	DB_TXN *my_txn;
2563	ENV *env;
2564	HASH_CURSOR *hcp;
2565	db_indx_t indx;
2566	u_int32_t order;
2567	int found, ret;
2568
2569	/* Which is the worrisome index? */
2570	indx = (op == DB_HAM_DELLASTPG) ? num_ent : 0;
2571
2572	dbp = dbc->dbp;
2573	env = dbp->env;
2574
2575	my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL;
2576	MUTEX_LOCK(env, env->mtx_dblist);
2577	/*
2578	 * Find the highest order of any cursor our movement
2579	 * may collide with.
2580	 */
2581	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
2582	for (order = 1;
2583	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
2584	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
2585		MUTEX_LOCK(env, dbp->mutex);
2586		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
2587			if (cp == dbc || cp->dbtype != DB_HASH)
2588				continue;
2589			hcp = (HASH_CURSOR *)cp->internal;
2590			if (hcp->pgno == new_pgno &&
2591			    !MVCC_SKIP_CURADJ(cp, new_pgno)) {
2592				if (hcp->indx == indx &&
2593				    F_ISSET(hcp, H_DELETED) &&
2594				    hcp->order >= order)
2595					order = hcp->order + 1;
2596				DB_ASSERT(env, op != DB_HAM_DELFIRSTPG ||
2597				    hcp->indx == NDX_INVALID ||
2598				    (hcp->indx == 0 &&
2599				    F_ISSET(hcp, H_DELETED)));
2600			}
2601		}
2602		MUTEX_UNLOCK(env, dbp->mutex);
2603	}
2604
2605	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
2606	for (found = 0;
2607	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
2608	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
2609		MUTEX_LOCK(env, dbp->mutex);
2610		TAILQ_FOREACH(cp, &ldbp->active_queue, links) {
2611			if (cp == dbc || cp->dbtype != DB_HASH)
2612				continue;
2613
2614			hcp = (HASH_CURSOR *)cp->internal;
2615
2616			if (hcp->pgno == old_pgno &&
2617			    !MVCC_SKIP_CURADJ(cp, old_pgno)) {
2618				switch (op) {
2619				case DB_HAM_DELFIRSTPG:
2620					/*
2621					 * We're moving all items,
2622					 * regardless of index.
2623					 */
2624					hcp->pgno = new_pgno;
2625
2626					/*
2627					 * But we have to be careful of
2628					 * the order values.
2629					 */
2630					if (hcp->indx == indx)
2631						hcp->order += order;
2632					break;
2633				case DB_HAM_DELMIDPG:
2634					hcp->pgno = new_pgno;
2635					DB_ASSERT(env, hcp->indx == 0 &&
2636					    F_ISSET(hcp, H_DELETED));
2637					hcp->order += order;
2638					break;
2639				case DB_HAM_DELLASTPG:
2640					hcp->pgno = new_pgno;
2641					DB_ASSERT(env, hcp->indx == 0 &&
2642					    F_ISSET(hcp, H_DELETED));
2643					hcp->indx = indx;
2644					hcp->order += order;
2645					break;
2646				default:
2647					return (__db_unknown_path(
2648					    env, "__hamc_delpg"));
2649				}
2650				if (my_txn != NULL && cp->txn != my_txn)
2651					found = 1;
2652			}
2653		}
2654		MUTEX_UNLOCK(env, dbp->mutex);
2655	}
2656	MUTEX_UNLOCK(env, env->mtx_dblist);
2657
2658	if (found != 0 && DBC_LOGGING(dbc)) {
2659		if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, op,
2660		    old_pgno, new_pgno, indx, order)) != 0)
2661			return (ret);
2662	}
2663	*orderp = order;
2664	return (0);
2665}
2666