1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001, 2010 Oracle and/or its affiliates.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/lock.h"
15#include "dbinc/mp.h"
16#include "dbinc/qam.h"
17#include "dbinc/txn.h"
18
19static int __qam_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *,
20	       const char *, const char *, const char *, qam_name_op));
21static int __qam_set_extentsize __P((DB *, u_int32_t));
22
23/*
24 * __qam_db_create --
25 *	Queue specific initialization of the DB structure.
26 *
27 * PUBLIC: int __qam_db_create __P((DB *));
28 */
29int
30__qam_db_create(dbp)
31	DB *dbp;
32{
33	QUEUE *t;
34	int ret;
35
36	/* Allocate and initialize the private queue structure. */
37	if ((ret = __os_calloc(dbp->env, 1, sizeof(QUEUE), &t)) != 0)
38		return (ret);
39	dbp->q_internal = t;
40	dbp->get_q_extentsize = __qam_get_extentsize;
41	dbp->set_q_extentsize = __qam_set_extentsize;
42
43	t->re_pad = ' ';
44
45	return (0);
46}
47
48/*
49 * __qam_db_close --
50 *	Queue specific discard of the DB structure.
51 *
52 * PUBLIC: int __qam_db_close __P((DB *, u_int32_t));
53 */
54int
55__qam_db_close(dbp, flags)
56	DB *dbp;
57	u_int32_t flags;
58{
59	DB_MPOOLFILE *mpf;
60	MPFARRAY *array;
61	QUEUE *t;
62	struct __qmpf *mpfp;
63	u_int32_t i;
64	int ret, t_ret;
65
66	ret = 0;
67	if ((t = dbp->q_internal) == NULL)
68		return (0);
69
70	array = &t->array1;
71again:
72	mpfp = array->mpfarray;
73	if (mpfp != NULL) {
74		for (i = array->low_extent;
75		    i <= array->hi_extent; i++, mpfp++) {
76			mpf = mpfp->mpf;
77			mpfp->mpf = NULL;
78			if (mpf != NULL && (t_ret = __memp_fclose(mpf,
79			    LF_ISSET(DB_AM_DISCARD) ? DB_MPOOL_DISCARD : 0))
80			    != 0 && ret == 0)
81				ret = t_ret;
82		}
83		__os_free(dbp->env, array->mpfarray);
84	}
85	if (t->array2.n_extent != 0) {
86		array = &t->array2;
87		array->n_extent = 0;
88		goto again;
89	}
90
91	if (LF_ISSET(DB_AM_DISCARD) &&
92	     (t_ret = __qam_nameop(dbp, NULL,
93	     NULL, QAM_NAME_DISCARD)) != 0 && ret == 0)
94		ret = t_ret;
95
96	if (t->path != NULL)
97		__os_free(dbp->env, t->path);
98	__os_free(dbp->env, t);
99	dbp->q_internal = NULL;
100
101	return (ret);
102}
103
104/*
105 * __qam_get_extentsize --
106 *	The DB->q_get_extentsize method.
107 *
108 * PUBLIC: int __qam_get_extentsize __P((DB *, u_int32_t *));
109 */
110int
111__qam_get_extentsize(dbp, q_extentsizep)
112	DB *dbp;
113	u_int32_t *q_extentsizep;
114{
115	*q_extentsizep = ((QUEUE*)dbp->q_internal)->page_ext;
116	return (0);
117}
118
119static int
120__qam_set_extentsize(dbp, extentsize)
121	DB *dbp;
122	u_int32_t extentsize;
123{
124	DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_extentsize");
125
126	if (extentsize < 1) {
127		__db_errx(dbp->env, "Extent size must be at least 1");
128		return (EINVAL);
129	}
130
131	((QUEUE*)dbp->q_internal)->page_ext = extentsize;
132
133	return (0);
134}
135
136/*
137 * __queue_pageinfo -
138 *	Given a dbp, get first/last page information about a queue.
139 *
140 * PUBLIC: int __queue_pageinfo __P((DB *, db_pgno_t *, db_pgno_t *,
141 * PUBLIC:       int *, int, u_int32_t));
142 */
143int
144__queue_pageinfo(dbp, firstp, lastp, emptyp, prpage, flags)
145	DB *dbp;
146	db_pgno_t *firstp, *lastp;
147	int *emptyp;
148	int prpage;
149	u_int32_t flags;
150{
151	DB_MPOOLFILE *mpf;
152	DB_THREAD_INFO *ip;
153	QMETA *meta;
154	db_pgno_t first, i, last;
155	int empty, ret, t_ret;
156
157	mpf = dbp->mpf;
158	ENV_GET_THREAD_INFO(dbp->env, ip);
159
160	/* Find out the page number of the last page in the database. */
161	i = PGNO_BASE_MD;
162	if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0)
163		return (ret);
164
165	first = QAM_RECNO_PAGE(dbp, meta->first_recno);
166	last = QAM_RECNO_PAGE(
167	    dbp, meta->cur_recno == 1 ? 1 : meta->cur_recno - 1);
168
169	empty = meta->cur_recno == meta->first_recno;
170	if (firstp != NULL)
171		*firstp = first;
172	if (lastp != NULL)
173		*lastp = last;
174	if (emptyp != NULL)
175		*emptyp = empty;
176#ifdef HAVE_STATISTICS
177	if (prpage)
178		ret = __db_prpage(dbp, (PAGE *)meta, flags);
179#else
180	COMPQUIET(prpage, 0);
181	COMPQUIET(flags, 0);
182#endif
183
184	if ((t_ret = __memp_fput(mpf,
185	    ip, meta, dbp->priority)) != 0 && ret == 0)
186		ret = t_ret;
187
188	return (ret);
189}
190
#ifdef HAVE_STATISTICS
/*
 * __db_prqueue --
 *	Print out a queue
 *
 *	Prints the meta page (via __queue_pageinfo) and then every page from
 *	the first to the last page of the queue with __db_prpage.  When the
 *	first page is greater than the last page the range is dumped in two
 *	passes: from the first page up to the page of record UINT32_MAX, and
 *	then from page 1 up to the last page.
 *
 *	Returns 0 for an empty queue or on success, otherwise the first
 *	error encountered.
 *
 * PUBLIC: int __db_prqueue __P((DB *, u_int32_t));
 */
