1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1999-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/btree.h" 14#include "dbinc/lock.h" 15#include "dbinc/log.h" 16#include "dbinc/mp.h" 17#include "dbinc/txn.h" 18 19static int __bam_compact_dups __P((DBC *, 20 PAGE **, u_int32_t, int, DB_COMPACT *, int *)); 21static int __bam_compact_int __P((DBC *, 22 DBT *, DBT *, u_int32_t, int *, DB_COMPACT *, int *)); 23static int __bam_compact_isdone __P((DBC *, DBT *, PAGE *, int *)); 24static int __bam_csearch __P((DBC *, DBT *, u_int32_t, int)); 25static int __bam_lock_tree __P((DBC *, EPG *, EPG *csp, u_int32_t, u_int32_t)); 26static int __bam_lock_subtree __P((DBC *, PAGE *, u_int32_t, u_int32_t)); 27static int __bam_merge __P((DBC *, 28 DBC *, u_int32_t, DBT *, DB_COMPACT *,int *)); 29static int __bam_merge_internal __P((DBC *, DBC *, int, DB_COMPACT *, int *)); 30static int __bam_merge_pages __P((DBC *, DBC *, DB_COMPACT *)); 31static int __bam_merge_records __P((DBC *, DBC*, u_int32_t, DB_COMPACT *)); 32static int __bam_truncate_internal_overflow __P((DBC *, PAGE *, DB_COMPACT *)); 33static int __bam_truncate_overflow __P((DBC *, 34 db_pgno_t, PAGE **, DB_COMPACT *)); 35static int __bam_truncate_page __P((DBC *, PAGE **, PAGE *, int)); 36static int __bam_truncate_root_page __P((DBC *, 37 PAGE *, u_int32_t, DB_COMPACT *)); 38 39#ifdef HAVE_FTRUNCATE 40static int __bam_free_freelist __P((DB *, DB_THREAD_INFO *, DB_TXN *)); 41static int __bam_savekey __P((DBC *, int, DBT *)); 42static int __bam_setup_freelist __P((DB *, db_pglist_t *, u_int32_t)); 43static int __bam_truncate_internal __P((DB *, 44 DB_THREAD_INFO *, DB_TXN *, DB_COMPACT *)); 45#endif 46 47#define SAVE_START \ 48 do { \ 49 save_data = *c_data; \ 50 ret = __db_retcopy(env, \ 51 &save_start, current.data, current.size, \ 52 &save_start.data, &save_start.ulen); \ 53 } while 
(0) 54 55/* 56 * Only restore those things that are negated by aborting the 57 * transaction. We don't restore the number of deadlocks, for example. 58 */ 59 60#define RESTORE_START \ 61 do { \ 62 c_data->compact_pages_free = \ 63 save_data.compact_pages_free; \ 64 c_data->compact_levels = save_data.compact_levels; \ 65 c_data->compact_truncate = save_data.compact_truncate; \ 66 ret = __db_retcopy(env, ¤t, \ 67 save_start.data, save_start.size, \ 68 ¤t.data, ¤t.ulen); \ 69 } while (0) 70 71/* 72 e __bam_compact -- compact a btree. 73 * 74 * PUBLIC: int __bam_compact __P((DB *, DB_THREAD_INFO *, DB_TXN *, 75 * PUBLIC: DBT *, DBT *, DB_COMPACT *, u_int32_t, DBT *)); 76 */ 77int 78__bam_compact(dbp, ip, txn, start, stop, c_data, flags, end) 79 DB *dbp; 80 DB_THREAD_INFO *ip; 81 DB_TXN *txn; 82 DBT *start, *stop; 83 DB_COMPACT *c_data; 84 u_int32_t flags; 85 DBT *end; 86{ 87 DBC *dbc; 88 DBT current, save_start; 89 DB_COMPACT save_data; 90 ENV *env; 91 u_int32_t factor, retry; 92 int deadlock, have_freelist, isdone, ret, span, t_ret, txn_local; 93 94#ifdef HAVE_FTRUNCATE 95 db_pglist_t *list; 96 db_pgno_t last_pgno; 97 u_int32_t nelems, truncated; 98#endif 99 100 env = dbp->env; 101 102 memset(¤t, 0, sizeof(current)); 103 memset(&save_start, 0, sizeof(save_start)); 104 dbc = NULL; 105 factor = 0; 106 have_freelist = deadlock = isdone = ret = span = 0; 107 ret = retry = 0; 108 109#ifdef HAVE_FTRUNCATE 110 list = NULL; 111 last_pgno = 0; 112 nelems = truncated = 0; 113#endif 114 115 /* 116 * We pass "current" to the internal routine, indicating where that 117 * routine should begin its work and expecting that it will return to 118 * us the last key that it processed. 
119 */ 120 if (start != NULL && (ret = __db_retcopy(env, 121 ¤t, start->data, start->size, 122 ¤t.data, ¤t.ulen)) != 0) 123 return (ret); 124 125 if (IS_DB_AUTO_COMMIT(dbp, txn)) 126 txn_local = 1; 127 else 128 txn_local = 0; 129 if (!LF_ISSET(DB_FREE_SPACE | DB_FREELIST_ONLY)) 130 goto no_free; 131 if (LF_ISSET(DB_FREELIST_ONLY)) 132 LF_SET(DB_FREE_SPACE); 133 134#ifdef HAVE_FTRUNCATE 135 /* Sort the freelist and set up the in-memory list representation. */ 136 if (txn_local && (ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0) 137 goto err; 138 139 if ((ret = __db_free_truncate(dbp, ip, 140 txn, flags, c_data, &list, &nelems, &last_pgno)) != 0) { 141 LF_CLR(DB_FREE_SPACE); 142 goto terr; 143 } 144 145 /* If the freelist is empty and we are not filling, get out. */ 146 if (nelems == 0 && LF_ISSET(DB_FREELIST_ONLY)) { 147 ret = 0; 148 LF_CLR(DB_FREE_SPACE); 149 goto terr; 150 } 151 if ((ret = __bam_setup_freelist(dbp, list, nelems)) != 0) { 152 /* Someone else owns the free list. */ 153 if (ret == EBUSY) 154 ret = 0; 155 } 156 if (ret == 0) 157 have_freelist = 1; 158 159 /* Commit the txn and release the meta page lock. */ 160terr: if (txn_local) { 161 if ((t_ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0 && ret == 0) 162 ret = t_ret; 163 txn = NULL; 164 } 165 if (ret != 0) 166 goto err; 167 168 /* Save the number truncated so far, we will add what we get below. */ 169 truncated = c_data->compact_pages_truncated; 170 if (LF_ISSET(DB_FREELIST_ONLY)) 171 goto done; 172#endif 173 174 /* 175 * We want factor to be the target number of free bytes on each page, 176 * so we know when to stop adding items to a page. Make sure to 177 * subtract the page overhead when computing this target. This can 178 * result in a 1-2% error on the smallest page. 
179 * First figure out how many bytes we should use: 180 */ 181no_free: 182 factor = dbp->pgsize - SIZEOF_PAGE; 183 if (c_data->compact_fillpercent != 0) { 184 factor *= c_data->compact_fillpercent; 185 factor /= 100; 186 } 187 /* Now convert to the number of free bytes to target. */ 188 factor = (dbp->pgsize - SIZEOF_PAGE) - factor; 189 190 if (c_data->compact_pages == 0) 191 c_data->compact_pages = DB_MAX_PAGES; 192 193 do { 194 deadlock = 0; 195 196 SAVE_START; 197 if (ret != 0) 198 break; 199 200 if (txn_local) { 201 if ((ret = __txn_begin(env, ip, NULL, &txn, 0)) != 0) 202 break; 203 204 if (c_data->compact_timeout != 0 && 205 (ret = __txn_set_timeout(txn, 206 c_data->compact_timeout, DB_SET_LOCK_TIMEOUT)) != 0) 207 goto err; 208 } 209 210 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0) 211 goto err; 212 213 if ((ret = __bam_compact_int(dbc, ¤t, stop, factor, 214 &span, c_data, &isdone)) == 215 DB_LOCK_DEADLOCK && txn_local) { 216 /* 217 * We retry on deadlock. Cancel the statistics 218 * and reset the start point to before this 219 * iteration. 220 */ 221 deadlock = 1; 222 c_data->compact_deadlock++; 223 RESTORE_START; 224 } 225 /* 226 * If we could not get a lock while holding an internal 227 * node latched, commit the current local transaction otherwise 228 * report a deadlock. 
229 */ 230 if (ret == DB_LOCK_NOTGRANTED) { 231 if (txn_local || retry++ < 5) 232 ret = 0; 233 else 234 ret = DB_LOCK_DEADLOCK; 235 } else 236 retry = 0; 237 238 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) 239 ret = t_ret; 240 241err: if (txn_local && txn != NULL) { 242 if (ret == 0 && deadlock == 0) 243 ret = __txn_commit(txn, DB_TXN_NOSYNC); 244 else if ((t_ret = __txn_abort(txn)) != 0 && ret == 0) 245 ret = t_ret; 246 txn = NULL; 247 } 248 } while (ret == 0 && !isdone); 249 250 if (ret == 0 && end != NULL) 251 ret = __db_retcopy(env, end, current.data, current.size, 252 &end->data, &end->ulen); 253 if (current.data != NULL) 254 __os_free(env, current.data); 255 if (save_start.data != NULL) 256 __os_free(env, save_start.data); 257 258#ifdef HAVE_FTRUNCATE 259 /* 260 * Finish up truncation work. If there are pages left in the free 261 * list then search the internal nodes of the tree as we may have 262 * missed some while walking the leaf nodes. Then calculate how 263 * many pages we have truncated and release the in-memory free list. 264 */ 265done: if (LF_ISSET(DB_FREE_SPACE)) { 266 DBMETA *meta; 267 db_pgno_t pgno; 268 269 pgno = PGNO_BASE_MD; 270 isdone = 1; 271 if (ret == 0 && !LF_ISSET(DB_FREELIST_ONLY) && (t_ret = 272 __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) { 273 isdone = meta->free == PGNO_INVALID; 274 ret = __memp_fput(dbp->mpf, ip, meta, dbp->priority); 275 } 276 277 if (!isdone) 278 ret = __bam_truncate_internal(dbp, ip, txn, c_data); 279 280 /* Clean up the free list. 
*/ 281 if (list != NULL) 282 __os_free(env, list); 283 284 if ((t_ret = 285 __memp_fget(dbp->mpf, &pgno, ip, txn, 0, &meta)) == 0) { 286 c_data->compact_pages_truncated = 287 truncated + last_pgno - meta->last_pgno; 288 if ((t_ret = __memp_fput(dbp->mpf, ip, 289 meta, dbp->priority)) != 0 && ret == 0) 290 ret = t_ret; 291 } else if (ret == 0) 292 ret = t_ret; 293 294 if (have_freelist && (t_ret = 295 __bam_free_freelist(dbp, ip, txn)) != 0 && ret == 0) 296 t_ret = ret; 297 } 298#endif 299 300 return (ret); 301} 302 303/* 304 * __bam_csearch -- isolate search code for bam_compact. 305 * This routine hides the differences between searching 306 * a BTREE and a RECNO from the rest of the code. 307 */ 308#define CS_READ 0 /* We are just reading. */ 309#define CS_PARENT 1 /* We want the parent too, write lock. */ 310#define CS_NEXT 2 /* Get the next page. */ 311#define CS_NEXT_WRITE 3 /* Get the next page and write lock. */ 312#define CS_DEL 4 /* Get a stack to delete a page. */ 313#define CS_START 5 /* Starting level for stack, write lock. */ 314#define CS_NEXT_BOTH 6 /* Get this page and the next, write lock. */ 315#define CS_GETRECNO 0x80 /* Extract record number from start. */ 316 317static int 318__bam_csearch(dbc, start, sflag, level) 319 DBC *dbc; 320 DBT *start; 321 u_int32_t sflag; 322 int level; 323{ 324 BTREE_CURSOR *cp; 325 int not_used, ret; 326 327 cp = (BTREE_CURSOR *)dbc->internal; 328 329 if (dbc->dbtype == DB_RECNO) { 330 /* If GETRECNO is not set the cp->recno is what we want. 
*/ 331 if (FLD_ISSET(sflag, CS_GETRECNO)) { 332 if (start == NULL || start->size == 0) 333 cp->recno = 1; 334 else if ((ret = 335 __ram_getno(dbc, start, &cp->recno, 0)) != 0) 336 return (ret); 337 FLD_CLR(sflag, CS_GETRECNO); 338 } 339 switch (sflag) { 340 case CS_READ: 341 sflag = SR_READ; 342 break; 343 case CS_NEXT: 344 sflag = SR_PARENT | SR_READ; 345 break; 346 case CS_START: 347 level = LEAFLEVEL; 348 /* FALLTHROUGH */ 349 case CS_DEL: 350 case CS_NEXT_WRITE: 351 sflag = SR_STACK; 352 break; 353 case CS_NEXT_BOTH: 354 sflag = SR_BOTH | SR_NEXT | SR_WRITE; 355 break; 356 case CS_PARENT: 357 sflag = SR_PARENT | SR_WRITE; 358 break; 359 default: 360 return (__env_panic(dbc->env, EINVAL)); 361 } 362 if ((ret = __bam_rsearch(dbc, 363 &cp->recno, sflag, level, ¬_used)) != 0) 364 return (ret); 365 /* Reset the cursor's recno to the beginning of the page. */ 366 cp->recno -= cp->csp->indx; 367 } else { 368 FLD_CLR(sflag, CS_GETRECNO); 369 switch (sflag) { 370 case CS_READ: 371 sflag = SR_READ | SR_DUPFIRST; 372 break; 373 case CS_DEL: 374 sflag = SR_DEL; 375 break; 376 case CS_NEXT: 377 sflag = SR_NEXT; 378 break; 379 case CS_NEXT_WRITE: 380 sflag = SR_NEXT | SR_WRITE; 381 break; 382 case CS_NEXT_BOTH: 383 sflag = SR_BOTH | SR_NEXT | SR_WRITE; 384 break; 385 case CS_START: 386 sflag = SR_START | SR_WRITE; 387 break; 388 case CS_PARENT: 389 sflag = SR_PARENT | SR_WRITE; 390 break; 391 default: 392 return (__env_panic(dbc->env, EINVAL)); 393 } 394 if (start == NULL || start->size == 0) 395 FLD_SET(sflag, SR_MIN); 396 397 if ((ret = __bam_search(dbc, 398 cp->root, start, sflag, level, NULL, ¬_used)) != 0) 399 return (ret); 400 } 401 402 return (0); 403} 404 405/* 406 * __bam_compact_int -- internal compaction routine. 407 * Called either with a cursor on the main database 408 * or a cursor initialized to the root of an off page duplicate 409 * tree. 
410 */ 411static int 412__bam_compact_int(dbc, start, stop, factor, spanp, c_data, donep) 413 DBC *dbc; 414 DBT *start, *stop; 415 u_int32_t factor; 416 int *spanp; 417 DB_COMPACT *c_data; 418 int *donep; 419{ 420 BTREE_CURSOR *cp, *ncp; 421 DB *dbp; 422 DBC *ndbc; 423 DB_LOCK metalock, next_lock, nnext_lock, prev_lock, saved_lock; 424 DB_MPOOLFILE *dbmp; 425 ENV *env; 426 EPG *epg; 427 PAGE *pg, *ppg, *npg; 428 db_pgno_t metapgno, npgno, nnext_pgno; 429 db_pgno_t pgno, prev_pgno, ppgno, saved_pgno; 430 db_recno_t next_recno; 431 u_int32_t sflag, pgs_free; 432 int check_dups, check_trunc, clear_root, isdone; 433 int merged, nentry, next_p, pgs_done, ret, t_ret, tdone; 434 435#ifdef DEBUG 436#define CTRACE(dbc, location, t, start, f) do { \ 437 DBT __trace; \ 438 DB_SET_DBT(__trace, t, strlen(t)); \ 439 DEBUG_LWRITE( \ 440 dbc, (dbc)->txn, location, &__trace, start, f) \ 441 } while (0) 442#define PTRACE(dbc, location, p, start, f) do { \ 443 char __buf[32]; \ 444 (void)snprintf(__buf, \ 445 sizeof(__buf), "pgno: %lu", (u_long)p); \ 446 CTRACE(dbc, location, __buf, start, f); \ 447 } while (0) 448#else 449#define CTRACE(dbc, location, t, start, f) 450#define PTRACE(dbc, location, p, start, f) 451#endif 452 453 ndbc = NULL; 454 pg = NULL; 455 npg = NULL; 456 457 isdone = 0; 458 tdone = 0; 459 pgs_done = 0; 460 next_recno = 0; 461 next_p = 0; 462 clear_root = 0; 463 metapgno = PGNO_BASE_MD; 464 LOCK_INIT(next_lock); 465 LOCK_INIT(nnext_lock); 466 LOCK_INIT(saved_lock); 467 LOCK_INIT(metalock); 468 LOCK_INIT(prev_lock); 469 check_trunc = c_data->compact_truncate != PGNO_INVALID; 470 check_dups = (!F_ISSET(dbc, DBC_OPD) && 471 F_ISSET(dbc->dbp, DB_AM_DUP)) || check_trunc; 472 473 dbp = dbc->dbp; 474 env = dbp->env; 475 dbmp = dbp->mpf; 476 cp = (BTREE_CURSOR *)dbc->internal; 477 pgs_free = c_data->compact_pages_free; 478 479 /* Search down the tree for the starting point. 
*/ 480 if ((ret = __bam_csearch(dbc, 481 start, CS_READ | CS_GETRECNO, LEAFLEVEL)) != 0) { 482 /* Its not an error to compact an empty db. */ 483 if (ret == DB_NOTFOUND) 484 ret = 0; 485 isdone = 1; 486 goto err; 487 } 488 489 /* 490 * Get the first leaf page. The loop below will change pg so 491 * we clear the stack reference so we don't put a a page twice. 492 */ 493 pg = cp->csp->page; 494 cp->csp->page = NULL; 495 next_recno = cp->recno; 496next: /* 497 * This is the start of the main compaction loop. There are 3 498 * parts to the process: 499 * 1) Walk the leaf pages of the tree looking for a page to 500 * process. We do this with read locks. Save the 501 * key from the page and release it. 502 * 2) Set up a cursor stack which will write lock the page 503 * and enough of its ancestors to get the job done. 504 * This could go to the root if we might delete a subtree 505 * or we have record numbers to update. 506 * 3) Loop fetching pages after the above page and move enough 507 * data to fill it. 508 * We exit the loop if we are at the end of the leaf pages, are 509 * about to lock a new subtree (we span) or on error. 510 */ 511 512 /* Walk the pages looking for something to fill up. */ 513 while ((npgno = NEXT_PGNO(pg)) != PGNO_INVALID) { 514 c_data->compact_pages_examine++; 515 PTRACE(dbc, "Next", PGNO(pg), start, 0); 516 517 /* If we have fetched the next page, get the new key. 
*/ 518 if (next_p == 1 && 519 dbc->dbtype != DB_RECNO && NUM_ENT(pg) != 0) { 520 if ((ret = __db_ret(dbc, pg, 0, start, 521 &start->data, &start->ulen)) != 0) 522 goto err; 523 } 524 next_recno += NUM_ENT(pg); 525 if (P_FREESPACE(dbp, pg) > factor || 526 (check_trunc && PGNO(pg) > c_data->compact_truncate)) 527 break; 528 if (stop != NULL && stop->size > 0) { 529 if ((ret = __bam_compact_isdone(dbc, 530 stop, pg, &isdone)) != 0) 531 goto err; 532 if (isdone) 533 goto done; 534 } 535 536 /* 537 * The page does not need more data or to be swapped, 538 * check to see if we want to look at possible duplicate 539 * trees or overflow records and the move on to the next page. 540 */ 541 cp->recno += NUM_ENT(pg); 542 next_p = 1; 543 tdone = pgs_done; 544 PTRACE(dbc, "Dups", PGNO(pg), start, 0); 545 if (check_dups && (ret = __bam_compact_dups( 546 dbc, &pg, factor, 0, c_data, &pgs_done)) != 0) 547 goto err; 548 npgno = NEXT_PGNO(pg); 549 if ((ret = __memp_fput(dbmp, 550 dbc->thread_info, pg, dbc->priority)) != 0) 551 goto err; 552 pg = NULL; 553 /* 554 * If we don't do anything we don't need to hold 555 * the lock on the previous page, so couple always. 556 */ 557 if ((ret = __db_lget(dbc, 558 tdone == pgs_done ? LCK_COUPLE_ALWAYS : LCK_COUPLE, 559 npgno, DB_LOCK_READ, 0, &cp->csp->lock)) != 0) 560 goto err; 561 if ((ret = __memp_fget(dbmp, &npgno, 562 dbc->thread_info, dbc->txn, 0, &pg)) != 0) 563 goto err; 564 } 565 566 /* 567 * When we get here we have 3 cases: 568 * 1) We've reached the end of the leaf linked list and are done. 569 * 2) A page whose freespace exceeds our target and therefore needs 570 * to have data added to it. 571 * 3) A page that doesn't have too much free space but needs to be 572 * checked for truncation. 573 * In both cases 2 and 3, we need that page's first key or record 574 * number. We may already have it, if not get it here. 575 */ 576 if ((nentry = NUM_ENT(pg)) != 0) { 577 next_p = 0; 578 /* Get a copy of the first recno on the page. 
*/ 579 if (dbc->dbtype == DB_RECNO) { 580 if ((ret = __db_retcopy(dbp->env, start, 581 &cp->recno, sizeof(cp->recno), 582 &start->data, &start->ulen)) != 0) 583 goto err; 584 } else if (start->size == 0 && (ret = __db_ret(dbc, 585 pg, 0, start, &start->data, &start->ulen)) != 0) 586 goto err; 587 588 if (npgno == PGNO_INVALID) { 589 /* End of the tree, check its duplicates and exit. */ 590 PTRACE(dbc, "GoDone", PGNO(pg), start, 0); 591 if (check_dups && (ret = __bam_compact_dups(dbc, 592 &pg, factor, 0, c_data, &pgs_done)) != 0) 593 goto err; 594 c_data->compact_pages_examine++; 595 isdone = 1; 596 goto done; 597 } 598 } 599 600 /* Release the page so we don't deadlock getting its parent. */ 601 if ((ret = __memp_fput(dbmp, dbc->thread_info, pg, dbc->priority)) != 0) 602 goto err; 603 if ((ret = __LPUT(dbc, cp->csp->lock)) != 0) 604 goto err; 605 BT_STK_CLR(cp); 606 pg = NULL; 607 saved_pgno = PGNO_INVALID; 608 prev_pgno = PGNO_INVALID; 609 nnext_pgno = PGNO_INVALID; 610 611 /* 612 * We must lock the metadata page first because we cannot block 613 * while holding interior nodes of the tree pinned. 614 */ 615 616 if (!LOCK_ISSET(metalock) && pgs_free == c_data->compact_pages_free && 617 (ret = __db_lget(dbc, 618 LCK_ALWAYS, metapgno, DB_LOCK_WRITE, 0, &metalock)) != 0) 619 goto err; 620 621 /* 622 * Setup the cursor stack. There are 3 cases: 623 * 1) the page is empty and will be deleted: nentry == 0. 624 * 2) the next page has the same parent: *spanp == 0. 625 * 3) the next page has a different parent: *spanp == 1. 626 * 627 * We now need to search the tree again, getting a write lock 628 * on the page we are going to merge or delete. We do this by 629 * searching down the tree and locking as much of the subtree 630 * above the page as needed. In the case of a delete we will 631 * find the maximal subtree that can be deleted. 
In the case 632 * of merge if the current page and the next page are siblings 633 * with the same parent then we only need to lock the parent. 634 * Otherwise *span will be set and we need to search to find the 635 * lowest common ancestor. Dbc will be set to contain the subtree 636 * containing the page to be merged or deleted. Ndbc will contain 637 * the minimal subtree containing that page and its next sibling. 638 * In all cases for DB_RECNO we simplify things and get the whole 639 * tree if we need more than a single parent. 640 * The tree can collapse while we don't have it locked, so the 641 * page we are looking for may be gone. If so we are at 642 * the right most end of the leaf pages and are done. 643 */ 644 645retry: pg = NULL; 646 if (npg != NULL && (ret = __memp_fput(dbmp, 647 dbc->thread_info, npg, dbc->priority)) != 0) 648 goto err; 649 npg = NULL; 650 if (ndbc != NULL) { 651 ncp = (BTREE_CURSOR *)ndbc->internal; 652 if (clear_root == 1) { 653 ncp->sp->page = NULL; 654 LOCK_INIT(ncp->sp->lock); 655 } 656 if ((ret = __bam_stkrel(ndbc, 0)) != 0) 657 goto err; 658 } 659 clear_root = 0; 660 /* Case 1 -- page is empty. */ 661 if (nentry == 0) { 662 CTRACE(dbc, "Empty", "", start, 0); 663 if (next_p == 1) 664 sflag = CS_NEXT_WRITE; 665 else 666 sflag = CS_DEL; 667 if ((ret = __bam_csearch(dbc, start, sflag, LEAFLEVEL)) != 0) { 668 isdone = 1; 669 if (ret == DB_NOTFOUND) 670 ret = 0; 671 goto err; 672 } 673 674 pg = cp->csp->page; 675 /* Check to see if the page is still empty. */ 676 if (NUM_ENT(pg) != 0) 677 npgno = PGNO(pg); 678 else { 679 npgno = NEXT_PGNO(pg); 680 /* If this is now the root, we are very done. 
*/ 681 if (PGNO(pg) == cp->root) 682 isdone = 1; 683 else { 684 if (npgno != PGNO_INVALID) { 685 TRY_LOCK(dbc, npgno, saved_pgno, 686 next_lock, DB_LOCK_WRITE, retry); 687 if (ret != 0) 688 goto err; 689 } 690 if (PREV_PGNO(pg) != PGNO_INVALID) { 691 TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno, 692 prev_lock, DB_LOCK_WRITE, retry); 693 if (ret != 0) 694 goto err; 695 } 696 if ((ret = 697 __bam_dpages(dbc, 0, BTD_RELINK)) != 0) 698 goto err; 699 c_data->compact_pages_free++; 700 if ((ret = __TLPUT(dbc, prev_lock)) != 0) 701 goto err; 702 LOCK_INIT(prev_lock); 703 if ((ret = __TLPUT(dbc, next_lock)) != 0) 704 goto err; 705 LOCK_INIT(next_lock); 706 goto next_no_release; 707 } 708 } 709 goto next_page; 710 } 711 712 /* case 3 -- different parents. */ 713 if (*spanp) { 714 CTRACE(dbc, "Span", "", start, 0); 715 /* 716 * Search the tree looking for the page containing and 717 * the next page after the current key. 718 * The stack will be rooted at the page that spans 719 * the current and next pages. The two subtrees 720 * are returned below that. For BTREE the current 721 * page subtreee will be first while for RECNO the 722 * next page subtree will be first 723 */ 724 if (ndbc == NULL && (ret = __dbc_dup(dbc, &ndbc, 0)) != 0) 725 goto err; 726 ncp = (BTREE_CURSOR *)ndbc->internal; 727 728 ncp->recno = cp->recno; 729 cp->recno = next_recno; 730 731 if ((ret = __bam_csearch(dbc, start, CS_NEXT_BOTH, 0)) != 0) { 732 if (ret == DB_NOTFOUND) { 733 isdone = 1; 734 ret = 0; 735 } 736 goto err; 737 } 738 739 /* 740 * Find the top of the stack for the second subtree. 741 */ 742 for (epg = cp->csp - 1; epg > cp->sp; epg--) 743 if (LEVEL(epg->page) == LEAFLEVEL) 744 break; 745 DB_ASSERT(env, epg != cp->sp); 746 747 /* 748 * Copy the root. We will have two instances of the 749 * same page, be careful not to free both. 
750 */ 751 BT_STK_PUSH(env, ncp, cp->sp->page, cp->sp->indx, 752 cp->sp->lock, cp->sp->lock_mode, ret); 753 if (ret != 0) 754 goto err; 755 clear_root = 1; 756 757 /* Copy the stack containing the next page. */ 758 for (epg++; epg <= cp->csp; epg++) { 759 BT_STK_PUSH(env, ncp, epg->page, epg->indx, 760 epg->lock, epg->lock_mode, ret); 761 if (ret != 0) 762 goto err; 763 } 764 /* adjust the stack pointer to remove these items. */ 765 ncp->csp--; 766 cp->csp -= ncp->csp - ncp->sp; 767 768 /* 769 * If this is RECNO then we want to swap the stacks. 770 */ 771 if (dbp->type == DB_RECNO) { 772 ndbc->internal = (DBC_INTERNAL *)cp; 773 dbc->internal = (DBC_INTERNAL *)ncp; 774 cp = ncp; 775 ncp = (BTREE_CURSOR *)ndbc->internal; 776 cp->sp->indx--; 777 } else 778 ncp->sp->indx++; 779 780 DB_ASSERT(env, 781 NEXT_PGNO(cp->csp->page) == PGNO(ncp->csp->page)); 782 pg = cp->csp->page; 783 784 /* 785 * The page may have emptied while we waited for the 786 * lock or the record we are looking for may have 787 * moved. 788 * Reset npgno so we re-get this page when we go back 789 * to the top. 790 */ 791 if (NUM_ENT(pg) == 0 || 792 (dbc->dbtype == DB_RECNO && 793 NEXT_PGNO(cp->csp->page) != PGNO(ncp->csp->page))) { 794 npgno = PGNO(pg); 795 *spanp = 0; 796 goto next_page; 797 } 798 799 if (check_trunc && PGNO(pg) > c_data->compact_truncate) { 800 if (PREV_PGNO(pg) != PGNO_INVALID) { 801 TRY_LOCK2(dbc, ndbc, PREV_PGNO(pg), prev_pgno, 802 prev_lock, DB_LOCK_WRITE, retry); 803 if (ret != 0) 804 goto err1; 805 } 806 pgs_done++; 807 /* Get a fresh low numbered page. 
*/ 808 if ((ret = __bam_truncate_page(dbc, 809 &pg, ncp->csp->page, 1)) != 0) 810 goto err1; 811 if ((ret = __TLPUT(dbc, prev_lock)) != 0) 812 goto err; 813 LOCK_INIT(prev_lock); 814 } 815 *spanp = 0; 816 PTRACE(dbc, "SDups", PGNO(ncp->csp->page), start, 0); 817 if (check_dups && (ret = __bam_compact_dups(ndbc, 818 &ncp->csp->page, factor, 1, c_data, &pgs_done)) != 0) 819 goto err1; 820 821 /* Check to see if the tree collapsed. */ 822 if (PGNO(ncp->csp->page) == ncp->root) 823 goto done; 824 825 pg = cp->csp->page; 826 npgno = NEXT_PGNO(pg); 827 PTRACE(dbc, "SDups", PGNO(pg), start, 0); 828 if (check_dups && (ret = 829 __bam_compact_dups(dbc, &cp->csp->page, 830 factor, 1, c_data, &pgs_done)) != 0) 831 goto err1; 832 833 /* 834 * We may have dropped our locks, check again 835 * to see if we still need to fill this page and 836 * we are in a spanning situation. 837 */ 838 839 if (P_FREESPACE(dbp, pg) <= factor || 840 cp->csp[-1].indx != NUM_ENT(cp->csp[-1].page) - 1) 841 goto next_page; 842 843 /* 844 * Try to move things into a single parent. 845 */ 846 merged = 0; 847 for (epg = cp->sp; epg != cp->csp; epg++) { 848 PTRACE(dbc, "PMerge", PGNO(epg->page), start, 0); 849 if ((ret = __bam_merge_internal(dbc, 850 ndbc, LEVEL(epg->page), c_data, &merged)) != 0) 851 break; 852 if (merged) 853 break; 854 } 855 856 if (ret != 0 && ret != DB_LOCK_NOTGRANTED) 857 goto err1; 858 /* 859 * If we merged the parent, then we no longer span. 860 * Otherwise if we tried to merge the parent but would 861 * block on one of the other leaf pages try again. 862 * If we did not merge any records of the parent, 863 * exit to commit any local transactions and try again. 
864 */ 865 if (merged || ret == DB_LOCK_NOTGRANTED) { 866 if (merged) 867 pgs_done++; 868 else 869 goto done; 870 if (cp->csp->page == NULL) 871 goto deleted; 872 npgno = PGNO(pg); 873 next_recno = cp->recno; 874 goto next_page; 875 } 876 PTRACE(dbc, "SMerge", PGNO(cp->csp->page), start, 0); 877 878 /* if we remove the next page, then we need its next locked */ 879 npgno = NEXT_PGNO(ncp->csp->page); 880 if (npgno != PGNO_INVALID) { 881 TRY_LOCK2(dbc, ndbc, npgno, 882 nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry); 883 if (ret != 0) 884 goto err1; 885 } 886 if ((ret = __bam_merge(dbc, 887 ndbc, factor, stop, c_data, &isdone)) != 0) 888 goto err1; 889 pgs_done++; 890 /* 891 * __bam_merge could have freed our stack if it 892 * deleted a page possibly collapsing the tree. 893 */ 894 if (cp->csp->page == NULL) 895 goto deleted; 896 cp->recno += NUM_ENT(pg); 897 898 if ((ret = __TLPUT(dbc, nnext_lock)) != 0) 899 goto err1; 900 LOCK_INIT(nnext_lock); 901 902 /* If we did not bump to the next page something did not fit. */ 903 if (npgno != NEXT_PGNO(pg)) { 904 npgno = NEXT_PGNO(pg); 905 goto next_page; 906 } 907 } else { 908 /* Case 2 -- same parents. */ 909 CTRACE(dbc, "Sib", "", start, 0); 910 if ((ret = 911 __bam_csearch(dbc, start, CS_PARENT, LEAFLEVEL)) != 0) { 912 if (ret == DB_NOTFOUND) { 913 isdone = 1; 914 ret = 0; 915 } 916 goto err; 917 } 918 919 pg = cp->csp->page; 920 DB_ASSERT(env, IS_DIRTY(pg)); 921 DB_ASSERT(env, 922 PGNO(pg) == cp->root || IS_DIRTY(cp->csp[-1].page)); 923 924 /* We now have a write lock, recheck the page. */ 925 if ((nentry = NUM_ENT(pg)) == 0) { 926 npgno = PGNO(pg); 927 goto next_page; 928 } 929 930 /* Check duplicate trees, we have a write lock on the page. */ 931 PTRACE(dbc, "SibDup", PGNO(pg), start, 0); 932 if (check_dups && (ret = 933 __bam_compact_dups(dbc, &cp->csp->page, 934 factor, 1, c_data, &pgs_done)) != 0) 935 goto err1; 936 pg = cp->csp->page; 937 npgno = NEXT_PGNO(pg); 938 939 /* Check to see if the tree collapsed. 
*/ 940 if (PGNO(pg) == cp->root) 941 goto err1; 942 DB_ASSERT(env, cp->csp - cp->sp == 1); 943 944 /* After re-locking check to see if we still need to fill. */ 945 if (P_FREESPACE(dbp, pg) <= factor) { 946 if (check_trunc && 947 PGNO(pg) > c_data->compact_truncate) { 948 if (PREV_PGNO(pg) != PGNO_INVALID) { 949 TRY_LOCK(dbc, PREV_PGNO(pg), prev_pgno, 950 prev_lock, DB_LOCK_WRITE, retry); 951 if (ret != 0) 952 goto err1; 953 } 954 if (npgno != PGNO_INVALID) { 955 TRY_LOCK(dbc, npgno, saved_pgno, 956 next_lock, DB_LOCK_WRITE, retry); 957 if (ret != 0) 958 goto err1; 959 } 960 pgs_done++; 961 /* Get a fresh low numbered page. */ 962 if ((ret = __bam_truncate_page(dbc, 963 &pg, NULL, 1)) != 0) 964 goto err1; 965 if ((ret = __TLPUT(dbc, prev_lock)) != 0) 966 goto err1; 967 LOCK_INIT(prev_lock); 968 if ((ret = __TLPUT(dbc, next_lock)) != 0) 969 goto err1; 970 LOCK_INIT(next_lock); 971 } 972 goto next_page; 973 } 974 975 /* If they have the same parent, just dup the cursor */ 976 if (ndbc != NULL && (ret = __dbc_close(ndbc)) != 0) 977 goto err1; 978 if ((ret = __dbc_dup(dbc, &ndbc, DB_POSITION)) != 0) 979 goto err1; 980 ncp = (BTREE_CURSOR *)ndbc->internal; 981 982 /* 983 * ncp->recno needs to have the recno of the next page. 984 * Bump it by the number of records on the current page. 985 */ 986 ncp->recno += NUM_ENT(pg); 987 } 988 989 pgno = PGNO(cp->csp->page); 990 ppgno = PGNO(cp->csp[-1].page); 991 /* Fetch pages until we fill this one. */ 992 while (!isdone && npgno != PGNO_INVALID && 993 P_FREESPACE(dbp, pg) > factor && c_data->compact_pages != 0) { 994 /* 995 * merging may have to free the parent page, if it does, 996 * refetch it but do it decending the tree. 
997 */ 998 epg = &cp->csp[-1]; 999 if ((ppg = epg->page) == NULL) { 1000 if ((ret = __memp_fput(dbmp, dbc->thread_info, 1001 cp->csp->page, dbc->priority)) != 0) 1002 goto err1; 1003 pg = NULL; 1004 if ((ret = __memp_fget(dbmp, &ppgno, dbc->thread_info, 1005 dbc->txn, DB_MPOOL_DIRTY, &ppg)) != 0) 1006 goto err1; 1007 if ((ret = __memp_fget(dbmp, &pgno, dbc->thread_info, 1008 dbc->txn, DB_MPOOL_DIRTY, &pg)) != 0) 1009 goto err1; 1010 epg->page = ppg; 1011 cp->csp->page = pg; 1012 } 1013 1014 /* 1015 * If our current position is the last one on a parent 1016 * page, then we are about to merge across different 1017 * internal nodes. Thus, we need to lock higher up 1018 * in the tree. We will exit the routine and commit 1019 * what we have done so far. Set spanp so we know 1020 * we are in this case when we come back. 1021 */ 1022 if (epg->indx == NUM_ENT(ppg) - 1) { 1023 *spanp = 1; 1024 npgno = PGNO(pg); 1025 next_recno = cp->recno; 1026 epg->page = ppg; 1027 goto next_page; 1028 } 1029 1030 /* Lock and get the next page. */ 1031 TRY_LOCK(dbc, npgno, 1032 saved_pgno, saved_lock, DB_LOCK_WRITE, retry); 1033 if (ret != 0) 1034 goto err1; 1035 if ((ret = __LPUT(dbc, ncp->lock)) != 0) 1036 goto err1; 1037 ncp->lock = saved_lock; 1038 LOCK_INIT(saved_lock); 1039 saved_pgno = PGNO_INVALID; 1040 1041 if ((ret = __memp_fget(dbmp, &npgno, 1042 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &npg)) != 0) 1043 goto err1; 1044 1045 if (check_trunc && 1046 PGNO(pg) > c_data->compact_truncate) { 1047 if (PREV_PGNO(pg) != PGNO_INVALID) { 1048 TRY_LOCK(dbc, PREV_PGNO(pg), 1049 prev_pgno, prev_lock, DB_LOCK_WRITE, retry); 1050 if (ret != 0) 1051 goto err1; 1052 } 1053 pgs_done++; 1054 /* Get a fresh low numbered page. 
*/ 1055 if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0) 1056 goto err1; 1057 if ((ret = __TLPUT(dbc, prev_lock)) != 0) 1058 goto err1; 1059 LOCK_INIT(prev_lock); 1060 pgno = PGNO(pg); 1061 } 1062 c_data->compact_pages_examine++; 1063 1064 PTRACE(dbc, "MDups", PGNO(npg), start, 0); 1065 if (check_dups && (ret = __bam_compact_dups(ndbc, 1066 &npg, factor, 1, c_data, &pgs_done)) != 0) 1067 goto err1; 1068 1069 npgno = NEXT_PGNO(npg); 1070 if (npgno != PGNO_INVALID) { 1071 TRY_LOCK(dbc, npgno, 1072 nnext_pgno, nnext_lock, DB_LOCK_WRITE, retry); 1073 if (ret != 0) 1074 goto err1; 1075 } 1076 1077 /* copy the common parent to the stack. */ 1078 BT_STK_PUSH(env, ncp, ppg, 1079 epg->indx + 1, epg->lock, epg->lock_mode, ret); 1080 if (ret != 0) 1081 goto err1; 1082 1083 /* Put the page on the stack. */ 1084 BT_STK_ENTER(env, ncp, npg, 0, ncp->lock, DB_LOCK_WRITE, ret); 1085 1086 LOCK_INIT(ncp->lock); 1087 npg = NULL; 1088 1089 /* 1090 * Merge the pages. This will either free the next 1091 * page or just update its parent pointer. 1092 */ 1093 PTRACE(dbc, "Merge", PGNO(cp->csp->page), start, 0); 1094 if ((ret = __bam_merge(dbc, 1095 ndbc, factor, stop, c_data, &isdone)) != 0) 1096 goto err1; 1097 1098 pgs_done++; 1099 1100 if ((ret = __TLPUT(dbc, nnext_lock)) != 0) 1101 goto err1; 1102 LOCK_INIT(nnext_lock); 1103 1104 /* 1105 * __bam_merge could have freed our stack if it 1106 * deleted a page possibly collapsing the tree. 1107 */ 1108 if (cp->csp->page == NULL) 1109 goto deleted; 1110 /* If we did not bump to the next page something did not fit. */ 1111 if (npgno != NEXT_PGNO(pg)) 1112 break; 1113 } 1114 1115 /* Bottom of the main loop. Move to the next page. 
*/ 1116 npgno = NEXT_PGNO(pg); 1117 cp->recno += NUM_ENT(pg); 1118 next_recno = cp->recno; 1119 1120next_page: 1121 if (ndbc != NULL) { 1122 ncp = (BTREE_CURSOR *)ndbc->internal; 1123 if (ncp->sp->page == cp->sp->page) { 1124 ncp->sp->page = NULL; 1125 LOCK_INIT(ncp->sp->lock); 1126 } 1127 if ((ret = __bam_stkrel(ndbc, 1128 pgs_done == 0 ? STK_NOLOCK : 0)) != 0) 1129 goto err; 1130 } 1131 /* 1132 * Unlatch the tree before trying to lock the next page. We must 1133 * unlatch to avoid a latch deadlock but we want to hold the 1134 * lock on the parent node so this leaf cannot be unlinked. 1135 */ 1136 pg = NULL; 1137 if ((ret = __bam_stkrel(dbc, STK_PGONLY)) != 0) 1138 goto err; 1139 if ((ret = __db_lget(dbc, 0, npgno, DB_LOCK_READ, 0, &next_lock)) != 0) 1140 goto err; 1141 if ((ret = __bam_stkrel(dbc, pgs_done == 0 ? STK_NOLOCK : 0)) != 0) 1142 goto err; 1143 if ((ret = __TLPUT(dbc, saved_lock)) != 0) 1144 goto err; 1145 if ((ret = __TLPUT(dbc, prev_lock)) != 0) 1146 goto err; 1147 1148next_no_release: 1149 pg = NULL; 1150 1151 if (npgno == PGNO_INVALID || c_data->compact_pages == 0) 1152 isdone = 1; 1153 if (!isdone) { 1154 /* 1155 * If we are at the end of this parent commit the 1156 * transaction so we don't tie things up. 1157 */ 1158 if (pgs_done != 0 && *spanp) { 1159deleted: if (((ret = __bam_stkrel(ndbc, 0)) != 0 || 1160 (ret = __dbc_close(ndbc)) != 0)) 1161 goto err; 1162 *donep = 0; 1163 return (0); 1164 } 1165 1166 /* Reget the next page to look at. */ 1167 cp->recno = next_recno; 1168 if ((ret = __memp_fget(dbmp, &npgno, 1169 dbc->thread_info, dbc->txn, 0, &pg)) != 0) 1170 goto err; 1171 cp->csp->lock = next_lock; 1172 LOCK_INIT(next_lock); 1173 next_p = 1; 1174 /* If we did not do anything we can drop the metalock. */ 1175 if (pgs_done == 0 && (ret = __LPUT(dbc, metalock)) != 0) 1176 goto err; 1177 goto next; 1178 } 1179 1180done: 1181 if (0) { 1182 /* 1183 * We come here if pg came from cp->csp->page and could 1184 * have already been fput. 
1185 */ 1186err1: pg = NULL; 1187 } 1188err: /* 1189 * Don't release locks (STK_PGONLY)if we had an error, we could reveal 1190 * a bad tree to a dirty reader. Wait till the abort to free the locks. 1191 */ 1192 sflag = STK_CLRDBC; 1193 if (dbc->txn != NULL && ret != 0) 1194 sflag |= STK_PGONLY; 1195 if (ndbc != NULL) { 1196 ncp = (BTREE_CURSOR *)ndbc->internal; 1197 if (npg == ncp->csp->page) 1198 npg = NULL; 1199 if (ncp->sp->page == cp->sp->page) { 1200 ncp->sp->page = NULL; 1201 LOCK_INIT(ncp->sp->lock); 1202 } 1203 if ((t_ret = __bam_stkrel(ndbc, sflag)) != 0 && ret == 0) 1204 ret = t_ret; 1205 else if ((t_ret = __dbc_close(ndbc)) != 0 && ret == 0) 1206 ret = t_ret; 1207 } 1208 if (pg == cp->csp->page) 1209 pg = NULL; 1210 if ((t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0) 1211 ret = t_ret; 1212 1213 if ((t_ret = __TLPUT(dbc, metalock)) != 0 && ret == 0) 1214 ret = t_ret; 1215 1216 if (pg != NULL && (t_ret = 1217 __memp_fput(dbmp, 1218 dbc->thread_info, pg, dbc->priority) != 0) && ret == 0) 1219 ret = t_ret; 1220 if (npg != NULL && (t_ret = 1221 __memp_fput(dbmp, 1222 dbc->thread_info, npg, dbc->priority) != 0) && ret == 0) 1223 ret = t_ret; 1224 1225 *donep = isdone; 1226 1227 return (ret); 1228} 1229 1230/* 1231 * __bam_merge -- do actual merging of leaf pages. 1232 */ 1233static int 1234__bam_merge(dbc, ndbc, factor, stop, c_data, donep) 1235 DBC *dbc, *ndbc; 1236 u_int32_t factor; 1237 DBT *stop; 1238 DB_COMPACT *c_data; 1239 int *donep; 1240{ 1241 BTREE_CURSOR *cp, *ncp; 1242 DB *dbp; 1243 PAGE *pg, *npg; 1244 db_indx_t nent; 1245 int ret; 1246 1247 DB_ASSERT(NULL, dbc != NULL); 1248 DB_ASSERT(NULL, ndbc != NULL); 1249 dbp = dbc->dbp; 1250 cp = (BTREE_CURSOR *)dbc->internal; 1251 ncp = (BTREE_CURSOR *)ndbc->internal; 1252 pg = cp->csp->page; 1253 npg = ncp->csp->page; 1254 1255 nent = NUM_ENT(npg); 1256 1257 /* If the page is empty just throw it away. 
*/ 1258 if (nent == 0) 1259 goto free_page; 1260 1261 /* Find if the stopping point is on this page. */ 1262 if (stop != NULL && stop->size != 0) { 1263 if ((ret = __bam_compact_isdone(dbc, stop, npg, donep)) != 0) 1264 return (ret); 1265 if (*donep) 1266 return (0); 1267 } 1268 1269 /* 1270 * If there is too much data then just move records one at a time. 1271 * Otherwise copy the data space over and fix up the index table. 1272 * If we are on the left most child we will effect our parent's 1273 * index entry so we call merge_records to figure out key sizes. 1274 */ 1275 if ((dbc->dbtype == DB_BTREE && 1276 ncp->csp[-1].indx == 0 && ncp->csp[-1].entries != 1) || 1277 (int)(P_FREESPACE(dbp, pg) - 1278 ((dbp->pgsize - P_OVERHEAD(dbp)) - 1279 P_FREESPACE(dbp, npg))) < (int)factor) 1280 ret = __bam_merge_records(dbc, ndbc, factor, c_data); 1281 else 1282free_page: ret = __bam_merge_pages(dbc, ndbc, c_data); 1283 1284 return (ret); 1285} 1286 1287static int 1288__bam_merge_records(dbc, ndbc, factor, c_data) 1289 DBC *dbc, *ndbc; 1290 u_int32_t factor; 1291 DB_COMPACT *c_data; 1292{ 1293 BINTERNAL *bi; 1294 BKEYDATA *bk, *tmp_bk; 1295 BTREE *t; 1296 BTREE_CURSOR *cp, *ncp; 1297 DB *dbp; 1298 DBT a, b, data, hdr; 1299 ENV *env; 1300 EPG *epg; 1301 PAGE *pg, *npg; 1302 db_indx_t adj, indx, nent, *ninp, pind; 1303 int32_t adjust; 1304 u_int32_t freespace, nksize, pfree, size; 1305 int first_dup, is_dup, next_dup, n_ok, ret; 1306 size_t (*func) __P((DB *, const DBT *, const DBT *)); 1307 1308 dbp = dbc->dbp; 1309 env = dbp->env; 1310 t = dbp->bt_internal; 1311 cp = (BTREE_CURSOR *)dbc->internal; 1312 ncp = (BTREE_CURSOR *)ndbc->internal; 1313 pg = cp->csp->page; 1314 npg = ncp->csp->page; 1315 memset(&hdr, 0, sizeof(hdr)); 1316 pind = NUM_ENT(pg); 1317 n_ok = 0; 1318 adjust = 0; 1319 ret = 0; 1320 nent = NUM_ENT(npg); 1321 1322 DB_ASSERT(env, nent != 0); 1323 1324 /* See if we want to swap out this page. 
 */
	if (c_data->compact_truncate != PGNO_INVALID &&
	    PGNO(npg) > c_data->compact_truncate) {
		/* Get a fresh low numbered page. */
		if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
			goto err;
	}

	ninp = P_INP(dbp, npg);

	/*
	 * pg is the page that is being filled, it is in the stack in cp.
	 * npg is the next page, it is in the stack in ncp.
	 */
	freespace = P_FREESPACE(dbp, pg);

	/* Index slots per record: key+data pairs on btree leaves, else one. */
	adj = TYPE(npg) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * Loop through the records and find the stopping point.
	 */
	for (indx = 0; indx < nent; indx += adj) {
		bk = GET_BKEYDATA(dbp, npg, indx);

		/* Size of the key. */
		size = BITEM_PSIZE(bk);

		/* Size of the data. */
		if (TYPE(pg) == P_LBTREE)
			size += BITEM_PSIZE(GET_BKEYDATA(dbp, npg, indx + 1));
		/*
		 * If we are at a duplicate set, skip ahead to see and
		 * get the total size for the group.
		 */
		/* n_ok: index slots consumed by this record and its dups. */
		n_ok = adj;
		if (TYPE(pg) == P_LBTREE &&
		    indx < nent - adj &&
		    ninp[indx] == ninp[indx + adj]) {
			do {
				/* Size of index for key reference. */
				size += sizeof(db_indx_t);
				n_ok++;
				/* Size of data item. */
				size += BITEM_PSIZE(
				    GET_BKEYDATA(dbp, npg, indx + n_ok));
				n_ok++;
			} while (indx + n_ok < nent &&
			    ninp[indx] == ninp[indx + n_ok]);
		}
		/* if the next set will not fit on the page we are done. */
		if (freespace < size)
			break;

		/*
		 * Otherwise figure out if we are past the goal and if
		 * adding this set will put us closer to the goal than
		 * we are now.
		 */
		if ((freespace - size) < factor) {
			if (freespace - factor > factor - (freespace - size))
				indx += n_ok;
			break;
		}
		freespace -= size;
		indx += n_ok - adj;
	}

	/* If we have hit the first record then there is nothing we can move. */
	if (indx == 0)
		goto done;
	if (TYPE(pg) != P_LBTREE && TYPE(pg) != P_LDUP) {
		if (indx == nent)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		goto no_check;
	}
	/*
	 * We need to update npg's parent key.  Avoid creating a new key
	 * that will be too big.  Get what space will be available on the
	 * parents.  Then if there will not be room for this key, see if
	 * prefix compression will make it work, if not backup till we
	 * find something that will.  (Needless to say, this is a very
	 * unlikely event.)  If we are deleting this page then we will
	 * need to propagate the next key to our grand parents, so we
	 * see if that will fit.
	 */
	pfree = dbp->pgsize;
	for (epg = &ncp->csp[-1]; epg >= ncp->sp; epg--)
		if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
			bi = GET_BINTERNAL(dbp, epg->page, epg->indx);
			/* Add back in the key we will be deleting. */
			freespace += BINTERNAL_PSIZE(bi->len);
			if (freespace < pfree)
				pfree = freespace;
			if (epg->indx != 0)
				break;
		}

	/*
	 * If we are at the end, we will delete this page.  We need to
	 * check the next parent key only if we are the leftmost page and
	 * will therefore have to propagate the key up the tree.
	 */
	if (indx == nent) {
		if (ncp->csp[-1].indx != 0 || ncp->csp[-1].entries == 1 ||
		    BINTERNAL_PSIZE(GET_BINTERNAL(dbp,
		    ncp->csp[-1].page, 1)->len) <= pfree)
			return (__bam_merge_pages(dbc, ndbc, c_data));
		indx -= adj;
	}
	bk = GET_BKEYDATA(dbp, npg, indx);
	if (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (F_ISSET(dbc, DBC_OPD)) {
			if (dbp->dup_compare == __bam_defcmp)
				func = __bam_defpfx;
			else
				func = NULL;
		} else
			func = t->bt_prefix;
	} else
		func = NULL;

	/* Skip to the beginning of a duplicate set. */
	while (indx != 0 && ninp[indx] == ninp[indx - adj])
		indx -= adj;

	while (indx != 0 && BINTERNAL_SIZE(bk->len) >= pfree) {
		if (B_TYPE(bk->type) != B_KEYDATA)
			goto noprefix;
		/*
		 * Figure out if we can truncate this key.
		 * Code borrowed from bt_split.c
		 */
		if (func == NULL)
			goto noprefix;
		tmp_bk = GET_BKEYDATA(dbp, npg, indx - adj);
		if (B_TYPE(tmp_bk->type) != B_KEYDATA)
			goto noprefix;
		memset(&a, 0, sizeof(a));
		a.size = tmp_bk->len;
		a.data = tmp_bk->data;
		memset(&b, 0, sizeof(b));
		b.size = bk->len;
		b.data = bk->data;
		nksize = (u_int32_t)func(dbp, &a, &b);
		if (BINTERNAL_PSIZE(nksize) < pfree)
			break;
noprefix:
		/* Skip to the beginning of a duplicate set. */
		do {
			indx -= adj;
		} while (indx != 0 &&  ninp[indx] == ninp[indx - adj]);

		bk = GET_BKEYDATA(dbp, npg, indx);
	}

	/*
	 * indx references the first record that will not move to the previous
	 * page.  If it is 0 then we could not find a key that would fit in
	 * the parent that would permit us to move any records.
	 */
	if (indx == 0)
		goto done;
	DB_ASSERT(env, indx <= nent);

	/* Loop through the records and move them from npg to pg. */
