1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1998,2008 Oracle.  All rights reserved.
5 *
6 * $Id: db_am.c,v 12.50 2008/02/18 19:11:59 bschmeck Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/hash.h"
15#include "dbinc/lock.h"
16#include "dbinc/log.h"
17#include "dbinc/mp.h"
18#include "dbinc/qam.h"
19#include "dbinc/txn.h"
20
/*
 * Forward declarations for the helpers that are private to this file;
 * each of them is defined further down.
 */
static int __db_append_primary __P((DBC *, DBT *, DBT *));
static int __db_secondary_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
static int __dbc_set_priority __P((DBC *, DB_CACHE_PRIORITY));
static int __dbc_get_priority __P((DBC *, DB_CACHE_PRIORITY* ));
25
26/*
27 * __db_cursor_int --
28 *	Internal routine to create a cursor.
29 *
30 * PUBLIC: int __db_cursor_int __P((DB *, DB_THREAD_INFO *,
31 * PUBLIC:     DB_TXN *, DBTYPE, db_pgno_t, int, DB_LOCKER *, DBC **));
32 */
33int
34__db_cursor_int(dbp, ip, txn, dbtype, root, flags, locker, dbcp)
35	DB *dbp;
36	DB_THREAD_INFO *ip;
37	DB_TXN *txn;
38	DBTYPE dbtype;
39	db_pgno_t root;
40	int flags;
41	DB_LOCKER *locker;
42	DBC **dbcp;
43{
44	DBC *dbc;
45	DBC_INTERNAL *cp;
46	ENV *env;
47	db_threadid_t tid;
48	int allocated, ret;
49	pid_t pid;
50
51	env = dbp->env;
52	allocated = 0;
53
54	/*
55	 * If dbcp is non-NULL it is assumed to point to an area to initialize
56	 * as a cursor.
57	 *
58	 * Take one from the free list if it's available.  Take only the
59	 * right type.  With off page dups we may have different kinds
60	 * of cursors on the queue for a single database.
61	 */
62	MUTEX_LOCK(env, dbp->mutex);
63
64#ifndef HAVE_NO_DB_REFCOUNT
65	/*
66	 * If this DBP is being logged then refcount the log filename
67	 * relative to this transaction. We do this here because we have
68	 * the dbp->mutex which protects the refcount.  We want to avoid
69	 * calling the function if we are duplicating a cursor.  This includes
70	 * the case of creating an off page duplicate cursor. If we know this
71	 * cursor will not be used in an update, we could avoid this,
72	 * but we don't have that information.
73	 */
74	if (txn != NULL &&
75	    !LF_ISSET(DBC_OPD|DBC_DUPLICATE) && !F_ISSET(dbp, DB_AM_RECOVER) &&
76	    dbp->log_filename != NULL && !IS_REP_CLIENT(env) &&
77	    (ret = __txn_record_fname(env, txn, dbp->log_filename)) != 0)
78		return (ret);
79#endif
80
81	TAILQ_FOREACH(dbc, &dbp->free_queue, links)
82		if (dbtype == dbc->dbtype) {
83			TAILQ_REMOVE(&dbp->free_queue, dbc, links);
84			F_CLR(dbc, ~DBC_OWN_LID);
85			break;
86		}
87	MUTEX_UNLOCK(env, dbp->mutex);
88
89	if (dbc == NULL) {
90		if ((ret = __os_calloc(env, 1, sizeof(DBC), &dbc)) != 0)
91			return (ret);
92		allocated = 1;
93		dbc->flags = 0;
94
95		dbc->dbp = dbp;
96		dbc->dbenv = dbp->dbenv;
97		dbc->env = dbp->env;
98
99		/* Set up locking information. */
100		if (LOCKING_ON(env)) {
101			/*
102			 * If we are not threaded, we share a locker ID among
103			 * all cursors opened in the environment handle,
104			 * allocating one if this is the first cursor.
105			 *
106			 * This relies on the fact that non-threaded DB handles
107			 * always have non-threaded environment handles, since
108			 * we set DB_THREAD on DB handles created with threaded
109			 * environment handles.
110			 */
111			if (!DB_IS_THREADED(dbp)) {
112				if (env->env_lref == NULL && (ret =
113				    __lock_id(env, NULL, &env->env_lref)) != 0)
114					goto err;
115				dbc->lref = env->env_lref;
116			} else {
117				if ((ret =
118				    __lock_id(env, NULL, &dbc->lref)) != 0)
119					goto err;
120				F_SET(dbc, DBC_OWN_LID);
121			}
122
123			/*
124			 * In CDB, secondary indices should share a lock file
125			 * ID with the primary;  otherwise we're susceptible
126			 * to deadlocks.  We also use __db_cursor_int rather
127			 * than __db_cursor to create secondary update cursors
128			 * in c_put and c_del; these won't acquire a new lock.
129			 *
130			 * !!!
131			 * Since this is in the one-time cursor allocation
132			 * code, we need to be sure to destroy, not just
133			 * close, all cursors in the secondary when we
134			 * associate.
135			 */
136			if (CDB_LOCKING(env) &&
137			    F_ISSET(dbp, DB_AM_SECONDARY))
138				memcpy(dbc->lock.fileid,
139				    dbp->s_primary->fileid, DB_FILE_ID_LEN);
140			else
141				memcpy(dbc->lock.fileid,
142				    dbp->fileid, DB_FILE_ID_LEN);
143
144			if (CDB_LOCKING(env)) {
145				if (F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) {
146					/*
147					 * If we are doing a single lock per
148					 * environment, set up the global
149					 * lock object just like we do to
150					 * single thread creates.
151					 */
152					DB_ASSERT(env, sizeof(db_pgno_t) ==
153					    sizeof(u_int32_t));
154					dbc->lock_dbt.size = sizeof(u_int32_t);
155					dbc->lock_dbt.data = &dbc->lock.pgno;
156					dbc->lock.pgno = 0;
157				} else {
158					dbc->lock_dbt.size = DB_FILE_ID_LEN;
159					dbc->lock_dbt.data = dbc->lock.fileid;
160				}
161			} else {
162				dbc->lock.type = DB_PAGE_LOCK;
163				dbc->lock_dbt.size = sizeof(dbc->lock);
164				dbc->lock_dbt.data = &dbc->lock;
165			}
166		}
167		/* Init the DBC internal structure. */
168		switch (dbtype) {
169		case DB_BTREE:
170		case DB_RECNO:
171			if ((ret = __bamc_init(dbc, dbtype)) != 0)
172				goto err;
173			break;
174		case DB_HASH:
175			if ((ret = __hamc_init(dbc)) != 0)
176				goto err;
177			break;
178		case DB_QUEUE:
179			if ((ret = __qamc_init(dbc)) != 0)
180				goto err;
181			break;
182		case DB_UNKNOWN:
183		default:
184			ret = __db_unknown_type(env, "DB->cursor", dbtype);
185			goto err;
186		}
187
188		cp = dbc->internal;
189	}
190
191	/* Refresh the DBC structure. */
192	dbc->dbtype = dbtype;
193	RESET_RET_MEM(dbc);
194	dbc->set_priority = __dbc_set_priority;
195	dbc->get_priority = __dbc_get_priority;
196	dbc->priority = dbp->priority;
197
198	if ((dbc->txn = txn) != NULL)
199		dbc->locker = txn->locker;
200	else if (LOCKING_ON(env)) {
201		/*
202		 * There are certain cases in which we want to create a
203		 * new cursor with a particular locker ID that is known
204		 * to be the same as (and thus not conflict with) an
205		 * open cursor.
206		 *
207		 * The most obvious case is cursor duplication;  when we
208		 * call DBC->dup or __dbc_idup, we want to use the original
209		 * cursor's locker ID.
210		 *
211		 * Another case is when updating secondary indices.  Standard
212		 * CDB locking would mean that we might block ourself:  we need
213		 * to open an update cursor in the secondary while an update
214		 * cursor in the primary is open, and when the secondary and
215		 * primary are subdatabases or we're using env-wide locking,
216		 * this is disastrous.
217		 *
218		 * In these cases, our caller will pass a nonzero locker
219		 * ID into this function.  Use this locker ID instead of
220		 * the default as the locker ID for our new cursor.
221		 */
222		if (locker != NULL)
223			dbc->locker = locker;
224		else {
225			/*
226			 * If we are threaded then we need to set the
227			 * proper thread id into the locker.
228			 */
229			if (DB_IS_THREADED(dbp)) {
230				env->dbenv->thread_id(env->dbenv, &pid, &tid);
231				__lock_set_thread_id(dbc->lref, pid, tid);
232			}
233			dbc->locker = dbc->lref;
234		}
235	}
236
237	/*
238	 * These fields change when we are used as a secondary index, so
239	 * if the DB is a secondary, make sure they're set properly just
240	 * in case we opened some cursors before we were associated.
241	 *
242	 * __dbc_get is used by all access methods, so this should be safe.
243	 */
244	if (F_ISSET(dbp, DB_AM_SECONDARY))
245		dbc->get = dbc->c_get = __dbc_secondary_get_pp;
246
247	if (LF_ISSET(DBC_OPD))
248		F_SET(dbc, DBC_OPD);
249	if (F_ISSET(dbp, DB_AM_RECOVER))
250		F_SET(dbc, DBC_RECOVER);
251	if (F_ISSET(dbp, DB_AM_COMPENSATE))
252		F_SET(dbc, DBC_DONTLOCK);
253
254	/* Refresh the DBC internal structure. */
255	cp = dbc->internal;
256	cp->opd = NULL;
257
258	cp->indx = 0;
259	cp->page = NULL;
260	cp->pgno = PGNO_INVALID;
261	cp->root = root;
262
263	switch (dbtype) {
264	case DB_BTREE:
265	case DB_RECNO:
266		if ((ret = __bamc_refresh(dbc)) != 0)
267			goto err;
268		break;
269	case DB_HASH:
270	case DB_QUEUE:
271		break;
272	case DB_UNKNOWN:
273	default:
274		ret = __db_unknown_type(env, "DB->cursor", dbp->type);
275		goto err;
276	}
277
278	/*
279	 * The transaction keeps track of how many cursors were opened within
280	 * it to catch application errors where the cursor isn't closed when
281	 * the transaction is resolved.
282	 */
283	if (txn != NULL)
284		++txn->cursors;
285	if (ip != NULL)
286		dbc->thread_info = ip;
287	else if (txn != NULL)
288		dbc->thread_info = txn->thread_info;
289	else
290		ENV_GET_THREAD_INFO(env, dbc->thread_info);
291
292	MUTEX_LOCK(env, dbp->mutex);
293	TAILQ_INSERT_TAIL(&dbp->active_queue, dbc, links);
294	F_SET(dbc, DBC_ACTIVE);
295	MUTEX_UNLOCK(env, dbp->mutex);
296
297	*dbcp = dbc;
298	return (0);
299
300err:	if (allocated)
301		__os_free(env, dbc);
302	return (ret);
303}
304
305/*
306 * __db_put --
307 *	Store a key/data pair.
308 *
309 * PUBLIC: int __db_put __P((DB *,
310 * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, DBT *, u_int32_t));
311 */
312int
313__db_put(dbp, ip, txn, key, data, flags)
314	DB *dbp;
315	DB_THREAD_INFO *ip;
316	DB_TXN *txn;
317	DBT *key, *data;
318	u_int32_t flags;
319{
320	DBC *dbc;
321	DBT tdata;
322	ENV *env;
323	int ret, t_ret;
324
325	env = dbp->env;
326
327	if ((ret = __db_cursor(dbp, ip, txn, &dbc, DB_WRITELOCK)) != 0)
328		return (ret);
329
330	DEBUG_LWRITE(dbc, txn, "DB->put", key, data, flags);
331
332	SET_RET_MEM(dbc, dbp);
333
334	/*
335	 * See the comment in __db_get().
336	 *
337	 * Note that the c_get in the DB_NOOVERWRITE case is safe to
338	 * do with this flag set;  if it errors in any way other than
339	 * DB_NOTFOUND, we're going to close the cursor without doing
340	 * anything else, and if it returns DB_NOTFOUND then it's safe
341	 * to do a c_put(DB_KEYLAST) even if an access method moved the
342	 * cursor, since that's not position-dependent.
343	 */
344	F_SET(dbc, DBC_TRANSIENT);
345
346	switch (flags) {
347	case DB_APPEND:
348		/*
349		 * If there is an append callback, the value stored in
350		 * data->data may be replaced and then freed.  To avoid
351		 * passing a freed pointer back to the user, just operate
352		 * on a copy of the data DBT.
353		 */
354		tdata = *data;
355
356		/*
357		 * Append isn't a normal put operation;  call the appropriate
358		 * access method's append function.
359		 */
360		switch (dbp->type) {
361		case DB_QUEUE:
362			if ((ret = __qam_append(dbc, key, &tdata)) != 0)
363				goto err;
364			break;
365		case DB_RECNO:
366			if ((ret = __ram_append(dbc, key, &tdata)) != 0)
367				goto err;
368			break;
369		case DB_BTREE:
370		case DB_HASH:
371		case DB_UNKNOWN:
372		default:
373			/* The interface should prevent this. */
374			DB_ASSERT(env,
375			    dbp->type == DB_QUEUE || dbp->type == DB_RECNO);
376
377			ret = __db_ferr(env, "DB->put", 0);
378			goto err;
379		}
380
381		/*
382		 * Secondary indices:  since we've returned zero from an append
383		 * function, we've just put a record, and done so outside
384		 * __dbc_put.  We know we're not a secondary-- the interface
385		 * prevents puts on them--but we may be a primary.  If so,
386		 * update our secondary indices appropriately.
387		 *
388		 * If the application is managing this key's data, we need a
389		 * copy of it here.  It will be freed in __db_put_pp.
390		 */
391		DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY));
392
393		if (LIST_FIRST(&dbp->s_secondaries) != NULL &&
394		    (ret = __dbt_usercopy(env, key)) == 0)
395			ret = __db_append_primary(dbc, key, &tdata);
396
397		/*
398		 * The append callback, if one exists, may have allocated
399		 * a new tdata.data buffer.  If so, free it.
400		 */
401		FREE_IF_NEEDED(env, &tdata);
402
403		/* No need for a cursor put;  we're done. */
404		goto done;
405	default:
406		/* Fall through to normal cursor put. */
407		break;
408	}
409
410	if (ret == 0)
411		ret = __dbc_put(dbc,
412		    key, data, flags == 0 ? DB_KEYLAST : flags);
413
414err:
415done:	/* Close the cursor. */
416	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
417		ret = t_ret;
418
419	return (ret);
420}
421
422/*
423 * __db_del --
424 *	Delete the items referenced by a key.
425 *
426 * PUBLIC: int __db_del __P((DB *,
427 * PUBLIC:      DB_THREAD_INFO *, DB_TXN *, DBT *, u_int32_t));
428 */
429int
430__db_del(dbp, ip, txn, key, flags)
431	DB *dbp;
432	DB_THREAD_INFO *ip;
433	DB_TXN *txn;
434	DBT *key;
435	u_int32_t flags;
436{
437	DBC *dbc;
438	DBT data;
439	u_int32_t f_init, f_next;
440	int ret, t_ret;
441
442	/* Allocate a cursor. */
443	if ((ret = __db_cursor(dbp, ip, txn, &dbc, DB_WRITELOCK)) != 0)
444		goto err;
445
446	DEBUG_LWRITE(dbc, txn, "DB->del", key, NULL, flags);
447	COMPQUIET(flags, 0);
448
449	/*
450	 * Walk a cursor through the key/data pairs, deleting as we go.  Set
451	 * the DB_DBT_USERMEM flag, as this might be a threaded application
452	 * and the flags checking will catch us.  We don't actually want the
453	 * keys or data, set DB_DBT_ISSET.  We rely on __dbc_get to clear
454	 * this.
455	 */
456	memset(&data, 0, sizeof(data));
457	F_SET(&data, DB_DBT_USERMEM | DB_DBT_ISSET);
458	F_SET(key, DB_DBT_ISSET);
459
460	/*
461	 * If locking (and we haven't already acquired CDB locks), set the
462	 * read-modify-write flag.
463	 */
464	f_init = DB_SET;
465	f_next = DB_NEXT_DUP;
466	if (STD_LOCKING(dbc)) {
467		f_init |= DB_RMW;
468		f_next |= DB_RMW;
469	}
470
471	/*
472	 * Optimize the simple cases.  For all AMs if we don't have secondaries
473	 * and are not a secondary and we aren't a foreign database and there
474	 * are no dups then we can avoid a bunch of overhead.  For queue we
475	 * don't need to fetch the record since we delete by direct calculation
476	 * from the record number.
477	 *
478	 * Hash permits an optimization in DB->del: since on-page duplicates are
479	 * stored in a single HKEYDATA structure, it's possible to delete an
480	 * entire set of them at once, and as the HKEYDATA has to be rebuilt
481	 * and re-put each time it changes, this is much faster than deleting
482	 * the duplicates one by one.  Thus, if not pointing at an off-page
483	 * duplicate set, and we're not using secondary indices (in which case
484	 * we'd have to examine the items one by one anyway), let hash do this
485	 * "quick delete".
486	 *
487	 * !!!
488	 * Note that this is the only application-executed delete call in
489	 * Berkeley DB that does not go through the __dbc_del function.
490	 * If anything other than the delete itself (like a secondary index
491	 * update) has to happen there in a particular situation, the
492	 * conditions here should be modified not to use these optimizations.
493	 * The ordinary AM-independent alternative will work just fine;
494	 * it'll just be slower.
495	 */
496	if (!F_ISSET(dbp, DB_AM_SECONDARY) &&
497	    LIST_FIRST(&dbp->s_secondaries) == NULL &&
498	    LIST_FIRST(&dbp->f_primaries) == NULL) {
499#ifdef HAVE_QUEUE
500		if (dbp->type == DB_QUEUE) {
501			ret = __qam_delete(dbc, key);
502			F_CLR(key, DB_DBT_ISSET);
503			goto done;
504		}
505#endif
506
507		/* Fetch the first record. */
508		if ((ret = __dbc_get(dbc, key, &data, f_init)) != 0)
509			goto err;
510
511#ifdef HAVE_HASH
512		if (dbp->type == DB_HASH && dbc->internal->opd == NULL) {
513			ret = __ham_quick_delete(dbc);
514			goto done;
515		}
516#endif
517
518		if ((dbp->type == DB_BTREE || dbp->type == DB_RECNO) &&
519		    !F_ISSET(dbp, DB_AM_DUP)) {
520			ret = dbc->am_del(dbc);
521			goto done;
522		}
523	} else if ((ret = __dbc_get(dbc, key, &data, f_init)) != 0)
524		goto err;
525
526	/* Walk through the set of key/data pairs, deleting as we go. */
527	for (;;) {
528		if ((ret = __dbc_del(dbc, 0)) != 0)
529			break;
530		F_SET(key, DB_DBT_ISSET);
531		F_SET(&data, DB_DBT_ISSET);
532		if ((ret = __dbc_get(dbc, key, &data, f_next)) != 0) {
533			if (ret == DB_NOTFOUND)
534				ret = 0;
535			break;
536		}
537	}
538
539done:
540err:	/* Discard the cursor. */
541	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
542		ret = t_ret;
543
544	return (ret);
545}
546
547/*
548 * __db_sync --
549 *	Flush the database cache.
550 *
551 * PUBLIC: int __db_sync __P((DB *));
552 */
553int
554__db_sync(dbp)
555	DB *dbp;
556{
557	int ret, t_ret;
558
559	ret = 0;
560
561	/* If the database was read-only, we're done. */
562	if (F_ISSET(dbp, DB_AM_RDONLY))
563		return (0);
564
565	/* If it's a Recno tree, write the backing source text file. */
566	if (dbp->type == DB_RECNO)
567		ret = __ram_writeback(dbp);
568
569	/* If the database was never backed by a database file, we're done. */
570	if (F_ISSET(dbp, DB_AM_INMEM))
571		return (ret);
572
573	if (dbp->type == DB_QUEUE)
574		ret = __qam_sync(dbp);
575	else
576		/* Flush any dirty pages from the cache to the backing file. */
577		if ((t_ret = __memp_fsync(dbp->mpf)) != 0 && ret == 0)
578			ret = t_ret;
579
580	return (ret);
581}
582
583/*
584 * __db_associate --
585 *	Associate another database as a secondary index to this one.
586 *
587 * PUBLIC: int __db_associate __P((DB *, DB_THREAD_INFO *, DB_TXN *, DB *,
588 * PUBLIC:     int (*)(DB *, const DBT *, const DBT *, DBT *), u_int32_t));
589 */
590int
591__db_associate(dbp, ip, txn, sdbp, callback, flags)
592	DB *dbp, *sdbp;
593	DB_THREAD_INFO *ip;
594	DB_TXN *txn;
595	int (*callback) __P((DB *, const DBT *, const DBT *, DBT *));
596	u_int32_t flags;
597{
598	DBC *pdbc, *sdbc;
599	DBT key, data, skey, *tskeyp;
600	ENV *env;
601	int build, ret, t_ret;
602	u_int32_t nskey;
603
604	env = dbp->env;
605	pdbc = sdbc = NULL;
606	ret = 0;
607
608	memset(&skey, 0, sizeof(DBT));
609	nskey = 0;
610	tskeyp = NULL;
611
612	/*
613	 * Check to see if the secondary is empty -- and thus if we should
614	 * build it -- before we link it in and risk making it show up in other
615	 * threads.  Do this first so that the databases remain unassociated on
616	 * error.
617	 */
618	build = 0;
619	if (LF_ISSET(DB_CREATE)) {
620		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc, 0)) != 0)
621			goto err;
622
623		/*
624		 * We don't care about key or data;  we're just doing
625		 * an existence check.
626		 */
627		memset(&key, 0, sizeof(DBT));
628		memset(&data, 0, sizeof(DBT));
629		F_SET(&key, DB_DBT_PARTIAL | DB_DBT_USERMEM);
630		F_SET(&data, DB_DBT_PARTIAL | DB_DBT_USERMEM);
631		if ((ret = __dbc_get(sdbc, &key, &data,
632		    (STD_LOCKING(sdbc) ? DB_RMW : 0) |
633		    DB_FIRST)) == DB_NOTFOUND) {
634			build = 1;
635			ret = 0;
636		}
637
638		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
639			ret = t_ret;
640
641		/* Reset for later error check. */
642		sdbc = NULL;
643
644		if (ret != 0)
645			goto err;
646	}
647
648	/*
649	 * Set up the database handle as a secondary.
650	 */
651	sdbp->s_callback = callback;
652	sdbp->s_primary = dbp;
653
654	sdbp->stored_get = sdbp->get;
655	sdbp->get = __db_secondary_get;
656
657	sdbp->stored_close = sdbp->close;
658	sdbp->close = __db_secondary_close_pp;
659
660	F_SET(sdbp, DB_AM_SECONDARY);
661
662	if (LF_ISSET(DB_IMMUTABLE_KEY))
663		FLD_SET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY);
664
665	/*
666	 * Add the secondary to the list on the primary.  Do it here
667	 * so that we see any updates that occur while we're walking
668	 * the primary.
669	 */
670	MUTEX_LOCK(env, dbp->mutex);
671
672	/* See __db_s_next for an explanation of secondary refcounting. */
673	DB_ASSERT(env, sdbp->s_refcnt == 0);
674	sdbp->s_refcnt = 1;
675	LIST_INSERT_HEAD(&dbp->s_secondaries, sdbp, s_links);
676	MUTEX_UNLOCK(env, dbp->mutex);
677
678	if (build) {
679		/*
680		 * We loop through the primary, putting each item we
681		 * find into the new secondary.
682		 *
683		 * If we're using CDB, opening these two cursors puts us
684		 * in a bit of a locking tangle:  CDB locks are done on the
685		 * primary, so that we stay deadlock-free, but that means
686		 * that updating the secondary while we have a read cursor
687		 * open on the primary will self-block.  To get around this,
688		 * we force the primary cursor to use the same locker ID
689		 * as the secondary, so they won't conflict.  This should
690		 * be harmless even if we're not using CDB.
691		 */
692		if ((ret = __db_cursor(sdbp, ip, txn, &sdbc,
693		    CDB_LOCKING(sdbp->env) ? DB_WRITECURSOR : 0)) != 0)
694			goto err;
695		if ((ret = __db_cursor_int(dbp, ip,
696		    txn, dbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0)
697			goto err;
698
699		/* Lock out other threads, now that we have a locker. */
700		dbp->associate_locker = sdbc->locker;
701
702		memset(&key, 0, sizeof(DBT));
703		memset(&data, 0, sizeof(DBT));
704		while ((ret = __dbc_get(pdbc, &key, &data, DB_NEXT)) == 0) {
705			if ((ret = callback(sdbp, &key, &data, &skey)) != 0) {
706				if (ret == DB_DONOTINDEX)
707					continue;
708				goto err;
709			}
710			if (F_ISSET(&skey, DB_DBT_MULTIPLE)) {
711#ifdef DIAGNOSTIC
712				__db_check_skeyset(sdbp, &skey);
713#endif
714				nskey = skey.size;
715				tskeyp = (DBT *)skey.data;
716			} else {
717				nskey = 1;
718				tskeyp = &skey;
719			}
720			SWAP_IF_NEEDED(sdbp, &key);
721			for (; nskey > 0; nskey--, tskeyp++) {
722				if ((ret = __dbc_put(sdbc,
723				    tskeyp, &key, DB_UPDATE_SECONDARY)) != 0)
724					goto err;
725				FREE_IF_NEEDED(env, tskeyp);
726			}
727			SWAP_IF_NEEDED(sdbp, &key);
728			FREE_IF_NEEDED(env, &skey);
729		}
730		if (ret == DB_NOTFOUND)
731			ret = 0;
732	}
733
734err:	if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
735		ret = t_ret;
736
737	if (pdbc != NULL && (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
738		ret = t_ret;
739
740	dbp->associate_locker = NULL;
741
742	for (; nskey > 0; nskey--, tskeyp++)
743		FREE_IF_NEEDED(env, tskeyp);
744	FREE_IF_NEEDED(env, &skey);
745
746	return (ret);
747}
748
749/*
750 * __db_secondary_get --
751 *	This wrapper function for DB->pget() is the DB->get() function
752 *	on a database which has been made into a secondary index.
753 */
754static int
755__db_secondary_get(sdbp, txn, skey, data, flags)
756	DB *sdbp;
757	DB_TXN *txn;
758	DBT *skey, *data;
759	u_int32_t flags;
760{
761	DB_ASSERT(sdbp->env, F_ISSET(sdbp, DB_AM_SECONDARY));
762	return (__db_pget_pp(sdbp, txn, skey, NULL, data, flags));
763}
764
765/*
766 * __db_secondary_close --
767 *	Wrapper function for DB->close() which we use on secondaries to
768 *	manage refcounting and make sure we don't close them underneath
769 *	a primary that is updating.
770 *
771 * PUBLIC: int __db_secondary_close __P((DB *, u_int32_t));
772 */
773int
774__db_secondary_close(sdbp, flags)
775	DB *sdbp;
776	u_int32_t flags;
777{
778	DB *primary;
779	ENV *env;
780	int doclose;
781
782	doclose = 0;
783	primary = sdbp->s_primary;
784	env = primary->env;
785
786	MUTEX_LOCK(env, primary->mutex);
787	/*
788	 * Check the refcount--if it was at 1 when we were called, no
789	 * thread is currently updating this secondary through the primary,
790	 * so it's safe to close it for real.
791	 *
792	 * If it's not safe to do the close now, we do nothing;  the
793	 * database will actually be closed when the refcount is decremented,
794	 * which can happen in either __db_s_next or __db_s_done.
795	 */
796	DB_ASSERT(env, sdbp->s_refcnt != 0);
797	if (--sdbp->s_refcnt == 0) {
798		LIST_REMOVE(sdbp, s_links);
799		/* We don't want to call close while the mutex is held. */
800		doclose = 1;
801	}
802	MUTEX_UNLOCK(env, primary->mutex);
803
804	/*
805	 * sdbp->close is this function;  call the real one explicitly if
806	 * need be.
807	 */
808	return (doclose ? __db_close(sdbp, NULL, flags) : 0);
809}
810
811/*
812 * __db_append_primary --
813 *	Perform the secondary index updates necessary to put(DB_APPEND)
814 *	a record to a primary database.
815 */
816static int
817__db_append_primary(dbc, key, data)
818	DBC *dbc;
819	DBT *key, *data;
820{
821	DB *dbp, *sdbp;
822	DBC *fdbc, *sdbc, *pdbc;
823	DBT fdata, oldpkey, pkey, pdata, skey;
824	ENV *env;
825	int cmp, ret, t_ret;
826
827	dbp = dbc->dbp;
828	env = dbp->env;
829	sdbp = NULL;
830	ret = 0;
831
832	/*
833	 * Worrying about partial appends seems a little like worrying
834	 * about Linear A character encodings.  But we support those
835	 * too if your application understands them.
836	 */
837	pdbc = NULL;
838	if (F_ISSET(data, DB_DBT_PARTIAL) || F_ISSET(key, DB_DBT_PARTIAL)) {
839		/*
840		 * The dbc we were passed is all set to pass things
841		 * back to the user;  we can't safely do a call on it.
842		 * Dup the cursor, grab the real data item (we don't
843		 * care what the key is--we've been passed it directly),
844		 * and use that instead of the data DBT we were passed.
845		 *
846		 * Note that we can get away with this simple get because
847		 * an appended item is by definition new, and the
848		 * correctly-constructed full data item from this partial
849		 * put is on the page waiting for us.
850		 */
851		if ((ret = __dbc_idup(dbc, &pdbc, DB_POSITION)) != 0)
852			return (ret);
853		memset(&pkey, 0, sizeof(DBT));
854		memset(&pdata, 0, sizeof(DBT));
855
856		if ((ret = __dbc_get(pdbc, &pkey, &pdata, DB_CURRENT)) != 0)
857			goto err;
858
859		key = &pkey;
860		data = &pdata;
861	}
862
863	/*
864	 * Loop through the secondary indices, putting a new item in
865	 * each that points to the appended item.
866	 *
867	 * This is much like the loop in "step 3" in __dbc_put, so
868	 * I'm not commenting heavily here;  it was unclean to excerpt
869	 * just that section into a common function, but the basic
870	 * overview is the same here.
871	 */
872	if ((ret = __db_s_first(dbp, &sdbp)) != 0)
873		goto err;
874	for (; sdbp != NULL && ret == 0; ret = __db_s_next(&sdbp, dbc->txn)) {
875		memset(&skey, 0, sizeof(DBT));
876		if ((ret = sdbp->s_callback(sdbp, key, data, &skey)) != 0) {
877			if (ret == DB_DONOTINDEX)
878				continue;
879			goto err;
880		}
881
882		/*
883		 * If this secondary index is associated with a foreign
884		 * database, check that the foreign db contains this key to
885		 * maintain referential integrity.  Set flags in fdata to avoid
886		 * mem copying, we just need to know existence.
887		 */
888		memset(&fdata, 0, sizeof(DBT));
889		F_SET(&fdata, DB_DBT_PARTIAL | DB_DBT_USERMEM);
890		if (sdbp->s_foreign != NULL) {
891			if ((ret = __db_cursor_int(sdbp->s_foreign,
892			   dbc->thread_info, dbc->txn, sdbp->s_foreign->type,
893			   PGNO_INVALID, 0, dbc->locker, &fdbc)) != 0)
894				goto err;
895			if ((ret = __dbc_get(fdbc, &skey, &fdata,
896			   DB_SET | (STD_LOCKING(dbc) ? DB_RMW : 0))) != 0) {
897				if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY)
898					ret = DB_FOREIGN_CONFLICT;
899				goto err;
900			}
901			if ((ret = __dbc_close(fdbc)) != 0)
902				goto err;
903		}
904
905		if ((ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn,
906		    sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0) {
907			FREE_IF_NEEDED(env, &skey);
908			goto err;
909		}
910		if (CDB_LOCKING(env)) {
911			DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID);
912			F_SET(sdbc, DBC_WRITER);
913		}
914
915		/*
916		 * Since we know we have a new primary key, it can't be a
917		 * duplicate duplicate in the secondary.  It can be a
918		 * duplicate in a secondary that doesn't support duplicates,
919		 * however, so we need to be careful to avoid an overwrite
920		 * (which would corrupt our index).
921		 */
922		if (!F_ISSET(sdbp, DB_AM_DUP)) {
923			memset(&oldpkey, 0, sizeof(DBT));
924			F_SET(&oldpkey, DB_DBT_MALLOC);
925			ret = __dbc_get(sdbc, &skey, &oldpkey,
926			    DB_SET | (STD_LOCKING(dbc) ? DB_RMW : 0));
927			if (ret == 0) {
928				cmp = __bam_defcmp(sdbp, &oldpkey, key);
929				/*
930				 * XXX
931				 * This needs to use the right free function
932				 * as soon as this is possible.
933				 */
934				__os_ufree(env, oldpkey.data);
935				if (cmp != 0) {
936					__db_errx(env, "%s%s",
937			    "Append results in a non-unique secondary key in",
938			    " an index not configured to support duplicates");
939					ret = EINVAL;
940					goto err1;
941				}
942			} else if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY)
943				goto err1;
944		}
945
946		ret = __dbc_put(sdbc, &skey, key, DB_UPDATE_SECONDARY);
947
948err1:		FREE_IF_NEEDED(env, &skey);
949
950		if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0)
951			ret = t_ret;
952		if (ret != 0)
953			goto err;
954	}
955
956err:	if (pdbc != NULL && (t_ret = __dbc_close(pdbc)) != 0 && ret == 0)
957		ret = t_ret;
958	if (sdbp != NULL &&
959	    (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0)
960		ret = t_ret;
961	return (ret);
962}
963
964/*
965 * __db_associate_foreign --
966 *	Associate this database (fdbp) as a foreign constraint to another
967 *	database (pdbp).  That is, dbp's keys appear as foreign key values in
968 *	pdbp.
969 *
970 * PUBLIC: int __db_associate_foreign __P((DB *, DB *,
971 * PUBLIC:     int (*)(DB *, const DBT *, DBT *, const DBT *, int *),
972 * PUBLIC:     u_int32_t));
973 */
974int
975__db_associate_foreign(fdbp, pdbp, callback, flags)
976	DB *fdbp, *pdbp;
977	int (*callback)(DB *, const DBT *, DBT *, const DBT *, int *);
978	u_int32_t flags;
979{
980	DB_FOREIGN_INFO *f_info;
981	ENV *env;
982	int ret;
983
984	env = fdbp->env;
985	ret = 0;
986
987	if ((ret = __os_malloc(env, sizeof(DB_FOREIGN_INFO), &f_info)) != 0) {
988		return ret;
989	}
990	memset(f_info, 0, sizeof(DB_FOREIGN_INFO));
991
992	f_info->dbp = pdbp;
993	f_info->callback = callback;
994
995	/*
996	 * It might be wise to filter this, but for now the flags only
997	 * set the delete action type.
998	 */
999	FLD_SET(f_info->flags, flags);
1000
1001	/*
1002	 * Add f_info to the foreign database's list of primaries.  That is to
1003	 * say, fdbp->f_primaries lists all databases for which fdbp is a
1004	 * foreign constraint.
1005	 */
1006	MUTEX_LOCK(env, fdbp->mutex);
1007	LIST_INSERT_HEAD(&fdbp->f_primaries, f_info, f_links);
1008	MUTEX_UNLOCK(env, fdbp->mutex);
1009
1010	/*
1011	* Associate fdbp as pdbp's foreign db, for referential integrity
1012	* checks.  We don't allow the foreign db to be changed, because we
1013	* currently have no way of removing pdbp from the old foreign db's list
1014	* of primaries.
1015	*/
1016	if (pdbp->s_foreign != NULL)
1017		return (EINVAL);
1018	pdbp->s_foreign = fdbp;
1019
1020	return (ret);
1021}
1022
1023static int
1024__dbc_set_priority(dbc, priority)
1025	DBC *dbc;
1026	DB_CACHE_PRIORITY priority;
1027{
1028	dbc->priority = priority;
1029	return (0);
1030}
1031
1032static int
1033__dbc_get_priority(dbc, priority)
1034	DBC *dbc;
1035	DB_CACHE_PRIORITY *priority;
1036{
1037	*priority = dbc->priority;
1038	return (0);
1039}
1040