int
__db_prqueue(dbp, flags)
	DB *dbp;
	u_int32_t flags;
{
	DBC *dbc;
	DB_THREAD_INFO *ip;
	PAGE *pagep;
	db_pgno_t first, last, pg_ext, pgno, stop;
	int empty, ret, t_ret;

	ret = __queue_pageinfo(dbp, &first, &last, &empty, 1, flags);
	if (ret != 0)
		return (ret);

	/* An empty queue has nothing beyond the meta page to print. */
	if (empty)
		return (0);

	ENV_GET_THREAD_INFO(dbp->env, ip);
	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
		return (ret);

	pg_ext = ((QUEUE *)dbp->q_internal)->page_ext;

	/* On a wrapped range the first pass runs up to the highest page. */
	pgno = first;
	stop = first > last ? QAM_RECNO_PAGE(dbp, UINT32_MAX) : last;

	/* Dump each page. */
	for (;;) {
		for (; pgno <= stop; ++pgno) {
			ret = __qam_fget(dbc, &pgno, 0, &pagep);
			if (ret == 0) {
				(void)__db_prpage(dbp, pagep, flags);
				ret = __qam_fput(
				    dbc, pgno, pagep, dbp->priority);
				if (ret != 0)
					goto err;
				continue;
			}

			/*
			 * With no extent size, a missing page is fatal unless
			 * the range is a single page, in which case it is
			 * simply the end of the dump.
			 */
			if (pg_ext == 0) {
				if (ret == DB_PAGE_NOTFOUND && first == last)
					ret = 0;
				goto err;
			}

			if (ret != ENOENT && ret != DB_PAGE_NOTFOUND)
				goto err;

			/*
			 * Missing file or page: move to the last page of the
			 * current pg_ext-sized group (presumably the extent),
			 * so the loop increment lands on the next group.
			 */
			pgno += (pg_ext - ((pgno - 1) % pg_ext)) - 1;
			ret = 0;
		}

		if (first <= last)
			break;

		/*
		 * Second pass of a wrapped range: page 1 through last.
		 * Setting first to last makes this the final pass.
		 */
		pgno = 1;
		stop = last;
		first = last;
	}

err:
	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}