no_check: is_dup = first_dup = next_dup = 0;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(env, IS_DIRTY(pg));
	DB_ASSERT(env, IS_DIRTY(npg));
	ninp = P_INP(dbp, npg);
	do {
		bk = GET_BKEYDATA(dbp, npg, 0);
		/* Figure out if we are in a duplicate group or not. */
		if ((NUM_ENT(npg) % 2) == 0) {
			if (NUM_ENT(npg) > 2 && ninp[0] == ninp[2]) {
				if (!is_dup) {
					first_dup = 1;
					is_dup = 1;
				} else
					first_dup = 0;

				next_dup = 1;
			} else if (next_dup) {
				is_dup = 1;
				first_dup = 0;
				next_dup = 0;
			} else
				is_dup = 0;
		}

		if (is_dup && !first_dup && (pind % 2) == 0) {
			/* Duplicate key. */
			if ((ret = __bam_adjindx(dbc,
			    pg, pind, pind - P_INDX, 1)) != 0)
				goto err;
			if (!next_dup)
				is_dup = 0;
		} else switch (B_TYPE(bk->type)) {
		case B_KEYDATA:
			hdr.data = bk;
			hdr.size = SSZA(BKEYDATA, data);
			data.size = bk->len;
			data.data = bk->data;
			if ((ret = __db_pitem(dbc, pg, pind,
			    BKEYDATA_SIZE(bk->len), &hdr, &data)) != 0)
				goto err;
			break;
		case B_OVERFLOW:
		case B_DUPLICATE:
			data.size = BOVERFLOW_SIZE;
			data.data = bk;
			if ((ret = __db_pitem(dbc, pg, pind,
			    BOVERFLOW_SIZE, &data, NULL)) != 0)
				goto err;
			break;
		default:
			__db_errx(env,
			    "Unknown record format, page %lu, indx 0",
			    (u_long)PGNO(pg));
			ret = EINVAL;
			goto err;
		}
		pind++;
		if (next_dup && (NUM_ENT(npg) % 2) == 0) {
			if ((ret = __bam_adjindx(ndbc,
			    npg, 0, O_INDX, 0)) != 0)
				goto err;
		} else {
			if ((ret = __db_ditem(ndbc,
			    npg, 0, BITEM_SIZE(bk))) != 0)
				goto err;
		}
		adjust++;
	} while (--indx != 0);

	DB_ASSERT(env, NUM_ENT(npg) != 0);

	if (adjust != 0 &&
	    (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))) {
		if (TYPE(pg) == P_LBTREE)
			adjust /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -adjust)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, adjust)) != 0)
			goto err;
	}

	/* Update parent with new key. */
	if (ndbc->dbtype == DB_BTREE &&
	    (ret = __bam_pupdate(ndbc, pg)) != 0)
		goto err;

