1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2000,2008 Oracle.  All rights reserved.
5 *
6 * $Id: db_cam.c,v 12.79 2008/05/07 12:27:32 bschmeck Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/hash.h"
15#include "dbinc/lock.h"
16#include "dbinc/mp.h"
17#include "dbinc/qam.h"
18#include "dbinc/txn.h"
19
20static int __db_buildpartial __P((DB *, DBT *, DBT *, DBT *));
21static int __db_s_count __P((DB *));
22static int __db_wrlock_err __P((ENV *));
23static int __dbc_cleanup __P((DBC *, DBC *, int));
24static int __dbc_del_foreign __P((DBC *));
25static int __dbc_del_oldskey __P((DB *, DBC *, DBT *, DBT *, DBT *));
26static int __dbc_del_secondary __P((DBC *));
27static int __dbc_pget_recno __P((DBC *, DBT *, DBT *, u_int32_t));
28
/*
 * CDB_LOCKING_INIT --
 *	Check and prepare a cursor for a write when CDB locking is in use.
 *
 *	The cursor must be flagged either DBC_WRITECURSOR or DBC_WRITER;
 *	otherwise the enclosing function returns __db_wrlock_err().  A
 *	DBC_WRITECURSOR cursor holds an IWRITE lock, which is upgraded here
 *	to a write lock.
 *
 * !!!
 * This expands to a bare "if" statement.  It assigns to a local variable
 * named "ret" and may return from the enclosing function.
 */
#define	CDB_LOCKING_INIT(env, dbc)					\
	if (CDB_LOCKING(env)) {						\
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))	\
			return (__db_wrlock_err(env));			\
									\
		if (F_ISSET(dbc, DBC_WRITECURSOR)) {			\
			ret = __lock_get(env, (dbc)->locker,		\
			    DB_LOCK_UPGRADE, &(dbc)->lock_dbt,		\
			    DB_LOCK_WRITE, &(dbc)->mylock);		\
			if (ret != 0)					\
				return (ret);				\
		}							\
	}
/*
 * CDB_LOCKING_DONE --
 *	Release the upgraded lock: a DBC_WRITECURSOR cursor has its lock
 *	downgraded back to IWRITE.  The result of the downgrade is
 *	discarded.
 *
 * !!!
 * The expansion already ends in a semicolon.
 */
#define	CDB_LOCKING_DONE(env, dbc)					\
	if (F_ISSET(dbc, DBC_WRITECURSOR))				\
		(void)__lock_downgrade(					\
		    env, &(dbc)->mylock, DB_LOCK_IWRITE, 0);
