1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996,2008 Oracle. All rights reserved. 5 * 6 * $Id: bt_cursor.c,v 12.34 2008/03/11 21:07:40 mbrey Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/btree.h" 14#include "dbinc/lock.h" 15#include "dbinc/mp.h" 16 17static int __bam_bulk __P((DBC *, DBT *, u_int32_t)); 18static int __bamc_close __P((DBC *, db_pgno_t, int *)); 19static int __bamc_del __P((DBC *)); 20static int __bamc_destroy __P((DBC *)); 21static int __bamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 22static int __bamc_getstack __P((DBC *)); 23static int __bamc_next __P((DBC *, int, int)); 24static int __bamc_physdel __P((DBC *)); 25static int __bamc_prev __P((DBC *)); 26static int __bamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 27static int __bamc_search __P((DBC *, 28 db_pgno_t, const DBT *, u_int32_t, int *)); 29static int __bamc_writelock __P((DBC *)); 30static int __bam_getboth_finddatum __P((DBC *, DBT *, u_int32_t)); 31static int __bam_getbothc __P((DBC *, DBT *)); 32static int __bam_get_prev __P((DBC *)); 33static int __bam_isopd __P((DBC *, db_pgno_t *)); 34 35/* 36 * Acquire a new page/lock. If we hold a page/lock, discard the page, and 37 * lock-couple the lock. 38 * 39 * !!! 40 * We have to handle both where we have a lock to lock-couple and where we 41 * don't -- we don't duplicate locks when we duplicate cursors if we are 42 * running in a transaction environment as there's no point if locks are 43 * never discarded. This means that the cursor may or may not hold a lock. 44 * In the case where we are descending the tree we always want to unlock 45 * the held interior page so we use ACQUIRE_COUPLE. 46 */ 47#undef ACQUIRE 48#define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, flags, ret) do { \ 49 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \ 50 if ((pagep) != NULL) { \ 51 ret = __memp_fput(__mpf, \ 52 (dbc)->thread_info, pagep, dbc->priority); \ 53 pagep = NULL; \ 54 } else \ 55 ret = 0; \ 56 if ((ret) == 0 && STD_LOCKING(dbc)) \ 57 ret = __db_lget( \ 58 dbc, LCK_COUPLE, lpgno, mode, flags, &(lock)); \ 59 if ((ret) == 0) \ 60 ret = __memp_fget(__mpf, &(fpgno), \ 61 (dbc)->thread_info, (dbc)->txn, 0, &(pagep)); \ 62} while (0) 63 64/* Acquire a new page/lock for a cursor. */ 65#undef ACQUIRE_CUR 66#define ACQUIRE_CUR(dbc, mode, p, flags, ret) do { \ 67 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \ 68 if (p != __cp->pgno) \ 69 __cp->pgno = PGNO_INVALID; \ 70 ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, flags, ret); \ 71 if ((ret) == 0) { \ 72 __cp->pgno = p; \ 73 __cp->lock_mode = (mode); \ 74 } \ 75} while (0) 76 77/* 78 * Acquire a write lock if we don't already have one. 79 * 80 * !!! 81 * See ACQUIRE macro on why we handle cursors that don't have locks. 82 */ 83#undef ACQUIRE_WRITE_LOCK 84#define ACQUIRE_WRITE_LOCK(dbc, ret) do { \ 85 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \ 86 ret = 0; \ 87 if (STD_LOCKING(dbc) && \ 88 __cp->lock_mode != DB_LOCK_WRITE && \ 89 ((ret) = __db_lget(dbc, \ 90 LOCK_ISSET(__cp->lock) ? LCK_COUPLE : 0, \ 91 __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0) \ 92 __cp->lock_mode = DB_LOCK_WRITE; \ 93} while (0) 94 95/* Discard the current page/lock for a cursor. */ 96#undef DISCARD_CUR 97#define DISCARD_CUR(dbc, ret) do { \ 98 BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal; \ 99 DB_MPOOLFILE *__mpf = (dbc)->dbp->mpf; \ 100 int __t_ret; \ 101 if ((__cp->page) != NULL) { \ 102 __t_ret = __memp_fput(__mpf, \ 103 (dbc)->thread_info, __cp->page, dbc->priority);\ 104 __cp->page = NULL; \ 105 } else \ 106 __t_ret = 0; \ 107 if (__t_ret != 0 && (ret) == 0) \ 108 ret = __t_ret; \ 109 __t_ret = __TLPUT((dbc), __cp->lock); \ 110 if (__t_ret != 0 && (ret) == 0) \ 111 ret = __t_ret; \ 112 if ((ret) == 0 && !LOCK_ISSET(__cp->lock)) \ 113 __cp->lock_mode = DB_LOCK_NG; \ 114} while (0) 115 116/* If on-page item is a deleted record. */ 117#undef IS_DELETED 118#define IS_DELETED(dbp, page, indx) \ 119 B_DISSET(GET_BKEYDATA(dbp, page, \ 120 (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type) 121#undef IS_CUR_DELETED 122#define IS_CUR_DELETED(dbc) \ 123 IS_DELETED((dbc)->dbp, (dbc)->internal->page, (dbc)->internal->indx) 124 125/* 126 * Test to see if two cursors could point to duplicates of the same key. 127 * In the case of off-page duplicates they are they same, as the cursors 128 * will be in the same off-page duplicate tree. In the case of on-page 129 * duplicates, the key index offsets must be the same. For the last test, 130 * as the original cursor may not have a valid page pointer, we use the 131 * current cursor's. 132 */ 133#undef IS_DUPLICATE 134#define IS_DUPLICATE(dbc, i1, i2) \ 135 (P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i1] == \ 136 P_INP((dbc)->dbp,((PAGE *)(dbc)->internal->page))[i2]) 137#undef IS_CUR_DUPLICATE 138#define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx) \ 139 (F_ISSET(dbc, DBC_OPD) || \ 140 (orig_pgno == (dbc)->internal->pgno && \ 141 IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx))) 142 143/* 144 * __bamc_init -- 145 * Initialize the access private portion of a cursor 146 * 147 * PUBLIC: int __bamc_init __P((DBC *, DBTYPE)); 148 */ 149int 150__bamc_init(dbc, dbtype) 151 DBC *dbc; 152 DBTYPE dbtype; 153{ 154 ENV *env; 155 int ret; 156 157 env = dbc->env; 158 159 /* Allocate/initialize the internal structure. */ 160 if (dbc->internal == NULL && (ret = 161 __os_calloc(env, 1, sizeof(BTREE_CURSOR), &dbc->internal)) != 0) 162 return (ret); 163 164 /* Initialize methods. */ 165 dbc->close = dbc->c_close = __dbc_close_pp; 166 dbc->count = dbc->c_count = __dbc_count_pp; 167 dbc->del = dbc->c_del = __dbc_del_pp; 168 dbc->dup = dbc->c_dup = __dbc_dup_pp; 169 dbc->get = dbc->c_get = __dbc_get_pp; 170 dbc->pget = dbc->c_pget = __dbc_pget_pp; 171 dbc->put = dbc->c_put = __dbc_put_pp; 172 if (dbtype == DB_BTREE) { 173 dbc->am_bulk = __bam_bulk; 174 dbc->am_close = __bamc_close; 175 dbc->am_del = __bamc_del; 176 dbc->am_destroy = __bamc_destroy; 177 dbc->am_get = __bamc_get; 178 dbc->am_put = __bamc_put; 179 dbc->am_writelock = __bamc_writelock; 180 } else { 181 dbc->am_bulk = __bam_bulk; 182 dbc->am_close = __bamc_close; 183 dbc->am_del = __ramc_del; 184 dbc->am_destroy = __bamc_destroy; 185 dbc->am_get = __ramc_get; 186 dbc->am_put = __ramc_put; 187 dbc->am_writelock = __bamc_writelock; 188 } 189 190 return (0); 191} 192 193/* 194 * __bamc_refresh 195 * Set things up properly for cursor re-use. 196 * 197 * PUBLIC: int __bamc_refresh __P((DBC *)); 198 */ 199int 200__bamc_refresh(dbc) 201 DBC *dbc; 202{ 203 BTREE *t; 204 BTREE_CURSOR *cp; 205 DB *dbp; 206 207 dbp = dbc->dbp; 208 t = dbp->bt_internal; 209 cp = (BTREE_CURSOR *)dbc->internal; 210 211 /* 212 * If our caller set the root page number, it's because the root was 213 * known. This is always the case for off page dup cursors. Else, 214 * pull it out of our internal information. 215 */ 216 if (cp->root == PGNO_INVALID) 217 cp->root = t->bt_root; 218 219 LOCK_INIT(cp->lock); 220 cp->lock_mode = DB_LOCK_NG; 221 222 if (cp->sp == NULL) { 223 cp->sp = cp->stack; 224 cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]); 225 } 226 BT_STK_CLR(cp); 227 228 /* 229 * The btree leaf page data structures require that two key/data pairs 230 * (or four items) fit on a page, but other than that there's no fixed 231 * requirement. The btree off-page duplicates only require two items, 232 * to be exact, but requiring four for them as well seems reasonable. 233 * 234 * Recno uses the btree bt_ovflsize value -- it's close enough. 235 */ 236 cp->ovflsize = B_MINKEY_TO_OVFLSIZE( 237 dbp, F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey, dbp->pgsize); 238 239 cp->recno = RECNO_OOB; 240 cp->order = INVALID_ORDER; 241 cp->flags = 0; 242 243 /* Initialize for record numbers. */ 244 if (F_ISSET(dbc, DBC_OPD) || 245 dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_AM_RECNUM)) { 246 F_SET(cp, C_RECNUM); 247 248 /* 249 * All btrees that support record numbers, optionally standard 250 * recno trees, and all off-page duplicate recno trees have 251 * mutable record numbers. 252 */ 253 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) || 254 F_ISSET(dbp, DB_AM_RECNUM | DB_AM_RENUMBER)) 255 F_SET(cp, C_RENUMBER); 256 } 257 258 return (0); 259} 260 261/* 262 * __bamc_close -- 263 * Close down the cursor. 264 */ 265static int 266__bamc_close(dbc, root_pgno, rmroot) 267 DBC *dbc; 268 db_pgno_t root_pgno; 269 int *rmroot; 270{ 271 BTREE_CURSOR *cp, *cp_opd, *cp_c; 272 DB *dbp; 273 DBC *dbc_opd, *dbc_c; 274 DB_MPOOLFILE *mpf; 275 ENV *env; 276 PAGE *h; 277 int cdb_lock, count, ret; 278 279 dbp = dbc->dbp; 280 env = dbp->env; 281 mpf = dbp->mpf; 282 cp = (BTREE_CURSOR *)dbc->internal; 283 cp_opd = (dbc_opd = cp->opd) == NULL ? 284 NULL : (BTREE_CURSOR *)dbc_opd->internal; 285 cdb_lock = ret = 0; 286 287 /* 288 * There are 3 ways this function is called: 289 * 290 * 1. Closing a primary cursor: we get called with a pointer to a 291 * primary cursor that has a NULL opd field. This happens when 292 * closing a btree/recno database cursor without an associated 293 * off-page duplicate tree. 294 * 295 * 2. Closing a primary and an off-page duplicate cursor stack: we 296 * get called with a pointer to the primary cursor which has a 297 * non-NULL opd field. This happens when closing a btree cursor 298 * into database with an associated off-page btree/recno duplicate 299 * tree. (It can't be a primary recno database, recno databases 300 * don't support duplicates.) 301 * 302 * 3. Closing an off-page duplicate cursor stack: we get called with 303 * a pointer to the off-page duplicate cursor. This happens when 304 * closing a non-btree database that has an associated off-page 305 * btree/recno duplicate tree or for a btree database when the 306 * opd tree is not empty (root_pgno == PGNO_INVALID). 307 * 308 * If either the primary or off-page duplicate cursor deleted a btree 309 * key/data pair, check to see if the item is still referenced by a 310 * different cursor. If it is, confirm that cursor's delete flag is 311 * set and leave it to that cursor to do the delete. 312 * 313 * NB: The test for == 0 below is correct. Our caller already removed 314 * our cursor argument from the active queue, we won't find it when we 315 * search the queue in __bam_ca_delete(). 316 * NB: It can't be true that both the primary and off-page duplicate 317 * cursors have deleted a btree key/data pair. Either the primary 318 * cursor may have deleted an item and there's no off-page duplicate 319 * cursor, or there's an off-page duplicate cursor and it may have 320 * deleted an item. 321 * 322 * Primary recno databases aren't an issue here. Recno keys are either 323 * deleted immediately or never deleted, and do not have to be handled 324 * here. 325 * 326 * Off-page duplicate recno databases are an issue here, cases #2 and 327 * #3 above can both be off-page recno databases. The problem is the 328 * same as the final problem for off-page duplicate btree databases. 329 * If we no longer need the off-page duplicate tree, we want to remove 330 * it. For off-page duplicate btrees, we are done with the tree when 331 * we delete the last item it contains, i.e., there can be no further 332 * references to it when it's empty. For off-page duplicate recnos, 333 * we remove items from the tree as the application calls the remove 334 * function, so we are done with the tree when we close the last cursor 335 * that references it. 336 * 337 * We optionally take the root page number from our caller. If the 338 * primary database is a btree, we can get it ourselves because dbc 339 * is the primary cursor. If the primary database is not a btree, 340 * the problem is that we may be dealing with a stack of pages. The 341 * cursor we're using to do the delete points at the bottom of that 342 * stack and we need the top of the stack. 343 */ 344 if (F_ISSET(cp, C_DELETED)) { 345 dbc_c = dbc; 346 switch (dbc->dbtype) { 347 case DB_BTREE: /* Case #1, #3. */ 348 if ((ret = __bam_ca_delete( 349 dbp, cp->pgno, cp->indx, 1, &count)) != 0) 350 goto err; 351 if (count == 0) 352 goto lock; 353 goto done; 354 case DB_RECNO: 355 if (!F_ISSET(dbc, DBC_OPD)) /* Case #1. */ 356 goto done; 357 /* Case #3. */ 358 if ((ret = __ram_ca_delete(dbp, cp->root, &count)) != 0) 359 goto err; 360 if (count == 0) 361 goto lock; 362 goto done; 363 case DB_HASH: 364 case DB_QUEUE: 365 case DB_UNKNOWN: 366 default: 367 ret = __db_unknown_type( 368 env, "DbCursor.close", dbc->dbtype); 369 goto err; 370 } 371 } 372 373 if (dbc_opd == NULL) 374 goto done; 375 376 if (F_ISSET(cp_opd, C_DELETED)) { /* Case #2. */ 377 /* 378 * We will not have been provided a root page number. Acquire 379 * one from the primary database. 380 */ 381 if ((ret = __memp_fget(mpf, &cp->pgno, 382 dbc->thread_info, dbc->txn, 0, &h)) != 0) 383 goto err; 384 root_pgno = GET_BOVERFLOW(dbp, h, cp->indx + O_INDX)->pgno; 385 if ((ret = __memp_fput(mpf, 386 dbc->thread_info, h, dbc->priority)) != 0) 387 goto err; 388 389 dbc_c = dbc_opd; 390 switch (dbc_opd->dbtype) { 391 case DB_BTREE: 392 if ((ret = __bam_ca_delete( 393 dbp, cp_opd->pgno, cp_opd->indx, 1, &count)) != 0) 394 goto err; 395 if (count == 0) 396 goto lock; 397 goto done; 398 case DB_RECNO: 399 if ((ret = 400 __ram_ca_delete(dbp, cp_opd->root, &count)) != 0) 401 goto err; 402 if (count == 0) 403 goto lock; 404 goto done; 405 case DB_HASH: 406 case DB_QUEUE: 407 case DB_UNKNOWN: 408 default: 409 ret = __db_unknown_type( 410 env, "DbCursor.close", dbc->dbtype); 411 goto err; 412 } 413 } 414 goto done; 415 416lock: cp_c = (BTREE_CURSOR *)dbc_c->internal; 417 418 /* 419 * If this is CDB, upgrade the lock if necessary. While we acquired 420 * the write lock to logically delete the record, we released it when 421 * we returned from that call, and so may not be holding a write lock 422 * at the moment. 423 */ 424 if (CDB_LOCKING(env)) { 425 if (F_ISSET(dbc, DBC_WRITECURSOR)) { 426 if ((ret = __lock_get(env, 427 dbc->locker, DB_LOCK_UPGRADE, &dbc->lock_dbt, 428 DB_LOCK_WRITE, &dbc->mylock)) != 0) 429 goto err; 430 cdb_lock = 1; 431 } 432 goto delete; 433 } 434 435 /* 436 * The variable dbc_c has been initialized to reference the cursor in 437 * which we're going to do the delete. Initialize the cursor's lock 438 * structures as necessary. 439 * 440 * First, we may not need to acquire any locks. If we're in case #3, 441 * that is, the primary database isn't a btree database, our caller 442 * is responsible for acquiring any necessary locks before calling us. 443 */ 444 if (F_ISSET(dbc, DBC_OPD)) 445 goto delete; 446 447 /* 448 * Otherwise, acquire a write lock on the primary database's page. 449 * 450 * Lock the primary database page, regardless of whether we're deleting 451 * an item on a primary database page or an off-page duplicates page. 452 * 453 * If the cursor that did the initial logical deletion (and had a write 454 * lock) is not the same cursor doing the physical deletion (which may 455 * have only ever had a read lock on the item), we need to upgrade to a 456 * write lock. The confusion comes as follows: 457 * 458 * C1 created, acquires item read lock 459 * C2 dup C1, create C2, also has item read lock. 460 * C1 acquire write lock, delete item 461 * C1 close 462 * C2 close, needs a write lock to physically delete item. 463 * 464 * If we're in a TXN, we know that C2 will be able to acquire the write 465 * lock, because no locker other than the one shared by C1 and C2 can 466 * acquire a write lock -- the original write lock C1 acquired was never 467 * discarded. 468 * 469 * If we're not in a TXN, it's nastier. Other cursors might acquire 470 * read locks on the item after C1 closed, discarding its write lock, 471 * and such locks would prevent C2 from acquiring a read lock. That's 472 * OK, though, we'll simply wait until we can acquire a write lock, or 473 * we'll deadlock. (Which better not happen, since we're not in a TXN.) 474 * 475 * There are similar scenarios with dirty reads, where the cursor may 476 * have downgraded its write lock to a was-write lock. 477 */ 478 if (STD_LOCKING(dbc)) 479 if ((ret = __db_lget(dbc, 480 LCK_COUPLE, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) 481 goto err; 482 483delete: /* 484 * If the delete occurred in a Btree, we're going to look at the page 485 * to see if the item has to be physically deleted. Otherwise, we do 486 * not need the actual page (and it may not even exist, it might have 487 * been truncated from the file after an allocation aborted). 488 * 489 * Delete the on-page physical item referenced by the cursor. 490 */ 491 if (dbc_c->dbtype == DB_BTREE) { 492 if ((ret = __memp_fget(mpf, &cp_c->pgno, dbc->thread_info, 493 dbc->txn, DB_MPOOL_DIRTY, &cp_c->page)) != 0) 494 goto err; 495 if ((ret = __bamc_physdel(dbc_c)) != 0) 496 goto err; 497 } 498 499 /* 500 * If we're not working in an off-page duplicate tree, then we're 501 * done. 502 */ 503 if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID) 504 goto done; 505 506 /* 507 * We may have just deleted the last element in the off-page duplicate 508 * tree, and closed the last cursor in the tree. For an off-page btree 509 * there are no other cursors in the tree by definition, if the tree is 510 * empty. For an off-page recno we know we have closed the last cursor 511 * in the tree because the __ram_ca_delete call above returned 0 only 512 * in that case. So, if the off-page duplicate tree is empty at this 513 * point, we want to remove it. 514 */ 515 if ((ret = __memp_fget(mpf, &root_pgno, 516 dbc->thread_info, dbc->txn, 0, &h)) != 0) 517 goto err; 518 if (NUM_ENT(h) == 0) { 519 DISCARD_CUR(dbc_c, ret); 520 if (ret != 0) 521 goto err; 522 if ((ret = __db_free(dbc, h)) != 0) 523 goto err; 524 } else { 525 if ((ret = __memp_fput(mpf, 526 dbc->thread_info, h, dbc->priority)) != 0) 527 goto err; 528 goto done; 529 } 530 531 /* 532 * When removing the tree, we have to do one of two things. If this is 533 * case #2, that is, the primary tree is a btree, delete the key that's 534 * associated with the tree from the btree leaf page. We know we are 535 * the only reference to it and we already have the correct lock. We 536 * detect this case because the cursor that was passed to us references 537 * an off-page duplicate cursor. 538 * 539 * If this is case #3, that is, the primary tree isn't a btree, pass 540 * the information back to our caller, it's their job to do cleanup on 541 * the primary page. 542 */ 543 if (dbc_opd != NULL) { 544 if ((ret = __memp_fget(mpf, &cp->pgno, dbc->thread_info, 545 dbc->txn, DB_MPOOL_DIRTY, &cp->page)) != 0) 546 goto err; 547 if ((ret = __bamc_physdel(dbc)) != 0) 548 goto err; 549 } else 550 *rmroot = 1; 551err: 552done: /* 553 * Discard the page references and locks, and confirm that the stack 554 * has been emptied. 555 */ 556 if (dbc_opd != NULL) 557 DISCARD_CUR(dbc_opd, ret); 558 DISCARD_CUR(dbc, ret); 559 560 /* Downgrade any CDB lock we acquired. */ 561 if (cdb_lock) 562 (void)__lock_downgrade(env, &dbc->mylock, DB_LOCK_IWRITE, 0); 563 564 return (ret); 565} 566 567/* 568 * __bamc_destroy -- 569 * Close a single cursor -- internal version. 570 */ 571static int 572__bamc_destroy(dbc) 573 DBC *dbc; 574{ 575 BTREE_CURSOR *cp; 576 ENV *env; 577 578 cp = (BTREE_CURSOR *)dbc->internal; 579 env = dbc->env; 580 581 /* Discard the structures. */ 582 if (cp->sp != cp->stack) 583 __os_free(env, cp->sp); 584 __os_free(env, cp); 585 586 return (0); 587} 588 589/* 590 * __bamc_count -- 591 * Return a count of on and off-page duplicates. 592 * 593 * PUBLIC: int __bamc_count __P((DBC *, db_recno_t *)); 594 */ 595int 596__bamc_count(dbc, recnop) 597 DBC *dbc; 598 db_recno_t *recnop; 599{ 600 BTREE_CURSOR *cp; 601 DB *dbp; 602 DB_MPOOLFILE *mpf; 603 db_indx_t indx, top; 604 db_recno_t recno; 605 int ret; 606 607 dbp = dbc->dbp; 608 mpf = dbp->mpf; 609 cp = (BTREE_CURSOR *)dbc->internal; 610 611 /* 612 * Called with the top-level cursor that may reference an off-page 613 * duplicates tree. We don't have to acquire any new locks, we have 614 * to have a read lock to even get here. 615 */ 616 if (cp->opd == NULL) { 617 /* 618 * On-page duplicates, get the page and count. 619 */ 620 if ((ret = __memp_fget(mpf, &cp->pgno, 621 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 622 return (ret); 623 624 /* 625 * Move back to the beginning of the set of duplicates and 626 * then count forward. 627 */ 628 for (indx = cp->indx;; indx -= P_INDX) 629 if (indx == 0 || 630 !IS_DUPLICATE(dbc, indx, indx - P_INDX)) 631 break; 632 for (recno = 0, 633 top = NUM_ENT(cp->page) - P_INDX;; indx += P_INDX) { 634 if (!IS_DELETED(dbp, cp->page, indx)) 635 ++recno; 636 if (indx == top || 637 !IS_DUPLICATE(dbc, indx, indx + P_INDX)) 638 break; 639 } 640 } else { 641 /* 642 * Off-page duplicates tree, get the root page of the off-page 643 * duplicate tree. 644 */ 645 if ((ret = __memp_fget(mpf, &cp->opd->internal->root, 646 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 647 return (ret); 648 649 /* 650 * If the page is an internal page use the page's count as it's 651 * up-to-date and reflects the status of cursors in the tree. 652 * If the page is a leaf page for unsorted duplicates, use the 653 * page's count as cursors don't mark items deleted on the page 654 * and wait, cursor delete items immediately. 655 * If the page is a leaf page for sorted duplicates, there may 656 * be cursors on the page marking deleted items -- count. 657 */ 658 if (TYPE(cp->page) == P_LDUP) 659 for (recno = 0, indx = 0, 660 top = NUM_ENT(cp->page) - O_INDX;; indx += O_INDX) { 661 if (!IS_DELETED(dbp, cp->page, indx)) 662 ++recno; 663 if (indx == top) 664 break; 665 } 666 else 667 recno = RE_NREC(cp->page); 668 } 669 670 *recnop = recno; 671 672 ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority); 673 cp->page = NULL; 674 675 return (ret); 676} 677 678/* 679 * __bamc_del -- 680 * Delete using a cursor. 681 */ 682static int 683__bamc_del(dbc) 684 DBC *dbc; 685{ 686 BTREE_CURSOR *cp; 687 DB *dbp; 688 DB_MPOOLFILE *mpf; 689 int count, ret, t_ret; 690 691 dbp = dbc->dbp; 692 mpf = dbp->mpf; 693 cp = (BTREE_CURSOR *)dbc->internal; 694 ret = 0; 695 696 /* If the item was already deleted, return failure. */ 697 if (F_ISSET(cp, C_DELETED)) 698 return (DB_KEYEMPTY); 699 700 /* 701 * This code is always called with a page lock but no page. 702 */ 703 DB_ASSERT(dbp->env, cp->page == NULL); 704 705 /* 706 * We don't physically delete the record until the cursor moves, so 707 * we have to have a long-lived write lock on the page instead of a 708 * a long-lived read lock. Note, we have to have a read lock to even 709 * get here. 710 * 711 * If we're maintaining record numbers, we lock the entire tree, else 712 * we lock the single page. 713 */ 714 if (F_ISSET(cp, C_RECNUM)) { 715 if ((ret = __bamc_getstack(dbc)) != 0) 716 goto err; 717 cp->page = cp->csp->page; 718 } else { 719 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, cp->pgno, 0, ret); 720 if (ret != 0) 721 goto err; 722 } 723 724 /* Mark the page dirty. */ 725 if ((ret = __memp_dirty(mpf, 726 &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 727 goto err; 728 729 /* Log the change. */ 730 if (DBC_LOGGING(dbc)) { 731 if ((ret = __bam_cdel_log(dbp, dbc->txn, &LSN(cp->page), 0, 732 PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0) 733 goto err; 734 } else 735 LSN_NOT_LOGGED(LSN(cp->page)); 736 737 /* Set the intent-to-delete flag on the page. */ 738 if (TYPE(cp->page) == P_LBTREE) 739 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx + O_INDX)->type); 740 else 741 B_DSET(GET_BKEYDATA(dbp, cp->page, cp->indx)->type); 742 743err: /* 744 * If we've been successful so far and the tree has record numbers, 745 * adjust the record counts. Either way, release acquired page(s). 746 */ 747 if (F_ISSET(cp, C_RECNUM)) { 748 cp->csp->page = cp->page; 749 if (ret == 0) 750 ret = __bam_adjust(dbc, -1); 751 (void)__bam_stkrel(dbc, 0); 752 } else 753 if (cp->page != NULL && 754 (t_ret = __memp_fput(mpf, dbc->thread_info, 755 cp->page, dbc->priority)) != 0 && ret == 0) 756 ret = t_ret; 757 758 cp->page = NULL; 759 760 /* 761 * Update the cursors last, after all chance of recoverable failure 762 * is past. 763 */ 764 if (ret == 0) 765 ret = __bam_ca_delete(dbp, cp->pgno, cp->indx, 1, &count); 766 767 return (ret); 768} 769 770/* 771 * __bamc_dup -- 772 * Duplicate a btree cursor, such that the new one holds appropriate 773 * locks for the position of the original. 774 * 775 * PUBLIC: int __bamc_dup __P((DBC *, DBC *)); 776 */ 777int 778__bamc_dup(orig_dbc, new_dbc) 779 DBC *orig_dbc, *new_dbc; 780{ 781 BTREE_CURSOR *orig, *new; 782 783 orig = (BTREE_CURSOR *)orig_dbc->internal; 784 new = (BTREE_CURSOR *)new_dbc->internal; 785 786 new->ovflsize = orig->ovflsize; 787 new->recno = orig->recno; 788 new->flags = orig->flags; 789 790 return (0); 791} 792 793/* 794 * __bamc_get -- 795 * Get using a cursor (btree). 796 */ 797static int 798__bamc_get(dbc, key, data, flags, pgnop) 799 DBC *dbc; 800 DBT *key, *data; 801 u_int32_t flags; 802 db_pgno_t *pgnop; 803{ 804 BTREE_CURSOR *cp; 805 DB *dbp; 806 DB_MPOOLFILE *mpf; 807 db_pgno_t orig_pgno; 808 db_indx_t orig_indx; 809 int exact, newopd, ret; 810 811 dbp = dbc->dbp; 812 mpf = dbp->mpf; 813 cp = (BTREE_CURSOR *)dbc->internal; 814 orig_pgno = cp->pgno; 815 orig_indx = cp->indx; 816 817 newopd = 0; 818 switch (flags) { 819 case DB_CURRENT: 820 /* It's not possible to return a deleted record. */ 821 if (F_ISSET(cp, C_DELETED)) { 822 ret = DB_KEYEMPTY; 823 goto err; 824 } 825 826 /* 827 * Acquire the current page. We have at least a read-lock 828 * already. The caller may have set DB_RMW asking for a 829 * write lock, but upgrading to a write lock has no better 830 * chance of succeeding now instead of later, so don't try. 831 */ 832 if ((ret = __memp_fget(mpf, &cp->pgno, 833 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 834 goto err; 835 break; 836 case DB_FIRST: 837 newopd = 1; 838 if ((ret = __bamc_search(dbc, 839 PGNO_INVALID, NULL, flags, &exact)) != 0) 840 goto err; 841 break; 842 case DB_GET_BOTH: 843 case DB_GET_BOTH_RANGE: 844 /* 845 * There are two ways to get here based on DBcursor->get 846 * with the DB_GET_BOTH/DB_GET_BOTH_RANGE flags set: 847 * 848 * 1. Searching a sorted off-page duplicate tree: do a tree 849 * search. 850 * 851 * 2. Searching btree: do a tree search. If it returns a 852 * reference to off-page duplicate tree, return immediately 853 * and let our caller deal with it. If the search doesn't 854 * return a reference to off-page duplicate tree, continue 855 * with an on-page search. 856 */ 857 if (F_ISSET(dbc, DBC_OPD)) { 858 if ((ret = __bamc_search( 859 dbc, PGNO_INVALID, data, flags, &exact)) != 0) 860 goto err; 861 if (flags == DB_GET_BOTH) { 862 if (!exact) { 863 ret = DB_NOTFOUND; 864 goto err; 865 } 866 break; 867 } 868 869 /* 870 * We didn't require an exact match, so the search may 871 * may have returned an entry past the end of the page, 872 * or we may be referencing a deleted record. If so, 873 * move to the next entry. 874 */ 875 if ((cp->indx == NUM_ENT(cp->page) || 876 IS_CUR_DELETED(dbc)) && 877 (ret = __bamc_next(dbc, 1, 0)) != 0) 878 goto err; 879 } else { 880 if ((ret = __bamc_search( 881 dbc, PGNO_INVALID, key, flags, &exact)) != 0) 882 return (ret); 883 if (!exact) { 884 ret = DB_NOTFOUND; 885 goto err; 886 } 887 888 if (pgnop != NULL && __bam_isopd(dbc, pgnop)) { 889 newopd = 1; 890 break; 891 } 892 if ((ret = 893 __bam_getboth_finddatum(dbc, data, flags)) != 0) 894 goto err; 895 } 896 break; 897 case DB_GET_BOTHC: 898 if ((ret = __bam_getbothc(dbc, data)) != 0) 899 goto err; 900 break; 901 case DB_LAST: 902 newopd = 1; 903 if ((ret = __bamc_search(dbc, 904 PGNO_INVALID, NULL, flags, &exact)) != 0) 905 goto err; 906 break; 907 case DB_NEXT: 908 newopd = 1; 909 if (cp->pgno == PGNO_INVALID) { 910 if ((ret = __bamc_search(dbc, 911 PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0) 912 goto err; 913 } else 914 if ((ret = __bamc_next(dbc, 1, 0)) != 0) 915 goto err; 916 break; 917 case DB_NEXT_DUP: 918 if ((ret = __bamc_next(dbc, 1, 0)) != 0) 919 goto err; 920 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) { 921 ret = DB_NOTFOUND; 922 goto err; 923 } 924 break; 925 case DB_NEXT_NODUP: 926 newopd = 1; 927 if (cp->pgno == PGNO_INVALID) { 928 if ((ret = __bamc_search(dbc, 929 PGNO_INVALID, NULL, DB_FIRST, &exact)) != 0) 930 goto err; 931 } else 932 do { 933 if ((ret = __bamc_next(dbc, 1, 0)) != 0) 934 goto err; 935 } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)); 936 break; 937 case DB_PREV: 938 newopd = 1; 939 if (cp->pgno == PGNO_INVALID) { 940 if ((ret = __bamc_search(dbc, 941 PGNO_INVALID, NULL, DB_LAST, &exact)) != 0) 942 goto err; 943 } else 944 if ((ret = __bamc_prev(dbc)) != 0) 945 goto err; 946 break; 947 case DB_PREV_DUP: 948 if ((ret = __bamc_prev(dbc)) != 0) 949 goto err; 950 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) { 951 ret = DB_NOTFOUND; 952 goto err; 953 } 954 break; 955 case DB_PREV_NODUP: 956 newopd = 1; 957 if (cp->pgno == PGNO_INVALID) { 958 if ((ret = __bamc_search(dbc, 959 PGNO_INVALID, NULL, DB_LAST, &exact)) != 0) 960 goto err; 961 } else 962 do { 963 if ((ret = __bamc_prev(dbc)) != 0) 964 goto err; 965 } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)); 966 break; 967 case DB_SET: 968 case DB_SET_RECNO: 969 newopd = 1; 970 if ((ret = __bamc_search(dbc, 971 PGNO_INVALID, key, flags, &exact)) != 0) 972 goto err; 973 break; 974 case DB_SET_RANGE: 975 newopd = 1; 976 if ((ret = __bamc_search(dbc, 977 PGNO_INVALID, key, flags, &exact)) != 0) 978 goto err; 979 980 /* 981 * As we didn't require an exact match, the search function 982 * may have returned an entry past the end of the page. Or, 983 * we may be referencing a deleted record. If so, move to 984 * the next entry. 985 */ 986 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc)) 987 if ((ret = __bamc_next(dbc, 0, 0)) != 0) 988 goto err; 989 break; 990 default: 991 ret = __db_unknown_flag(dbp->env, "__bamc_get", flags); 992 goto err; 993 } 994 995 /* 996 * We may have moved to an off-page duplicate tree. Return that 997 * information to our caller. 998 */ 999 if (newopd && pgnop != NULL) 1000 (void)__bam_isopd(dbc, pgnop); 1001 1002err: /* 1003 * Regardless of whether we were successful or not, if the cursor 1004 * moved, clear the delete flag, DBcursor->get never references a 1005 * deleted key, if it moved at all. 1006 */ 1007 if (F_ISSET(cp, C_DELETED) && 1008 (cp->pgno != orig_pgno || cp->indx != orig_indx)) 1009 F_CLR(cp, C_DELETED); 1010 1011 return (ret); 1012} 1013 1014static int 1015__bam_get_prev(dbc) 1016 DBC *dbc; 1017{ 1018 BTREE_CURSOR *cp; 1019 DBT key, data; 1020 db_pgno_t pgno; 1021 int ret; 1022 1023 if ((ret = __bamc_prev(dbc)) != 0) 1024 return (ret); 1025 1026 if (__bam_isopd(dbc, &pgno)) { 1027 cp = (BTREE_CURSOR *)dbc->internal; 1028 if ((ret = __dbc_newopd(dbc, pgno, cp->opd, &cp->opd)) != 0) 1029 return (ret); 1030 if ((ret = cp->opd->am_get(cp->opd, 1031 &key, &data, DB_LAST, NULL)) != 0) 1032 return (ret); 1033 } 1034 1035 return (0); 1036} 1037 1038/* 1039 * __bam_bulk -- Return bulk data from a btree. 1040 */ 1041static int 1042__bam_bulk(dbc, data, flags) 1043 DBC *dbc; 1044 DBT *data; 1045 u_int32_t flags; 1046{ 1047 BKEYDATA *bk; 1048 BOVERFLOW *bo; 1049 BTREE_CURSOR *cp; 1050 PAGE *pg; 1051 db_indx_t *inp, indx, pg_keyoff; 1052 int32_t *endp, key_off, *offp, *saveoffp; 1053 u_int8_t *dbuf, *dp, *np; 1054 u_int32_t key_size, pagesize, size, space; 1055 int adj, is_key, need_pg, next_key, no_dup, rec_key, ret; 1056 1057 ret = 0; 1058 key_off = 0; 1059 size = 0; 1060 pagesize = dbc->dbp->pgsize; 1061 cp = (BTREE_CURSOR *)dbc->internal; 1062 1063 /* 1064 * dp tracks the beginning of the page in the buffer. 1065 * np is the next place to copy things into the buffer. 1066 * dbuf always stays at the beginning of the buffer. 1067 */ 1068 dbuf = data->data; 1069 np = dp = dbuf; 1070 1071 /* Keep track of space that is left. There is a termination entry */ 1072 space = data->ulen; 1073 space -= sizeof(*offp); 1074 1075 /* Build the offset/size table from the end up. */ 1076 endp = (int32_t *)((u_int8_t *)dbuf + data->ulen); 1077 endp--; 1078 offp = endp; 1079 1080 key_size = 0; 1081 1082 /* 1083 * Distinguish between BTREE and RECNO. 1084 * There are no keys in RECNO. If MULTIPLE_KEY is specified 1085 * then we return the record numbers. 1086 * is_key indicates that multiple btree keys are returned. 1087 * rec_key is set if we are returning record numbers. 1088 * next_key is set if we are going after the next key rather than dup. 1089 */ 1090 if (dbc->dbtype == DB_BTREE) { 1091 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1: 0; 1092 rec_key = 0; 1093 next_key = is_key && LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP; 1094 adj = 2; 1095 } else { 1096 is_key = 0; 1097 rec_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; 1098 next_key = LF_ISSET(DB_OPFLAGS_MASK) != DB_NEXT_DUP; 1099 adj = 1; 1100 } 1101 no_dup = LF_ISSET(DB_OPFLAGS_MASK) == DB_NEXT_NODUP; 1102 1103next_pg: 1104 indx = cp->indx; 1105 pg = cp->page; 1106 1107 inp = P_INP(dbc->dbp, pg); 1108 /* The current page is not yet in the buffer. */ 1109 need_pg = 1; 1110 1111 /* 1112 * Keep track of the offset of the current key on the page. 1113 * If we are returning keys, set it to 0 first so we force 1114 * the copy of the key to the buffer. 1115 */ 1116 pg_keyoff = 0; 1117 if (is_key == 0) 1118 pg_keyoff = inp[indx]; 1119 1120 do { 1121 if (IS_DELETED(dbc->dbp, pg, indx)) { 1122 if (dbc->dbtype != DB_RECNO) 1123 continue; 1124 1125 cp->recno++; 1126 /* 1127 * If we are not returning recnos then we 1128 * need to fill in every slot so the user 1129 * can calculate the record numbers. 1130 */ 1131 if (rec_key != 0) 1132 continue; 1133 1134 space -= 2 * sizeof(*offp); 1135 /* Check if space as underflowed. */ 1136 if (space > data->ulen) 1137 goto back_up; 1138 1139 /* Just mark the empty recno slots. */ 1140 *offp-- = 0; 1141 *offp-- = 0; 1142 continue; 1143 } 1144 1145 /* 1146 * Check to see if we have a new key. 1147 * If so, then see if we need to put the 1148 * key on the page. If its already there 1149 * then we just point to it. 1150 */ 1151 if (is_key && pg_keyoff != inp[indx]) { 1152 bk = GET_BKEYDATA(dbc->dbp, pg, indx); 1153 if (B_TYPE(bk->type) == B_OVERFLOW) { 1154 bo = (BOVERFLOW *)bk; 1155 size = key_size = bo->tlen; 1156 if (key_size > space) 1157 goto get_key_space; 1158 if ((ret = __bam_bulk_overflow(dbc, 1159 bo->tlen, bo->pgno, np)) != 0) 1160 return (ret); 1161 space -= key_size; 1162 key_off = (int32_t)(np - dbuf); 1163 np += key_size; 1164 } else { 1165 if (need_pg) { 1166 dp = np; 1167 size = pagesize - HOFFSET(pg); 1168 if (space < size) { 1169get_key_space: 1170 /* Nothing added, then error. */ 1171 if (offp == endp) { 1172 data->size = (u_int32_t) 1173 DB_ALIGN(size + 1174 pagesize, 1024); 1175 return 1176 (DB_BUFFER_SMALL); 1177 } 1178 /* 1179 * We need to back up to the 1180 * last record put into the 1181 * buffer so that it is 1182 * CURRENT. 1183 */ 1184 if (indx != 0) 1185 indx -= P_INDX; 1186 else { 1187 if ((ret = 1188 __bam_get_prev( 1189 dbc)) != 0) 1190 return (ret); 1191 indx = cp->indx; 1192 pg = cp->page; 1193 } 1194 break; 1195 } 1196 /* 1197 * Move the data part of the page 1198 * to the buffer. 1199 */ 1200 memcpy(dp, 1201 (u_int8_t *)pg + HOFFSET(pg), size); 1202 need_pg = 0; 1203 space -= size; 1204 np += size; 1205 } 1206 key_size = bk->len; 1207 key_off = (int32_t)((inp[indx] - HOFFSET(pg)) 1208 + (dp - dbuf) + SSZA(BKEYDATA, data)); 1209 pg_keyoff = inp[indx]; 1210 } 1211 } 1212 1213 /* 1214 * Reserve space for the pointers and sizes. 1215 * Either key/data pair or just for a data item. 1216 */ 1217 space -= (is_key ? 4 : 2) * sizeof(*offp); 1218 if (rec_key) 1219 space -= sizeof(*offp); 1220 1221 /* Check to see if space has underflowed. */ 1222 if (space > data->ulen) 1223 goto back_up; 1224 1225 /* 1226 * Determine if the next record is in the 1227 * buffer already or if it needs to be copied in. 1228 * If we have an off page dup, then copy as many 1229 * as will fit into the buffer. 1230 */ 1231 bk = GET_BKEYDATA(dbc->dbp, pg, indx + adj - 1); 1232 if (B_TYPE(bk->type) == B_DUPLICATE) { 1233 bo = (BOVERFLOW *)bk; 1234 if (is_key) { 1235 *offp-- = (int32_t)key_off; 1236 *offp-- = (int32_t)key_size; 1237 } 1238 /* 1239 * We pass the offset of the current key. 1240 * On return we check to see if offp has 1241 * moved to see if any data fit. 1242 */ 1243 saveoffp = offp; 1244 if ((ret = __bam_bulk_duplicates(dbc, bo->pgno, 1245 dbuf, is_key ? offp + P_INDX : NULL, 1246 &offp, &np, &space, no_dup)) != 0) { 1247 if (ret == DB_BUFFER_SMALL) { 1248 size = space; 1249 space = 0; 1250 /* If nothing was added, then error. */ 1251 if (offp == saveoffp) { 1252 offp += 2; 1253 goto back_up; 1254 } 1255 goto get_space; 1256 } 1257 return (ret); 1258 } 1259 } else if (B_TYPE(bk->type) == B_OVERFLOW) { 1260 bo = (BOVERFLOW *)bk; 1261 size = bo->tlen; 1262 if (size > space) 1263 goto back_up; 1264 if ((ret = 1265 __bam_bulk_overflow(dbc, 1266 bo->tlen, bo->pgno, np)) != 0) 1267 return (ret); 1268 space -= size; 1269 if (is_key) { 1270 *offp-- = (int32_t)key_off; 1271 *offp-- = (int32_t)key_size; 1272 } else if (rec_key) 1273 *offp-- = (int32_t)cp->recno; 1274 *offp-- = (int32_t)(np - dbuf); 1275 np += size; 1276 *offp-- = (int32_t)size; 1277 } else { 1278 if (need_pg) { 1279 dp = np; 1280 size = pagesize - HOFFSET(pg); 1281 if (space < size) { 1282back_up: 1283 /* 1284 * Back up the index so that the 1285 * last record in the buffer is CURRENT 1286 */ 1287 if (indx >= adj) 1288 indx -= adj; 1289 else { 1290 if ((ret = 1291 __bam_get_prev(dbc)) != 0 && 1292 ret != DB_NOTFOUND) 1293 return (ret); 1294 indx = cp->indx; 1295 pg = cp->page; 1296 } 1297 if (dbc->dbtype == DB_RECNO) 1298 cp->recno--; 1299get_space: 1300 /* 1301 * See if we put anything in the 1302 * buffer or if we are doing a DBP->get 1303 * did we get all of the data. 1304 */ 1305 if (offp >= 1306 (is_key ? &endp[-1] : endp) || 1307 F_ISSET(dbc, DBC_TRANSIENT)) { 1308 data->size = (u_int32_t) 1309 DB_ALIGN(size + 1310 data->ulen - space, 1024); 1311 return (DB_BUFFER_SMALL); 1312 } 1313 break; 1314 } 1315 memcpy(dp, (u_int8_t *)pg + HOFFSET(pg), size); 1316 need_pg = 0; 1317 space -= size; 1318 np += size; 1319 } 1320 /* 1321 * Add the offsets and sizes to the end of the buffer. 1322 * First add the key info then the data info. 1323 */ 1324 if (is_key) { 1325 *offp-- = (int32_t)key_off; 1326 *offp-- = (int32_t)key_size; 1327 } else if (rec_key) 1328 *offp-- = (int32_t)cp->recno; 1329 *offp-- = (int32_t)((inp[indx + adj - 1] - HOFFSET(pg)) 1330 + (dp - dbuf) + SSZA(BKEYDATA, data)); 1331 *offp-- = bk->len; 1332 } 1333 if (dbc->dbtype == DB_RECNO) 1334 cp->recno++; 1335 else if (no_dup) { 1336 while (indx + adj < NUM_ENT(pg) && 1337 pg_keyoff == inp[indx + adj]) 1338 indx += adj; 1339 } 1340 /* 1341 * Stop when we either run off the page or we move to the next key and 1342 * we are not returning multiple keys. 1343 */ 1344 } while ((indx += adj) < NUM_ENT(pg) && 1345 (next_key || pg_keyoff == inp[indx])); 1346 1347 /* If we are off the page then try to the next page. */ 1348 if (ret == 0 && next_key && indx >= NUM_ENT(pg)) { 1349 cp->indx = indx; 1350 ret = __bamc_next(dbc, 0, 1); 1351 if (ret == 0) 1352 goto next_pg; 1353 if (ret != DB_NOTFOUND) 1354 return (ret); 1355 } 1356 1357 /* 1358 * If we did a DBP->get we must error if we did not return 1359 * all the data for the current key because there is 1360 * no way to know if we did not get it all, nor any 1361 * interface to fetch the balance. 1362 */ 1363 1364 if (ret == 0 && indx < pg->entries && 1365 F_ISSET(dbc, DBC_TRANSIENT) && pg_keyoff == inp[indx]) { 1366 data->size = (data->ulen - space) + size; 1367 return (DB_BUFFER_SMALL); 1368 } 1369 /* 1370 * Must leave the index pointing at the last record fetched. 1371 * If we are not fetching keys, we may have stepped to the 1372 * next key. 1373 */ 1374 if (ret == DB_BUFFER_SMALL || next_key || pg_keyoff == inp[indx]) 1375 cp->indx = indx; 1376 else 1377 cp->indx = indx - P_INDX; 1378 1379 if (rec_key == 1) 1380 *offp = RECNO_OOB; 1381 else 1382 *offp = -1; 1383 return (0); 1384} 1385 1386/* 1387 * __bam_bulk_overflow -- 1388 * Dump overflow record into the buffer. 1389 * The space requirements have already been checked. 1390 * PUBLIC: int __bam_bulk_overflow 1391 * PUBLIC: __P((DBC *, u_int32_t, db_pgno_t, u_int8_t *)); 1392 */ 1393int 1394__bam_bulk_overflow(dbc, len, pgno, dp) 1395 DBC *dbc; 1396 u_int32_t len; 1397 db_pgno_t pgno; 1398 u_int8_t *dp; 1399{ 1400 DBT dbt; 1401 1402 memset(&dbt, 0, sizeof(dbt)); 1403 F_SET(&dbt, DB_DBT_USERMEM); 1404 dbt.ulen = len; 1405 dbt.data = (void *)dp; 1406 return (__db_goff(dbc->dbp, 1407 dbc->thread_info, dbc->txn, &dbt, len, pgno, NULL, NULL)); 1408} 1409 1410/* 1411 * __bam_bulk_duplicates -- 1412 * Put as many off page duplicates as will fit into the buffer. 1413 * This routine will adjust the cursor to reflect the position in 1414 * the overflow tree. 1415 * PUBLIC: int __bam_bulk_duplicates __P((DBC *, 1416 * PUBLIC: db_pgno_t, u_int8_t *, int32_t *, 1417 * PUBLIC: int32_t **, u_int8_t **, u_int32_t *, int)); 1418 */ 1419int 1420__bam_bulk_duplicates(dbc, pgno, dbuf, keyoff, offpp, dpp, spacep, no_dup) 1421 DBC *dbc; 1422 db_pgno_t pgno; 1423 u_int8_t *dbuf; 1424 int32_t *keyoff, **offpp; 1425 u_int8_t **dpp; 1426 u_int32_t *spacep; 1427 int no_dup; 1428{ 1429 BKEYDATA *bk; 1430 BOVERFLOW *bo; 1431 BTREE_CURSOR *cp; 1432 DB *dbp; 1433 DBC *opd; 1434 DBT key, data; 1435 PAGE *pg; 1436 db_indx_t indx, *inp; 1437 int32_t *offp; 1438 u_int32_t pagesize, size, space; 1439 u_int8_t *dp, *np; 1440 int first, need_pg, ret, t_ret; 1441 1442 ret = 0; 1443 1444 dbp = dbc->dbp; 1445 cp = (BTREE_CURSOR *)dbc->internal; 1446 opd = cp->opd; 1447 1448 if (opd == NULL) { 1449 if ((ret = __dbc_newopd(dbc, pgno, NULL, &opd)) != 0) 1450 return (ret); 1451 cp->opd = opd; 1452 if ((ret = opd->am_get(opd, 1453 &key, &data, DB_FIRST, NULL)) != 0) 1454 goto close_opd; 1455 } 1456 1457 pagesize = opd->dbp->pgsize; 1458 cp = (BTREE_CURSOR *)opd->internal; 1459 space = *spacep; 1460 /* Get current offset slot. */ 1461 offp = *offpp; 1462 1463 /* 1464 * np is the next place to put data. 1465 * dp is the beginning of the current page in the buffer. 1466 */ 1467 np = dp = *dpp; 1468 first = 1; 1469 indx = cp->indx; 1470 1471 do { 1472 /* Fetch the current record. No initial move. */ 1473 if ((ret = __bamc_next(opd, 0, 0)) != 0) 1474 break; 1475 pg = cp->page; 1476 indx = cp->indx; 1477 inp = P_INP(dbp, pg); 1478 /* We need to copy the page to the buffer. */ 1479 need_pg = 1; 1480 1481 do { 1482 if (IS_DELETED(dbp, pg, indx)) 1483 goto contin; 1484 bk = GET_BKEYDATA(dbp, pg, indx); 1485 space -= 2 * sizeof(*offp); 1486 /* Allocate space for key if needed. */ 1487 if (first == 0 && keyoff != NULL) 1488 space -= 2 * sizeof(*offp); 1489 1490 /* Did space underflow? */ 1491 if (space > *spacep) { 1492 ret = DB_BUFFER_SMALL; 1493 if (first == 1) { 1494 /* Get the absolute value. */ 1495 space = -(int32_t)space; 1496 space = *spacep + space; 1497 if (need_pg) 1498 space += pagesize - HOFFSET(pg); 1499 } 1500 break; 1501 } 1502 if (B_TYPE(bk->type) == B_OVERFLOW) { 1503 bo = (BOVERFLOW *)bk; 1504 size = bo->tlen; 1505 if (size > space) { 1506 ret = DB_BUFFER_SMALL; 1507 space = *spacep + size; 1508 break; 1509 } 1510 if (first == 0 && keyoff != NULL) { 1511 *offp-- = keyoff[0]; 1512 *offp-- = keyoff[-1]; 1513 } 1514 if ((ret = __bam_bulk_overflow(dbc, 1515 bo->tlen, bo->pgno, np)) != 0) 1516 return (ret); 1517 space -= size; 1518 *offp-- = (int32_t)(np - dbuf); 1519 np += size; 1520 } else { 1521 if (need_pg) { 1522 dp = np; 1523 size = pagesize - HOFFSET(pg); 1524 if (space < size) { 1525 ret = DB_BUFFER_SMALL; 1526 /* Return space required. */ 1527 space = *spacep + size; 1528 break; 1529 } 1530 memcpy(dp, 1531 (u_int8_t *)pg + HOFFSET(pg), size); 1532 need_pg = 0; 1533 space -= size; 1534 np += size; 1535 } 1536 if (first == 0 && keyoff != NULL) { 1537 *offp-- = keyoff[0]; 1538 *offp-- = keyoff[-1]; 1539 } 1540 size = bk->len; 1541 *offp-- = (int32_t)((inp[indx] - HOFFSET(pg)) 1542 + (dp - dbuf) + SSZA(BKEYDATA, data)); 1543 } 1544 *offp-- = (int32_t)size; 1545 first = 0; 1546 if (no_dup) 1547 break; 1548contin: 1549 indx++; 1550 if (opd->dbtype == DB_RECNO) 1551 cp->recno++; 1552 } while (indx < NUM_ENT(pg)); 1553 if (no_dup) 1554 break; 1555 cp->indx = indx; 1556 1557 } while (ret == 0); 1558 1559 /* Return the updated information. */ 1560 *spacep = space; 1561 *offpp = offp; 1562 *dpp = np; 1563 1564 /* 1565 * If we ran out of space back up the pointer. 1566 * If we did not return any dups or reached the end, close the opd. 1567 */ 1568 if (ret == DB_BUFFER_SMALL) { 1569 if (opd->dbtype == DB_RECNO) { 1570 if (--cp->recno == 0) 1571 goto close_opd; 1572 } else if (indx != 0) 1573 cp->indx--; 1574 else { 1575 t_ret = __bamc_prev(opd); 1576 if (t_ret == DB_NOTFOUND) 1577 goto close_opd; 1578 if (t_ret != 0) 1579 ret = t_ret; 1580 } 1581 } else if (keyoff == NULL && ret == DB_NOTFOUND) { 1582 cp->indx--; 1583 if (opd->dbtype == DB_RECNO) 1584 --cp->recno; 1585 } else if (indx == 0 || ret == DB_NOTFOUND) { 1586close_opd: 1587 if (ret == DB_NOTFOUND) 1588 ret = 0; 1589 if ((t_ret = __dbc_close(opd)) != 0 && ret == 0) 1590 ret = t_ret; 1591 ((BTREE_CURSOR *)dbc->internal)->opd = NULL; 1592 } 1593 if (ret == DB_NOTFOUND) 1594 ret = 0; 1595 1596 return (ret); 1597} 1598 1599/* 1600 * __bam_getbothc -- 1601 * Search for a matching data item on a join. 1602 */ 1603static int 1604__bam_getbothc(dbc, data) 1605 DBC *dbc; 1606 DBT *data; 1607{ 1608 BTREE_CURSOR *cp; 1609 DB *dbp; 1610 DB_MPOOLFILE *mpf; 1611 int cmp, exact, ret; 1612 1613 dbp = dbc->dbp; 1614 mpf = dbp->mpf; 1615 cp = (BTREE_CURSOR *)dbc->internal; 1616 1617 /* 1618 * Acquire the current page. We have at least a read-lock 1619 * already. The caller may have set DB_RMW asking for a 1620 * write lock, but upgrading to a write lock has no better 1621 * chance of succeeding now instead of later, so don't try. 1622 */ 1623 if ((ret = __memp_fget(mpf, &cp->pgno, 1624 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 1625 return (ret); 1626 1627 /* 1628 * An off-page duplicate cursor. Search the remaining duplicates 1629 * for one which matches (do a normal btree search, then verify 1630 * that the retrieved record is greater than the original one). 1631 */ 1632 if (F_ISSET(dbc, DBC_OPD)) { 1633 /* 1634 * Check to make sure the desired item comes strictly after 1635 * the current position; if it doesn't, return DB_NOTFOUND. 1636 */ 1637 if ((ret = __bam_cmp(dbp, dbc->thread_info, 1638 dbc->txn, data, cp->page, cp->indx, 1639 dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare, 1640 &cmp)) != 0) 1641 return (ret); 1642 1643 if (cmp <= 0) 1644 return (DB_NOTFOUND); 1645 1646 /* Discard the current page, we're going to do a full search. */ 1647 if ((ret = __memp_fput(mpf, 1648 dbc->thread_info, cp->page, dbc->priority)) != 0) 1649 return (ret); 1650 cp->page = NULL; 1651 1652 return (__bamc_search(dbc, 1653 PGNO_INVALID, data, DB_GET_BOTH, &exact)); 1654 } 1655 1656 /* 1657 * We're doing a DBC->get(DB_GET_BOTHC) and we're already searching 1658 * a set of on-page duplicates (either sorted or unsorted). Continue 1659 * a linear search from after the current position. 1660 * 1661 * (Note that we could have just finished a "set" of one duplicate, 1662 * i.e. not a duplicate at all, but the following check will always 1663 * return DB_NOTFOUND in this case, which is the desired behavior.) 1664 */ 1665 if (cp->indx + P_INDX >= NUM_ENT(cp->page) || 1666 !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX)) 1667 return (DB_NOTFOUND); 1668 cp->indx += P_INDX; 1669 1670 return (__bam_getboth_finddatum(dbc, data, DB_GET_BOTH)); 1671} 1672 1673/* 1674 * __bam_getboth_finddatum -- 1675 * Find a matching on-page data item. 1676 */ 1677static int 1678__bam_getboth_finddatum(dbc, data, flags) 1679 DBC *dbc; 1680 DBT *data; 1681 u_int32_t flags; 1682{ 1683 BTREE_CURSOR *cp; 1684 DB *dbp; 1685 db_indx_t base, lim, top; 1686 int cmp, ret; 1687 1688 COMPQUIET(cmp, 0); 1689 1690 dbp = dbc->dbp; 1691 cp = (BTREE_CURSOR *)dbc->internal; 1692 1693 /* 1694 * Called (sometimes indirectly) from DBC->get to search on-page data 1695 * item(s) for a matching value. If the original flag was DB_GET_BOTH 1696 * or DB_GET_BOTH_RANGE, the cursor is set to the first undeleted data 1697 * item for the key. If the original flag was DB_GET_BOTHC, the cursor 1698 * argument is set to the first data item we can potentially return. 1699 * In both cases, there may or may not be additional duplicate data 1700 * items to search. 1701 * 1702 * If the duplicates are not sorted, do a linear search. 1703 */ 1704 if (dbp->dup_compare == NULL) { 1705 for (;; cp->indx += P_INDX) { 1706 if (!IS_CUR_DELETED(dbc) && 1707 (ret = __bam_cmp(dbp, dbc->thread_info, 1708 dbc->txn, data, cp->page, 1709 cp->indx + O_INDX, __bam_defcmp, &cmp)) != 0) 1710 return (ret); 1711 if (cmp == 0) 1712 return (0); 1713 1714 if (cp->indx + P_INDX >= NUM_ENT(cp->page) || 1715 !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX)) 1716 break; 1717 } 1718 return (DB_NOTFOUND); 1719 } 1720 1721 /* 1722 * If the duplicates are sorted, do a binary search. The reason for 1723 * this is that large pages and small key/data pairs result in large 1724 * numbers of on-page duplicates before they get pushed off-page. 1725 * 1726 * Find the top and bottom of the duplicate set. Binary search 1727 * requires at least two items, don't loop if there's only one. 1728 */ 1729 for (base = top = cp->indx; top < NUM_ENT(cp->page); top += P_INDX) 1730 if (!IS_DUPLICATE(dbc, cp->indx, top)) 1731 break; 1732 if (base == (top - P_INDX)) { 1733 if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, data, 1734 cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0) 1735 return (ret); 1736 return (cmp == 0 || 1737 (cmp < 0 && flags == DB_GET_BOTH_RANGE) ? 0 : DB_NOTFOUND); 1738 } 1739 1740 for (lim = (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) { 1741 cp->indx = base + ((lim >> 1) * P_INDX); 1742 if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, data, 1743 cp->page, cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0) 1744 return (ret); 1745 if (cmp == 0) { 1746 /* 1747 * XXX 1748 * No duplicate duplicates in sorted duplicate sets, 1749 * so there can be only one. 1750 */ 1751 if (!IS_CUR_DELETED(dbc)) 1752 return (0); 1753 break; 1754 } 1755 if (cmp > 0) { 1756 base = cp->indx + P_INDX; 1757 --lim; 1758 } 1759 } 1760 1761 /* No match found; if we're looking for an exact match, we're done. */ 1762 if (flags == DB_GET_BOTH) 1763 return (DB_NOTFOUND); 1764 1765 /* 1766 * Base is the smallest index greater than the data item, may be zero 1767 * or a last + O_INDX index, and may be deleted. Find an undeleted 1768 * item. 1769 */ 1770 cp->indx = base; 1771 while (cp->indx < top && IS_CUR_DELETED(dbc)) 1772 cp->indx += P_INDX; 1773 return (cp->indx < top ? 0 : DB_NOTFOUND); 1774} 1775 1776/* 1777 * __bamc_put -- 1778 * Put using a cursor. 1779 */ 1780static int 1781__bamc_put(dbc, key, data, flags, pgnop) 1782 DBC *dbc; 1783 DBT *key, *data; 1784 u_int32_t flags; 1785 db_pgno_t *pgnop; 1786{ 1787 BTREE *t; 1788 BTREE_CURSOR *cp; 1789 DB *dbp; 1790 DBT dbt; 1791 DB_MPOOLFILE *mpf; 1792 db_pgno_t root_pgno; 1793 u_int32_t iiop; 1794 int cmp, exact, own, ret, stack; 1795 void *arg; 1796 1797 dbp = dbc->dbp; 1798 mpf = dbp->mpf; 1799 cp = (BTREE_CURSOR *)dbc->internal; 1800 root_pgno = cp->root; 1801 1802split: ret = stack = 0; 1803 switch (flags) { 1804 case DB_CURRENT: 1805 if (F_ISSET(cp, C_DELETED)) 1806 return (DB_NOTFOUND); 1807 /* FALLTHROUGH */ 1808 1809 case DB_AFTER: 1810 case DB_BEFORE: 1811 iiop = flags; 1812 own = 1; 1813 1814 /* Acquire the current page with a write lock. */ 1815 ACQUIRE_WRITE_LOCK(dbc, ret); 1816 if (ret != 0) 1817 goto err; 1818 if ((ret = __memp_fget(mpf, &cp->pgno, 1819 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 1820 goto err; 1821 break; 1822 case DB_KEYFIRST: 1823 case DB_KEYLAST: 1824 case DB_NODUPDATA: 1825 case DB_NOOVERWRITE: 1826 own = 0; 1827 /* 1828 * Searching off-page, sorted duplicate tree: do a tree search 1829 * for the correct item; __bamc_search returns the smallest 1830 * slot greater than the key, use it. 1831 * 1832 * See comment below regarding where we can start the search. 1833 */ 1834 if (F_ISSET(dbc, DBC_OPD)) { 1835 if ((ret = __bamc_search(dbc, 1836 F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, 1837 data, flags, &exact)) != 0) 1838 goto err; 1839 stack = 1; 1840 1841 /* Disallow "sorted" duplicate duplicates. */ 1842 if (exact) { 1843 if (IS_DELETED(dbp, cp->page, cp->indx)) { 1844 iiop = DB_CURRENT; 1845 break; 1846 } 1847 ret = __db_duperr(dbp, flags); 1848 goto err; 1849 } 1850 iiop = DB_BEFORE; 1851 break; 1852 } 1853 1854 /* 1855 * Searching a btree. 1856 * 1857 * If we've done a split, we can start the search from the 1858 * parent of the split page, which __bam_split returned 1859 * for us in root_pgno, unless we're in a Btree with record 1860 * numbering. In that case, we'll need the true root page 1861 * in order to adjust the record count. 1862 */ 1863 if ((ret = __bamc_search(dbc, 1864 F_ISSET(cp, C_RECNUM) ? cp->root : root_pgno, key, 1865 flags == DB_KEYFIRST || dbp->dup_compare != NULL ? 1866 DB_KEYFIRST : DB_KEYLAST, &exact)) != 0) 1867 goto err; 1868 stack = 1; 1869 1870 /* 1871 * If we don't have an exact match, __bamc_search returned 1872 * the smallest slot greater than the key, use it. 1873 */ 1874 if (!exact) { 1875 iiop = DB_KEYFIRST; 1876 break; 1877 1878 /* 1879 * Check for NOOVERWRITE. It is possible that there 1880 * is a key with an empty duplicate page attached. 1881 */ 1882 } else if (flags == DB_NOOVERWRITE && !IS_CUR_DELETED(dbc)) { 1883 if (pgnop != NULL && __bam_isopd(dbc, pgnop)) 1884 ret = __bam_opd_exists(dbc, *pgnop); 1885 else 1886 ret = DB_KEYEXIST; 1887 if (ret != 0) 1888 goto err; 1889 } 1890 1891 /* 1892 * If duplicates aren't supported, replace the current item. 1893 */ 1894 if (!F_ISSET(dbp, DB_AM_DUP)) { 1895 iiop = DB_CURRENT; 1896 break; 1897 } 1898 1899 /* 1900 * If we find a matching entry, it may be an off-page duplicate 1901 * tree. Return the page number to our caller, we need a new 1902 * cursor. 1903 */ 1904 if (pgnop != NULL && __bam_isopd(dbc, pgnop)) 1905 goto done; 1906 1907 /* If the duplicates aren't sorted, move to the right slot. */ 1908 if (dbp->dup_compare == NULL) { 1909 if (flags == DB_KEYFIRST) 1910 iiop = DB_BEFORE; 1911 else 1912 for (;; cp->indx += P_INDX) 1913 if (cp->indx + P_INDX >= 1914 NUM_ENT(cp->page) || 1915 !IS_DUPLICATE(dbc, cp->indx, 1916 cp->indx + P_INDX)) { 1917 iiop = DB_AFTER; 1918 break; 1919 } 1920 break; 1921 } 1922 1923 /* 1924 * We know that we're looking at the first of a set of sorted 1925 * on-page duplicates. Walk the list to find the right slot. 1926 */ 1927 for (;; cp->indx += P_INDX) { 1928 if ((ret = __bam_cmp(dbp, dbc->thread_info, 1929 dbc->txn, data, cp->page, 1930 cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0) 1931 goto err; 1932 if (cmp < 0) { 1933 iiop = DB_BEFORE; 1934 break; 1935 } 1936 1937 /* Disallow "sorted" duplicate duplicates. */ 1938 if (cmp == 0) { 1939 if (IS_DELETED(dbp, cp->page, cp->indx)) { 1940 iiop = DB_CURRENT; 1941 break; 1942 } 1943 ret = __db_duperr(dbp, flags); 1944 goto err; 1945 } 1946 1947 if (cp->indx + P_INDX >= NUM_ENT(cp->page) || 1948 P_INP(dbp, ((PAGE *)cp->page))[cp->indx] != 1949 P_INP(dbp, ((PAGE *)cp->page))[cp->indx + P_INDX]) { 1950 iiop = DB_AFTER; 1951 break; 1952 } 1953 } 1954 break; 1955 default: 1956 ret = __db_unknown_flag(dbp->env, "__bamc_put", flags); 1957 goto err; 1958 } 1959 1960 switch (ret = __bam_iitem(dbc, key, data, iiop, 0)) { 1961 case 0: 1962 break; 1963 case DB_NEEDSPLIT: 1964 /* 1965 * To split, we need a key for the page. Either use the key 1966 * argument or get a copy of the key from the page. 1967 */ 1968 if (flags == DB_AFTER || 1969 flags == DB_BEFORE || flags == DB_CURRENT) { 1970 memset(&dbt, 0, sizeof(DBT)); 1971 if ((ret = __db_ret(dbp, dbc->thread_info, 1972 dbc->txn, cp->page, 0, 1973 &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0) 1974 goto err; 1975 arg = &dbt; 1976 } else 1977 arg = F_ISSET(dbc, DBC_OPD) ? data : key; 1978 1979 /* 1980 * Discard any locks and pinned pages (the locks are discarded 1981 * even if we're running with transactions, as they lock pages 1982 * that we're sorry we ever acquired). If stack is set and the 1983 * cursor entries are valid, they point to the same entries as 1984 * the stack, don't free them twice. 1985 */ 1986 if (stack) 1987 ret = __bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK); 1988 else 1989 DISCARD_CUR(dbc, ret); 1990 if (ret != 0) 1991 goto err; 1992 1993 /* 1994 * SR [#6059] 1995 * If we do not own a lock on the page any more, then clear the 1996 * cursor so we don't point at it. Even if we call __bam_stkrel 1997 * above we still may have entered the routine with the cursor 1998 * positioned to a particular record. This is in the case 1999 * where C_RECNUM is set. 2000 */ 2001 if (own == 0) { 2002 cp->pgno = PGNO_INVALID; 2003 cp->indx = 0; 2004 } 2005 2006 /* Split the tree. */ 2007 if ((ret = __bam_split(dbc, arg, &root_pgno)) != 0) 2008 return (ret); 2009 2010 goto split; 2011 default: 2012 goto err; 2013 } 2014 2015err: 2016done: /* 2017 * If we inserted a key into the first or last slot of the tree, 2018 * remember where it was so we can do it more quickly next time. 2019 * If the tree has record numbers, we need a complete stack so 2020 * that we can adjust the record counts, so skipping the tree search 2021 * isn't possible. For subdatabases we need to be careful that the 2022 * page does not move from one db to another, so we track its LSN. 2023 * 2024 * If there are duplicates and we are inserting into the last slot, 2025 * the cursor will point _to_ the last item, not after it, which 2026 * is why we subtract P_INDX below. 2027 */ 2028 2029 t = dbp->bt_internal; 2030 if (ret == 0 && TYPE(cp->page) == P_LBTREE && 2031 (flags == DB_KEYFIRST || flags == DB_KEYLAST) && 2032 !F_ISSET(cp, C_RECNUM) && 2033 (!F_ISSET(dbp, DB_AM_SUBDB) || 2034 (LOGGING_ON(dbp->env) && !F_ISSET(dbp, DB_AM_NOT_DURABLE))) && 2035 ((NEXT_PGNO(cp->page) == PGNO_INVALID && 2036 cp->indx >= NUM_ENT(cp->page) - P_INDX) || 2037 (PREV_PGNO(cp->page) == PGNO_INVALID && cp->indx == 0))) { 2038 t->bt_lpgno = cp->pgno; 2039 if (F_ISSET(dbp, DB_AM_SUBDB)) 2040 t->bt_llsn = LSN(cp->page); 2041 } else 2042 t->bt_lpgno = PGNO_INVALID; 2043 /* 2044 * Discard any pages pinned in the tree and their locks, except for 2045 * the leaf page. Note, the leaf page participated in any stack we 2046 * acquired, and so we have to adjust the stack as necessary. If 2047 * there was only a single page on the stack, we don't have to free 2048 * further stack pages. 2049 */ 2050 if (stack && BT_STK_POP(cp) != NULL) 2051 (void)__bam_stkrel(dbc, 0); 2052 2053 /* 2054 * Regardless of whether we were successful or not, clear the delete 2055 * flag. If we're successful, we either moved the cursor or the item 2056 * is no longer deleted. If we're not successful, then we're just a 2057 * copy, no need to have the flag set. 2058 * 2059 * We may have instantiated off-page duplicate cursors during the put, 2060 * so clear the deleted bit from the off-page duplicate cursor as well. 2061 */ 2062 F_CLR(cp, C_DELETED); 2063 if (cp->opd != NULL) { 2064 cp = (BTREE_CURSOR *)cp->opd->internal; 2065 F_CLR(cp, C_DELETED); 2066 } 2067 2068 return (ret); 2069} 2070 2071/* 2072 * __bamc_rget -- 2073 * Return the record number for a cursor. 2074 * 2075 * PUBLIC: int __bamc_rget __P((DBC *, DBT *)); 2076 */ 2077int 2078__bamc_rget(dbc, data) 2079 DBC *dbc; 2080 DBT *data; 2081{ 2082 BTREE_CURSOR *cp; 2083 DB *dbp; 2084 DBT dbt; 2085 DB_MPOOLFILE *mpf; 2086 db_recno_t recno; 2087 int exact, ret, t_ret; 2088 2089 dbp = dbc->dbp; 2090 mpf = dbp->mpf; 2091 cp = (BTREE_CURSOR *)dbc->internal; 2092 2093 /* 2094 * Get the page with the current item on it. 2095 * Get a copy of the key. 2096 * Release the page, making sure we don't release it twice. 2097 */ 2098 if ((ret = __memp_fget(mpf, &cp->pgno, 2099 dbc->thread_info, dbc->txn, 0, &cp->page)) != 0) 2100 return (ret); 2101 memset(&dbt, 0, sizeof(DBT)); 2102 if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, cp->page, 2103 cp->indx, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0) 2104 goto err; 2105 ret = __memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority); 2106 cp->page = NULL; 2107 if (ret != 0) 2108 return (ret); 2109 2110 if ((ret = __bam_search(dbc, PGNO_INVALID, &dbt, 2111 F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND, 2112 1, &recno, &exact)) != 0) 2113 goto err; 2114 2115 ret = __db_retcopy(dbc->env, data, 2116 &recno, sizeof(recno), &dbc->rdata->data, &dbc->rdata->ulen); 2117 2118 /* Release the stack. */ 2119err: if ((t_ret = __bam_stkrel(dbc, 0)) != 0 && ret == 0) 2120 ret = t_ret; 2121 2122 return (ret); 2123} 2124 2125/* 2126 * __bamc_writelock -- 2127 * Upgrade the cursor to a write lock. 2128 */ 2129static int 2130__bamc_writelock(dbc) 2131 DBC *dbc; 2132{ 2133 BTREE_CURSOR *cp; 2134 int ret; 2135 2136 cp = (BTREE_CURSOR *)dbc->internal; 2137 2138 if (cp->lock_mode == DB_LOCK_WRITE) 2139 return (0); 2140 2141 /* 2142 * When writing to an off-page duplicate tree, we need to have the 2143 * appropriate page in the primary tree locked. The general DBC 2144 * code calls us first with the primary cursor so we can acquire the 2145 * appropriate lock. 2146 */ 2147 ACQUIRE_WRITE_LOCK(dbc, ret); 2148 return (ret); 2149} 2150 2151/* 2152 * __bamc_next -- 2153 * Move to the next record. 2154 */ 2155static int 2156__bamc_next(dbc, initial_move, deleted_okay) 2157 DBC *dbc; 2158 int initial_move, deleted_okay; 2159{ 2160 BTREE_CURSOR *cp; 2161 db_indx_t adjust; 2162 db_lockmode_t lock_mode; 2163 db_pgno_t pgno; 2164 int ret; 2165 2166 cp = (BTREE_CURSOR *)dbc->internal; 2167 ret = 0; 2168 2169 /* 2170 * We're either moving through a page of duplicates or a btree leaf 2171 * page. 2172 * 2173 * !!! 2174 * This code handles empty pages and pages with only deleted entries. 2175 */ 2176 if (F_ISSET(dbc, DBC_OPD)) { 2177 adjust = O_INDX; 2178 lock_mode = DB_LOCK_NG; 2179 } else { 2180 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX; 2181 lock_mode = 2182 F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; 2183 } 2184 if (cp->page == NULL) { 2185 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret); 2186 if (ret != 0) 2187 return (ret); 2188 } 2189 2190 if (initial_move) 2191 cp->indx += adjust; 2192 2193 for (;;) { 2194 /* 2195 * If at the end of the page, move to a subsequent page. 2196 * 2197 * !!! 2198 * Check for >= NUM_ENT. If the original search landed us on 2199 * NUM_ENT, we may have incremented indx before the test. 2200 */ 2201 if (cp->indx >= NUM_ENT(cp->page)) { 2202 if ((pgno = NEXT_PGNO(cp->page)) == PGNO_INVALID) 2203 return (DB_NOTFOUND); 2204 2205 ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret); 2206 if (ret != 0) 2207 return (ret); 2208 cp->indx = 0; 2209 continue; 2210 } 2211 if (!deleted_okay && IS_CUR_DELETED(dbc)) { 2212 cp->indx += adjust; 2213 continue; 2214 } 2215 break; 2216 } 2217 return (0); 2218} 2219 2220/* 2221 * __bamc_prev -- 2222 * Move to the previous record. 2223 */ 2224static int 2225__bamc_prev(dbc) 2226 DBC *dbc; 2227{ 2228 BTREE_CURSOR *cp; 2229 db_indx_t adjust; 2230 db_lockmode_t lock_mode; 2231 db_pgno_t pgno; 2232 int ret; 2233 2234 cp = (BTREE_CURSOR *)dbc->internal; 2235 ret = 0; 2236 2237 /* 2238 * We're either moving through a page of duplicates or a btree leaf 2239 * page. 2240 * 2241 * !!! 2242 * This code handles empty pages and pages with only deleted entries. 2243 */ 2244 if (F_ISSET(dbc, DBC_OPD)) { 2245 adjust = O_INDX; 2246 lock_mode = DB_LOCK_NG; 2247 } else { 2248 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX; 2249 lock_mode = 2250 F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; 2251 } 2252 if (cp->page == NULL) { 2253 ACQUIRE_CUR(dbc, lock_mode, cp->pgno, 0, ret); 2254 if (ret != 0) 2255 return (ret); 2256 } 2257 2258 for (;;) { 2259 /* If at the beginning of the page, move to a previous one. */ 2260 if (cp->indx == 0) { 2261 if ((pgno = 2262 PREV_PGNO(cp->page)) == PGNO_INVALID) 2263 return (DB_NOTFOUND); 2264 2265 ACQUIRE_CUR(dbc, lock_mode, pgno, 0, ret); 2266 if (ret != 0) 2267 return (ret); 2268 2269 if ((cp->indx = NUM_ENT(cp->page)) == 0) 2270 continue; 2271 } 2272 2273 /* Ignore deleted records. */ 2274 cp->indx -= adjust; 2275 if (IS_CUR_DELETED(dbc)) 2276 continue; 2277 2278 break; 2279 } 2280 return (0); 2281} 2282 2283/* 2284 * __bamc_search -- 2285 * Move to a specified record. 2286 */ 2287static int 2288__bamc_search(dbc, root_pgno, key, flags, exactp) 2289 DBC *dbc; 2290 db_pgno_t root_pgno; 2291 const DBT *key; 2292 u_int32_t flags; 2293 int *exactp; 2294{ 2295 BTREE *t; 2296 BTREE_CURSOR *cp; 2297 DB *dbp; 2298 PAGE *h; 2299 db_indx_t indx, *inp; 2300 db_pgno_t bt_lpgno; 2301 db_recno_t recno; 2302 u_int32_t sflags; 2303 int cmp, ret, t_ret; 2304 2305 dbp = dbc->dbp; 2306 cp = (BTREE_CURSOR *)dbc->internal; 2307 t = dbp->bt_internal; 2308 ret = 0; 2309 2310 /* 2311 * Find an entry in the database. Discard any lock we currently hold, 2312 * we're going to search the tree. 2313 */ 2314 DISCARD_CUR(dbc, ret); 2315 if (ret != 0) 2316 return (ret); 2317 2318 switch (flags) { 2319 case DB_FIRST: 2320 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MIN; 2321 goto search; 2322 case DB_LAST: 2323 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_MAX; 2324 goto search; 2325 case DB_SET_RECNO: 2326 if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0) 2327 return (ret); 2328 sflags = 2329 (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND) | SR_EXACT; 2330 if ((ret = __bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0) 2331 return (ret); 2332 break; 2333 case DB_SET: 2334 case DB_GET_BOTH: 2335 sflags = 2336 (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND) | SR_EXACT; 2337 goto search; 2338 case DB_GET_BOTH_RANGE: 2339 sflags = (F_ISSET(dbc, DBC_RMW) ? SR_FIND_WR : SR_FIND); 2340 goto search; 2341 case DB_SET_RANGE: 2342 sflags = 2343 (F_ISSET(dbc, DBC_RMW) ? SR_WRITE : SR_READ) | SR_DUPFIRST; 2344 goto search; 2345 case DB_KEYFIRST: 2346 case DB_NOOVERWRITE: 2347 sflags = SR_KEYFIRST; 2348 goto fast_search; 2349 case DB_KEYLAST: 2350 case DB_NODUPDATA: 2351 sflags = SR_KEYLAST; 2352fast_search: /* 2353 * If the application has a history of inserting into the first 2354 * or last pages of the database, we check those pages first to 2355 * avoid doing a full search. 2356 */ 2357 if (F_ISSET(dbc, DBC_OPD)) 2358 goto search; 2359 2360 /* 2361 * !!! 2362 * We do not mutex protect the t->bt_lpgno field, which means 2363 * that it can only be used in an advisory manner. If we find 2364 * page we can use, great. If we don't, we don't care, we do 2365 * it the slow way instead. Regardless, copy it into a local 2366 * variable, otherwise we might acquire a lock for a page and 2367 * then read a different page because it changed underfoot. 2368 */ 2369 bt_lpgno = t->bt_lpgno; 2370 2371 /* 2372 * If the tree has no history of insertion, do it the slow way. 2373 */ 2374 if (bt_lpgno == PGNO_INVALID) 2375 goto search; 2376 2377 /* 2378 * Lock and retrieve the page on which we last inserted. 2379 * 2380 * The page may not exist: if a transaction created the page 2381 * and then aborted, the page might have been truncated from 2382 * the end of the file. We don't want to wait on the lock. 2383 * The page may not even be relevant to this search. 2384 */ 2385 h = NULL; 2386 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, bt_lpgno, DB_LOCK_NOWAIT, ret); 2387 if (ret != 0) { 2388 if (ret == DB_LOCK_DEADLOCK || 2389 ret == DB_LOCK_NOTGRANTED || 2390 ret == DB_PAGE_NOTFOUND) 2391 ret = 0; 2392 goto fast_miss; 2393 } 2394 2395 h = cp->page; 2396 inp = P_INP(dbp, h); 2397 2398 /* 2399 * It's okay if the page type isn't right or it's empty, it 2400 * just means that the world changed. 2401 */ 2402 if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0) 2403 goto fast_miss; 2404 2405 /* Verify that this page cannot have moved to another db. */ 2406 if (F_ISSET(dbp, DB_AM_SUBDB) && 2407 LOG_COMPARE(&t->bt_llsn, &LSN(h)) != 0) 2408 goto fast_miss; 2409 /* 2410 * What we do here is test to see if we're at the beginning or 2411 * end of the tree and if the new item sorts before/after the 2412 * first/last page entry. We don't try and catch inserts into 2413 * the middle of the tree (although we could, as long as there 2414 * were two keys on the page and we saved both the index and 2415 * the page number of the last insert). 2416 */ 2417 if (h->next_pgno == PGNO_INVALID) { 2418 indx = NUM_ENT(h) - P_INDX; 2419 if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, 2420 key, h, indx, t->bt_compare, &cmp)) != 0) 2421 goto fast_miss; 2422 2423 if (cmp < 0) 2424 goto try_begin; 2425 if (cmp > 0) { 2426 indx += P_INDX; 2427 goto fast_hit; 2428 } 2429 2430 /* 2431 * Found a duplicate. If doing DB_KEYLAST, we're at 2432 * the correct position, otherwise, move to the first 2433 * of the duplicates. If we're looking at off-page 2434 * duplicates, duplicate duplicates aren't permitted, 2435 * so we're done. 2436 */ 2437 if (flags == DB_KEYLAST) 2438 goto fast_hit; 2439 for (; 2440 indx > 0 && inp[indx - P_INDX] == inp[indx]; 2441 indx -= P_INDX) 2442 ; 2443 goto fast_hit; 2444 } 2445try_begin: if (h->prev_pgno == PGNO_INVALID) { 2446 indx = 0; 2447 if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, 2448 key, h, indx, t->bt_compare, &cmp)) != 0) 2449 goto fast_miss; 2450 2451 if (cmp > 0) 2452 goto fast_miss; 2453 if (cmp < 0) 2454 goto fast_hit; 2455 2456 /* 2457 * Found a duplicate. If doing DB_KEYFIRST, we're at 2458 * the correct position, otherwise, move to the last 2459 * of the duplicates. If we're looking at off-page 2460 * duplicates, duplicate duplicates aren't permitted, 2461 * so we're done. 2462 */ 2463 if (flags == DB_KEYFIRST) 2464 goto fast_hit; 2465 for (; 2466 indx < (db_indx_t)(NUM_ENT(h) - P_INDX) && 2467 inp[indx] == inp[indx + P_INDX]; 2468 indx += P_INDX) 2469 ; 2470 goto fast_hit; 2471 } 2472 goto fast_miss; 2473 2474fast_hit: /* Set the exact match flag, we may have found a duplicate. */ 2475 *exactp = cmp == 0; 2476 2477 /* 2478 * Insert the entry in the stack. (Our caller is likely to 2479 * call __bam_stkrel() after our return.) 2480 */ 2481 BT_STK_CLR(cp); 2482 BT_STK_ENTER(dbp->env, 2483 cp, h, indx, cp->lock, cp->lock_mode, ret); 2484 if (ret != 0) 2485 return (ret); 2486 break; 2487 2488fast_miss: /* 2489 * This was not the right page, so we do not need to retain 2490 * the lock even in the presence of transactions. 2491 * 2492 * This is also an error path, so ret may have been set. 2493 */ 2494 DISCARD_CUR(dbc, ret); 2495 cp->pgno = PGNO_INVALID; 2496 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 2497 ret = t_ret; 2498 if (ret != 0) 2499 return (ret); 2500 2501search: if ((ret = __bam_search(dbc, root_pgno, 2502 key, sflags, 1, NULL, exactp)) != 0) 2503 return (ret); 2504 break; 2505 default: 2506 return (__db_unknown_flag(dbp->env, "__bamc_search", flags)); 2507 } 2508 /* Initialize the cursor from the stack. */ 2509 cp->page = cp->csp->page; 2510 cp->pgno = cp->csp->page->pgno; 2511 cp->indx = cp->csp->indx; 2512 cp->lock = cp->csp->lock; 2513 cp->lock_mode = cp->csp->lock_mode; 2514 2515 /* If on an empty page or a deleted record, move to the next one. */ 2516 if (flags == DB_FIRST && 2517 (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))) 2518 if ((ret = __bamc_next(dbc, 0, 0)) != 0) 2519 return (ret); 2520 if (flags == DB_LAST && 2521 (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))) 2522 if ((ret = __bamc_prev(dbc)) != 0) 2523 return (ret); 2524 2525 return (0); 2526} 2527 2528/* 2529 * __bamc_physdel -- 2530 * Physically remove an item from the page. 2531 */ 2532static int 2533__bamc_physdel(dbc) 2534 DBC *dbc; 2535{ 2536 BTREE_CURSOR *cp; 2537 DB *dbp; 2538 DBT key; 2539 int delete_page, empty_page, exact, ret; 2540 2541 dbp = dbc->dbp; 2542 memset(&key, 0, sizeof(DBT)); 2543 cp = (BTREE_CURSOR *)dbc->internal; 2544 delete_page = empty_page = ret = 0; 2545 2546 /* If the page is going to be emptied, consider deleting it. */ 2547 delete_page = empty_page = 2548 NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1); 2549 2550 /* 2551 * Check if the application turned off reverse splits. Applications 2552 * can't turn off reverse splits in off-page duplicate trees, that 2553 * space will never be reused unless the exact same key is specified. 2554 */ 2555 if (delete_page && 2556 !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_AM_REVSPLITOFF)) 2557 delete_page = 0; 2558 2559 /* 2560 * We never delete the last leaf page. (Not really true -- we delete 2561 * the last leaf page of off-page duplicate trees, but that's handled 2562 * by our caller, not down here.) 2563 */ 2564 if (delete_page && cp->pgno == cp->root) 2565 delete_page = 0; 2566 2567 /* 2568 * To delete a leaf page other than an empty root page, we need a 2569 * copy of a key from the page. Use the 0th page index since it's 2570 * the last key the page held. 2571 * 2572 * !!! 2573 * Note that because __bamc_physdel is always called from a cursor 2574 * close, it should be safe to use the cursor's own "my_rkey" memory 2575 * to temporarily hold this key. We shouldn't own any returned-data 2576 * memory of interest--if we do, we're in trouble anyway. 2577 */ 2578 if (delete_page) { 2579 if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, cp->page, 2580 0, &key, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0) 2581 return (ret); 2582 } 2583 2584 /* 2585 * Delete the items. If page isn't empty, we adjust the cursors. 2586 * 2587 * !!! 2588 * The following operations to delete a page may deadlock. The easy 2589 * scenario is if we're deleting an item because we're closing cursors 2590 * because we've already deadlocked and want to call txn->abort. If 2591 * we fail due to deadlock, we'll leave a locked, possibly empty page 2592 * in the tree, which won't be empty long because we'll undo the delete 2593 * when we undo the transaction's modifications. 2594 * 2595 * !!! 2596 * Delete the key item first, otherwise the on-page duplicate checks 2597 * in __bam_ditem() won't work! 2598 */ 2599 if ((ret = __memp_dirty(dbp->mpf, 2600 &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 2601 return (ret); 2602 if (TYPE(cp->page) == P_LBTREE) { 2603 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0) 2604 return (ret); 2605 if (!empty_page) 2606 if ((ret = __bam_ca_di(dbc, 2607 PGNO(cp->page), cp->indx, -1)) != 0) 2608 return (ret); 2609 } 2610 if ((ret = __bam_ditem(dbc, cp->page, cp->indx)) != 0) 2611 return (ret); 2612 2613 /* Clear the deleted flag, the item is gone. */ 2614 F_CLR(cp, C_DELETED); 2615 2616 if (!empty_page) 2617 if ((ret = __bam_ca_di(dbc, PGNO(cp->page), cp->indx, -1)) != 0) 2618 return (ret); 2619 2620 /* 2621 * Need to downgrade write locks here or non-txn locks will get stuck. 2622 */ 2623 if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED)) { 2624 if ((ret = __TLPUT(dbc, cp->lock)) != 0) 2625 return (ret); 2626 cp->lock_mode = DB_LOCK_WWRITE; 2627 } 2628 /* If we're not going to try and delete the page, we're done. */ 2629 if (!delete_page) 2630 return (0); 2631 2632 ret = __bam_search(dbc, PGNO_INVALID, &key, SR_DEL, 0, NULL, &exact); 2633 2634 /* 2635 * If everything worked, delete the stack, otherwise, release the 2636 * stack and page locks without further damage. 2637 */ 2638 if (ret == 0) 2639 DISCARD_CUR(dbc, ret); 2640 if (ret == 0) 2641 ret = __bam_dpages(dbc, 1, 0); 2642 else 2643 (void)__bam_stkrel(dbc, 0); 2644 2645 return (ret); 2646} 2647 2648/* 2649 * __bamc_getstack -- 2650 * Acquire a full stack for a cursor. 2651 */ 2652static int 2653__bamc_getstack(dbc) 2654 DBC *dbc; 2655{ 2656 BTREE_CURSOR *cp; 2657 DB *dbp; 2658 DBT dbt; 2659 DB_MPOOLFILE *mpf; 2660 PAGE *h; 2661 int exact, ret, t_ret; 2662 2663 dbp = dbc->dbp; 2664 mpf = dbp->mpf; 2665 cp = (BTREE_CURSOR *)dbc->internal; 2666 2667 /* 2668 * Get the page with the current item on it. The caller of this 2669 * routine has to already hold a read lock on the page, so there 2670 * is no additional lock to acquire. 2671 */ 2672 if ((ret = __memp_fget(mpf, &cp->pgno, 2673 dbc->thread_info, dbc->txn, 0, &h)) != 0) 2674 return (ret); 2675 2676 /* Get a copy of a key from the page. */ 2677 memset(&dbt, 0, sizeof(DBT)); 2678 if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, 2679 h, 0, &dbt, &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0) 2680 goto err; 2681 2682 /* Get a write-locked stack for the page. */ 2683 exact = 0; 2684 ret = __bam_search(dbc, PGNO_INVALID, 2685 &dbt, SR_KEYFIRST, 1, NULL, &exact); 2686 2687err: /* Discard the key and the page. */ 2688 if ((t_ret = __memp_fput(mpf, 2689 dbc->thread_info, h, dbc->priority)) != 0 && ret == 0) 2690 ret = t_ret; 2691 2692 return (ret); 2693} 2694 2695/* 2696 * __bam_isopd -- 2697 * Return if the cursor references an off-page duplicate tree via its 2698 * page number. 2699 */ 2700static int 2701__bam_isopd(dbc, pgnop) 2702 DBC *dbc; 2703 db_pgno_t *pgnop; 2704{ 2705 BOVERFLOW *bo; 2706 2707 if (TYPE(dbc->internal->page) != P_LBTREE) 2708 return (0); 2709 2710 bo = GET_BOVERFLOW(dbc->dbp, 2711 dbc->internal->page, dbc->internal->indx + O_INDX); 2712 if (B_TYPE(bo->type) == B_DUPLICATE) { 2713 *pgnop = bo->pgno; 2714 return (1); 2715 } 2716 return (0); 2717} 2718 2719/* 2720 * __bam_opd_exists -- 2721 * Return if the current position has any data. 2722 * PUBLIC: int __bam_opd_exists __P((DBC *, db_pgno_t)); 2723 */ 2724int 2725__bam_opd_exists(dbc, pgno) 2726 DBC *dbc; 2727 db_pgno_t pgno; 2728{ 2729 PAGE *h; 2730 int ret; 2731 2732 if ((ret = __memp_fget(dbc->dbp->mpf, &pgno, 2733 dbc->thread_info, dbc->txn, 0, &h)) != 0) 2734 return (ret); 2735 2736 /* 2737 * We always collapse OPD trees so we only need to check 2738 * the number of entries on the root. If there is a non-empty 2739 * tree then there will be duplicates. 2740 */ 2741 if (NUM_ENT(h) == 0) 2742 ret = 0; 2743 else 2744 ret = DB_KEYEXIST; 2745 2746 (void)__memp_fput(dbc->dbp->mpf, dbc->thread_info, h, dbc->priority); 2747 2748 return (ret); 2749} 2750