done:	if (cp->sp->page == ncp->sp->page) {
		cp->sp->page = NULL;
		LOCK_INIT(cp->sp->lock);
	}
	ret = __bam_stkrel(ndbc, STK_CLRDBC);

err:	return (ret);
}

/*
 * __bam_merge_pages -- bulk-move every record from the next page onto the
 *	current page in one copy, then unlink and free the emptied page.
 */
static int
__bam_merge_pages(dbc, ndbc, c_data)
	DBC *dbc, *ndbc;
	DB_COMPACT *c_data;
{
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr;
	DB_MPOOLFILE *dbmp;
	PAGE *pg, *npg;
	db_indx_t nent, *ninp, *pinp;
	db_pgno_t ppgno;
	u_int8_t *bp;
	u_int32_t len;
	int i, level, ret;

	COMPQUIET(ppgno, PGNO_INVALID);
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	pg = cp->csp->page;
	npg = ncp->csp->page;
	memset(&hdr, 0, sizeof(hdr));
	nent = NUM_ENT(npg);

	/* If the page is empty just throw it away. */
	if (nent == 0)
		goto free_page;

	pg = cp->csp->page;
	npg = ncp->csp->page;
	DB_ASSERT(dbp->env, IS_DIRTY(pg));
	DB_ASSERT(dbp->env, IS_DIRTY(npg));
	DB_ASSERT(dbp->env, nent == NUM_ENT(npg));

	/* Bulk copy the data to the new page. */
	len = dbp->pgsize - HOFFSET(npg);
	if (DBC_LOGGING(dbc)) {
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = npg;
		hdr.size = LOFFSET(dbp, npg);
		memset(&data, 0, sizeof(data));
		data.data = (u_int8_t *)npg + HOFFSET(npg);
		data.size = len;
		if ((ret = __bam_merge_log(dbp,
		    dbc->txn, &LSN(pg), 0, PGNO(pg),
		    &LSN(pg), PGNO(npg), &LSN(npg), &hdr, &data, 0)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));
	LSN(npg) = LSN(pg);
	bp = (u_int8_t *)pg + HOFFSET(pg) - len;
	memcpy(bp, (u_int8_t *)npg + HOFFSET(npg), len);

	/* Copy index table offset by what was there already.
 */
	pinp = P_INP(dbp, pg) + NUM_ENT(pg);
	ninp = P_INP(dbp, npg);
	for (i = 0; i < NUM_ENT(npg); i++)
		*pinp++ = *ninp++ - (dbp->pgsize - HOFFSET(pg));
	HOFFSET(pg) -= len;
	NUM_ENT(pg) += i;

	NUM_ENT(npg) = 0;
	HOFFSET(npg) += len;

	if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD)) {
		/*
		 * There are two cases here regarding the stack.
		 * Either we have two two-level stacks but only ndbc
		 * references the parent page or we have a multilevel
		 * stack and only ndbc has an entry for the spanning
		 * page.
		 */
		if (TYPE(pg) == P_LBTREE)
			i /= P_INDX;
		if ((ret = __bam_adjust(ndbc, -i)) != 0)
			goto err;

		if ((ret = __bam_adjust(dbc, i)) != 0)
			goto err;
	}

