1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1999,2008 Oracle.  All rights reserved.
5 *
6 * $Id: qam.c,v 12.59 2008/03/13 15:44:50 mbrey Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/lock.h"
15#include "dbinc/log.h"
16#include "dbinc/mp.h"
17#include "dbinc/qam.h"
18
19static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
20static int __qamc_close __P((DBC *, db_pgno_t, int *));
21static int __qamc_del __P((DBC *));
22static int __qamc_destroy __P((DBC *));
23static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
24static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
25static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
26static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
27
28#define	DONT_NEED_LOCKS(dbc) ((dbc)->txn == NULL ||			\
29	F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED))
30
31/*
32 * __qam_position --
33 *	Position a queued access method cursor at a record.  This returns
34 *	the page locked.  *exactp will be set if the record is valid.
35 * PUBLIC: int __qam_position
36 * PUBLIC:      __P((DBC *, db_recno_t *, db_lockmode_t, u_int32_t, int *));
37 */
38int
39__qam_position(dbc, recnop, lock_mode, get_mode, exactp)
40	DBC *dbc;		/* open cursor */
41	db_recno_t *recnop;	/* pointer to recno to find */
42	db_lockmode_t lock_mode;/* locking: read or write */
43	u_int32_t get_mode;	/* flags to __memp_fget */
44	int *exactp;		/* indicate if it was found */
45{
46	DB *dbp;
47	QAMDATA  *qp;
48	QUEUE_CURSOR *cp;
49	db_pgno_t pg;
50	int ret, t_ret;
51
52	dbp = dbc->dbp;
53	cp = (QUEUE_CURSOR *)dbc->internal;
54
55	/* Fetch the page for this recno. */
56	pg = QAM_RECNO_PAGE(dbp, *recnop);
57
58	if ((ret = __db_lget(dbc, 0, pg, lock_mode, 0, &cp->lock)) != 0)
59		return (ret);
60	cp->page = NULL;
61	*exactp = 0;
62	if ((ret = __qam_fget(dbc, &pg, get_mode, &cp->page)) != 0) {
63		if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) &&
64		    (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
65			ret = 0;
66
67		/* We did not fetch it, we can release the lock. */
68		if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
69			ret = t_ret;
70		return (ret);
71	}
72	cp->pgno = pg;
73	cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
74
75	if (PGNO(cp->page) == 0) {
76		/*
77		 * We have read an uninitialized page: set the page number if
78		 * we're creating the page.  Otherwise, we know that the record
79		 * doesn't exist yet.
80		 */
81		if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) {
82			*exactp = 0;
83			return (0);
84		}
85		DB_ASSERT(dbp->env, FLD_ISSET(get_mode, DB_MPOOL_CREATE));
86		PGNO(cp->page) = pg;
87		TYPE(cp->page) = P_QAMDATA;
88	}
89
90	qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
91	*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
92
93	return (ret);
94}
95
96/*
97 * __qam_pitem --
98 *	Put an item on a queue page.  Copy the data to the page and set the
99 *	VALID and SET bits.  If logging and the record was previously set,
100 *	log that data, otherwise just log the new data.
101 *
102 *   pagep must be write locked
103 *
104 * PUBLIC: int __qam_pitem
105 * PUBLIC:    __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
106 */
107int
108__qam_pitem(dbc, pagep, indx, recno, data)
109	DBC *dbc;
110	QPAGE *pagep;
111	u_int32_t indx;
112	db_recno_t recno;
113	DBT *data;
114{
115	DB *dbp;
116	DBT olddata, pdata, *datap;
117	ENV *env;
118	QAMDATA *qp;
119	QUEUE *t;
120	u_int8_t *dest, *p;
121	int allocated, ret;
122
123	dbp = dbc->dbp;
124	env = dbp->env;
125	t = (QUEUE *)dbp->q_internal;
126	allocated = ret = 0;
127
128	if (data->size > t->re_len)
129		return (__db_rec_toobig(env, data->size, t->re_len));
130	qp = QAM_GET_RECORD(dbp, pagep, indx);
131
132	p = qp->data;
133	datap = data;
134	if (F_ISSET(data, DB_DBT_PARTIAL)) {
135		if (data->doff + data->dlen > t->re_len) {
136			__db_errx(env,
137		"%s: data offset plus length larger than record size of %lu",
138			    "Record length error", (u_long)t->re_len);
139			return (EINVAL);
140		}
141
142		if (data->size != data->dlen)
143			return (__db_rec_repl(env, data->size, data->dlen));
144
145		if (data->size == t->re_len)
146			goto no_partial;
147
148		/*
149		 * If we are logging, then we have to build the record
150		 * first, otherwise, we can simply drop the change
151		 * directly on the page.  After this clause, make
152		 * sure that datap and p are set up correctly so that
153		 * copying datap into p does the right thing.
154		 *
155		 * Note, I am changing this so that if the existing
156		 * record is not valid, we create a complete record
157		 * to log so that both this and the recovery code is simpler.
158		 */
159
160		if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
161			datap = &pdata;
162			memset(datap, 0, sizeof(*datap));
163
164			if ((ret = __os_malloc(env,
165			    t->re_len, &datap->data)) != 0)
166				return (ret);
167			allocated = 1;
168			datap->size = t->re_len;
169
170			/*
171			 * Construct the record if it's valid, otherwise set it
172			 * all to the pad character.
173			 */
174			dest = datap->data;
175			if (F_ISSET(qp, QAM_VALID))
176				memcpy(dest, p, t->re_len);
177			else
178				memset(dest, (int)t->re_pad, t->re_len);
179
180			dest += data->doff;
181			memcpy(dest, data->data, data->size);
182		} else {
183			datap = data;
184			p += data->doff;
185		}
186	}
187
188no_partial:
189	if (DBC_LOGGING(dbc)) {
190		olddata.size = 0;
191		if (F_ISSET(qp, QAM_SET)) {
192			olddata.data = qp->data;
193			olddata.size = t->re_len;
194		}
195		if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
196		    0, &LSN(pagep), pagep->pgno,
197		    indx, recno, datap, qp->flags,
198		    olddata.size == 0 ? NULL : &olddata)) != 0)
199			goto err;
200	} else if (!F_ISSET((dbc), DBC_RECOVER))
201		LSN_NOT_LOGGED(LSN(pagep));
202
203	F_SET(qp, QAM_VALID | QAM_SET);
204	memcpy(p, datap->data, datap->size);
205	if (!F_ISSET(data, DB_DBT_PARTIAL))
206		memset(p + datap->size,
207		    (int)t->re_pad, t->re_len - datap->size);
208
209err:	if (allocated)
210		__os_free(env, datap->data);
211
212	return (ret);
213}
214/*
215 * __qamc_put
216 *	Cursor put for queued access method.
217 *	BEFORE and AFTER cannot be specified.
218 */
219static int
220__qamc_put(dbc, key, data, flags, pgnop)
221	DBC *dbc;
222	DBT *key, *data;
223	u_int32_t flags;
224	db_pgno_t *pgnop;
225{
226	DB *dbp;
227	DB_LOCK lock;
228	DB_MPOOLFILE *mpf;
229	ENV *env;
230	QMETA *meta;
231	QUEUE_CURSOR *cp;
232	db_pgno_t pg;
233	db_recno_t new_cur, new_first;
234	u_int32_t opcode;
235	int exact, ret, t_ret, writelock;
236
237	dbp = dbc->dbp;
238	env = dbp->env;
239	mpf = dbp->mpf;
240	if (pgnop != NULL)
241		*pgnop = PGNO_INVALID;
242
243	cp = (QUEUE_CURSOR *)dbc->internal;
244
245	switch (flags) {
246	case DB_KEYFIRST:
247	case DB_KEYLAST:
248	case DB_NOOVERWRITE:
249		if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
250			return (ret);
251		/* FALLTHROUGH */
252	case DB_CURRENT:
253		break;
254	default:
255		/* The interface shouldn't let anything else through. */
256		return (__db_ferr(env, "DBC->put", 0));
257	}
258
259	/* Write lock the record. */
260	if ((ret = __db_lget(dbc, LCK_COUPLE,
261	    cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
262		return (ret);
263
264	lock = cp->lock;
265
266	if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE,
267	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) {
268		/* We could not get the page, we can release the record lock. */
269		(void)__LPUT(dbc, lock);
270		return (ret);
271	}
272
273	if (exact != 0 && flags == DB_NOOVERWRITE)
274		ret = DB_KEYEXIST;
275	else
276		/* Put the item on the page. */
277		ret = __qam_pitem(dbc,
278		     (QPAGE *)cp->page, cp->indx, cp->recno, data);
279
280	/* Doing record locking, release the page lock */
281	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
282		ret = t_ret;
283	if ((t_ret = __qam_fput(dbc,
284	    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
285		ret = t_ret;
286	cp->page = NULL;
287	cp->lock = lock;
288	cp->lock_mode = DB_LOCK_WRITE;
289	if (ret != 0)
290		return (ret);
291
292	/* We may need to reset the head or tail of the queue. */
293	pg = ((QUEUE *)dbp->q_internal)->q_meta;
294
295	/*
296	 * Get the meta page first, we don't want to lock it while trying
297	 * to pin it.
298	 */
299	writelock = 0;
300	if ((ret = __memp_fget(mpf, &pg,
301	    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
302		return (ret);
303	if ((ret = __db_lget(dbc, LCK_COUPLE,
304	    pg,  DB_LOCK_READ, 0, &cp->lock)) != 0) {
305		(void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority);
306		return (ret);
307	}
308
309	opcode = 0;
310	new_cur = new_first = 0;
311
312	/*
313	 * If the put address is outside the queue, adjust the head and
314	 * tail of the queue.  If the order is inverted we move
315	 * the one which is closer.  The first case is when the
316	 * queue is empty, move first and current to where the new
317	 * insert is.
318	 */
319
320recheck:
321	if (meta->first_recno == meta->cur_recno) {
322		new_first = cp->recno;
323		new_cur = cp->recno + 1;
324		if (new_cur == RECNO_OOB)
325			new_cur++;
326		opcode |= QAM_SETFIRST;
327		opcode |= QAM_SETCUR;
328	} else {
329		if (QAM_BEFORE_FIRST(meta, cp->recno)) {
330			new_first = cp->recno;
331			opcode |= QAM_SETFIRST;
332		}
333
334		if (QAM_AFTER_CURRENT(meta, cp->recno)) {
335			new_cur = cp->recno + 1;
336			if (new_cur == RECNO_OOB)
337				new_cur++;
338			opcode |= QAM_SETCUR;
339		}
340	}
341
342	if (opcode == 0)
343		goto done;
344
345	/* Drop the read lock and get the a write lock on the meta page. */
346	if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
347	     pg,  DB_LOCK_WRITE, 0, &cp->lock)) != 0) {
348		(void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority);
349		return (ret);
350	}
351	if (writelock++ == 0)
352		goto recheck;
353
354	if (((ret = __memp_dirty(mpf, &meta,
355	    dbc->thread_info, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 ||
356	    (DBC_LOGGING(dbc) &&
357	    (ret = __qam_mvptr_log(dbp, dbc->txn,
358	    &meta->dbmeta.lsn, 0, opcode, meta->first_recno,
359	    new_first, meta->cur_recno, new_cur,
360	    &meta->dbmeta.lsn, PGNO_BASE_MD)) != 0)))
361		opcode = 0;
362
363	if (opcode & QAM_SETCUR)
364		meta->cur_recno = new_cur;
365	if (opcode & QAM_SETFIRST)
366		meta->first_recno = new_first;
367
368done:	if ((t_ret = __memp_fput(mpf,
369	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
370		ret = t_ret;
371
372	/* Don't hold the meta page long term. */
373	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
374		ret = t_ret;
375	return (ret);
376}
377
378/*
379 * __qam_append --
380 *	Perform a put(DB_APPEND) in queue.
381 *
382 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
383 */
384int
385__qam_append(dbc, key, data)
386	DBC *dbc;
387	DBT *key, *data;
388{
389	DB *dbp;
390	DB_LOCK lock;
391	DB_MPOOLFILE *mpf;
392	QMETA *meta;
393	QPAGE *page;
394	QUEUE *qp;
395	QUEUE_CURSOR *cp;
396	db_pgno_t pg;
397	db_recno_t recno;
398	int ret, t_ret;
399
400	dbp = dbc->dbp;
401	mpf = dbp->mpf;
402	cp = (QUEUE_CURSOR *)dbc->internal;
403
404	pg = ((QUEUE *)dbp->q_internal)->q_meta;
405	/*
406	 * Get the meta page first, we don't want to write lock it while
407	 * trying to pin it.
408	 */
409	if ((ret = __memp_fget(mpf, &pg,
410	    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0)
411		return (ret);
412	/* Write lock the meta page. */
413	if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
414		(void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority);
415		return (ret);
416	}
417
418	/* Get the next record number. */
419	recno = meta->cur_recno;
420	meta->cur_recno++;
421	if (meta->cur_recno == RECNO_OOB)
422		meta->cur_recno++;
423	if (meta->cur_recno == meta->first_recno) {
424		meta->cur_recno--;
425		if (meta->cur_recno == RECNO_OOB)
426			meta->cur_recno--;
427		ret = __LPUT(dbc, lock);
428
429		if (ret == 0)
430			ret = EFBIG;
431		goto err;
432	}
433
434	if (QAM_BEFORE_FIRST(meta, recno))
435		meta->first_recno = recno;
436
437	/* Lock the record and release meta page lock. */
438	ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
439	    recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock);
440
441	/*
442	 * The application may modify the data based on the selected record
443	 * number.  We always want to call this even if we ultimately end
444	 * up aborting, because we are allocating a record number, regardless.
445	 */
446	if (dbc->dbp->db_append_recno != NULL &&
447	    (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 &&
448	    ret == 0)
449		ret = t_ret;
450
451	/*
452	 * Capture errors from either the lock couple or the call to
453	 * dbp->db_append_recno.
454	 */
455	if (ret != 0) {
456		(void)__LPUT(dbc, lock);
457		goto err;
458	}
459
460	cp->lock = lock;
461	cp->lock_mode = DB_LOCK_WRITE;
462
463	pg = QAM_RECNO_PAGE(dbp, recno);
464
465	/* Fetch and write lock the data page. */
466	if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
467		goto err;
468	if ((ret = __qam_fget(dbc, &pg,
469	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0) {
470		/* We did not fetch it, we can release the lock. */
471		(void)__LPUT(dbc, lock);
472		goto err;
473	}
474
475	/* See if this is a new page. */
476	if (page->pgno == 0) {
477		page->pgno = pg;
478		page->type = P_QAMDATA;
479	}
480
481	/* Put the item on the page and log it. */
482	ret = __qam_pitem(dbc, page,
483	    QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
484
485	/* Doing record locking, release the page lock */
486	if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
487		ret = t_ret;
488
489	if ((t_ret = __qam_fput(dbc,
490	    pg, page, dbc->priority)) != 0 && ret == 0)
491		ret = t_ret;
492
493	/* Return the record number to the user. */
494	if (ret == 0 && key != NULL)
495		ret = __db_retcopy(dbp->env, key,
496		    &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
497
498	/* Position the cursor on this record. */
499	cp->recno = recno;
500
501	/* See if we are leaving the extent. */
502	qp = (QUEUE *) dbp->q_internal;
503	if (qp->page_ext != 0 &&
504	    (recno % (qp->page_ext * qp->rec_page) == 0 ||
505	    recno == UINT32_MAX)) {
506		if ((ret = __db_lget(dbc,
507		    0, ((QUEUE *)dbp->q_internal)->q_meta,
508		    DB_LOCK_WRITE, 0, &lock)) != 0)
509			goto err;
510		if (!QAM_AFTER_CURRENT(meta, recno))
511			ret = __qam_fclose(dbp, pg);
512		if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
513			ret = t_ret;
514	}
515
516err:	/* Release the meta page. */
517	if ((t_ret = __memp_fput(mpf,
518	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
519		ret = t_ret;
520
521	return (ret);
522}
523
524/*
525 * __qamc_del --
526 *	Qam cursor->am_del function
527 */
528static int
529__qamc_del(dbc)
530	DBC *dbc;
531{
532	DB *dbp;
533	DBT data;
534	DB_LOCK lock, metalock;
535	DB_MPOOLFILE *mpf;
536	PAGE *pagep;
537	QAMDATA *qp;
538	QMETA *meta;
539	QUEUE_CURSOR *cp;
540	db_pgno_t pg;
541	int exact, ret, t_ret;
542
543	dbp = dbc->dbp;
544	mpf = dbp->mpf;
545	cp = (QUEUE_CURSOR *)dbc->internal;
546	LOCK_INIT(lock);
547
548	pg = ((QUEUE *)dbp->q_internal)->q_meta;
549	/*
550	 * Get the meta page first, we don't want to write lock it while
551	 * trying to pin it.
552	 */
553	if ((ret = __memp_fget(mpf, &pg,
554	    dbc->thread_info, dbc->txn, 0, &meta)) != 0)
555		return (ret);
556	/* Write lock the meta page. */
557	if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &metalock)) != 0) {
558		(void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority);
559		return (ret);
560	}
561
562	if (QAM_NOT_VALID(meta, cp->recno))
563		ret = DB_NOTFOUND;
564
565	/* Don't hold the meta page long term. */
566	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
567		ret = t_ret;
568
569	if (ret != 0)
570		goto err;
571
572	if ((ret = __db_lget(dbc, LCK_COUPLE,
573	    cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0)
574		goto err;
575	cp->lock_mode = DB_LOCK_WRITE;
576	lock = cp->lock;
577
578	/* Find the record ; delete only deletes exact matches. */
579	if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE,
580	    DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0)
581		goto err;
582
583	if (!exact) {
584		ret = DB_NOTFOUND;
585		goto err;
586	}
587
588	pagep = cp->page;
589	qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
590
591	if (DBC_LOGGING(dbc)) {
592		if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
593		    ((QUEUE *)dbp->q_internal)->re_len == 0) {
594			if ((ret = __qam_del_log(dbp,
595			    dbc->txn, &LSN(pagep), 0, &LSN(pagep),
596			    pagep->pgno, cp->indx, cp->recno)) != 0)
597				goto err;
598		} else {
599			data.size = ((QUEUE *)dbp->q_internal)->re_len;
600			data.data = qp->data;
601			if ((ret = __qam_delext_log(dbp,
602			    dbc->txn, &LSN(pagep), 0, &LSN(pagep),
603			    pagep->pgno, cp->indx, cp->recno, &data)) != 0)
604				goto err;
605		}
606	} else
607		LSN_NOT_LOGGED(LSN(pagep));
608
609	F_CLR(qp, QAM_VALID);
610
611	/*
612	 * Peek at the first_recno before locking the meta page.
613	 * Other threads cannot move first_recno past
614	 * our position while we have the record locked.
615	 * If it's pointing at the deleted record then lock
616	 * the metapage and check again as lower numbered
617	 * record may have been inserted.
618	 */
619	if (cp->recno == meta->first_recno) {
620		pg = ((QUEUE *)dbp->q_internal)->q_meta;
621		if ((ret =
622		    __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &metalock)) != 0)
623			goto err;
624		if (cp->recno == meta->first_recno)
625			ret = __qam_consume(dbc, meta, meta->first_recno);
626		if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
627			ret = t_ret;
628	}
629
630err:	if ((t_ret = __memp_fput(mpf, dbc->thread_info,
631	    meta, dbc->priority)) != 0 && ret == 0)
632		ret = t_ret;
633	if (cp->page != NULL &&
634	    (t_ret = __qam_fput(dbc,
635	    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
636		ret = t_ret;
637	cp->page = NULL;
638
639	/* Doing record locking, release the page lock */
640	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
641		ret = t_ret;
642	cp->lock = lock;
643
644	return (ret);
645}
646
647#ifdef	DEBUG_WOP
648#define	QDEBUG
649#endif
650
651/*
652 * __qamc_get --
653 *	Queue DBC->get function.
654 */
655static int
656__qamc_get(dbc, key, data, flags, pgnop)
657	DBC *dbc;
658	DBT *key, *data;
659	u_int32_t flags;
660	db_pgno_t *pgnop;
661{
662	DB *dbp;
663	DBC *dbcdup;
664	DBT tmp;
665	DB_LOCK lock, pglock, metalock;
666	DB_MPOOLFILE *mpf;
667	ENV *env;
668	PAGE *pg;
669	QAMDATA *qp;
670	QMETA *meta;
671	QUEUE *t;
672	QUEUE_CURSOR *cp;
673	db_lockmode_t lock_mode, meta_mode;
674	db_pgno_t metapno;
675	db_recno_t first;
676	int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete;
677	int retrying;
678
679	dbp = dbc->dbp;
680	env = dbp->env;
681	mpf = dbp->mpf;
682	cp = (QUEUE_CURSOR *)dbc->internal;
683	LOCK_INIT(lock);
684	LOCK_INIT(pglock);
685
686	lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
687	meta_mode = DB_LOCK_READ;
688	meta = NULL;
689	*pgnop = 0;
690	pg = NULL;
691	retrying = t_ret = wait = with_delete = 0;
692
693	if (flags == DB_CONSUME_WAIT) {
694		wait = 1;
695		flags = DB_CONSUME;
696	}
697	if (flags == DB_CONSUME) {
698		with_delete = 1;
699		flags = DB_FIRST;
700		meta_mode = lock_mode = DB_LOCK_WRITE;
701	}
702	inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete;
703
704	DEBUG_LREAD(dbc, dbc->txn, "qamc_get",
705	    flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
706
707	/* Make lint and friends happy. */
708	locked = 0;
709
710	is_first = 0;
711	first = 0;
712
713	t = (QUEUE *)dbp->q_internal;
714	metapno = t->q_meta;
715	LOCK_INIT(metalock);
716
717	/*
718	 * Get the meta page first, we don't want to write lock it while
719	 * trying to pin it.  This is because someone my have it pinned
720	 * but not locked.
721	 */
722	if ((ret = __memp_fget(mpf, &metapno,
723	     dbc->thread_info, dbc->txn, 0, &meta)) != 0)
724		return (ret);
725
726get_next:
727	switch (flags) {
728	case DB_NEXT:
729	case DB_NEXT_NODUP:
730	case DB_FIRST:
731	case DB_PREV:
732	case DB_PREV_NODUP:
733	case DB_LAST:
734		if ((ret = __db_lget(dbc,
735		    0, metapno, meta_mode, 0, &metalock)) != 0)
736			goto err;
737		locked = 1;
738		break;
739	default:
740		break;
741	}
742
743	/* Release any previous lock if not in a transaction. */
744	if ((ret = __TLPUT(dbc, cp->lock)) != 0)
745		goto err;
746
747retry:	/* Update the record number. */
748	switch (flags) {
749	case DB_CURRENT:
750		break;
751	case DB_NEXT_DUP:
752	case DB_PREV_DUP:
753		ret = DB_NOTFOUND;
754		goto err;
755		/* NOTREACHED */
756	case DB_NEXT:
757	case DB_NEXT_NODUP:
758		if (cp->recno != RECNO_OOB) {
759			++cp->recno;
760			/* Wrap around, skipping zero. */
761			if (cp->recno == RECNO_OOB)
762				cp->recno++;
763			/*
764			 * Check to see if we are out of data.
765			 */
766			if (QAM_AFTER_CURRENT(meta, cp->recno)) {
767				pg = NULL;
768				if (!wait) {
769					ret = DB_NOTFOUND;
770					goto err;
771				}
772				flags = DB_FIRST;
773				/*
774				 * If first is not set, then we skipped
775				 * a locked record, go back and find it.
776				 * If we find a locked record again
777				 * wait for it.
778				 */
779				if (first == 0) {
780					retrying = 1;
781					goto retry;
782				}
783
784				if (CDB_LOCKING(env)) {
785					/* Drop the metapage before we wait. */
786					ret = __memp_fput(mpf, dbc->thread_info,
787					    meta, dbc->priority);
788					meta = NULL;
789					if (ret != 0)
790						goto err;
791					if ((ret = __lock_get(
792					    env, dbc->locker,
793					    DB_LOCK_SWITCH, &dbc->lock_dbt,
794					    DB_LOCK_WAIT, &dbc->mylock)) != 0)
795						goto err;
796
797					if ((ret = __memp_fget(mpf, &metapno,
798					    dbc->thread_info,
799					    dbc->txn, 0, &meta)) != 0)
800						goto err;
801					if ((ret = __lock_get(
802					    env, dbc->locker,
803					    DB_LOCK_UPGRADE, &dbc->lock_dbt,
804					    DB_LOCK_WRITE, &dbc->mylock)) != 0)
805						goto err;
806					goto retry;
807				}
808				/*
809				 * Wait for someone to update the meta page.
810				 * This will probably mean there is something
811				 * in the queue.  We then go back up and
812				 * try again.
813				 */
814				if (locked == 0) {
815					if ((ret = __db_lget(dbc, 0, metapno,
816					    meta_mode, 0, &metalock)) != 0)
817						goto err;
818					locked = 1;
819					if (cp->recno != RECNO_OOB &&
820					    !QAM_AFTER_CURRENT(meta, cp->recno))
821						goto retry;
822				}
823				/* Drop the metapage before we wait. */
824				ret = __memp_fput(mpf,
825				     dbc->thread_info, meta, dbc->priority);
826				meta = NULL;
827				if (ret != 0)
828					goto err;
829				if ((ret = __db_lget(dbc,
830				    0, metapno, DB_LOCK_WAIT,
831				    DB_LOCK_SWITCH, &metalock)) != 0) {
832					if (ret == DB_LOCK_DEADLOCK)
833						ret = DB_LOCK_NOTGRANTED;
834					goto err;
835				}
836				if ((ret = __memp_fget(mpf,
837				     &metapno, dbc->thread_info, dbc->txn,
838				    0, &meta)) != 0)
839					goto err;
840				if ((ret = __db_lget(dbc, 0,
841				    PGNO_INVALID, DB_LOCK_WRITE,
842				    DB_LOCK_UPGRADE, &metalock)) != 0) {
843					if (ret == DB_LOCK_DEADLOCK)
844						ret = DB_LOCK_NOTGRANTED;
845					goto err;
846				}
847				locked = 1;
848				goto retry;
849			}
850			break;
851		}
852		/* FALLTHROUGH */
853	case DB_FIRST:
854		flags = DB_NEXT;
855		is_first = 1;
856
857		/* get the first record number */
858		cp->recno = first = meta->first_recno;
859
860		break;
861	case DB_PREV:
862	case DB_PREV_NODUP:
863		if (cp->recno != RECNO_OOB) {
864			if (cp->recno == meta->first_recno ||
865			   QAM_BEFORE_FIRST(meta, cp->recno)) {
866				ret = DB_NOTFOUND;
867				goto err;
868			}
869			--cp->recno;
870			/* Wrap around, skipping zero. */
871			if (cp->recno == RECNO_OOB)
872				--cp->recno;
873			break;
874		}
875		/* FALLTHROUGH */
876	case DB_LAST:
877		if (meta->first_recno == meta->cur_recno) {
878			ret = DB_NOTFOUND;
879			goto err;
880		}
881		cp->recno = meta->cur_recno - 1;
882		if (cp->recno == RECNO_OOB)
883			cp->recno--;
884		break;
885	case DB_SET:
886	case DB_SET_RANGE:
887	case DB_GET_BOTH:
888	case DB_GET_BOTH_RANGE:
889		if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
890			goto err;
891		break;
892	default:
893		ret = __db_unknown_flag(env, "__qamc_get", flags);
894		goto err;
895	}
896
897	/* Don't hold the meta page long term. */
898	if (locked) {
899		if ((ret = __LPUT(dbc, metalock)) != 0)
900			goto err;
901		locked = 0;
902	}
903
904	/* Lock the record. */
905	if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode,
906	    (with_delete && !inorder && !retrying) ?
907	    DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
908	    &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) &&
909	    with_delete) {
910#ifdef QDEBUG
911		if (DBC_LOGGING(dbc))
912			(void)__log_printf(env,
913			    dbc->txn, "Queue S: %x %d %d %d",
914			    dbc->locker ? dbc->locker->id : 0,
915			    cp->recno, first, meta->first_recno);
916#endif
917		first = 0;
918		if ((ret =
919		    __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
920			goto err;
921		locked = 1;
922		goto retry;
923	}
924
925	if (ret != 0)
926		goto err;
927
928	/*
929	 * In the DB_FIRST or DB_LAST cases we must wait and then start over
930	 * since the first/last may have moved while we slept.  If we are
931	 * reading in order and the first record was not there, we can skip it
932	 * as it must have been aborted was was skipped by a non-queue insert
933	 * or we could not have gotten its lock.  If we have the wrong
934	 * record we release our locks and try again.
935	 */
936	switch (flags) {
937	default:
938		if (inorder) {
939			if (first != cp->recno)
940				break;
941		} else if (with_delete || !is_first)
942			break;
943		/* FALLTHROUGH */
944	case DB_SET:
945	case DB_SET_RANGE:
946	case DB_GET_BOTH:
947	case DB_GET_BOTH_RANGE:
948	case DB_LAST:
949		if ((ret =
950		    __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
951			goto lerr;
952		locked = 1;
953		if ((is_first && cp->recno != meta->first_recno) ||
954		    (flags == DB_LAST && cp->recno != meta->cur_recno - 1)) {
955			if ((ret = __LPUT(dbc, lock)) != 0)
956				goto err;
957			if (is_first)
958				flags = DB_FIRST;
959			goto retry;
960		} else if (!is_first && flags != DB_LAST) {
961			if (QAM_BEFORE_FIRST(meta, cp->recno)) {
962				if (flags == DB_SET_RANGE ||
963				    flags == DB_GET_BOTH_RANGE) {
964					if ((ret = __LPUT(dbc, metalock)) != 0)
965						goto err;
966					locked = 0;
967					cp->lock = lock;
968					LOCK_INIT(lock);
969					goto release_retry;
970				}
971				ret = DB_NOTFOUND;
972				goto lerr;
973			}
974			if (QAM_AFTER_CURRENT(meta, cp->recno)) {
975				ret = DB_NOTFOUND;
976				goto lerr;
977			}
978		}
979		/* Don't hold the meta page long term. */
980		if ((ret = __LPUT(dbc, metalock)) != 0)
981			goto err;
982		locked = 0;
983	}
984
985	/* Position the cursor on the record. */
986	if ((ret = __qam_position(dbc, &cp->recno,
987	    lock_mode, 0, &exact)) != 0) {
988		/* We cannot get the page, release the record lock. */
989		(void)__LPUT(dbc, lock);
990		goto err;
991	}
992
993	pg = cp->page;
994	pglock = cp->lock;
995	cp->lock = lock;
996	cp->lock_mode = lock_mode;
997	LOCK_INIT(lock);
998
999	if (!exact) {
1000release_retry:	/* Release locks and retry, if possible. */
1001		if (pg != NULL)
1002			(void)__qam_fput(dbc, cp->pgno, pg, dbc->priority);
1003		cp->page = pg = NULL;
1004		if ((ret = __LPUT(dbc, pglock)) != 0)
1005			goto err1;
1006		if (with_delete) {
1007			if ((ret = __LPUT(dbc, cp->lock)) != 0)
1008				goto err1;
1009		} else if ((ret = __TLPUT(dbc, cp->lock)) != 0)
1010			goto err1;
1011
1012		/*
1013		 * If we don't need locks and we are out of range
1014		 * then we can just skip to the FIRST/LAST record
1015		 * otherwise we must iterate to lock the records
1016		 * and get serializability.
1017		 */
1018		switch (flags) {
1019		case DB_NEXT:
1020		case DB_NEXT_NODUP:
1021			if (!with_delete)
1022				is_first = 0;
1023			if (QAM_BEFORE_FIRST(meta, cp->recno) &&
1024			    DONT_NEED_LOCKS(dbc))
1025				flags = DB_FIRST;
1026			break;
1027		case DB_LAST:
1028		case DB_PREV:
1029		case DB_PREV_NODUP:
1030			if (QAM_AFTER_CURRENT(meta, cp->recno) &&
1031			    DONT_NEED_LOCKS(dbc))
1032				flags = DB_LAST;
1033			else
1034				flags = DB_PREV;
1035			break;
1036
1037		case DB_GET_BOTH_RANGE:
1038		case DB_SET_RANGE:
1039			if (QAM_BEFORE_FIRST(meta, cp->recno) &&
1040			    DONT_NEED_LOCKS(dbc))
1041				flags = DB_FIRST;
1042			else
1043				flags = DB_NEXT;
1044			break;
1045
1046		default:
1047			/* this is for the SET and GET_BOTH cases */
1048			ret = DB_KEYEMPTY;
1049			goto err1;
1050		}
1051		retrying = 0;
1052		goto get_next;
1053	}
1054
1055	qp = QAM_GET_RECORD(dbp, pg, cp->indx);
1056
1057	/* Return the data item. */
1058	if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
1059		/*
1060		 * Need to compare
1061		 */
1062		tmp.data = qp->data;
1063		tmp.size = t->re_len;
1064		if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
1065			if (flags == DB_GET_BOTH_RANGE)
1066				goto release_retry;
1067			ret = DB_NOTFOUND;
1068			goto err1;
1069		}
1070	}
1071
1072	/* Return the key if the user didn't give us one. */
1073	if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) {
1074		if ((ret = __db_retcopy(dbp->env,
1075		    key, &cp->recno, sizeof(cp->recno),
1076		    &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
1077			goto err1;
1078		F_SET(key, DB_DBT_ISSET);
1079	}
1080
1081	if (data != NULL &&
1082	    !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
1083	    !F_ISSET(data, DB_DBT_ISSET)) {
1084		if ((ret = __db_retcopy(dbp->env, data, qp->data, t->re_len,
1085		    &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
1086			goto err1;
1087		F_SET(data, DB_DBT_ISSET);
1088	}
1089
1090	/* Finally, if we are doing DB_CONSUME mark the record. */
1091	if (with_delete) {
1092		/*
1093		 * Assert that we're not a secondary index.  Doing a DB_CONSUME
1094		 * on a secondary makes very little sense, since one can't
1095		 * DB_APPEND there;  attempting one should be forbidden by
1096		 * the interface.
1097		 */
1098		DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY));
1099
1100		if ((ret = __qam_dirty(dbc,
1101		     cp->pgno, &cp->page, dbc->priority)) != 0)
1102			goto err1;
1103		pg = cp->page;
1104
1105		/*
1106		 * Check and see if we *have* any secondary indices.
1107		 * If we do, we're a primary, so call __dbc_del_primary
1108		 * to delete the references to the item we're about to
1109		 * delete.
1110		 *
1111		 * Note that we work on a duplicated cursor, since the
1112		 * __db_ret work has already been done, so it's not safe
1113		 * to perform any additional ops on this cursor.
1114		 */
1115		if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
1116			if ((ret = __dbc_idup(dbc,
1117			    &dbcdup, DB_POSITION)) != 0)
1118				goto err1;
1119
1120			if ((ret = __dbc_del_primary(dbcdup)) != 0) {
1121				/*
1122				 * The __dbc_del_primary return is more
1123				 * interesting.
1124				 */
1125				(void)__dbc_close(dbcdup);
1126				goto err1;
1127			}
1128
1129			if ((ret = __dbc_close(dbcdup)) != 0)
1130				goto err1;
1131		}
1132
1133		if (DBC_LOGGING(dbc)) {
1134			if (t->page_ext == 0 || t->re_len == 0) {
1135				if ((ret = __qam_del_log(dbp, dbc->txn,
1136				    &LSN(pg), 0, &LSN(pg),
1137				    pg->pgno, cp->indx, cp->recno)) != 0)
1138					goto err1;
1139			} else {
1140				tmp.data = qp->data;
1141				tmp.size = t->re_len;
1142				if ((ret = __qam_delext_log(dbp,
1143				   dbc->txn, &LSN(pg), 0, &LSN(pg),
1144				   pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
1145					goto err1;
1146			}
1147		} else
1148			LSN_NOT_LOGGED(LSN(pg));
1149
1150		F_CLR(qp, QAM_VALID);
1151
1152		if ((ret = __LPUT(dbc, pglock)) != 0)
1153			goto err1;
1154
1155		/*
1156		 * Now we need to update the metapage
1157		 * first pointer. If we have deleted
1158		 * the record that is pointed to by
1159		 * first_recno then we move it as far
1160		 * forward as we can without blocking.
1161		 * The metapage lock must be held for
1162		 * the whole scan otherwise someone could
1163		 * do a random insert behind where we are
1164		 * looking.
1165		 */
1166
1167		if (locked == 0 && (ret = __db_lget(
1168		    dbc, 0, metapno, meta_mode, 0, &metalock)) != 0)
1169			goto err1;
1170		locked = 1;
1171
1172#ifdef QDEBUG
1173		if (DBC_LOGGING(dbc))
1174			(void)__log_printf(env,
1175			    dbc->txn, "Queue D: %x %d %d %d",
1176			    dbc->locker ? dbc->locker->id : 0,
1177			    cp->recno, first, meta->first_recno);
1178#endif
1179		/*
1180		 * See if we deleted the "first" record.  If
1181		 * first is zero then we skipped something,
1182		 * see if first_recno has been move passed
1183		 * that to the record that we deleted.
1184		 */
1185		if (first == 0)
1186			first = cp->recno;
1187		if (first != meta->first_recno)
1188			goto done;
1189
1190		if ((ret = __qam_consume(dbc, meta, first)) != 0)
1191			goto err1;
1192	}
1193
1194done:
1195err1:	if (cp->page != NULL) {
1196		if ((t_ret = __qam_fput(dbc,
1197		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
1198			ret = t_ret;
1199
1200		/* Doing record locking, release the page lock */
1201		if ((t_ret = __LPUT(dbc, pglock)) != 0 && ret == 0)
1202			ret = t_ret;
1203		cp->page = NULL;
1204	}
1205	if (0) {
1206lerr:		(void)__LPUT(dbc, lock);
1207	}
1208
1209err:	if (meta) {
1210		/* Release the meta page. */
1211		if ((t_ret = __memp_fput(mpf,
1212		    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1213			ret = t_ret;
1214
1215		/* Don't hold the meta page long term. */
1216		if (locked)
1217			if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1218				ret = t_ret;
1219	}
1220	DB_ASSERT(env, !LOCK_ISSET(metalock));
1221
1222	return ((ret == DB_LOCK_NOTGRANTED && !F_ISSET(env->dbenv,
1223	    DB_ENV_TIME_NOTGRANTED)) ? DB_LOCK_DEADLOCK : ret);
1224}
1225
1226/*
1227 * __qam_consume -- try to reset the head of the queue.
1228 *
1229 */
1230static int
1231__qam_consume(dbc, meta, first)
1232	DBC *dbc;
1233	QMETA *meta;
1234	db_recno_t first;
1235{
1236	DB *dbp;
1237	DB_LOCK lock, save_lock;
1238	DB_MPOOLFILE *mpf;
1239	QUEUE_CURSOR *cp;
1240	db_indx_t save_indx;
1241	db_pgno_t save_page;
1242	db_recno_t current, save_recno;
1243	u_int32_t rec_extent;
1244	int exact, ret, t_ret, wrapped;
1245
1246	dbp = dbc->dbp;
1247	mpf = dbp->mpf;
1248	cp = (QUEUE_CURSOR *)dbc->internal;
1249	ret = 0;
1250
1251	save_page = cp->pgno;
1252	save_indx = cp->indx;
1253	save_recno = cp->recno;
1254	save_lock = cp->lock;
1255
1256	/*
1257	 * If we skipped some deleted records, we need to
1258	 * reposition on the first one.  Get a lock
1259	 * in case someone is trying to put it back.
1260	 */
1261	if (first != cp->recno) {
1262		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1263		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1264		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1265			ret = 0;
1266			goto done;
1267		}
1268		if (ret != 0)
1269			goto done;
1270		if ((ret =
1271		    __qam_fput(dbc, cp->pgno, cp->page, dbc->priority)) != 0)
1272			goto done;
1273		cp->page = NULL;
1274		if ((ret = __qam_position(dbc,
1275		    &first, DB_LOCK_READ, 0, &exact)) != 0 || exact != 0) {
1276			(void)__LPUT(dbc, lock);
1277			goto done;
1278		}
1279		if ((ret =__LPUT(dbc, lock)) != 0)
1280			goto done;
1281		if ((ret = __LPUT(dbc, cp->lock)) != 0)
1282			goto done;
1283	}
1284
1285	current = meta->cur_recno;
1286	wrapped = 0;
1287	if (first > current)
1288		wrapped = 1;
1289	rec_extent = meta->page_ext * meta->rec_page;
1290
1291	/* Loop until we find a record or hit current */
1292	for (;;) {
1293		/*
1294		 * Check to see if we are moving off the extent
1295		 * and remove the extent.
1296		 * If we are moving off a page we need to
1297		 * get rid of the buffer.
1298		 * Wait for the lagging readers to move off the
1299		 * page.
1300		 */
1301		if (cp->page != NULL && rec_extent != 0 &&
1302		    ((exact = (first % rec_extent == 0)) ||
1303		    (first % meta->rec_page == 0) ||
1304		    first == UINT32_MAX)) {
1305			if (exact == 1 && (ret = __db_lget(dbc,
1306			    0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1307				break;
1308#ifdef QDEBUG
1309			if (DBC_LOGGING(dbc))
1310				(void)__log_printf(dbp->env, dbc->txn,
1311				    "Queue R: %x %d %d %d",
1312				    dbc->locker ? dbc->locker->id : 0,
1313				    cp->pgno, first, meta->first_recno);
1314#endif
1315			if ((ret = __qam_fput(dbc,
1316			    cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0)
1317				break;
1318			cp->page = NULL;
1319
1320			if (exact == 1) {
1321				ret = __qam_fremove(dbp, cp->pgno);
1322				if ((t_ret =
1323				    __LPUT(dbc, cp->lock)) != 0 && ret == 0)
1324					ret = t_ret;
1325			}
1326			if (ret != 0)
1327				break;
1328		} else if (cp->page != NULL && (ret = __qam_fput(dbc,
1329		     cp->pgno, cp->page, dbc->priority)) != 0)
1330			break;
1331		cp->page = NULL;
1332		first++;
1333		if (first == RECNO_OOB) {
1334			wrapped = 0;
1335			first++;
1336		}
1337
1338		/*
1339		 * LOOP EXIT when we come move to the current
1340		 * pointer.
1341		 */
1342		if (!wrapped && first >= current)
1343			break;
1344
1345		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1346		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1347		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
1348			ret = 0;
1349			break;
1350		}
1351		if (ret != 0)
1352			break;
1353
1354		if ((ret = __qam_position(dbc,
1355		    &first, DB_LOCK_READ, 0, &exact)) != 0) {
1356			(void)__LPUT(dbc, lock);
1357			break;
1358		}
1359		if ((ret =__LPUT(dbc, lock)) != 0 ||
1360		    (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
1361			if ((t_ret = __qam_fput(dbc, cp->pgno,
1362			    cp->page, dbc->priority)) != 0 && ret == 0)
1363				ret = t_ret;
1364			cp->page = NULL;
1365			break;
1366		}
1367	}
1368
1369	cp->pgno = save_page;
1370	cp->indx = save_indx;
1371	cp->recno = save_recno;
1372	cp->lock = save_lock;
1373
1374	/*
1375	 * We have advanced as far as we can.
1376	 * Advance first_recno to this point.
1377	 */
1378	if (ret == 0 && meta->first_recno != first) {
1379		if ((ret = __memp_dirty(mpf,
1380		    &meta, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
1381			goto done;
1382#ifdef QDEBUG
1383		if (DBC_LOGGING(dbc))
1384			(void)__log_printf(dbp->env, dbc->txn,
1385			    "Queue M: %x %d %d %d",
1386			    dbc->locker ? dbc->locker->id : 0,
1387			    cp->recno, first, meta->first_recno);
1388#endif
1389		if (DBC_LOGGING(dbc)) {
1390			if ((ret = __qam_incfirst_log(dbp,
1391			    dbc->txn, &meta->dbmeta.lsn, 0,
1392			    cp->recno, PGNO_BASE_MD)) != 0)
1393				goto done;
1394		} else
1395			LSN_NOT_LOGGED(meta->dbmeta.lsn);
1396		meta->first_recno = first;
1397	}
1398
1399done:
1400	return (ret);
1401}
1402
1403static int
1404__qam_bulk(dbc, data, flags)
1405	DBC *dbc;
1406	DBT *data;
1407	u_int32_t flags;
1408{
1409	DB *dbp;
1410	DB_LOCK metalock, rlock;
1411	DB_MPOOLFILE *mpf;
1412	PAGE *pg;
1413	QAMDATA *qp;
1414	QMETA *meta;
1415	QUEUE_CURSOR *cp;
1416	db_indx_t indx;
1417	db_lockmode_t lkmode;
1418	db_pgno_t metapno;
1419	u_int32_t  *endp, *offp;
1420	u_int32_t pagesize, re_len, recs;
1421	u_int8_t *dbuf, *dp, *np;
1422	int exact, ret, t_ret, valid;
1423	int is_key, need_pg, size, space;
1424
1425	dbp = dbc->dbp;
1426	mpf = dbp->mpf;
1427	cp = (QUEUE_CURSOR *)dbc->internal;
1428
1429	lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
1430
1431	pagesize = dbp->pgsize;
1432	re_len = ((QUEUE *)dbp->q_internal)->re_len;
1433	recs = ((QUEUE *)dbp->q_internal)->rec_page;
1434	metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1435
1436	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1437	size = 0;
1438
1439	if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
1440		return (ret);
1441	if ((ret = __memp_fget(mpf, &metapno,
1442	     dbc->thread_info, dbc->txn, 0, &meta)) != 0) {
1443		/* We did not fetch it, we can release the lock. */
1444		(void)__LPUT(dbc, metalock);
1445		return (ret);
1446	}
1447
1448	dbuf = data->data;
1449	np = dp = dbuf;
1450
1451	/* Keep track of space that is left.  There is an termination entry */
1452	space = (int)data->ulen;
1453	space -= (int)sizeof(*offp);
1454
1455	/* Build the offset/size table from the end up. */
1456	endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen);
1457	endp--;
1458	offp = endp;
1459	/* Save the lock on the current position of the cursor. */
1460	rlock = cp->lock;
1461	LOCK_INIT(cp->lock);
1462
1463next_pg:
1464	/* Wrap around, skipping zero. */
1465	if (cp->recno == RECNO_OOB)
1466		cp->recno++;
1467	if ((ret = __qam_position(dbc, &cp->recno, lkmode, 0, &exact)) != 0)
1468		goto done;
1469
1470	pg = cp->page;
1471	indx = cp->indx;
1472	need_pg = 1;
1473
1474	do {
1475		/*
1476		 * If this page is a nonexistent page at the end of an
1477		 * extent, pg may be NULL.  A NULL page has no valid records,
1478		 * so just keep looping as though qp exists and isn't QAM_VALID;
1479		 * calling QAM_GET_RECORD is unsafe.
1480		 */
1481		valid = 0;
1482
1483		if (pg != NULL) {
1484			if ((ret = __db_lget(dbc, LCK_COUPLE,
1485			    cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0)
1486				goto done;
1487			qp = QAM_GET_RECORD(dbp, pg, indx);
1488			if (F_ISSET(qp, QAM_VALID)) {
1489				valid = 1;
1490				space -= (int)
1491				    ((is_key ? 3 : 2) * sizeof(*offp));
1492				if (space < 0)
1493					goto get_space;
1494				if (need_pg) {
1495					dp = np;
1496					size = (int)pagesize - QPAGE_SZ(dbp);
1497					if (space < size) {
1498get_space:
1499						if (offp == endp) {
1500							data->size = (u_int32_t)
1501							    DB_ALIGN((u_int32_t)
1502							    size + pagesize,
1503							    sizeof(u_int32_t));
1504							ret = DB_BUFFER_SMALL;
1505							break;
1506						}
1507						if (indx != 0)
1508							indx--;
1509						cp->recno--;
1510						space = 0;
1511						break;
1512					}
1513					memcpy(dp,
1514					    (u_int8_t *)pg + QPAGE_SZ(dbp),
1515					    (u_int)size);
1516					need_pg = 0;
1517					space -= size;
1518					np += size;
1519				}
1520				if (is_key)
1521					*offp-- = cp->recno;
1522				*offp-- = (u_int32_t)((((u_int8_t *)qp -
1523				    (u_int8_t *)pg) - QPAGE_SZ(dbp)) +
1524				    (dp - dbuf) + SSZA(QAMDATA, data));
1525				*offp-- = re_len;
1526			}
1527		}
1528		if (!valid && is_key == 0) {
1529			*offp-- = 0;
1530			*offp-- = 0;
1531		}
1532		cp->recno++;
1533	} while (++indx < recs && cp->recno != RECNO_OOB &&
1534	    !QAM_AFTER_CURRENT(meta, cp->recno));
1535
1536	/* Drop the page lock. */
1537	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
1538		ret = t_ret;
1539
1540	if (cp->page != NULL) {
1541		if ((t_ret = __qam_fput(dbc,
1542		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
1543			ret = t_ret;
1544		cp->page = NULL;
1545	}
1546
1547	if (ret == 0 && space > 0 &&
1548	    (indx >= recs || cp->recno == RECNO_OOB) &&
1549	    !QAM_AFTER_CURRENT(meta, cp->recno))
1550		goto next_pg;
1551
1552	/*
1553	 * Correct recno in two cases:
1554	 * 1) If we just wrapped fetch must start at record 1 not a FIRST.
1555	 * 2) We ran out of space exactly at the end of a page.
1556	 */
1557	if (cp->recno == RECNO_OOB || (space == 0 && indx == recs))
1558		cp->recno--;
1559
1560	if (is_key == 1)
1561		*offp = RECNO_OOB;
1562	else
1563		*offp = (u_int32_t)-1;
1564
1565done:	/* Release the meta page. */
1566	if ((t_ret = __memp_fput(mpf,
1567	     dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1568		ret = t_ret;
1569	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1570		ret = t_ret;
1571
1572	cp->lock = rlock;
1573
1574	return (ret);
1575}
1576
1577/*
1578 * __qamc_close --
1579 *	Close down the cursor from a single use.
1580 */
1581static int
1582__qamc_close(dbc, root_pgno, rmroot)
1583	DBC *dbc;
1584	db_pgno_t root_pgno;
1585	int *rmroot;
1586{
1587	QUEUE_CURSOR *cp;
1588	int ret;
1589
1590	COMPQUIET(root_pgno, 0);
1591	COMPQUIET(rmroot, NULL);
1592
1593	cp = (QUEUE_CURSOR *)dbc->internal;
1594
1595	/* Discard any locks not acquired inside of a transaction. */
1596	ret = __TLPUT(dbc, cp->lock);
1597
1598	LOCK_INIT(cp->lock);
1599	cp->page = NULL;
1600	cp->pgno = PGNO_INVALID;
1601	cp->indx = 0;
1602	cp->lock_mode = DB_LOCK_NG;
1603	cp->recno = RECNO_OOB;
1604	cp->flags = 0;
1605
1606	return (ret);
1607}
1608
1609/*
1610 * __qamc_dup --
1611 *	Duplicate a queue cursor, such that the new one holds appropriate
1612 *	locks for the position of the original.
1613 *
1614 * PUBLIC: int __qamc_dup __P((DBC *, DBC *));
1615 */
1616int
1617__qamc_dup(orig_dbc, new_dbc)
1618	DBC *orig_dbc, *new_dbc;
1619{
1620	QUEUE_CURSOR *orig, *new;
1621
1622	orig = (QUEUE_CURSOR *)orig_dbc->internal;
1623	new = (QUEUE_CURSOR *)new_dbc->internal;
1624
1625	new->recno = orig->recno;
1626
1627	return (0);
1628}
1629
1630/*
1631 * __qamc_init
1632 *
1633 * PUBLIC: int __qamc_init __P((DBC *));
1634 */
1635int
1636__qamc_init(dbc)
1637	DBC *dbc;
1638{
1639	DB *dbp;
1640	QUEUE_CURSOR *cp;
1641	int ret;
1642
1643	dbp = dbc->dbp;
1644
1645	/* Allocate the internal structure. */
1646	cp = (QUEUE_CURSOR *)dbc->internal;
1647	if (cp == NULL) {
1648		if ((ret =
1649		    __os_calloc(dbp->env, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
1650			return (ret);
1651		dbc->internal = (DBC_INTERNAL *)cp;
1652	}
1653
1654	/* Initialize methods. */
1655	dbc->close = dbc->c_close = __dbc_close_pp;
1656	dbc->count = dbc->c_count = __dbc_count_pp;
1657	dbc->del = dbc->c_del = __dbc_del_pp;
1658	dbc->dup = dbc->c_dup = __dbc_dup_pp;
1659	dbc->get = dbc->c_get = __dbc_get_pp;
1660	dbc->pget = dbc->c_pget = __dbc_pget_pp;
1661	dbc->put = dbc->c_put = __dbc_put_pp;
1662	dbc->am_bulk = __qam_bulk;
1663	dbc->am_close = __qamc_close;
1664	dbc->am_del = __qamc_del;
1665	dbc->am_destroy = __qamc_destroy;
1666	dbc->am_get = __qamc_get;
1667	dbc->am_put = __qamc_put;
1668	dbc->am_writelock = NULL;
1669
1670	return (0);
1671}
1672
1673/*
1674 * __qamc_destroy --
1675 *	Close a single cursor -- internal version.
1676 */
1677static int
1678__qamc_destroy(dbc)
1679	DBC *dbc;
1680{
1681	/* Discard the structures. */
1682	__os_free(dbc->env, dbc->internal);
1683
1684	return (0);
1685}
1686
1687/*
1688 * __qam_getno --
1689 *	Check the user's record number.
1690 */
1691static int
1692__qam_getno(dbp, key, rep)
1693	DB *dbp;
1694	const DBT *key;
1695	db_recno_t *rep;
1696{
1697	/* If passed an empty DBT from Java, key->data may be NULL */
1698	if (key->size != sizeof(db_recno_t)) {
1699		__db_errx(dbp->env, "illegal record number size");
1700		return (EINVAL);
1701	}
1702
1703	if ((*rep = *(db_recno_t *)key->data) == 0) {
1704		__db_errx(dbp->env, "illegal record number of 0");
1705		return (EINVAL);
1706	}
1707	return (0);
1708}
1709
1710/*
1711 * __qam_truncate --
1712 *	Truncate a queue database
1713 *
1714 * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *));
1715 */
1716int
1717__qam_truncate(dbc, countp)
1718	DBC *dbc;
1719	u_int32_t *countp;
1720{
1721	DB *dbp;
1722	DB_LOCK metalock;
1723	DB_MPOOLFILE *mpf;
1724	QMETA *meta;
1725	db_pgno_t metapno;
1726	u_int32_t count;
1727	int ret, t_ret;
1728
1729	dbp = dbc->dbp;
1730
1731	/* Walk the queue, counting rows. */
1732	for (count = 0;
1733	    (ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;)
1734		count++;
1735	if (ret != DB_NOTFOUND)
1736		return (ret);
1737
1738	/* Update the meta page. */
1739	metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1740	if ((ret =
1741	    __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
1742		return (ret);
1743
1744	mpf = dbp->mpf;
1745	if ((ret = __memp_fget(mpf, &metapno, dbc->thread_info, dbc->txn,
1746	    DB_MPOOL_DIRTY, &meta)) != 0) {
1747		/* We did not fetch it, we can release the lock. */
1748		(void)__LPUT(dbc, metalock);
1749		return (ret);
1750	}
1751	/* Remove the last extent file. */
1752	if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) {
1753		if ((ret = __qam_fremove(dbp,
1754		    QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0)
1755			return (ret);
1756	}
1757
1758	if (DBC_LOGGING(dbc)) {
1759		ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
1760		    QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
1761		    1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
1762	} else
1763		LSN_NOT_LOGGED(meta->dbmeta.lsn);
1764	if (ret == 0)
1765		meta->first_recno = meta->cur_recno = 1;
1766
1767	if ((t_ret = __memp_fput(mpf,
1768	     dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
1769		ret = t_ret;
1770	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1771		ret = t_ret;
1772
1773	if (countp != NULL)
1774		*countp = count;
1775
1776	return (ret);
1777}
1778
1779/*
1780 * __qam_delete --
1781 *	Queue fast delete function.
1782 *
1783 * PUBLIC: int __qam_delete __P((DBC *,  DBT *));
1784 */
1785int
1786__qam_delete(dbc, key)
1787	DBC *dbc;
1788	DBT *key;
1789{
1790	QUEUE_CURSOR *cp;
1791	int ret;
1792
1793	cp = (QUEUE_CURSOR *)dbc->internal;
1794	if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0)
1795		goto err;
1796
1797	ret = __qamc_del(dbc);
1798
1799err:	return (ret);
1800}
1801