1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1999-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/qam.h"
18
19static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
20static int __qamc_close __P((DBC *, db_pgno_t, int *));
21static int __qamc_del __P((DBC *, u_int32_t));
22static int __qamc_destroy __P((DBC *));
23static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
24static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
25static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
26static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
27
/*
 * DONT_NEED_LOCKS --
 *	Evaluates to non-zero when the cursor has no transaction, or when it
 *	has either the DBC_READ_COMMITTED or DBC_READ_UNCOMMITTED flag set.
 */
#define	DONT_NEED_LOCKS(dbc)						\
	((dbc)->txn == NULL ||						\
	F_ISSET((dbc), DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED) != 0)
30
31/*
32 * __qam_position --
33 *	Position a queued access method cursor at a record.  This returns
34 *	the page locked.  *exactp will be set if the record is valid.
35 * PUBLIC: int __qam_position
36 * PUBLIC:      __P((DBC *, db_recno_t *, u_int32_t, int *));
37 */
38int
39__qam_position(dbc, recnop, get_mode, exactp)
40	DBC *dbc;		/* open cursor */
41	db_recno_t *recnop;	/* pointer to recno to find */
42	u_int32_t get_mode;	/* flags to __memp_fget */
43	int *exactp;		/* indicate if it was found */
44{
45	DB *dbp;
46	QAMDATA  *qp;
47	QUEUE_CURSOR *cp;
48	db_pgno_t pg;
49	int ret;
50
51	dbp = dbc->dbp;
52	cp = (QUEUE_CURSOR *)dbc->internal;
53
54	/* Fetch the page for this recno. */
55	cp->pgno = pg = QAM_RECNO_PAGE(dbp, *recnop);
56
57	cp->page = NULL;
58	*exactp = 0;
59	if ((ret = __qam_fget(dbc, &pg, get_mode, &cp->page)) != 0) {
60		if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) &&
61		    (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
62			ret = 0;
63		return (ret);
64	}
65	cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
66
67	if (PGNO(cp->page) == 0) {
68		/*
69		 * We have read an uninitialized page: set the page number if
70		 * we're creating the page.  Otherwise, we know that the record
71		 * doesn't exist yet.
72		 */
73		if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) {
74			*exactp = 0;
75			return (0);
76		}
77		DB_ASSERT(dbp->env, FLD_ISSET(get_mode, DB_MPOOL_CREATE));
78		PGNO(cp->page) = pg;
79		TYPE(cp->page) = P_QAMDATA;
80	}
81
82	qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
83	*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
84
85	return (ret);
86}
87
88/*
89 * __qam_pitem --
90 *	Put an item on a queue page.  Copy the data to the page and set the
91 *	VALID and SET bits.  If logging and the record was previously set,
92 *	log that data, otherwise just log the new data.
93 *
94 *   pagep must be write locked
95 *
96 * PUBLIC: int __qam_pitem
97 * PUBLIC:    __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
98 */
99int
100__qam_pitem(dbc, pagep, indx, recno, data)
101	DBC *dbc;
102	QPAGE *pagep;
103	u_int32_t indx;
104	db_recno_t recno;
105	DBT *data;
106{
107	DB *dbp;
108	DBT olddata, pdata, *datap;
109	ENV *env;
110	QAMDATA *qp;
111	QUEUE *t;
112	u_int8_t *dest, *p;
113	int allocated, ret;
114
115	dbp = dbc->dbp;
116	env = dbp->env;
117	t = (QUEUE *)dbp->q_internal;
118	allocated = ret = 0;
119
120	if (data->size > t->re_len)
121		return (__db_rec_toobig(env, data->size, t->re_len));
122	qp = QAM_GET_RECORD(dbp, pagep, indx);
123
124	p = qp->data;
125	datap = data;
126	if (F_ISSET(data, DB_DBT_PARTIAL)) {
127		if (data->doff + data->dlen > t->re_len) {
128			__db_errx(env,
129		"%s: data offset plus length larger than record size of %lu",
130			    "Record length error", (u_long)t->re_len);
131			return (EINVAL);
132		}
133
134		if (data->size != data->dlen)
135			return (__db_rec_repl(env, data->size, data->dlen));
136
137		if (data->size == t->re_len)
138			goto no_partial;
139
140		/*
141		 * If we are logging, then we have to build the record
142		 * first, otherwise, we can simply drop the change
143		 * directly on the page.  After this clause, make
144		 * sure that datap and p are set up correctly so that
145		 * copying datap into p does the right thing.
146		 *
147		 * Note, I am changing this so that if the existing
148		 * record is not valid, we create a complete record
149		 * to log so that both this and the recovery code is simpler.
150		 */
151
152		if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
153			datap = &pdata;
154			memset(datap, 0, sizeof(*datap));
155
156			if ((ret = __os_malloc(env,
157			    t->re_len, &datap->data)) != 0)
158				return (ret);
159			allocated = 1;
160			datap->size = t->re_len;
161
162			/*
163			 * Construct the record if it's valid, otherwise set it
164			 * all to the pad character.
165			 */
166			dest = datap->data;
167			if (F_ISSET(qp, QAM_VALID))
168				memcpy(dest, p, t->re_len);
169			else
170				memset(dest, (int)t->re_pad, t->re_len);
171
172			dest += data->doff;
173			memcpy(dest, data->data, data->size);
174		} else {
175			datap = data;
176			p += data->doff;
177		}
178	}
179
180no_partial:
181	if (DBC_LOGGING(dbc)) {
182		olddata.size = 0;
183		if (F_ISSET(qp, QAM_SET)) {
184			olddata.data = qp->data;
185			olddata.size = t->re_len;
186		}
187		if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
188		    0, &LSN(pagep), pagep->pgno,
189		    indx, recno, datap, qp->flags,
190		    olddata.size == 0 ? NULL : &olddata)) != 0)
191			goto err;
192	} else if (!F_ISSET((dbc), DBC_RECOVER))
193		LSN_NOT_LOGGED(LSN(pagep));
194
195	F_SET(qp, QAM_VALID | QAM_SET);
196	memcpy(p, datap->data, datap->size);
197	if (!F_ISSET(data, DB_DBT_PARTIAL))
198		memset(p + datap->size,
199		    (int)t->re_pad, t->re_len - datap->size);
200
201err:	if (allocated)
202		__os_free(env, datap->data);
203
204	return (ret);
205}
206/*
207 * __qamc_put
208 *	Cursor put for queued access method.
209 *	BEFORE and AFTER cannot be specified.
210 */
211static int
212__qamc_put(dbc, key, data, flags, pgnop)
213	DBC *dbc;
214	DBT *key, *data;
215	u_int32_t flags;
216	db_pgno_t *pgnop;
217{
218	DB *dbp;
219	DB_MPOOLFILE *mpf;
220	ENV *env;
221	QMETA *meta;
222	QUEUE_CURSOR *cp;
223	db_pgno_t pg;
224	db_recno_t new_cur, new_first;
225	u_int32_t opcode;
226	int exact, ret, t_ret, writelock;
227
228	dbp = dbc->dbp;
229	env = dbp->env;
230	mpf = dbp->mpf;
231	if (pgnop != NULL)
232		*pgnop = PGNO_INVALID;
233
234	cp = (QUEUE_CURSOR *)dbc->internal;
235
236	switch (flags) {
237	case DB_KEYFIRST:
238	case DB_KEYLAST:
239	case DB_NOOVERWRITE:
240	case DB_OVERWRITE_DUP:
241		if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
242			return (ret);
243		/* FALLTHROUGH */
244	case DB_CURRENT:
245		break;
246	default:
247		/* The interface shouldn't let anything else through. */
248		return (__db_ferr(env, "DBC->put", 0));
249	}
250
251	/* Write lock the record. */
252	if ((ret = __db_lget(dbc, LCK_COUPLE,
253	    cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
254		return (ret);
255
256	if ((ret = __qam_position(dbc, &cp->recno,
257	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) {
258		/* We could not get the page, we can release the record lock. */
259		(void)__LPUT(dbc, cp->lock);
260		return (ret);
261	}
262
263	if (exact != 0 && flags == DB_NOOVERWRITE)
264		ret = DB_KEYEXIST;
265	else
266		/* Put the item on the page. */
267		ret = __qam_pitem(dbc,
268		     (QPAGE *)cp->page, cp->indx, cp->recno, data);
269
270	if ((t_ret = __qam_fput(dbc,
271	    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
272		ret = t_ret;
273	cp->page = NULL;
274	cp->lock_mode = DB_LOCK_WRITE;
275	if (ret != 0)
276		return (ret);
277
278	/* We may need to reset the head or tail of the queue. */
279	pg = ((QUEUE *)dbp->q_internal)->q_meta;
280
281	writelock = 0;
282	if ((ret = __db_lget(dbc, LCK_COUPLE,
283	    pg,  DB_LOCK_READ, 0, &cp->lock)) != 0)
284		return (ret);
285	if ((ret = __memp_fget(mpf, &pg,
286	    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
287		goto err;
288
289	opcode = 0;
290	new_cur = new_first = 0;
291
292	/*
293	 * If the put address is outside the queue, adjust the head and
294	 * tail of the queue.  If the order is inverted we move
295	 * the one which is closer.  The first case is when the
296	 * queue is empty, move first and current to where the new
297	 * insert is.
298	 */
299
300recheck:
301	if (meta->first_recno == meta->cur_recno) {
302		new_first = cp->recno;
303		new_cur = cp->recno + 1;
304		if (new_cur == RECNO_OOB)
305			new_cur++;
306		opcode |= QAM_SETFIRST;
307		opcode |= QAM_SETCUR;
308	} else {
309		if (QAM_BEFORE_FIRST(meta, cp->recno)) {
310			new_first = cp->recno;
311			opcode |= QAM_SETFIRST;
312		}
313
314		if (QAM_AFTER_CURRENT(meta, cp->recno)) {
315			new_cur = cp->recno + 1;
316			if (new_cur == RECNO_OOB)
317				new_cur++;
318			opcode |= QAM_SETCUR;
319		}
320	}
321
322	if (opcode == 0)
323		goto done;
324
325	/* Drop the read lock and get the a write lock on the meta page. */
326	if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
327	     pg,  DB_LOCK_WRITE, 0, &cp->lock)) != 0)
328		goto done;
329	if (writelock++ == 0)
330		goto recheck;
331
332	if (((ret = __memp_dirty(mpf, &meta,
333	    dbc->thread_info, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 ||
334	    (DBC_LOGGING(dbc) &&
335	    (ret = __qam_mvptr_log(dbp, dbc->txn,
336	    &meta->dbmeta.lsn, 0, opcode, meta->first_recno,
337	    new_first, meta->cur_recno, new_cur,
338	    &meta->dbmeta.lsn, PGNO_BASE_MD)) != 0)))
339		opcode = 0;
340
341	if (opcode & QAM_SETCUR)
342		meta->cur_recno = new_cur;
343	if (opcode & QAM_SETFIRST)
344		meta->first_recno = new_first;
345
346done:	if (meta != NULL && (t_ret = __memp_fput(mpf,
347	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
348		ret = t_ret;
349
350err:	/* Don't hold the meta page long term. */
351	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
352		ret = t_ret;
353	return (ret);
354}
355
356/*
357 * __qam_append --
358 *	Perform a put(DB_APPEND) in queue.
359 *
360 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
361 */
362int
363__qam_append(dbc, key, data)
364	DBC *dbc;
365	DBT *key, *data;
366{
367	DB *dbp;
368	DB_LOCK lock;
369	DB_MPOOLFILE *mpf;
370	QMETA *meta;
371	QPAGE *page;
372	QUEUE *qp;
373	QUEUE_CURSOR *cp;
374	db_pgno_t pg, metapg;
375	db_recno_t recno;
376	int ret, t_ret;
377
378	dbp = dbc->dbp;
379	mpf = dbp->mpf;
380	cp = (QUEUE_CURSOR *)dbc->internal;
381
382	/* Write lock the meta page. */
383	metapg = ((QUEUE *)dbp->q_internal)->q_meta;
384	if ((ret = __db_lget(dbc, 0, metapg,  DB_LOCK_WRITE, 0, &lock)) != 0)
385		return (ret);
386	if ((ret = __memp_fget(mpf, &metapg,
387	    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0)
388		return (ret);
389
390	/* Get the next record number. */
391	recno = meta->cur_recno;
392	meta->cur_recno++;
393	if (meta->cur_recno == RECNO_OOB)
394		meta->cur_recno++;
395	if (meta->cur_recno == meta->first_recno) {
396		meta->cur_recno--;
397		if (meta->cur_recno == RECNO_OOB)
398			meta->cur_recno--;
399
400		if (ret == 0)
401			ret = EFBIG;
402		goto err;
403	}
404
405	if (QAM_BEFORE_FIRST(meta, recno))
406		meta->first_recno = recno;
407
408	/* Lock the record and release meta page lock. */
409	ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
410	    recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock);
411	/* Release the meta page. */
412	if ((t_ret = __memp_fput(mpf,
413	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
414		ret = t_ret;
415	meta = NULL;
416
417	/*
418	 * The application may modify the data based on the selected record
419	 * number.  We always want to call this even if we ultimately end
420	 * up aborting, because we are allocating a record number, regardless.
421	 */
422	if (dbc->dbp->db_append_recno != NULL &&
423	    (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 &&
424	    ret == 0)
425		ret = t_ret;
426
427	/*
428	 * Capture errors from either the lock couple or the call to
429	 * dbp->db_append_recno.
430	 */
431	if (ret != 0)
432		goto err;
433
434	cp->lock = lock;
435	cp->lock_mode = DB_LOCK_WRITE;
436	LOCK_INIT(lock);
437
438	pg = QAM_RECNO_PAGE(dbp, recno);
439
440	/* Fetch for write the data page. */
441	if ((ret = __qam_fget(dbc, &pg,
442	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0)
443		goto err;
444
445	/* See if this is a new page. */
446	if (page->pgno == 0) {
447		page->pgno = pg;
448		page->type = P_QAMDATA;
449	}
450
451	/* Put the item on the page and log it. */
452	ret = __qam_pitem(dbc, page,
453	    QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
454
455	if ((t_ret = __qam_fput(dbc,
456	    pg, page, dbc->priority)) != 0 && ret == 0)
457		ret = t_ret;
458
459	/* Return the record number to the user. */
460	if (ret == 0 && key != NULL)
461		ret = __db_retcopy(dbp->env, key,
462		    &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
463
464	/* Position the cursor on this record. */
465	cp->recno = recno;
466
467	/* See if we are leaving the extent. */
468	qp = (QUEUE *) dbp->q_internal;
469	if (qp->page_ext != 0 &&
470	    (recno % (qp->page_ext * qp->rec_page) == 0 ||
471	    recno == UINT32_MAX)) {
472		if ((ret = __db_lget(dbc,
473		    0, metapg, DB_LOCK_READ, 0, &lock)) != 0)
474			goto err;
475		if ((ret = __memp_fget(mpf, &metapg,
476		    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
477			goto err;
478		if (!QAM_AFTER_CURRENT(meta, recno))
479			ret = __qam_fclose(dbp, pg);
480	}
481
482err:	/* Release the meta page. */
483	if (meta != NULL && (t_ret = __memp_fput(mpf,
484	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
485		ret = t_ret;
486	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
487		ret = t_ret;
488
489	return (ret);
490}
491
492/*
493 * __qamc_del --
494 *	Qam cursor->am_del function
495 */
496static int
497__qamc_del(dbc, flags)
498	DBC *dbc;
499	u_int32_t flags;
500{
501	DB *dbp;
502	DBT data;
503	DB_LOCK metalock;
504	DB_MPOOLFILE *mpf;
505	PAGE *pagep;
506	QAMDATA *qp;
507	QMETA *meta;
508	QUEUE_CURSOR *cp;
509	db_pgno_t pg;
510	db_recno_t first;
511	int exact, ret, t_ret;
512
513	dbp = dbc->dbp;
514	mpf = dbp->mpf;
515	cp = (QUEUE_CURSOR *)dbc->internal;
516
517	pg = ((QUEUE *)dbp->q_internal)->q_meta;
518	/* Read lock the meta page. */
519	if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &metalock)) != 0)
520		return (ret);
521
522	if ((ret = __memp_fget(mpf, &pg,
523	    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
524		return (ret);
525
526	if (QAM_NOT_VALID(meta, cp->recno)) {
527		ret = DB_NOTFOUND;
528		goto err;
529	}
530	first = meta->first_recno;
531
532	/* Don't hold the meta page long term. */
533	if ((ret = __memp_fput(mpf,
534	     dbc->thread_info, meta, dbc->priority)) != 0)
535		goto err;
536	meta = NULL;
537	if ((ret = __LPUT(dbc, metalock)) != 0)
538		goto err;
539
540	/* Get the record. */
541	if ((ret = __db_lget(dbc, LCK_COUPLE,
542	    cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
543		goto err;
544	cp->lock_mode = DB_LOCK_WRITE;
545
546	/* Find the record; delete only deletes exact matches. */
547	if ((ret = __qam_position(dbc, &cp->recno,
548	    DB_MPOOL_DIRTY, &exact)) != 0)
549		goto err;
550
551	if (!exact) {
552		ret = DB_NOTFOUND;
553		goto err;
554	}
555
556	pagep = cp->page;
557	qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
558
559	if (DBC_LOGGING(dbc)) {
560		if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
561		    ((QUEUE *)dbp->q_internal)->re_len == 0) {
562			if ((ret = __qam_del_log(dbp,
563			    dbc->txn, &LSN(pagep), 0, &LSN(pagep),
564			    pagep->pgno, cp->indx, cp->recno)) != 0)
565				goto err;
566		} else {
567			data.size = ((QUEUE *)dbp->q_internal)->re_len;
568			data.data = qp->data;
569			if ((ret = __qam_delext_log(dbp,
570			    dbc->txn, &LSN(pagep), 0, &LSN(pagep),
571			    pagep->pgno, cp->indx, cp->recno, &data)) != 0)
572				goto err;
573		}
574	} else
575		LSN_NOT_LOGGED(LSN(pagep));
576
577	F_CLR(qp, QAM_VALID);
578	if ((ret = __qam_fput(dbc,
579	    cp->pgno, cp->page, dbc->priority)) != 0)
580		goto err;
581	cp->page = NULL;
582
583	/*
584	 * Other threads cannot move first_recno past
585	 * our position while we have the record locked.
586	 * If it's pointing at the deleted record then lock
587	 * the metapage and check again as lower numbered
588	 * record may have been inserted.
589	 */
590	if (LF_ISSET(DB_CONSUME) || cp->recno == first) {
591		pg = ((QUEUE *)dbp->q_internal)->q_meta;
592		if ((ret =
593		    __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &metalock)) != 0)
594			goto err;
595		if ((ret = __memp_fget(mpf, &pg,
596		    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
597			goto err;
598		if (LF_ISSET(DB_CONSUME) || cp->recno == meta->first_recno)
599			ret = __qam_consume(dbc, meta, meta->first_recno);
600	}
601
602err:	if (meta != NULL && (t_ret = __memp_fput(mpf, dbc->thread_info,
603	    meta, dbc->priority)) != 0 && ret == 0)
604		ret = t_ret;
605	/* Don't hold the meta page long term. */
606	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
607		ret = t_ret;
608
609	if (cp->page != NULL &&
610	    (t_ret = __qam_fput(dbc,
611	    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
612		ret = t_ret;
613	cp->page = NULL;
614
615	return (ret);
616}
617
618#ifdef	DEBUG_WOP
619#define	QDEBUG
620#endif
621
622/*
623 * __qamc_get --
624 *	Queue DBC->get function.
625 */
626static int
627__qamc_get(dbc, key, data, flags, pgnop)
628	DBC *dbc;
629	DBT *key, *data;
630	u_int32_t flags;
631	db_pgno_t *pgnop;
632{
633	DB *dbp;
634	DBC *dbcdup;
635	DBT tmp;
636	DB_LOCK lock, metalock;
637	DB_MPOOLFILE *mpf;
638	ENV *env;
639	PAGE *pg;
640	QAMDATA *qp;
641	QMETA *meta;
642	QUEUE *t;
643	QUEUE_CURSOR *cp;
644	db_lockmode_t lock_mode, meta_mode;
645	db_pgno_t metapno;
646	db_recno_t first;
647	int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete;
648	int retrying;
649
650	dbp = dbc->dbp;
651	env = dbp->env;
652	mpf = dbp->mpf;
653	cp = (QUEUE_CURSOR *)dbc->internal;
654	LOCK_INIT(lock);
655
656	lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
657	meta_mode = DB_LOCK_READ;
658	meta = NULL;
659	*pgnop = 0;
660	pg = NULL;
661	retrying = t_ret = wait = with_delete = 0;
662
663	if (flags == DB_CONSUME_WAIT) {
664		wait = 1;
665		flags = DB_CONSUME;
666	}
667	if (flags == DB_CONSUME) {
668		with_delete = 1;
669		flags = DB_FIRST;
670		meta_mode = lock_mode = DB_LOCK_WRITE;
671	}
672	inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete;
673
674	DEBUG_LREAD(dbc, dbc->txn, "qamc_get",
675	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
676
677	/* Make lint and friends happy. */
678	locked = 0;
679
680	is_first = 0;
681	first = 0;
682
683	t = (QUEUE *)dbp->q_internal;
684	metapno = t->q_meta;
685	LOCK_INIT(metalock);
686
687	/*
688	 * Get the meta page first
689	 */
690	if (locked == 0 && (ret = __db_lget(dbc,
691	    0, metapno, meta_mode, 0, &metalock)) != 0)
692		goto err;
693	locked = 1;
694	if ((ret = __memp_fget(mpf, &metapno,
695	     dbc->thread_info, dbc->txn, 0, &meta)) != 0)
696		return (ret);
697
698	/* Release any previous lock if not in a transaction. */
699	if ((ret = __TLPUT(dbc, cp->lock)) != 0)
700		goto err;
701
702retry:	/* Update the record number. */
703	switch (flags) {
704	case DB_CURRENT:
705		break;
706	case DB_NEXT_DUP:
707	case DB_PREV_DUP:
708		ret = DB_NOTFOUND;
709		goto err;
710		/* NOTREACHED */
711	case DB_NEXT:
712	case DB_NEXT_NODUP:
713		if (cp->recno != RECNO_OOB) {
714			++cp->recno;
715			/* Wrap around, skipping zero. */
716			if (cp->recno == RECNO_OOB)
717				cp->recno++;
718			/*
719			 * Check to see if we are out of data.
720			 */
721			if (QAM_AFTER_CURRENT(meta, cp->recno)) {
722				pg = NULL;
723				if (!wait) {
724					ret = DB_NOTFOUND;
725					goto err;
726				}
727				flags = DB_FIRST;
728				/*
729				 * If first is not set, then we skipped
730				 * a locked record, go back and find it.
731				 * If we find a locked record again
732				 * wait for it.
733				 */
734				if (first == 0) {
735					retrying = 1;
736					goto retry;
737				}
738
739				if (CDB_LOCKING(env)) {
740					/* Drop the metapage before we wait. */
741					ret = __memp_fput(mpf, dbc->thread_info,
742					    meta, dbc->priority);
743					meta = NULL;
744					if (ret != 0)
745						goto err;
746					if ((ret = __lock_get(
747					    env, dbc->locker,
748					    DB_LOCK_SWITCH, &dbc->lock_dbt,
749					    DB_LOCK_WAIT, &dbc->mylock)) != 0)
750						goto err;
751
752					if ((ret = __lock_get(
753					    env, dbc->locker,
754					    DB_LOCK_UPGRADE, &dbc->lock_dbt,
755					    DB_LOCK_WRITE, &dbc->mylock)) != 0)
756						goto err;
757					if ((ret = __memp_fget(mpf, &metapno,
758					    dbc->thread_info,
759					    dbc->txn, 0, &meta)) != 0)
760						goto err;
761					goto retry;
762				}
763				/*
764				 * Wait for someone to update the meta page.
765				 * This will probably mean there is something
766				 * in the queue.  We then go back up and
767				 * try again.
768				 */
769				if (locked == 0) {
770					if ((ret = __db_lget(dbc, 0, metapno,
771					    meta_mode, 0, &metalock)) != 0)
772						goto err;
773					locked = 1;
774					if (cp->recno != RECNO_OOB &&
775					    !QAM_AFTER_CURRENT(meta, cp->recno))
776						goto retry;
777				}
778				/* Drop the metapage before we wait. */
779				ret = __memp_fput(mpf,
780				     dbc->thread_info, meta, dbc->priority);
781				meta = NULL;
782				if (ret != 0)
783					goto err;
784				if ((ret = __db_lget(dbc,
785				    0, metapno, DB_LOCK_WAIT,
786				    DB_LOCK_SWITCH, &metalock)) != 0) {
787					if (ret == DB_LOCK_DEADLOCK)
788						ret = DB_LOCK_NOTGRANTED;
789					goto err;
790				}
791				if ((ret = __db_lget(dbc, 0,
792				    PGNO_INVALID, DB_LOCK_WRITE,
793				    DB_LOCK_UPGRADE, &metalock)) != 0) {
794					if (ret == DB_LOCK_DEADLOCK)
795						ret = DB_LOCK_NOTGRANTED;
796					goto err;
797				}
798				if ((ret = __memp_fget(mpf,
799				     &metapno, dbc->thread_info, dbc->txn,
800				    0, &meta)) != 0)
801					goto err;
802				locked = 1;
803				goto retry;
804			}
805			break;
806		}
807		/* FALLTHROUGH */
808	case DB_FIRST:
809		flags = DB_NEXT;
810		is_first = 1;
811
812		/* get the first record number */
813		cp->recno = first = meta->first_recno;
814
815		break;
816	case DB_PREV:
817	case DB_PREV_NODUP:
818		if (cp->recno != RECNO_OOB) {
819			if (cp->recno == meta->first_recno ||
820			   QAM_BEFORE_FIRST(meta, cp->recno)) {
821				ret = DB_NOTFOUND;
822				goto err;
823			}
824			--cp->recno;
825			/* Wrap around, skipping zero. */
826			if (cp->recno == RECNO_OOB)
827				--cp->recno;
828			break;
829		}
830		/* FALLTHROUGH */
831	case DB_LAST:
832		if (meta->first_recno == meta->cur_recno) {
833			ret = DB_NOTFOUND;
834			goto err;
835		}
836		cp->recno = meta->cur_recno - 1;
837		if (cp->recno == RECNO_OOB)
838			cp->recno--;
839		break;
840	case DB_SET:
841	case DB_SET_RANGE:
842	case DB_GET_BOTH:
843	case DB_GET_BOTH_RANGE:
844		if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
845			goto err;
846		break;
847	default:
848		ret = __db_unknown_flag(env, "__qamc_get", flags);
849		goto err;
850	}
851
852	/* Don't hold the meta page long term. */
853	if (locked) {
854		if ((ret = __LPUT(dbc, metalock)) != 0)
855			goto err;
856		locked = 0;
857	}
858
859	if ((ret = __memp_fput(mpf,
860	    dbc->thread_info, meta, dbc->priority)) != 0)
861		goto err;
862	meta = NULL;
863
864	/* Lock the record. */
865	if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode,
866	    (with_delete && !inorder && !retrying) ?
867	    DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
868	    &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) &&
869	    with_delete) {
870#ifdef QDEBUG
871		if (DBC_LOGGING(dbc))
872			(void)__log_printf(env,
873			    dbc->txn, "Queue S: %x %d %d",
874			    dbc->locker ? dbc->locker->id : 0,
875			    cp->recno, first);
876#endif
877		first = 0;
878		if ((ret =
879		    __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
880			goto err;
881		locked = 1;
882		if ((ret = __memp_fget(mpf, &metapno,
883		     dbc->thread_info, dbc->txn, 0, &meta)) != 0)
884			goto err;
885		goto retry;
886	}
887
888	if (ret != 0)
889		goto err;
890
891	/*
892	 * In the DB_FIRST or DB_LAST cases we must wait and then start over
893	 * since the first/last may have moved while we slept.  If we are
894	 * reading in order and the first record was not there, we can skip it
895	 * as it must have been aborted was was skipped by a non-queue insert
896	 * or we could not have gotten its lock.  If we have the wrong
897	 * record we release our locks and try again.
898	 */
899	switch (flags) {
900	default:
901		if (inorder) {
902			if (first != cp->recno)
903				break;
904		} else if (with_delete || !is_first)
905			break;
906		/* FALLTHROUGH */
907	case DB_SET:
908	case DB_SET_RANGE:
909	case DB_GET_BOTH:
910	case DB_GET_BOTH_RANGE:
911	case DB_LAST:
912		if ((ret =
913		    __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
914			goto lerr;
915		locked = 1;
916		if ((ret = __memp_fget(mpf, &metapno,
917		     dbc->thread_info, dbc->txn, 0, &meta)) != 0)
918			goto lerr;
919		if ((is_first && cp->recno != meta->first_recno) ||
920		    (flags == DB_LAST && cp->recno != meta->cur_recno - 1)) {
921			if ((ret = __LPUT(dbc, lock)) != 0)
922				goto err;
923			if (is_first)
924				flags = DB_FIRST;
925			goto retry;
926		} else if (!is_first && flags != DB_LAST) {
927			if (QAM_BEFORE_FIRST(meta, cp->recno)) {
928				if (flags == DB_SET_RANGE ||
929				    flags == DB_GET_BOTH_RANGE) {
930					if ((ret = __LPUT(dbc, metalock)) != 0)
931						goto err;
932					locked = 0;
933					cp->lock = lock;
934					LOCK_INIT(lock);
935					goto release_retry;
936				}
937				ret = DB_NOTFOUND;
938				goto lerr;
939			}
940			if (QAM_AFTER_CURRENT(meta, cp->recno)) {
941				ret = DB_NOTFOUND;
942				goto lerr;
943			}
944		}
945		/* Don't hold the meta page long term. */
946		if ((ret = __LPUT(dbc, metalock)) != 0)
947			goto err;
948		locked = 0;
949		if ((ret = __memp_fput(mpf,
950		    dbc->thread_info, meta, dbc->priority)) != 0)
951			goto err;
952		meta = NULL;
953	}
954
955	/* Position the cursor on the record. */
956	if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0) {
957		/* We cannot get the page, release the record lock. */
958		(void)__LPUT(dbc, lock);
959		goto err;
960	}
961
962	pg = cp->page;
963	cp->lock = lock;
964	cp->lock_mode = lock_mode;
965	LOCK_INIT(lock);
966
967	if (!exact) {
968release_retry:	/* Release locks and retry, if possible. */
969		if (pg != NULL)
970			(void)__qam_fput(dbc, cp->pgno, pg, dbc->priority);
971		cp->page = pg = NULL;
972		if (with_delete) {
973			if ((ret = __LPUT(dbc, cp->lock)) != 0)
974				goto err1;
975		} else if ((ret = __TLPUT(dbc, cp->lock)) != 0)
976			goto err1;
977
978		if (locked == 0 && (ret =
979		    __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
980			goto err1;
981		locked = 1;
982		if (meta == NULL && (ret = __memp_fget(mpf, &metapno,
983		     dbc->thread_info, dbc->txn, 0, &meta)) != 0)
984			goto err1;
985		/*
986		 * If we don't need locks and we are out of range
987		 * then we can just skip to the FIRST/LAST record
988		 * otherwise we must iterate to lock the records
989		 * and get serializability.
990		 */
991		switch (flags) {
992		case DB_NEXT:
993		case DB_NEXT_NODUP:
994			if (!with_delete)
995				is_first = 0;
996			if (QAM_BEFORE_FIRST(meta, cp->recno) &&
997			    DONT_NEED_LOCKS(dbc))
998				flags = DB_FIRST;
999			break;
1000		case DB_LAST:
1001		case DB_PREV:
1002		case DB_PREV_NODUP:
1003			if (QAM_AFTER_CURRENT(meta, cp->recno) &&
1004			    DONT_NEED_LOCKS(dbc))
1005				flags = DB_LAST;
1006			else
1007				flags = DB_PREV;
1008			break;
1009
1010		case DB_GET_BOTH_RANGE:
1011		case DB_SET_RANGE:
1012			if (QAM_BEFORE_FIRST(meta, cp->recno) &&
1013			    DONT_NEED_LOCKS(dbc))
1014				flags = DB_FIRST;
1015			else
1016				flags = DB_NEXT;
1017			break;
1018
1019		default:
1020			/* this is for the SET and GET_BOTH cases */
1021			ret = DB_KEYEMPTY;
1022			goto err1;
1023		}
1024		retrying = 0;
1025		goto retry;
1026	}
1027
1028	qp = QAM_GET_RECORD(dbp, pg, cp->indx);
1029
1030	/* Return the data item. */
1031	if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
1032		/*
1033		 * Need to compare
1034		 */
1035		tmp.data = qp->data;
1036		tmp.size = t->re_len;
1037		if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
1038			if (flags == DB_GET_BOTH_RANGE)
1039				goto release_retry;
1040			ret = DB_NOTFOUND;
1041			goto err1;
1042		}
1043	}
1044
1045	/* Return the key if the user didn't give us one. */
1046	if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) {
1047		if ((ret = __db_retcopy(dbp->env,
1048		    key, &cp->recno, sizeof(cp->recno),
1049		    &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
1050			goto err1;
1051		F_SET(key, DB_DBT_ISSET);
1052	}
1053
1054	if (data != NULL &&
1055	    !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
1056	    !F_ISSET(data, DB_DBT_ISSET)) {
1057		if ((ret = __db_retcopy(dbp->env, data, qp->data, t->re_len,
1058		    &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
1059			goto err1;
1060		F_SET(data, DB_DBT_ISSET);
1061	}
1062
1063	/* Finally, if we are doing DB_CONSUME mark the record. */
1064	if (with_delete) {
1065		/*
1066		 * Assert that we're not a secondary index.  Doing a DB_CONSUME
1067		 * on a secondary makes very little sense, since one can't
1068		 * DB_APPEND there;  attempting one should be forbidden by
1069		 * the interface.
1070		 */
1071		DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY));
1072
1073		/*
1074		 * If we have any secondary indices, call __dbc_del_primary to
1075		 * delete the references to the item we're about to delete.
1076		 *
1077		 * Note that we work on a duplicated cursor, since the
1078		 * __db_ret work has already been done, so it's not safe
1079		 * to perform any additional ops on this cursor.
1080		 */
1081		if (DB_IS_PRIMARY(dbp)) {
1082			if ((ret = __dbc_idup(dbc,
1083			    &dbcdup, DB_POSITION)) != 0)
1084				goto err1;
1085
1086			if ((ret = __qam_fput(dbc,
1087			    cp->pgno, cp->page, dbc->priority)) != 0)
1088				goto err1;
1089			cp->page = NULL;
1090			if (meta != NULL &&
1091			    (ret = __memp_fput(mpf,
1092			    dbc->thread_info, meta, dbc->priority)) != 0)
1093				goto err1;
1094			meta = NULL;
1095			if ((ret = __dbc_del_primary(dbcdup)) != 0) {
1096				/*
1097				 * The __dbc_del_primary return is more
1098				 * interesting.
1099				 */
1100				(void)__dbc_close(dbcdup);
1101				goto err1;
1102			}
1103
1104			if ((ret = __dbc_close(dbcdup)) != 0)
1105				goto err1;
1106			if ((ret = __qam_fget(dbc,
1107			    &cp->pgno, DB_MPOOL_DIRTY, &cp->page)) != 0)
1108				goto err;
1109		} else if ((ret = __qam_dirty(dbc,
1110		     cp->pgno, &cp->page, dbc->priority)) != 0)
1111			goto err1;
1112
1113		pg = cp->page;
1114
1115		if (DBC_LOGGING(dbc)) {
1116			if (t->page_ext == 0 || t->re_len == 0) {
1117				if ((ret = __qam_del_log(dbp, dbc->txn,
1118				    &LSN(pg), 0, &LSN(pg),
1119				    pg->pgno, cp->indx, cp->recno)) != 0)
1120					goto err1;
1121			} else {
1122				tmp.data = qp->data;
1123				tmp.size = t->re_len;
1124				if ((ret = __qam_delext_log(dbp,
1125				   dbc->txn, &LSN(pg), 0, &LSN(pg),
1126				   pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
1127					goto err1;
1128			}
1129		} else
1130			LSN_NOT_LOGGED(LSN(pg));
1131
1132		F_CLR(qp, QAM_VALID);
1133		if ((ret = __qam_fput(dbc,
1134		    cp->pgno, cp->page, dbc->priority)) != 0)
1135			goto err;
1136		cp->page = NULL;
1137
1138		/*
1139		 * Now we need to update the metapage
1140		 * first pointer. If we have deleted
1141		 * the record that is pointed to by
1142		 * first_recno then we move it as far
1143		 * forward as we can without blocking.
1144		 * The metapage lock must be held for
1145		 * the whole scan otherwise someone could
1146		 * do a random insert behind where we are
1147		 * looking.
1148		 */
1149
1150		if (locked == 0 && (ret = __db_lget(
1151		    dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
1152			goto err1;
1153		locked = 1;
1154		if (meta == NULL &&
1155		     (ret = __memp_fget(mpf,
1156		     &metapno, dbc->thread_info, dbc->txn, 0, &meta)) != 0)
1157			goto err1;
1158
1159#ifdef QDEBUG
1160		if (DBC_LOGGING(dbc))
1161			(void)__log_printf(env,
1162			    dbc->txn, "Queue D: %x %d %d %d",
1163			    dbc->locker ? dbc->locker->id : 0,
1164			    cp->recno, first, meta->first_recno);
1165#endif
		/*
		 * See if we deleted the "first" record.  If
		 * first is zero then we skipped something,
		 * see if first_recno has been moved past
		 * that to the record that we deleted.
		 */
1172		if (first == 0)
1173			first = cp->recno;
1174		if (first != meta->first_recno)
1175			goto done;
1176
1177		if ((ret = __qam_consume(dbc, meta, first)) != 0)
1178			goto err1;
1179	}
1180
1181done:
1182err1:	if (cp->page != NULL) {
1183		if ((t_ret = __qam_fput(dbc,
1184		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
1185			ret = t_ret;
1186
1187		cp->page = NULL;
1188	}
1189	if (0) {
1190lerr:		(void)__LPUT(dbc, lock);
1191	}
1192
1193err:	if (meta) {
1194		/* Release the meta page. */
1195		if ((t_ret = __memp_fput(mpf,
1196		    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1197			ret = t_ret;
1198
1199		/* Don't hold the meta page long term. */
1200		if (locked)
1201			if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1202				ret = t_ret;
1203	}
1204	DB_ASSERT(env, !LOCK_ISSET(metalock));
1205
1206	return ((ret == DB_LOCK_NOTGRANTED && !F_ISSET(env->dbenv,
1207	    DB_ENV_TIME_NOTGRANTED)) ? DB_LOCK_DEADLOCK : ret);
1208}
1209
1210/*
1211 * __qam_consume -- try to reset the head of the queue.
1212 *
1213 */
1214static int
1215__qam_consume(dbc, meta, first)
1216	DBC *dbc;
1217	QMETA *meta;
1218	db_recno_t first;
1219{
1220	DB *dbp;
1221	DB_LOCK lock, save_lock;
1222	DB_MPOOLFILE *mpf;
1223	QUEUE_CURSOR *cp;
1224	db_indx_t save_indx;
1225	db_pgno_t save_page;
1226	db_recno_t current, save_recno;
1227	u_int32_t rec_extent;
1228	int exact, ret, t_ret, wrapped;
1229
1230	dbp = dbc->dbp;
1231	mpf = dbp->mpf;
1232	cp = (QUEUE_CURSOR *)dbc->internal;
1233	ret = 0;
1234
1235	save_page = cp->pgno;
1236	save_indx = cp->indx;
1237	save_recno = cp->recno;
1238	save_lock = cp->lock;
1239
1240	/*
1241	 * If we skipped some deleted records, we need to
1242	 * reposition on the first one.  Get a lock
1243	 * in case someone is trying to put it back.
1244	 */
1245	if (first != cp->recno) {
1246		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1247		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1248		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1249			ret = 0;
1250			goto done;
1251		}
1252		if (ret != 0)
1253			goto done;
1254		if (cp->page != NULL && (ret =
1255		    __qam_fput(dbc, cp->pgno, cp->page, dbc->priority)) != 0)
1256			goto done;
1257		cp->page = NULL;
1258		if ((ret = __qam_position(dbc,
1259		    &first, 0, &exact)) != 0 || exact != 0) {
1260			(void)__LPUT(dbc, lock);
1261			goto done;
1262		}
1263		if ((ret =__LPUT(dbc, lock)) != 0)
1264			goto done;
1265	}
1266
1267	current = meta->cur_recno;
1268	wrapped = 0;
1269	if (first > current)
1270		wrapped = 1;
1271	rec_extent = meta->page_ext * meta->rec_page;
1272
1273	/* Loop until we find a record or hit current */
1274	for (;;) {
1275		/*
1276		 * Check to see if we are moving off the extent
1277		 * and remove the extent.
1278		 * If we are moving off a page we need to
1279		 * get rid of the buffer.
1280		 * Wait for the lagging readers to move off the
1281		 * page.
1282		 */
1283		if (rec_extent != 0 &&
1284		    ((exact = (first % rec_extent == 0)) ||
1285		    (first % meta->rec_page == 0) ||
1286		    first == UINT32_MAX)) {
1287			if (exact == 1 && (ret = __db_lget(dbc,
1288			    0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1289				break;
1290#ifdef QDEBUG
1291			if (DBC_LOGGING(dbc))
1292				(void)__log_printf(dbp->env, dbc->txn,
1293				    "Queue R: %x %d %d %d",
1294				    dbc->locker ? dbc->locker->id : 0,
1295				    cp->pgno, first, meta->first_recno);
1296#endif
1297			if (cp->page != NULL && (ret = __qam_fput(dbc,
1298			    cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0)
1299				break;
1300			cp->page = NULL;
1301
1302			if (exact == 1) {
1303				ret = __qam_fremove(dbp, cp->pgno);
1304				if ((t_ret =
1305				    __LPUT(dbc, cp->lock)) != 0 && ret == 0)
1306					ret = t_ret;
1307			}
1308			if (ret != 0)
1309				break;
1310		} else if (cp->page != NULL && (ret = __qam_fput(dbc,
1311		     cp->pgno, cp->page, dbc->priority)) != 0)
1312			break;
1313		cp->page = NULL;
1314		first++;
1315		if (first == RECNO_OOB) {
1316			wrapped = 0;
1317			first++;
1318		}
1319
1320		/*
1321		 * LOOP EXIT when we come move to the current
1322		 * pointer.
1323		 */
1324		if (!wrapped && first >= current)
1325			break;
1326
1327		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1328		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1329		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1330			ret = 0;
1331			break;
1332		}
1333		if (ret != 0)
1334			break;
1335
1336		if ((ret = __qam_position(dbc, &first, 0, &exact)) != 0) {
1337			(void)__LPUT(dbc, lock);
1338			break;
1339		}
1340		if ((ret =__LPUT(dbc, lock)) != 0 || exact) {
1341			if ((t_ret = __qam_fput(dbc, cp->pgno,
1342			    cp->page, dbc->priority)) != 0 && ret == 0)
1343				ret = t_ret;
1344			cp->page = NULL;
1345			break;
1346		}
1347	}
1348
1349	cp->pgno = save_page;
1350	cp->indx = save_indx;
1351	cp->recno = save_recno;
1352	cp->lock = save_lock;
1353
1354	/*
1355	 * We have advanced as far as we can.
1356	 * Advance first_recno to this point.
1357	 */
1358	if (ret == 0 && meta->first_recno != first) {
1359		if ((ret = __memp_dirty(mpf,
1360		    &meta, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1361			goto done;
1362#ifdef QDEBUG
1363		if (DBC_LOGGING(dbc))
1364			(void)__log_printf(dbp->env, dbc->txn,
1365			    "Queue M: %x %d %d %d",
1366			    dbc->locker ? dbc->locker->id : 0,
1367			    cp->recno, first, meta->first_recno);
1368#endif
1369		if (DBC_LOGGING(dbc)) {
1370			if ((ret = __qam_incfirst_log(dbp,
1371			    dbc->txn, &meta->dbmeta.lsn, 0,
1372			    cp->recno, PGNO_BASE_MD)) != 0)
1373				goto done;
1374		} else
1375			LSN_NOT_LOGGED(meta->dbmeta.lsn);
1376		meta->first_recno = first;
1377	}
1378
1379done:
1380	return (ret);
1381}
1382
1383static int
1384__qam_bulk(dbc, data, flags)
1385	DBC *dbc;
1386	DBT *data;
1387	u_int32_t flags;
1388{
1389	DB *dbp;
1390	DB_LOCK metalock, rlock;
1391	DB_MPOOLFILE *mpf;
1392	PAGE *pg;
1393	QAMDATA *qp;
1394	QMETA *meta;
1395	QUEUE_CURSOR *cp;
1396	db_indx_t indx;
1397	db_lockmode_t lkmode;
1398	db_pgno_t metapno;
1399	u_int32_t  *endp, *offp;
1400	u_int32_t pagesize, re_len, recs;
1401	u_int8_t *dbuf, *dp, *np;
1402	int exact, ret, t_ret, valid;
1403	int is_key, need_pg, size, space;
1404
1405	dbp = dbc->dbp;
1406	mpf = dbp->mpf;
1407	cp = (QUEUE_CURSOR *)dbc->internal;
1408
1409	lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
1410
1411	pagesize = dbp->pgsize;
1412	re_len = ((QUEUE *)dbp->q_internal)->re_len;
1413	recs = ((QUEUE *)dbp->q_internal)->rec_page;
1414	metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1415
1416	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1417	size = 0;
1418
1419	if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
1420		return (ret);
1421	if ((ret = __memp_fget(mpf, &metapno,
1422	     dbc->thread_info, dbc->txn, 0, &meta)) != 0) {
1423		/* We did not fetch it, we can release the lock. */
1424		(void)__LPUT(dbc, metalock);
1425		return (ret);
1426	}
1427
1428	dbuf = data->data;
1429	np = dp = dbuf;
1430
1431	/* Keep track of space that is left.  There is an termination entry */
1432	space = (int)data->ulen;
1433	space -= (int)sizeof(*offp);
1434
1435	/* Build the offset/size table from the end up. */
1436	endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen);
1437	endp--;
1438	offp = endp;
1439	/* Save the lock on the current position of the cursor. */
1440	rlock = cp->lock;
1441	LOCK_INIT(cp->lock);
1442
1443next_pg:
1444	/* Wrap around, skipping zero. */
1445	if (cp->recno == RECNO_OOB)
1446		cp->recno++;
1447	if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0)
1448		goto done;
1449
1450	pg = cp->page;
1451	indx = cp->indx;
1452	need_pg = 1;
1453
1454	do {
1455		/*
1456		 * If this page is a nonexistent page at the end of an
1457		 * extent, pg may be NULL.  A NULL page has no valid records,
1458		 * so just keep looping as though qp exists and isn't QAM_VALID;
1459		 * calling QAM_GET_RECORD is unsafe.
1460		 */
1461		valid = 0;
1462
1463		if (pg != NULL) {
1464			if ((ret = __db_lget(dbc, LCK_COUPLE,
1465			    cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0)
1466				goto done;
1467			qp = QAM_GET_RECORD(dbp, pg, indx);
1468			if (F_ISSET(qp, QAM_VALID)) {
1469				valid = 1;
1470				space -= (int)
1471				    ((is_key ? 3 : 2) * sizeof(*offp));
1472				if (space < 0)
1473					goto get_space;
1474				if (need_pg) {
1475					dp = np;
1476					size = (int)pagesize - QPAGE_SZ(dbp);
1477					if (space < size) {
1478get_space:
1479						if (offp == endp) {
1480							data->size = (u_int32_t)
1481							    DB_ALIGN((u_int32_t)
1482							    size + pagesize,
1483							    sizeof(u_int32_t));
1484							ret = DB_BUFFER_SMALL;
1485							break;
1486						}
1487						if (indx != 0)
1488							indx--;
1489						cp->recno--;
1490						space = 0;
1491						break;
1492					}
1493					memcpy(dp,
1494					    (u_int8_t *)pg + QPAGE_SZ(dbp),
1495					    (u_int)size);
1496					need_pg = 0;
1497					space -= size;
1498					np += size;
1499				}
1500				if (is_key)
1501					*offp-- = cp->recno;
1502				*offp-- = (u_int32_t)((((u_int8_t *)qp -
1503				    (u_int8_t *)pg) - QPAGE_SZ(dbp)) +
1504				    (dp - dbuf) + SSZA(QAMDATA, data));
1505				*offp-- = re_len;
1506			}
1507		}
1508		if (!valid && is_key == 0) {
1509			*offp-- = 0;
1510			*offp-- = 0;
1511		}
1512		cp->recno++;
1513	} while (++indx < recs && cp->recno != RECNO_OOB &&
1514	    !QAM_AFTER_CURRENT(meta, cp->recno));
1515
1516	if (cp->page != NULL) {
1517		if ((t_ret = __qam_fput(dbc,
1518		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
1519			ret = t_ret;
1520		cp->page = NULL;
1521	}
1522
1523	if (ret == 0 && space > 0 &&
1524	    (indx >= recs || cp->recno == RECNO_OOB) &&
1525	    !QAM_AFTER_CURRENT(meta, cp->recno))
1526		goto next_pg;
1527
1528	/*
1529	 * Correct recno in two cases:
1530	 * 1) If we just wrapped fetch must start at record 1 not a FIRST.
1531	 * 2) We ran out of space exactly at the end of a page.
1532	 */
1533	if (cp->recno == RECNO_OOB || (space == 0 && indx == recs))
1534		cp->recno--;
1535
1536	if (is_key == 1)
1537		*offp = RECNO_OOB;
1538	else
1539		*offp = (u_int32_t)-1;
1540
1541done:	/* Release the meta page. */
1542	if ((t_ret = __memp_fput(mpf,
1543	     dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1544		ret = t_ret;
1545	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1546		ret = t_ret;
1547
1548	cp->lock = rlock;
1549
1550	return (ret);
1551}
1552
1553/*
1554 * __qamc_close --
1555 *	Close down the cursor from a single use.
1556 */
1557static int
1558__qamc_close(dbc, root_pgno, rmroot)
1559	DBC *dbc;
1560	db_pgno_t root_pgno;
1561	int *rmroot;
1562{
1563	QUEUE_CURSOR *cp;
1564	int ret;
1565
1566	COMPQUIET(root_pgno, 0);
1567	COMPQUIET(rmroot, NULL);
1568
1569	cp = (QUEUE_CURSOR *)dbc->internal;
1570
1571	/* Discard any locks not acquired inside of a transaction. */
1572	ret = __TLPUT(dbc, cp->lock);
1573
1574	LOCK_INIT(cp->lock);
1575	cp->page = NULL;
1576	cp->pgno = PGNO_INVALID;
1577	cp->indx = 0;
1578	cp->lock_mode = DB_LOCK_NG;
1579	cp->recno = RECNO_OOB;
1580	cp->flags = 0;
1581
1582	return (ret);
1583}
1584
1585/*
1586 * __qamc_dup --
1587 *	Duplicate a queue cursor, such that the new one holds appropriate
1588 *	locks for the position of the original.
1589 *
1590 * PUBLIC: int __qamc_dup __P((DBC *, DBC *));
1591 */
1592int
1593__qamc_dup(orig_dbc, new_dbc)
1594	DBC *orig_dbc, *new_dbc;
1595{
1596	QUEUE_CURSOR *orig, *new;
1597
1598	orig = (QUEUE_CURSOR *)orig_dbc->internal;
1599	new = (QUEUE_CURSOR *)new_dbc->internal;
1600
1601	new->recno = orig->recno;
1602
1603	return (0);
1604}
1605
1606/*
1607 * __qamc_init
1608 *
1609 * PUBLIC: int __qamc_init __P((DBC *));
1610 */
1611int
1612__qamc_init(dbc)
1613	DBC *dbc;
1614{
1615	DB *dbp;
1616	QUEUE_CURSOR *cp;
1617	int ret;
1618
1619	dbp = dbc->dbp;
1620
1621	/* Allocate the internal structure. */
1622	cp = (QUEUE_CURSOR *)dbc->internal;
1623	if (cp == NULL) {
1624		if ((ret =
1625		    __os_calloc(dbp->env, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
1626			return (ret);
1627		dbc->internal = (DBC_INTERNAL *)cp;
1628	}
1629
1630	/* Initialize methods. */
1631	dbc->close = dbc->c_close = __dbc_close_pp;
1632	dbc->cmp = __dbc_cmp_pp;
1633	dbc->count = dbc->c_count = __dbc_count_pp;
1634	dbc->del = dbc->c_del = __dbc_del_pp;
1635	dbc->dup = dbc->c_dup = __dbc_dup_pp;
1636	dbc->get = dbc->c_get = __dbc_get_pp;
1637	dbc->pget = dbc->c_pget = __dbc_pget_pp;
1638	dbc->put = dbc->c_put = __dbc_put_pp;
1639	dbc->am_bulk = __qam_bulk;
1640	dbc->am_close = __qamc_close;
1641	dbc->am_del = __qamc_del;
1642	dbc->am_destroy = __qamc_destroy;
1643	dbc->am_get = __qamc_get;
1644	dbc->am_put = __qamc_put;
1645	dbc->am_writelock = NULL;
1646
1647	return (0);
1648}
1649
1650/*
1651 * __qamc_destroy --
1652 *	Close a single cursor -- internal version.
1653 */
1654static int
1655__qamc_destroy(dbc)
1656	DBC *dbc;
1657{
1658	/* Discard the structures. */
1659	__os_free(dbc->env, dbc->internal);
1660
1661	return (0);
1662}
1663
1664/*
1665 * __qam_getno --
1666 *	Check the user's record number.
1667 */
1668static int
1669__qam_getno(dbp, key, rep)
1670	DB *dbp;
1671	const DBT *key;
1672	db_recno_t *rep;
1673{
1674	/* If passed an empty DBT from Java, key->data may be NULL */
1675	if (key->size != sizeof(db_recno_t)) {
1676		__db_errx(dbp->env, "illegal record number size");
1677		return (EINVAL);
1678	}
1679
1680	if ((*rep = *(db_recno_t *)key->data) == 0) {
1681		__db_errx(dbp->env, "illegal record number of 0");
1682		return (EINVAL);
1683	}
1684	return (0);
1685}
1686
1687/*
1688 * __qam_truncate --
1689 *	Truncate a queue database
1690 *
1691 * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *));
1692 */
1693int
1694__qam_truncate(dbc, countp)
1695	DBC *dbc;
1696	u_int32_t *countp;
1697{
1698	DB *dbp;
1699	DB_LOCK metalock;
1700	DB_MPOOLFILE *mpf;
1701	QMETA *meta;
1702	db_pgno_t metapno;
1703	u_int32_t count;
1704	int ret, t_ret;
1705
1706	dbp = dbc->dbp;
1707
1708	/* Walk the queue, counting rows. */
1709	for (count = 0;
1710	    (ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;)
1711		count++;
1712	if (ret != DB_NOTFOUND)
1713		return (ret);
1714
1715	/* Update the meta page. */
1716	metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1717	if ((ret =
1718	    __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
1719		return (ret);
1720
1721	mpf = dbp->mpf;
1722	if ((ret = __memp_fget(mpf, &metapno, dbc->thread_info, dbc->txn,
1723	    DB_MPOOL_DIRTY, &meta)) != 0) {
1724		/* We did not fetch it, we can release the lock. */
1725		(void)__LPUT(dbc, metalock);
1726		return (ret);
1727	}
1728	/* Remove the last extent file. */
1729	if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) {
1730		if ((ret = __qam_fremove(dbp,
1731		    QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0)
1732			goto err;
1733	}
1734
1735	if (DBC_LOGGING(dbc)) {
1736		ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
1737		    QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
1738		    1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
1739	} else
1740		LSN_NOT_LOGGED(meta->dbmeta.lsn);
1741	if (ret == 0)
1742		meta->first_recno = meta->cur_recno = 1;
1743
1744err:	if ((t_ret = __memp_fput(mpf,
1745	     dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1746		ret = t_ret;
1747	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1748		ret = t_ret;
1749
1750	if (countp != NULL)
1751		*countp = count;
1752
1753	return (ret);
1754}
1755
1756/*
1757 * __qam_delete --
1758 *	Queue fast delete function.
1759 *
1760 * PUBLIC: int __qam_delete __P((DBC *,  DBT *, u_int32_t));
1761 */
1762int
1763__qam_delete(dbc, key, flags)
1764	DBC *dbc;
1765	DBT *key;
1766	u_int32_t flags;
1767{
1768	QUEUE_CURSOR *cp;
1769	int ret;
1770
1771	cp = (QUEUE_CURSOR *)dbc->internal;
1772	if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0)
1773		goto err;
1774
1775	ret = __qamc_del(dbc, flags);
1776
1777err:	return (ret);
1778}
1779