/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2000,2008 Oracle.  All rights reserved.
 *
 * $Id: db_cam.c,v 12.79 2008/05/07 12:27:32 bschmeck Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/btree.h"
#include "dbinc/hash.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/qam.h"
#include "dbinc/txn.h"

static int __db_buildpartial __P((DB *, DBT *, DBT *, DBT *));
static int __db_s_count __P((DB *));
static int __db_wrlock_err __P((ENV *));
static int __dbc_cleanup __P((DBC *, DBC *, int));
static int __dbc_del_foreign __P((DBC *));
static int __dbc_del_oldskey __P((DB *, DBC *, DBT *, DBT *, DBT *));
static int __dbc_del_secondary __P((DBC *));
static int __dbc_pget_recno __P((DBC *, DBT *, DBT *, u_int32_t));

/*
 * CDB_LOCKING_INIT --
 *	Prepare a cursor for a write operation when CDB_LOCKING(env) is true.
 *
 *	The expansion is a bare "if" statement that:
 *	- returns __db_wrlock_err(env) from the ENCLOSING function when the
 *	  cursor has neither DBC_WRITECURSOR nor DBC_WRITER set;
 *	- for a DBC_WRITECURSOR cursor, calls __lock_get with
 *	  DB_LOCK_UPGRADE / DB_LOCK_WRITE on the cursor's lock_dbt, storing
 *	  the result in (dbc)->mylock, and returns the error from the
 *	  enclosing function if that call fails.
 *
 *	Requirements on the caller: an int variable named "ret" must be in
 *	scope (the macro assigns to it), and the caller must tolerate an early
 *	return at the point of use.  Because the expansion is an unbraced
 *	"if", do not follow a use of this macro with an "else".
 */
#define	CDB_LOCKING_INIT(env, dbc)					\
	/*								\
	 * If we are running CDB, this had better be either a write	\
	 * cursor or an immediate writer.  If it's a regular writer,	\
	 * that means we have an IWRITE lock and we need to upgrade	\
	 * it to a write lock.						\
	 */								\
	if (CDB_LOCKING(env)) {						\
		if (!F_ISSET(dbc, DBC_WRITECURSOR | DBC_WRITER))	\
			return (__db_wrlock_err(env));			\
									\
		if (F_ISSET(dbc, DBC_WRITECURSOR) &&			\
		    (ret = __lock_get(env,				\
		    (dbc)->locker, DB_LOCK_UPGRADE, &(dbc)->lock_dbt,	\
		    DB_LOCK_WRITE, &(dbc)->mylock)) != 0)		\
			return (ret);					\
	}
/*
 * CDB_LOCKING_DONE --
 *	Counterpart of CDB_LOCKING_INIT: for a DBC_WRITECURSOR cursor,
 *	downgrade (dbc)->mylock back to DB_LOCK_IWRITE.  The return value of
 *	__lock_downgrade is deliberately discarded.  Note that the test is on
 *	the cursor flag only; CDB_LOCKING(env) is not re-checked here.
 *
 *	The expansion already ends in ';' and is an unbraced "if", so do not
 *	follow a use of this macro with an "else".
 */
#define	CDB_LOCKING_DONE(env, dbc)					\
	/* Release the upgraded lock. */				\
	if (F_ISSET(dbc, DBC_WRITECURSOR))				\
		(void)__lock_downgrade(					\
		    env, &(dbc)->mylock, DB_LOCK_IWRITE, 0);

