1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1997,2008 Oracle.  All rights reserved.
5 *
6 * $Id: bt_recno.c,v 12.40 2008/02/12 16:42:54 bschmeck Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/mp.h"
16
/* Prototypes for the file-local (static) helpers used in this file. */
static int  __ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t));
static int  __ram_source __P((DB *));
static int  __ram_sread __P((DBC *, db_recno_t));
static int  __ram_update __P((DBC *, db_recno_t, int));
21
/*
 * In recno, there are two meanings to the on-page "deleted" flag.  If we're
 * re-numbering records, it means the record was implicitly created.  We skip
 * over implicitly created records if doing a cursor "next" or "prev", and
 * return DB_KEYEMPTY if they're explicitly requested.  If not re-numbering
 * records, it means that the record was implicitly created, or was deleted.
 * We skip over implicitly created or deleted records if doing a cursor "next"
 * or "prev", and return DB_KEYEMPTY if they're explicitly requested.
 *
 * If we're re-numbering records, then we have to detect in the cursor that
 * a record was deleted, and adjust the cursor as necessary on the next get.
 * If we're not re-numbering records, then we can detect that a record has
 * been deleted by looking at the actual on-page record, so we completely
 * ignore the cursor's delete flag.  This is different from the B+tree code,
 * which maintains in the cursor whether the cursor references a deleted
 * record, and which doesn't always check the on-page value.
 */
/*
 * CD_SET --
 *	Flag the cursor as referencing a deleted record.  This is a no-op
 *	unless the cursor has C_RENUMBER set.
 *
 * Statement macros are wrapped in do/while (0) so they act as a single
 * statement (safe in an unbraced if/else), and the argument is always
 * parenthesized so any pointer expression may be passed.
 */
#define	CD_SET(cp) do {							\
	if (F_ISSET((cp), C_RENUMBER))					\
		F_SET((cp), C_DELETED);					\
} while (0)

/*
 * CD_CLR --
 *	Clear the cursor's deleted flag and reset its order to
 *	INVALID_ORDER.  This is a no-op unless the cursor has C_RENUMBER set.
 */
#define	CD_CLR(cp) do {							\
	if (F_ISSET((cp), C_RENUMBER)) {				\
		F_CLR((cp), C_DELETED);					\
		(cp)->order = INVALID_ORDER;				\
	}								\
} while (0)

/*
 * CD_ISSET --
 *	Evaluates to 1 if the cursor has both C_RENUMBER and C_DELETED set,
 *	and to 0 otherwise.
 */
#define	CD_ISSET(cp)							\
	((F_ISSET((cp), C_RENUMBER) && F_ISSET((cp), C_DELETED)) ? 1 : 0)
51
/*
 * C_LESSTHAN --
 *	Ordering test for two cursors.  cp1 sorts before cp2 when:
 *	  - cp1's recno is smaller than cp2's, or
 *	  - the recnos match, cp1 is deleted, and either cp2 is not deleted
 *	    or cp1's order is smaller than cp2's.
 */
#define	C_LESSTHAN(cp1, cp2)						\
    ((cp1)->recno != (cp2)->recno ?					\
    (cp1)->recno < (cp2)->recno :					\
    (CD_ISSET((cp1)) &&							\
    (!CD_ISSET((cp2)) || (cp1)->order < (cp2)->order)))
64
/*
 * C_EQUAL --
 *	Two cursors are equal when none of the following differ: the recno,
 *	the deleted state, and (only when the cursors are deleted) the order.
 */
#define	C_EQUAL(cp1, cp2)						\
    (!((cp1)->recno != (cp2)->recno ||					\
    CD_ISSET((cp1)) != CD_ISSET((cp2)) ||				\
    (CD_ISSET((cp1)) && (cp1)->order != (cp2)->order)))
72
/*
 * CURADJ_LOG --
 *	True when a cursor adjustment must be logged: logging is enabled for
 *	the cursor and the cursor's transaction has a parent transaction.
 */
#define	CURADJ_LOG(dbc)							\
	(DBC_LOGGING((dbc)) &&						\
	    ((dbc)->txn == NULL ? 0 : (dbc)->txn->parent != NULL))
78
/*
 * STACK_TO_CURSOR --
 *	After a search, copy the found page (page, page number and index)
 *	from the cursor's current stack entry into the cursor, discard the
 *	lock the cursor currently holds, and take over the stack entry's
 *	lock and lock mode.
 *
 *	cp:  the cursor to update.
 *	ret: lvalue that receives the __TLPUT error, but only if it is
 *	     currently 0, so an earlier error is never overwritten.
 *
 * !!!
 * The macro expects a variable named "dbc" to be in scope at the point of
 * use; it is passed to __TLPUT.
 *
 * The body is wrapped in do/while (0) so the macro behaves as a single
 * statement, e.g., in an unbraced if/else.
 */
