1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1999,2008 Oracle.  All rights reserved.
5 *
6 * $Id: qam_method.c,v 12.19 2008/01/08 20:58:47 bostic Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/db_am.h"
14#include "dbinc/lock.h"
15#include "dbinc/mp.h"
16#include "dbinc/qam.h"
17#include "dbinc/txn.h"
18
/* Forward declarations for the file-local (static) helpers defined below. */
static int __qam_rr __P((DB *, DB_TXN *,
	       const char *, const char *, const char *, qam_name_op));
static int __qam_set_extentsize __P((DB *, u_int32_t));
22
23/*
24 * __qam_db_create --
25 *	Queue specific initialization of the DB structure.
26 *
27 * PUBLIC: int __qam_db_create __P((DB *));
28 */
29int
30__qam_db_create(dbp)
31	DB *dbp;
32{
33	QUEUE *t;
34	int ret;
35
36	/* Allocate and initialize the private queue structure. */
37	if ((ret = __os_calloc(dbp->env, 1, sizeof(QUEUE), &t)) != 0)
38		return (ret);
39	dbp->q_internal = t;
40	dbp->get_q_extentsize = __qam_get_extentsize;
41	dbp->set_q_extentsize = __qam_set_extentsize;
42
43	t->re_pad = ' ';
44
45	return (0);
46}
47
48/*
49 * __qam_db_close --
50 *	Queue specific discard of the DB structure.
51 *
52 * PUBLIC: int __qam_db_close __P((DB *, u_int32_t));
53 */
54int
55__qam_db_close(dbp, flags)
56	DB *dbp;
57	u_int32_t flags;
58{
59	DB_MPOOLFILE *mpf;
60	MPFARRAY *array;
61	QUEUE *t;
62	struct __qmpf *mpfp;
63	u_int32_t i;
64	int ret, t_ret;
65
66	ret = 0;
67	if ((t = dbp->q_internal) == NULL)
68		return (0);
69
70	array = &t->array1;
71again:
72	mpfp = array->mpfarray;
73	if (mpfp != NULL) {
74		for (i = array->low_extent;
75		    i <= array->hi_extent; i++, mpfp++) {
76			mpf = mpfp->mpf;
77			mpfp->mpf = NULL;
78			if (mpf != NULL && (t_ret = __memp_fclose(mpf,
79			    LF_ISSET(DB_AM_DISCARD) ? DB_MPOOL_DISCARD : 0))
80			    != 0 && ret == 0)
81				ret = t_ret;
82		}
83		__os_free(dbp->env, array->mpfarray);
84	}
85	if (t->array2.n_extent != 0) {
86		array = &t->array2;
87		array->n_extent = 0;
88		goto again;
89	}
90
91	if (LF_ISSET(DB_AM_DISCARD) &&
92	     (t_ret = __qam_nameop(dbp, NULL,
93	     NULL, QAM_NAME_DISCARD)) != 0 && ret == 0)
94		ret = t_ret;
95
96	if (t->path != NULL)
97		__os_free(dbp->env, t->path);
98	__os_free(dbp->env, t);
99	dbp->q_internal = NULL;
100
101	return (ret);
102}
103
104/*
105 * __qam_get_extentsize --
106 *	The DB->q_get_extentsize method.
107 *
108 * PUBLIC: int __qam_get_extentsize __P((DB *, u_int32_t *));
109 */
110int
111__qam_get_extentsize(dbp, q_extentsizep)
112	DB *dbp;
113	u_int32_t *q_extentsizep;
114{
115	*q_extentsizep = ((QUEUE*)dbp->q_internal)->page_ext;
116	return (0);
117}
118
119static int
120__qam_set_extentsize(dbp, extentsize)
121	DB *dbp;
122	u_int32_t extentsize;
123{
124	DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_extentsize");
125
126	if (extentsize < 1) {
127		__db_errx(dbp->env, "Extent size must be at least 1");
128		return (EINVAL);
129	}
130
131	((QUEUE*)dbp->q_internal)->page_ext = extentsize;
132
133	return (0);
134}
135
136/*
137 * __queue_pageinfo -
138 *	Given a dbp, get first/last page information about a queue.
139 *
140 * PUBLIC: int __queue_pageinfo __P((DB *, db_pgno_t *, db_pgno_t *,
141 * PUBLIC:       int *, int, u_int32_t));
142 */
143int
144__queue_pageinfo(dbp, firstp, lastp, emptyp, prpage, flags)
145	DB *dbp;
146	db_pgno_t *firstp, *lastp;
147	int *emptyp;
148	int prpage;
149	u_int32_t flags;
150{
151	DB_MPOOLFILE *mpf;
152	DB_THREAD_INFO *ip;
153	QMETA *meta;
154	db_pgno_t first, i, last;
155	int empty, ret, t_ret;
156
157	mpf = dbp->mpf;
158	ENV_GET_THREAD_INFO(dbp->env, ip);
159
160	/* Find out the page number of the last page in the database. */
161	i = PGNO_BASE_MD;
162	if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0)
163		return (ret);
164
165	first = QAM_RECNO_PAGE(dbp, meta->first_recno);
166	last = QAM_RECNO_PAGE(
167	    dbp, meta->cur_recno == 1 ? 1 : meta->cur_recno - 1);
168
169	empty = meta->cur_recno == meta->first_recno;
170	if (firstp != NULL)
171		*firstp = first;
172	if (lastp != NULL)
173		*lastp = last;
174	if (emptyp != NULL)
175		*emptyp = empty;
176#ifdef HAVE_STATISTICS
177	if (prpage)
178		ret = __db_prpage(dbp, (PAGE *)meta, flags);
179#else
180	COMPQUIET(prpage, 0);
181	COMPQUIET(flags, 0);
182#endif
183
184	if ((t_ret = __memp_fput(mpf,
185	    ip, meta, dbp->priority)) != 0 && ret == 0)
186		ret = t_ret;
187
188	return (ret);
189}
190
191#ifdef HAVE_STATISTICS
192/*
193 * __db_prqueue --
194 *	Print out a queue
195 *
196 * PUBLIC: int __db_prqueue __P((DB *, u_int32_t));
197 */
198int
199__db_prqueue(dbp, flags)
200	DB *dbp;
201	u_int32_t flags;
202{
203	DBC *dbc;
204	DB_THREAD_INFO *ip;
205	PAGE *h;
206	db_pgno_t first, i, last, pg_ext, stop;
207	int empty, ret, t_ret;
208
209	if ((ret = __queue_pageinfo(dbp, &first, &last, &empty, 1, flags)) != 0)
210		return (ret);
211
212	if (empty || ret != 0)
213		return (ret);
214
215	ENV_GET_THREAD_INFO(dbp->env, ip);
216	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
217		return (ret);
218	i = first;
219	if (first > last)
220		stop = QAM_RECNO_PAGE(dbp, UINT32_MAX);
221	else
222		stop = last;
223
224	/* Dump each page. */
225	pg_ext = ((QUEUE *)dbp->q_internal)->page_ext;
226begin:
227	for (; i <= stop; ++i) {
228		if ((ret = __qam_fget(dbc, &i, 0, &h)) != 0) {
229			if (pg_ext == 0) {
230				if (ret == DB_PAGE_NOTFOUND && first == last)
231					ret = 0;
232				goto err;
233			}
234			if (ret == ENOENT || ret == DB_PAGE_NOTFOUND) {
235				i += (pg_ext - ((i - 1) % pg_ext)) - 1;
236				ret = 0;
237				continue;
238			}
239			goto err;
240		}
241		(void)__db_prpage(dbp, h, flags);
242		if ((ret = __qam_fput(dbc, i, h, dbp->priority)) != 0)
243			goto err;
244	}
245
246	if (first > last) {
247		i = 1;
248		stop = last;
249		first = last;
250		goto begin;
251	}
252
253err:
254	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
255		ret = t_ret;
256	return (ret);
257}
258#endif
259
260/*
261 * __qam_remove --
262 *	Remove method for a Queue.
263 *
264 * PUBLIC: int __qam_remove __P((DB *, DB_TXN *, const char *, const char *));
265 */
266int
267__qam_remove(dbp, txn, name, subdb)
268	DB *dbp;
269	DB_TXN *txn;
270	const char *name, *subdb;
271{
272	return (__qam_rr(dbp, txn, name, subdb, NULL, QAM_NAME_REMOVE));
273}
274
275/*
276 * __qam_rename --
277 *	Rename method for a Queue.
278 *
279 * PUBLIC: int __qam_rename __P((DB *,
280 * PUBLIC:         DB_TXN *, const char *, const char *, const char *));
281 */
282int
283__qam_rename(dbp, txn, name, subdb, newname)
284	DB *dbp;
285	DB_TXN *txn;
286	const char *name, *subdb, *newname;
287{
288	return (__qam_rr(dbp, txn, name, subdb, newname, QAM_NAME_RENAME));
289}
290
291/*
292 * __qam_rr --
293 *	Remove/Rename method for a Queue.
294 */
295static int
296__qam_rr(dbp, txn, name, subdb, newname, op)
297	DB *dbp;
298	DB_TXN *txn;
299	const char *name, *subdb, *newname;
300	qam_name_op op;
301{
302	DB *tmpdbp;
303	DB_THREAD_INFO *ip;
304	ENV *env;
305	QUEUE *qp;
306	int ret, t_ret;
307
308	env = dbp->env;
309	ret = 0;
310
311	if (subdb != NULL && name != NULL) {
312		__db_errx(env,
313		    "Queue does not support multiple databases per file");
314		return (EINVAL);
315	}
316	ENV_GET_THREAD_INFO(env, ip);
317
318	/*
319	 * Since regular rename no longer opens the database, we may have
320	 * to do it here.
321	 */
322	if (F_ISSET(dbp, DB_AM_OPEN_CALLED))
323		tmpdbp = dbp;
324	else {
325		if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
326			return (ret);
327
328		/*
329		 * We need to make sure we don't self-deadlock, so give
330		 * this dbp the same locker as the incoming one.
331		 */
332		tmpdbp->locker = dbp->locker;
333		if ((ret = __db_open(tmpdbp, ip, txn,
334		    name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
335			goto err;
336	}
337
338	qp = (QUEUE *)tmpdbp->q_internal;
339	if (qp->page_ext != 0)
340		ret = __qam_nameop(tmpdbp, txn, newname, op);
341
342	if (!F_ISSET(dbp, DB_AM_OPEN_CALLED)) {
343err:		/*
344		 * Since we copied the locker ID from the dbp, we'd better not
345		 * free it here.
346		 */
347		tmpdbp->locker = NULL;
348
349		/* We need to remove the lock event we associated with this. */
350		if (txn != NULL)
351			__txn_remlock(env,
352			    txn, &tmpdbp->handle_lock, DB_LOCK_INVALIDID);
353
354		if (txn == NULL ) {
355			if ((t_ret = __db_close(tmpdbp,
356			    txn, DB_NOSYNC)) != 0 && ret == 0)
357				ret = t_ret;
358		} else {
359			if ((t_ret = __txn_closeevent(env,
360			    txn, tmpdbp)) != 0 && ret == 0)
361				ret = t_ret;
362		}
363	}
364	return (ret);
365}
366
367/*
368 * __qam_map_flags --
369 *	Map queue-specific flags from public to the internal values.
370 *
371 * PUBLIC: void __qam_map_flags __P((DB *, u_int32_t *, u_int32_t *));
372 */
373void
374__qam_map_flags(dbp, inflagsp, outflagsp)
375	DB *dbp;
376	u_int32_t *inflagsp, *outflagsp;
377{
378	COMPQUIET(dbp, NULL);
379
380	if (FLD_ISSET(*inflagsp, DB_INORDER)) {
381		FLD_SET(*outflagsp, DB_AM_INORDER);
382		FLD_CLR(*inflagsp, DB_INORDER);
383	}
384}
385
386/*
387 * __qam_set_flags --
388 *	Set queue-specific flags.
389 *
390 * PUBLIC: int __qam_set_flags __P((DB *, u_int32_t *flagsp));
391 */
392int
393__qam_set_flags(dbp, flagsp)
394	DB *dbp;
395	u_int32_t *flagsp;
396{
397
398	__qam_map_flags(dbp, flagsp, &dbp->flags);
399	return (0);
400}
401