#endif
259
260/*
261 * __qam_remove --
262 *	Remove method for a Queue.
263 *
264 * PUBLIC: int __qam_remove __P((DB *, DB_THREAD_INFO *, DB_TXN *,
265 * PUBLIC:    const char *, const char *, u_int32_t));
266 */
267int
268__qam_remove(dbp, ip, txn, name, subdb, flags)
269	DB *dbp;
270	DB_THREAD_INFO *ip;
271	DB_TXN *txn;
272	const char *name, *subdb;
273	u_int32_t flags;
274{
275	COMPQUIET(flags, 0);
276	return (__qam_rr(dbp, ip, txn, name, subdb, NULL, QAM_NAME_REMOVE));
277}
278
279/*
280 * __qam_rename --
281 *	Rename method for a Queue.
282 *
283 * PUBLIC: int __qam_rename __P((DB *, DB_THREAD_INFO *, DB_TXN *,
284 * PUBLIC:         const char *, const char *, const char *));
285 */
286int
287__qam_rename(dbp, ip, txn, name, subdb, newname)
288	DB *dbp;
289	DB_THREAD_INFO *ip;
290	DB_TXN *txn;
291	const char *name, *subdb, *newname;
292{
293	return (__qam_rr(dbp, ip, txn, name, subdb, newname, QAM_NAME_RENAME));
294}
295
296/*
297 * __qam_rr --
298 *	Remove/Rename method for a Queue.
299 */
300static int
301__qam_rr(dbp, ip, txn, name, subdb, newname, op)
302	DB *dbp;
303	DB_THREAD_INFO *ip;
304	DB_TXN *txn;
305	const char *name, *subdb, *newname;
306	qam_name_op op;
307{
308	DB *tmpdbp;
309	ENV *env;
310	QUEUE *qp;
311	int ret, t_ret;
312
313	env = dbp->env;
314	ret = 0;
315
316	if (subdb != NULL && name != NULL) {
317		__db_errx(env,
318		    "Queue does not support multiple databases per file");
319		return (EINVAL);
320	}
321
322	/*
323	 * Since regular rename no longer opens the database, we may have
324	 * to do it here.
325	 */
326	if (F_ISSET(dbp, DB_AM_OPEN_CALLED))
327		tmpdbp = dbp;
328	else {
329		if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
330			return (ret);
331
332		/*
333		 * We need to make sure we don't self-deadlock, so give
334		 * this dbp the same locker as the incoming one.
335		 */
336		tmpdbp->locker = dbp->locker;
337		if ((ret = __db_open(tmpdbp, ip, txn,
338		    name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
339			goto err;
340	}
341
342	qp = (QUEUE *)tmpdbp->q_internal;
343	if (qp->page_ext != 0)
344		ret = __qam_nameop(tmpdbp, txn, newname, op);
345
346	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
347err:		/*
348		 * Since we copied the locker ID from the dbp, we'd better not
349		 * free it here.
350		 */
351		tmpdbp->locker = NULL;
352
353		/* We need to remove the lock event we associated with this. */
354		if (txn != NULL)
355			__txn_remlock(env,
356			    txn, &tmpdbp->handle_lock, DB_LOCK_INVALIDID);
357
358		if ((t_ret = __db_close(tmpdbp,
359		    txn, DB_NOSYNC)) != 0 && ret == 0)
360			ret = t_ret;
361	}
362	return (ret);
363}
364
365/*
366 * __qam_map_flags --
367 *	Map queue-specific flags from public to the internal values.
368 *
369 * PUBLIC: void __qam_map_flags __P((DB *, u_int32_t *, u_int32_t *));
370 */
371void
372__qam_map_flags(dbp, inflagsp, outflagsp)
373	DB *dbp;
374	u_int32_t *inflagsp, *outflagsp;
375{
376	COMPQUIET(dbp, NULL);
377
378	if (FLD_ISSET(*inflagsp, DB_INORDER)) {
379		FLD_SET(*outflagsp, DB_AM_INORDER);
380		FLD_CLR(*inflagsp, DB_INORDER);
381	}
382}
383
384/*
385 * __qam_set_flags --
386 *	Set queue-specific flags.
387 *
388 * PUBLIC: int __qam_set_flags __P((DB *, u_int32_t *flagsp));
389 */
390int
391__qam_set_flags(dbp, flagsp)
392	DB *dbp;
393	u_int32_t *flagsp;
394{
395
396	__qam_map_flags(dbp, flagsp, &dbp->flags);
397	return (0);
398}
399