free_page:
	/*
	 * __bam_dpages may decide to collapse the tree.
	 * This can happen if we have the root and there
	 * are exactly 2 pointers left in it.
	 * If it can collapse the tree we must free the other
	 * stack since it will no longer be valid.  This
	 * must be done before hand because we cannot
	 * hold a page pinned if it might be truncated.
	 */
	if ((ret = __bam_relink(dbc,
	    ncp->csp->page, cp->csp->page, PGNO_INVALID)) != 0)
		goto err;
	/* Drop the duplicate reference to the sub tree root. */
	cp->sp->page = NULL;
	LOCK_INIT(cp->sp->lock);
	if (PGNO(ncp->sp->page) == ncp->root &&
	    NUM_ENT(ncp->sp->page) == 2) {
		if ((ret = __bam_stkrel(dbc, STK_CLRDBC | STK_PGONLY)) != 0)
			goto err;
		level = LEVEL(ncp->sp->page);
		ppgno = PGNO(ncp->csp[-1].page);
	} else
		level = 0;
	if (c_data->compact_truncate > PGNO(npg))
		c_data->compact_truncate--;
	if ((ret = __bam_dpages(ndbc,
	    0, ndbc->dbtype == DB_RECNO ?
	    0 : BTD_UPDATE)) != 0)
		goto err;
	npg = NULL;
	c_data->compact_pages_free++;
	c_data->compact_pages--;
	/* level != 0 means the root had 2 entries: check if the tree shrank. */
	if (level != 0) {
		if ((ret = __memp_fget(dbmp, &ncp->root,
		    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
			goto err;
		if (level == LEVEL(npg))
			level = 0;
		if ((ret = __memp_fput(dbmp,
		    dbc->thread_info, npg, dbc->priority)) != 0)
			goto err;
		npg = NULL;
		if (level != 0) {
			c_data->compact_levels++;
			c_data->compact_pages_free++;
			if (c_data->compact_truncate > ppgno)
				c_data->compact_truncate--;
			if (c_data->compact_pages != 0)
				c_data->compact_pages--;
		}
	}

err:	return (ret);
}

/*
 * __bam_merge_internal --
 *	Merge internal nodes of the tree.
 */
static int
__bam_merge_internal(dbc, ndbc, level, c_data, merged)
	DBC *dbc, *ndbc;
	int level;
	DB_COMPACT *c_data;
	int *merged;
{
	BINTERNAL bi, *bip, *fip;
	BTREE_CURSOR *cp, *ncp;
	DB *dbp;
	DBT data, hdr;
	DB_MPOOLFILE *dbmp;
	EPG *epg, *save_csp, *nsave_csp;
	PAGE *pg, *npg;
	RINTERNAL *rk;
	db_indx_t first, indx, pind;
	db_pgno_t ppgno;
	int32_t nrecs, trecs;
	u_int16_t size;
	u_int32_t freespace, pfree;
	int ret;

	COMPQUIET(bip, NULL);
	COMPQUIET(ppgno, PGNO_INVALID);
	DB_ASSERT(NULL, dbc != NULL);
	DB_ASSERT(NULL, ndbc != NULL);

	/*
	 * ndbc will contain the dominating parent of the subtree.
	 * dbc will have the tree containing the left child.
	 *
	 * The stacks descend to the leaf level.
	 * If this is a recno tree then both stacks will start at the root.
	 */
	dbp = dbc->dbp;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	ncp = (BTREE_CURSOR *)ndbc->internal;
	*merged = 0;
	ret = 0;

	/*
	 * Set the stacks to the level requested.
	 * Save the old value to restore when we exit.
	 */
	save_csp = cp->csp;
	cp->csp = &cp->csp[-level + 1];
	pg = cp->csp->page;
	pind = NUM_ENT(pg);

	nsave_csp = ncp->csp;
	ncp->csp = &ncp->csp[-level + 1];
	npg = ncp->csp->page;
	indx = NUM_ENT(npg);

	/*
	 * The caller may have two stacks that include common ancestors, we
	 * check here for convenience.
	 */
	if (npg == pg)
		goto done;

	if (TYPE(pg) == P_IBTREE) {
		/*
		 * Check for overflow keys on both pages while we have
		 * them locked.
		 */
		if ((ret =
		     __bam_truncate_internal_overflow(dbc, pg, c_data)) != 0)
			goto err;
		if ((ret =
		     __bam_truncate_internal_overflow(dbc, npg, c_data)) != 0)
			goto err;
	}

	/*
	 * If we are about to move data off the left most page of an
	 * internal node we will need to update its parents, make sure there
	 * will be room for the new key on all the parents in the stack.
	 * If not, move less data.
	 */
	fip = NULL;
	if (TYPE(pg) == P_IBTREE) {
		/* See where we run out of space. */
		freespace = P_FREESPACE(dbp, pg);
		/*
		 * The leftmost key of an internal page is not accurate.
		 * Go up the tree to find a non-leftmost parent.
		 */
		epg = ncp->csp;
		while (--epg >= ncp->sp && epg->indx == 0)
			continue;
		fip = bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
		epg = ncp->csp;

		for (indx = 0;;) {
			size = BINTERNAL_PSIZE(bip->len);
			if (size > freespace)
				break;
			freespace -= size;
			if (++indx >= NUM_ENT(npg))
				break;
			bip = GET_BINTERNAL(dbp, npg, indx);
		}

		/* See if we are deleting the page and we are not left most. */
		if (indx == NUM_ENT(npg) && epg[-1].indx != 0)
			goto fits;

		pfree = dbp->pgsize;
		for (epg--; epg >= ncp->sp; epg--)
			if ((freespace = P_FREESPACE(dbp, epg->page)) < pfree) {
				bip = GET_BINTERNAL(dbp, epg->page, epg->indx);
				/* Add back in the key we will be deleting. */
				freespace += BINTERNAL_PSIZE(bip->len);
				if (freespace < pfree)
					pfree = freespace;
				if (epg->indx != 0)
					break;
			}
		epg = ncp->csp;

		/* If we are at the end of the page we will delete it. */
		if (indx == NUM_ENT(npg)) {
			if (NUM_ENT(epg[-1].page) == 1)
				goto fits;
			bip =
			    GET_BINTERNAL(dbp, epg[-1].page, epg[-1].indx + 1);
		} else
			bip = GET_BINTERNAL(dbp, npg, indx);

		/* Back up until we have a key that fits. */
		while (indx != 0 && BINTERNAL_PSIZE(bip->len) > pfree) {
			indx--;
			bip = GET_BINTERNAL(dbp, npg, indx);
		}
		if (indx == 0)
			goto done;
	}

fits:	memset(&bi, 0, sizeof(bi));
	memset(&hdr, 0, sizeof(hdr));
	memset(&data, 0, sizeof(data));
	trecs = 0;

	/*
	 * Copy data between internal nodes till one is full
	 * or the other is empty.
	 */
	/* first: set once at least one record has been moved. */
	first = 0;
	nrecs = 0;
	do {
		if (dbc->dbtype == DB_BTREE) {
			bip = GET_BINTERNAL(dbp, npg, 0);
			size = fip == NULL ?
			    BINTERNAL_SIZE(bip->len) :
			    BINTERNAL_SIZE(fip->len);
			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
				break;

			if (fip == NULL) {
				data.size = bip->len;
				data.data = bip->data;
			} else {
				data.size = fip->len;
				data.data = fip->data;
			}
			bi.len = data.size;
			B_TSET(bi.type, bip->type);
			bi.pgno = bip->pgno;
			bi.nrecs = bip->nrecs;
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			if (F_ISSET(cp, C_RECNUM) || F_ISSET(dbc, DBC_OPD))
				nrecs = (int32_t)bip->nrecs;
		} else {
			rk = GET_RINTERNAL(dbp, npg, 0);
			size = RINTERNAL_SIZE;
			if (P_FREESPACE(dbp, pg) < size + sizeof(db_indx_t))
				break;

			hdr.data = rk;
			hdr.size = size;
			nrecs = (int32_t)rk->nrecs;
		}
		/*
		 * Try to lock the subtree leaf records without waiting.
		 * We must lock the subtree below the record we are merging
		 * and the one after it since that is where a search will wind
		 * up if it has already looked at our parent.  After the first
		 * move we have the current subtree already locked.
		 * If we merged any records then we will revisit this
		 * node when we merge its leaves.  If not we will return
		 * NOTGRANTED and our caller will do a retry.  We only
		 * need to do this if we are in a transaction.  If not then
		 * we cannot abort and things will be hosed up on error
		 * anyway.
		 */
		if (dbc->txn != NULL && (ret = __bam_lock_tree(ndbc,
		    ncp->csp, nsave_csp, first,
		    NUM_ENT(ncp->csp->page) == 1 ? 1 : 2)) != 0) {
			if (ret != DB_LOCK_NOTGRANTED)
				goto err;
			break;
		}
		first = 1;
		if ((ret = __db_pitem(dbc, pg, pind, size, &hdr, &data)) != 0)
			goto err;
		pind++;
		if (fip != NULL) {
			/* reset size to be for the record being deleted. */
			size = BINTERNAL_SIZE(bip->len);
			fip = NULL;
		}
		if ((ret = __db_ditem(ndbc, npg, 0, size)) != 0)
			goto err;
		*merged = 1;
		trecs += nrecs;
	} while (--indx != 0);

	if (!*merged)
		goto done;

	if (trecs != 0) {
		cp->csp--;
		ret = __bam_adjust(dbc, trecs);
		if (ret != 0)
			goto err;
		cp->csp++;
		ncp->csp--;
		if ((ret = __bam_adjust(ndbc, -trecs)) != 0)
			goto err;
		ncp->csp++;
	}

	/*
	 * Either we emptied the page or we need to update its
	 * parent to reflect the first page we now point to.
	 * First get rid of the bottom of the stack,
	 * bam_dpages will clear the stack.  Maintain transactional
	 * locks on the leaf pages to protect changes at this level.
	 */
	do {
		if ((ret = __memp_fput(dbmp, dbc->thread_info,
		    nsave_csp->page, dbc->priority)) != 0)
			goto err;
		nsave_csp->page = NULL;
		if ((ret = __TLPUT(dbc, nsave_csp->lock)) != 0)
			goto err;
		LOCK_INIT(nsave_csp->lock);
		nsave_csp--;
	} while (nsave_csp != ncp->csp);

	if (NUM_ENT(npg) == 0) {
		/*
		 * __bam_dpages may decide to collapse the tree
		 * so we need to free our other stack.  The tree
		 * will change in height and our stack will no longer
		 * be valid.
		 */
		cp->csp = save_csp;
		cp->sp->page = NULL;
		LOCK_INIT(cp->sp->lock);
		if (PGNO(ncp->sp->page) == ncp->root &&
		    NUM_ENT(ncp->sp->page) == 2) {
			if ((ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0)
				goto err;
			level = LEVEL(ncp->sp->page);
			ppgno = PGNO(ncp->csp[-1].page);
		} else
			level = 0;

		if (c_data->compact_truncate > PGNO(npg))
			c_data->compact_truncate--;
		ret = __bam_dpages(ndbc,
		    0, ndbc->dbtype == DB_RECNO ?
		    BTD_RELINK : BTD_UPDATE | BTD_RELINK);
		c_data->compact_pages_free++;
		if (ret == 0 && level != 0) {
			if ((ret = __memp_fget(dbmp, &ncp->root,
			    dbc->thread_info, dbc->txn, 0, &npg)) != 0)
				goto err;
			if (level == LEVEL(npg))
				level = 0;
			if ((ret = __memp_fput(dbmp,
			    dbc->thread_info, npg, dbc->priority)) != 0)
				goto err;
			npg = NULL;
			if (level != 0) {
				c_data->compact_levels++;
				c_data->compact_pages_free++;
				if (c_data->compact_truncate > ppgno)
					c_data->compact_truncate--;
				if (c_data->compact_pages != 0)
					c_data->compact_pages--;
			}
		}
	} else {
		/*
		 * NOTE(review): this return is not checked before the
		 * following truncate calls may overwrite ret -- a
		 * __bam_pupdate failure here could be lost; confirm
		 * against upstream before changing.
		 */
		ret = __bam_pupdate(ndbc, npg);

		if (NUM_ENT(npg) != 0 &&
		    c_data->compact_truncate != PGNO_INVALID &&
		    PGNO(npg) > c_data->compact_truncate &&
		    ncp->csp != ncp->sp) {
			if ((ret = __bam_truncate_page(ndbc, &npg, pg, 1)) != 0)
				goto err;
		}
		if (c_data->compact_truncate != PGNO_INVALID &&
		    PGNO(pg) > c_data->compact_truncate && cp->csp != cp->sp) {
			if ((ret = __bam_truncate_page(dbc, &pg, npg, 1)) != 0)
				goto err;
		}
	}
	cp->csp = save_csp;

	return (ret);

done:
err:	cp->csp = save_csp;
	ncp->csp = nsave_csp;

	return (ret);
}

/*
 * __bam_compact_dups -- try to compress off page dup trees.
 *	We may or may not have a write lock on this page.
 */
static int
__bam_compact_dups(dbc, ppg, factor, have_lock, c_data, donep)
	DBC *dbc;
	PAGE **ppg;
	u_int32_t factor;
	int have_lock;
	DB_COMPACT *c_data;
	int *donep;
{
	BOVERFLOW *bo;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBC *opd;
	DBT start;
	DB_MPOOLFILE *dbmp;
	ENV *env;
	PAGE *dpg, *pg;
	db_indx_t i;
	db_pgno_t pgno;
	int isdone, level, ret, span, t_ret;

	span = 0;
	ret = 0;
	opd = NULL;

	DB_ASSERT(NULL, dbc != NULL);
	dbp = dbc->dbp;
	env = dbp->env;
	dbmp = dbp->mpf;
	cp = (BTREE_CURSOR *)dbc->internal;
	pg = *ppg;

	/*
	 * Walk the entries on this page looking for off-page items:
	 * overflow records and off-page duplicate trees.
	 */
	for (i = 0; i < NUM_ENT(pg); i++) {
		bo = GET_BOVERFLOW(dbp, pg, i);
		if (B_TYPE(bo->type) == B_KEYDATA)
			continue;
		c_data->compact_pages_examine++;
		if (bo->pgno > c_data->compact_truncate) {
			(*donep)++;
			if (!have_lock) {
				/*
				 * The caller should have the page at
				 * least read locked.  Drop the buffer
				 * and get the write lock.
				 */
				pgno = PGNO(pg);
				if ((ret = __memp_fput(dbmp, dbc->thread_info,
				     pg, dbc->priority)) != 0)
					goto err;
				*ppg = NULL;
				if ((ret = __db_lget(dbc, 0, pgno,
				     DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
					goto err;
				have_lock = 1;
				if ((ret = __memp_fget(dbmp, &pgno,
				    dbc->thread_info,
				    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
					goto err;
				pg = *ppg;
			}
			if ((ret =
			     __bam_truncate_root_page(dbc, pg, i, c_data)) != 0)
				goto err;
			/* Just in case it should move.  Could it? */
			bo = GET_BOVERFLOW(dbp, pg, i);
		}

		if (B_TYPE(bo->type) == B_OVERFLOW) {
			if ((ret = __bam_truncate_overflow(dbc,
			    bo->pgno, have_lock ? NULL : ppg, c_data)) != 0)
				goto err;
			(*donep)++;
			continue;
		}
		/*
		 * Take a peek at the root.  If it's a leaf then
		 * there is no tree here, avoid all the trouble.
		 */
		if ((ret = __memp_fget(dbmp, &bo->pgno,
		    dbc->thread_info, dbc->txn, 0, &dpg)) != 0)
			goto err;

		level = dpg->level;
		if ((ret = __memp_fput(dbmp,
		    dbc->thread_info, dpg, dbc->priority)) != 0)
			goto err;
		if (level == LEAFLEVEL)
			continue;
		if ((ret = __dbc_newopd(dbc, bo->pgno, NULL, &opd)) != 0)
			return (ret);
		if (!have_lock) {
			/*
			 * The caller should have the page at
			 * least read locked.  Drop the buffer
			 * and get the write lock.
			 */
			pgno = PGNO(pg);
			if ((ret = __memp_fput(dbmp, dbc->thread_info,
			     pg, dbc->priority)) != 0)
				goto err;
			*ppg = NULL;
			if ((ret = __db_lget(dbc, 0, pgno,
			     DB_LOCK_WRITE, 0, &cp->csp->lock)) != 0)
				goto err;
			have_lock = 1;
			if ((ret = __memp_fget(dbmp, &pgno,
			    dbc->thread_info,
			    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
				goto err;
			pg = *ppg;
		}
		(*donep)++;
		memset(&start, 0, sizeof(start));
		/* Compact the off-page duplicate tree with its own cursor. */
		do {
			if ((ret = __bam_compact_int(opd, &start,
			    NULL, factor, &span, c_data, &isdone)) != 0)
				break;
		} while (!isdone);

		if (start.data != NULL)
			__os_free(env, start.data);

		if (ret != 0)
			goto err;

		ret = __dbc_close(opd);
		opd = NULL;
		if (ret != 0)
			goto err;
	}

err:	if (opd != NULL && (t_ret = __dbc_close(opd)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __bam_truncate_page -- swap a page with a lower numbered page.
 *	The cursor has a stack which includes at least the
 *	immediate parent of this page.
 */
static int
__bam_truncate_page(dbc, pgp, opg, update_parent)
	DBC *dbc;
	PAGE **pgp, *opg;
	int update_parent;
{
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT data, hdr;
	DB_LSN lsn;
	DB_LOCK lock;
	EPG *epg;
	PAGE *newpage;
	db_pgno_t newpgno, oldpgno, *pgnop;
	int ret;

	DB_ASSERT(NULL, dbc != NULL);
	dbp = dbc->dbp;
	LOCK_INIT(lock);

	/*
	 * We want to free a page that lives in the part of the file that
	 * can be truncated, so we're going to move it onto a free page
	 * that is in the part of the file that need not be truncated.
	 * Since the freelist is ordered now, we can simply call __db_new
	 * which will grab the first element off the freelist; we know this
	 * is the lowest numbered free page.
	 */
	if ((ret = __db_new(dbc, P_DONTEXTEND | TYPE(*pgp),
	    TYPE(*pgp) == P_LBTREE ? &lock : NULL, &newpage)) != 0)
		return (ret);

	/*
	 * If newpage is null then __db_new would have had to allocate
	 * a new page from the filesystem, so there is no reason
	 * to continue this action.
	 */
	if (newpage == NULL)
		return (0);

	/*
	 * It is possible that a higher page is allocated if other threads
	 * are allocating at the same time, if so, just put it back.
	 */
	if (PGNO(newpage) > PGNO(*pgp)) {
		/* Its unfortunate but you can't just free a new overflow. */
		if (TYPE(newpage) == P_OVERFLOW)
			OV_LEN(newpage) = 0;
		if ((ret = __LPUT(dbc, lock)) != 0)
			return (ret);
		return (__db_free(dbc, newpage));
	}

	/*
	 * Log if necessary.  The record carries the page split as a fixed
	 * header part plus the live data region; for btree-style pages the
	 * index array at the front is counted as part of the header.
	 */
	if (DBC_LOGGING(dbc)) {
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = *pgp;
		hdr.size = P_OVERHEAD(dbp);
		memset(&data, 0, sizeof(data));
		if (TYPE(*pgp) == P_OVERFLOW) {
			data.data = (u_int8_t *)*pgp + P_OVERHEAD(dbp);
			data.size = OV_LEN(*pgp);
		} else {
			data.data = (u_int8_t *)*pgp + HOFFSET(*pgp);
			data.size = dbp->pgsize - HOFFSET(*pgp);
			hdr.size += NUM_ENT(*pgp) * sizeof(db_indx_t);
		}
		if ((ret = __bam_merge_log(dbp, dbc->txn,
		    &LSN(newpage), 0, PGNO(newpage), &LSN(newpage),
		    PGNO(*pgp), &LSN(*pgp), &hdr, &data, 1)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(newpage));

	/* Copy the image onto the new page, preserving its pgno and LSN. */
	oldpgno = PGNO(*pgp);
	newpgno = PGNO(newpage);
	lsn = LSN(newpage);
	memcpy(newpage, *pgp, dbp->pgsize);
	PGNO(newpage) = newpgno;
	LSN(newpage) = lsn;

	/* Empty the old page. */
	if ((ret = __memp_dirty(dbp->mpf,
	    pgp, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;
	if (TYPE(*pgp) == P_OVERFLOW)
		OV_LEN(*pgp) = 0;
	else {
		HOFFSET(*pgp) = dbp->pgsize;
		NUM_ENT(*pgp) = 0;
	}
	LSN(*pgp) = lsn;

	/* Update siblings: chained page types must be relinked in place. */
	switch (TYPE(newpage)) {
	case P_OVERFLOW:
	case P_LBTREE:
	case P_LRECNO:
	case P_LDUP:
		if (NEXT_PGNO(newpage) == PGNO_INVALID &&
		    PREV_PGNO(newpage) == PGNO_INVALID)
			break;
		if ((ret = __bam_relink(dbc, *pgp, opg, PGNO(newpage))) != 0)
			goto err;
		break;
	default:
		break;
	}
	cp = (BTREE_CURSOR *)dbc->internal;

	/*
	 * Now, if we free this page, it will get truncated, when we free
	 * all the pages after it in the file.
	 */
	ret = __db_free(dbc, *pgp);
	/* db_free always puts the page. */
	*pgp = newpage;

	if (ret != 0)
		return (ret);

	if (!update_parent)
		goto done;

	/* Update the parent: point its slot at the relocated child. */
	epg = &cp->csp[-1];

	switch (TYPE(epg->page)) {
	case P_IBTREE:
		pgnop = &GET_BINTERNAL(dbp, epg->page, epg->indx)->pgno;
		break;
	case P_IRECNO:
		pgnop = &GET_RINTERNAL(dbp, epg->page, epg->indx)->pgno;
		break;
	default:
		pgnop = &GET_BOVERFLOW(dbp, epg->page, epg->indx)->pgno;
		break;
	}
	DB_ASSERT(dbp->env, oldpgno == *pgnop);
	if (DBC_LOGGING(dbc)) {
		if ((ret = __bam_pgno_log(dbp, dbc->txn, &LSN(epg->page),
		    0, PGNO(epg->page), &LSN(epg->page), (u_int32_t)epg->indx,
		    *pgnop, PGNO(newpage))) != 0)
			return (ret);
	} else
		LSN_NOT_LOGGED(LSN(epg->page));

	*pgnop = PGNO(newpage);
	cp->csp->page = newpage;
	if ((ret = __TLPUT(dbc, lock)) != 0)
		return (ret);

done:	return (0);

err:	(void)__memp_fput(dbp->mpf, dbc->thread_info, newpage, dbc->priority);
	(void)__TLPUT(dbc, lock);
	return (ret);
}

/*
 * __bam_truncate_overflow -- find overflow pages to truncate.
 *	Walk the pages of an overflow chain and swap out
 *	high numbered pages.  We are passed the first page
 *	but only deal with the second and subsequent pages.
 */

static int
__bam_truncate_overflow(dbc, pgno, ppg, c_data)
	DBC *dbc;
	db_pgno_t pgno;
	PAGE **ppg;
	DB_COMPACT *c_data;
{
	DB *dbp;
	DB_LOCK lock;
	PAGE *page;
	db_pgno_t ppgno;
	int have_lock, ret, t_ret;

	dbp = dbc->dbp;
	page = NULL;
	LOCK_INIT(lock);
	/*
	 * A NULL ppg means the caller already holds the write lock it
	 * needs, so no upgrade is required inside the loop.
	 */
	have_lock = ppg == NULL;

	if ((ret = __memp_fget(dbp->mpf, &pgno,
	    dbc->thread_info, dbc->txn, 0, &page)) != 0)
		return (ret);

	/* Walk the chain; the first page itself is never moved here. */
	while ((pgno = NEXT_PGNO(page)) != PGNO_INVALID) {
		if ((ret = __memp_fput(dbp->mpf,
		    dbc->thread_info, page, dbc->priority)) != 0)
			return (ret);
		if ((ret = __memp_fget(dbp->mpf, &pgno,
		    dbc->thread_info, dbc->txn, 0, &page)) != 0)
			return (ret);
		/* Pages at or below the cutoff need not be relocated. */
		if (pgno <= c_data->compact_truncate)
			continue;
		if (have_lock == 0) {
			/*
			 * Upgrade to a write lock on the referencing page:
			 * drop the buffer first, lock, then re-fetch dirty.
			 */
			ppgno = PGNO(*ppg);
			if ((ret = __memp_fput(dbp->mpf, dbc->thread_info,
			    *ppg, dbc->priority)) != 0)
				goto err;
			*ppg = NULL;
			if ((ret = __db_lget(dbc, 0, ppgno,
			    DB_LOCK_WRITE, 0, &lock)) != 0)
				goto err;
			if ((ret = __memp_fget(dbp->mpf, &ppgno,
			    dbc->thread_info,
			    dbc->txn, DB_MPOOL_DIRTY, ppg)) != 0)
				goto err;
			have_lock = 1;
		}
		/* Swap this chain page down to a low-numbered free page. */
		if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
			break;
	}

err:	if (page != NULL &&
	    (t_ret = __memp_fput( dbp->mpf,
	    dbc->thread_info, page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, lock)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __bam_truncate_root_page -- swap a page which is
 *    the root of an off page dup tree or the head of an overflow.
 * The page is reference by the pg/indx passed in.
 */
static int
__bam_truncate_root_page(dbc, pg, indx, c_data)
	DBC *dbc;
	PAGE *pg;
	u_int32_t indx;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BOVERFLOW *bo;
	DB *dbp;
	DBT orig;
	PAGE *page;
	db_pgno_t newpgno, *pgnop;
	int ret, t_ret;

	COMPQUIET(c_data, NULL);
	COMPQUIET(bo, NULL);
	COMPQUIET(newpgno, PGNO_INVALID);
	dbp = dbc->dbp;
	page = NULL;
	/*
	 * Find the on-page page-number field that references the child
	 * (overflow head or off-page dup root); we patch it in place below.
	 */
	if (TYPE(pg) == P_IBTREE) {
		bi = GET_BINTERNAL(dbp, pg, indx);
		if (B_TYPE(bi->type) == B_OVERFLOW) {
			bo = (BOVERFLOW *)(bi->data);
			pgnop = &bo->pgno;
		} else
			pgnop = &bi->pgno;
	} else {
		bo = GET_BOVERFLOW(dbp, pg, indx);
		pgnop = &bo->pgno;
	}

	/* The caller must have already dirtied (and so write-latched) pg. */
	DB_ASSERT(dbp->env, IS_DIRTY(pg));

	if ((ret = __memp_fget(dbp->mpf, pgnop,
	    dbc->thread_info, dbc->txn, 0, &page)) != 0)
		goto err;

	/*
	 * If this is a multiply reference overflow key, then we will just
	 * copy it and decrement the reference count.  This is part of a
	 * fix to get rid of multiple references.
	 */
	if (TYPE(page) == P_OVERFLOW && OV_REF(page) > 1) {
		if ((ret = __db_ovref(dbc, bo->pgno)) != 0)
			goto err;
		memset(&orig, 0, sizeof(orig));
		/* Read the whole item off, then write a private copy. */
		if ((ret = __db_goff(dbc, &orig, bo->tlen, bo->pgno,
		    &orig.data, &orig.size)) == 0)
			ret = __db_poff(dbc, &orig, &newpgno);
		if (orig.data != NULL)
			__os_free(dbp->env, orig.data);
		if (ret != 0)
			goto err;
	} else {
		if ((ret = __bam_truncate_page(dbc, &page, NULL, 0)) != 0)
			goto err;
		newpgno = PGNO(page);
		/* If we could not allocate from the free list, give up.*/
		if (newpgno == *pgnop)
			goto err;
	}

	/* Update the reference. */
	if (DBC_LOGGING(dbc)) {
		if ((ret = __bam_pgno_log(dbp,
		    dbc->txn, &LSN(pg), 0, PGNO(pg),
		    &LSN(pg), (u_int32_t)indx, *pgnop, newpgno)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(pg));

	*pgnop = newpgno;

err:	if (page != NULL && (t_ret =
	    __memp_fput(dbp->mpf, dbc->thread_info,
	    page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * -- bam_truncate_internal_overflow -- find overflow keys
 *	on internal pages and if they have high page
 *	numbers swap them with lower pages and truncate them.
 * Note that if there are overflow keys in the internal
 * nodes they will get copied adding pages to the database.
 */
static int
__bam_truncate_internal_overflow(dbc, page, c_data)
	DBC *dbc;
	PAGE *page;
	DB_COMPACT *c_data;
{
	BINTERNAL *bi;
	BOVERFLOW *bo;
	db_indx_t indx;
	int ret;

	COMPQUIET(bo, NULL);
	ret = 0;
	/* Scan every entry on this internal page for overflow keys. */
	for (indx = 0; indx < NUM_ENT(page); indx++) {
		bi = GET_BINTERNAL(dbc->dbp, page, indx);
		if (B_TYPE(bi->type) != B_OVERFLOW)
			continue;
		bo = (BOVERFLOW *)(bi->data);
		/* Relocate the overflow head if it is above the cutoff. */
		if (bo->pgno > c_data->compact_truncate && (ret =
		    __bam_truncate_root_page(dbc, page, indx, c_data)) != 0)
			break;
		/* Then walk the remainder of the overflow chain. */
		if ((ret = __bam_truncate_overflow(
		    dbc, bo->pgno, NULL, c_data)) != 0)
			break;
	}
	return (ret);
}

/*
 * __bam_compact_isdone ---
 *
 * Check to see if the stop key specified by the caller is on the
 * current page, in which case we are done compacting.
 */
static int
__bam_compact_isdone(dbc, stop, pg, isdone)
	DBC *dbc;
	DBT *stop;
	PAGE *pg;
	int *isdone;
{
	db_recno_t recno;
	BTREE *t;
	BTREE_CURSOR *cp;
	int cmp, ret;

	*isdone = 0;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbc->dbp->bt_internal;

	if (dbc->dbtype == DB_RECNO) {
		/* For recno, "done" means we moved past the stop record. */
		if ((ret = __ram_getno(dbc, stop, &recno, 0)) != 0)
			return (ret);
		*isdone = cp->recno > recno;
	} else {
		DB_ASSERT(dbc->dbp->env, TYPE(pg) == P_LBTREE);
		/* Compare the stop key against the first key on the page. */
		if ((ret = __bam_cmp(dbc, stop, pg, 0,
		    t->bt_compare, &cmp)) != 0)
			return (ret);

		*isdone = cmp <= 0;
	}
	return (0);
}

/*
 * Lock the subtrees from the top of the stack.
 *	The 0'th child may be in the stack and locked otherwise iterate
 *	through the records by calling __bam_lock_subtree.
 */
static int
__bam_lock_tree(dbc, sp, csp, start, stop)
	DBC *dbc;
	EPG *sp, *csp;
	u_int32_t start, stop;
{
	PAGE *cpage;
	db_pgno_t pgno;
	int ret;

	if (dbc->dbtype == DB_RECNO)
		pgno = GET_RINTERNAL(dbc->dbp, sp->page, 0)->pgno;
	else
		pgno = GET_BINTERNAL(dbc->dbp, sp->page, 0)->pgno;
	cpage = (sp + 1)->page;
	/*
	 * First recurse down the left most sub tree if it is in the cursor
	 * stack.  We already have these pages latched and locked if its a
	 * leaf.
	 */
	if (start == 0 && sp + 1 != csp && pgno == PGNO(cpage) &&
	    (ret = __bam_lock_tree(dbc, sp + 1, csp, 0, NUM_ENT(cpage))) != 0)
		return (ret);

	/*
	 * Then recurse on the other records on the page if needed.
	 * If the page is in the stack then its already locked or
	 * was processed above.
	 */
	if (start == 0 && pgno == PGNO(cpage))
		start = 1;

	if (start == stop)
		return (0);
	return (__bam_lock_subtree(dbc, sp->page, start, stop));

}

/*
 * Lock the subtree from the current node.
 */
static int
__bam_lock_subtree(dbc, page, indx, stop)
	DBC *dbc;
	PAGE *page;
	u_int32_t indx, stop;
{
	DB *dbp;
	DB_LOCK lock;
	PAGE *cpage;
	db_pgno_t pgno;
	int ret, t_ret;

	dbp = dbc->dbp;

	for (; indx < stop; indx++) {
		if (dbc->dbtype == DB_RECNO)
			pgno = GET_RINTERNAL(dbc->dbp, page, indx)->pgno;
		else
			pgno = GET_BINTERNAL(dbc->dbp, page, indx)->pgno;
		if (LEVEL(page) - 1 == LEAFLEVEL) {
			/*
			 * Leaf children get write locks; the lock handle is
			 * intentionally not released here -- the locks are
			 * held until the enclosing transaction resolves.
			 * A would-block result is normalized to NOTGRANTED
			 * so the caller can retry rather than deadlock.
			 */
			if ((ret = __db_lget(dbc, 0, pgno,
			    DB_LOCK_WRITE, DB_LOCK_NOWAIT, &lock)) != 0) {
				if (ret == DB_LOCK_DEADLOCK)
					return (DB_LOCK_NOTGRANTED);
				return (ret);
			}
		} else {
			/* Interior child: descend and lock its subtree. */
			if ((ret = __memp_fget(dbp->mpf, &pgno,
			    dbc->thread_info, dbc->txn, 0, &cpage)) != 0)
				return (ret);
			ret = __bam_lock_subtree(dbc, cpage, 0, NUM_ENT(cpage));
			if ((t_ret = __memp_fput(dbp->mpf, dbc->thread_info,
			    cpage, dbc->priority)) != 0 && ret == 0)
				ret = t_ret;
			if (ret != 0)
				return (ret);
		}
	}
	return (0);
}

#ifdef HAVE_FTRUNCATE
/*
 * __bam_savekey -- save the key from an internal page.
 *	We need to save information so that we can
 *	fetch then next internal node of the tree.  This means
 *	we need the btree key on this current page, or the
 *	next record number.
2674 */ 2675static int 2676__bam_savekey(dbc, next, start) 2677 DBC *dbc; 2678 int next; 2679 DBT *start; 2680{ 2681 BINTERNAL *bi; 2682 BKEYDATA *bk; 2683 BOVERFLOW *bo; 2684 BTREE_CURSOR *cp; 2685 DB *dbp; 2686 DB_LOCK lock; 2687 ENV *env; 2688 PAGE *pg; 2689 RINTERNAL *ri; 2690 db_indx_t indx, top; 2691 db_pgno_t pgno, saved_pgno; 2692 int ret, t_ret; 2693 u_int32_t len; 2694 u_int8_t *data; 2695 int level; 2696 2697 dbp = dbc->dbp; 2698 env = dbp->env; 2699 cp = (BTREE_CURSOR *)dbc->internal; 2700 pg = cp->csp->page; 2701 ret = 0; 2702 2703 if (dbc->dbtype == DB_RECNO) { 2704 if (next) 2705 for (indx = 0, top = NUM_ENT(pg); indx != top; indx++) { 2706 ri = GET_RINTERNAL(dbp, pg, indx); 2707 cp->recno += ri->nrecs; 2708 } 2709 return (__db_retcopy(env, start, &cp->recno, 2710 sizeof(cp->recno), &start->data, &start->ulen)); 2711 2712 } 2713 2714 bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1); 2715 data = bi->data; 2716 len = bi->len; 2717 LOCK_INIT(lock); 2718 saved_pgno = PGNO_INVALID; 2719 /* If there is single record on the page it may have an empty key. */ 2720 while (len == 0) { 2721 /* 2722 * We should not have an empty data page, since we just 2723 * compacted things, check anyway and punt. 2724 */ 2725 if (NUM_ENT(pg) == 0) 2726 goto no_key; 2727 pgno = bi->pgno; 2728 level = LEVEL(pg); 2729 if (pg != cp->csp->page && 2730 (ret = __memp_fput(dbp->mpf, 2731 dbc->thread_info, pg, dbc->priority)) != 0) { 2732 pg = NULL; 2733 goto err; 2734 } 2735 if (level - 1 == LEAFLEVEL) { 2736 TRY_LOCK(dbc, pgno, saved_pgno, 2737 lock, DB_LOCK_READ, retry); 2738 if (ret != 0) 2739 goto err; 2740 } 2741 if ((ret = __memp_fget(dbp->mpf, &pgno, 2742 dbc->thread_info, dbc->txn, 0, &pg)) != 0) 2743 goto err; 2744 2745 /* 2746 * At the data level use the last key to try and avoid the 2747 * possibility that the user has a zero length key, if they 2748 * do, we punt. 
2749 */ 2750 if (pg->level == LEAFLEVEL) { 2751 bk = GET_BKEYDATA(dbp, pg, NUM_ENT(pg) - 2); 2752 data = bk->data; 2753 len = bk->len; 2754 if (len == 0) { 2755no_key: __db_errx(env, 2756 "Compact cannot handle zero length key"); 2757 ret = DB_NOTFOUND; 2758 goto err; 2759 } 2760 } else { 2761 bi = GET_BINTERNAL(dbp, pg, NUM_ENT(pg) - 1); 2762 data = bi->data; 2763 len = bi->len; 2764 } 2765 } 2766 if (B_TYPE(bi->type) == B_OVERFLOW) { 2767 bo = (BOVERFLOW *)(data); 2768 ret = __db_goff(dbc, start, bo->tlen, bo->pgno, 2769 &start->data, &start->ulen); 2770 } 2771 else 2772 ret = __db_retcopy(env, 2773 start, data, len, &start->data, &start->ulen); 2774 2775err: if (pg != NULL && pg != cp->csp->page && 2776 (t_ret = __memp_fput(dbp->mpf, dbc->thread_info, 2777 pg, dbc->priority)) != 0 && ret == 0) 2778 ret = t_ret; 2779 return (ret); 2780 2781retry: return (DB_LOCK_NOTGRANTED); 2782} 2783 2784/* 2785 * bam_truncate_internal -- 2786 * Find high numbered pages in the internal nodes of a tree and 2787 * swap them. 2788 */ 2789static int 2790__bam_truncate_internal(dbp, ip, txn, c_data) 2791 DB *dbp; 2792 DB_THREAD_INFO *ip; 2793 DB_TXN *txn; 2794 DB_COMPACT *c_data; 2795{ 2796 BTREE_CURSOR *cp; 2797 DBC *dbc; 2798 DBT start; 2799 DB_LOCK meta_lock; 2800 PAGE *pg; 2801 db_pgno_t pgno; 2802 u_int32_t sflag; 2803 int level, local_txn, ret, t_ret; 2804 2805 dbc = NULL; 2806 memset(&start, 0, sizeof(start)); 2807 2808 if (IS_DB_AUTO_COMMIT(dbp, txn)) { 2809 local_txn = 1; 2810 txn = NULL; 2811 } else 2812 local_txn = 0; 2813 2814 level = LEAFLEVEL + 1; 2815 sflag = CS_READ | CS_GETRECNO; 2816 LOCK_INIT(meta_lock); 2817 2818new_txn: 2819 if (local_txn && 2820 (ret = __txn_begin(dbp->env, ip, NULL, &txn, 0)) != 0) 2821 goto err; 2822 2823 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0) 2824 goto err; 2825 cp = (BTREE_CURSOR *)dbc->internal; 2826 2827 /* 2828 * If the the root is a leaf we have nothing to do. 
2829 * Searching an empty RECNO tree will return NOTFOUND below and loop. 2830 */ 2831 if ((ret = __memp_fget(dbp->mpf, &cp->root, ip, txn, 0, &pg)) != 0) 2832 goto err; 2833 if (LEVEL(pg) == LEAFLEVEL) { 2834 ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority); 2835 goto err; 2836 } 2837 if ((ret = __memp_fput(dbp->mpf, ip, pg, dbp->priority)) != 0) 2838 goto err; 2839 2840 pgno = PGNO_INVALID; 2841 do { 2842 if ((ret = __bam_csearch(dbc, &start, sflag, level)) != 0) { 2843 /* No more at this level, go up one. */ 2844 if (ret == DB_NOTFOUND) { 2845 level++; 2846 if (start.data != NULL) 2847 __os_free(dbp->env, start.data); 2848 memset(&start, 0, sizeof(start)); 2849 sflag = CS_READ | CS_GETRECNO; 2850 continue; 2851 } 2852 goto err; 2853 } 2854 c_data->compact_pages_examine++; 2855 2856 pg = cp->csp->page; 2857 pgno = PGNO(pg); 2858 2859 sflag = CS_NEXT | CS_GETRECNO; 2860 /* Grab info about the page and drop the stack. */ 2861 if (pgno != cp->root && (ret = __bam_savekey(dbc, 2862 pgno <= c_data->compact_truncate, &start)) != 0) { 2863 if (ret == DB_LOCK_NOTGRANTED) 2864 continue; 2865 goto err; 2866 } 2867 2868 if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0) 2869 goto err; 2870 if (pgno == cp->root) 2871 break; 2872 2873 if (pgno <= c_data->compact_truncate) 2874 continue; 2875 2876 /* Get the meta page lock before latching interior nodes. */ 2877 if (!LOCK_ISSET(meta_lock) && (ret = __db_lget(dbc, 2878 0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &meta_lock)) != 0) 2879 goto err; 2880 2881 /* Reget the page with a write latch, and its parent too. */ 2882 if ((ret = __bam_csearch(dbc, 2883 &start, CS_PARENT | CS_GETRECNO, level)) != 0) { 2884 if (ret == DB_NOTFOUND) { 2885 ret = 0; 2886 } 2887 goto err; 2888 } 2889 pg = cp->csp->page; 2890 pgno = PGNO(pg); 2891 2892 if (pgno > c_data->compact_truncate) { 2893 if ((ret = __bam_truncate_page(dbc, &pg, NULL, 1)) != 0) 2894 goto err; 2895 if (pgno == PGNO(pg)) { 2896 /* We could not allocate. Give up. 
*/ 2897 pgno = cp->root; 2898 } 2899 } 2900 2901 if ((ret = __bam_stkrel(dbc, 2902 pgno > c_data->compact_truncate ? 0 : STK_NOLOCK)) != 0) 2903 goto err; 2904 2905 /* We are locking subtrees, so drop the write locks asap. */ 2906 if (local_txn && pgno > c_data->compact_truncate) 2907 break; 2908 } while (pgno != cp->root); 2909 2910 if ((ret = __dbc_close(dbc)) != 0) 2911 goto err; 2912 dbc = NULL; 2913 if (local_txn) { 2914 if ((ret = __txn_commit(txn, DB_TXN_NOSYNC)) != 0) 2915 goto err; 2916 txn = NULL; 2917 LOCK_INIT(meta_lock); 2918 } 2919 if (pgno != ((BTREE *)dbp->bt_internal)->bt_root) 2920 goto new_txn; 2921 2922err: if (txn != NULL && ret != 0) 2923 sflag = STK_PGONLY; 2924 else 2925 sflag = 0; 2926 if (txn == NULL) 2927 if ((t_ret = __LPUT(dbc, meta_lock)) != 0 && ret == 0) 2928 ret = t_ret; 2929 if (dbc != NULL && (t_ret = __bam_stkrel(dbc, sflag)) != 0 && ret == 0) 2930 ret = t_ret; 2931 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) 2932 ret = t_ret; 2933 if (local_txn && 2934 txn != NULL && (t_ret = __txn_abort(txn)) != 0 && ret == 0) 2935 ret = t_ret; 2936 if (start.data != NULL) 2937 __os_free(dbp->env, start.data); 2938 return (ret); 2939} 2940 2941static int 2942__bam_setup_freelist(dbp, list, nelems) 2943 DB *dbp; 2944 db_pglist_t *list; 2945 u_int32_t nelems; 2946{ 2947 DB_MPOOLFILE *mpf; 2948 db_pgno_t *plist; 2949 int ret; 2950 2951 mpf = dbp->mpf; 2952 2953 if ((ret = __memp_alloc_freelist(mpf, nelems, &plist)) != 0) 2954 return (ret); 2955 2956 while (nelems-- != 0) 2957 *plist++ = list++->pgno; 2958 2959 return (0); 2960} 2961 2962static int 2963__bam_free_freelist(dbp, ip, txn) 2964 DB *dbp; 2965 DB_THREAD_INFO *ip; 2966 DB_TXN *txn; 2967{ 2968 DBC *dbc; 2969 DB_LOCK lock; 2970 int auto_commit, ret, t_ret; 2971 2972 LOCK_INIT(lock); 2973 auto_commit = ret = 0; 2974 2975 /* 2976 * If we are not in a transaction then we need to get 2977 * a lock on the meta page, otherwise we should already 2978 * have the lock. 
2979 */ 2980 2981 dbc = NULL; 2982 if (IS_DB_AUTO_COMMIT(dbp, txn)) { 2983 /* 2984 * We must not timeout the lock or we will not free the list. 2985 * We ignore errors from txn_begin as there is little that 2986 * the application can do with the error and we want to 2987 * get the lock and free the list if at all possible. 2988 */ 2989 if (__txn_begin(dbp->env, ip, NULL, &txn, 0) == 0) { 2990 (void)__lock_set_timeout(dbp->env, 2991 txn->locker, 0, DB_SET_TXN_TIMEOUT); 2992 (void)__lock_set_timeout(dbp->env, 2993 txn->locker, 0, DB_SET_LOCK_TIMEOUT); 2994 auto_commit = 1; 2995 } 2996 /* Get a cursor so we can call __db_lget. */ 2997 if ((ret = __db_cursor(dbp, ip, txn, &dbc, 0)) != 0) 2998 return (ret); 2999 3000 if ((ret = __db_lget(dbc, 3001 0, PGNO_BASE_MD, DB_LOCK_WRITE, 0, &lock)) != 0) 3002 goto err; 3003 } 3004 3005 ret = __memp_free_freelist(dbp->mpf); 3006 3007err: if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) 3008 ret = t_ret; 3009 3010 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) 3011 ret = t_ret; 3012 3013 if (auto_commit && __txn_abort(txn) != 0 && ret == 0) 3014 ret = t_ret; 3015 3016 return (ret); 3017} 3018#endif 3019