/*
 * __dbc_close --
 *	DBC->close.
55 * 56 * PUBLIC: int __dbc_close __P((DBC *)); 57 */ 58int 59__dbc_close(dbc) 60 DBC *dbc; 61{ 62 DB *dbp; 63 DBC *opd; 64 DBC_INTERNAL *cp; 65 DB_TXN *txn; 66 ENV *env; 67 int ret, t_ret; 68 69 dbp = dbc->dbp; 70 env = dbp->env; 71 cp = dbc->internal; 72 opd = cp->opd; 73 ret = 0; 74 75 /* 76 * Remove the cursor(s) from the active queue. We may be closing two 77 * cursors at once here, a top-level one and a lower-level, off-page 78 * duplicate one. The access-method specific cursor close routine must 79 * close both of them in a single call. 80 * 81 * !!! 82 * Cursors must be removed from the active queue before calling the 83 * access specific cursor close routine, btree depends on having that 84 * order of operations. 85 */ 86 MUTEX_LOCK(env, dbp->mutex); 87 88 if (opd != NULL) { 89 DB_ASSERT(env, F_ISSET(opd, DBC_ACTIVE)); 90 F_CLR(opd, DBC_ACTIVE); 91 TAILQ_REMOVE(&dbp->active_queue, opd, links); 92 } 93 DB_ASSERT(env, F_ISSET(dbc, DBC_ACTIVE)); 94 F_CLR(dbc, DBC_ACTIVE); 95 TAILQ_REMOVE(&dbp->active_queue, dbc, links); 96 97 MUTEX_UNLOCK(env, dbp->mutex); 98 99 /* Call the access specific cursor close routine. */ 100 if ((t_ret = 101 dbc->am_close(dbc, PGNO_INVALID, NULL)) != 0 && ret == 0) 102 ret = t_ret; 103 104 /* 105 * Release the lock after calling the access method specific close 106 * routine, a Btree cursor may have had pending deletes. 107 */ 108 if (CDB_LOCKING(env)) { 109 /* 110 * Also, be sure not to free anything if mylock.off is 111 * INVALID; in some cases, such as idup'ed read cursors 112 * and secondary update cursors, a cursor in a CDB 113 * environment may not have a lock at all. 114 */ 115 if ((t_ret = __LPUT(dbc, dbc->mylock)) != 0 && ret == 0) 116 ret = t_ret; 117 118 /* For safety's sake, since this is going on the free queue. 
*/ 119 memset(&dbc->mylock, 0, sizeof(dbc->mylock)); 120 if (opd != NULL) 121 memset(&opd->mylock, 0, sizeof(opd->mylock)); 122 } 123 124 if ((txn = dbc->txn) != NULL) 125 txn->cursors--; 126 127 /* Move the cursor(s) to the free queue. */ 128 MUTEX_LOCK(env, dbp->mutex); 129 if (opd != NULL) { 130 if (txn != NULL) 131 txn->cursors--; 132 TAILQ_INSERT_TAIL(&dbp->free_queue, opd, links); 133 opd = NULL; 134 } 135 TAILQ_INSERT_TAIL(&dbp->free_queue, dbc, links); 136 MUTEX_UNLOCK(env, dbp->mutex); 137 138 if (txn != NULL && F_ISSET(txn, TXN_PRIVATE) && txn->cursors == 0 && 139 (t_ret = __txn_commit(txn, 0)) != 0 && ret == 0) 140 ret = t_ret; 141 142 return (ret); 143} 144 145/* 146 * __dbc_destroy -- 147 * Destroy the cursor, called after DBC->close. 148 * 149 * PUBLIC: int __dbc_destroy __P((DBC *)); 150 */ 151int 152__dbc_destroy(dbc) 153 DBC *dbc; 154{ 155 DB *dbp; 156 ENV *env; 157 int ret, t_ret; 158 159 dbp = dbc->dbp; 160 env = dbp->env; 161 162 /* Remove the cursor from the free queue. */ 163 MUTEX_LOCK(env, dbp->mutex); 164 TAILQ_REMOVE(&dbp->free_queue, dbc, links); 165 MUTEX_UNLOCK(env, dbp->mutex); 166 167 /* Free up allocated memory. */ 168 if (dbc->my_rskey.data != NULL) 169 __os_free(env, dbc->my_rskey.data); 170 if (dbc->my_rkey.data != NULL) 171 __os_free(env, dbc->my_rkey.data); 172 if (dbc->my_rdata.data != NULL) 173 __os_free(env, dbc->my_rdata.data); 174 175 /* Call the access specific cursor destroy routine. */ 176 ret = dbc->am_destroy == NULL ? 0 : dbc->am_destroy(dbc); 177 178 /* 179 * Release the lock id for this cursor. 180 */ 181 if (LOCKING_ON(env) && 182 F_ISSET(dbc, DBC_OWN_LID) && 183 (t_ret = __lock_id_free(env, dbc->lref)) != 0 && ret == 0) 184 ret = t_ret; 185 186 __os_free(env, dbc); 187 188 return (ret); 189} 190 191/* 192 * __dbc_count -- 193 * Return a count of duplicate data items. 
194 * 195 * PUBLIC: int __dbc_count __P((DBC *, db_recno_t *)); 196 */ 197int 198__dbc_count(dbc, recnop) 199 DBC *dbc; 200 db_recno_t *recnop; 201{ 202 ENV *env; 203 int ret; 204 205 env = dbc->env; 206 207 /* 208 * Cursor Cleanup Note: 209 * All of the cursors passed to the underlying access methods by this 210 * routine are not duplicated and will not be cleaned up on return. 211 * So, pages/locks that the cursor references must be resolved by the 212 * underlying functions. 213 */ 214 switch (dbc->dbtype) { 215 case DB_QUEUE: 216 case DB_RECNO: 217 *recnop = 1; 218 break; 219 case DB_HASH: 220 if (dbc->internal->opd == NULL) { 221 if ((ret = __hamc_count(dbc, recnop)) != 0) 222 return (ret); 223 break; 224 } 225 /* FALLTHROUGH */ 226 case DB_BTREE: 227 if ((ret = __bamc_count(dbc, recnop)) != 0) 228 return (ret); 229 break; 230 case DB_UNKNOWN: 231 default: 232 return (__db_unknown_type(env, "__dbc_count", dbc->dbtype)); 233 } 234 return (0); 235} 236 237/* 238 * __dbc_del -- 239 * DBC->del. 240 * 241 * PUBLIC: int __dbc_del __P((DBC *, u_int32_t)); 242 */ 243int 244__dbc_del(dbc, flags) 245 DBC *dbc; 246 u_int32_t flags; 247{ 248 DB *dbp; 249 DBC *opd; 250 ENV *env; 251 int ret, t_ret; 252 253 dbp = dbc->dbp; 254 env = dbp->env; 255 256 /* 257 * Cursor Cleanup Note: 258 * All of the cursors passed to the underlying access methods by this 259 * routine are not duplicated and will not be cleaned up on return. 260 * So, pages/locks that the cursor references must be resolved by the 261 * underlying functions. 262 */ 263 264 CDB_LOCKING_INIT(env, dbc); 265 266 /* 267 * If we're a secondary index, and DB_UPDATE_SECONDARY isn't set 268 * (which it only is if we're being called from a primary update), 269 * then we need to call through to the primary and delete the item. 270 * 271 * Note that this will delete the current item; we don't need to 272 * delete it ourselves as well, so we can just goto done. 
273 */ 274 if (flags != DB_UPDATE_SECONDARY && F_ISSET(dbp, DB_AM_SECONDARY)) { 275 ret = __dbc_del_secondary(dbc); 276 goto done; 277 } 278 279 /* 280 * If we are a foreign db, go through and check any foreign key 281 * constraints first, which will make rolling back changes on an abort 282 * simpler. 283 */ 284 if (LIST_FIRST(&dbp->f_primaries) != NULL && 285 (ret = __dbc_del_foreign(dbc)) != 0) 286 goto done; 287 288 /* 289 * If we are a primary and have secondary indices, go through 290 * and delete any secondary keys that point at the current record. 291 */ 292 if (LIST_FIRST(&dbp->s_secondaries) != NULL && 293 (ret = __dbc_del_primary(dbc)) != 0) 294 goto done; 295 296 /* 297 * Off-page duplicate trees are locked in the primary tree, that is, 298 * we acquire a write lock in the primary tree and no locks in the 299 * off-page dup tree. If the del operation is done in an off-page 300 * duplicate tree, call the primary cursor's upgrade routine first. 301 */ 302 opd = dbc->internal->opd; 303 if (opd == NULL) 304 ret = dbc->am_del(dbc); 305 else 306 if ((ret = dbc->am_writelock(dbc)) == 0) 307 ret = opd->am_del(opd); 308 309 /* 310 * If this was an update that is supporting dirty reads 311 * then we may have just swapped our read for a write lock 312 * which is held by the surviving cursor. We need 313 * to explicitly downgrade this lock. The closed cursor 314 * may only have had a read lock. 
315 */ 316 if (F_ISSET(dbc->dbp, DB_AM_READ_UNCOMMITTED) && 317 dbc->internal->lock_mode == DB_LOCK_WRITE) { 318 if ((t_ret = 319 __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0) 320 ret = t_ret; 321 if (t_ret == 0) 322 dbc->internal->lock_mode = DB_LOCK_WWRITE; 323 } 324 325done: CDB_LOCKING_DONE(env, dbc); 326 327 return (ret); 328} 329 330/* 331 * __dbc_dup -- 332 * Duplicate a cursor 333 * 334 * PUBLIC: int __dbc_dup __P((DBC *, DBC **, u_int32_t)); 335 */ 336int 337__dbc_dup(dbc_orig, dbcp, flags) 338 DBC *dbc_orig; 339 DBC **dbcp; 340 u_int32_t flags; 341{ 342 DBC *dbc_n, *dbc_nopd; 343 int ret; 344 345 dbc_n = dbc_nopd = NULL; 346 347 /* Allocate a new cursor and initialize it. */ 348 if ((ret = __dbc_idup(dbc_orig, &dbc_n, flags)) != 0) 349 goto err; 350 *dbcp = dbc_n; 351 352 /* 353 * If the cursor references an off-page duplicate tree, allocate a 354 * new cursor for that tree and initialize it. 355 */ 356 if (dbc_orig->internal->opd != NULL) { 357 if ((ret = 358 __dbc_idup(dbc_orig->internal->opd, &dbc_nopd, flags)) != 0) 359 goto err; 360 dbc_n->internal->opd = dbc_nopd; 361 } 362 return (0); 363 364err: if (dbc_n != NULL) 365 (void)__dbc_close(dbc_n); 366 if (dbc_nopd != NULL) 367 (void)__dbc_close(dbc_nopd); 368 369 return (ret); 370} 371 372/* 373 * __dbc_idup -- 374 * Internal version of __dbc_dup. 375 * 376 * PUBLIC: int __dbc_idup __P((DBC *, DBC **, u_int32_t)); 377 */ 378int 379__dbc_idup(dbc_orig, dbcp, flags) 380 DBC *dbc_orig, **dbcp; 381 u_int32_t flags; 382{ 383 DB *dbp; 384 DBC *dbc_n; 385 DBC_INTERNAL *int_n, *int_orig; 386 ENV *env; 387 int ret; 388 389 dbp = dbc_orig->dbp; 390 dbc_n = *dbcp; 391 env = dbp->env; 392 393 if ((ret = __db_cursor_int(dbp, dbc_orig->thread_info, 394 dbc_orig->txn, dbc_orig->dbtype, dbc_orig->internal->root, 395 F_ISSET(dbc_orig, DBC_OPD) | DBC_DUPLICATE, 396 dbc_orig->locker, &dbc_n)) != 0) 397 return (ret); 398 399 /* Position the cursor if requested, acquiring the necessary locks. 
*/ 400 if (flags == DB_POSITION) { 401 int_n = dbc_n->internal; 402 int_orig = dbc_orig->internal; 403 404 dbc_n->flags |= dbc_orig->flags & ~DBC_OWN_LID; 405 406 int_n->indx = int_orig->indx; 407 int_n->pgno = int_orig->pgno; 408 int_n->root = int_orig->root; 409 int_n->lock_mode = int_orig->lock_mode; 410 411 switch (dbc_orig->dbtype) { 412 case DB_QUEUE: 413 if ((ret = __qamc_dup(dbc_orig, dbc_n)) != 0) 414 goto err; 415 break; 416 case DB_BTREE: 417 case DB_RECNO: 418 if ((ret = __bamc_dup(dbc_orig, dbc_n)) != 0) 419 goto err; 420 break; 421 case DB_HASH: 422 if ((ret = __hamc_dup(dbc_orig, dbc_n)) != 0) 423 goto err; 424 break; 425 case DB_UNKNOWN: 426 default: 427 ret = __db_unknown_type(env, 428 "__dbc_idup", dbc_orig->dbtype); 429 goto err; 430 } 431 } 432 433 /* Copy the locking flags to the new cursor. */ 434 F_SET(dbc_n, F_ISSET(dbc_orig, 435 DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED | DBC_WRITECURSOR)); 436 437 /* 438 * If we're in CDB and this isn't an offpage dup cursor, then 439 * we need to get a lock for the duplicated cursor. 440 */ 441 if (CDB_LOCKING(env) && !F_ISSET(dbc_n, DBC_OPD) && 442 (ret = __lock_get(env, dbc_n->locker, 0, 443 &dbc_n->lock_dbt, F_ISSET(dbc_orig, DBC_WRITECURSOR) ? 444 DB_LOCK_IWRITE : DB_LOCK_READ, &dbc_n->mylock)) != 0) 445 goto err; 446 447 dbc_n->priority = dbc_orig->priority; 448 *dbcp = dbc_n; 449 return (0); 450 451err: (void)__dbc_close(dbc_n); 452 return (ret); 453} 454 455/* 456 * __dbc_newopd -- 457 * Create a new off-page duplicate cursor. 458 * 459 * PUBLIC: int __dbc_newopd __P((DBC *, db_pgno_t, DBC *, DBC **)); 460 */ 461int 462__dbc_newopd(dbc_parent, root, oldopd, dbcp) 463 DBC *dbc_parent; 464 db_pgno_t root; 465 DBC *oldopd; 466 DBC **dbcp; 467{ 468 DB *dbp; 469 DBC *opd; 470 DBTYPE dbtype; 471 int ret; 472 473 dbp = dbc_parent->dbp; 474 dbtype = (dbp->dup_compare == NULL) ? 
DB_RECNO : DB_BTREE; 475 476 /* 477 * On failure, we want to default to returning the old off-page dup 478 * cursor, if any; our caller can't be left with a dangling pointer 479 * to a freed cursor. On error the only allowable behavior is to 480 * close the cursor (and the old OPD cursor it in turn points to), so 481 * this should be safe. 482 */ 483 *dbcp = oldopd; 484 485 if ((ret = __db_cursor_int(dbp, dbc_parent->thread_info, 486 dbc_parent->txn, 487 dbtype, root, DBC_OPD, dbc_parent->locker, &opd)) != 0) 488 return (ret); 489 490 opd->priority = dbc_parent->priority; 491 *dbcp = opd; 492 493 /* 494 * Check to see if we already have an off-page dup cursor that we've 495 * passed in. If we do, close it. It'd be nice to use it again 496 * if it's a cursor belonging to the right tree, but if we're doing 497 * a cursor-relative operation this might not be safe, so for now 498 * we'll take the easy way out and always close and reopen. 499 * 500 * Note that under no circumstances do we want to close the old 501 * cursor without returning a valid new one; we don't want to 502 * leave the main cursor in our caller with a non-NULL pointer 503 * to a freed off-page dup cursor. 504 */ 505 if (oldopd != NULL && (ret = __dbc_close(oldopd)) != 0) 506 return (ret); 507 508 return (0); 509} 510 511/* 512 * __dbc_get -- 513 * Get using a cursor. 
 *
 *	"flags" carries one base operation (DB_CURRENT, DB_NEXT, DB_SET, ...)
 *	optionally OR'd with DB_RMW, DB_READ_UNCOMMITTED, DB_MULTIPLE and/or
 *	DB_MULTIPLE_KEY.  The OR'd bits are stripped into locals first so the
 *	base operation can be compared with "==".
 *
 *	Unless dbc_arg is DBC_TRANSIENT, the work is done on duplicates of
 *	dbc_arg (dbc_n) and of its off-page duplicate cursor (opd); the
 *	duplicates are resolved against the originals by __dbc_cleanup at the
 *	"err" label, which is reached on success as well as on failure.
 *
 *	Returns 0 on success or an error code.  If the key buffer was too
 *	small and data is DB_DBT_USERMEM, the data size is still filled in
 *	and DB_BUFFER_SMALL is returned.
 *
 * PUBLIC: int __dbc_get __P((DBC *, DBT *, DBT *, u_int32_t));
 */
int
__dbc_get(dbc_arg, key, data, flags)
	DBC *dbc_arg;
	DBT *key, *data;
	u_int32_t flags;
{
	DB *dbp;
	DBC *dbc, *dbc_n, *opd;
	DBC_INTERNAL *cp, *cp_n;
	DB_MPOOLFILE *mpf;
	ENV *env;
	db_pgno_t pgno;
	db_indx_t indx_off;
	u_int32_t multi, orig_ulen, tmp_flags, tmp_read_uncommitted, tmp_rmw;
	u_int8_t type;
	int key_small, ret, t_ret;

	/* orig_ulen is only meaningful when key_small is set. */
	COMPQUIET(orig_ulen, 0);

	key_small = 0;

	/*
	 * Cursor Cleanup Note:
	 * All of the cursors passed to the underlying access methods by this
	 * routine are duplicated cursors.  On return, any referenced pages
	 * will be discarded, and, if the cursor is not intended to be used
	 * again, the close function will be called.  So, pages/locks that
	 * the cursor references do not need to be resolved by the underlying
	 * functions.
	 */
	dbp = dbc_arg->dbp;
	env = dbp->env;
	mpf = dbp->mpf;
	dbc_n = NULL;
	opd = NULL;

	/* Clear OR'd in additional bits so we can check for flag equality. */
	tmp_rmw = LF_ISSET(DB_RMW);
	LF_CLR(DB_RMW);

	/*
	 * Only remember DB_READ_UNCOMMITTED if the cursor does not already
	 * have it set; that way the flag is only cleared again later when
	 * this call was the one that set it.
	 */
	tmp_read_uncommitted =
	    LF_ISSET(DB_READ_UNCOMMITTED) &&
	    !F_ISSET(dbc_arg, DBC_READ_UNCOMMITTED);
	LF_CLR(DB_READ_UNCOMMITTED);

	multi = LF_ISSET(DB_MULTIPLE|DB_MULTIPLE_KEY);
	LF_CLR(DB_MULTIPLE|DB_MULTIPLE_KEY);

	/*
	 * Return a cursor's record number.  It has nothing to do with the
	 * cursor get code except that it was put into the interface.
	 *
	 * This path works directly on dbc_arg and returns without going
	 * through the cleanup code at "err".
	 */
	if (flags == DB_GET_RECNO) {
		if (tmp_rmw)
			F_SET(dbc_arg, DBC_RMW);
		if (tmp_read_uncommitted)
			F_SET(dbc_arg, DBC_READ_UNCOMMITTED);
		ret = __bamc_rget(dbc_arg, data);
		if (tmp_rmw)
			F_CLR(dbc_arg, DBC_RMW);
		if (tmp_read_uncommitted)
			F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
		return (ret);
	}

	/*
	 * The consume operations need the CDB write-lock setup; note that
	 * CDB_LOCKING_INIT may return from this function directly.
	 */
	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
		CDB_LOCKING_INIT(env, dbc_arg);

	/* Don't return the key or data if it was passed to us. */
	if (!DB_RETURNS_A_KEY(dbp, flags))
		F_SET(key, DB_DBT_ISSET);
	if (flags == DB_GET_BOTH &&
	    (dbp->dup_compare == NULL || dbp->dup_compare == __bam_defcmp))
		F_SET(data, DB_DBT_ISSET);

	/*
	 * If we have an off-page duplicates cursor, and the operation applies
	 * to it, perform the operation.  Duplicate the cursor and call the
	 * underlying function.
	 *
	 * Off-page duplicate trees are locked in the primary tree, that is,
	 * we acquire a write lock in the primary tree and no locks in the
	 * off-page dup tree.  If the DB_RMW flag was specified and the get
	 * operation is done in an off-page duplicate tree, call the primary
	 * cursor's upgrade routine first.
	 */
	cp = dbc_arg->internal;
	if (cp->opd != NULL &&
	    (flags == DB_CURRENT || flags == DB_GET_BOTHC ||
	    flags == DB_NEXT || flags == DB_NEXT_DUP ||
	    flags == DB_PREV || flags == DB_PREV_DUP)) {
		if (tmp_rmw && (ret = dbc_arg->am_writelock(dbc_arg)) != 0)
			goto err;
		/* A transient cursor is used in place; otherwise work on a copy. */
		if (F_ISSET(dbc_arg, DBC_TRANSIENT))
			opd = cp->opd;
		else if ((ret = __dbc_idup(cp->opd, &opd, DB_POSITION)) != 0)
			goto err;

		switch (ret = opd->am_get(opd, key, data, flags, NULL)) {
		case 0:
			goto done;
		case DB_NOTFOUND:
			/*
			 * Translate DB_NOTFOUND failures for the DB_NEXT and
			 * DB_PREV operations into a subsequent operation on
			 * the parent cursor.
			 */
			if (flags == DB_NEXT || flags == DB_PREV) {
				if ((ret = __dbc_close(opd)) != 0)
					goto err;
				opd = NULL;
				/*
				 * In the transient case opd was cp->opd
				 * itself, so drop that reference too.
				 */
				if (F_ISSET(dbc_arg, DBC_TRANSIENT))
					cp->opd = NULL;
				break;
			}
			goto err;
		default:
			goto err;
		}
	} else if (cp->opd != NULL && F_ISSET(dbc_arg, DBC_TRANSIENT)) {
		/*
		 * The operation does not apply to the existing OPD cursor
		 * and dbc_arg is used in place, so close the OPD cursor now.
		 */
		if ((ret = __dbc_close(cp->opd)) != 0)
			goto err;
		cp->opd = NULL;
	}

	/*
	 * Perform an operation on the main cursor.  Duplicate the cursor,
	 * upgrade the lock as required, and call the underlying function.
	 *
	 * Cursor-relative operations need the duplicate positioned where
	 * the original is (DB_POSITION); the others start unpositioned.
	 */
	switch (flags) {
	case DB_CURRENT:
	case DB_GET_BOTHC:
	case DB_NEXT:
	case DB_NEXT_DUP:
	case DB_NEXT_NODUP:
	case DB_PREV:
	case DB_PREV_DUP:
	case DB_PREV_NODUP:
		tmp_flags = DB_POSITION;
		break;
	default:
		tmp_flags = 0;
		break;
	}

	/*
	 * Set the flag on dbc_arg before duplicating so that __dbc_idup
	 * copies it to the new cursor.
	 */
	if (tmp_read_uncommitted)
		F_SET(dbc_arg, DBC_READ_UNCOMMITTED);

	/*
	 * If this cursor is going to be closed immediately, we don't
	 * need to take precautions to clean it up on error.
	 */
	if (F_ISSET(dbc_arg, DBC_TRANSIENT))
		dbc_n = dbc_arg;
	else {
		ret = __dbc_idup(dbc_arg, &dbc_n, tmp_flags);
		if (tmp_read_uncommitted)
			F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);

		if (ret != 0)
			goto err;
		COPY_RET_MEM(dbc_arg, dbc_n);
	}

	if (tmp_rmw)
		F_SET(dbc_n, DBC_RMW);

	/* Translate the bulk-get request bits into cursor flags. */
	switch (multi) {
	case DB_MULTIPLE:
		F_SET(dbc_n, DBC_MULTIPLE);
		break;
	case DB_MULTIPLE_KEY:
		F_SET(dbc_n, DBC_MULTIPLE_KEY);
		break;
	case DB_MULTIPLE | DB_MULTIPLE_KEY:
		F_SET(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
		break;
	case 0:
	default:
		break;
	}

	/*
	 * The access method reports a new off-page duplicate tree by setting
	 * pgno to something other than PGNO_INVALID.
	 */
retry:	pgno = PGNO_INVALID;
	ret = dbc_n->am_get(dbc_n, key, data, flags, &pgno);
	if (tmp_rmw)
		F_CLR(dbc_n, DBC_RMW);
	if (tmp_read_uncommitted)
		F_CLR(dbc_arg, DBC_READ_UNCOMMITTED);
	F_CLR(dbc_n, DBC_MULTIPLE|DBC_MULTIPLE_KEY);
	if (ret != 0)
		goto err;

	cp_n = dbc_n->internal;

	/*
	 * We may be referencing a new off-page duplicates tree.  Acquire
	 * a new cursor and call the underlying function.
	 */
	if (pgno != PGNO_INVALID) {
		if ((ret = __dbc_newopd(dbc_arg,
		    pgno, cp_n->opd, &cp_n->opd)) != 0)
			goto err;

		/*
		 * Map the operation on the main tree to the operation to run
		 * inside the duplicate tree: forward-moving gets start at its
		 * first item, backward-moving gets at its last item, and the
		 * DB_GET_BOTH family is passed through unchanged.
		 */
		switch (flags) {
		case DB_FIRST:
		case DB_NEXT:
		case DB_NEXT_NODUP:
		case DB_SET:
		case DB_SET_RECNO:
		case DB_SET_RANGE:
			tmp_flags = DB_FIRST;
			break;
		case DB_LAST:
		case DB_PREV:
		case DB_PREV_NODUP:
			tmp_flags = DB_LAST;
			break;
		case DB_GET_BOTH:
		case DB_GET_BOTHC:
		case DB_GET_BOTH_RANGE:
			tmp_flags = flags;
			break;
		default:
			ret = __db_unknown_flag(env, "__dbc_get", flags);
			goto err;
		}
		ret = cp_n->opd->am_get(cp_n->opd, key, data, tmp_flags, NULL);
		/*
		 * Another cursor may have deleted all of the off-page
		 * duplicates, so for DB_NEXT and DB_PREV operations we need to
		 * retry on the parent cursor.
		 */
		switch (ret) {
		case 0:
			break;
		case DB_NOTFOUND:
			/*
			 * Translate DB_NOTFOUND failures for the DB_NEXT and
			 * DB_PREV operations into a subsequent operation on
			 * the parent cursor.
			 */
			if (flags == DB_NEXT || flags == DB_PREV) {
				if ((ret = __dbc_close(cp_n->opd)) != 0)
					goto err;
				cp_n->opd = NULL;
				goto retry;
			}
			goto err;
		default:
			goto err;
		}
	}

done:	/*
	 * Return a key/data item.  The only exception is that we don't return
	 * a key if the user already gave us one, that is, if the DB_SET flag
	 * was set.  The DB_SET flag is necessary.  In a Btree, the user's key
	 * doesn't have to be the same as the key stored the tree, depending on
	 * the magic performed by the comparison function.  As we may not have
	 * done any key-oriented operation here, the page reference may not be
	 * valid.  Fill it in as necessary.  We don't have to worry about any
	 * locks, the cursor must already be holding appropriate locks.
	 *
	 * XXX
	 * If not a Btree and DB_SET_RANGE is set, we shouldn't return a key
	 * either, should we?
	 *
	 * dbc_n is still NULL here when the get was satisfied entirely by
	 * the off-page duplicate cursor above; the key then comes from
	 * dbc_arg's own position.
	 */
	cp_n = dbc_n == NULL ? dbc_arg->internal : dbc_n->internal;
	if (!F_ISSET(key, DB_DBT_ISSET)) {
		if (cp_n->page == NULL && (ret = __memp_fget(mpf, &cp_n->pgno,
		    dbc_arg->thread_info, dbc_arg->txn, 0, &cp_n->page)) != 0)
			goto err;

		if ((ret = __db_ret(dbp, dbc_arg->thread_info,
		    dbc_arg->txn, cp_n->page, cp_n->indx, key,
		    &dbc_arg->rkey->data, &dbc_arg->rkey->ulen)) != 0) {
			/*
			 * If the key DBT is too small, we still want to return
			 * the size of the data.  Otherwise applications are
			 * forced to check each one with a separate call.  We
			 * don't want to copy the data, so we set the ulen to
			 * zero before calling __db_ret.
			 *
			 * The original ulen is restored, and the return value
			 * forced to DB_BUFFER_SMALL, after the "err" label.
			 */
			if (ret == DB_BUFFER_SMALL &&
			    F_ISSET(data, DB_DBT_USERMEM)) {
				key_small = 1;
				orig_ulen = data->ulen;
				data->ulen = 0;
			} else
				goto err;
		}
	}
	if (multi != 0) {
		/*
		 * Even if fetching from the OPD cursor we need a duplicate
		 * primary cursor if we are going after multiple keys.
		 */
		if (dbc_n == NULL) {
			/*
			 * Non-"_KEY" DB_MULTIPLE doesn't move the main cursor,
			 * so it's safe to just use dbc_arg, unless dbc_arg
			 * has an open OPD cursor whose state might need to
			 * be preserved.
			 */
			if ((!(multi & DB_MULTIPLE_KEY) &&
			    dbc_arg->internal->opd == NULL) ||
			    F_ISSET(dbc_arg, DBC_TRANSIENT))
				dbc_n = dbc_arg;
			else {
				if ((ret = __dbc_idup(dbc_arg,
				    &dbc_n, DB_POSITION)) != 0)
					goto err;
				if ((ret = dbc_n->am_get(dbc_n,
				    key, data, DB_CURRENT, &pgno)) != 0)
					goto err;
			}
			cp_n = dbc_n->internal;
		}

		/*
		 * If opd is set then we dupped the opd that we came in with.
		 * When we return we may have a new opd if we went to another
		 * key.
		 *
		 * Ownership of opd moves to dbc_n here, so the opd-specific
		 * cleanup at "err" is skipped for it.
		 */
		if (opd != NULL) {
			DB_ASSERT(env, cp_n->opd == NULL);
			cp_n->opd = opd;
			opd = NULL;
		}

		/*
		 * Bulk get doesn't use __db_retcopy, so data.size won't
		 * get set up unless there is an error.  Assume success
		 * here.  This is the only call to am_bulk, and it avoids
		 * setting it exactly the same everywhere.  If we have an
		 * DB_BUFFER_SMALL error, it'll get overwritten with the
		 * needed value.
		 */
		data->size = data->ulen;
		ret = dbc_n->am_bulk(dbc_n, data, flags | multi);
	} else if (!F_ISSET(data, DB_DBT_ISSET)) {
		/*
		 * Pick the cursor that actually references the data item:
		 * the duplicated OPD cursor, else the new cursor's OPD
		 * cursor, else the new cursor itself.
		 */
		dbc = opd != NULL ? opd : cp_n->opd != NULL ? cp_n->opd : dbc_n;
		cp = dbc->internal;
		if (cp->page == NULL &&
		    (ret = __memp_fget(mpf, &cp->pgno,
		    dbc_arg->thread_info, dbc->txn, 0, &cp->page)) != 0)
			goto err;

		/*
		 * On P_LBTREE, P_HASH and P_HASH_UNSORTED pages the data item
		 * is O_INDX past the cursor's index; on any other page type
		 * the index is used as is.
		 */
		type = TYPE(cp->page);
		indx_off = ((type == P_LBTREE ||
		    type == P_HASH || type == P_HASH_UNSORTED) ? O_INDX : 0);
		ret = __db_ret(dbp,
		    dbc->thread_info, dbc->txn, cp->page, cp->indx + indx_off,
		    data, &dbc_arg->rdata->data, &dbc_arg->rdata->ulen);
	}

	/* Reached on both success and failure. */
err:	/* Don't pass DB_DBT_ISSET back to application level, error or no. */
	F_CLR(key, DB_DBT_ISSET);
	F_CLR(data, DB_DBT_ISSET);

	/* Cleanup and cursor resolution. */
	if (opd != NULL) {
		/*
		 * To support dirty reads we must reget the write lock
		 * if we have just stepped off a deleted record.
		 * Since the OPD cursor does not know anything
		 * about the referencing page or cursor we need
		 * to peek at the OPD cursor and get the lock here.
		 */
		if (F_ISSET(dbc_arg->dbp, DB_AM_READ_UNCOMMITTED) &&
		    F_ISSET((BTREE_CURSOR *)
		    dbc_arg->internal->opd->internal, C_DELETED))
			if ((t_ret =
			    dbc_arg->am_writelock(dbc_arg)) != 0 && ret == 0)
				ret = t_ret;
		if ((t_ret = __dbc_cleanup(
		    dbc_arg->internal->opd, opd, ret)) != 0 && ret == 0)
			ret = t_ret;

	}

	/*
	 * Undo the ulen override made for the too-small key case and make
	 * sure the caller sees DB_BUFFER_SMALL if nothing else went wrong.
	 */
	if (key_small) {
		data->ulen = orig_ulen;
		if (ret == 0)
			ret = DB_BUFFER_SMALL;
	}

	/* A cleanup failure takes precedence over DB_BUFFER_SMALL. */
	if ((t_ret = __dbc_cleanup(dbc_arg, dbc_n, ret)) != 0 &&
	    (ret == 0 || ret == DB_BUFFER_SMALL))
		ret = t_ret;

	if (flags == DB_CONSUME || flags == DB_CONSUME_WAIT)
		CDB_LOCKING_DONE(env, dbc_arg);
	return (ret);
}

/*
 * __dbc_put --
 *	Put using a cursor.
916 * 917 * PUBLIC: int __dbc_put __P((DBC *, DBT *, DBT *, u_int32_t)); 918 */ 919int 920__dbc_put(dbc_arg, key, data, flags) 921 DBC *dbc_arg; 922 DBT *key, *data; 923 u_int32_t flags; 924{ 925 DB *dbp, *sdbp; 926 DBC *dbc_n, *fdbc, *oldopd, *opd, *sdbc, *pdbc; 927 DBT *all_skeys, *skeyp, *tskeyp; 928 DBT fdata, olddata, oldpkey, newdata, pkey, temppkey, tempskey; 929 ENV *env; 930 db_pgno_t pgno; 931 int cmp, have_oldrec, ispartial, nodel, re_pad, ret, s_count, t_ret; 932 u_int32_t re_len, nskey, rmw, size, tmp_flags; 933 934 /* 935 * Cursor Cleanup Note: 936 * All of the cursors passed to the underlying access methods by this 937 * routine are duplicated cursors. On return, any referenced pages 938 * will be discarded, and, if the cursor is not intended to be used 939 * again, the close function will be called. So, pages/locks that 940 * the cursor references do not need to be resolved by the underlying 941 * functions. 942 */ 943 dbp = dbc_arg->dbp; 944 env = dbp->env; 945 sdbp = NULL; 946 fdbc = pdbc = dbc_n = NULL; 947 all_skeys = NULL; 948 memset(&newdata, 0, sizeof(DBT)); 949 ret = s_count = 0; 950 951 /* 952 * We do multiple cursor operations in some cases and subsequently 953 * access the data DBT information. Set DB_DBT_MALLOC so we don't risk 954 * modification of the data between our uses of it. 955 */ 956 memset(&olddata, 0, sizeof(DBT)); 957 F_SET(&olddata, DB_DBT_MALLOC); 958 959 /* 960 * Putting to secondary indices is forbidden; when we need 961 * to internally update one, we'll call this with a private 962 * synonym for DB_KEYLAST, DB_UPDATE_SECONDARY, which does 963 * the right thing but won't return an error from cputchk(). 964 */ 965 if (flags == DB_UPDATE_SECONDARY) 966 flags = DB_KEYLAST; 967 968 CDB_LOCKING_INIT(env, dbc_arg); 969 970 /* 971 * Check to see if we are a primary and have secondary indices. 972 * If we are not, we save ourselves a good bit of trouble and 973 * just skip to the "normal" put. 
974 */ 975 if (LIST_FIRST(&dbp->s_secondaries) == NULL) 976 goto skip_s_update; 977 978 /* 979 * We have at least one secondary which we may need to update. 980 * 981 * There is a rather vile locking issue here. Secondary gets 982 * will always involve acquiring a read lock in the secondary, 983 * then acquiring a read lock in the primary. Ideally, we 984 * would likewise perform puts by updating all the secondaries 985 * first, then doing the actual put in the primary, to avoid 986 * deadlock (since having multiple threads doing secondary 987 * gets and puts simultaneously is probably a common case). 988 * 989 * However, if this put is a put-overwrite--and we have no way to 990 * tell in advance whether it will be--we may need to delete 991 * an outdated secondary key. In order to find that old 992 * secondary key, we need to get the record we're overwriting, 993 * before we overwrite it. 994 * 995 * (XXX: It would be nice to avoid this extra get, and have the 996 * underlying put routines somehow pass us the old record 997 * since they need to traverse the tree anyway. I'm saving 998 * this optimization for later, as it's a lot of work, and it 999 * would be hard to fit into this locking paradigm anyway.) 1000 * 1001 * The simple thing to do would be to go get the old record before 1002 * we do anything else. Unfortunately, though, doing so would 1003 * violate our "secondary, then primary" lock acquisition 1004 * ordering--even in the common case where no old primary record 1005 * exists, we'll still acquire and keep a lock on the page where 1006 * we're about to do the primary insert. 1007 * 1008 * To get around this, we do the following gyrations, which 1009 * hopefully solve this problem in the common case: 1010 * 1011 * 1) If this is a c_put(DB_CURRENT), go ahead and get the 1012 * old record. 
We already hold the lock on this page in 1013 * the primary, so no harm done, and we'll need the primary 1014 * key (which we weren't passed in this case) to do any 1015 * secondary puts anyway. 1016 * 1017 * 2) If we're doing a partial put, we need to perform the 1018 * get on the primary key right away, since we don't have 1019 * the whole datum that the secondary key is based on. 1020 * We may also need to pad out the record if the primary 1021 * has a fixed record length. 1022 * 1023 * 3) Loop through the secondary indices, putting into each a 1024 * new secondary key that corresponds to the new record. 1025 * 1026 * 4) If we haven't done so in (1) or (2), get the old primary 1027 * key/data pair. If one does not exist--the common case--we're 1028 * done with secondary indices, and can go straight on to the 1029 * primary put. 1030 * 1031 * 5) If we do have an old primary key/data pair, however, we need 1032 * to loop through all the secondaries a second time and delete 1033 * the old secondary in each. 1034 */ 1035 memset(&pkey, 0, sizeof(DBT)); 1036 s_count = __db_s_count(dbp); 1037 if ((ret = __os_calloc( 1038 env, (u_int)s_count, sizeof(DBT), &all_skeys)) != 0) 1039 goto err; 1040 have_oldrec = nodel = 0; 1041 1042 /* 1043 * Primary indices can't have duplicates, so only DB_CURRENT, 1044 * DB_KEYFIRST, and DB_KEYLAST make any sense. Other flags 1045 * should have been caught by the checking routine, but 1046 * add a sprinkling of paranoia. 1047 */ 1048 DB_ASSERT(env, flags == DB_CURRENT || flags == DB_KEYFIRST || 1049 flags == DB_KEYLAST || flags == DB_NOOVERWRITE); 1050 1051 /* 1052 * We'll want to use DB_RMW in a few places, but it's only legal 1053 * when locking is on. 1054 */ 1055 rmw = STD_LOCKING(dbc_arg) ? DB_RMW : 0; 1056 1057 if (flags == DB_CURRENT) { /* Step 1. */ 1058 /* 1059 * This is safe to do on the cursor we already have; 1060 * error or no, it won't move. 
1061 * 1062 * We use DB_RMW for all of these gets because we'll be 1063 * writing soon enough in the "normal" put code. In 1064 * transactional databases we'll hold those write locks 1065 * even if we close the cursor we're reading with. 1066 * 1067 * The DB_KEYEMPTY return needs special handling -- if the 1068 * cursor is on a deleted key, we return DB_NOTFOUND. 1069 */ 1070 ret = __dbc_get(dbc_arg, &pkey, &olddata, rmw | DB_CURRENT); 1071 if (ret == DB_KEYEMPTY) 1072 ret = DB_NOTFOUND; 1073 if (ret != 0) 1074 goto err; 1075 1076 have_oldrec = 1; /* We've looked for the old record. */ 1077 } else { 1078 /* Set pkey so we can use &pkey everywhere instead of key. */ 1079 pkey.data = key->data; 1080 pkey.size = key->size; 1081 } 1082 1083 /* 1084 * Check for partial puts (step 2). 1085 */ 1086 if (F_ISSET(data, DB_DBT_PARTIAL)) { 1087 if (!have_oldrec && !nodel) { 1088 /* 1089 * We're going to have to search the tree for the 1090 * specified key. Dup a cursor (so we have the same 1091 * locking info) and do a c_get. 1092 */ 1093 if ((ret = __dbc_idup(dbc_arg, &pdbc, 0)) != 0) 1094 goto err; 1095 1096 /* We should have gotten DB_CURRENT in step 1. */ 1097 DB_ASSERT(env, flags != DB_CURRENT); 1098 1099 ret = __dbc_get(pdbc, &pkey, &olddata, rmw | DB_SET); 1100 if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) { 1101 nodel = 1; 1102 ret = 0; 1103 } 1104 if ((t_ret = __dbc_close(pdbc)) != 0) 1105 ret = t_ret; 1106 if (ret != 0) 1107 goto err; 1108 1109 have_oldrec = 1; 1110 } 1111 1112 /* 1113 * Now build the new datum from olddata and the partial data we 1114 * were given. It's okay to do this if no record was returned 1115 * above: a partial put on an empty record is allowed, if a 1116 * little strange. The data is zero-padded. 1117 */ 1118 if ((ret = 1119 __db_buildpartial(dbp, &olddata, data, &newdata)) != 0) 1120 goto err; 1121 ispartial = 1; 1122 } else 1123 ispartial = 0; 1124 1125 /* 1126 * Handle fixed-length records. 
If the primary database has 1127 * fixed-length records, we need to pad out the datum before 1128 * we pass it into the callback function; we always index the 1129 * "real" record. 1130 */ 1131 if ((dbp->type == DB_RECNO && F_ISSET(dbp, DB_AM_FIXEDLEN)) || 1132 (dbp->type == DB_QUEUE)) { 1133 if (dbp->type == DB_QUEUE) { 1134 re_len = ((QUEUE *)dbp->q_internal)->re_len; 1135 re_pad = ((QUEUE *)dbp->q_internal)->re_pad; 1136 } else { 1137 re_len = ((BTREE *)dbp->bt_internal)->re_len; 1138 re_pad = ((BTREE *)dbp->bt_internal)->re_pad; 1139 } 1140 1141 size = ispartial ? newdata.size : data->size; 1142 if (size > re_len) { 1143 ret = __db_rec_toobig(env, size, re_len); 1144 goto err; 1145 } else if (size < re_len) { 1146 /* 1147 * If we're not doing a partial put, copy 1148 * data->data into newdata.data, then pad out 1149 * newdata.data. 1150 * 1151 * If we're doing a partial put, the data 1152 * we want are already in newdata.data; we 1153 * just need to pad. 1154 * 1155 * Either way, realloc is safe. 1156 */ 1157 if ((ret = 1158 __os_realloc(env, re_len, &newdata.data)) != 0) 1159 goto err; 1160 if (!ispartial) 1161 memcpy(newdata.data, data->data, size); 1162 memset((u_int8_t *)newdata.data + size, re_pad, 1163 re_len - size); 1164 newdata.size = re_len; 1165 ispartial = 1; 1166 } 1167 } 1168 1169 /* 1170 * Loop through the secondaries. (Step 3.) 1171 * 1172 * Note that __db_s_first and __db_s_next will take care of 1173 * thread-locking and refcounting issues. 1174 */ 1175 for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys; 1176 sdbp != NULL && ret == 0; 1177 ret = __db_s_next(&sdbp, dbc_arg->txn), ++skeyp) { 1178 DB_ASSERT(env, skeyp - all_skeys < s_count); 1179 /* 1180 * Don't process this secondary if the key is immutable and we 1181 * know that the old record exists. This optimization can't be 1182 * used if we have not checked for the old record yet. 
1183 */ 1184 if (have_oldrec && !nodel && 1185 FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY)) 1186 continue; 1187 1188 /* 1189 * Call the callback for this secondary, to get the 1190 * appropriate secondary key. 1191 */ 1192 if ((ret = sdbp->s_callback(sdbp, 1193 &pkey, ispartial ? &newdata : data, skeyp)) != 0) { 1194 /* Not indexing is equivalent to an empty key set. */ 1195 if (ret == DB_DONOTINDEX) { 1196 F_SET(skeyp, DB_DBT_MULTIPLE); 1197 skeyp->size = 0; 1198 ret = 0; 1199 } else 1200 goto err; 1201 } 1202 1203 if (sdbp->s_foreign != NULL && 1204 (ret = __db_cursor_int(sdbp->s_foreign, 1205 dbc_arg->thread_info, dbc_arg->txn, sdbp->s_foreign->type, 1206 PGNO_INVALID, 0, dbc_arg->locker, &fdbc)) != 0) 1207 goto err; 1208 1209 /* 1210 * Mark the secondary key DBT(s) as set -- that is, the 1211 * callback returned at least one secondary key. 1212 * 1213 * Also, if this secondary index is associated with a foreign 1214 * database, check that the foreign db contains the key(s) to 1215 * maintain referential integrity. Set flags in fdata to avoid 1216 * mem copying, we just need to know existence. We need to do 1217 * this check before setting DB_DBT_ISSET, otherwise __dbc_get 1218 * will overwrite the flag values. 
1219 */ 1220 if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) { 1221#ifdef DIAGNOSTIC 1222 __db_check_skeyset(sdbp, skeyp); 1223#endif 1224 for (tskeyp = (DBT *)skeyp->data, nskey = skeyp->size; 1225 nskey > 0; nskey--, tskeyp++) { 1226 if (fdbc != NULL) { 1227 memset(&fdata, 0, sizeof(DBT)); 1228 F_SET(&fdata, 1229 DB_DBT_PARTIAL | DB_DBT_USERMEM); 1230 if ((ret = __dbc_get( 1231 fdbc, tskeyp, &fdata, 1232 DB_SET | rmw)) == DB_NOTFOUND || 1233 ret == DB_KEYEMPTY) { 1234 ret = DB_FOREIGN_CONFLICT; 1235 break; 1236 } 1237 } 1238 F_SET(tskeyp, DB_DBT_ISSET); 1239 } 1240 tskeyp = (DBT *)skeyp->data; 1241 nskey = skeyp->size; 1242 } else { 1243 if (fdbc != NULL) { 1244 memset(&fdata, 0, sizeof(DBT)); 1245 F_SET(&fdata, DB_DBT_PARTIAL | DB_DBT_USERMEM); 1246 if ((ret = __dbc_get(fdbc, skeyp, &fdata, 1247 DB_SET | rmw)) == DB_NOTFOUND || 1248 ret == DB_KEYEMPTY) 1249 ret = DB_FOREIGN_CONFLICT; 1250 } 1251 F_SET(skeyp, DB_DBT_ISSET); 1252 tskeyp = skeyp; 1253 nskey = 1; 1254 } 1255 if (fdbc != NULL && (t_ret = __dbc_close(fdbc)) != 0 && 1256 ret == 0) 1257 ret = t_ret; 1258 fdbc = NULL; 1259 if (ret != 0) 1260 goto err; 1261 1262 /* 1263 * If we have the old record, we can generate and remove any 1264 * old secondary key(s) now. We can also skip the secondary put 1265 * if there is no change. 1266 */ 1267 if (have_oldrec) { 1268 if ((ret = __dbc_del_oldskey(sdbp, dbc_arg, 1269 skeyp, &pkey, &olddata)) == DB_KEYEXIST) 1270 continue; 1271 else if (ret != 0) 1272 goto err; 1273 } 1274 if (nskey == 0) 1275 continue; 1276 1277 /* 1278 * Open a cursor in this secondary. 1279 * 1280 * Use the same locker ID as our primary cursor, so that 1281 * we're guaranteed that the locks don't conflict (e.g. in CDB 1282 * or if we're subdatabases that share and want to lock a 1283 * metadata page). 
1284 */ 1285 if ((ret = __db_cursor_int(sdbp, dbc_arg->thread_info, 1286 dbc_arg->txn, sdbp->type, 1287 PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0) 1288 goto err; 1289 1290 /* 1291 * If we're in CDB, updates will fail since the new cursor 1292 * isn't a writer. However, we hold the WRITE lock in the 1293 * primary and will for as long as our new cursor lasts, 1294 * and the primary and secondary share a lock file ID, 1295 * so it's safe to consider this a WRITER. The close 1296 * routine won't try to put anything because we don't 1297 * really have a lock. 1298 */ 1299 if (CDB_LOCKING(env)) { 1300 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID); 1301 F_SET(sdbc, DBC_WRITER); 1302 } 1303 1304 /* 1305 * Swap the primary key to the byte order of this secondary, if 1306 * necessary. By doing this now, we can compare directly 1307 * against the data already in the secondary without having to 1308 * swap it after reading. 1309 */ 1310 SWAP_IF_NEEDED(sdbp, &pkey); 1311 1312 for (; nskey > 0 && ret == 0; nskey--, tskeyp++) { 1313 /* Skip this key if it is already in the database. */ 1314 if (!F_ISSET(tskeyp, DB_DBT_ISSET)) 1315 continue; 1316 1317 /* 1318 * There are three cases here-- 1319 * 1) The secondary supports sorted duplicates. 1320 * If we attempt to put a secondary/primary pair 1321 * that already exists, that's a duplicate 1322 * duplicate, and c_put will return DB_KEYEXIST 1323 * (see __db_duperr). This will leave us with 1324 * exactly one copy of the secondary/primary pair, 1325 * and this is just right--we'll avoid deleting it 1326 * later, as the old and new secondaries will 1327 * match (since the old secondary is the dup dup 1328 * that's already there). 1329 * 2) The secondary supports duplicates, but they're not 1330 * sorted. 
We need to avoid putting a duplicate 1331 * duplicate, because the matching old and new 1332 * secondaries will prevent us from deleting 1333 * anything and we'll wind up with two secondary 1334 * records that point to the same primary key. Do 1335 * a c_get(DB_GET_BOTH); only do the put if the 1336 * secondary doesn't exist. 1337 * 3) The secondary doesn't support duplicates at all. 1338 * In this case, secondary keys must be unique; 1339 * if another primary key already exists for this 1340 * secondary key, we have to either overwrite it 1341 * or not put this one, and in either case we've 1342 * corrupted the secondary index. Do a 1343 * c_get(DB_SET). If the secondary/primary pair 1344 * already exists, do nothing; if the secondary 1345 * exists with a different primary, return an 1346 * error; and if the secondary does not exist, 1347 * put it. 1348 */ 1349 if (!F_ISSET(sdbp, DB_AM_DUP)) { 1350 /* Case 3. */ 1351 memset(&oldpkey, 0, sizeof(DBT)); 1352 F_SET(&oldpkey, DB_DBT_MALLOC); 1353 ret = __dbc_get(sdbc, 1354 tskeyp, &oldpkey, rmw | DB_SET); 1355 if (ret == 0) { 1356 cmp = __bam_defcmp(sdbp, 1357 &oldpkey, &pkey); 1358 __os_ufree(env, oldpkey.data); 1359 if (cmp != 0) { 1360 __db_errx(env, "%s%s", 1361 "Put results in a non-unique secondary key in an ", 1362 "index not configured to support duplicates"); 1363 ret = EINVAL; 1364 } 1365 } 1366 if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY) 1367 break; 1368 } else if (!F_ISSET(sdbp, DB_AM_DUPSORT)) { 1369 /* Case 2. */ 1370 DB_INIT_DBT(tempskey, 1371 tskeyp->data, tskeyp->size); 1372 DB_INIT_DBT(temppkey, 1373 pkey.data, pkey.size); 1374 ret = __dbc_get(sdbc, &tempskey, &temppkey, 1375 rmw | DB_GET_BOTH); 1376 if (ret != DB_NOTFOUND && ret != DB_KEYEMPTY) 1377 break; 1378 } 1379 1380 ret = __dbc_put(sdbc, tskeyp, &pkey, 1381 DB_UPDATE_SECONDARY); 1382 1383 /* 1384 * We don't know yet whether this was a put-overwrite 1385 * that in fact changed nothing. If it was, we may get 1386 * DB_KEYEXIST. 
This is not an error. 1387 */ 1388 if (ret == DB_KEYEXIST) 1389 ret = 0; 1390 } 1391 1392 /* Make sure the primary key is back in native byte-order. */ 1393 SWAP_IF_NEEDED(sdbp, &pkey); 1394 1395 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0) 1396 ret = t_ret; 1397 1398 if (ret != 0) 1399 goto err; 1400 1401 /* 1402 * Mark that we have a key for this secondary so we can check 1403 * it later before deleting the old one. We can't set it 1404 * earlier or it would be cleared in the calls above. 1405 */ 1406 F_SET(skeyp, DB_DBT_ISSET); 1407 } 1408 if (ret != 0) 1409 goto err; 1410 1411 /* 1412 * If we've already got the old primary key/data pair, the secondary 1413 * updates are already done. 1414 */ 1415 if (have_oldrec) 1416 goto skip_s_update; 1417 1418 /* 1419 * If still necessary, go get the old primary key/data. (Step 4.) 1420 * 1421 * See the comments in step 2. This is real familiar. 1422 */ 1423 if ((ret = __dbc_idup(dbc_arg, &pdbc, 0)) != 0) 1424 goto err; 1425 DB_ASSERT(env, flags != DB_CURRENT); 1426 pkey.data = key->data; 1427 pkey.size = key->size; 1428 ret = __dbc_get(pdbc, &pkey, &olddata, rmw | DB_SET); 1429 if (ret == DB_KEYEMPTY || ret == DB_NOTFOUND) { 1430 nodel = 1; 1431 ret = 0; 1432 } 1433 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0) 1434 ret = t_ret; 1435 if (ret != 0) 1436 goto err; 1437 1438 /* 1439 * Check whether we do in fact have an old record we may need to 1440 * delete. (Step 5). 1441 */ 1442 if (nodel) 1443 goto skip_s_update; 1444 1445 for (ret = __db_s_first(dbp, &sdbp), skeyp = all_skeys; 1446 sdbp != NULL && ret == 0; 1447 ret = __db_s_next(&sdbp, dbc_arg->txn), skeyp++) { 1448 DB_ASSERT(env, skeyp - all_skeys < s_count); 1449 /* 1450 * Don't process this secondary if the key is immutable. We 1451 * know that the old record exists, so this optimization can 1452 * always be used. 
1453 */ 1454 if (FLD_ISSET(sdbp->s_assoc_flags, DB_ASSOC_IMMUTABLE_KEY)) 1455 continue; 1456 1457 if ((ret = __dbc_del_oldskey(sdbp, dbc_arg, 1458 skeyp, &pkey, &olddata)) != 0 && ret != DB_KEYEXIST) 1459 goto err; 1460 } 1461 if (ret != 0) 1462 goto err; 1463 1464 /* Secondary index updates are now done. On to the "real" stuff. */ 1465 1466skip_s_update: 1467 /* 1468 * If we have an off-page duplicates cursor, and the operation applies 1469 * to it, perform the operation. Duplicate the cursor and call the 1470 * underlying function. 1471 * 1472 * Off-page duplicate trees are locked in the primary tree, that is, 1473 * we acquire a write lock in the primary tree and no locks in the 1474 * off-page dup tree. If the put operation is done in an off-page 1475 * duplicate tree, call the primary cursor's upgrade routine first. 1476 */ 1477 if (dbc_arg->internal->opd != NULL && 1478 (flags == DB_AFTER || flags == DB_BEFORE || flags == DB_CURRENT)) { 1479 /* 1480 * A special case for hash off-page duplicates. Hash doesn't 1481 * support (and is documented not to support) put operations 1482 * relative to a cursor which references an already deleted 1483 * item. For consistency, apply the same criteria to off-page 1484 * duplicates as well. 1485 */ 1486 if (dbc_arg->dbtype == DB_HASH && F_ISSET( 1487 ((BTREE_CURSOR *)(dbc_arg->internal->opd->internal)), 1488 C_DELETED)) { 1489 ret = DB_NOTFOUND; 1490 goto err; 1491 } 1492 1493 if ((ret = dbc_arg->am_writelock(dbc_arg)) != 0 || 1494 (ret = __dbc_dup(dbc_arg, &dbc_n, DB_POSITION)) != 0) 1495 goto err; 1496 opd = dbc_n->internal->opd; 1497 if ((ret = opd->am_put( 1498 opd, key, data, flags, NULL)) != 0) 1499 goto err; 1500 goto done; 1501 } 1502 1503 /* 1504 * Perform an operation on the main cursor. Duplicate the cursor, 1505 * and call the underlying function. 1506 */ 1507 tmp_flags = flags == DB_AFTER || 1508 flags == DB_BEFORE || flags == DB_CURRENT ? 
DB_POSITION : 0; 1509 1510 /* 1511 * If this cursor is going to be closed immediately, we don't 1512 * need to take precautions to clean it up on error. 1513 */ 1514 if (F_ISSET(dbc_arg, DBC_TRANSIENT)) 1515 dbc_n = dbc_arg; 1516 else if ((ret = __dbc_idup(dbc_arg, &dbc_n, tmp_flags)) != 0) 1517 goto err; 1518 1519 pgno = PGNO_INVALID; 1520 if ((ret = dbc_n->am_put(dbc_n, key, data, flags, &pgno)) != 0) 1521 goto err; 1522 1523 /* 1524 * We may be referencing a new off-page duplicates tree. Acquire 1525 * a new cursor and call the underlying function. 1526 */ 1527 if (pgno != PGNO_INVALID) { 1528 oldopd = dbc_n->internal->opd; 1529 if ((ret = __dbc_newopd(dbc_arg, pgno, oldopd, &opd)) != 0) { 1530 dbc_n->internal->opd = opd; 1531 goto err; 1532 } 1533 1534 dbc_n->internal->opd = opd; 1535 1536 if (flags == DB_NOOVERWRITE) 1537 flags = DB_KEYLAST; 1538 if ((ret = opd->am_put( 1539 opd, key, data, flags, NULL)) != 0) 1540 goto err; 1541 } 1542 1543done: 1544err: /* Cleanup and cursor resolution. */ 1545 if ((t_ret = __dbc_cleanup(dbc_arg, dbc_n, ret)) != 0 && ret == 0) 1546 ret = t_ret; 1547 1548 /* If newdata or olddata were used, free their buffers. */ 1549 if (newdata.data != NULL) 1550 __os_free(env, newdata.data); 1551 if (olddata.data != NULL) 1552 __os_ufree(env, olddata.data); 1553 1554 CDB_LOCKING_DONE(env, dbc_arg); 1555 1556 if (sdbp != NULL && 1557 (t_ret = __db_s_done(sdbp, dbc_arg->txn)) != 0 && ret == 0) 1558 ret = t_ret; 1559 1560 for (skeyp = all_skeys; skeyp - all_skeys < s_count; skeyp++) { 1561 if (F_ISSET(skeyp, DB_DBT_MULTIPLE)) { 1562 for (nskey = skeyp->size, tskeyp = (DBT *)skeyp->data; 1563 nskey > 0; 1564 nskey--, tskeyp++) 1565 FREE_IF_NEEDED(env, tskeyp); 1566 } 1567 FREE_IF_NEEDED(env, skeyp); 1568 } 1569 if (all_skeys != NULL) 1570 __os_free(env, all_skeys); 1571 1572 return (ret); 1573} 1574 1575/* 1576 * __dbc_del_oldskey -- 1577 * Delete an old secondary key, if necessary. 1578 * Returns DB_KEYEXIST if the new and old keys match.. 
1579 */ 1580static int 1581__dbc_del_oldskey(sdbp, dbc_arg, skey, pkey, olddata) 1582 DB *sdbp; 1583 DBC *dbc_arg; 1584 DBT *skey, *pkey, *olddata; 1585{ 1586 DB *dbp; 1587 DBC *sdbc; 1588 DBT *toldskeyp, *tskeyp; 1589 DBT oldskey, temppkey, tempskey; 1590 ENV *env; 1591 int ret, t_ret; 1592 u_int32_t i, noldskey, nsame, nskey, rmw; 1593 1594 sdbc = NULL; 1595 dbp = sdbp->s_primary; 1596 env = dbp->env; 1597 nsame = 0; 1598 rmw = STD_LOCKING(dbc_arg) ? DB_RMW : 0; 1599 1600 /* 1601 * Get the old secondary key. 1602 */ 1603 memset(&oldskey, 0, sizeof(DBT)); 1604 if ((ret = sdbp->s_callback(sdbp, pkey, olddata, &oldskey)) != 0) { 1605 if (ret == DB_DONOTINDEX || 1606 (F_ISSET(&oldskey, DB_DBT_MULTIPLE) && oldskey.size == 0)) 1607 /* There's no old key to delete. */ 1608 ret = 0; 1609 return (ret); 1610 } 1611 1612 if (F_ISSET(&oldskey, DB_DBT_MULTIPLE)) { 1613#ifdef DIAGNOSTIC 1614 __db_check_skeyset(sdbp, &oldskey); 1615#endif 1616 toldskeyp = (DBT *)oldskey.data; 1617 noldskey = oldskey.size; 1618 } else { 1619 toldskeyp = &oldskey; 1620 noldskey = 1; 1621 } 1622 1623 if (F_ISSET(skey, DB_DBT_MULTIPLE)) { 1624 nskey = skey->size; 1625 skey = (DBT *)skey->data; 1626 } else 1627 nskey = F_ISSET(skey, DB_DBT_ISSET) ? 1 : 0; 1628 1629 for (; noldskey > 0 && ret == 0; noldskey--, toldskeyp++) { 1630 /* 1631 * Check whether this old secondary key is also a new key 1632 * before we delete it. Note that bt_compare is (and must be) 1633 * set no matter what access method we're in. 
1634 */ 1635 for (i = 0, tskeyp = skey; i < nskey; i++, tskeyp++) 1636 if (((BTREE *)sdbp->bt_internal)->bt_compare(sdbp, 1637 toldskeyp, tskeyp) == 0) { 1638 nsame++; 1639 F_CLR(tskeyp, DB_DBT_ISSET); 1640 break; 1641 } 1642 1643 if (i < nskey) { 1644 FREE_IF_NEEDED(env, toldskeyp); 1645 continue; 1646 } 1647 1648 if (sdbc == NULL) { 1649 if ((ret = __db_cursor_int(sdbp, 1650 dbc_arg->thread_info, dbc_arg->txn, sdbp->type, 1651 PGNO_INVALID, 0, dbc_arg->locker, &sdbc)) != 0) 1652 goto err; 1653 if (CDB_LOCKING(env)) { 1654 DB_ASSERT(env, 1655 sdbc->mylock.off == LOCK_INVALID); 1656 F_SET(sdbc, DBC_WRITER); 1657 } 1658 } 1659 1660 /* 1661 * Don't let c_get(DB_GET_BOTH) stomp on our data. Use 1662 * temporary DBTs instead. 1663 */ 1664 SWAP_IF_NEEDED(sdbp, pkey); 1665 DB_INIT_DBT(temppkey, pkey->data, pkey->size); 1666 DB_INIT_DBT(tempskey, toldskeyp->data, toldskeyp->size); 1667 if ((ret = __dbc_get(sdbc, 1668 &tempskey, &temppkey, rmw | DB_GET_BOTH)) == 0) 1669 ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY); 1670 else if (ret == DB_NOTFOUND) 1671 ret = __db_secondary_corrupt(dbp); 1672 SWAP_IF_NEEDED(sdbp, pkey); 1673 FREE_IF_NEEDED(env, toldskeyp); 1674 } 1675 1676err: for (; noldskey > 0; noldskey--, toldskeyp++) 1677 FREE_IF_NEEDED(env, toldskeyp); 1678 FREE_IF_NEEDED(env, &oldskey); 1679 if (sdbc != NULL && (t_ret = __dbc_close(sdbc)) != 0 && ret == 0) 1680 ret = t_ret; 1681 if (ret == 0 && nsame == nskey) 1682 return (DB_KEYEXIST); 1683 return (ret); 1684} 1685 1686/* 1687 * __db_duperr() 1688 * Error message: we don't currently support sorted duplicate duplicates. 1689 * PUBLIC: int __db_duperr __P((DB *, u_int32_t)); 1690 */ 1691int 1692__db_duperr(dbp, flags) 1693 DB *dbp; 1694 u_int32_t flags; 1695{ 1696 1697 /* 1698 * If we run into this error while updating a secondary index, 1699 * don't yell--there's no clean way to pass DB_NODUPDATA in along 1700 * with DB_UPDATE_SECONDARY, but we may run into this problem 1701 * in a normal, non-error course of events. 
1702 * 1703 * !!! 1704 * If and when we ever permit duplicate duplicates in sorted-dup 1705 * databases, we need to either change the secondary index code 1706 * to check for dup dups, or we need to maintain the implicit 1707 * "DB_NODUPDATA" behavior for databases with DB_AM_SECONDARY set. 1708 */ 1709 if (flags != DB_NODUPDATA && !F_ISSET(dbp, DB_AM_SECONDARY)) 1710 __db_errx(dbp->env, 1711 "Duplicate data items are not supported with sorted data"); 1712 return (DB_KEYEXIST); 1713} 1714 1715/* 1716 * __dbc_cleanup -- 1717 * Clean up duplicate cursors. 1718 */ 1719static int 1720__dbc_cleanup(dbc, dbc_n, failed) 1721 DBC *dbc, *dbc_n; 1722 int failed; 1723{ 1724 DB *dbp; 1725 DBC *opd; 1726 DBC_INTERNAL *internal; 1727 DB_MPOOLFILE *mpf; 1728 int ret, t_ret; 1729 1730 dbp = dbc->dbp; 1731 mpf = dbp->mpf; 1732 internal = dbc->internal; 1733 ret = 0; 1734 1735 /* Discard any pages we're holding. */ 1736 if (internal->page != NULL) { 1737 if ((t_ret = __memp_fput(mpf, dbc->thread_info, 1738 internal->page, dbc->priority)) != 0 && ret == 0) 1739 ret = t_ret; 1740 internal->page = NULL; 1741 } 1742 opd = internal->opd; 1743 if (opd != NULL && opd->internal->page != NULL) { 1744 if ((t_ret = __memp_fput(mpf, dbc->thread_info, 1745 opd->internal->page, dbc->priority)) != 0 && ret == 0) 1746 ret = t_ret; 1747 opd->internal->page = NULL; 1748 } 1749 1750 /* 1751 * If dbc_n is NULL, there's no internal cursor swapping to be done 1752 * and no dbc_n to close--we probably did the entire operation on an 1753 * offpage duplicate cursor. Just return. 1754 * 1755 * If dbc and dbc_n are the same, we're either inside a DB->{put/get} 1756 * operation, and as an optimization we performed the operation on 1757 * the main cursor rather than on a duplicated one, or we're in a 1758 * bulk get that can't have moved the cursor (DB_MULTIPLE with the 1759 * initial c_get operation on an off-page dup cursor). 
Just 1760 * return--either we know we didn't move the cursor, or we're going 1761 * to close it before we return to application code, so we're sure 1762 * not to visibly violate the "cursor stays put on error" rule. 1763 */ 1764 if (dbc_n == NULL || dbc == dbc_n) 1765 return (ret); 1766 1767 if (dbc_n->internal->page != NULL) { 1768 if ((t_ret = __memp_fput(mpf, dbc->thread_info, 1769 dbc_n->internal->page, dbc->priority)) != 0 && ret == 0) 1770 ret = t_ret; 1771 dbc_n->internal->page = NULL; 1772 } 1773 opd = dbc_n->internal->opd; 1774 if (opd != NULL && opd->internal->page != NULL) { 1775 if ((t_ret = __memp_fput(mpf, dbc->thread_info, 1776 opd->internal->page, dbc->priority)) != 0 && ret == 0) 1777 ret = t_ret; 1778 opd->internal->page = NULL; 1779 } 1780 1781 /* 1782 * If we didn't fail before entering this routine or just now when 1783 * freeing pages, swap the interesting contents of the old and new 1784 * cursors. 1785 */ 1786 if (!failed && ret == 0) { 1787 dbc->internal = dbc_n->internal; 1788 dbc_n->internal = internal; 1789 } 1790 1791 /* 1792 * Close the cursor we don't care about anymore. The close can fail, 1793 * but we only expect DB_LOCK_DEADLOCK failures. This violates our 1794 * "the cursor is unchanged on error" semantics, but since all you can 1795 * do with a DB_LOCK_DEADLOCK failure is close the cursor, I believe 1796 * that's OK. 1797 * 1798 * XXX 1799 * There's no way to recover from failure to close the old cursor. 1800 * All we can do is move to the new position and return an error. 1801 * 1802 * XXX 1803 * We might want to consider adding a flag to the cursor, so that any 1804 * subsequent operations other than close just return an error? 1805 */ 1806 if ((t_ret = __dbc_close(dbc_n)) != 0 && ret == 0) 1807 ret = t_ret; 1808 1809 /* 1810 * If this was an update that is supporting dirty reads 1811 * then we may have just swapped our read for a write lock 1812 * which is held by the surviving cursor. 
We need 1813 * to explicitly downgrade this lock. The closed cursor 1814 * may only have had a read lock. 1815 */ 1816 if (F_ISSET(dbp, DB_AM_READ_UNCOMMITTED) && 1817 dbc->internal->lock_mode == DB_LOCK_WRITE) { 1818 if ((t_ret = 1819 __TLPUT(dbc, dbc->internal->lock)) != 0 && ret == 0) 1820 ret = t_ret; 1821 if (t_ret == 0) 1822 dbc->internal->lock_mode = DB_LOCK_WWRITE; 1823 } 1824 1825 return (ret); 1826} 1827 1828/* 1829 * __dbc_secondary_get_pp -- 1830 * This wrapper function for DBC->pget() is the DBC->get() function 1831 * for a secondary index cursor. 1832 * 1833 * PUBLIC: int __dbc_secondary_get_pp __P((DBC *, DBT *, DBT *, u_int32_t)); 1834 */ 1835int 1836__dbc_secondary_get_pp(dbc, skey, data, flags) 1837 DBC *dbc; 1838 DBT *skey, *data; 1839 u_int32_t flags; 1840{ 1841 DB_ASSERT(dbc->env, F_ISSET(dbc->dbp, DB_AM_SECONDARY)); 1842 return (__dbc_pget_pp(dbc, skey, NULL, data, flags)); 1843} 1844 1845/* 1846 * __dbc_pget -- 1847 * Get a primary key/data pair through a secondary index. 1848 * 1849 * PUBLIC: int __dbc_pget __P((DBC *, DBT *, DBT *, DBT *, u_int32_t)); 1850 */ 1851int 1852__dbc_pget(dbc, skey, pkey, data, flags) 1853 DBC *dbc; 1854 DBT *skey, *pkey, *data; 1855 u_int32_t flags; 1856{ 1857 DB *pdbp, *sdbp; 1858 DBC *dbc_n, *pdbc; 1859 DBT nullpkey; 1860 u_int32_t save_pkey_flags, tmp_flags, tmp_read_uncommitted, tmp_rmw; 1861 int pkeymalloc, ret, t_ret; 1862 1863 sdbp = dbc->dbp; 1864 pdbp = sdbp->s_primary; 1865 dbc_n = NULL; 1866 pkeymalloc = t_ret = 0; 1867 1868 /* 1869 * The challenging part of this function is getting the behavior 1870 * right for all the various permutations of DBT flags. The 1871 * next several blocks handle the various cases we need to 1872 * deal with specially. 1873 */ 1874 1875 /* 1876 * We may be called with a NULL pkey argument, if we've been 1877 * wrapped by a 2-DBT get call. If so, we need to use our 1878 * own DBT. 
1879 */ 1880 if (pkey == NULL) { 1881 memset(&nullpkey, 0, sizeof(DBT)); 1882 pkey = &nullpkey; 1883 } 1884 1885 /* Clear OR'd in additional bits so we can check for flag equality. */ 1886 tmp_rmw = LF_ISSET(DB_RMW); 1887 LF_CLR(DB_RMW); 1888 1889 tmp_read_uncommitted = 1890 LF_ISSET(DB_READ_UNCOMMITTED) && 1891 !F_ISSET(dbc, DBC_READ_UNCOMMITTED); 1892 LF_CLR(DB_READ_UNCOMMITTED); 1893 1894 /* 1895 * DB_GET_RECNO is a special case, because we're interested not in 1896 * the primary key/data pair, but rather in the primary's record 1897 * number. 1898 */ 1899 if (flags == DB_GET_RECNO) { 1900 if (tmp_rmw) 1901 F_SET(dbc, DBC_RMW); 1902 if (tmp_read_uncommitted) 1903 F_SET(dbc, DBC_READ_UNCOMMITTED); 1904 ret = __dbc_pget_recno(dbc, pkey, data, flags); 1905 if (tmp_rmw) 1906 F_CLR(dbc, DBC_RMW); 1907 if (tmp_read_uncommitted) 1908 F_CLR(dbc, DBC_READ_UNCOMMITTED); 1909 return (ret); 1910 } 1911 1912 /* 1913 * If the DBTs we've been passed don't have any of the 1914 * user-specified memory management flags set, we want to make sure 1915 * we return values using the DBTs dbc->rskey, dbc->rkey, and 1916 * dbc->rdata, respectively. 1917 * 1918 * There are two tricky aspects to this: first, we need to pass 1919 * skey and pkey *in* to the initial c_get on the secondary key, 1920 * since either or both may be looked at by it (depending on the 1921 * get flag). Second, we must not use a normal DB->get call 1922 * on the secondary, even though that's what we want to accomplish, 1923 * because the DB handle may be free-threaded. Instead, 1924 * we open a cursor, then take steps to ensure that we actually use 1925 * the rkey/rdata from the *secondary* cursor. 
1926 * 1927 * We accomplish all this by passing in the DBTs we started out 1928 * with to the c_get, but swapping the contents of rskey and rkey, 1929 * respectively, into rkey and rdata; __db_ret will treat them like 1930 * the normal key/data pair in a c_get call, and will realloc them as 1931 * need be (this is "step 1"). Then, for "step 2", we swap back 1932 * rskey/rkey/rdata to normal, and do a get on the primary with the 1933 * secondary dbc appointed as the owner of the returned-data memory. 1934 * 1935 * Note that in step 2, we copy the flags field in case we need to 1936 * pass down a DB_DBT_PARTIAL or other flag that is compatible with 1937 * letting DB do the memory management. 1938 */ 1939 1940 /* 1941 * It is correct, though slightly sick, to attempt a partial get of a 1942 * primary key. However, if we do so here, we'll never find the 1943 * primary record; clear the DB_DBT_PARTIAL field of pkey just for the 1944 * duration of the next call. 1945 */ 1946 save_pkey_flags = pkey->flags; 1947 F_CLR(pkey, DB_DBT_PARTIAL); 1948 1949 /* 1950 * Now we can go ahead with the meat of this call. First, get the 1951 * primary key from the secondary index. (What exactly we get depends 1952 * on the flags, but the underlying cursor get will take care of the 1953 * dirty work.) Duplicate the cursor, in case the later get on the 1954 * primary fails. 
1955 */ 1956 switch (flags) { 1957 case DB_CURRENT: 1958 case DB_GET_BOTHC: 1959 case DB_NEXT: 1960 case DB_NEXT_DUP: 1961 case DB_NEXT_NODUP: 1962 case DB_PREV: 1963 case DB_PREV_DUP: 1964 case DB_PREV_NODUP: 1965 tmp_flags = DB_POSITION; 1966 break; 1967 default: 1968 tmp_flags = 0; 1969 break; 1970 } 1971 1972 if ((ret = __dbc_dup(dbc, &dbc_n, tmp_flags)) != 0) 1973 return (ret); 1974 1975 F_SET(dbc_n, DBC_TRANSIENT); 1976 1977 if (tmp_rmw) 1978 F_SET(dbc_n, DBC_RMW); 1979 if (tmp_read_uncommitted) 1980 F_SET(dbc_n, DBC_READ_UNCOMMITTED); 1981 1982 /* 1983 * If we've been handed a primary key, it will be in native byte order, 1984 * so we need to swap it before reading from the secondary. 1985 */ 1986 if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC || 1987 flags == DB_GET_BOTH_RANGE) 1988 SWAP_IF_NEEDED(sdbp, pkey); 1989 1990retry: /* Step 1. */ 1991 dbc_n->rdata = dbc->rkey; 1992 dbc_n->rkey = dbc->rskey; 1993 ret = __dbc_get(dbc_n, skey, pkey, flags); 1994 /* Restore pkey's flags in case we stomped the PARTIAL flag. */ 1995 pkey->flags = save_pkey_flags; 1996 1997 /* 1998 * We need to swap the primary key to native byte order if we read it 1999 * successfully, or if we swapped it on entry above. We can't return 2000 * with the application's data modified. 2001 */ 2002 if (ret == 0 || flags == DB_GET_BOTH || flags == DB_GET_BOTHC || 2003 flags == DB_GET_BOTH_RANGE) 2004 SWAP_IF_NEEDED(sdbp, pkey); 2005 2006 if (ret != 0) 2007 goto err; 2008 2009 /* 2010 * Now we're ready for "step 2". If either or both of pkey and data do 2011 * not have memory management flags set--that is, if DB is managing 2012 * their memory--we need to swap around the rkey/rdata structures so 2013 * that we don't wind up trying to use memory managed by the primary 2014 * database cursor, which we'll close before we return. 2015 * 2016 * !!! 
2017 * If you're carefully following the bouncing ball, you'll note that in 2018 * the DB-managed case, the buffer hanging off of pkey is the same as 2019 * dbc->rkey->data. This is just fine; we may well realloc and stomp 2020 * on it when we return, if we're doing a DB_GET_BOTH and need to 2021 * return a different partial or key (depending on the comparison 2022 * function), but this is safe. 2023 * 2024 * !!! 2025 * We need to use __db_cursor_int here rather than simply calling 2026 * pdbp->cursor, because otherwise, if we're in CDB, we'll allocate a 2027 * new locker ID and leave ourselves open to deadlocks. (Even though 2028 * we're only acquiring read locks, we'll still block if there are any 2029 * waiters.) 2030 */ 2031 if ((ret = __db_cursor_int(pdbp, dbc->thread_info, 2032 dbc->txn, pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0) 2033 goto err; 2034 2035 if (tmp_read_uncommitted || F_ISSET(dbc, DBC_READ_UNCOMMITTED)) 2036 F_SET(pdbc, DBC_READ_UNCOMMITTED); 2037 if (tmp_rmw || F_ISSET(dbc, DBC_RMW)) 2038 F_SET(pdbc, DBC_RMW); 2039 if (F_ISSET(dbc, DBC_READ_COMMITTED)) 2040 F_SET(pdbc, DBC_READ_COMMITTED); 2041 2042 /* 2043 * We're about to use pkey a second time. If DB_DBT_MALLOC is set on 2044 * it, we'll leak the memory we allocated the first time. Thus, set 2045 * DB_DBT_REALLOC instead so that we reuse that memory instead of 2046 * leaking it. 2047 * 2048 * Alternatively, if the application is handling copying for pkey, we 2049 * need to take a copy now. The copy will be freed on exit from 2050 * __dbc_pget_pp (and we must be coming through there if DB_DBT_USERCOPY 2051 * is set). In the case of DB_GET_BOTH_RANGE, the pkey supplied by 2052 * the application has already been copied in but the value may have 2053 * changed in the search. In that case, free the original copy and get 2054 * a new one. 2055 * 2056 * !!! 2057 * This assumes that the user must always specify a compatible realloc 2058 * function if a malloc function is specified. 
I think this is a 2059 * reasonable requirement. 2060 */ 2061 if (F_ISSET(pkey, DB_DBT_MALLOC)) { 2062 F_CLR(pkey, DB_DBT_MALLOC); 2063 F_SET(pkey, DB_DBT_REALLOC); 2064 pkeymalloc = 1; 2065 } else if (F_ISSET(pkey, DB_DBT_USERCOPY)) { 2066 if (flags == DB_GET_BOTH_RANGE) 2067 __dbt_userfree(sdbp->env, NULL, pkey, NULL); 2068 if ((ret = __dbt_usercopy(sdbp->env, pkey)) != 0) 2069 goto err; 2070 } 2071 2072 /* 2073 * Do the actual get. Set DBC_TRANSIENT since we don't care about 2074 * preserving the position on error, and it's faster. SET_RET_MEM so 2075 * that the secondary DBC owns any returned-data memory. 2076 */ 2077 F_SET(pdbc, DBC_TRANSIENT); 2078 SET_RET_MEM(pdbc, dbc); 2079 ret = __dbc_get(pdbc, pkey, data, DB_SET); 2080 2081 /* 2082 * If the item wasn't found in the primary, this is a bug; our 2083 * secondary has somehow gotten corrupted, and contains elements that 2084 * don't correspond to anything in the primary. Complain. 2085 */ 2086 2087 /* Now close the primary cursor. */ 2088 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0) 2089 ret = t_ret; 2090 2091 else if (ret == DB_NOTFOUND) { 2092 if (!F_ISSET(pdbc, DBC_READ_UNCOMMITTED)) 2093 ret = __db_secondary_corrupt(pdbp); 2094 else switch (flags) { 2095 case DB_GET_BOTHC: 2096 case DB_NEXT: 2097 case DB_NEXT_DUP: 2098 case DB_NEXT_NODUP: 2099 case DB_PREV: 2100 case DB_PREV_DUP: 2101 case DB_PREV_NODUP: 2102 goto retry; 2103 default: 2104 break; 2105 } 2106 } 2107 2108err: /* Cleanup and cursor resolution. */ 2109 if ((t_ret = __dbc_cleanup(dbc, dbc_n, ret)) != 0 && ret == 0) 2110 ret = t_ret; 2111 if (pkeymalloc) { 2112 /* 2113 * If pkey had a MALLOC flag, we need to restore it; otherwise, 2114 * if the user frees the buffer but reuses the DBT without 2115 * NULL'ing its data field or changing the flags, we may drop 2116 * core. 
2117 */ 2118 F_CLR(pkey, DB_DBT_REALLOC); 2119 F_SET(pkey, DB_DBT_MALLOC); 2120 } 2121 2122 return (ret); 2123} 2124 2125/* 2126 * __dbc_pget_recno -- 2127 * Perform a DB_GET_RECNO c_pget on a secondary index. Returns 2128 * the secondary's record number in the pkey field and the primary's 2129 * in the data field. 2130 */ 2131static int 2132__dbc_pget_recno(sdbc, pkey, data, flags) 2133 DBC *sdbc; 2134 DBT *pkey, *data; 2135 u_int32_t flags; 2136{ 2137 DB *pdbp, *sdbp; 2138 DBC *pdbc; 2139 DBT discardme, primary_key; 2140 ENV *env; 2141 db_recno_t oob; 2142 u_int32_t rmw; 2143 int ret, t_ret; 2144 2145 sdbp = sdbc->dbp; 2146 pdbp = sdbp->s_primary; 2147 env = sdbp->env; 2148 pdbc = NULL; 2149 ret = t_ret = 0; 2150 2151 rmw = LF_ISSET(DB_RMW); 2152 2153 memset(&discardme, 0, sizeof(DBT)); 2154 F_SET(&discardme, DB_DBT_USERMEM | DB_DBT_PARTIAL); 2155 2156 oob = RECNO_OOB; 2157 2158 /* 2159 * If the primary is an rbtree, we want its record number, whether 2160 * or not the secondary is one too. Fetch the recno into "data". 2161 * 2162 * If it's not an rbtree, return RECNO_OOB in "data". 2163 */ 2164 if (F_ISSET(pdbp, DB_AM_RECNUM)) { 2165 /* 2166 * Get the primary key, so we can find the record number 2167 * in the primary. (We're uninterested in the secondary key.) 2168 */ 2169 memset(&primary_key, 0, sizeof(DBT)); 2170 F_SET(&primary_key, DB_DBT_MALLOC); 2171 if ((ret = __dbc_get(sdbc, 2172 &discardme, &primary_key, rmw | DB_CURRENT)) != 0) 2173 return (ret); 2174 2175 /* 2176 * Open a cursor on the primary, set it to the right record, 2177 * and fetch its recno into "data". 2178 * 2179 * (See __dbc_pget for comments on the use of __db_cursor_int.) 2180 * 2181 * SET_RET_MEM so that the secondary DBC owns any returned-data 2182 * memory. 
2183 */ 2184 if ((ret = __db_cursor_int(pdbp, sdbc->thread_info, sdbc->txn, 2185 pdbp->type, PGNO_INVALID, 0, sdbc->locker, &pdbc)) != 0) 2186 goto perr; 2187 SET_RET_MEM(pdbc, sdbc); 2188 if ((ret = __dbc_get(pdbc, 2189 &primary_key, &discardme, rmw | DB_SET)) != 0) 2190 goto perr; 2191 2192 ret = __dbc_get(pdbc, &discardme, data, rmw | DB_GET_RECNO); 2193 2194perr: __os_ufree(env, primary_key.data); 2195 if (pdbc != NULL && 2196 (t_ret = __dbc_close(pdbc)) != 0 && ret == 0) 2197 ret = t_ret; 2198 if (ret != 0) 2199 return (ret); 2200 } else if ((ret = __db_retcopy(env, data, &oob, 2201 sizeof(oob), &sdbc->rkey->data, &sdbc->rkey->ulen)) != 0) 2202 return (ret); 2203 2204 /* 2205 * If the secondary is an rbtree, we want its record number, whether 2206 * or not the primary is one too. Fetch the recno into "pkey". 2207 * 2208 * If it's not an rbtree, return RECNO_OOB in "pkey". 2209 */ 2210 if (F_ISSET(sdbp, DB_AM_RECNUM)) 2211 return (__dbc_get(sdbc, &discardme, pkey, flags)); 2212 else 2213 return (__db_retcopy(env, pkey, &oob, 2214 sizeof(oob), &sdbc->rdata->data, &sdbc->rdata->ulen)); 2215} 2216 2217/* 2218 * __db_wrlock_err -- do not have a write lock. 2219 */ 2220static int 2221__db_wrlock_err(env) 2222 ENV *env; 2223{ 2224 __db_errx(env, "Write attempted on read-only cursor"); 2225 return (EPERM); 2226} 2227 2228/* 2229 * __dbc_del_secondary -- 2230 * Perform a delete operation on a secondary index: call through 2231 * to the primary and delete the primary record that this record 2232 * points to. 2233 * 2234 * Note that deleting the primary record will call c_del on all 2235 * the secondaries, including this one; thus, it is not necessary 2236 * to execute both this function and an actual delete. 
2237 */ 2238static int 2239__dbc_del_secondary(dbc) 2240 DBC *dbc; 2241{ 2242 DB *pdbp; 2243 DBC *pdbc; 2244 DBT skey, pkey; 2245 ENV *env; 2246 int ret, t_ret; 2247 u_int32_t rmw; 2248 2249 pdbp = dbc->dbp->s_primary; 2250 env = pdbp->env; 2251 rmw = STD_LOCKING(dbc) ? DB_RMW : 0; 2252 2253 /* 2254 * Get the current item that we're pointing at. 2255 * We don't actually care about the secondary key, just 2256 * the primary. 2257 */ 2258 memset(&skey, 0, sizeof(DBT)); 2259 memset(&pkey, 0, sizeof(DBT)); 2260 F_SET(&skey, DB_DBT_PARTIAL | DB_DBT_USERMEM); 2261 if ((ret = __dbc_get(dbc, &skey, &pkey, DB_CURRENT)) != 0) 2262 return (ret); 2263 2264 SWAP_IF_NEEDED(dbc->dbp, &pkey); 2265 2266 /* 2267 * Create a cursor on the primary with our locker ID, 2268 * so that when it calls back, we don't conflict. 2269 * 2270 * We create a cursor explicitly because there's no 2271 * way to specify the same locker ID if we're using 2272 * locking but not transactions if we use the DB->del 2273 * interface. This shouldn't be any less efficient 2274 * anyway. 2275 */ 2276 if ((ret = __db_cursor_int(pdbp, dbc->thread_info, dbc->txn, 2277 pdbp->type, PGNO_INVALID, 0, dbc->locker, &pdbc)) != 0) 2278 return (ret); 2279 2280 /* 2281 * See comment in __dbc_put--if we're in CDB, 2282 * we already hold the locks we need, and we need to flag 2283 * the cursor as a WRITER so we don't run into errors 2284 * when we try to delete. 2285 */ 2286 if (CDB_LOCKING(env)) { 2287 DB_ASSERT(env, pdbc->mylock.off == LOCK_INVALID); 2288 F_SET(pdbc, DBC_WRITER); 2289 } 2290 2291 /* 2292 * Set the new cursor to the correct primary key. Then 2293 * delete it. We don't really care about the datum; 2294 * just reuse our skey DBT. 2295 * 2296 * If the primary get returns DB_NOTFOUND, something is amiss-- 2297 * every record in the secondary should correspond to some record 2298 * in the primary. 
2299 */ 2300 if ((ret = __dbc_get(pdbc, &pkey, &skey, DB_SET | rmw)) == 0) 2301 ret = __dbc_del(pdbc, 0); 2302 else if (ret == DB_NOTFOUND) 2303 ret = __db_secondary_corrupt(pdbp); 2304 2305 if ((t_ret = __dbc_close(pdbc)) != 0 && ret == 0) 2306 ret = t_ret; 2307 2308 return (ret); 2309} 2310 2311/* 2312 * __dbc_del_primary -- 2313 * Perform a delete operation on a primary index. Loop through 2314 * all the secondary indices which correspond to this primary 2315 * database, and delete any secondary keys that point at the current 2316 * record. 2317 * 2318 * PUBLIC: int __dbc_del_primary __P((DBC *)); 2319 */ 2320int 2321__dbc_del_primary(dbc) 2322 DBC *dbc; 2323{ 2324 DB *dbp, *sdbp; 2325 DBC *sdbc; 2326 DBT *tskeyp; 2327 DBT data, pkey, skey, temppkey, tempskey; 2328 ENV *env; 2329 u_int32_t nskey, rmw; 2330 int ret, t_ret; 2331 2332 dbp = dbc->dbp; 2333 env = dbp->env; 2334 rmw = STD_LOCKING(dbc) ? DB_RMW : 0; 2335 2336 /* 2337 * If we're called at all, we have at least one secondary. 2338 * (Unfortunately, we can't assert this without grabbing the mutex.) 2339 * Get the current record so that we can construct appropriate 2340 * secondary keys as needed. 2341 */ 2342 memset(&pkey, 0, sizeof(DBT)); 2343 memset(&data, 0, sizeof(DBT)); 2344 if ((ret = __dbc_get(dbc, &pkey, &data, DB_CURRENT)) != 0) 2345 return (ret); 2346 2347 memset(&skey, 0, sizeof(DBT)); 2348 for (ret = __db_s_first(dbp, &sdbp); 2349 sdbp != NULL && ret == 0; 2350 ret = __db_s_next(&sdbp, dbc->txn)) { 2351 /* 2352 * Get the secondary key for this secondary and the current 2353 * item. 2354 */ 2355 if ((ret = sdbp->s_callback(sdbp, &pkey, &data, &skey)) != 0) { 2356 /* Not indexing is equivalent to an empty key set. */ 2357 if (ret == DB_DONOTINDEX) { 2358 F_SET(&skey, DB_DBT_MULTIPLE); 2359 skey.size = 0; 2360 } else /* We had a substantive error. Bail. 
*/ 2361 goto err; 2362 } 2363 2364#ifdef DIAGNOSTIC 2365 if (F_ISSET(&skey, DB_DBT_MULTIPLE)) 2366 __db_check_skeyset(sdbp, &skey); 2367#endif 2368 2369 if (F_ISSET(&skey, DB_DBT_MULTIPLE)) { 2370 tskeyp = (DBT *)skey.data; 2371 nskey = skey.size; 2372 if (nskey == 0) 2373 continue; 2374 } else { 2375 tskeyp = &skey; 2376 nskey = 1; 2377 } 2378 2379 /* Open a secondary cursor. */ 2380 if ((ret = __db_cursor_int(sdbp, 2381 dbc->thread_info, dbc->txn, sdbp->type, 2382 PGNO_INVALID, 0, dbc->locker, &sdbc)) != 0) 2383 goto err; 2384 /* See comment above and in __dbc_put. */ 2385 if (CDB_LOCKING(env)) { 2386 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID); 2387 F_SET(sdbc, DBC_WRITER); 2388 } 2389 2390 for (; nskey > 0; nskey--, tskeyp++) { 2391 /* 2392 * Set the secondary cursor to the appropriate item. 2393 * Delete it. 2394 * 2395 * We want to use DB_RMW if locking is on; it's only 2396 * legal then, though. 2397 * 2398 * !!! 2399 * Don't stomp on any callback-allocated buffer in skey 2400 * when we do a c_get(DB_GET_BOTH); use a temp DBT 2401 * instead. Similarly, don't allow pkey to be 2402 * invalidated when the cursor is closed. 2403 */ 2404 DB_INIT_DBT(tempskey, tskeyp->data, tskeyp->size); 2405 SWAP_IF_NEEDED(sdbp, &pkey); 2406 DB_INIT_DBT(temppkey, pkey.data, pkey.size); 2407 if ((ret = __dbc_get(sdbc, &tempskey, &temppkey, 2408 DB_GET_BOTH | rmw)) == 0) 2409 ret = __dbc_del(sdbc, DB_UPDATE_SECONDARY); 2410 else if (ret == DB_NOTFOUND) 2411 ret = __db_secondary_corrupt(dbp); 2412 SWAP_IF_NEEDED(sdbp, &pkey); 2413 FREE_IF_NEEDED(env, tskeyp); 2414 } 2415 2416 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0) 2417 ret = t_ret; 2418 if (ret != 0) 2419 goto err; 2420 2421 /* 2422 * In the common case where there is a single secondary key, we 2423 * will have freed any application-allocated data in skey 2424 * already. In the multiple key case, we need to free it here. 2425 * It is safe to do this twice as the macro resets the data 2426 * field. 
2427 */ 2428 FREE_IF_NEEDED(env, &skey); 2429 } 2430 2431err: if (sdbp != NULL && 2432 (t_ret = __db_s_done(sdbp, dbc->txn)) != 0 && ret == 0) 2433 ret = t_ret; 2434 FREE_IF_NEEDED(env, &skey); 2435 return (ret); 2436} 2437 2438/* 2439 * __dbc_del_foreign -- 2440 * Apply the foreign database constraints for a particular foreign 2441 * database when an item is being deleted (dbc points at item being deleted 2442 * in the foreign database.) 2443 * 2444 * Delete happens in dbp, check for occurrences of key in pdpb. 2445 * Terminology: 2446 * Foreign db = Where delete occurs (dbp). 2447 * Secondary db = Where references to dbp occur (sdbp, a secondary) 2448 * Primary db = sdbp's primary database, references to dbp are secondary 2449 * keys here 2450 * Foreign Key = Key being deleted in dbp (fkey) 2451 * Primary Key = Key of the corresponding entry in sdbp's primary (pkey). 2452 */ 2453static int 2454__dbc_del_foreign(dbc) 2455 DBC *dbc; 2456{ 2457 DB_FOREIGN_INFO *f_info; 2458 DB *dbp, *pdbp, *sdbp; 2459 DBC *pdbc, *sdbc; 2460 DBT data, fkey, pkey; 2461 ENV *env; 2462 u_int32_t flags, rmw; 2463 int changed, ret, t_ret; 2464 2465 dbp = dbc->dbp; 2466 env = dbp->env; 2467 2468 memset(&fkey, 0, sizeof(DBT)); 2469 memset(&data, 0, sizeof(DBT)); 2470 if ((ret = __dbc_get(dbc, &fkey, &data, DB_CURRENT)) != 0) 2471 return (ret); 2472 2473 LIST_FOREACH(f_info, &(dbp->f_primaries), f_links) { 2474 sdbp = f_info->dbp; 2475 pdbp = sdbp->s_primary; 2476 flags = f_info->flags; 2477 2478 rmw = (STD_LOCKING(dbc) && 2479 !LF_ISSET(DB_FOREIGN_ABORT)) ? DB_RMW : 0; 2480 2481 /* 2482 * Handle CDB locking. Some of this is copied from 2483 * __dbc_del_primary, but a bit more acrobatics are required. 2484 * If we're not going to abort, then we need to get a write 2485 * cursor. If CDB_ALLDB is set, then only one write cursor is 2486 * allowed and we hold it, so we fudge things and promote the 2487 * cursor on the other DBs manually, it won't cause a problem. 
2488 * If CDB_ALLDB is not set, then we go through the usual route 2489 * to make sure we block as necessary. If there are any open 2490 * read cursors on sdbp, the delete or put call later will 2491 * block. 2492 * 2493 * If NULLIFY is set, we'll need a cursor on the primary to 2494 * update it with the nullified data. Because primary and 2495 * secondary dbs share a lock file ID in CDB, we open a cursor 2496 * on the secondary and then get another writeable cursor on the 2497 * primary via __db_cursor_int to avoid deadlocking. 2498 */ 2499 sdbc = pdbc = NULL; 2500 if (!LF_ISSET(DB_FOREIGN_ABORT) && CDB_LOCKING(env) && 2501 !F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)) { 2502 ret = __db_cursor(sdbp, 2503 dbc->thread_info, dbc->txn, &sdbc, DB_WRITECURSOR); 2504 if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0) { 2505 ret = __db_cursor_int(pdbp, 2506 dbc->thread_info, dbc->txn, pdbp->type, 2507 PGNO_INVALID, 0, dbc->locker, &pdbc); 2508 F_SET(pdbc, DBC_WRITER); 2509 } 2510 } else { 2511 ret = __db_cursor_int(sdbp, dbc->thread_info, dbc->txn, 2512 sdbp->type, PGNO_INVALID, 0, dbc->locker, &sdbc); 2513 if (LF_ISSET(DB_FOREIGN_NULLIFY) && ret == 0) 2514 ret = __db_cursor_int(pdbp, dbc->thread_info, 2515 dbc->txn, pdbp->type, PGNO_INVALID, 0, 2516 dbc->locker, &pdbc); 2517 } 2518 if (ret != 0) { 2519 if (sdbc != NULL) 2520 (void)__dbc_close(sdbc); 2521 return (ret); 2522 } 2523 if (CDB_LOCKING(env) && F_ISSET(env->dbenv, DB_ENV_CDB_ALLDB)){ 2524 DB_ASSERT(env, sdbc->mylock.off == LOCK_INVALID); 2525 F_SET(sdbc, DBC_WRITER); 2526 if (LF_ISSET(DB_FOREIGN_NULLIFY) && pdbc != NULL) { 2527 DB_ASSERT(env, 2528 pdbc->mylock.off == LOCK_INVALID); 2529 F_SET(pdbc, DBC_WRITER); 2530 } 2531 } 2532 2533 /* 2534 * There are three actions possible when a foreign database has 2535 * items corresponding to a deleted item: 2536 * DB_FOREIGN_ABORT - The delete operation should be aborted. 2537 * DB_FOREIGN_CASCADE - All corresponding foreign items should 2538 * be deleted. 
2539 * DB_FOREIGN_NULLIFY - A callback needs to be made, allowing 2540 * the application to modify the data DBT from the 2541 * associated database. If the callback makes a 2542 * modification, the updated item needs to replace the 2543 * original item in the foreign db 2544 */ 2545 memset(&pkey, 0, sizeof(DBT)); 2546 memset(&data, 0, sizeof(DBT)); 2547 ret = __dbc_pget(sdbc, &fkey, &pkey, &data, DB_SET|rmw); 2548 2549 if (ret == DB_NOTFOUND) { 2550 /* No entry means no constraint */ 2551 ret = __dbc_close(sdbc); 2552 if (LF_ISSET(DB_FOREIGN_NULLIFY) && 2553 (t_ret = __dbc_close(pdbc)) != 0) 2554 ret = t_ret; 2555 if (ret != 0) 2556 return (ret); 2557 continue; 2558 } else if (ret != 0) { 2559 /* Just return the error code from the pget */ 2560 (void)__dbc_close(sdbc); 2561 if (LF_ISSET(DB_FOREIGN_NULLIFY)) 2562 (void)__dbc_close(pdbc); 2563 return (ret); 2564 } else if (LF_ISSET(DB_FOREIGN_ABORT)) { 2565 /* If the record exists and ABORT is set, we're done */ 2566 if ((ret = __dbc_close(sdbc)) != 0) 2567 return (ret); 2568 return (DB_FOREIGN_CONFLICT); 2569 } 2570 2571 /* 2572 * There were matching items in the primary DB, and the action 2573 * is either DB_FOREIGN_CASCADE or DB_FOREIGN_NULLIFY. 2574 */ 2575 while (ret == 0) { 2576 if (LF_ISSET(DB_FOREIGN_CASCADE)) { 2577 /* 2578 * Don't use the DB_UPDATE_SECONDARY flag, 2579 * since we want the delete to cascade into the 2580 * secondary's primary. 2581 */ 2582 if ((ret = __dbc_del(sdbc, 0)) != 0) { 2583 __db_err(env, ret, 2584 "Attempt to execute cascading delete in a foreign index failed"); 2585 break; 2586 } 2587 } else if (LF_ISSET(DB_FOREIGN_NULLIFY)) { 2588 changed = 0; 2589 if ((ret = f_info->callback(sdbp, 2590 &pkey, &data, &fkey, &changed)) != 0) { 2591 __db_err(env, ret, 2592 "Foreign database application callback"); 2593 break; 2594 } 2595 2596 /* 2597 * If the user callback modified the DBT and 2598 * a put on the primary failed. 
2599 */ 2600 if (changed && (ret = __dbc_put(pdbc, 2601 &pkey, &data, DB_KEYFIRST)) != 0) { 2602 __db_err(env, ret, 2603 "Attempt to overwrite item in foreign database with nullified value failed"); 2604 break; 2605 } 2606 } 2607 /* retrieve the next matching item from the prim. db */ 2608 memset(&pkey, 0, sizeof(DBT)); 2609 memset(&data, 0, sizeof(DBT)); 2610 ret = __dbc_pget(sdbc, 2611 &fkey, &pkey, &data, DB_NEXT_DUP|rmw); 2612 } 2613 2614 if (ret == DB_NOTFOUND) 2615 ret = 0; 2616 if ((t_ret = __dbc_close(sdbc)) != 0 && ret == 0) 2617 ret = t_ret; 2618 if (LF_ISSET(DB_FOREIGN_NULLIFY) && 2619 (t_ret = __dbc_close(pdbc)) != 0 && ret == 0) 2620 ret = t_ret; 2621 if (ret != 0) 2622 return (ret); 2623 } 2624 2625 return (ret); 2626} 2627 2628/* 2629 * __db_s_first -- 2630 * Get the first secondary, if any are present, from the primary. 2631 * 2632 * PUBLIC: int __db_s_first __P((DB *, DB **)); 2633 */ 2634int 2635__db_s_first(pdbp, sdbpp) 2636 DB *pdbp, **sdbpp; 2637{ 2638 DB *sdbp; 2639 2640 MUTEX_LOCK(pdbp->env, pdbp->mutex); 2641 sdbp = LIST_FIRST(&pdbp->s_secondaries); 2642 2643 /* See __db_s_next. */ 2644 if (sdbp != NULL) 2645 sdbp->s_refcnt++; 2646 MUTEX_UNLOCK(pdbp->env, pdbp->mutex); 2647 2648 *sdbpp = sdbp; 2649 2650 return (0); 2651} 2652 2653/* 2654 * __db_s_next -- 2655 * Get the next secondary in the list. 2656 * 2657 * PUBLIC: int __db_s_next __P((DB **, DB_TXN *)); 2658 */ 2659int 2660__db_s_next(sdbpp, txn) 2661 DB **sdbpp; 2662 DB_TXN *txn; 2663{ 2664 DB *sdbp, *pdbp, *closeme; 2665 ENV *env; 2666 int ret; 2667 2668 /* 2669 * Secondary indices are kept in a linked list, s_secondaries, 2670 * off each primary DB handle. If a primary is free-threaded, 2671 * this list may only be traversed or modified while the primary's 2672 * thread mutex is held. 
2673 * 2674 * The tricky part is that we don't want to hold the thread mutex 2675 * across the full set of secondary puts necessary for each primary 2676 * put, or we'll wind up essentially single-threading all the puts 2677 * to the handle; the secondary puts will each take about as 2678 * long as the primary does, and may require I/O. So we instead 2679 * hold the thread mutex only long enough to follow one link to the 2680 * next secondary, and then we release it before performing the 2681 * actual secondary put. 2682 * 2683 * The only danger here is that we might legitimately close a 2684 * secondary index in one thread while another thread is performing 2685 * a put and trying to update that same secondary index. To 2686 * prevent this from happening, we refcount the secondary handles. 2687 * If close is called on a secondary index handle while we're putting 2688 * to it, it won't really be closed--the refcount will simply drop, 2689 * and we'll be responsible for closing it here. 2690 */ 2691 sdbp = *sdbpp; 2692 pdbp = sdbp->s_primary; 2693 env = pdbp->env; 2694 closeme = NULL; 2695 2696 MUTEX_LOCK(env, pdbp->mutex); 2697 DB_ASSERT(env, sdbp->s_refcnt != 0); 2698 if (--sdbp->s_refcnt == 0) { 2699 LIST_REMOVE(sdbp, s_links); 2700 closeme = sdbp; 2701 } 2702 sdbp = LIST_NEXT(sdbp, s_links); 2703 if (sdbp != NULL) 2704 sdbp->s_refcnt++; 2705 MUTEX_UNLOCK(env, pdbp->mutex); 2706 2707 *sdbpp = sdbp; 2708 2709 /* 2710 * closeme->close() is a wrapper; call __db_close explicitly. 2711 */ 2712 if (closeme == NULL) 2713 ret = 0; 2714 else if (txn == NULL) 2715 ret = __db_close(closeme, NULL, 0); 2716 else 2717 ret = __txn_closeevent(env, txn, closeme); 2718 2719 return (ret); 2720} 2721 2722/* 2723 * __db_s_done -- 2724 * Properly decrement the refcount on a secondary database handle we're 2725 * using, without calling __db_s_next. 
2726 * 2727 * PUBLIC: int __db_s_done __P((DB *, DB_TXN *)); 2728 */ 2729int 2730__db_s_done(sdbp, txn) 2731 DB *sdbp; 2732 DB_TXN *txn; 2733{ 2734 DB *pdbp; 2735 ENV *env; 2736 int doclose, ret; 2737 2738 pdbp = sdbp->s_primary; 2739 env = pdbp->env; 2740 doclose = 0; 2741 2742 MUTEX_LOCK(env, pdbp->mutex); 2743 DB_ASSERT(env, sdbp->s_refcnt != 0); 2744 if (--sdbp->s_refcnt == 0) { 2745 LIST_REMOVE(sdbp, s_links); 2746 doclose = 1; 2747 } 2748 MUTEX_UNLOCK(env, pdbp->mutex); 2749 2750 if (doclose == 0) 2751 ret = 0; 2752 else if (txn == NULL) 2753 ret = __db_close(sdbp, NULL, 0); 2754 else 2755 ret = __txn_closeevent(env, txn, sdbp); 2756 return (ret); 2757} 2758 2759/* 2760 * __db_s_count -- 2761 * Count the number of secondaries associated with a given primary. 2762 */ 2763static int 2764__db_s_count(pdbp) 2765 DB *pdbp; 2766{ 2767 DB *sdbp; 2768 ENV *env; 2769 int count; 2770 2771 env = pdbp->env; 2772 count = 0; 2773 2774 MUTEX_LOCK(env, pdbp->mutex); 2775 for (sdbp = LIST_FIRST(&pdbp->s_secondaries); 2776 sdbp != NULL; 2777 sdbp = LIST_NEXT(sdbp, s_links)) 2778 ++count; 2779 MUTEX_UNLOCK(env, pdbp->mutex); 2780 2781 return (count); 2782} 2783 2784/* 2785 * __db_buildpartial -- 2786 * Build the record that will result after a partial put is applied to 2787 * an existing record. 2788 * 2789 * This should probably be merged with __bam_build, but that requires 2790 * a little trickery if we plan to keep the overflow-record optimization 2791 * in that function. 
2792 */ 2793static int 2794__db_buildpartial(dbp, oldrec, partial, newrec) 2795 DB *dbp; 2796 DBT *oldrec, *partial, *newrec; 2797{ 2798 ENV *env; 2799 u_int32_t len, nbytes; 2800 u_int8_t *buf; 2801 int ret; 2802 2803 env = dbp->env; 2804 2805 DB_ASSERT(env, F_ISSET(partial, DB_DBT_PARTIAL)); 2806 2807 memset(newrec, 0, sizeof(DBT)); 2808 2809 nbytes = __db_partsize(oldrec->size, partial); 2810 newrec->size = nbytes; 2811 2812 if ((ret = __os_malloc(env, nbytes, &buf)) != 0) 2813 return (ret); 2814 newrec->data = buf; 2815 2816 /* Nul or pad out the buffer, for any part that isn't specified. */ 2817 memset(buf, 2818 F_ISSET(dbp, DB_AM_FIXEDLEN) ? ((BTREE *)dbp->bt_internal)->re_pad : 2819 0, nbytes); 2820 2821 /* Copy in any leading data from the original record. */ 2822 memcpy(buf, oldrec->data, 2823 partial->doff > oldrec->size ? oldrec->size : partial->doff); 2824 2825 /* Copy the data from partial. */ 2826 memcpy(buf + partial->doff, partial->data, partial->size); 2827 2828 /* Copy any trailing data from the original record. */ 2829 len = partial->doff + partial->dlen; 2830 if (oldrec->size > len) 2831 memcpy(buf + partial->doff + partial->size, 2832 (u_int8_t *)oldrec->data + len, oldrec->size - len); 2833 2834 return (0); 2835} 2836 2837/* 2838 * __db_partsize -- 2839 * Given the number of bytes in an existing record and a DBT that 2840 * is about to be partial-put, calculate the size of the record 2841 * after the put. 2842 * 2843 * This code is called from __bam_partsize. 2844 * 2845 * PUBLIC: u_int32_t __db_partsize __P((u_int32_t, DBT *)); 2846 */ 2847u_int32_t 2848__db_partsize(nbytes, data) 2849 u_int32_t nbytes; 2850 DBT *data; 2851{ 2852 2853 /* 2854 * There are really two cases here: 2855 * 2856 * Case 1: We are replacing some bytes that do not exist (i.e., they 2857 * are past the end of the record). In this case the number of bytes 2858 * we are replacing is irrelevant and all we care about is how many 2859 * bytes we are going to add from offset. 
So, the new record length 2860 * is going to be the size of the new bytes (size) plus wherever those 2861 * new bytes begin (doff). 2862 * 2863 * Case 2: All the bytes we are replacing exist. Therefore, the new 2864 * size is the oldsize (nbytes) minus the bytes we are replacing (dlen) 2865 * plus the bytes we are adding (size). 2866 */ 2867 if (nbytes < data->doff + data->dlen) /* Case 1 */ 2868 return (data->doff + data->size); 2869 2870 return (nbytes + data->size - data->dlen); /* Case 2 */ 2871} 2872 2873#ifdef DIAGNOSTIC 2874/* 2875 * __db_check_skeyset -- 2876 * Diagnostic check that the application's callback returns a set of 2877 * secondary keys without repeats. 2878 * 2879 * PUBLIC: #ifdef DIAGNOSTIC 2880 * PUBLIC: void __db_check_skeyset __P((DB *, DBT *)); 2881 * PUBLIC: #endif 2882 */ 2883void 2884__db_check_skeyset(sdbp, skeyp) 2885 DB *sdbp; 2886 DBT *skeyp; 2887{ 2888 DBT *firstkey, *lastkey, *key1, *key2; 2889 ENV *env; 2890 2891 env = sdbp->env; 2892 2893 firstkey = (DBT *)skeyp->data; 2894 lastkey = firstkey + skeyp->size; 2895 for (key1 = firstkey; key1 < lastkey; key1++) 2896 for (key2 = key1 + 1; key2 < lastkey; key2++) 2897 DB_ASSERT(env, 2898 ((BTREE *)sdbp->bt_internal)->bt_compare(sdbp, 2899 key1, key2) != 0); 2900} 2901#endif 2902