#define	STACK_TO_CURSOR(cp, ret) do {					\
	int __t_ret;							\
	(cp)->page = (cp)->csp->page;					\
	(cp)->pgno = (cp)->csp->page->pgno;				\
	(cp)->indx = (cp)->csp->indx;					\
	if ((__t_ret = __TLPUT(dbc, (cp)->lock)) != 0 && (ret) == 0)	\
		(ret) = __t_ret;					\
	(cp)->lock = (cp)->csp->lock;					\
	(cp)->lock_mode = (cp)->csp->lock_mode;				\
} while (0)
93
94/*
95 * __ram_open --
96 *	Recno open function.
97 *
98 * PUBLIC: int __ram_open __P((DB *, DB_THREAD_INFO *,
99 * PUBLIC:      DB_TXN *, const char *, db_pgno_t, u_int32_t));
100 */
101int
102__ram_open(dbp, ip, txn, name, base_pgno, flags)
103	DB *dbp;
104	DB_THREAD_INFO *ip;
105	DB_TXN *txn;
106	const char *name;
107	db_pgno_t base_pgno;
108	u_int32_t flags;
109{
110	BTREE *t;
111	DBC *dbc;
112	int ret, t_ret;
113
114	COMPQUIET(name, NULL);
115	t = dbp->bt_internal;
116
117	/* Start up the tree. */
118	if ((ret = __bam_read_root(dbp, ip, txn, base_pgno, flags)) != 0)
119		return (ret);
120
121	/*
122	 * If the user specified a source tree, open it and map it in.
123	 *
124	 * !!!
125	 * We don't complain if the user specified transactions or threads.
126	 * It's possible to make it work, but you'd better know what you're
127	 * doing!
128	 */
129	if (t->re_source != NULL && (ret = __ram_source(dbp)) != 0)
130		return (ret);
131
132	/* If we're snapshotting an underlying source file, do it now. */
133	if (F_ISSET(dbp, DB_AM_SNAPSHOT)) {
134		/* Allocate a cursor. */
135		if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
136			return (ret);
137
138		/* Do the snapshot. */
139		if ((ret = __ram_update(dbc,
140		    DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
141			ret = 0;
142
143		/* Discard the cursor. */
144		if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
145			ret = t_ret;
146	}
147
148	return (ret);
149}
150
151/*
152 * __ram_append --
153 *	Recno append function.
154 *
155 * PUBLIC: int __ram_append __P((DBC *, DBT *, DBT *));
156 */
157int
158__ram_append(dbc, key, data)
159	DBC *dbc;
160	DBT *key, *data;
161{
162	BTREE_CURSOR *cp;
163	int ret;
164
165	cp = (BTREE_CURSOR *)dbc->internal;
166
167	/*
168	 * Make sure we've read in all of the backing source file.  If
169	 * we found the record or it simply didn't exist, add the
170	 * user's record.
171	 */
172	ret = __ram_update(dbc, DB_MAX_RECORDS, 0);
173	if (ret == 0 || ret == DB_NOTFOUND)
174		ret = __ram_add(dbc, &cp->recno, data, DB_APPEND, 0);
175
176	/* Return the record number. */
177	if (ret == 0 && key != NULL)
178		ret = __db_retcopy(dbc->env, key, &cp->recno,
179		    sizeof(cp->recno), &dbc->rkey->data, &dbc->rkey->ulen);
180
181	return (ret);
182}
183
/*
 * __ramc_del --
 *	Recno DBC->del function.  Deletes the record at the cursor's current
 *	record number.
 *
 *	In a renumbering recno the item is removed from the page, the record
 *	counts and other cursors are adjusted, and an emptied non-root page
 *	is deleted.  Otherwise the record is replaced by a zero-length item
 *	flagged as deleted.
 *
 *	Returns DB_KEYEMPTY if the cursor already references a deleted record
 *	or the on-page record carries the deleted flag, DB_NOTFOUND if the
 *	search does not find an exact match, 0 on success, or an error from
 *	one of the called functions.
 *
 * PUBLIC: int __ramc_del __P((DBC *));
 */
int
__ramc_del(dbc)
	DBC *dbc;
{
	BKEYDATA bk;
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT hdr, data;
	DB_LSN lsn;
	int exact, nc, ret, stack, t_ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbp->bt_internal;
	/* Set once we hold a search stack that must be released at err. */
	stack = 0;

	/*
	 * The semantics of cursors during delete are as follows: in
	 * non-renumbering recnos, records are replaced with a marker
	 * containing a delete flag.  If the record referenced by this cursor
	 * has already been deleted, we will detect that as part of the delete
	 * operation, and fail.
	 *
	 * In renumbering recnos, cursors which represent deleted items
	 * are flagged with the C_DELETED flag, and it is an error to
	 * call c_del a second time without an intervening cursor motion.
	 */
	if (CD_ISSET(cp))
		return (DB_KEYEMPTY);

	/* Search the tree for the key; delete only deletes exact matches. */
	if ((ret = __bam_rsearch(dbc, &cp->recno, SR_DELETE, 1, &exact)) != 0)
		goto err;
	if (!exact) {
		ret = DB_NOTFOUND;
		goto err;
	}
	stack = 1;

	/* Mark the found page dirty, then copy the page into the cursor. */
	if ((ret = __memp_dirty(dbp->mpf,
	    &cp->csp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	STACK_TO_CURSOR(cp, ret);
	if (ret != 0)
		goto err;

	/*
	 * If re-numbering records, the on-page deleted flag can only mean
	 * that this record was implicitly created.  Applications aren't
	 * permitted to delete records they never created, return an error.
	 *
	 * If not re-numbering records, the on-page deleted flag means that
	 * this record was implicitly created, or, was deleted at some time.
	 * The former is an error because applications aren't permitted to
	 * delete records they never created, the latter is an error because
	 * if the record was "deleted", we could never have found it.
	 */
	if (B_DISSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type)) {
		ret = DB_KEYEMPTY;
		goto err;
	}

	if (F_ISSET(cp, C_RENUMBER)) {
		/* Delete the item, adjust the counts, adjust the cursors. */
		if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
			goto err;
		if ((ret = __bam_adjust(dbc, -1)) != 0)
			goto err;
		if ((ret = __ram_ca(dbc, CA_DELETE, &nc)) != 0)
			goto err;
		/* Only log if __ram_ca found any relevant cursors. */
		if (nc > 0 &&
		    CURADJ_LOG(dbc) && (ret = __bam_rcuradj_log(dbp, dbc->txn,
		    &lsn, 0, CA_DELETE, cp->root, cp->recno, cp->order)) != 0)
			goto err;

		/*
		 * If the page is empty, delete it.
		 *
		 * We never delete a root page.  First, root pages of primary
		 * databases never go away, recno or otherwise.  However, if
		 * it's the root page of an off-page duplicates database, then
		 * it can be deleted.   We don't delete it here because we have
		 * no way of telling the primary database page holder (e.g.,
		 * the hash access method) that its page element should be
		 * cleaned up because the underlying tree is gone.  So, we keep
		 * the page around until the last cursor referencing the empty
		 * tree is closed, and then clean it up.
		 */
		if (NUM_ENT(cp->page) == 0 && PGNO(cp->page) != cp->root) {
			/*
			 * We want to delete a single item out of the last page
			 * that we're not deleting.
			 */
			ret = __bam_dpages(dbc, 0, 0);

			/*
			 * Regardless of the return from __bam_dpages, it will
			 * discard our stack and pinned page.
			 */
			stack = 0;
			cp->page = NULL;
		}
	} else {
		/* Use a delete/put pair to replace the record with a marker. */
		if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0)
			goto err;

		/* Build a zero-length item whose type has the deleted flag. */
		B_TSET_DELETED(bk.type, B_KEYDATA);
		bk.len = 0;
		DB_INIT_DBT(hdr, &bk, SSZA(BKEYDATA, data));
		DB_INIT_DBT(data, "", 0);
		if ((ret = __db_pitem(dbc,
		    cp->page, cp->indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0)
			goto err;
	}

	/*
	 * Note the modification; __ram_writeback only rewrites the backing
	 * file when this is set.
	 */
	t->re_modified = 1;

	/* Release the stack if we still hold it; keep the first error. */
err:	if (stack && (t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}
315
/*
 * __ramc_get --
 *	Recno DBC->get function.
 *
 *	The first switch turns the requested operation into a starting
 *	record number in cp->recno and, for the iteration operations,
 *	normalizes flags to DB_NEXT or DB_PREV.  The loop that follows
 *	searches the tree for that record, skipping or rejecting records
 *	flagged as deleted, and for the DB_GET_BOTH* operations compares the
 *	record against the caller's data.
 *
 *	key is read for DB_SET, DB_SET_RANGE and (outside an off-page
 *	duplicate tree) DB_GET_BOTH/DB_GET_BOTH_RANGE; the record number is
 *	copied out to it if DB_DBT_ISSET is not already set and this is not
 *	an off-page duplicate tree.  data is only used for the DB_GET_BOTH*
 *	comparison.  pgnop is unused.
 *
 *	Returns 0 on success, DB_NOTFOUND or DB_KEYEMPTY when there is no
 *	record to return, or an error from one of the called functions.
 *
 * PUBLIC: int __ramc_get
 * PUBLIC:     __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
 */
int
__ramc_get(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	BTREE_CURSOR *cp;
	DB *dbp;
	int cmp, exact, ret;

	COMPQUIET(pgnop, NULL);

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;

	/* The bulk-retrieval flags are not part of the operation code. */
	LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);
	/*
	 * We come back here, with flags already normalized to DB_NEXT or
	 * DB_PREV, each time the record we landed on is flagged deleted.
	 */
retry:	switch (flags) {
	case DB_CURRENT:
		/*
		 * If we're using mutable records and the deleted flag is
		 * set, the cursor is pointing at a nonexistent record;
		 * return an error.
		 */
		if (CD_ISSET(cp))
			return (DB_KEYEMPTY);
		break;
	case DB_NEXT_DUP:
		/*
		 * If we're not in an off-page dup set, we know there's no
		 * next duplicate since recnos don't have them.  If we
		 * are in an off-page dup set, the next item assuredly is
		 * a dup, so we set flags to DB_NEXT and keep going.
		 */
		if (!F_ISSET(dbc, DBC_OPD))
			return (DB_NOTFOUND);
		/* FALLTHROUGH */
	case DB_NEXT_NODUP:
		/*
		 * Recno databases don't have duplicates, set flags to DB_NEXT
		 * and keep going.
		 */
		/* FALLTHROUGH */
	case DB_NEXT:
		flags = DB_NEXT;
		/*
		 * If record numbers are mutable: if we just deleted a record,
		 * we have to avoid incrementing the record number so that we
		 * return the right record by virtue of renumbering the tree.
		 */
		if (CD_ISSET(cp)) {
			/*
			 * Clear the flag, we've moved off the deleted record.
			 */
			CD_CLR(cp);
			break;
		}

		if (cp->recno != RECNO_OOB) {
			++cp->recno;
			break;
		}
		/* An unpositioned cursor: DB_NEXT behaves like DB_FIRST. */
		/* FALLTHROUGH */
	case DB_FIRST:
		flags = DB_NEXT;
		cp->recno = 1;
		break;
	case DB_PREV_DUP:
		/*
		 * If we're not in an off-page dup set, we know there's no
		 * previous duplicate since recnos don't have them.  If we
		 * are in an off-page dup set, the previous item assuredly
		 * is a dup, so we set flags to DB_PREV and keep going.
		 */
		if (!F_ISSET(dbc, DBC_OPD))
			return (DB_NOTFOUND);
		/* FALLTHROUGH */
	case DB_PREV_NODUP:
		/*
		 * Recno databases don't have duplicates, set flags to DB_PREV
		 * and keep going.
		 */
		/* FALLTHROUGH */
	case DB_PREV:
		flags = DB_PREV;
		if (cp->recno != RECNO_OOB) {
			/* There is nothing before the first record. */
			if (cp->recno == 1) {
				ret = DB_NOTFOUND;
				goto err;
			}
			--cp->recno;
			break;
		}
		/* An unpositioned cursor: DB_PREV behaves like DB_LAST. */
		/* FALLTHROUGH */
	case DB_LAST:
		flags = DB_PREV;
		/*
		 * Ask for records up to DB_MAX_RECORDS, then position on the
		 * record count; a count of 0 means there is nothing to return.
		 */
		if (((ret = __ram_update(dbc,
		    DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND)
			goto err;
		if ((ret = __bam_nrecs(dbc, &cp->recno)) != 0)
			goto err;
		if (cp->recno == 0) {
			ret = DB_NOTFOUND;
			goto err;
		}
		break;
	case DB_GET_BOTHC:
		/*
		 * If we're doing a join and these are offpage dups,
		 * we want to keep searching forward from after the
		 * current cursor position.  Increment the recno by 1,
		 * then proceed as for a DB_SET.
		 *
		 * Otherwise, we know there are no additional matching
		 * data, as recnos don't have dups.  return DB_NOTFOUND.
		 */
		if (F_ISSET(dbc, DBC_OPD)) {
			cp->recno++;
			break;
		}
		ret = DB_NOTFOUND;
		goto err;
		/* NOTREACHED */
	case DB_GET_BOTH:
	case DB_GET_BOTH_RANGE:
		/*
		 * If we're searching a set of off-page dups, we start
		 * a new linear search from the first record.  Otherwise,
		 * we compare the single data item associated with the
		 * requested record for a match.
		 */
		if (F_ISSET(dbc, DBC_OPD)) {
			cp->recno = 1;
			break;
		}
		/* FALLTHROUGH */
	case DB_SET:
	case DB_SET_RANGE:
		/* Validate the caller's record number and load cp->recno. */
		if ((ret = __ram_getno(dbc, key, &cp->recno, 0)) != 0)
			goto err;
		break;
	default:
		ret = __db_unknown_flag(dbp->env, "__ramc_get", flags);
		goto err;
	}

	/*
	 * For DB_PREV, DB_LAST, DB_SET and DB_SET_RANGE, we have already
	 * called __ram_update() to make sure sufficient records have been
	 * read from the backing source file.  Do it now for DB_CURRENT (if
	 * the current record was deleted we may need more records from the
	 * backing file for a DB_CURRENT operation), DB_FIRST and DB_NEXT.
	 * (We don't have to test for flags == DB_FIRST, because the switch
	 * statement above re-set flags to DB_NEXT in that case.)
	 */
	if ((flags == DB_NEXT || flags == DB_CURRENT) && ((ret =
	    __ram_update(dbc, cp->recno, 0)) != 0) && ret != DB_NOTFOUND)
		goto err;

	/*
	 * The loop only iterates (advancing the record number) while walking
	 * an off-page duplicate tree for a DB_GET_BOTH* operation; every
	 * other case leaves on the first pass.
	 */
	for (;; ++cp->recno) {
		/* Search the tree for the record. */
		if ((ret = __bam_rsearch(dbc, &cp->recno,
		    F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND,
		    1, &exact)) != 0)
			goto err;
		if (!exact) {
			ret = DB_NOTFOUND;
			goto err;
		}

		/* Copy the page into the cursor. */
		STACK_TO_CURSOR(cp, ret);
		if (ret != 0)
			goto err;

		/*
		 * If re-numbering records, the on-page deleted flag means this
		 * record was implicitly created.  If not re-numbering records,
		 * the on-page deleted flag means this record was implicitly
		 * created, or, it was deleted at some time.  Regardless, we
		 * skip such records if doing cursor next/prev operations or
		 * walking through off-page duplicates, and fail if they were
		 * requested explicitly by the application.
		 */
		if (B_DISSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type))
			switch (flags) {
			case DB_NEXT:
			case DB_PREV:
				/*
				 * Drop the stack and re-run the switch above,
				 * which steps the record number once more in
				 * the same direction.
				 */
				(void)__bam_stkrel(dbc, STK_CLRDBC);
				goto retry;
			case DB_GET_BOTH:
			case DB_GET_BOTH_RANGE:
				/*
				 * If we're an OPD tree, we don't care about
				 * matching a record number on a DB_GET_BOTH
				 * -- everything belongs to the same tree.  A
				 * normal recno should give up and return
				 * DB_NOTFOUND if the matching recno is deleted.
				 */
				if (F_ISSET(dbc, DBC_OPD)) {
					(void)__bam_stkrel(dbc, STK_CLRDBC);
					continue;
				}
				ret = DB_NOTFOUND;
				goto err;
			default:
				ret = DB_KEYEMPTY;
				goto err;
			}

		if (flags == DB_GET_BOTH ||
		    flags == DB_GET_BOTHC || flags == DB_GET_BOTH_RANGE) {
			/*
			 * NOTE(review): unlike the other failures inside this
			 * loop, this one returns directly rather than going
			 * through err, so CD_CLR is skipped -- confirm that
			 * is intended.
			 */
			if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn,
			    data, cp->page, cp->indx, __bam_defcmp, &cmp)) != 0)
				return (ret);
			if (cmp == 0)
				break;
			/* A plain recno has only this one item to compare. */
			if (!F_ISSET(dbc, DBC_OPD)) {
				ret = DB_NOTFOUND;
				goto err;
			}
			/* Off-page dups: release and try the next record. */
			(void)__bam_stkrel(dbc, STK_CLRDBC);
		} else
			break;
	}

	/* Return the key if the user didn't give us one. */
	if (!F_ISSET(dbc, DBC_OPD) && !F_ISSET(key, DB_DBT_ISSET)) {
		ret = __db_retcopy(dbp->env,
		    key, &cp->recno, sizeof(cp->recno),
		    &dbc->rkey->data, &dbc->rkey->ulen);
		F_SET(key, DB_DBT_ISSET);
	}

	/* The cursor was reset, no further delete adjustment is necessary. */
err:	CD_CLR(cp);

	return (ret);
}
562
/*
 * __ramc_put --
 *	Recno DBC->put function.
 *
 *	DB_KEYFIRST, DB_KEYLAST and DB_NOOVERWRITE store the data at the
 *	record number given in key (creating records as needed) through
 *	__ram_add.  In an off-page duplicate tree DB_KEYFIRST becomes an
 *	insert before record 1 and DB_KEYLAST becomes an append.  The
 *	remaining operations (DB_AFTER, DB_BEFORE, DB_CURRENT) insert at the
 *	cursor's position with __bam_iitem, splitting pages as required, and
 *	then adjust the other cursors.
 *
 *	For DB_AFTER and DB_BEFORE outside an off-page duplicate tree the
 *	new record number is copied out to key if key is not NULL.  pgnop is
 *	unused.
 *
 *	Returns 0 on success or an error from one of the called functions.
 *
 * PUBLIC: int __ramc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
 */
int
__ramc_put(dbc, key, data, flags, pgnop)
	DBC *dbc;
	DBT *key, *data;
	u_int32_t flags;
	db_pgno_t *pgnop;
{
	BTREE_CURSOR *cp;
	DB *dbp;
	DB_LSN lsn;
	ENV *env;
	u_int32_t iiflags;
	int exact, nc, ret, t_ret;
	void *arg;

	COMPQUIET(pgnop, NULL);

	dbp = dbc->dbp;
	env = dbp->env;
	cp = (BTREE_CURSOR *)dbc->internal;

	/*
	 * DB_KEYFIRST and DB_KEYLAST mean different things if they're
	 * used in an off-page duplicate tree.  If we're an off-page
	 * duplicate tree, they really mean "put at the beginning of the
	 * tree" and "put at the end of the tree" respectively, so translate
	 * them to something else.
	 */
	if (F_ISSET(dbc, DBC_OPD))
		switch (flags) {
		case DB_KEYFIRST:
			cp->recno = 1;
			flags = DB_BEFORE;
			break;
		case DB_KEYLAST:
			/* Append, log the adjustment if needed, and return. */
			if ((ret = __ram_add(dbc,
			    &cp->recno, data, DB_APPEND, 0)) != 0)
				return (ret);
			if (CURADJ_LOG(dbc) &&
			    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0,
			    CA_ICURRENT, cp->root, cp->recno, cp->order)) != 0)
				return (ret);
			return (0);
		default:
			break;
		}

	/*
	 * Handle normal DB_KEYFIRST/DB_KEYLAST;  for a recno, which has
	 * no duplicates, these are identical and mean "put the given
	 * datum at the given recno".
	 */
	if (flags == DB_KEYFIRST ||
	    flags == DB_KEYLAST || flags == DB_NOOVERWRITE) {
		/* can_create is 1: missing records up to recno get created. */
		ret = __ram_getno(dbc, key, &cp->recno, 1);
		if (ret == 0 || ret == DB_NOTFOUND)
			ret = __ram_add(dbc, &cp->recno, data, flags, 0);
		return (ret);
	}

	/*
	 * If we're putting with a cursor that's marked C_DELETED, we need to
	 * take special care;  the cursor doesn't "really" reference the item
	 * corresponding to its current recno, but instead is "between" that
	 * record and the current one.  Translate the actual insert into
	 * DB_BEFORE, and let the __ram_ca work out the gory details of what
	 * should wind up pointing where.
	 */
	if (CD_ISSET(cp))
		iiflags = DB_BEFORE;
	else
		iiflags = flags;

	/* We return here to redo the search after each page split. */
split:	if ((ret = __bam_rsearch(dbc, &cp->recno, SR_INSERT, 1, &exact)) != 0)
		goto err;
	/*
	 * An inexact match is okay;  it just means we're one record past the
	 * end, which is reasonable if we're marked deleted.
	 */
	DB_ASSERT(env, exact || CD_ISSET(cp));

	/* Copy the page into the cursor. */
	STACK_TO_CURSOR(cp, ret);
	if (ret != 0)
		goto err;

	/* Insert, then release the stack whatever the insert returned. */
	ret = __bam_iitem(dbc, key, data, iiflags, 0);
	t_ret = __bam_stkrel(dbc, STK_CLRDBC);

	/*
	 * A failure to release the stack takes precedence over success or a
	 * split request; otherwise split the page and search again.
	 */
	if (t_ret != 0 && (ret == 0 || ret == DB_NEEDSPLIT))
		ret = t_ret;
	else if (ret == DB_NEEDSPLIT) {
		arg = &cp->recno;
		if ((ret = __bam_split(dbc, arg, NULL)) != 0)
			goto err;
		goto split;
	}
	if (ret != 0)
		goto err;

	/*
	 * Note that the switch is on the caller's flags, not iiflags, so the
	 * adjustment reflects the operation that was originally requested.
	 */
	switch (flags) {			/* Adjust the cursors. */
	case DB_AFTER:
		if ((ret = __ram_ca(dbc, CA_IAFTER, &nc)) != 0)
			goto err;

		/*
		 * We only need to adjust this cursor forward if we truly added
		 * the item after the current recno, rather than remapping it
		 * to DB_BEFORE.
		 */
		if (iiflags == DB_AFTER)
			++cp->recno;

		/* Only log if __ram_ca found any relevant cursors. */
		if (nc > 0 && CURADJ_LOG(dbc) &&
		    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0, CA_IAFTER,
		    cp->root, cp->recno, cp->order)) != 0)
			goto err;
		break;
	case DB_BEFORE:
		if ((ret = __ram_ca(dbc, CA_IBEFORE, &nc)) != 0)
			goto err;
		/*
		 * __ram_ca(CA_IBEFORE) advances every cursor equal to this
		 * one, which includes this cursor if it is on the handle's
		 * active queue; step back so we reference the new record.
		 */
		--cp->recno;

		/* Only log if __ram_ca found any relevant cursors. */
		if (nc > 0 && CURADJ_LOG(dbc) &&
		    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0, CA_IBEFORE,
		    cp->root, cp->recno, cp->order)) != 0)
			goto err;
		break;
	case DB_CURRENT:
		/*
		 * We only need to do an adjustment if we actually
		 * added an item, which we only would have done if the
		 * cursor was marked deleted.
		 */
		if (!CD_ISSET(cp))
			break;

		/* Only log if __ram_ca found any relevant cursors. */
		if ((ret = __ram_ca(dbc, CA_ICURRENT, &nc)) != 0)
			goto err;
		if (nc > 0 && CURADJ_LOG(dbc) &&
		    (ret = __bam_rcuradj_log(dbp, dbc->txn, &lsn, 0,
		    CA_ICURRENT, cp->root, cp->recno, cp->order)) != 0)
			goto err;
		break;
	default:
		break;
	}

	/* Return the key if we've created a new record. */
	if (!F_ISSET(dbc, DBC_OPD) &&
	    (flags == DB_AFTER || flags == DB_BEFORE) && key != NULL)
		ret = __db_retcopy(env, key, &cp->recno,
		    sizeof(cp->recno), &dbc->rkey->data, &dbc->rkey->ulen);

	/* The cursor was reset, no further delete adjustment is necessary. */
err:	CD_CLR(cp);

	return (ret);
}
731
/*
 * __ram_ca --
 *	Adjust cursors.  Returns the number of relevant cursors.
 *
 *	Walks the active cursors of every DB handle whose adj_fileid matches
 *	that of dbc_arg's handle and, for each cursor on the same root,
 *	applies the adjustment for "op" (CA_DELETE, CA_IBEFORE, CA_IAFTER or
 *	CA_ICURRENT) relative to dbc_arg's record number.
 *
 *	If foundp is not NULL, it is set to the number of cursors examined
 *	on the same root (skipping those excluded by MVCC_SKIP_CURADJ),
 *	whether or not they were actually changed.  Always returns 0.
 *
 * PUBLIC: int __ram_ca __P((DBC *, ca_recno_arg, int *));
 */