51
52/*
53 * __dbc_close --
54 *	DBC->close.
55 *
56 * PUBLIC: int __dbc_close __P((DBC *));
57 */
58int
59__dbc_close(dbc)
60	DBC *dbc;
61{
62	DB *dbp;
63	DBC *opd;
64	DBC_INTERNAL *cp;
65	DB_TXN *txn;
66	ENV *env;
67	int ret, t_ret;
68
69	dbp = dbc->dbp;
70	env = dbp->env;
71	cp = dbc->internal;
72	opd = cp->opd;
73	ret = 0;
74
75	/*
76	 * Remove the cursor(s) from the active queue.  We may be closing two
77	 * cursors at once here, a top-level one and a lower-level, off-page
78	 * duplicate one.  The access-method specific cursor close routine must
79	 * close both of them in a single call.
80	 *
81	 * !!!
82	 * Cursors must be removed from the active queue before calling the
83	 * access specific cursor close routine, btree depends on having that
84	 * order of operations.
85	 */
86	MUTEX_LOCK(env, dbp->mutex);
87
88	if (opd != NULL) {
89		DB_ASSERT(env, F_ISSET(opd, DBC_ACTIVE));
90		F_CLR(opd, DBC_ACTIVE);
91		TAILQ_REMOVE(&dbp->active_queue, opd, links);
92	}
93	DB_ASSERT(env, F_ISSET(dbc, DBC_ACTIVE));
94	F_CLR(dbc, DBC_ACTIVE);
95	TAILQ_REMOVE(&dbp->active_queue, dbc, links);
96
97	MUTEX_UNLOCK(env, dbp->mutex);
98
99	/* Call the access specific cursor close routine. */
100	if ((t_ret =
101	    dbc->am_close(dbc, PGNO_INVALID, NULL)) != 0 && ret == 0)
102		ret = t_ret;
103
104	/*
105	 * Release the lock after calling the access method specific close
106	 * routine, a Btree cursor may have had pending deletes.
107	 */
108	if (CDB_LOCKING(env)) {
109		/*
110		 * Also, be sure not to free anything if mylock.off is
111		 * INVALID;  in some cases, such as idup'ed read cursors
112		 * and secondary update cursors, a cursor in a CDB
113		 * environment may not have a lock at all.
114		 */
115		if ((t_ret = __LPUT(dbc, dbc->mylock)) != 0 && ret == 0)
116			ret = t_ret;
117
118		/* For safety's sake, since this is going on the free queue. */
119		memset(&dbc->mylock, 0, sizeof(dbc->mylock));
120		if (opd != NULL)
121			memset(&opd->mylock, 0, sizeof(opd->mylock));
122	}
123
124	if ((txn = dbc->txn) != NULL)
125		txn->cursors--;
126
127	/* Move the cursor(s) to the free queue. */
128	MUTEX_LOCK(env, dbp->mutex);
129	if (opd != NULL) {
130		if (txn != NULL)
131			txn->cursors--;
132		TAILQ_INSERT_TAIL(&dbp->free_queue, opd, links);
133		opd = NULL;
134	}
135	TAILQ_INSERT_TAIL(&dbp->free_queue, dbc, links);
136	MUTEX_UNLOCK(env, dbp->mutex);
137
138	if (txn != NULL && F_ISSET(txn, TXN_PRIVATE) && txn->cursors == 0 &&
139	    (t_ret = __txn_commit(txn, 0)) != 0 && ret == 0)
140		ret = t_ret;
141
142	return (ret);
143}
144
145/*
146 * __dbc_destroy --
147 *	Destroy the cursor, called after DBC->close.
148 *
149 * PUBLIC: int __dbc_destroy __P((DBC *));
150 */
151int
152__dbc_destroy(dbc)
153	DBC *dbc;
154{
155	DB *dbp;
156	ENV *env;
157	int ret, t_ret;
158
159	dbp = dbc->dbp;
160	env = dbp->env;
161
162	/* Remove the cursor from the free queue. */
163	MUTEX_LOCK(env, dbp->mutex);
164	TAILQ_REMOVE(&dbp->free_queue, dbc, links);
165	MUTEX_UNLOCK(env, dbp->mutex);
166
167	/* Free up allocated memory. */
168	if (dbc->my_rskey.data != NULL)
169		__os_free(env, dbc->my_rskey.data);
170	if (dbc->my_rkey.data != NULL)
171		__os_free(env, dbc->my_rkey.data);
172	if (dbc->my_rdata.data != NULL)
173		__os_free(env, dbc->my_rdata.data);
174
175	/* Call the access specific cursor destroy routine. */
176	ret = dbc->am_destroy == NULL ? 0 : dbc->am_destroy(dbc);
177
178	/*
179	 * Release the lock id for this cursor.
180	 */
181	if (LOCKING_ON(env) &&
182	    F_ISSET(dbc, DBC_OWN_LID) &&
183	    (t_ret = __lock_id_free(env, dbc->lref)) != 0 && ret == 0)
184		ret = t_ret;
185
186	__os_free(env, dbc);
187
188	return (ret);
189}
190
191/*
192 * __dbc_count --
193 *	Return a count of duplicate data items.
194 *
195 * PUBLIC: int __dbc_count __P((DBC *, db_recno_t *));
196 */
197int
198__dbc_count(dbc, recnop)
199	DBC *dbc;
200	db_recno_t *recnop;
201{
202	ENV *env;
203	int ret;
204
205	env = dbc->env;
206
207	/*
208	 * Cursor Cleanup Note:
209	 * All of the cursors passed to the underlying access methods by this
210	 * routine are not duplicated and will not be cleaned up on return.
211	 * So, pages/locks that the cursor references must be resolved by the
212	 * underlying functions.
213	 */
214	switch (dbc->dbtype) {
215	case DB_QUEUE:
216	case DB_RECNO:
217		*recnop = 1;
218		break;
219	case DB_HASH:
220		if (dbc->internal->opd == NULL) {
221			if ((ret = __hamc_count(dbc, recnop)) != 0)
222				return (ret);
223			break;
224		}
225		/* FALLTHROUGH */
226	case DB_BTREE:
227		if ((ret = __bamc_count(dbc, recnop)) != 0)
228			return (ret);
229		break;
230	case DB_UNKNOWN:
231	default:
232		return (__db_unknown_type(env, "__dbc_count", dbc->dbtype));
233	}
234	return (0);
235}
236
237/*
238 * __dbc_del --
239 *	DBC->del.
240 *
241 * PUBLIC: int __dbc_del __P((DBC *, u_int32_t));
242 */
243int
244__dbc_del(dbc, flags)
245	DBC *dbc;
246	u_int32_t flags;
247{
248	DB *dbp;
249	DBC *opd;
250	ENV *env;
251	int ret, t_ret;
252
253	dbp = dbc->dbp;
254	env = dbp->env;
255
256	/*
257	 * Cursor Cleanup Note:
258	 * All of the cursors passed to the underlying access methods by this
259	 * routine are not duplicated and will not be cleaned up on return.
260	 * So, pages/locks that the cursor references must be resolved by the
261	 * underlying functions.
262	 */
263
264	CDB_LOCKING_INIT(env, dbc);
265
266	/*
267	 * If we're a secondary index, and DB_UPDATE_SECONDARY isn't set
268	 * (which it only is if we're being called from a primary update),
269	 * then we need to call through to the primary and delete the item.
270	 *
271	 * Note that this will delete the current item;  we don't need to
272	 * delete it ourselves as well, so we can just goto done.
273	 */
274	if (flags != DB_UPDATE_SECONDARY && F_ISSET(dbp, DB_AM_SECONDARY)) {
275		ret = __dbc_del_secondary(dbc);
276		goto done;
277	}
278
279	/*
280	 * If we are a foreign db, go through and check any foreign key
281	 * constraints first, which will make rolling back changes on an abort
282	 * simpler.
283	 */
284	if (LIST_FIRST(&dbp->f_primaries) != NULL &&
285	    (ret = __dbc_del_foreign(dbc)) != 0)
286		goto done;
287
288	/*
289	 * If we are a primary and have secondary indices, go through
290	 * and delete any secondary keys that point at the current record.
291	 */
292	if (LIST_FIRST(&dbp->s_secondaries) != NULL &&
293	    (ret = __dbc_del_primary(dbc)) != 0)
294		goto done;
295
296	/*
297	 * Off-page duplicate trees are locked in the primary tree, that is,
298	 * we acquire a write lock in the primary tree and no locks in the
299	 * off-page dup tree.  If the del operation is done in an off-page
300	 * duplicate tree, call the primary cursor's upgrade routine first.
301	 */
302	opd = dbc->internal->opd;
303	if (opd == NULL)
304		ret = dbc->am_del(dbc);
305	else
306		if ((ret = dbc->am_writelock(dbc)) == 0)
307			ret = opd->am_del(opd);
308
309	/*
310	 * If this was an update that is supporting dirty reads
311	 * then we may have just swapped our read for a write lock
312	 * which is held by the surviving cursor.  We need
313	 * to explicitly downgrade this lock.  The closed cursor
314	 * may only have had a read lock.
315	 */
316	if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED) &&
317	    dbc->internal->lock_mode == DB_LOCK_WRITE) {
318		if ((t_ret =
319		    __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0)
320			ret = t_ret;
321		if (t_ret == 0)
322			dbc->internal->lock_mode = DB_LOCK_WWRITE;
323	}
324
325done:	CDB_LOCKING_DONE(env, dbc);
326
327	return (ret);
328}
329
330/*
331 * __dbc_dup --
332 *	Duplicate a cursor
333 *
334 * PUBLIC: int __dbc_dup __P((DBC *, DBC **, u_int32_t));
335 */
336int
337__dbc_dup(dbc_orig, dbcp, flags)
338	DBC *dbc_orig;
339	DBC **dbcp;
340	u_int32_t flags;
341{
342	DBC *dbc_n, *dbc_nopd;
343	int ret;
344
345	dbc_n = dbc_nopd = NULL;
346
347	/* Allocate a new cursor and initialize it. */
348	if ((ret = __dbc_idup(dbc_orig, &dbc_n, flags)) != 0)
349		goto err;
350	*dbcp = dbc_n;
351
352	/*
353	 * If the cursor references an off-page duplicate tree, allocate a
354	 * new cursor for that tree and initialize it.
355	 */
356	if (dbc_orig->internal->opd != NULL) {
357		if ((ret =
358		   __dbc_idup(dbc_orig->internal->opd, &dbc_nopd, flags)) != 0)
359			goto err;
360		dbc_n->internal->opd = dbc_nopd;
361	}
362	return (0);
363
364err:	if (dbc_n != NULL)
365		(void)__dbc_close(dbc_n);
366	if (dbc_nopd != NULL)
367		(void)__dbc_close(dbc_nopd);
368
369	return (ret);
370}
371
372/*
373 * __dbc_idup --
374 *	Internal version of __dbc_dup.
375 *
376 * PUBLIC: int __dbc_idup __P((DBC *, DBC **, u_int32_t));
377 */
378int
379__dbc_idup(dbc_orig, dbcp, flags)
380	DBC *dbc_orig, **dbcp;
381	u_int32_t flags;
382{
383	DB *dbp;
384	DBC *dbc_n;
385	DBC_INTERNAL *int_n, *int_orig;
386	ENV *env;
387	int ret;
388
389	dbp = dbc_orig->dbp;
390	dbc_n = *dbcp;
391	env = dbp->env;
392
393	if ((ret = __db_cursor_int(dbp, dbc_orig->thread_info,
394	    dbc_orig->txn, dbc_orig->dbtype, dbc_orig->internal->root,
395	    F_ISSET(dbc_orig, DBC_OPD) | DBC_DUPLICATE,
396	    dbc_orig->locker, &dbc_n)) != 0)
397		return (ret);
398
399	/* Position the cursor if requested, acquiring the necessary locks. */
400	if (flags == DB_POSITION) {
401		int_n = dbc_n->internal;
402		int_orig = dbc_orig->internal;
403
404		dbc_n->flags |= dbc_orig->flags & ~DBC_OWN_LID;
405
406		int_n->indx = int_orig->indx;
407		int_n->pgno = int_orig->pgno;
408		int_n->root = int_orig->root;
409		int_n->lock_mode = int_orig->lock_mode;
410
411		switch (dbc_orig->dbtype) {
412		case DB_QUEUE:
413			if ((ret = __qamc_dup(dbc_orig, dbc_n)) != 0)
414				goto err;
415			break;
416		case DB_BTREE:
417		case DB_RECNO:
418			if ((ret = __bamc_dup(dbc_orig, dbc_n)) != 0)
419				goto err;
420			break;
421		case DB_HASH:
422			if ((ret = __hamc_dup(dbc_orig, dbc_n)) != 0)
423				goto err;
424			break;
425		case DB_UNKNOWN:
426		default:
427			ret = __db_unknown_type(env,
428			    "__dbc_idup", dbc_orig->dbtype);
429			goto err;
430		}
431	}
432
433	/* Copy the locking flags to the new cursor. */
434	F_SET(dbc_n, F_ISSET(dbc_orig,
435	    DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED | DBC_WRITECURSOR));
436
437	/*
438	 * If we're in CDB and this isn't an offpage dup cursor, then
439	 * we need to get a lock for the duplicated cursor.
440	 */
441	if (CDB_LOCKING(env) && !F_ISSET(dbc_n, DBC_OPD) &&
442	    (ret = __lock_get(env, dbc_n->locker, 0,
443	    &dbc_n->lock_dbt, F_ISSET(dbc_orig, DBC_WRITECURSOR) ?
444	    DB_LOCK_IWRITE : DB_LOCK_READ, &dbc_n->mylock)) != 0)
445		goto err;
446
447	dbc_n->priority = dbc_orig->priority;
448	*dbcp = dbc_n;
449	return (0);
450
451err:	(void)__dbc_close(dbc_n);
452	return (ret);
453}
454
455/*
456 * __dbc_newopd --
457 *	Create a new off-page duplicate cursor.
458 *
459 * PUBLIC: int __dbc_newopd __P((DBC *, db_pgno_t, DBC *, DBC **));
460 */
461int
462__dbc_newopd(dbc_parent, root, oldopd, dbcp)
463	DBC *dbc_parent;
464	db_pgno_t root;
465	DBC *oldopd;
466	DBC **dbcp;
467{
468	DB *dbp;
469	DBC *opd;
470	DBTYPE dbtype;
471	int ret;
472
473	dbp = dbc_parent->dbp;
474	dbtype = (dbp->dup_compare == NULL) ? DB_RECNO : DB_BTREE;
475
476	/*
477	 * On failure, we want to default to returning the old off-page dup
478	 * cursor, if any;  our caller can't be left with a dangling pointer
479	 * to a freed cursor.  On error the only allowable behavior is to
480	 * close the cursor (and the old OPD cursor it in turn points to), so
481	 * this should be safe.
482	 */
483	*dbcp = oldopd;
484
485	if ((ret = __db_cursor_int(dbp, dbc_parent->thread_info,
486	    dbc_parent->txn,
487	    dbtype, root, DBC_OPD, dbc_parent->locker, &opd)) != 0)
488		return (ret);
489
490	opd->priority = dbc_parent->priority;
491	*dbcp = opd;
492
493	/*
494	 * Check to see if we already have an off-page dup cursor that we've
495	 * passed in.  If we do, close it.  It'd be nice to use it again
496	 * if it's a cursor belonging to the right tree, but if we're doing
497	 * a cursor-relative operation this might not be safe, so for now
498	 * we'll take the easy way out and always close and reopen.
499	 *
500	 * Note that under no circumstances do we want to close the old
501	 * cursor without returning a valid new one;  we don't want to
502	 * leave the main cursor in our caller with a non-NULL pointer
503	 * to a freed off-page dup cursor.
504	 */
505	if (oldopd != NULL && (ret = __dbc_close(oldopd)) != 0)
506		return (ret);
507
508	return (0);
509}
510
511/*
512 * __dbc_get --
513 *	Get using a cursor.
514 *
515 * PUBLIC: int __dbc_get __P((DBC *, DBT *, DBT *, u_int32_t));
516 */
517int
518__dbc_get(dbc_arg, key, data, flags)
519	DBC *dbc_arg;
520	DBT *key, *data;
521	u_int32_t flags;
522{
523	DB *dbp;
524	DBC *dbc, *dbc_n, *opd;
525	DBC_INTERNAL *cp, *cp_n;
526	DB_MPOOLFILE *mpf;
527	ENV *env;
528	db_pgno_t pgno;
529	db_indx_t indx_off;
530	u_int32_t multi, orig_ulen, tmp_flags, tmp_read_uncommitted, tmp_rmw;
531	u_int8_t type;
532	int key_small, ret, t_ret;
533
534	COMPQUIET(orig_ulen, 0);
535
536	key_small = 0;
537
538	/*
539	 * Cursor Cleanup Note:
540	 * All of the cursors passed to the underlying access methods by this
541	 * routine are duplicated cursors.  On return, any referenced pages
542	 * will be discarded, and, if the cursor is not intended to be used
543	 * again, the close function will be called.  So, pages/locks that
544	 * the cursor references do not need to be resolved by the underlying
545	 * functions.
546	 */
547	dbp = dbc_arg->dbp;
548	env = dbp->env;
549	mpf = dbp->mpf;
550	dbc_n = NULL;
551	opd = NULL;
552
553	/* Clear OR'd in additional bits so we can check for flag equality. */
554	tmp_rmw = LF_ISSET(DB_RMW);
555	LF_CLR(DB_RMW);
556
557	tmp_read_uncommitted =
558	    LF_ISSET(DB_READ_UNCOMMITTED) &&
559	    !F_ISSET(dbc_arg, DBC_READ_UNCOMMITTED);
560	LF_CLR(DB_READ_UNCOMMITTED);
561
562	multi = LF_ISSET(DB_MULTIPLE|DB_MULTIPLE_KEY);
563	LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);
564
565	/*
566	 * Return a cursor's record number.  It has nothing to do with the
567	 * cursor get code except that it was put into the interface.
568	 */
569	if (flags == DB_GET_RECNO) {
570		if (tmp_rmw)
571			F_SET(dbc_arg, DBC_RMW);
572		if (tmp_read_uncommitted)
573			F_SET(dbc_arg, DBC_READ_UNCOMMITTED);
574		ret = __bamc_rget(dbc_arg, data);
575		if (tmp_rmw)
576			F_CLR(dbc_arg, DBC_RMW);
577		if (tmp_read_uncommitted)
578			F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
579		return (ret);
580	}
581
582	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
583		CDB_LOCKING_INIT(env, dbc_arg);
584
585	/* Don't return the key or data if it was passed to us. */
586	if (!DB_RETURNS_A_KEY(dbp, flags))
587		F_SET(key, DB_DBT_ISSET);
588	if (flags == DB_GET_BOTH &&
589	    (dbp->dup_compare == NULL || dbp->dup_compare == __bam_defcmp))
590		F_SET(data, DB_DBT_ISSET);
591
592	/*
593	 * If we have an off-page duplicates cursor, and the operation applies
594	 * to it, perform the operation.  Duplicate the cursor and call the
595	 * underlying function.
596	 *
597	 * Off-page duplicate trees are locked in the primary tree, that is,
598	 * we acquire a write lock in the primary tree and no locks in the
599	 * off-page dup tree.  If the DB_RMW flag was specified and the get
600	 * operation is done in an off-page duplicate tree, call the primary
601	 * cursor's upgrade routine first.
602	 */
603	cp = dbc_arg->internal;
604	if (cp->opd != NULL &&
605	    (flags == DB_CURRENT || flags == DB_GET_BOTHC ||
606	    flags == DB_NEXT || flags == DB_NEXT_DUP ||
607	    flags == DB_PREV || flags == DB_PREV_DUP)) {
608		if (tmp_rmw && (ret = dbc_arg->am_writelock(dbc_arg)) != 0)
609			goto err;
610		if (F_ISSET(dbc_arg, DBC_TRANSIENT))
611			opd = cp->opd;
612		else if ((ret = __dbc_idup(cp->opd, &opd, DB_POSITION)) != 0)
613			goto err;
614
615		switch (ret = opd->am_get(opd, key, data, flags, NULL)) {
616		case 0:
617			goto done;
618		case DB_NOTFOUND:
619			/*
620			 * Translate DB_NOTFOUND failures for the DB_NEXT and
621			 * DB_PREV operations into a subsequent operation on
622			 * the parent cursor.
623			 */
624			if (flags == DB_NEXT || flags == DB_PREV) {
625				if ((ret = __dbc_close(opd)) != 0)
626					goto err;
627				opd = NULL;
628				if (F_ISSET(dbc_arg, DBC_TRANSIENT))
629					cp->opd = NULL;
630				break;
631			}
632			goto err;
633		default:
634			goto err;
635		}
636	} else if (cp->opd != NULL && F_ISSET(dbc_arg, DBC_TRANSIENT)) {
637		if ((ret = __dbc_close(cp->opd)) != 0)
638			goto err;
639		cp->opd = NULL;
640	}
641
642	/*
643	 * Perform an operation on the main cursor.  Duplicate the cursor,
644	 * upgrade the lock as required, and call the underlying function.
645	 */
646	switch (flags) {
647	case DB_CURRENT:
648	case DB_GET_BOTHC:
649	case DB_NEXT:
650	case DB_NEXT_DUP:
651	case DB_NEXT_NODUP:
652	case DB_PREV:
653	case DB_PREV_DUP:
654	case DB_PREV_NODUP:
655		tmp_flags = DB_POSITION;
656		break;
657	default:
658		tmp_flags = 0;
659		break;
660	}
661
662	if (tmp_read_uncommitted)
663		F_SET(dbc_arg, DBC_READ_UNCOMMITTED);
664
665	/*
666	 * If this cursor is going to be closed immediately, we don't
667	 * need to take precautions to clean it up on error.
668	 */
669	if (F_ISSET(dbc_arg, DBC_TRANSIENT))
670		dbc_n = dbc_arg;
671	else {
672		ret = __dbc_idup(dbc_arg, &dbc_n, tmp_flags);
673		if (tmp_read_uncommitted)
674			F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
675
676		if (ret != 0)
677			goto err;
678		COPY_RET_MEM(dbc_arg, dbc_n);
679	}
680
681	if (tmp_rmw)
682		F_SET(dbc_n, DBC_RMW);
683
684	switch (multi) {
685	case DB_MULTIPLE:
686		F_SET(dbc_n, DBC_MULTIPLE);
687		break;
688	case DB_MULTIPLE_KEY:
689		F_SET(dbc_n, DBC_MULTIPLE_KEY);
690		break;
691	case DB_MULTIPLE | DB_MULTIPLE_KEY:
692		F_SET(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
693		break;
694	case 0:
695	default:
696		break;
697	}
698
699retry:	pgno = PGNO_INVALID;
700	ret = dbc_n->am_get(dbc_n, key, data, flags, &pgno);
701	if (tmp_rmw)
702		F_CLR(dbc_n, DBC_RMW);
703	if (tmp_read_uncommitted)
704		F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
705	F_CLR(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
706	if (ret != 0)
707		goto err;
708
709	cp_n = dbc_n->internal;
710
711	/*
712	 * We may be referencing a new off-page duplicates tree.  Acquire
713	 * a new cursor and call the underlying function.
714	 */
715	if (pgno != PGNO_INVALID) {
716		if ((ret = __dbc_newopd(dbc_arg,
717		    pgno, cp_n->opd, &cp_n->opd)) != 0)
718			goto err;
719
720		switch (flags) {
721		case DB_FIRST:
722		case DB_NEXT:
723		case DB_NEXT_NODUP:
724		case DB_SET:
725		case DB_SET_RECNO:
726		case DB_SET_RANGE:
727			tmp_flags = DB_FIRST;
728			break;
729		case DB_LAST:
730		case DB_PREV:
731		case DB_PREV_NODUP:
732			tmp_flags = DB_LAST;
733			break;
734		case DB_GET_BOTH:
735		case DB_GET_BOTHC:
736		case DB_GET_BOTH_RANGE:
737			tmp_flags = flags;
738			break;
739		default:
740			ret = __db_unknown_flag(env, "__dbc_get", flags);
741			goto err;
742		}
743		ret = cp_n->opd->am_get(cp_n->opd, key, data, tmp_flags, NULL);
744		/*
745		 * Another cursor may have deleted all of the off-page
746		 * duplicates, so for DB_NEXT and DB_PREV operations we need to
747		 * retry on the parent cursor.
748		 */
749		switch (ret) {
750		case 0:
751			break;
752		case DB_NOTFOUND:
753			/*
754			 * Translate DB_NOTFOUND failures for the DB_NEXT and
755			 * DB_PREV operations into a subsequent operation on
756			 * the parent cursor.
757			 */
758			if (flags == DB_NEXT || flags == DB_PREV) {
759				if ((ret = __dbc_close(cp_n->opd)) != 0)
760					goto err;
761				cp_n->opd = NULL;
762				goto retry;
763			}
764			goto err;
765		default:
766			goto err;
767		}
768	}
769
770done:	/*
771	 * Return a key/data item.  The only exception is that we don't return
772	 * a key if the user already gave us one, that is, if the DB_SET flag
773	 * was set.  The DB_SET flag is necessary.  In a Btree, the user's key
774	 * doesn't have to be the same as the key stored the tree, depending on
775	 * the magic performed by the comparison function.  As we may not have
776	 * done any key-oriented operation here, the page reference may not be
777	 * valid.  Fill it in as necessary.  We don't have to worry about any
778	 * locks, the cursor must already be holding appropriate locks.
779	 *
780	 * XXX
781	 * If not a Btree and DB_SET_RANGE is set, we shouldn't return a key
782	 * either, should we?
783	 */
784	cp_n = dbc_n == NULL ? dbc_arg->internal : dbc_n->internal;
785	if (!F_ISSET(key, DB_DBT_ISSET)) {
786		if (cp_n->page == NULL && (ret = __memp_fget(mpf, &cp_n->pgno,
787		    dbc_arg->thread_info, dbc_arg->txn, 0, &cp_n->page)) != 0)
788			goto err;
789
790		if ((ret = __db_ret(dbp, dbc_arg->thread_info,
791		    dbc_arg->txn, cp_n->page, cp_n->indx, key,
792		    &dbc_arg->rkey->data, &dbc_arg->rkey->ulen)) != 0) {
793			/*
794			 * If the key DBT is too small, we still want to return
795			 * the size of the data.  Otherwise applications are
796			 * forced to check each one with a separate call.  We
797			 * don't want to copy the data, so we set the ulen to
798			 * zero before calling __db_ret.
799			 */
800			if (ret == DB_BUFFER_SMALL &&
801			    F_ISSET(data, DB_DBT_USERMEM)) {
802				key_small = 1;
803				orig_ulen = data->ulen;
804				data->ulen = 0;
805			} else
806				goto err;
807		}
808	}
809	if (multi != 0) {
810		/*
811		 * Even if fetching from the OPD cursor we need a duplicate
812		 * primary cursor if we are going after multiple keys.
813		 */
814		if (dbc_n == NULL) {
815			/*
816			 * Non-"_KEY" DB_MULTIPLE doesn't move the main cursor,
817			 * so it's safe to just use dbc_arg, unless dbc_arg
818			 * has an open OPD cursor whose state might need to
819			 * be preserved.
820			 */
821			if ((!(multi & DB_MULTIPLE_KEY) &&
822			    dbc_arg->internal->opd == NULL) ||
823			    F_ISSET(dbc_arg, DBC_TRANSIENT))
824				dbc_n = dbc_arg;
825			else {
826				if ((ret = __dbc_idup(dbc_arg,
827				    &dbc_n, DB_POSITION)) != 0)
828					goto err;
829				if ((ret = dbc_n->am_get(dbc_n,
830				    key, data, DB_CURRENT, &pgno)) != 0)
831					goto err;
832			}
833			cp_n = dbc_n->internal;
834		}
835
836		/*
837		 * If opd is set then we dupped the opd that we came in with.
838		 * When we return we may have a new opd if we went to another
839		 * key.
840		 */
841		if (opd != NULL) {
842			DB_ASSERT(env, cp_n->opd == NULL);
843			cp_n->opd = opd;
844			opd = NULL;
845		}
846
847		/*
848		 * Bulk get doesn't use __db_retcopy, so data.size won't
849		 * get set up unless there is an error.  Assume success
850		 * here.  This is the only call to am_bulk, and it avoids
851		 * setting it exactly the same everywhere.  If we have an
852		 * DB_BUFFER_SMALL error, it'll get overwritten with the
853		 * needed value.
854		 */
855		data->size = data->ulen;
856		ret = dbc_n->am_bulk(dbc_n, data, flags | multi);
857	} else if (!F_ISSET(data, DB_DBT_ISSET)) {
858		dbc = opd != NULL ? opd : cp_n->opd != NULL ? cp_n->opd : dbc_n;
859		cp = dbc->internal;
860		if (cp->page == NULL &&
861		    (ret = __memp_fget(mpf, &cp->pgno,
862			 dbc_arg->thread_info, dbc->txn, 0, &cp->page)) != 0)
863			goto err;
864
865		type = TYPE(cp->page);
866		indx_off = ((type == P_LBTREE ||
867		    type == P_HASH || type == P_HASH_UNSORTED) ? O_INDX : 0);
868		ret = __db_ret(dbp,
869		    dbc->thread_info, dbc->txn, cp->page, cp->indx + indx_off,
870		    data, &dbc_arg->rdata->data, &dbc_arg->rdata->ulen);
871	}
872
873err:	/* Don't pass DB_DBT_ISSET back to application level, error or no. */
874	F_CLR(key, DB_DBT_ISSET);
875	F_CLR(data, DB_DBT_ISSET);
876
877	/* Cleanup and cursor resolution. */
878	if (opd != NULL) {
879		/*
880		 * To support dirty reads we must reget the write lock
881		 * if we have just stepped off a deleted record.
882		 * Since the OPD cursor does not know anything
883		 * about the referencing page or cursor we need
884		 * to peek at the OPD cursor and get the lock here.
885		 */
886		if (F_ISSET(dbc_arg->dbp, DB_AM_READ_UNCOMMITTED) &&
887		     F_ISSET((BTREE_CURSOR *)
888		     dbc_arg->internal->opd->internal, C_DELETED))
889			if ((t_ret =
890			    dbc_arg->am_writelock(dbc_arg)) != 0 && ret == 0)
891				ret = t_ret;
892		if ((t_ret = __dbc_cleanup(
893		    dbc_arg->internal->opd, opd, ret)) != 0 && ret == 0)
894			ret = t_ret;
895
896	}
897
898	if (key_small) {
899		data->ulen = orig_ulen;
900		if (ret == 0)
901			ret = DB_BUFFER_SMALL;
902	}
903
904	if ((t_ret = __dbc_cleanup(dbc_arg, dbc_n, ret)) != 0 &&
905	    (ret == 0 || ret == DB_BUFFER_SMALL))
906		ret = t_ret;
907
908	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
909		CDB_LOCKING_DONE(env, dbc_arg);
910	return (ret);
911}
912
913/*
914 * __dbc_put --
915 *	Put using a cursor.
916 *
917 * PUBLIC: int __dbc_put __P((DBC *, DBT *, DBT *, u_int32_t));
918 */
919int
920__dbc_put(dbc_arg, key, data, flags)
921	DBC *dbc_arg;
922	DBT *key, *data;
923	u_int32_t flags;
924{
925	DB *dbp, *sdbp;
926	DBC *dbc_n, *fdbc, *oldopd, *opd, *sdbc, *pdbc;
927	DBT *all_skeys, *skeyp, *tskeyp;
928	DBT fdata, olddata, oldpkey, newdata, pkey, temppkey, tempskey;
929	ENV *env;
930	db_pgno_t pgno;
931	int cmp, have_oldrec, ispartial, nodel, re_pad, ret, s_count, t_ret;
932	u_int32_t re_len, nskey, rmw, size, tmp_flags;
933
934	/*
935	 * Cursor Cleanup Note:
936	 * All of the cursors passed to the underlying access methods by this
937	 * routine are duplicated cursors.  On return, any referenced pages
938	 * will be discarded, and, if the cursor is not intended to be used
939	 * again, the close function will be called.  So, pages/locks that
940	 * the cursor references do not need to be resolved by the underlying
941	 * functions.
942	 */
943	dbp = dbc_arg->dbp;
944	env = dbp->env;
945	sdbp = NULL;
946	fdbc = pdbc = dbc_n = NULL;
947	all_skeys = NULL;
948	memset(&newdata, 0, sizeof(DBT));
949	ret = s_count = 0;
950
951	/*
952	 * We do multiple cursor operations in some cases and subsequently
953	 * access the data DBT information.  Set DB_DBT_MALLOC so we don't risk
954	 * modification of the data between our uses of it.
955	 */
956	memset(&olddata, 0, sizeof(DBT));
957	F_SET(&olddata, DB_DBT_MALLOC);
958
959	/*
960	 * Putting to secondary indices is forbidden;  when we need
961	 * to internally update one, we'll call this with a private
962	 * synonym for DB_KEYLAST, DB_UPDATE_SECONDARY, which does
963	 * the right thing but won't return an error from cputchk().
964	 */
965	if (flags == DB_UPDATE_SECONDARY)
966		flags = DB_KEYLAST;
967
968	CDB_LOCKING_INIT(env, dbc_arg);
969
970	/*
971	 * Check to see if we are a primary and have secondary indices.
972	 * If we are not, we save ourselves a good bit of trouble and
973	 * just skip to the "normal" put.
974	 */
975	if (LIST_FIRST(&dbp->s_secondaries) == NULL)
976		goto skip_s_update;
977
978	/*
979	 * We have at least one secondary which we may need to update.
980	 *
981	 * There is a rather vile locking issue here.  Secondary gets
982	 * will always involve acquiring a read lock in the secondary,
983	 * then acquiring a read lock in the primary.  Ideally, we
984	 * would likewise perform puts by updating all the secondaries
985	 * first, then doing the actual put in the primary, to avoid
986	 * deadlock (since having multiple threads doing secondary
987	 * gets and puts simultaneously is probably a common case).
988	 *
989	 * However, if this put is a put-overwrite--and we have no way to
990	 * tell in advance whether it will be--we may need to delete
991	 * an outdated secondary key.  In order to find that old
992	 * secondary key, we need to get the record we're overwriting,
993	 * before we overwrite it.
994	 *
995	 * (XXX: It would be nice to avoid this extra get, and have the
996	 * underlying put routines somehow pass us the old record
997	 * since they need to traverse the tree anyway.  I'm saving
998	 * this optimization for later, as it's a lot of work, and it
999	 * would be hard to fit into this locking paradigm anyway.)
1000	 *
1001	 * The simple thing to do would be to go get the old record before
1002	 * we do anything else.  Unfortunately, though, doing so would
1003	 * violate our "secondary, then primary" lock acquisition
1004	 * ordering--even in the common case where no old primary record
1005	 * exists, we'll still acquire and keep a lock on the page where
1006	 * we're about to do the primary insert.
1007	 *
1008	 * To get around this, we do the following gyrations, which
1009	 * hopefully solve this problem in the common case:
1010	 *
1011	 * 1) If this is a c_put(DB_CURRENT), go ahead and get the
1012	 *    old record.  We already hold the lock on this page in
1013	 *    the primary, so no harm done, and we'll need the primary
1014	 *    key (which we weren't passed in this case) to do any
1015	 *    secondary puts anyway.
1016	 *
1017	 * 2) If we're doing a partial put, we need to perform the
1018	 *    get on the primary key right away, since we don't have
1019	 *    the whole datum that the secondary key is based on.
1020	 *    We may also need to pad out the record if the primary
1021	 *    has a fixed record length.
1022	 *
1023	 * 3) Loop through the secondary indices, putting into each a
1024	 *    new secondary key that corresponds to the new record.
1025	 *
1026	 * 4) If we haven't done so in (1) or (2), get the old primary
1027	 *    key/data pair.  If one does not exist--the common case--we're
1028	 *    done with secondary indices, and can go straight on to the
1029	 *    primary put.
1030	 *
1031	 * 5) If we do have an old primary key/data pair, however, we need
1032	 *    to loop through all the secondaries a second time and delete
1033	 *    the old secondary in each.
1034	 */
1035	memset(&pkey, 0, sizeof(DBT));
1036	s_count = __db_s_count(dbp);
1037	if ((ret = __os_calloc(
1038	    env, (u_int)s_count, sizeof(DBT), &all_skeys)) != 0)
1039		goto err;
1040	have_oldrec = nodel = 0;
1041
1042	/*
1043	 * Primary indices can't have duplicates, so only DB_CURRENT,
1044	 * DB_KEYFIRST, and DB_KEYLAST make any sense.  Other flags
1045	 * should have been caught by the checking routine, but
1046	 * add a sprinkling of paranoia.
1047	 */
1048	DB_ASSERT(env, flags == DB_CURRENT || flags == DB_KEYFIRST ||
1049	      flags == DB_KEYLAST || flags == DB_NOOVERWRITE);
1050
1051	/*
1052	 * We'll want to use DB_RMW in a few places, but it's only legal
1053	 * when locking is on.
1054	 */
1055	rmw = STD_LOCKING(dbc_arg) ? DB_RMW : 0;
1056
1057	if (flags == DB_CURRENT) {		/* Step 1. */
1058		/*
1059		 * This is safe to do on the cursor we already have;
1060		 * error or no, it won't move.
1061		 *
1062		 * We use DB_RMW for all of these gets because we'll be
1063		 * writing soon enough in the "normal" put code.  In
1064		 * transactional databases we'll hold those write locks
1065		 * even if we close the cursor we're reading with.
1066		 *
1067		 * The DB_KEYEMPTY return needs special handling -- if the
1068		 * cursor is on a deleted key, we return DB_NOTFOUND.
1069		 */
1070		ret = __dbc_get(dbc_arg, &pkey, &olddata, rmw | DB_CURRENT);
1071		if (ret == DB_KEYEMPTY)
1072			ret = DB_NOTFOUND;
1073		if (ret != 0)
1074			goto err;
1075
1076		have_oldrec = 1; /* We've looked for the old record. */
1077	} else {
1078		/* Set pkey so we can use &pkey everywhere instead of key.  */
1079		pkey.data = key->data;
1080		pkey.size = key->size;
1081	}
1082
1083	/*
1084	 * Check for partial puts (step 2).
1085	 */
1086	if (F_ISSET(data, DB_DBT_PARTIAL)) {
1087		if (!have_oldrec && !nodel) {
1088			/*
1089			 * We're going to have to search the tree for the
1090			 * specified key.  Dup a cursor (so we have the same
1091			 * locking info) and do a c_get.
1092			 */
1093			if ((ret = __dbc_idup(dbc_arg, &pdbc, 0)) != 0)
1094				goto err;
1095
1096			/* We should have gotten DB_CURRENT in step 1. */
1097			DB_ASSERT(env, flags != DB_CURRENT);
1098
1099			ret = __dbc_get(pdbc, &pkey, &olddata, rmw | DB_SET);
1100			if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
1101				nodel = 1;
1102				ret = 0;
1103			}
1104			if ((t_ret = __dbc_close(pdbc)) != 0)
1105				ret = t_ret;
1106			if (ret != 0)
1107				goto err;
1108
1109			have_oldrec = 1;
1110		}
1111
1112		/*
1113		 * Now build the new datum from olddata and the partial data we
1114		 * were given.  It's okay to do this if no record was returned
1115		 * above: a partial put on an empty record is allowed, if a
1116		 * little strange.  The data is zero-padded.
1117		 */
1118		if ((ret =
1119		    __db_buildpartial(dbp, &olddata, data, &newdata)) != 0)
1120			goto err;
1121		ispartial = 1;
1122	} else
1123		ispartial = 0;
1124
1125	/*
1126	 * Handle fixed-length records.  If the primary database has
1127	 * fixed-length records, we need to pad out the datum before
1128	 * we pass it into the callback function;  we always index the
1129	 * "real" record.
1130	 */
1131	if ((dbp->type == DB_RECNO && F_ISSET(dbp, DB_AM_FIXEDLEN)) ||
1132	    (dbp->type == DB_QUEUE)) {
1133		if (dbp->type == DB_QUEUE) {
1134			re_len = ((QUEUE *)dbp->q_internal)->re_len;
1135			re_pad = ((QUEUE *)dbp->q_internal)->re_pad;
1136		} else {
1137			re_len = ((BTREE *)dbp->bt_internal)->re_len;
1138			re_pad = ((BTREE *)dbp->bt_internal)->re_pad;
1139		}
1140
1141		size = ispartial ? newdata.size : data->size;
1142		if (size > re_len) {
1143			ret = __db_rec_toobig(env, size, re_len);
1144			goto err;
1145		} else if (size < re_len) {
1146			/*
1147			 * If we're not doing a partial put, copy
1148			 * data->data into newdata.data, then pad out
1149			 * newdata.data.
1150			 *
1151			 * If we're doing a partial put, the data
1152			 * we want are already in newdata.data;  we
1153			 * just need to pad.
1154			 *
1155			 * Either way, realloc is safe.
1156			 */
1157			if ((ret =
1158			    __os_realloc(env, re_len, &newdata.data)) != 0)
1159				goto err;
1160			if (!ispartial)
1161				memcpy(newdata.data, data->data, size);
1162			memset((u_int8_t *)newdata.data + size, re_pad,
1163			    re_len - size);
1164			newdata.size = re_len;
1165			ispartial = 1;
1166		}
1167	}
1168
1169	/*
1170	 * Loop through the secondaries.  (Step 3.)
1171	 *
1172	 * Note that __db_s_first and __db_s_next will take care of
1173	 * thread-locking and refcounting issues.
1174	 */
1175	for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys;
1176	    sdbp != NULL && ret == 0;
1177	    ret = __db_s_next(&sdbp, dbc_arg->txn), ++skeyp) {
1178		DB_ASSERT(env, skeyp - all_skeys < s_count);
1179		/*
1180		 * Don't process this secondary if the key is immutable and we
1181		 * know that the old record exists.  This optimization can't be
1182		 * used if we have not checked for the old record yet.
1183		 */
1184		if (have_oldrec && !nodel &&
1185		    FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
1186			continue;
1187
1188		/*
1189		 * Call the callback for this secondary, to get the
1190		 * appropriate secondary key.
1191		 */
1192		if ((ret = sdbp->s_callback(sdbp,
1193		    &pkey, ispartial ? &newdata : data, skeyp)) != 0) {
1194			/* Not indexing is equivalent to an empty key set. */
1195			if (ret == DB_DONOTINDEX) {
1196				F_SET(skeyp, DB_DBT_MULTIPLE);
1197				skeyp->size = 0;
1198				ret = 0;
1199			} else
1200				goto err;
1201		}
1202
1203		if (sdbp->s_foreign != NULL &&
1204		    (ret = __db_cursor_int(sdbp->s_foreign,
1205		    dbc_arg->thread_info, dbc_arg->txn, sdbp->s_foreign->type,
1206		    PGNO_INVALID, 0, dbc_arg->locker, &fdbc)) != 0)
1207			goto err;
1208
1209		/*
1210		 * Mark the secondary key DBT(s) as set -- that is, the
1211		 * callback returned at least one secondary key.
1212		 *
1213		 * Also, if this secondary index is associated with a foreign
1214		 * database, check that the foreign db contains the key(s) to
1215		 * maintain referential integrity.  Set flags in fdata to avoid
1216		 * mem copying, we just need to know existence.  We need to do
1217		 * this check before setting DB_DBT_ISSET, otherwise __dbc_get
1218		 * will overwrite the flag values.
1219		 */
1220		if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
1221#ifdef DIAGNOSTIC
1222			__db_check_skeyset(sdbp, skeyp);
1223#endif
1224			for (tskeyp = (DBT *)skeyp->data, nskey = skeyp->size;
1225			     nskey > 0; nskey--, tskeyp++) {
1226				if (fdbc != NULL) {
1227					memset(&fdata, 0, sizeof(DBT));
1228					F_SET(&fdata,
1229					    DB_DBT_PARTIAL | DB_DBT_USERMEM);
1230					if ((ret = __dbc_get(
1231					    fdbc, tskeyp, &fdata,
1232					    DB_SET | rmw)) == DB_NOTFOUND ||
1233					    ret == DB_KEYEMPTY) {
1234						ret = DB_FOREIGN_CONFLICT;
1235						break;
1236					}
1237				}
1238				F_SET(tskeyp, DB_DBT_ISSET);
1239			}
1240			tskeyp = (DBT *)skeyp->data;
1241			nskey = skeyp->size;
1242		} else {
1243			if (fdbc != NULL) {
1244				memset(&fdata, 0, sizeof(DBT));
1245				F_SET(&fdata, DB_DBT_PARTIAL | DB_DBT_USERMEM);
1246				if ((ret = __dbc_get(fdbc, skeyp, &fdata,
1247				    DB_SET | rmw)) == DB_NOTFOUND ||
1248				    ret == DB_KEYEMPTY)
1249					ret = DB_FOREIGN_CONFLICT;
1250			}
1251			F_SET(skeyp, DB_DBT_ISSET);
1252			tskeyp = skeyp;
1253			nskey = 1;
1254		}
1255		if (fdbc != NULL && (t_ret = __dbc_close(fdbc)) != 0 &&
1256		    ret == 0)
1257			ret = t_ret;
1258		fdbc = NULL;
1259		if (ret != 0)
1260			goto err;
1261
1262		/*
1263		 * If we have the old record, we can generate and remove any
1264		 * old secondary key(s) now.  We can also skip the secondary put
1265		 * if there is no change.
1266		 */
1267		if (have_oldrec) {
1268			if ((ret = __dbc_del_oldskey(sdbp, dbc_arg,
1269			    skeyp, &pkey, &olddata)) == DB_KEYEXIST)
1270				continue;
1271			else if (ret != 0)
1272				goto err;
1273		}
1274		if (nskey == 0)
1275			continue;
1276
1277		/*
1278		 * Open a cursor in this secondary.
1279		 *
1280		 * Use the same locker ID as our primary cursor, so that
1281		 * we're guaranteed that the locks don't conflict (e.g. in CDB
1282		 * or if we're subdatabases that share and want to lock a
1283		 * metadata page).
1284		 */
1285		if ((ret = __db_cursor_int(sdbp, dbc_arg->thread_info,
1286		    dbc_arg->txn, sdbp->type,
1287		    PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0)
1288			goto err;
1289
1290		/*
1291		 * If we're in CDB, updates will fail since the new cursor
1292		 * isn't a writer.  However, we hold the WRITE lock in the
1293		 * primary and will for as long as our new cursor lasts,
1294		 * and the primary and secondary share a lock file ID,
1295		 * so it's safe to consider this a WRITER.  The close
1296		 * routine won't try to put anything because we don't
1297		 * really have a lock.
1298		 */
1299		if (CDB_LOCKING(env)) {
1300			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
1301			F_SET(sdbc, DBC_WRITER);
1302		}
1303
1304		/*
1305		 * Swap the primary key to the byte order of this secondary, if
1306		 * necessary.  By doing this now, we can compare directly
1307		 * against the data already in the secondary without having to
1308		 * swap it after reading.
1309		 */
1310		SWAP_IF_NEEDED(sdbp, &pkey);
1311
1312		for (; nskey > 0 && ret == 0; nskey--, tskeyp++) {
1313			/* Skip this key if it is already in the database. */
1314			if (!F_ISSET(tskeyp, DB_DBT_ISSET))
1315				continue;
1316
1317			/*
1318			 * There are three cases here--
1319			 * 1) The secondary supports sorted duplicates.
1320			 *	If we attempt to put a secondary/primary pair
1321			 *	that already exists, that's a duplicate
1322			 *	duplicate, and c_put will return DB_KEYEXIST
1323			 *	(see __db_duperr).  This will leave us with
1324			 *	exactly one copy of the secondary/primary pair,
1325			 *	and this is just right--we'll avoid deleting it
1326			 *	later, as the old and new secondaries will
1327			 *	match (since the old secondary is the dup dup
1328			 *	that's already there).
1329			 * 2) The secondary supports duplicates, but they're not
1330			 *	sorted.  We need to avoid putting a duplicate
1331			 *	duplicate, because the matching old and new
1332			 *	secondaries will prevent us from deleting
1333			 *	anything and we'll wind up with two secondary
1334			 *	records that point to the same primary key.  Do
1335			 *	a c_get(DB_GET_BOTH);  only do the put if the
1336			 *	secondary doesn't exist.
1337			 * 3) The secondary doesn't support duplicates at all.
1338			 *	In this case, secondary keys must be unique;
1339			 *	if another primary key already exists for this
1340			 *	secondary key, we have to either overwrite it
1341			 *	or not put this one, and in either case we've
1342			 *	corrupted the secondary index.  Do a
1343			 *	c_get(DB_SET).  If the secondary/primary pair
1344			 *	already exists, do nothing;  if the secondary
1345			 *	exists with a different primary, return an
1346			 *	error;  and if the secondary does not exist,
1347			 *	put it.
1348			 */
1349			if (!F_ISSET(sdbp, DB_AM_DUP)) {
1350				/* Case 3. */
1351				memset(&oldpkey, 0, sizeof(DBT));
1352				F_SET(&oldpkey, DB_DBT_MALLOC);
1353				ret = __dbc_get(sdbc,
1354				    tskeyp, &oldpkey, rmw | DB_SET);
1355				if (ret == 0) {
1356					cmp = __bam_defcmp(sdbp,
1357					    &oldpkey, &pkey);
1358					__os_ufree(env, oldpkey.data);
1359					if (cmp != 0) {
1360						__db_errx(env, "%s%s",
1361			    "Put results in a non-unique secondary key in an ",
1362			    "index not configured to support duplicates");
1363						ret = EINVAL;
1364					}
1365				}
1366				if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
1367					break;
1368			} else if (!F_ISSET(sdbp, DB_AM_DUPSORT)) {
1369				/* Case 2. */
1370				DB_INIT_DBT(tempskey,
1371				    tskeyp->data, tskeyp->size);
1372				DB_INIT_DBT(temppkey,
1373				    pkey.data, pkey.size);
1374				ret = __dbc_get(sdbc, &tempskey, &temppkey,
1375				    rmw | DB_GET_BOTH);
1376				if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
1377					break;
1378			}
1379
1380			ret = __dbc_put(sdbc, tskeyp, &pkey,
1381			    DB_UPDATE_SECONDARY);
1382
1383			/*
1384			 * We don't know yet whether this was a put-overwrite
1385			 * that in fact changed nothing.  If it was, we may get
1386			 * DB_KEYEXIST.  This is not an error.
1387			 */
1388			if (ret == DB_KEYEXIST)
1389				ret = 0;
1390		}
1391
1392		/* Make sure the primary key is back in native byte-order. */
1393		SWAP_IF_NEEDED(sdbp, &pkey);
1394
1395		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
1396			ret = t_ret;
1397
1398		if (ret != 0)
1399			goto err;
1400
1401		/*
1402		 * Mark that we have a key for this secondary so we can check
1403		 * it later before deleting the old one.  We can't set it
1404		 * earlier or it would be cleared in the calls above.
1405		 */
1406		F_SET(skeyp, DB_DBT_ISSET);
1407	}
1408	if (ret != 0)
1409		goto err;
1410
1411	/*
1412	 * If we've already got the old primary key/data pair, the secondary
1413	 * updates are already done.
1414	 */
1415	if (have_oldrec)
1416		goto skip_s_update;
1417
1418	/*
1419	 * If still necessary, go get the old primary key/data.  (Step 4.)
1420	 *
1421	 * See the comments in step 2.  This is real familiar.
1422	 */
1423	if ((ret = __dbc_idup(dbc_arg, &pdbc, 0)) != 0)
1424		goto err;
1425	DB_ASSERT(env, flags != DB_CURRENT);
1426	pkey.data = key->data;
1427	pkey.size = key->size;
1428	ret = __dbc_get(pdbc, &pkey, &olddata, rmw | DB_SET);
1429	if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) {
1430		nodel = 1;
1431		ret = 0;
1432	}
1433	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
1434		ret = t_ret;
1435	if (ret != 0)
1436		goto err;
1437
1438	/*
1439	 * Check whether we do in fact have an old record we may need to
1440	 * delete.  (Step 5).
1441	 */
1442	if (nodel)
1443		goto skip_s_update;
1444
1445	for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys;
1446	    sdbp != NULL && ret == 0;
1447	    ret = __db_s_next(&sdbp, dbc_arg->txn), skeyp++) {
1448		DB_ASSERT(env, skeyp - all_skeys < s_count);
1449		/*
1450		 * Don't process this secondary if the key is immutable.  We
1451		 * know that the old record exists, so this optimization can
1452		 * always be used.
1453		 */
1454		if (FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY))
1455			continue;
1456
1457		if ((ret = __dbc_del_oldskey(sdbp, dbc_arg,
1458		    skeyp, &pkey, &olddata)) != 0 && ret != DB_KEYEXIST)
1459			goto err;
1460	}
1461	if (ret != 0)
1462		goto err;
1463
1464	/* Secondary index updates are now done.  On to the "real" stuff. */
1465
1466skip_s_update:
1467	/*
1468	 * If we have an off-page duplicates cursor, and the operation applies
1469	 * to it, perform the operation.  Duplicate the cursor and call the
1470	 * underlying function.
1471	 *
1472	 * Off-page duplicate trees are locked in the primary tree, that is,
1473	 * we acquire a write lock in the primary tree and no locks in the
1474	 * off-page dup tree.  If the put operation is done in an off-page
1475	 * duplicate tree, call the primary cursor's upgrade routine first.
1476	 */
1477	if (dbc_arg->internal->opd != NULL &&
1478	    (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)) {
1479		/*
1480		 * A special case for hash off-page duplicates.  Hash doesn't
1481		 * support (and is documented not to support) put operations
1482		 * relative to a cursor which references an already deleted
1483		 * item.  For consistency, apply the same criteria to off-page
1484		 * duplicates as well.
1485		 */
1486		if (dbc_arg->dbtype == DB_HASH && F_ISSET(
1487		    ((BTREE_CURSOR *)(dbc_arg->internal->opd->internal)),
1488		    C_DELETED)) {
1489			ret = DB_NOTFOUND;
1490			goto err;
1491		}
1492
1493		if ((ret = dbc_arg->am_writelock(dbc_arg)) != 0 ||
1494		    (ret = __dbc_dup(dbc_arg, &dbc_n, DB_POSITION)) != 0)
1495			goto err;
1496		opd = dbc_n->internal->opd;
1497		if ((ret = opd->am_put(
1498		    opd, key, data, flags, NULL)) != 0)
1499			goto err;
1500		goto done;
1501	}
1502
1503	/*
1504	 * Perform an operation on the main cursor.  Duplicate the cursor,
1505	 * and call the underlying function.
1506	 */
1507	tmp_flags = flags == DB_AFTER ||
1508	    flags == DB_BEFORE || flags == DB_CURRENT ? DB_POSITION : 0;
1509
1510	/*
1511	 * If this cursor is going to be closed immediately, we don't
1512	 * need to take precautions to clean it up on error.
1513	 */
1514	if (F_ISSET(dbc_arg, DBC_TRANSIENT))
1515		dbc_n = dbc_arg;
1516	else if ((ret = __dbc_idup(dbc_arg, &dbc_n, tmp_flags)) != 0)
1517		goto err;
1518
1519	pgno = PGNO_INVALID;
1520	if ((ret = dbc_n->am_put(dbc_n, key, data, flags, &pgno)) != 0)
1521		goto err;
1522
1523	/*
1524	 * We may be referencing a new off-page duplicates tree.  Acquire
1525	 * a new cursor and call the underlying function.
1526	 */
1527	if (pgno != PGNO_INVALID) {
1528		oldopd = dbc_n->internal->opd;
1529		if ((ret = __dbc_newopd(dbc_arg, pgno, oldopd, &opd)) != 0) {
1530			dbc_n->internal->opd = opd;
1531			goto err;
1532		}
1533
1534		dbc_n->internal->opd = opd;
1535
1536		if (flags == DB_NOOVERWRITE)
1537			flags = DB_KEYLAST;
1538		if ((ret = opd->am_put(
1539		    opd, key, data, flags, NULL)) != 0)
1540			goto err;
1541	}
1542
1543done:
1544err:	/* Cleanup and cursor resolution. */
1545	if ((t_ret = __dbc_cleanup(dbc_arg, dbc_n, ret)) != 0 && ret == 0)
1546		ret = t_ret;
1547
1548	/* If newdata or olddata were used, free their buffers. */
1549	if (newdata.data != NULL)
1550		__os_free(env, newdata.data);
1551	if (olddata.data != NULL)
1552		__os_ufree(env, olddata.data);
1553
1554	CDB_LOCKING_DONE(env, dbc_arg);
1555
1556	if (sdbp != NULL &&
1557	    (t_ret = __db_s_done(sdbp, dbc_arg->txn)) != 0 && ret == 0)
1558		ret = t_ret;
1559
1560	for (skeyp = all_skeys; skeyp - all_skeys < s_count; skeyp++) {
1561		if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) {
1562			for (nskey = skeyp->size, tskeyp = (DBT *)skeyp->data;
1563			    nskey > 0;
1564			    nskey--, tskeyp++)
1565				FREE_IF_NEEDED(env, tskeyp);
1566		}
1567		FREE_IF_NEEDED(env, skeyp);
1568	}
1569	if (all_skeys != NULL)
1570		__os_free(env, all_skeys);
1571
1572	return (ret);
1573}
1574
1575/*
1576 * __dbc_del_oldskey --
1577 *	Delete an old secondary key, if necessary.
1578 *	Returns DB_KEYEXIST if the new and old keys match..
1579 */
1580static int
1581__dbc_del_oldskey(sdbp, dbc_arg, skey, pkey, olddata)
1582	DB *sdbp;
1583	DBC *dbc_arg;
1584	DBT *skey, *pkey, *olddata;
1585{
1586	DB *dbp;
1587	DBC *sdbc;
1588	DBT *toldskeyp, *tskeyp;
1589	DBT oldskey, temppkey, tempskey;
1590	ENV *env;
1591	int ret, t_ret;
1592	u_int32_t i, noldskey, nsame, nskey, rmw;
1593
1594	sdbc = NULL;
1595	dbp = sdbp->s_primary;
1596	env = dbp->env;
1597	nsame = 0;
1598	rmw = STD_LOCKING(dbc_arg) ? DB_RMW : 0;
1599
1600	/*
1601	 * Get the old secondary key.
1602	 */
1603	memset(&oldskey, 0, sizeof(DBT));
1604	if ((ret = sdbp->s_callback(sdbp, pkey, olddata, &oldskey)) != 0) {
1605		if (ret == DB_DONOTINDEX ||
1606		    (F_ISSET(&oldskey, DB_DBT_MULTIPLE) && oldskey.size == 0))
1607			/* There's no old key to delete. */
1608			ret = 0;
1609		return (ret);
1610	}
1611
1612	if (F_ISSET(&oldskey, DB_DBT_MULTIPLE)) {
1613#ifdef DIAGNOSTIC
1614		__db_check_skeyset(sdbp, &oldskey);
1615#endif
1616		toldskeyp = (DBT *)oldskey.data;
1617		noldskey = oldskey.size;
1618	} else {
1619		toldskeyp = &oldskey;
1620		noldskey = 1;
1621	}
1622
1623	if (F_ISSET(skey, DB_DBT_MULTIPLE)) {
1624		nskey = skey->size;
1625		skey = (DBT *)skey->data;
1626	} else
1627		nskey = F_ISSET(skey, DB_DBT_ISSET) ? 1 : 0;
1628
1629	for (; noldskey > 0 && ret == 0; noldskey--, toldskeyp++) {
1630		/*
1631		 * Check whether this old secondary key is also a new key
1632		 * before we delete it.  Note that bt_compare is (and must be)
1633		 * set no matter what access method we're in.
1634		 */
1635		for (i = 0, tskeyp = skey; i < nskey; i++, tskeyp++)
1636			if (((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
1637			    toldskeyp, tskeyp) == 0) {
1638				nsame++;
1639				F_CLR(tskeyp, DB_DBT_ISSET);
1640				break;
1641			}
1642
1643		if (i < nskey) {
1644			FREE_IF_NEEDED(env, toldskeyp);
1645			continue;
1646		}
1647
1648		if (sdbc == NULL) {
1649			if ((ret = __db_cursor_int(sdbp,
1650			    dbc_arg->thread_info, dbc_arg->txn, sdbp->type,
1651			    PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0)
1652				goto err;
1653			if (CDB_LOCKING(env)) {
1654				DB_ASSERT(env,
1655				    sdbc->mylock.off == LOCK_INVALID);
1656				F_SET(sdbc, DBC_WRITER);
1657			}
1658		}
1659
1660		/*
1661		 * Don't let c_get(DB_GET_BOTH) stomp on our data.  Use
1662		 * temporary DBTs instead.
1663		 */
1664		SWAP_IF_NEEDED(sdbp, pkey);
1665		DB_INIT_DBT(temppkey, pkey->data, pkey->size);
1666		DB_INIT_DBT(tempskey, toldskeyp->data, toldskeyp->size);
1667		if ((ret = __dbc_get(sdbc,
1668		    &tempskey, &temppkey, rmw | DB_GET_BOTH)) == 0)
1669			ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
1670		else if (ret == DB_NOTFOUND)
1671			ret = __db_secondary_corrupt(dbp);
1672		SWAP_IF_NEEDED(sdbp, pkey);
1673		FREE_IF_NEEDED(env, toldskeyp);
1674	}
1675
1676err:	for (; noldskey > 0; noldskey--, toldskeyp++)
1677		FREE_IF_NEEDED(env, toldskeyp);
1678	FREE_IF_NEEDED(env, &oldskey);
1679	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
1680		ret = t_ret;
1681	if (ret == 0 && nsame == nskey)
1682		return (DB_KEYEXIST);
1683	return (ret);
1684}
1685
1686/*
1687 * __db_duperr()
1688 *	Error message: we don't currently support sorted duplicate duplicates.
1689 * PUBLIC: int __db_duperr __P((DB *, u_int32_t));
1690 */
1691int
1692__db_duperr(dbp, flags)
1693	DB *dbp;
1694	u_int32_t flags;
1695{
1696
1697	/*
1698	 * If we run into this error while updating a secondary index,
1699	 * don't yell--there's no clean way to pass DB_NODUPDATA in along
1700	 * with DB_UPDATE_SECONDARY, but we may run into this problem
1701	 * in a normal, non-error course of events.
1702	 *
1703	 * !!!
1704	 * If and when we ever permit duplicate duplicates in sorted-dup
1705	 * databases, we need to either change the secondary index code
1706	 * to check for dup dups, or we need to maintain the implicit
1707	 * "DB_NODUPDATA" behavior for databases with DB_AM_SECONDARY set.
1708	 */
1709	if (flags != DB_NODUPDATA && !F_ISSET(dbp, DB_AM_SECONDARY))
1710		__db_errx(dbp->env,
1711		    "Duplicate data items are not supported with sorted data");
1712	return (DB_KEYEXIST);
1713}
1714
1715/*
1716 * __dbc_cleanup --
1717 *	Clean up duplicate cursors.
1718 */
1719static int
1720__dbc_cleanup(dbc, dbc_n, failed)
1721	DBC *dbc, *dbc_n;
1722	int failed;
1723{
1724	DB *dbp;
1725	DBC *opd;
1726	DBC_INTERNAL *internal;
1727	DB_MPOOLFILE *mpf;
1728	int ret, t_ret;
1729
1730	dbp = dbc->dbp;
1731	mpf = dbp->mpf;
1732	internal = dbc->internal;
1733	ret = 0;
1734
1735	/* Discard any pages we're holding. */
1736	if (internal->page != NULL) {
1737		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1738		     internal->page, dbc->priority)) != 0 && ret == 0)
1739			ret = t_ret;
1740		internal->page = NULL;
1741	}
1742	opd = internal->opd;
1743	if (opd != NULL && opd->internal->page != NULL) {
1744		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1745		    opd->internal->page, dbc->priority)) != 0 && ret == 0)
1746			ret = t_ret;
1747		opd->internal->page = NULL;
1748	}
1749
1750	/*
1751	 * If dbc_n is NULL, there's no internal cursor swapping to be done
1752	 * and no dbc_n to close--we probably did the entire operation on an
1753	 * offpage duplicate cursor.  Just return.
1754	 *
1755	 * If dbc and dbc_n are the same, we're either inside a DB->{put/get}
1756	 * operation, and as an optimization we performed the operation on
1757	 * the main cursor rather than on a duplicated one, or we're in a
1758	 * bulk get that can't have moved the cursor (DB_MULTIPLE with the
1759	 * initial c_get operation on an off-page dup cursor).  Just
1760	 * return--either we know we didn't move the cursor, or we're going
1761	 * to close it before we return to application code, so we're sure
1762	 * not to visibly violate the "cursor stays put on error" rule.
1763	 */
1764	if (dbc_n == NULL || dbc == dbc_n)
1765		return (ret);
1766
1767	if (dbc_n->internal->page != NULL) {
1768		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1769		    dbc_n->internal->page, dbc->priority)) != 0 && ret == 0)
1770			ret = t_ret;
1771		dbc_n->internal->page = NULL;
1772	}
1773	opd = dbc_n->internal->opd;
1774	if (opd != NULL && opd->internal->page != NULL) {
1775		if ((t_ret = __memp_fput(mpf, dbc->thread_info,
1776		     opd->internal->page, dbc->priority)) != 0 && ret == 0)
1777			ret = t_ret;
1778		opd->internal->page = NULL;
1779	}
1780
1781	/*
1782	 * If we didn't fail before entering this routine or just now when
1783	 * freeing pages, swap the interesting contents of the old and new
1784	 * cursors.
1785	 */
1786	if (!failed && ret == 0) {
1787		dbc->internal = dbc_n->internal;
1788		dbc_n->internal = internal;
1789	}
1790
1791	/*
1792	 * Close the cursor we don't care about anymore.  The close can fail,
1793	 * but we only expect DB_LOCK_DEADLOCK failures.  This violates our
1794	 * "the cursor is unchanged on error" semantics, but since all you can
1795	 * do with a DB_LOCK_DEADLOCK failure is close the cursor, I believe
1796	 * that's OK.
1797	 *
1798	 * XXX
1799	 * There's no way to recover from failure to close the old cursor.
1800	 * All we can do is move to the new position and return an error.
1801	 *
1802	 * XXX
1803	 * We might want to consider adding a flag to the cursor, so that any
1804	 * subsequent operations other than close just return an error?
1805	 */
1806	if ((t_ret = __dbc_close(dbc_n)) != 0 && ret == 0)
1807		ret = t_ret;
1808
1809	/*
1810	 * If this was an update that is supporting dirty reads
1811	 * then we may have just swapped our read for a write lock
1812	 * which is held by the surviving cursor.  We need
1813	 * to explicitly downgrade this lock.  The closed cursor
1814	 * may only have had a read lock.
1815	 */
1816	if (F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) &&
1817	    dbc->internal->lock_mode == DB_LOCK_WRITE) {
1818		if ((t_ret =
1819		    __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0)
1820			ret = t_ret;
1821		if (t_ret == 0)
1822			dbc->internal->lock_mode = DB_LOCK_WWRITE;
1823	}
1824
1825	return (ret);
1826}
1827
1828/*
1829 * __dbc_secondary_get_pp --
1830 *	This wrapper function for DBC->pget() is the DBC->get() function
1831 *	for a secondary index cursor.
1832 *
1833 * PUBLIC: int __dbc_secondary_get_pp __P((DBC *, DBT *, DBT *, u_int32_t));
1834 */
1835int
1836__dbc_secondary_get_pp(dbc, skey, data, flags)
1837	DBC *dbc;
1838	DBT *skey, *data;
1839	u_int32_t flags;
1840{
1841	DB_ASSERT(dbc->env, F_ISSET(dbc->dbp, DB_AM_SECONDARY));
1842	return (__dbc_pget_pp(dbc, skey, NULL, data, flags));
1843}
1844
1845/*
1846 * __dbc_pget --
1847 *	Get a primary key/data pair through a secondary index.
1848 *
1849 * PUBLIC: int __dbc_pget __P((DBC *, DBT *, DBT *, DBT *, u_int32_t));
1850 */
1851int
1852__dbc_pget(dbc, skey, pkey, data, flags)
1853	DBC *dbc;
1854	DBT *skey, *pkey, *data;
1855	u_int32_t flags;
1856{
1857	DB *pdbp, *sdbp;
1858	DBC *dbc_n, *pdbc;
1859	DBT nullpkey;
1860	u_int32_t save_pkey_flags, tmp_flags, tmp_read_uncommitted, tmp_rmw;
1861	int pkeymalloc, ret, t_ret;
1862
1863	sdbp = dbc->dbp;
1864	pdbp = sdbp->s_primary;
1865	dbc_n = NULL;
1866	pkeymalloc = t_ret = 0;
1867
1868	/*
1869	 * The challenging part of this function is getting the behavior
1870	 * right for all the various permutations of DBT flags.  The
1871	 * next several blocks handle the various cases we need to
1872	 * deal with specially.
1873	 */
1874
1875	/*
1876	 * We may be called with a NULL pkey argument, if we've been
1877	 * wrapped by a 2-DBT get call.  If so, we need to use our
1878	 * own DBT.
1879	 */
1880	if (pkey == NULL) {
1881		memset(&nullpkey, 0, sizeof(DBT));
1882		pkey = &nullpkey;
1883	}
1884
1885	/* Clear OR'd in additional bits so we can check for flag equality. */
1886	tmp_rmw = LF_ISSET(DB_RMW);
1887	LF_CLR(DB_RMW);
1888
1889	tmp_read_uncommitted =
1890	    LF_ISSET(DB_READ_UNCOMMITTED) &&
1891	    !F_ISSET(dbc, DBC_READ_UNCOMMITTED);
1892	LF_CLR(DB_READ_UNCOMMITTED);
1893
1894	/*
1895	 * DB_GET_RECNO is a special case, because we're interested not in
1896	 * the primary key/data pair, but rather in the primary's record
1897	 * number.
1898	 */
1899	if (flags == DB_GET_RECNO) {
1900		if (tmp_rmw)
1901			F_SET(dbc, DBC_RMW);
1902		if (tmp_read_uncommitted)
1903			F_SET(dbc, DBC_READ_UNCOMMITTED);
1904		ret = __dbc_pget_recno(dbc, pkey, data, flags);
1905		if (tmp_rmw)
1906			F_CLR(dbc, DBC_RMW);
1907		if (tmp_read_uncommitted)
1908			F_CLR(dbc, DBC_READ_UNCOMMITTED);
1909		return (ret);
1910	}
1911
1912	/*
1913	 * If the DBTs we've been passed don't have any of the
1914	 * user-specified memory management flags set, we want to make sure
1915	 * we return values using the DBTs dbc->rskey, dbc->rkey, and
1916	 * dbc->rdata, respectively.
1917	 *
1918	 * There are two tricky aspects to this:  first, we need to pass
1919	 * skey and pkey *in* to the initial c_get on the secondary key,
1920	 * since either or both may be looked at by it (depending on the
1921	 * get flag).  Second, we must not use a normal DB->get call
1922	 * on the secondary, even though that's what we want to accomplish,
1923	 * because the DB handle may be free-threaded.  Instead,
1924	 * we open a cursor, then take steps to ensure that we actually use
1925	 * the rkey/rdata from the *secondary* cursor.
1926	 *
1927	 * We accomplish all this by passing in the DBTs we started out
1928	 * with to the c_get, but swapping the contents of rskey and rkey,
1929	 * respectively, into rkey and rdata;  __db_ret will treat them like
1930	 * the normal key/data pair in a c_get call, and will realloc them as
1931	 * need be (this is "step 1").  Then, for "step 2", we swap back
1932	 * rskey/rkey/rdata to normal, and do a get on the primary with the
1933	 * secondary dbc appointed as the owner of the returned-data memory.
1934	 *
1935	 * Note that in step 2, we copy the flags field in case we need to
1936	 * pass down a DB_DBT_PARTIAL or other flag that is compatible with
1937	 * letting DB do the memory management.
1938	 */
1939
1940	/*
1941	 * It is correct, though slightly sick, to attempt a partial get of a
1942	 * primary key.  However, if we do so here, we'll never find the
1943	 * primary record;  clear the DB_DBT_PARTIAL field of pkey just for the
1944	 * duration of the next call.
1945	 */
1946	save_pkey_flags = pkey->flags;
1947	F_CLR(pkey, DB_DBT_PARTIAL);
1948
1949	/*
1950	 * Now we can go ahead with the meat of this call.  First, get the
1951	 * primary key from the secondary index.  (What exactly we get depends
1952	 * on the flags, but the underlying cursor get will take care of the
1953	 * dirty work.)  Duplicate the cursor, in case the later get on the
1954	 * primary fails.
1955	 */
1956	switch (flags) {
1957	case DB_CURRENT:
1958	case DB_GET_BOTHC:
1959	case DB_NEXT:
1960	case DB_NEXT_DUP:
1961	case DB_NEXT_NODUP:
1962	case DB_PREV:
1963	case DB_PREV_DUP:
1964	case DB_PREV_NODUP:
1965		tmp_flags = DB_POSITION;
1966		break;
1967	default:
1968		tmp_flags = 0;
1969		break;
1970	}
1971
1972	if ((ret = __dbc_dup(dbc, &dbc_n, tmp_flags)) != 0)
1973		return (ret);
1974
1975	F_SET(dbc_n, DBC_TRANSIENT);
1976
1977	if (tmp_rmw)
1978		F_SET(dbc_n, DBC_RMW);
1979	if (tmp_read_uncommitted)
1980		F_SET(dbc_n, DBC_READ_UNCOMMITTED);
1981
1982	/*
1983	 * If we've been handed a primary key, it will be in native byte order,
1984	 * so we need to swap it before reading from the secondary.
1985	 */
1986	if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
1987	    flags == DB_GET_BOTH_RANGE)
1988		SWAP_IF_NEEDED(sdbp, pkey);
1989
1990retry:	/* Step 1. */
1991	dbc_n->rdata = dbc->rkey;
1992	dbc_n->rkey = dbc->rskey;
1993	ret = __dbc_get(dbc_n, skey, pkey, flags);
1994	/* Restore pkey's flags in case we stomped the PARTIAL flag. */
1995	pkey->flags = save_pkey_flags;
1996
1997	/*
1998	 * We need to swap the primary key to native byte order if we read it
1999	 * successfully, or if we swapped it on entry above.  We can't return
2000	 * with the application's data modified.
2001	 */
2002	if (ret == 0 || flags == DB_GET_BOTH || flags == DB_GET_BOTHC ||
2003	    flags == DB_GET_BOTH_RANGE)
2004		SWAP_IF_NEEDED(sdbp, pkey);
2005
2006	if (ret != 0)
2007		goto err;
2008
2009	/*
2010	 * Now we're ready for "step 2".  If either or both of pkey and data do
2011	 * not have memory management flags set--that is, if DB is managing
2012	 * their memory--we need to swap around the rkey/rdata structures so
2013	 * that we don't wind up trying to use memory managed by the primary
2014	 * database cursor, which we'll close before we return.
2015	 *
2016	 * !!!
2017	 * If you're carefully following the bouncing ball, you'll note that in
2018	 * the DB-managed case, the buffer hanging off of pkey is the same as
2019	 * dbc->rkey->data.  This is just fine;  we may well realloc and stomp
2020	 * on it when we return, if we're doing a DB_GET_BOTH and need to
2021	 * return a different partial or key (depending on the comparison
2022	 * function), but this is safe.
2023	 *
2024	 * !!!
2025	 * We need to use __db_cursor_int here rather than simply calling
2026	 * pdbp->cursor, because otherwise, if we're in CDB, we'll allocate a
2027	 * new locker ID and leave ourselves open to deadlocks.  (Even though
2028	 * we're only acquiring read locks, we'll still block if there are any
2029	 * waiters.)
2030	 */
2031	if ((ret = __db_cursor_int(pdbp, dbc->thread_info,
2032	    dbc->txn, pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
2033		goto err;
2034
2035	if (tmp_read_uncommitted || F_ISSET(dbc, DBC_READ_UNCOMMITTED))
2036		F_SET(pdbc, DBC_READ_UNCOMMITTED);
2037	if (tmp_rmw || F_ISSET(dbc, DBC_RMW))
2038		F_SET(pdbc, DBC_RMW);
2039	if (F_ISSET(dbc, DBC_READ_COMMITTED))
2040		F_SET(pdbc, DBC_READ_COMMITTED);
2041
2042	/*
2043	 * We're about to use pkey a second time.  If DB_DBT_MALLOC is set on
2044	 * it, we'll leak the memory we allocated the first time.  Thus, set
2045	 * DB_DBT_REALLOC instead so that we reuse that memory instead of
2046	 * leaking it.
2047	 *
2048	 * Alternatively, if the application is handling copying for pkey, we
2049	 * need to take a copy now.  The copy will be freed on exit from
2050	 * __dbc_pget_pp (and we must be coming through there if DB_DBT_USERCOPY
2051	 * is set).  In the case of DB_GET_BOTH_RANGE, the pkey supplied by
2052	 * the application has already been copied in but the value may have
2053	 * changed in the search.  In that case, free the original copy and get
2054	 * a new one.
2055	 *
2056	 * !!!
2057	 * This assumes that the user must always specify a compatible realloc
2058	 * function if a malloc function is specified.  I think this is a
2059	 * reasonable requirement.
2060	 */
2061	if (F_ISSET(pkey, DB_DBT_MALLOC)) {
2062		F_CLR(pkey, DB_DBT_MALLOC);
2063		F_SET(pkey, DB_DBT_REALLOC);
2064		pkeymalloc = 1;
2065	} else if (F_ISSET(pkey, DB_DBT_USERCOPY)) {
2066		if (flags == DB_GET_BOTH_RANGE)
2067			__dbt_userfree(sdbp->env, NULL, pkey, NULL);
2068		if ((ret = __dbt_usercopy(sdbp->env, pkey)) != 0)
2069			goto err;
2070	}
2071
2072	/*
2073	 * Do the actual get.  Set DBC_TRANSIENT since we don't care about
2074	 * preserving the position on error, and it's faster.  SET_RET_MEM so
2075	 * that the secondary DBC owns any returned-data memory.
2076	 */
2077	F_SET(pdbc, DBC_TRANSIENT);
2078	SET_RET_MEM(pdbc, dbc);
2079	ret = __dbc_get(pdbc, pkey, data, DB_SET);
2080
2081	/*
2082	 * If the item wasn't found in the primary, this is a bug; our
2083	 * secondary has somehow gotten corrupted, and contains elements that
2084	 * don't correspond to anything in the primary.  Complain.
2085	 */
2086
2087	/* Now close the primary cursor. */
2088	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
2089		ret = t_ret;
2090
2091	else if (ret == DB_NOTFOUND) {
2092		if (!F_ISSET(pdbc, DBC_READ_UNCOMMITTED))
2093			ret = __db_secondary_corrupt(pdbp);
2094		else switch (flags) {
2095		case DB_GET_BOTHC:
2096		case DB_NEXT:
2097		case DB_NEXT_DUP:
2098		case DB_NEXT_NODUP:
2099		case DB_PREV:
2100		case DB_PREV_DUP:
2101		case DB_PREV_NODUP:
2102			goto retry;
2103		default:
2104			break;
2105		}
2106	}
2107
2108err:	/* Cleanup and cursor resolution. */
2109	if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0)
2110		ret = t_ret;
2111	if (pkeymalloc) {
2112		/*
2113		 * If pkey had a MALLOC flag, we need to restore it; otherwise,
2114		 * if the user frees the buffer but reuses the DBT without
2115		 * NULL'ing its data field or changing the flags, we may drop
2116		 * core.
2117		 */
2118		F_CLR(pkey, DB_DBT_REALLOC);
2119		F_SET(pkey, DB_DBT_MALLOC);
2120	}
2121
2122	return (ret);
2123}
2124
2125/*
2126 * __dbc_pget_recno --
2127 *	Perform a DB_GET_RECNO c_pget on a secondary index.  Returns
2128 * the secondary's record number in the pkey field and the primary's
2129 * in the data field.
2130 */
2131static int
2132__dbc_pget_recno(sdbc, pkey, data, flags)
2133	DBC *sdbc;
2134	DBT *pkey, *data;
2135	u_int32_t flags;
2136{
2137	DB *pdbp, *sdbp;
2138	DBC *pdbc;
2139	DBT discardme, primary_key;
2140	ENV *env;
2141	db_recno_t oob;
2142	u_int32_t rmw;
2143	int ret, t_ret;
2144
2145	sdbp = sdbc->dbp;
2146	pdbp = sdbp->s_primary;
2147	env = sdbp->env;
2148	pdbc = NULL;
2149	ret = t_ret = 0;
2150
2151	rmw = LF_ISSET(DB_RMW);
2152
2153	memset(&discardme, 0, sizeof(DBT));
2154	F_SET(&discardme, DB_DBT_USERMEM | DB_DBT_PARTIAL);
2155
2156	oob = RECNO_OOB;
2157
2158	/*
2159	 * If the primary is an rbtree, we want its record number, whether
2160	 * or not the secondary is one too.  Fetch the recno into "data".
2161	 *
2162	 * If it's not an rbtree, return RECNO_OOB in "data".
2163	 */
2164	if (F_ISSET(pdbp, DB_AM_RECNUM)) {
2165		/*
2166		 * Get the primary key, so we can find the record number
2167		 * in the primary. (We're uninterested in the secondary key.)
2168		 */
2169		memset(&primary_key, 0, sizeof(DBT));
2170		F_SET(&primary_key, DB_DBT_MALLOC);
2171		if ((ret = __dbc_get(sdbc,
2172		    &discardme, &primary_key, rmw | DB_CURRENT)) != 0)
2173			return (ret);
2174
2175		/*
2176		 * Open a cursor on the primary, set it to the right record,
2177		 * and fetch its recno into "data".
2178		 *
2179		 * (See __dbc_pget for comments on the use of __db_cursor_int.)
2180		 *
2181		 * SET_RET_MEM so that the secondary DBC owns any returned-data
2182		 * memory.
2183		 */
2184		if ((ret = __db_cursor_int(pdbp, sdbc->thread_info, sdbc->txn,
2185		    pdbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
2186			goto perr;
2187		SET_RET_MEM(pdbc, sdbc);
2188		if ((ret = __dbc_get(pdbc,
2189		    &primary_key, &discardme, rmw | DB_SET)) != 0)
2190			goto perr;
2191
2192		ret = __dbc_get(pdbc, &discardme, data, rmw | DB_GET_RECNO);
2193
2194perr:		__os_ufree(env, primary_key.data);
2195		if (pdbc != NULL &&
2196		    (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
2197			ret = t_ret;
2198		if (ret != 0)
2199			return (ret);
2200	} else if ((ret = __db_retcopy(env, data, &oob,
2201		    sizeof(oob), &sdbc->rkey->data, &sdbc->rkey->ulen)) != 0)
2202			return (ret);
2203
2204	/*
2205	 * If the secondary is an rbtree, we want its record number, whether
2206	 * or not the primary is one too.  Fetch the recno into "pkey".
2207	 *
2208	 * If it's not an rbtree, return RECNO_OOB in "pkey".
2209	 */
2210	if (F_ISSET(sdbp, DB_AM_RECNUM))
2211		return (__dbc_get(sdbc, &discardme, pkey, flags));
2212	else
2213		return (__db_retcopy(env, pkey, &oob,
2214		    sizeof(oob), &sdbc->rdata->data, &sdbc->rdata->ulen));
2215}
2216
2217/*
2218 * __db_wrlock_err -- do not have a write lock.
2219 */
2220static int
2221__db_wrlock_err(env)
2222	ENV *env;
2223{
2224	__db_errx(env, "Write attempted on read-only cursor");
2225	return (EPERM);
2226}
2227
2228/*
2229 * __dbc_del_secondary --
2230 *	Perform a delete operation on a secondary index:  call through
2231 *	to the primary and delete the primary record that this record
2232 *	points to.
2233 *
2234 *	Note that deleting the primary record will call c_del on all
2235 *	the secondaries, including this one;  thus, it is not necessary
2236 *	to execute both this function and an actual delete.
2237 */
2238static int
2239__dbc_del_secondary(dbc)
2240	DBC *dbc;
2241{
2242	DB *pdbp;
2243	DBC *pdbc;
2244	DBT skey, pkey;
2245	ENV *env;
2246	int ret, t_ret;
2247	u_int32_t rmw;
2248
2249	pdbp = dbc->dbp->s_primary;
2250	env = pdbp->env;
2251	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
2252
2253	/*
2254	 * Get the current item that we're pointing at.
2255	 * We don't actually care about the secondary key, just
2256	 * the primary.
2257	 */
2258	memset(&skey, 0, sizeof(DBT));
2259	memset(&pkey, 0, sizeof(DBT));
2260	F_SET(&skey, DB_DBT_PARTIAL | DB_DBT_USERMEM);
2261	if ((ret = __dbc_get(dbc, &skey, &pkey, DB_CURRENT)) != 0)
2262		return (ret);
2263
2264	SWAP_IF_NEEDED(dbc->dbp, &pkey);
2265
2266	/*
2267	 * Create a cursor on the primary with our locker ID,
2268	 * so that when it calls back, we don't conflict.
2269	 *
2270	 * We create a cursor explicitly because there's no
2271	 * way to specify the same locker ID if we're using
2272	 * locking but not transactions if we use the DB->del
2273	 * interface.  This shouldn't be any less efficient
2274	 * anyway.
2275	 */
2276	if ((ret = __db_cursor_int(pdbp, dbc->thread_info, dbc->txn,
2277	    pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0)
2278		return (ret);
2279
2280	/*
2281	 * See comment in __dbc_put--if we're in CDB,
2282	 * we already hold the locks we need, and we need to flag
2283	 * the cursor as a WRITER so we don't run into errors
2284	 * when we try to delete.
2285	 */
2286	if (CDB_LOCKING(env)) {
2287		DB_ASSERT(env, pdbc->mylock.off == LOCK_INVALID);
2288		F_SET(pdbc, DBC_WRITER);
2289	}
2290
2291	/*
2292	 * Set the new cursor to the correct primary key.  Then
2293	 * delete it.  We don't really care about the datum;
2294	 * just reuse our skey DBT.
2295	 *
2296	 * If the primary get returns DB_NOTFOUND, something is amiss--
2297	 * every record in the secondary should correspond to some record
2298	 * in the primary.
2299	 */
2300	if ((ret = __dbc_get(pdbc, &pkey, &skey, DB_SET | rmw)) == 0)
2301		ret = __dbc_del(pdbc, 0);
2302	else if (ret == DB_NOTFOUND)
2303		ret = __db_secondary_corrupt(pdbp);
2304
2305	if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
2306		ret = t_ret;
2307
2308	return (ret);
2309}
2310
2311/*
2312 * __dbc_del_primary --
2313 *	Perform a delete operation on a primary index.  Loop through
2314 *	all the secondary indices which correspond to this primary
2315 *	database, and delete any secondary keys that point at the current
2316 *	record.
2317 *
2318 * PUBLIC: int __dbc_del_primary __P((DBC *));
2319 */
2320int
2321__dbc_del_primary(dbc)
2322	DBC *dbc;
2323{
2324	DB *dbp, *sdbp;
2325	DBC *sdbc;
2326	DBT *tskeyp;
2327	DBT data, pkey, skey, temppkey, tempskey;
2328	ENV *env;
2329	u_int32_t nskey, rmw;
2330	int ret, t_ret;
2331
2332	dbp = dbc->dbp;
2333	env = dbp->env;
2334	rmw = STD_LOCKING(dbc) ? DB_RMW : 0;
2335
2336	/*
2337	 * If we're called at all, we have at least one secondary.
2338	 * (Unfortunately, we can't assert this without grabbing the mutex.)
2339	 * Get the current record so that we can construct appropriate
2340	 * secondary keys as needed.
2341	 */
2342	memset(&pkey, 0, sizeof(DBT));
2343	memset(&data, 0, sizeof(DBT));
2344	if ((ret = __dbc_get(dbc, &pkey, &data, DB_CURRENT)) != 0)
2345		return (ret);
2346
2347	memset(&skey, 0, sizeof(DBT));
2348	for (ret = __db_s_first(dbp, &sdbp);
2349	    sdbp != NULL && ret == 0;
2350	    ret = __db_s_next(&sdbp, dbc->txn)) {
2351		/*
2352		 * Get the secondary key for this secondary and the current
2353		 * item.
2354		 */
2355		if ((ret = sdbp->s_callback(sdbp, &pkey, &data, &skey)) != 0) {
2356			/* Not indexing is equivalent to an empty key set. */
2357			if (ret == DB_DONOTINDEX) {
2358				F_SET(&skey, DB_DBT_MULTIPLE);
2359				skey.size = 0;
2360			} else /* We had a substantive error.  Bail. */
2361				goto err;
2362		}
2363
2364#ifdef DIAGNOSTIC
2365		if (F_ISSET(&skey, DB_DBT_MULTIPLE))
2366			__db_check_skeyset(sdbp, &skey);
2367#endif
2368
2369		if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
2370			tskeyp = (DBT *)skey.data;
2371			nskey = skey.size;
2372			if (nskey == 0)
2373				continue;
2374		} else {
2375			tskeyp = &skey;
2376			nskey = 1;
2377		}
2378
2379		/* Open a secondary cursor. */
2380		if ((ret = __db_cursor_int(sdbp,
2381		    dbc->thread_info, dbc->txn, sdbp->type,
2382		    PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0)
2383			goto err;
2384		/* See comment above and in __dbc_put. */
2385		if (CDB_LOCKING(env)) {
2386			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
2387			F_SET(sdbc, DBC_WRITER);
2388		}
2389
2390		for (; nskey > 0; nskey--, tskeyp++) {
2391			/*
2392			 * Set the secondary cursor to the appropriate item.
2393			 * Delete it.
2394			 *
2395			 * We want to use DB_RMW if locking is on; it's only
2396			 * legal then, though.
2397			 *
2398			 * !!!
2399			 * Don't stomp on any callback-allocated buffer in skey
2400			 * when we do a c_get(DB_GET_BOTH); use a temp DBT
2401			 * instead.  Similarly, don't allow pkey to be
2402			 * invalidated when the cursor is closed.
2403			 */
2404			DB_INIT_DBT(tempskey, tskeyp->data, tskeyp->size);
2405			SWAP_IF_NEEDED(sdbp, &pkey);
2406			DB_INIT_DBT(temppkey, pkey.data, pkey.size);
2407			if ((ret = __dbc_get(sdbc, &tempskey, &temppkey,
2408			    DB_GET_BOTH | rmw)) == 0)
2409				ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY);
2410			else if (ret == DB_NOTFOUND)
2411				ret = __db_secondary_corrupt(dbp);
2412			SWAP_IF_NEEDED(sdbp, &pkey);
2413			FREE_IF_NEEDED(env, tskeyp);
2414		}
2415
2416		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2417			ret = t_ret;
2418		if (ret != 0)
2419			goto err;
2420
2421		/*
2422		 * In the common case where there is a single secondary key, we
2423		 * will have freed any application-allocated data in skey
2424		 * already.  In the multiple key case, we need to free it here.
2425		 * It is safe to do this twice as the macro resets the data
2426		 * field.
2427		 */
2428		FREE_IF_NEEDED(env, &skey);
2429	}
2430
2431err:	if (sdbp != NULL &&
2432	    (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
2433		ret = t_ret;
2434	FREE_IF_NEEDED(env, &skey);
2435	return (ret);
2436}
2437
2438/*
2439 * __dbc_del_foreign --
2440 *	Apply the foreign database constraints for a particular foreign
2441 *	database when an item is being deleted (dbc points at item being deleted
2442 *	in the foreign database.)
2443 *
2444 *      Delete happens in dbp, check for occurrences of key in pdpb.
2445 *      Terminology:
2446 *        Foreign db = Where delete occurs (dbp).
2447 *        Secondary db = Where references to dbp occur (sdbp, a secondary)
2448 *        Primary db = sdbp's primary database, references to dbp are secondary
2449 *                      keys here
2450 *        Foreign Key = Key being deleted in dbp (fkey)
2451 *        Primary Key = Key of the corresponding entry in sdbp's primary (pkey).
2452 */
2453static int
2454__dbc_del_foreign(dbc)
2455	DBC *dbc;
2456{
2457	DB_FOREIGN_INFO *f_info;
2458	DB *dbp, *pdbp, *sdbp;
2459	DBC *pdbc, *sdbc;
2460	DBT data, fkey, pkey;
2461	ENV *env;
2462	u_int32_t flags, rmw;
2463	int changed, ret, t_ret;
2464
2465	dbp = dbc->dbp;
2466	env = dbp->env;
2467
2468	memset(&fkey, 0, sizeof(DBT));
2469	memset(&data, 0, sizeof(DBT));
2470	if ((ret = __dbc_get(dbc, &fkey, &data, DB_CURRENT)) != 0)
2471		return (ret);
2472
2473	LIST_FOREACH(f_info, &(dbp->f_primaries), f_links) {
2474		sdbp = f_info->dbp;
2475		pdbp = sdbp->s_primary;
2476		flags = f_info->flags;
2477
2478		rmw = (STD_LOCKING(dbc) &&
2479		    !LF_ISSET(DB_FOREIGN_ABORT)) ? DB_RMW : 0;
2480
2481		/*
2482		 * Handle CDB locking.  Some of this is copied from
2483		 * __dbc_del_primary, but a bit more acrobatics are required.
2484		 * If we're not going to abort, then we need to get a write
2485		 * cursor.  If CDB_ALLDB is set, then only one write cursor is
2486		 * allowed and we hold it, so we fudge things and promote the
2487		 * cursor on the other DBs manually, it won't cause a problem.
2488		 * If CDB_ALLDB is not set, then we go through the usual route
2489		 * to make sure we block as necessary.  If there are any open
2490		 * read cursors on sdbp, the delete or put call later will
2491		 * block.
2492		 *
2493		 * If NULLIFY is set, we'll need a cursor on the primary to
2494		 * update it with the nullified data.  Because primary and
2495		 * secondary dbs share a lock file ID in CDB, we open a cursor
2496		 * on the secondary and then get another writeable cursor on the
2497		 * primary via __db_cursor_int to avoid deadlocking.
2498		 */
2499		sdbc = pdbc = NULL;
2500		if (!LF_ISSET(DB_FOREIGN_ABORT) && CDB_LOCKING(env) &&
2501		    !F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
2502			ret = __db_cursor(sdbp,
2503			    dbc->thread_info, dbc->txn, &sdbc, DB_WRITECURSOR);
2504			if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0) {
2505				ret = __db_cursor_int(pdbp,
2506				    dbc->thread_info, dbc->txn, pdbp->type,
2507				    PGNO_INVALID, 0, dbc->locker, &pdbc);
2508				F_SET(pdbc, DBC_WRITER);
2509			}
2510		} else {
2511			ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
2512			    sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc);
2513			if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0)
2514				ret = __db_cursor_int(pdbp, dbc->thread_info,
2515				    dbc->txn, pdbp->type, PGNO_INVALID, 0,
2516				    dbc->locker, &pdbc);
2517			}
2518		if (ret != 0) {
2519			if (sdbc != NULL)
2520				(void)__dbc_close(sdbc);
2521			return (ret);
2522		}
2523		if (CDB_LOCKING(env) && F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)){
2524			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
2525			F_SET(sdbc, DBC_WRITER);
2526			if (LF_ISSET(DB_FOREIGN_NULLIFY) && pdbc != NULL) {
2527				DB_ASSERT(env,
2528				    pdbc->mylock.off == LOCK_INVALID);
2529				F_SET(pdbc, DBC_WRITER);
2530			}
2531		}
2532
2533		/*
2534		 * There are three actions possible when a foreign database has
2535		 * items corresponding to a deleted item:
2536		 * DB_FOREIGN_ABORT - The delete operation should be aborted.
2537		 * DB_FOREIGN_CASCADE - All corresponding foreign items should
2538		 *    be deleted.
2539		 * DB_FOREIGN_NULLIFY - A callback needs to be made, allowing
2540		 *    the application to modify the data DBT from the
2541		 *    associated database.  If the callback makes a
2542		 *    modification, the updated item needs to replace the
2543		 *    original item in the foreign db
2544		 */
2545		memset(&pkey, 0, sizeof(DBT));
2546		memset(&data, 0, sizeof(DBT));
2547		ret = __dbc_pget(sdbc, &fkey, &pkey, &data, DB_SET|rmw);
2548
2549		if (ret == DB_NOTFOUND) {
2550			/* No entry means no constraint */
2551			ret = __dbc_close(sdbc);
2552			if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
2553			    (t_ret = __dbc_close(pdbc)) != 0)
2554				ret = t_ret;
2555			if (ret != 0)
2556				return (ret);
2557			continue;
2558		} else if (ret != 0) {
2559			/* Just return the error code from the pget */
2560			(void)__dbc_close(sdbc);
2561			if (LF_ISSET(DB_FOREIGN_NULLIFY))
2562				(void)__dbc_close(pdbc);
2563			return (ret);
2564		} else if (LF_ISSET(DB_FOREIGN_ABORT)) {
2565			/* If the record exists and ABORT is set, we're done */
2566			if ((ret = __dbc_close(sdbc)) != 0)
2567				return (ret);
2568			return (DB_FOREIGN_CONFLICT);
2569		}
2570
2571		/*
2572		 * There were matching items in the primary DB, and the action
2573		 * is either DB_FOREIGN_CASCADE or DB_FOREIGN_NULLIFY.
2574		 */
2575		while (ret == 0) {
2576			if (LF_ISSET(DB_FOREIGN_CASCADE)) {
2577				/*
2578				 * Don't use the DB_UPDATE_SECONDARY flag,
2579				 * since we want the delete to cascade into the
2580				 * secondary's primary.
2581				 */
2582				if ((ret = __dbc_del(sdbc, 0)) != 0) {
2583					__db_err(env, ret,
2584	    "Attempt to execute cascading delete in a foreign index failed");
2585					break;
2586				}
2587			} else if (LF_ISSET(DB_FOREIGN_NULLIFY)) {
2588				changed = 0;
2589				if ((ret = f_info->callback(sdbp,
2590				    &pkey, &data, &fkey, &changed)) != 0) {
2591					__db_err(env, ret,
2592				    "Foreign database application callback");
2593					break;
2594				}
2595
2596				/*
2597				 * If the user callback modified the DBT and
2598				 * a put on the primary failed.
2599				 */
2600				if (changed && (ret = __dbc_put(pdbc,
2601				    &pkey, &data, DB_KEYFIRST)) != 0) {
2602					__db_err(env, ret,
2603  "Attempt to overwrite item in foreign database with nullified value failed");
2604					break;
2605				}
2606			}
2607			/* retrieve the next matching item from the prim. db */
2608			memset(&pkey, 0, sizeof(DBT));
2609			memset(&data, 0, sizeof(DBT));
2610			ret = __dbc_pget(sdbc,
2611			    &fkey, &pkey, &data, DB_NEXT_DUP|rmw);
2612		}
2613
2614		if (ret == DB_NOTFOUND)
2615			ret = 0;
2616		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
2617			ret = t_ret;
2618		if (LF_ISSET(DB_FOREIGN_NULLIFY) &&
2619		    (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
2620			ret = t_ret;
2621		if (ret != 0)
2622			return (ret);
2623	}
2624
2625	return (ret);
2626}
2627
2628/*
2629 * __db_s_first --
2630 *	Get the first secondary, if any are present, from the primary.
2631 *
2632 * PUBLIC: int __db_s_first __P((DB *, DB **));
2633 */
2634int
2635__db_s_first(pdbp, sdbpp)
2636	DB *pdbp, **sdbpp;
2637{
2638	DB *sdbp;
2639
2640	MUTEX_LOCK(pdbp->env, pdbp->mutex);
2641	sdbp = LIST_FIRST(&pdbp->s_secondaries);
2642
2643	/* See __db_s_next. */
2644	if (sdbp != NULL)
2645		sdbp->s_refcnt++;
2646	MUTEX_UNLOCK(pdbp->env, pdbp->mutex);
2647
2648	*sdbpp = sdbp;
2649
2650	return (0);
2651}
2652
2653/*
2654 * __db_s_next --
2655 *	Get the next secondary in the list.
2656 *
2657 * PUBLIC: int __db_s_next __P((DB **, DB_TXN *));
2658 */
2659int
2660__db_s_next(sdbpp, txn)
2661	DB **sdbpp;
2662	DB_TXN *txn;
2663{
2664	DB *sdbp, *pdbp, *closeme;
2665	ENV *env;
2666	int ret;
2667
2668	/*
2669	 * Secondary indices are kept in a linked list, s_secondaries,
2670	 * off each primary DB handle.  If a primary is free-threaded,
2671	 * this list may only be traversed or modified while the primary's
2672	 * thread mutex is held.
2673	 *
2674	 * The tricky part is that we don't want to hold the thread mutex
2675	 * across the full set of secondary puts necessary for each primary
2676	 * put, or we'll wind up essentially single-threading all the puts
2677	 * to the handle;  the secondary puts will each take about as
2678	 * long as the primary does, and may require I/O.  So we instead
2679	 * hold the thread mutex only long enough to follow one link to the
2680	 * next secondary, and then we release it before performing the
2681	 * actual secondary put.
2682	 *
2683	 * The only danger here is that we might legitimately close a
2684	 * secondary index in one thread while another thread is performing
2685	 * a put and trying to update that same secondary index.  To
2686	 * prevent this from happening, we refcount the secondary handles.
2687	 * If close is called on a secondary index handle while we're putting
2688	 * to it, it won't really be closed--the refcount will simply drop,
2689	 * and we'll be responsible for closing it here.
2690	 */
2691	sdbp = *sdbpp;
2692	pdbp = sdbp->s_primary;
2693	env = pdbp->env;
2694	closeme = NULL;
2695
2696	MUTEX_LOCK(env, pdbp->mutex);
2697	DB_ASSERT(env, sdbp->s_refcnt != 0);
2698	if (--sdbp->s_refcnt == 0) {
2699		LIST_REMOVE(sdbp, s_links);
2700		closeme = sdbp;
2701	}
2702	sdbp = LIST_NEXT(sdbp, s_links);
2703	if (sdbp != NULL)
2704		sdbp->s_refcnt++;
2705	MUTEX_UNLOCK(env, pdbp->mutex);
2706
2707	*sdbpp = sdbp;
2708
2709	/*
2710	 * closeme->close() is a wrapper;  call __db_close explicitly.
2711	 */
2712	if (closeme == NULL)
2713		ret = 0;
2714	else if (txn == NULL)
2715		ret = __db_close(closeme, NULL, 0);
2716	else
2717		ret = __txn_closeevent(env, txn, closeme);
2718
2719	return (ret);
2720}
2721
2722/*
2723 * __db_s_done --
2724 *	Properly decrement the refcount on a secondary database handle we're
2725 *	using, without calling __db_s_next.
2726 *
2727 * PUBLIC: int __db_s_done __P((DB *, DB_TXN *));
2728 */
2729int
2730__db_s_done(sdbp, txn)
2731	DB *sdbp;
2732	DB_TXN *txn;
2733{
2734	DB *pdbp;
2735	ENV *env;
2736	int doclose, ret;
2737
2738	pdbp = sdbp->s_primary;
2739	env = pdbp->env;
2740	doclose = 0;
2741
2742	MUTEX_LOCK(env, pdbp->mutex);
2743	DB_ASSERT(env, sdbp->s_refcnt != 0);
2744	if (--sdbp->s_refcnt == 0) {
2745		LIST_REMOVE(sdbp, s_links);
2746		doclose = 1;
2747	}
2748	MUTEX_UNLOCK(env, pdbp->mutex);
2749
2750	if (doclose == 0)
2751		ret = 0;
2752	else if (txn == NULL)
2753		ret = __db_close(sdbp, NULL, 0);
2754	else
2755		ret = __txn_closeevent(env, txn, sdbp);
2756	return (ret);
2757}
2758
2759/*
2760 * __db_s_count --
2761 *	Count the number of secondaries associated with a given primary.
2762 */
2763static int
2764__db_s_count(pdbp)
2765	DB *pdbp;
2766{
2767	DB *sdbp;
2768	ENV *env;
2769	int count;
2770
2771	env = pdbp->env;
2772	count = 0;
2773
2774	MUTEX_LOCK(env, pdbp->mutex);
2775	for (sdbp = LIST_FIRST(&pdbp->s_secondaries);
2776	    sdbp != NULL;
2777	    sdbp = LIST_NEXT(sdbp, s_links))
2778		++count;
2779	MUTEX_UNLOCK(env, pdbp->mutex);
2780
2781	return (count);
2782}
2783
2784/*
2785 * __db_buildpartial --
2786 *	Build the record that will result after a partial put is applied to
2787 *	an existing record.
2788 *
2789 *	This should probably be merged with __bam_build, but that requires
2790 *	a little trickery if we plan to keep the overflow-record optimization
2791 *	in that function.
2792 */
2793static int
2794__db_buildpartial(dbp, oldrec, partial, newrec)
2795	DB *dbp;
2796	DBT *oldrec, *partial, *newrec;
2797{
2798	ENV *env;
2799	u_int32_t len, nbytes;
2800	u_int8_t *buf;
2801	int ret;
2802
2803	env = dbp->env;
2804
2805	DB_ASSERT(env, F_ISSET(partial, DB_DBT_PARTIAL));
2806
2807	memset(newrec, 0, sizeof(DBT));
2808
2809	nbytes = __db_partsize(oldrec->size, partial);
2810	newrec->size = nbytes;
2811
2812	if ((ret = __os_malloc(env, nbytes, &buf)) != 0)
2813		return (ret);
2814	newrec->data = buf;
2815
2816	/* Nul or pad out the buffer, for any part that isn't specified. */
2817	memset(buf,
2818	    F_ISSET(dbp, DB_AM_FIXEDLEN) ? ((BTREE *)dbp->bt_internal)->re_pad :
2819	    0, nbytes);
2820
2821	/* Copy in any leading data from the original record. */
2822	memcpy(buf, oldrec->data,
2823	    partial->doff > oldrec->size ? oldrec->size : partial->doff);
2824
2825	/* Copy the data from partial. */
2826	memcpy(buf + partial->doff, partial->data, partial->size);
2827
2828	/* Copy any trailing data from the original record. */
2829	len = partial->doff + partial->dlen;
2830	if (oldrec->size > len)
2831		memcpy(buf + partial->doff + partial->size,
2832		    (u_int8_t *)oldrec->data + len, oldrec->size - len);
2833
2834	return (0);
2835}
2836
2837/*
2838 * __db_partsize --
2839 *	Given the number of bytes in an existing record and a DBT that
2840 *	is about to be partial-put, calculate the size of the record
2841 *	after the put.
2842 *
2843 *	This code is called from __bam_partsize.
2844 *
2845 * PUBLIC: u_int32_t __db_partsize __P((u_int32_t, DBT *));
2846 */
2847u_int32_t
2848__db_partsize(nbytes, data)
2849	u_int32_t nbytes;
2850	DBT *data;
2851{
2852
2853	/*
2854	 * There are really two cases here:
2855	 *
2856	 * Case 1: We are replacing some bytes that do not exist (i.e., they
2857	 * are past the end of the record).  In this case the number of bytes
2858	 * we are replacing is irrelevant and all we care about is how many
2859	 * bytes we are going to add from offset.  So, the new record length
2860	 * is going to be the size of the new bytes (size) plus wherever those
2861	 * new bytes begin (doff).
2862	 *
2863	 * Case 2: All the bytes we are replacing exist.  Therefore, the new
2864	 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen)
2865	 * plus the bytes we are adding (size).
2866	 */
2867	if (nbytes < data->doff + data->dlen)		/* Case 1 */
2868		return (data->doff + data->size);
2869
2870	return (nbytes + data->size - data->dlen);	/* Case 2 */
2871}
2872
2873#ifdef DIAGNOSTIC
2874/*
2875 * __db_check_skeyset --
2876 *	Diagnostic check that the application's callback returns a set of
2877 *	secondary keys without repeats.
2878 *
2879 * PUBLIC: #ifdef DIAGNOSTIC
2880 * PUBLIC: void __db_check_skeyset __P((DB *, DBT *));
2881 * PUBLIC: #endif
2882 */
2883void
2884__db_check_skeyset(sdbp, skeyp)
2885	DB *sdbp;
2886	DBT *skeyp;
2887{
2888	DBT *firstkey, *lastkey, *key1, *key2;
2889	ENV *env;
2890
2891	env = sdbp->env;
2892
2893	firstkey = (DBT *)skeyp->data;
2894	lastkey = firstkey + skeyp->size;
2895	for (key1 = firstkey; key1 < lastkey; key1++)
2896		for (key2 = key1 + 1; key2 < lastkey; key2++)
2897			DB_ASSERT(env,
2898			    ((BTREE *)sdbp->bt_internal)->bt_compare(sdbp,
2899			    key1, key2) != 0);
2900}
2901#endif
2902