1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
5 *
6 * $Id: bt_curadj.c,v 12.17 2008/01/08 20:57:59 bostic Exp $
7 */
8
9#include "db_config.h"
10
11#include "db_int.h"
12#include "dbinc/db_page.h"
13#include "dbinc/btree.h"
14#include "dbinc/mp.h"
15
16static int __bam_opd_cursor __P((DB *, DBC *, db_pgno_t, u_int32_t, u_int32_t));
17
18/*
19 * Cursor adjustments are logged if they are for subtransactions.  This is
20 * because it's possible for a subtransaction to adjust cursors which will
21 * still be active after the subtransaction aborts, and so which must be
22 * restored to their previous locations.  Cursors that can be both affected
23 * by our cursor adjustments and active after our transaction aborts can
24 * only be found in our parent transaction -- cursors in other transactions,
25 * including other child transactions of our parent, must have conflicting
26 * locker IDs, and so cannot be affected by adjustments in this transaction.
27 */
28
29/*
30 * __bam_ca_delete --
31 *	Update the cursors when items are deleted and when already deleted
32 *	items are overwritten.  Return the number of relevant cursors found.
33 *
34 * PUBLIC: int __bam_ca_delete __P((DB *, db_pgno_t, u_int32_t, int, int *));
35 */
36int
37__bam_ca_delete(dbp, pgno, indx, delete, countp)
38	DB *dbp;
39	db_pgno_t pgno;
40	u_int32_t indx;
41	int delete, *countp;
42{
43	BTREE_CURSOR *cp;
44	DB *ldbp;
45	DBC *dbc;
46	ENV *env;
47	int count;		/* !!!: Has to contain max number of cursors. */
48
49	env = dbp->env;
50
51	/*
52	 * Adjust the cursors.  We have the page write locked, so the
53	 * only other cursors that can be pointing at a page are
54	 * those in the same thread of control.  Unfortunately, we don't
55	 * know that they're using the same DB handle, so traverse
56	 * all matching DB handles in the same ENV, then all cursors
57	 * on each matching DB handle.
58	 *
59	 * Each cursor is single-threaded, so we only need to lock the
60	 * list of DBs and then the list of cursors in each DB.
61	 */
62	MUTEX_LOCK(env, env->mtx_dblist);
63	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
64	for (count = 0;
65	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
66	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
67		MUTEX_LOCK(env, dbp->mutex);
68		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
69			cp = (BTREE_CURSOR *)dbc->internal;
70			if (cp->pgno == pgno && cp->indx == indx &&
71			    !MVCC_SKIP_CURADJ(dbc, pgno)) {
72				/*
73				 * [#8032] This assert is checking
74				 * for possible race conditions where we
75				 * hold a cursor position without a lock.
76				 * Unfortunately, there are paths in the
77				 * Btree code that do not satisfy these
78				 * conditions. None of them are known to
79				 * be a problem, but this assert should
80				 * be re-activated when the Btree stack
81				 * code is re-written.
82				DB_ASSERT(env, !STD_LOCKING(dbc) ||
83				    cp->lock_mode != DB_LOCK_NG);
84				 */
85				if (delete)
86					F_SET(cp, C_DELETED);
87				else
88					F_CLR(cp, C_DELETED);
89				++count;
90			}
91		}
92		MUTEX_UNLOCK(env, dbp->mutex);
93	}
94	MUTEX_UNLOCK(env, env->mtx_dblist);
95
96	if (countp != NULL)
97		*countp = count;
98	return (0);
99}
100
101/*
102 * __ram_ca_delete --
103 *	Return if any relevant cursors found.
104 *
105 * PUBLIC: int __ram_ca_delete __P((DB *, db_pgno_t, int *));
106 */
107int
108__ram_ca_delete(dbp, root_pgno, foundp)
109	DB *dbp;
110	db_pgno_t root_pgno;
111	int *foundp;
112{
113	DB *ldbp;
114	DBC *dbc;
115	ENV *env;
116	int found;
117
118	env = dbp->env;
119
120	/*
121	 * Review the cursors.  See the comment in __bam_ca_delete().
122	 */
123	MUTEX_LOCK(env, env->mtx_dblist);
124	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
125	for (found = 0;
126	    found == 0 && ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
127	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
128		MUTEX_LOCK(env, dbp->mutex);
129		TAILQ_FOREACH(dbc, &ldbp->active_queue, links)
130			if (dbc->internal->root == root_pgno &&
131			    !MVCC_SKIP_CURADJ(dbc, root_pgno)) {
132				found = 1;
133				break;
134			}
135		MUTEX_UNLOCK(env, dbp->mutex);
136	}
137	MUTEX_UNLOCK(env, env->mtx_dblist);
138
139	*foundp = found;
140	return (0);
141}
142
143/*
144 * __bam_ca_di --
145 *	Adjust the cursors during a delete or insert.
146 *
147 * PUBLIC: int __bam_ca_di __P((DBC *, db_pgno_t, u_int32_t, int));
148 */
149int
150__bam_ca_di(my_dbc, pgno, indx, adjust)
151	DBC *my_dbc;
152	db_pgno_t pgno;
153	u_int32_t indx;
154	int adjust;
155{
156	DB *dbp, *ldbp;
157	DBC *dbc;
158	DBC_INTERNAL *cp;
159	DB_LSN lsn;
160	DB_TXN *my_txn;
161	ENV *env;
162	int found, ret;
163
164	dbp = my_dbc->dbp;
165	env = dbp->env;
166
167	my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;
168
169	/*
170	 * Adjust the cursors.  See the comment in __bam_ca_delete().
171	 */
172	MUTEX_LOCK(env, env->mtx_dblist);
173	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
174	for (found = 0;
175	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
176	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
177		MUTEX_LOCK(env, dbp->mutex);
178		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
179			if (dbc->dbtype == DB_RECNO)
180				continue;
181			cp = dbc->internal;
182			if (cp->pgno == pgno && cp->indx >= indx &&
183			    (dbc == my_dbc || !MVCC_SKIP_CURADJ(dbc, pgno))) {
184				/* Cursor indices should never be negative. */
185				DB_ASSERT(env, cp->indx != 0 || adjust > 0);
186				/* [#8032]
187				DB_ASSERT(env, !STD_LOCKING(dbc) ||
188				    cp->lock_mode != DB_LOCK_NG);
189				*/
190				cp->indx += adjust;
191				if (my_txn != NULL && dbc->txn != my_txn)
192					found = 1;
193			}
194		}
195		MUTEX_UNLOCK(env, dbp->mutex);
196	}
197	MUTEX_UNLOCK(env, env->mtx_dblist);
198
199	if (found != 0 && DBC_LOGGING(my_dbc)) {
200		if ((ret = __bam_curadj_log(dbp, my_dbc->txn, &lsn, 0,
201		    DB_CA_DI, pgno, 0, 0, (u_int32_t)adjust, indx, 0)) != 0)
202			return (ret);
203	}
204
205	return (0);
206}
207
208/*
209 * __bam_opd_cursor -- create a new opd cursor.
210 */
211static int
212__bam_opd_cursor(dbp, dbc, first, tpgno, ti)
213	DB *dbp;
214	DBC *dbc;
215	db_pgno_t tpgno;
216	u_int32_t first, ti;
217{
218	BTREE_CURSOR *cp, *orig_cp;
219	DBC *dbc_nopd;
220	int ret;
221
222	orig_cp = (BTREE_CURSOR *)dbc->internal;
223	dbc_nopd = NULL;
224
225	/*
226	 * Allocate a new cursor and create the stack.  If duplicates
227	 * are sorted, we've just created an off-page duplicate Btree.
228	 * If duplicates aren't sorted, we've just created a Recno tree.
229	 *
230	 * Note that in order to get here at all, there shouldn't be
231	 * an old off-page dup cursor--to augment the checking dbc_newopd
232	 * will do, assert this.
233	 */
234	DB_ASSERT(dbp->env, orig_cp->opd == NULL);
235	if ((ret = __dbc_newopd(dbc, tpgno, orig_cp->opd, &dbc_nopd)) != 0)
236		return (ret);
237
238	cp = (BTREE_CURSOR *)dbc_nopd->internal;
239	cp->pgno = tpgno;
240	cp->indx = ti;
241
242	if (dbp->dup_compare == NULL) {
243		/*
244		 * Converting to off-page Recno trees is tricky.  The
245		 * record number for the cursor is the index + 1 (to
246		 * convert to 1-based record numbers).
247		 */
248		cp->recno = ti + 1;
249	}
250
251	/*
252	 * Transfer the deleted flag from the top-level cursor to the
253	 * created one.
254	 */
255	if (F_ISSET(orig_cp, C_DELETED)) {
256		F_SET(cp, C_DELETED);
257		F_CLR(orig_cp, C_DELETED);
258	}
259
260	/* Stack the cursors and reset the initial cursor's index. */
261	orig_cp->opd = dbc_nopd;
262	orig_cp->indx = first;
263	return (0);
264}
265
266/*
267 * __bam_ca_dup --
268 *	Adjust the cursors when moving items from a leaf page to a duplicates
269 *	page.
270 *
271 * PUBLIC: int __bam_ca_dup __P((DBC *,
272 * PUBLIC:    u_int32_t, db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
273 */
274int
275__bam_ca_dup(my_dbc, first, fpgno, fi, tpgno, ti)
276	DBC *my_dbc;
277	db_pgno_t fpgno, tpgno;
278	u_int32_t first, fi, ti;
279{
280	BTREE_CURSOR *orig_cp;
281	DB *dbp, *ldbp;
282	DBC *dbc;
283	DB_LSN lsn;
284	DB_TXN *my_txn;
285	ENV *env;
286	int found, ret, t_ret;
287
288	dbp = my_dbc->dbp;
289	env = dbp->env;
290	my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;
291	ret = 0;
292
293	/*
294	 * Adjust the cursors.  See the comment in __bam_ca_delete().
295	 */
296	MUTEX_LOCK(env, env->mtx_dblist);
297	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
298	for (found = 0;
299	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
300	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
301loop:		MUTEX_LOCK(env, dbp->mutex);
302		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
303			/* Find cursors pointing to this record. */
304			orig_cp = (BTREE_CURSOR *)dbc->internal;
305			if (orig_cp->pgno != fpgno || orig_cp->indx != fi ||
306			    MVCC_SKIP_CURADJ(dbc, fpgno))
307				continue;
308
309			/*
310			 * Since we rescan the list see if this is already
311			 * converted.
312			 */
313			if (orig_cp->opd != NULL)
314				continue;
315
316			MUTEX_UNLOCK(env, dbp->mutex);
317			/* [#8032]
318			DB_ASSERT(env, !STD_LOCKING(dbc) ||
319			    orig_cp->lock_mode != DB_LOCK_NG);
320			*/
321			if ((ret = __bam_opd_cursor(dbp,
322			    dbc, first, tpgno, ti)) != 0)
323				goto err;
324			if (my_txn != NULL && dbc->txn != my_txn)
325				found = 1;
326			/* We released the mutex to get a cursor, start over. */
327			goto loop;
328		}
329		MUTEX_UNLOCK(env, dbp->mutex);
330	}
331err:	MUTEX_UNLOCK(env, env->mtx_dblist);
332
333	if (found != 0 && DBC_LOGGING(my_dbc)) {
334		if ((t_ret = __bam_curadj_log(dbp, my_dbc->txn,
335		    &lsn, 0, DB_CA_DUP, fpgno, tpgno, 0, first, fi, ti)) != 0 &&
336		    ret == 0)
337			ret = t_ret;
338	}
339
340	return (ret);
341}
342
343/*
344 * __bam_ca_undodup --
345 *	Adjust the cursors when returning items to a leaf page
346 *	from a duplicate page.
347 *	Called only during undo processing.
348 *
349 * PUBLIC: int __bam_ca_undodup __P((DB *,
350 * PUBLIC:    u_int32_t, db_pgno_t, u_int32_t, u_int32_t));
351 */
352int
353__bam_ca_undodup(dbp, first, fpgno, fi, ti)
354	DB *dbp;
355	db_pgno_t fpgno;
356	u_int32_t first, fi, ti;
357{
358	BTREE_CURSOR *orig_cp;
359	DB *ldbp;
360	DBC *dbc;
361	ENV *env;
362	int ret;
363
364	env = dbp->env;
365	ret = 0;
366
367	/*
368	 * Adjust the cursors.  See the comment in __bam_ca_delete().
369	 */
370	MUTEX_LOCK(env, env->mtx_dblist);
371	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
372	for (;
373	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
374	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
375loop:		MUTEX_LOCK(env, dbp->mutex);
376		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
377			orig_cp = (BTREE_CURSOR *)dbc->internal;
378
379			/*
380			 * A note on the orig_cp->opd != NULL requirement here:
381			 * it's possible that there's a cursor that refers to
382			 * the same duplicate set, but which has no opd cursor,
383			 * because it refers to a different item and we took
384			 * care of it while processing a previous record.
385			 */
386			if (orig_cp->pgno != fpgno ||
387			    orig_cp->indx != first ||
388			    orig_cp->opd == NULL || ((BTREE_CURSOR *)
389			    orig_cp->opd->internal)->indx != ti ||
390			    MVCC_SKIP_CURADJ(dbc, fpgno))
391				continue;
392			MUTEX_UNLOCK(env, dbp->mutex);
393			if ((ret = __dbc_close(orig_cp->opd)) != 0)
394				goto err;
395			orig_cp->opd = NULL;
396			orig_cp->indx = fi;
397			/*
398			 * We released the mutex to free a cursor,
399			 * start over.
400			 */
401			goto loop;
402		}
403		MUTEX_UNLOCK(env, dbp->mutex);
404	}
405err:	MUTEX_UNLOCK(env, env->mtx_dblist);
406
407	return (ret);
408}
409
410/*
411 * __bam_ca_rsplit --
412 *	Adjust the cursors when doing reverse splits.
413 *
414 * PUBLIC: int __bam_ca_rsplit __P((DBC *, db_pgno_t, db_pgno_t));
415 */
416int
417__bam_ca_rsplit(my_dbc, fpgno, tpgno)
418	DBC* my_dbc;
419	db_pgno_t fpgno, tpgno;
420{
421	DB *dbp, *ldbp;
422	DBC *dbc;
423	DB_LSN lsn;
424	DB_TXN *my_txn;
425	ENV *env;
426	int found, ret;
427
428	dbp = my_dbc->dbp;
429	env = dbp->env;
430	my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;
431
432	/*
433	 * Adjust the cursors.  See the comment in __bam_ca_delete().
434	 */
435	MUTEX_LOCK(env, env->mtx_dblist);
436	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
437	for (found = 0;
438	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
439	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
440		MUTEX_LOCK(env, dbp->mutex);
441		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
442			if (dbc->dbtype == DB_RECNO)
443				continue;
444			if (dbc->internal->pgno == fpgno &&
445			    !MVCC_SKIP_CURADJ(dbc, fpgno)) {
446				dbc->internal->pgno = tpgno;
447				/* [#8032]
448				DB_ASSERT(env, !STD_LOCKING(dbc) ||
449				    dbc->internal->lock_mode != DB_LOCK_NG);
450				*/
451				if (my_txn != NULL && dbc->txn != my_txn)
452					found = 1;
453			}
454		}
455		MUTEX_UNLOCK(env, dbp->mutex);
456	}
457	MUTEX_UNLOCK(env, env->mtx_dblist);
458
459	if (found != 0 && DBC_LOGGING(my_dbc)) {
460		if ((ret = __bam_curadj_log(dbp, my_dbc->txn,
461		    &lsn, 0, DB_CA_RSPLIT, fpgno, tpgno, 0, 0, 0, 0)) != 0)
462			return (ret);
463	}
464	return (0);
465}
466
467/*
468 * __bam_ca_split --
469 *	Adjust the cursors when splitting a page.
470 *
471 * PUBLIC: int __bam_ca_split __P((DBC *,
472 * PUBLIC:    db_pgno_t, db_pgno_t, db_pgno_t, u_int32_t, int));
473 */
474int
475__bam_ca_split(my_dbc, ppgno, lpgno, rpgno, split_indx, cleft)
476	DBC *my_dbc;
477	db_pgno_t ppgno, lpgno, rpgno;
478	u_int32_t split_indx;
479	int cleft;
480{
481	DB *dbp, *ldbp;
482	DBC *dbc;
483	DBC_INTERNAL *cp;
484	DB_LSN lsn;
485	DB_TXN *my_txn;
486	ENV *env;
487	int found, ret;
488
489	dbp = my_dbc->dbp;
490	env = dbp->env;
491	my_txn = IS_SUBTRANSACTION(my_dbc->txn) ? my_dbc->txn : NULL;
492
493	/*
494	 * Adjust the cursors.  See the comment in __bam_ca_delete().
495	 *
496	 * If splitting the page that a cursor was on, the cursor has to be
497	 * adjusted to point to the same record as before the split.  Most
498	 * of the time we don't adjust pointers to the left page, because
499	 * we're going to copy its contents back over the original page.  If
500	 * the cursor is on the right page, it is decremented by the number of
501	 * records split to the left page.
502	 */
503	MUTEX_LOCK(env, env->mtx_dblist);
504	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
505	for (found = 0;
506	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
507	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
508		MUTEX_LOCK(env, dbp->mutex);
509		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
510			if (dbc->dbtype == DB_RECNO)
511				continue;
512			cp = dbc->internal;
513			if (cp->pgno == ppgno &&
514			    !MVCC_SKIP_CURADJ(dbc, ppgno)) {
515				/* [#8032]
516				DB_ASSERT(env, !STD_LOCKING(dbc) ||
517				    cp->lock_mode != DB_LOCK_NG);
518				*/
519				if (my_txn != NULL && dbc->txn != my_txn)
520					found = 1;
521				if (cp->indx < split_indx) {
522					if (cleft)
523						cp->pgno = lpgno;
524				} else {
525					cp->pgno = rpgno;
526					cp->indx -= split_indx;
527				}
528			}
529		}
530		MUTEX_UNLOCK(env, dbp->mutex);
531	}
532	MUTEX_UNLOCK(env, env->mtx_dblist);
533
534	if (found != 0 && DBC_LOGGING(my_dbc)) {
535		if ((ret = __bam_curadj_log(dbp,
536		    my_dbc->txn, &lsn, 0, DB_CA_SPLIT, ppgno, rpgno,
537		    cleft ? lpgno : PGNO_INVALID, 0, split_indx, 0)) != 0)
538			return (ret);
539	}
540
541	return (0);
542}
543
544/*
545 * __bam_ca_undosplit --
546 *	Adjust the cursors when undoing a split of a page.
547 *	If we grew a level we will execute this for both the
548 *	left and the right pages.
549 *	Called only during undo processing.
550 *
551 * PUBLIC: int __bam_ca_undosplit __P((DB *,
552 * PUBLIC:    db_pgno_t, db_pgno_t, db_pgno_t, u_int32_t));
553 */
554int
555__bam_ca_undosplit(dbp, frompgno, topgno, lpgno, split_indx)
556	DB *dbp;
557	db_pgno_t frompgno, topgno, lpgno;
558	u_int32_t split_indx;
559{
560	DB *ldbp;
561	DBC *dbc;
562	DBC_INTERNAL *cp;
563	ENV *env;
564
565	env = dbp->env;
566
567	/*
568	 * Adjust the cursors.  See the comment in __bam_ca_delete().
569	 *
570	 * When backing out a split, we move the cursor back
571	 * to the original offset and bump it by the split_indx.
572	 */
573	MUTEX_LOCK(env, env->mtx_dblist);
574	FIND_FIRST_DB_MATCH(env, dbp, ldbp);
575	for (;
576	    ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid;
577	    ldbp = TAILQ_NEXT(ldbp, dblistlinks)) {
578		MUTEX_LOCK(env, dbp->mutex);
579		TAILQ_FOREACH(dbc, &ldbp->active_queue, links) {
580			if (dbc->dbtype == DB_RECNO)
581				continue;
582			cp = dbc->internal;
583			if (cp->pgno == topgno &&
584			    !MVCC_SKIP_CURADJ(dbc, topgno)) {
585				cp->pgno = frompgno;
586				cp->indx += split_indx;
587			} else if (cp->pgno == lpgno &&
588			    !MVCC_SKIP_CURADJ(dbc, lpgno))
589				cp->pgno = frompgno;
590		}
591		MUTEX_UNLOCK(env, dbp->mutex);
592	}
593	MUTEX_UNLOCK(env, env->mtx_dblist);
594
595	return (0);
596}
597