int
__ram_ca(dbc_arg, op, foundp)
	DBC *dbc_arg;
	ca_recno_arg op;
	int *foundp;
{
	BTREE_CURSOR *cp, *cp_arg;
	DB *dbp, *ldbp;
	DBC *dbc;
	ENV *env;
	db_recno_t recno;
	u_int32_t order;
	int adjusted, found;

	dbp = dbc_arg->dbp;
	env = dbp->env;
	cp_arg = (BTREE_CURSOR *)dbc_arg->internal;
	recno = cp_arg->recno;

	/*
	 * It only makes sense to adjust cursors if we're a renumbering
	 * recno;  we should only be called if this is one.
	 */
	DB_ASSERT(env, F_ISSET(cp_arg, C_RENUMBER));

	/*
	 * NOTE(review): both walks below take dbp->mutex while traversing
	 * ldbp->active_queue -- confirm that this is the lock intended to
	 * protect the other handles' queues.
	 */
	MUTEX_LOCK(env, env->mtx_dblist);
	/*
	 * Adjust the cursors.  See the comment in __bam_ca_delete().
	 *
	 * If we're doing a delete, we need to find the highest
	 * order of any cursor currently pointing at this item,
	 * so we can assign a higher order to the newly deleted
	 * cursor.  Unfortunately, this requires a second pass through
	 * the cursor list.
	 */
	if (op == CA_DELETE) {
		FIND_FIRST_DB_MATCH(env, dbp, ldbp);
		for (order = 1;
		    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
		    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
			MUTEX_LOCK(env, dbp->mutex);
			TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
				cp = (BTREE_CURSOR *)dbc->internal;
				/*
				 * order ends up one greater than the largest
				 * order of any deleted cursor at this recno.
				 */
				if (cp_arg->root == cp->root &&
				    recno == cp->recno && CD_ISSET(cp) &&
				    order <= cp->order &&
				    !MVCC_SKIP_CURADJ(dbc, cp->root))
					order = cp->order + 1;
			}
			MUTEX_UNLOCK(env, dbp->mutex);
		}
	} else
		order = INVALID_ORDER;

	/* Now go through and do the actual adjustments. */
	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
	for (found = 0;
	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
		MUTEX_LOCK(env, dbp->mutex);
		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
			cp = (BTREE_CURSOR *)dbc->internal;
			/* Only cursors on the same tree are of interest. */
			if (cp_arg->root != cp->root ||
			    MVCC_SKIP_CURADJ(dbc, cp->root))
				continue;
			++found;
			/* Set once this cursor's recno has been advanced. */
			adjusted = 0;
			switch (op) {
			case CA_DELETE:
				if (recno < cp->recno) {
					/* Records after the deletion move down. */
					--cp->recno;
					/*
					 * If the adjustment made them equal,
					 * we have to merge the orders.
					 */
					if (recno == cp->recno && CD_ISSET(cp))
						cp->order += order;
				} else if (recno == cp->recno &&
				    !CD_ISSET(cp)) {
					/*
					 * A cursor on the deleted record is
					 * flagged deleted and given the new
					 * highest order.
					 */
					CD_SET(cp);
					cp->order = order;
				}
				break;
			case CA_IBEFORE:
				/*
				 * IBEFORE is just like IAFTER, except that we
				 * adjust cursors on the current record too.
				 */
				if (C_EQUAL(cp_arg, cp)) {
					++cp->recno;
					adjusted = 1;
				}
				goto iafter;
			case CA_ICURRENT:

				/*
				 * If the original cursor wasn't deleted, we
				 * just did a replacement and so there's no
				 * need to adjust anything--we shouldn't have
				 * gotten this far.  Otherwise, we behave
				 * much like an IAFTER, except that all
				 * cursors pointing to the current item get
				 * marked undeleted and point to the new
				 * item.
				 */
				DB_ASSERT(env, CD_ISSET(cp_arg));
				if (C_EQUAL(cp_arg, cp)) {
					CD_CLR(cp);
					break;
				}
				/* FALLTHROUGH */
			case CA_IAFTER:
				/* Cursors sorting after cp_arg move up by one. */
iafter:				if (!adjusted && C_LESSTHAN(cp_arg, cp)) {
					++cp->recno;
					adjusted = 1;
				}
				if (recno == cp->recno && adjusted)
					/*
					 * If we've moved this cursor's recno,
					 * split its order number--i.e.,
					 * decrement it by enough so that
					 * the lowest cursor moved has order 1.
					 * cp_arg->order is the split point,
					 * so decrement by one less than that.
					 */
					cp->order -= (cp_arg->order - 1);
				break;
			}
		}
		MUTEX_UNLOCK(dbp->env, dbp->mutex);
	}
	MUTEX_UNLOCK(env, env->mtx_dblist);

	if (foundp != NULL)
		*foundp = found;
	return (0);
}
875
876/*
877 * __ram_getno --
878 *	Check the user's record number, and make sure we've seen it.
879 *
880 * PUBLIC: int __ram_getno __P((DBC *, const DBT *, db_recno_t *, int));
881 */
882int
883__ram_getno(dbc, key, rep, can_create)
884	DBC *dbc;
885	const DBT *key;
886	db_recno_t *rep;
887	int can_create;
888{
889	DB *dbp;
890	db_recno_t recno;
891
892	dbp = dbc->dbp;
893
894	/* If passed an empty DBT from Java, key->data may be NULL */
895	if (key->size != sizeof(db_recno_t)) {
896		__db_errx(dbp->env, "illegal record number size");
897		return (EINVAL);
898	}
899
900	/* Check the user's record number. */
901	if ((recno = *(db_recno_t *)key->data) == 0) {
902		__db_errx(dbp->env, "illegal record number of 0");
903		return (EINVAL);
904	}
905	if (rep != NULL)
906		*rep = recno;
907
908	/*
909	 * Btree can neither create records nor read them in.  Recno can
910	 * do both, see if we can find the record.
911	 */
912	return (dbc->dbtype == DB_RECNO ?
913	    __ram_update(dbc, recno, can_create) : 0);
914}
915
916/*
917 * __ram_update --
918 *	Ensure the tree has records up to and including the specified one.
919 */
920static int
921__ram_update(dbc, recno, can_create)
922	DBC *dbc;
923	db_recno_t recno;
924	int can_create;
925{
926	BTREE *t;
927	DB *dbp;
928	DBT *rdata;
929	db_recno_t nrecs;
930	int ret;
931
932	dbp = dbc->dbp;
933	t = dbp->bt_internal;
934
935	/*
936	 * If we can't create records and we've read the entire backing input
937	 * file, we're done.
938	 */
939	if (!can_create && t->re_eof)
940		return (0);
941
942	/*
943	 * If we haven't seen this record yet, try to get it from the original
944	 * file.
945	 */
946	if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
947		return (ret);
948	if (!t->re_eof && recno > nrecs) {
949		if ((ret = __ram_sread(dbc, recno)) != 0 && ret != DB_NOTFOUND)
950			return (ret);
951		if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
952			return (ret);
953	}
954
955	/*
956	 * If we can create records, create empty ones up to the requested
957	 * record.
958	 */
959	if (!can_create || recno <= nrecs + 1)
960		return (0);
961
962	rdata = &dbc->my_rdata;
963	rdata->flags = 0;
964	rdata->size = 0;
965
966	while (recno > ++nrecs)
967		if ((ret = __ram_add(dbc,
968		    &nrecs, rdata, 0, BI_DELETED)) != 0)
969			return (ret);
970	return (0);
971}
972
973/*
974 * __ram_source --
975 *	Load information about the backing file.
976 */
977static int
978__ram_source(dbp)
979	DB *dbp;
980{
981	BTREE *t;
982	ENV *env;
983	char *source;
984	int ret;
985
986	env = dbp->env;
987	t = dbp->bt_internal;
988
989	/* Find the real name, and swap out the one we had before. */
990	if ((ret = __db_appname(env,
991	    DB_APP_DATA, t->re_source, 0, NULL, &source)) != 0)
992		return (ret);
993	__os_free(env, t->re_source);
994	t->re_source = source;
995
996	/*
997	 * !!!
998	 * It's possible that the backing source file is read-only.  We don't
999	 * much care other than we'll complain if there are any modifications
1000	 * when it comes time to write the database back to the source.
1001	 */
1002	if ((t->re_fp = fopen(t->re_source, "rb")) == NULL) {
1003		ret = __os_get_errno();
1004		__db_err(env, ret, "%s", t->re_source);
1005		return (ret);
1006	}
1007
1008	t->re_eof = 0;
1009	return (0);
1010}
1011
1012/*
1013 * __ram_writeback --
1014 *	Rewrite the backing file.
1015 *
1016 * PUBLIC: int __ram_writeback __P((DB *));
1017 */
1018int
1019__ram_writeback(dbp)
1020	DB *dbp;
1021{
1022	BTREE *t;
1023	DBC *dbc;
1024	DBT key, data;
1025	DB_THREAD_INFO *ip;
1026	ENV *env;
1027	FILE *fp;
1028	db_recno_t keyno;
1029	int ret, t_ret;
1030	u_int8_t delim, *pad;
1031
1032	t = dbp->bt_internal;
1033	env = dbp->env;
1034	fp = NULL;
1035	pad = NULL;
1036
1037	/* If the file wasn't modified, we're done. */
1038	if (!t->re_modified)
1039		return (0);
1040
1041	/* If there's no backing source file, we're done. */
1042	if (t->re_source == NULL) {
1043		t->re_modified = 0;
1044		return (0);
1045	}
1046
1047	/*
1048	 * We step through the records, writing each one out.  Use the record
1049	 * number and the dbp->get() function, instead of a cursor, so we find
1050	 * and write out "deleted" or non-existent records.  The DB handle may
1051	 * be threaded, so allocate memory as we go.
1052	 */
1053	memset(&key, 0, sizeof(key));
1054	key.size = sizeof(db_recno_t);
1055	key.data = &keyno;
1056	memset(&data, 0, sizeof(data));
1057	F_SET(&data, DB_DBT_REALLOC);
1058
1059	/* Allocate a cursor. */
1060	ENV_GET_THREAD_INFO(env, ip);
1061	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
1062		return (ret);
1063
1064	/*
1065	 * Read any remaining records into the tree.
1066	 *
1067	 * !!!
1068	 * This is why we can't support transactions when applications specify
1069	 * backing (re_source) files.  At this point we have to read in the
1070	 * rest of the records from the file so that we can write all of the
1071	 * records back out again, which could modify a page for which we'd
1072	 * have to log changes and which we don't have locked.  This could be
1073	 * partially fixed by taking a snapshot of the entire file during the
1074	 * DB->open as DB->open is transaction protected.  But, if a checkpoint
1075	 * occurs then, the part of the log holding the copy of the file could
1076	 * be discarded, and that would make it impossible to recover in the
1077	 * face of disaster.  This could all probably be fixed, but it would
1078	 * require transaction protecting the backing source file.
1079	 *
1080	 * XXX
1081	 * This could be made to work now that we have transactions protecting
1082	 * file operations.  Margo has specifically asked for the privilege of
1083	 * doing this work.
1084	 */
1085	if ((ret =
1086	    __ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND)
1087		goto err;
1088
1089	/*
1090	 * Close any existing file handle and re-open the file, truncating it.
1091	 */
1092	if (t->re_fp != NULL) {
1093		if (fclose(t->re_fp) != 0) {
1094			ret = __os_get_errno();
1095			__db_err(env, ret, "%s", t->re_source);
1096			goto err;
1097		}
1098		t->re_fp = NULL;
1099	}
1100	if ((fp = fopen(t->re_source, "wb")) == NULL) {
1101		ret = __os_get_errno();
1102		__db_err(env, ret, "%s", t->re_source);
1103		goto err;
1104	}
1105
1106	/*
1107	 * We'll need the delimiter if we're doing variable-length records,
1108	 * and the pad character if we're doing fixed-length records.
1109	 */
1110	delim = t->re_delim;
1111	for (keyno = 1;; ++keyno) {
1112		switch (ret = __db_get(dbp, ip, NULL, &key, &data, 0)) {
1113		case 0:
1114			if (data.size != 0 &&
1115			    fwrite(data.data, 1, data.size, fp) != data.size)
1116				goto write_err;
1117			break;
1118		case DB_KEYEMPTY:
1119			if (F_ISSET(dbp, DB_AM_FIXEDLEN)) {
1120				if (pad == NULL) {
1121					if ((ret = __os_malloc(
1122					    env, t->re_len, &pad)) != 0)
1123						goto err;
1124					memset(pad, t->re_pad, t->re_len);
1125				}
1126				if (fwrite(pad, 1, t->re_len, fp) != t->re_len)
1127					goto write_err;
1128			}
1129			break;
1130		case DB_NOTFOUND:
1131			ret = 0;
1132			goto done;
1133		default:
1134			goto err;
1135		}
1136		if (!F_ISSET(dbp, DB_AM_FIXEDLEN) &&
1137		    fwrite(&delim, 1, 1, fp) != 1) {
1138write_err:		ret = __os_get_errno();
1139			__db_err(env, ret,
1140			    "%s: write failed to backing file", t->re_source);
1141			goto err;
1142		}
1143	}
1144
1145err:
1146done:	/* Close the file descriptor. */
1147	if (fp != NULL && fclose(fp) != 0) {
1148		t_ret = __os_get_errno();
1149		__db_err(env, t_ret, "%s", t->re_source);
1150		if (ret == 0)
1151			ret = t_ret;
1152	}
1153
1154	/* Discard the cursor. */
1155	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
1156		ret = t_ret;
1157
1158	/* Discard memory allocated to hold the data items. */
1159	if (data.data != NULL)
1160		__os_ufree(env, data.data);
1161	if (pad != NULL)
1162		__os_free(env, pad);
1163
1164	if (ret == 0)
1165		t->re_modified = 0;
1166
1167	return (ret);
1168}
1169
1170/*
1171 * __ram_sread --
1172 *	Read records from a source file.
1173 */
1174static int
1175__ram_sread(dbc, top)
1176	DBC *dbc;
1177	db_recno_t top;
1178{
1179	BTREE *t;
1180	DB *dbp;
1181	DBT data, *rdata;
1182	db_recno_t recno;
1183	size_t len;
1184	int ch, ret, was_modified;
1185
1186	t = dbc->dbp->bt_internal;
1187	dbp = dbc->dbp;
1188	was_modified = t->re_modified;
1189
1190	if ((ret = __bam_nrecs(dbc, &recno)) != 0)
1191		return (ret);
1192
1193	/*
1194	 * Use the record key return memory, it's only a short-term use.
1195	 * The record data return memory is used by __bam_iitem, which
1196	 * we'll indirectly call, so use the key so as not to collide.
1197	 */
1198	len = F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_len : 256;
1199	rdata = &dbc->my_rkey;
1200	if (rdata->ulen < len) {
1201		if ((ret = __os_realloc(
1202		    dbp->env, len, &rdata->data)) != 0) {
1203			rdata->ulen = 0;
1204			rdata->data = NULL;
1205			return (ret);
1206		}
1207		rdata->ulen = (u_int32_t)len;
1208	}
1209
1210	memset(&data, 0, sizeof(data));
1211	while (recno < top) {
1212		data.data = rdata->data;
1213		data.size = 0;
1214		if (F_ISSET(dbp, DB_AM_FIXEDLEN))
1215			for (len = t->re_len; len > 0; --len) {
1216				if ((ch = fgetc(t->re_fp)) == EOF) {
1217					if (data.size == 0)
1218						goto eof;
1219					break;
1220				}
1221				((u_int8_t *)data.data)[data.size++] = ch;
1222			}
1223		else
1224			for (;;) {
1225				if ((ch = fgetc(t->re_fp)) == EOF) {
1226					if (data.size == 0)
1227						goto eof;
1228					break;
1229				}
1230				if (ch == t->re_delim)
1231					break;
1232
1233				((u_int8_t *)data.data)[data.size++] = ch;
1234				if (data.size == rdata->ulen) {
1235					if ((ret = __os_realloc(dbp->env,
1236					    rdata->ulen *= 2,
1237					    &rdata->data)) != 0) {
1238						rdata->ulen = 0;
1239						rdata->data = NULL;
1240						return (ret);
1241					} else
1242						data.data = rdata->data;
1243				}
1244			}
1245
1246		/*
1247		 * Another process may have read this record from the input
1248		 * file and stored it into the database already, in which
1249		 * case we don't need to repeat that operation.  We detect
1250		 * this by checking if the last record we've read is greater
1251		 * or equal to the number of records in the database.
1252		 */
1253		if (t->re_last >= recno) {
1254			++recno;
1255			if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0)
1256				goto err;
1257		}
1258		++t->re_last;
1259	}
1260
1261	if (0) {
1262eof:		t->re_eof = 1;
1263		ret = DB_NOTFOUND;
1264	}
1265err:	if (!was_modified)
1266		t->re_modified = 0;
1267
1268	return (ret);
1269}
1270
1271/*
1272 * __ram_add --
1273 *	Add records into the tree.
1274 */
1275static int
1276__ram_add(dbc, recnop, data, flags, bi_flags)
1277	DBC *dbc;
1278	db_recno_t *recnop;
1279	DBT *data;
1280	u_int32_t flags, bi_flags;
1281{
1282	BTREE_CURSOR *cp;
1283	int exact, ret, stack, t_ret;
1284
1285	cp = (BTREE_CURSOR *)dbc->internal;
1286
1287retry:	/* Find the slot for insertion. */
1288	if ((ret = __bam_rsearch(dbc, recnop,
1289	    SR_INSERT | (flags == DB_APPEND ? SR_APPEND : 0), 1, &exact)) != 0)
1290		return (ret);
1291	stack = 1;
1292
1293	/* Copy the page into the cursor. */
1294	STACK_TO_CURSOR(cp, ret);
1295	if (ret != 0)
1296		goto err;
1297
1298	if (exact && flags == DB_NOOVERWRITE && !CD_ISSET(cp) &&
1299	    !B_DISSET(GET_BKEYDATA(dbc->dbp, cp->page, cp->indx)->type)) {
1300		ret = DB_KEYEXIST;
1301		goto err;
1302	}
1303
1304	/*
1305	 * The application may modify the data based on the selected record
1306	 * number.
1307	 */
1308	if (flags == DB_APPEND && dbc->dbp->db_append_recno != NULL &&
1309	    (ret = dbc->dbp->db_append_recno(dbc->dbp, data, *recnop)) != 0)
1310		goto err;
1311
1312	/*
1313	 * Select the arguments for __bam_iitem() and do the insert.  If the
1314	 * key is an exact match, or we're replacing the data item with a
1315	 * new data item, replace the current item.  If the key isn't an exact
1316	 * match, we're inserting a new key/data pair, before the search
1317	 * location.
1318	 */
1319	switch (ret = __bam_iitem(dbc,
1320	    NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) {
1321	case 0:
1322		/*
1323		 * Don't adjust anything.
1324		 *
1325		 * If we inserted a record, no cursors need adjusting because
1326		 * the only new record it's possible to insert is at the very
1327		 * end of the tree.  The necessary adjustments to the internal
1328		 * page counts were made by __bam_iitem().
1329		 *
1330		 * If we overwrote a record, no cursors need adjusting because
1331		 * future DBcursor->get calls will simply return the underlying
1332		 * record (there's no adjustment made for the DB_CURRENT flag
1333		 * when a cursor get operation immediately follows a cursor
1334		 * delete operation, and the normal adjustment for the DB_NEXT
1335		 * flag is still correct).
1336		 */
1337		break;
1338	case DB_NEEDSPLIT:
1339		/* Discard the stack of pages and split the page. */
1340		(void)__bam_stkrel(dbc, STK_CLRDBC);
1341		stack = 0;
1342
1343		if ((ret = __bam_split(dbc, recnop, NULL)) != 0)
1344			goto err;
1345
1346		goto retry;
1347		/* NOTREACHED */
1348	default:
1349		goto err;
1350	}
1351
1352err:	if (stack && (t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0)
1353		ret = t_ret;
1354
1355	return (ret);
1356}
1357