/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996-2009 Oracle.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
37 * 38 * $Id$ 39 */ 40 41#include "db_config.h" 42 43#include "db_int.h" 44#include "dbinc/db_page.h" 45#include "dbinc/lock.h" 46#include "dbinc/mp.h" 47#include "dbinc/btree.h" 48 49static int __bam_page __P((DBC *, EPG *, EPG *)); 50static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *)); 51static int __bam_root __P((DBC *, EPG *)); 52 53/* 54 * __bam_split -- 55 * Split a page. 56 * 57 * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *)); 58 */ 59int 60__bam_split(dbc, arg, root_pgnop) 61 DBC *dbc; 62 void *arg; 63 db_pgno_t *root_pgnop; 64{ 65 BTREE_CURSOR *cp; 66 DB_LOCK metalock, next_lock; 67 enum { UP, DOWN } dir; 68 db_pgno_t pgno, next_pgno, root_pgno; 69 int exact, level, ret; 70 71 cp = (BTREE_CURSOR *)dbc->internal; 72 root_pgno = cp->root; 73 LOCK_INIT(next_lock); 74 next_pgno = PGNO_INVALID; 75 76 /* 77 * First get a lock on the metadata page, we will have to allocate 78 * pages and cannot get a lock while we have the search tree pinnned. 79 */ 80 81 pgno = PGNO_BASE_MD; 82 if ((ret = __db_lget(dbc, 83 0, pgno, DB_LOCK_WRITE, 0, &metalock)) != 0) 84 goto err; 85 86 /* 87 * The locking protocol we use to avoid deadlock to acquire locks by 88 * walking down the tree, but we do it as lazily as possible, locking 89 * the root only as a last resort. We expect all stack pages to have 90 * been discarded before we're called; we discard all short-term locks. 91 * 92 * When __bam_split is first called, we know that a leaf page was too 93 * full for an insert. We don't know what leaf page it was, but we 94 * have the key/recno that caused the problem. We call XX_search to 95 * reacquire the leaf page, but this time get both the leaf page and 96 * its parent, locked. We then split the leaf page and see if the new 97 * internal key will fit into the parent page. If it will, we're done. 98 * 99 * If it won't, we discard our current locks and repeat the process, 100 * only this time acquiring the parent page and its parent, locked. 
101 * This process repeats until we succeed in the split, splitting the 102 * root page as the final resort. The entire process then repeats, 103 * as necessary, until we split a leaf page. 104 * 105 * XXX 106 * A traditional method of speeding this up is to maintain a stack of 107 * the pages traversed in the original search. You can detect if the 108 * stack is correct by storing the page's LSN when it was searched and 109 * comparing that LSN with the current one when it's locked during the 110 * split. This would be an easy change for this code, but I have no 111 * numbers that indicate it's worthwhile. 112 */ 113 for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) { 114 /* 115 * Acquire a page and its parent, locked. 116 */ 117retry: if ((ret = (dbc->dbtype == DB_BTREE ? 118 __bam_search(dbc, PGNO_INVALID, 119 arg, SR_WRPAIR, level, NULL, &exact) : 120 __bam_rsearch(dbc, 121 (db_recno_t *)arg, SR_WRPAIR, level, &exact))) != 0) 122 break; 123 124 if (cp->csp[0].page->pgno == root_pgno) { 125 /* we can overshoot the top of the tree. */ 126 level = cp->csp[0].page->level; 127 if (root_pgnop != NULL) 128 *root_pgnop = root_pgno; 129 } else if (root_pgnop != NULL) 130 *root_pgnop = cp->csp[-1].page->pgno; 131 132 /* 133 * Split the page if it still needs it (it's possible another 134 * thread of control has already split the page). If we are 135 * guaranteed that two items will fit on the page, the split 136 * is no longer necessary. 137 */ 138 if (2 * B_MAXSIZEONPAGE(cp->ovflsize) 139 <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) { 140 if ((ret = __bam_stkrel(dbc, STK_NOLOCK)) != 0) 141 goto err; 142 goto no_split; 143 } 144 145 /* 146 * We need to try to lock the next page so we can update 147 * its PREV. 
148 */ 149 if (dbc->dbtype == DB_BTREE && ISLEAF(cp->csp->page) && 150 (pgno = NEXT_PGNO(cp->csp->page)) != PGNO_INVALID) { 151 TRY_LOCK(dbc, pgno, 152 next_pgno, next_lock, DB_LOCK_WRITE, retry); 153 if (ret != 0) 154 goto err; 155 } 156 ret = cp->csp[0].page->pgno == root_pgno ? 157 __bam_root(dbc, &cp->csp[0]) : 158 __bam_page(dbc, &cp->csp[-1], &cp->csp[0]); 159 BT_STK_CLR(cp); 160 161 switch (ret) { 162 case 0: 163no_split: /* Once we've split the leaf page, we're done. */ 164 if (level == LEAFLEVEL) 165 goto done; 166 167 /* Switch directions. */ 168 if (dir == UP) 169 dir = DOWN; 170 break; 171 case DB_NEEDSPLIT: 172 /* 173 * It's possible to fail to split repeatedly, as other 174 * threads may be modifying the tree, or the page usage 175 * is sufficiently bad that we don't get enough space 176 * the first time. 177 */ 178 if (dir == DOWN) 179 dir = UP; 180 break; 181 default: 182 goto err; 183 } 184 } 185 186err: if (root_pgnop != NULL) 187 *root_pgnop = cp->root; 188done: (void)__LPUT(dbc, metalock); 189 (void)__TLPUT(dbc, next_lock); 190 return (ret); 191} 192 193/* 194 * __bam_root -- 195 * Split the root page of a btree. 196 */ 197static int 198__bam_root(dbc, cp) 199 DBC *dbc; 200 EPG *cp; 201{ 202 DB *dbp; 203 DBT log_dbt, rootent[2]; 204 DB_LOCK llock, rlock; 205 DB_LSN log_lsn; 206 DB_MPOOLFILE *mpf; 207 PAGE *lp, *rp; 208 db_indx_t split; 209 u_int32_t opflags; 210 int ret, t_ret; 211 212 dbp = dbc->dbp; 213 mpf = dbp->mpf; 214 lp = rp = NULL; 215 LOCK_INIT(llock); 216 LOCK_INIT(rlock); 217 COMPQUIET(log_dbt.data, NULL); 218 219 /* Yeah, right. */ 220 if (cp->page->level >= MAXBTREELEVEL) { 221 __db_errx(dbp->env, 222 "Too many btree levels: %d", cp->page->level); 223 return (ENOSPC); 224 } 225 226 if ((ret = __memp_dirty(mpf, 227 &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 228 goto err; 229 230 /* Create new left and right pages for the split. 
*/ 231 if ((ret = __db_new(dbc, TYPE(cp->page), &llock, &lp)) != 0 || 232 (ret = __db_new(dbc, TYPE(cp->page), &rlock, &rp)) != 0) 233 goto err; 234 P_INIT(lp, dbp->pgsize, lp->pgno, 235 PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno, 236 cp->page->level, TYPE(cp->page)); 237 P_INIT(rp, dbp->pgsize, rp->pgno, 238 ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID, 239 cp->page->level, TYPE(cp->page)); 240 241 /* Split the page. */ 242 if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0) 243 goto err; 244 245 if (DBC_LOGGING(dbc)) { 246 memset(&log_dbt, 0, sizeof(log_dbt)); 247 if ((ret = 248 __os_malloc(dbp->env, dbp->pgsize, &log_dbt.data)) != 0) 249 goto err; 250 log_dbt.size = dbp->pgsize; 251 memcpy(log_dbt.data, cp->page, dbp->pgsize); 252 } 253 254 /* Clean up the new root page. */ 255 if ((ret = (dbc->dbtype == DB_RECNO ? 256 __ram_root(dbc, cp->page, lp, rp) : 257 __bam_broot(dbc, cp->page, split, lp, rp))) != 0) { 258 if (DBC_LOGGING(dbc)) 259 __os_free(dbp->env, log_dbt.data); 260 goto err; 261 } 262 263 /* Log the change. */ 264 if (DBC_LOGGING(dbc)) { 265 memset(rootent, 0, sizeof(rootent)); 266 rootent[0].data = GET_BINTERNAL(dbp, cp->page, 0); 267 rootent[1].data = GET_BINTERNAL(dbp, cp->page, 1); 268 if (dbc->dbtype == DB_RECNO) 269 rootent[0].size = rootent[1].size = RINTERNAL_SIZE; 270 else { 271 rootent[0].size = BINTERNAL_SIZE( 272 ((BINTERNAL *)rootent[0].data)->len); 273 rootent[1].size = BINTERNAL_SIZE( 274 ((BINTERNAL *)rootent[1].data)->len); 275 } 276 ZERO_LSN(log_lsn); 277 opflags = F_ISSET( 278 (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? 
SPL_NRECS : 0; 279 if (dbc->dbtype == DB_RECNO) 280 opflags |= SPL_RECNO; 281 ret = __bam_split_log(dbp, 282 dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp), 283 &LSN(rp), (u_int32_t)NUM_ENT(lp), PGNO_INVALID, &log_lsn, 284 dbc->internal->root, &LSN(cp->page), 0, 285 &log_dbt, &rootent[0], &rootent[1], opflags); 286 287 __os_free(dbp->env, log_dbt.data); 288 289 if (ret != 0) 290 goto err; 291 } else 292 LSN_NOT_LOGGED(LSN(cp->page)); 293 LSN(lp) = LSN(cp->page); 294 LSN(rp) = LSN(cp->page); 295 296 /* Adjust any cursors. */ 297 ret = __bam_ca_split(dbc, cp->page->pgno, lp->pgno, rp->pgno, split, 1); 298 299 /* Success or error: release pages and locks. */ 300err: if (cp->page != NULL && (t_ret = __memp_fput(mpf, 301 dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0) 302 ret = t_ret; 303 cp->page = NULL; 304 305 /* 306 * We are done. Put or downgrade all our locks and release 307 * the pages. 308 */ 309 if ((t_ret = __TLPUT(dbc, llock)) != 0 && ret == 0) 310 ret = t_ret; 311 if ((t_ret = __TLPUT(dbc, rlock)) != 0 && ret == 0) 312 ret = t_ret; 313 if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0) 314 ret = t_ret; 315 if (lp != NULL && (t_ret = __memp_fput(mpf, 316 dbc->thread_info, lp, dbc->priority)) != 0 && ret == 0) 317 ret = t_ret; 318 if (rp != NULL && (t_ret = __memp_fput(mpf, 319 dbc->thread_info, rp, dbc->priority)) != 0 && ret == 0) 320 ret = t_ret; 321 322 return (ret); 323} 324 325/* 326 * __bam_page -- 327 * Split the non-root page of a btree. 
328 */ 329static int 330__bam_page(dbc, pp, cp) 331 DBC *dbc; 332 EPG *pp, *cp; 333{ 334 BTREE_CURSOR *bc; 335 DB *dbp; 336 DBT log_dbt, rentry; 337 DB_LOCK rplock; 338 DB_LSN log_lsn; 339 DB_LSN save_lsn; 340 DB_MPOOLFILE *mpf; 341 PAGE *lp, *rp, *alloc_rp, *tp; 342 db_indx_t split; 343 u_int32_t opflags; 344 int ret, t_ret; 345 346 dbp = dbc->dbp; 347 mpf = dbp->mpf; 348 alloc_rp = lp = rp = tp = NULL; 349 LOCK_INIT(rplock); 350 ret = -1; 351 352 /* 353 * Create new left page for the split, and fill in everything 354 * except its LSN and next-page page number. 355 * 356 * Create a new right page for the split, and fill in everything 357 * except its LSN and page number. 358 * 359 * We malloc space for both the left and right pages, so we don't get 360 * a new page from the underlying buffer pool until we know the split 361 * is going to succeed. The reason is that we can't release locks 362 * acquired during the get-a-new-page process because metadata page 363 * locks can't be discarded on failure since we may have modified the 364 * free list. So, if you assume that we're holding a write lock on the 365 * leaf page which ran out of space and started this split (e.g., we 366 * have already written records to the page, or we retrieved a record 367 * from it with the DB_RMW flag set), failing in a split with both a 368 * leaf page locked and the metadata page locked can potentially lock 369 * up the tree badly, because we've violated the rule of always locking 370 * down the tree, and never up. 371 */ 372 if ((ret = __os_malloc(dbp->env, dbp->pgsize * 2, &lp)) != 0) 373 goto err; 374 P_INIT(lp, dbp->pgsize, PGNO(cp->page), 375 ISINTERNAL(cp->page) ? PGNO_INVALID : PREV_PGNO(cp->page), 376 ISINTERNAL(cp->page) ? PGNO_INVALID : 0, 377 cp->page->level, TYPE(cp->page)); 378 379 rp = (PAGE *)((u_int8_t *)lp + dbp->pgsize); 380 P_INIT(rp, dbp->pgsize, 0, 381 ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page), 382 ISINTERNAL(cp->page) ? 
PGNO_INVALID : NEXT_PGNO(cp->page), 383 cp->page->level, TYPE(cp->page)); 384 385 /* 386 * Split right. 387 * 388 * Only the indices are sorted on the page, i.e., the key/data pairs 389 * aren't, so it's simpler to copy the data from the split page onto 390 * two new pages instead of copying half the data to a new right page 391 * and compacting the left page in place. Since the left page can't 392 * change, we swap the original and the allocated left page after the 393 * split. 394 */ 395 if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0) 396 goto err; 397 398 /* 399 * Test to see if we are going to be able to insert the new pages into 400 * the parent page. The interesting failure here is that the parent 401 * page can't hold the new keys, and has to be split in turn, in which 402 * case we want to release all the locks we can. 403 */ 404 if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_SPACEONLY)) != 0) 405 goto err; 406 407 /* 408 * We've got everything locked down we need, and we know the split 409 * is going to succeed. Go and get the additional page we'll need. 410 */ 411 if ((ret = __db_new(dbc, TYPE(cp->page), &rplock, &alloc_rp)) != 0) 412 goto err; 413 414 /* 415 * Prepare to fix up the previous pointer of any leaf page following 416 * the split page. Our caller has already write locked the page so 417 * we can get it without deadlocking on the parent latch. 418 */ 419 if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID && 420 (ret = __memp_fget(mpf, &NEXT_PGNO(cp->page), 421 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &tp)) != 0) 422 goto err; 423 424 /* 425 * Fix up the page numbers we didn't have before. We have to do this 426 * before calling __bam_pinsert because it may copy a page number onto 427 * the parent page and it takes the page number from its page argument. 
428 */ 429 PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp); 430 431 DB_ASSERT(dbp->env, IS_DIRTY(cp->page)); 432 DB_ASSERT(dbp->env, IS_DIRTY(pp->page)); 433 434 /* Actually update the parent page. */ 435 if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_NOLOGGING)) != 0) 436 goto err; 437 438 bc = (BTREE_CURSOR *)dbc->internal; 439 /* Log the change. */ 440 if (DBC_LOGGING(dbc)) { 441 memset(&log_dbt, 0, sizeof(log_dbt)); 442 log_dbt.data = cp->page; 443 log_dbt.size = dbp->pgsize; 444 memset(&rentry, 0, sizeof(rentry)); 445 rentry.data = GET_BINTERNAL(dbp, pp->page, pp->indx + 1); 446 opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0; 447 if (dbc->dbtype == DB_RECNO) { 448 opflags |= SPL_RECNO; 449 rentry.size = RINTERNAL_SIZE; 450 } else 451 rentry.size = 452 BINTERNAL_SIZE(((BINTERNAL *)rentry.data)->len); 453 if (tp == NULL) 454 ZERO_LSN(log_lsn); 455 if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0, 456 PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp), 457 &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp), 458 tp == NULL ? 0 : PGNO(tp), tp == NULL ? &log_lsn : &LSN(tp), 459 PGNO(pp->page), &LSN(pp->page), pp->indx, 460 &log_dbt, NULL, &rentry, opflags)) != 0) { 461 /* 462 * Undo the update to the parent page, which has not 463 * been logged yet. This must succeed. 464 */ 465 t_ret = __db_ditem_nolog(dbc, pp->page, 466 pp->indx + 1, rentry.size); 467 DB_ASSERT(dbp->env, t_ret == 0); 468 469 goto err; 470 } 471 472 } else 473 LSN_NOT_LOGGED(LSN(cp->page)); 474 475 /* Update the LSNs for all involved pages. */ 476 LSN(alloc_rp) = LSN(cp->page); 477 LSN(lp) = LSN(cp->page); 478 LSN(rp) = LSN(cp->page); 479 LSN(pp->page) = LSN(cp->page); 480 if (tp != NULL) { 481 /* Log record has been written; now it is safe to update next page. */ 482 PREV_PGNO(tp) = PGNO(rp); 483 LSN(tp) = LSN(cp->page); 484 } 485 486 /* 487 * Copy the left and right pages into place. There are two paths 488 * through here. Either we are logging and we set the LSNs in the 489 * logging path. 
However, if we are not logging, then we do not 490 * have valid LSNs on lp or rp. The correct LSNs to use are the 491 * ones on the page we got from __db_new or the one that was 492 * originally on cp->page. In both cases, we save the LSN from the 493 * real database page (not a malloc'd one) and reapply it after we 494 * do the copy. 495 */ 496 save_lsn = alloc_rp->lsn; 497 memcpy(alloc_rp, rp, LOFFSET(dbp, rp)); 498 memcpy((u_int8_t *)alloc_rp + HOFFSET(rp), 499 (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp)); 500 alloc_rp->lsn = save_lsn; 501 502 save_lsn = cp->page->lsn; 503 memcpy(cp->page, lp, LOFFSET(dbp, lp)); 504 memcpy((u_int8_t *)cp->page + HOFFSET(lp), 505 (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp)); 506 cp->page->lsn = save_lsn; 507 508 /* Adjust any cursors. */ 509 if ((ret = __bam_ca_split(dbc, 510 PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0) 511 goto err; 512 513 __os_free(dbp->env, lp); 514 515 /* 516 * Success -- write the real pages back to the store. 
517 */ 518 if ((t_ret = __memp_fput(mpf, 519 dbc->thread_info, alloc_rp, dbc->priority)) != 0 && ret == 0) 520 ret = t_ret; 521 if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0) 522 ret = t_ret; 523 if (tp != NULL) { 524 if ((t_ret = __memp_fput(mpf, 525 dbc->thread_info, tp, dbc->priority)) != 0 && ret == 0) 526 ret = t_ret; 527 } 528 if ((t_ret = __bam_stkrel(dbc, STK_CLRDBC)) != 0 && ret == 0) 529 ret = t_ret; 530 return (ret); 531 532err: if (lp != NULL) 533 __os_free(dbp->env, lp); 534 if (alloc_rp != NULL) 535 (void)__memp_fput(mpf, 536 dbc->thread_info, alloc_rp, dbc->priority); 537 if (tp != NULL) 538 (void)__memp_fput(mpf, dbc->thread_info, tp, dbc->priority); 539 540 if (pp->page != NULL) 541 (void)__memp_fput(mpf, 542 dbc->thread_info, pp->page, dbc->priority); 543 544 if (ret == DB_NEEDSPLIT) 545 (void)__LPUT(dbc, pp->lock); 546 else 547 (void)__TLPUT(dbc, pp->lock); 548 549 (void)__memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority); 550 551 /* 552 * We don't drop the left and right page locks. If we doing dirty 553 * reads then we need to hold the locks until we abort the transaction. 554 * If we are not transactional, we are hosed anyway as the tree 555 * is trashed. It may be better not to leak the locks. 556 */ 557 558 if (dbc->txn == NULL) 559 (void)__LPUT(dbc, rplock); 560 561 if (dbc->txn == NULL || ret == DB_NEEDSPLIT) 562 (void)__LPUT(dbc, cp->lock); 563 564 return (ret); 565} 566 567/* 568 * __bam_broot -- 569 * Fix up the btree root page after it has been split. 
570 * PUBLIC: int __bam_broot __P((DBC *, PAGE *, u_int32_t, PAGE *, PAGE *)); 571 */ 572int 573__bam_broot(dbc, rootp, split, lp, rp) 574 DBC *dbc; 575 u_int32_t split; 576 PAGE *rootp, *lp, *rp; 577{ 578 BINTERNAL bi, bi0, *child_bi; 579 BKEYDATA *child_bk; 580 BOVERFLOW bo, *child_bo; 581 BTREE_CURSOR *cp; 582 DB *dbp; 583 DBT hdr, hdr0, data; 584 db_pgno_t root_pgno; 585 int ret; 586 587 dbp = dbc->dbp; 588 cp = (BTREE_CURSOR *)dbc->internal; 589 child_bo = NULL; 590 data.data = NULL; 591 memset(&bi, 0, sizeof(bi)); 592 593 switch (TYPE(rootp)) { 594 case P_IBTREE: 595 /* Copy the first key of the child page onto the root page. */ 596 child_bi = GET_BINTERNAL(dbp, rootp, split); 597 switch (B_TYPE(child_bi->type)) { 598 case B_KEYDATA: 599 bi.len = child_bi->len; 600 B_TSET(bi.type, B_KEYDATA); 601 bi.pgno = rp->pgno; 602 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 603 if ((ret = __os_malloc(dbp->env, 604 child_bi->len, &data.data)) != 0) 605 return (ret); 606 memcpy(data.data, child_bi->data, child_bi->len); 607 data.size = child_bi->len; 608 break; 609 case B_OVERFLOW: 610 /* Reuse the overflow key. */ 611 child_bo = (BOVERFLOW *)child_bi->data; 612 memset(&bo, 0, sizeof(bo)); 613 bo.type = B_OVERFLOW; 614 bo.tlen = child_bo->tlen; 615 bo.pgno = child_bo->pgno; 616 bi.len = BOVERFLOW_SIZE; 617 B_TSET(bi.type, B_OVERFLOW); 618 bi.pgno = rp->pgno; 619 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 620 DB_SET_DBT(data, &bo, BOVERFLOW_SIZE); 621 break; 622 case B_DUPLICATE: 623 default: 624 goto pgfmt; 625 } 626 break; 627 case P_LDUP: 628 case P_LBTREE: 629 /* Copy the first key of the child page onto the root page. 
*/ 630 child_bk = GET_BKEYDATA(dbp, rootp, split); 631 switch (B_TYPE(child_bk->type)) { 632 case B_KEYDATA: 633 bi.len = child_bk->len; 634 B_TSET(bi.type, B_KEYDATA); 635 bi.pgno = rp->pgno; 636 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 637 if ((ret = __os_malloc(dbp->env, 638 child_bk->len, &data.data)) != 0) 639 return (ret); 640 memcpy(data.data, child_bk->data, child_bk->len); 641 data.size = child_bk->len; 642 break; 643 case B_OVERFLOW: 644 /* Copy the overflow key. */ 645 child_bo = (BOVERFLOW *)child_bk; 646 memset(&bo, 0, sizeof(bo)); 647 bo.type = B_OVERFLOW; 648 bo.tlen = child_bo->tlen; 649 memset(&hdr, 0, sizeof(hdr)); 650 if ((ret = __db_goff(dbc, &hdr, child_bo->tlen, 651 child_bo->pgno, &hdr.data, &hdr.size)) == 0) 652 ret = __db_poff(dbc, &hdr, &bo.pgno); 653 654 if (hdr.data != NULL) 655 __os_free(dbp->env, hdr.data); 656 if (ret != 0) 657 return (ret); 658 659 bi.len = BOVERFLOW_SIZE; 660 B_TSET(bi.type, B_OVERFLOW); 661 bi.pgno = rp->pgno; 662 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 663 DB_SET_DBT(data, &bo, BOVERFLOW_SIZE); 664 break; 665 case B_DUPLICATE: 666 default: 667 goto pgfmt; 668 } 669 break; 670 default: 671pgfmt: return (__db_pgfmt(dbp->env, rp->pgno)); 672 } 673 /* 674 * If the root page was a leaf page, change it into an internal page. 675 * We copy the key we split on (but not the key's data, in the case of 676 * a leaf page) to the new root page. 677 */ 678 root_pgno = cp->root; 679 P_INIT(rootp, dbp->pgsize, 680 root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE); 681 682 /* 683 * The btree comparison code guarantees that the left-most key on any 684 * internal btree page is never used, so it doesn't need to be filled 685 * in. Set the record count if necessary. 
686 */ 687 memset(&bi0, 0, sizeof(bi0)); 688 B_TSET(bi0.type, B_KEYDATA); 689 bi0.pgno = lp->pgno; 690 if (F_ISSET(cp, C_RECNUM)) { 691 bi0.nrecs = __bam_total(dbp, lp); 692 RE_NREC_SET(rootp, bi0.nrecs); 693 bi.nrecs = __bam_total(dbp, rp); 694 RE_NREC_ADJ(rootp, bi.nrecs); 695 } 696 DB_SET_DBT(hdr0, &bi0, SSZA(BINTERNAL, data)); 697 if ((ret = __db_pitem_nolog(dbc, rootp, 698 0, BINTERNAL_SIZE(0), &hdr0, NULL)) != 0) 699 goto err; 700 ret = __db_pitem_nolog(dbc, rootp, 1, 701 BINTERNAL_SIZE(data.size), &hdr, &data); 702 703err: if (data.data != NULL && child_bo == NULL) 704 __os_free(dbp->env, data.data); 705 return (ret); 706} 707 708/* 709 * __ram_root -- 710 * Fix up the recno root page after it has been split. 711 * PUBLIC: int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *)); 712 */ 713int 714__ram_root(dbc, rootp, lp, rp) 715 DBC *dbc; 716 PAGE *rootp, *lp, *rp; 717{ 718 DB *dbp; 719 DBT hdr; 720 RINTERNAL ri; 721 db_pgno_t root_pgno; 722 int ret; 723 724 dbp = dbc->dbp; 725 root_pgno = dbc->internal->root; 726 727 /* Initialize the page. */ 728 P_INIT(rootp, dbp->pgsize, 729 root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO); 730 731 /* Initialize the header. */ 732 DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE); 733 734 /* Insert the left and right keys, set the header information. */ 735 ri.pgno = lp->pgno; 736 ri.nrecs = __bam_total(dbp, lp); 737 if ((ret = __db_pitem_nolog(dbc, 738 rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0) 739 return (ret); 740 RE_NREC_SET(rootp, ri.nrecs); 741 ri.pgno = rp->pgno; 742 ri.nrecs = __bam_total(dbp, rp); 743 if ((ret = __db_pitem_nolog(dbc, 744 rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0) 745 return (ret); 746 RE_NREC_ADJ(rootp, ri.nrecs); 747 return (0); 748} 749 750/* 751 * __bam_pinsert -- 752 * Insert a new key into a parent page, completing the split. 
753 * 754 * PUBLIC: int __bam_pinsert 755 * PUBLIC: __P((DBC *, EPG *, u_int32_t, PAGE *, PAGE *, int)); 756 */ 757int 758__bam_pinsert(dbc, parent, split, lchild, rchild, flags) 759 DBC *dbc; 760 EPG *parent; 761 u_int32_t split; 762 PAGE *lchild, *rchild; 763 int flags; 764{ 765 BINTERNAL bi, *child_bi; 766 BKEYDATA *child_bk, *tmp_bk; 767 BOVERFLOW bo, *child_bo; 768 BTREE *t; 769 BTREE_CURSOR *cp; 770 DB *dbp; 771 DBT a, b, hdr, data; 772 EPG *child; 773 PAGE *ppage; 774 RINTERNAL ri; 775 db_indx_t off; 776 db_recno_t nrecs; 777 size_t (*func) __P((DB *, const DBT *, const DBT *)); 778 int (*pitem) __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *)); 779 u_int32_t n, nbytes, nksize, oldsize, size; 780 int ret; 781 782 dbp = dbc->dbp; 783 cp = (BTREE_CURSOR *)dbc->internal; 784 t = dbp->bt_internal; 785 ppage = parent->page; 786 child = parent + 1; 787 788 /* If handling record numbers, count records split to the right page. */ 789 nrecs = F_ISSET(cp, C_RECNUM) && 790 !LF_ISSET(BPI_SPACEONLY) ? __bam_total(dbp, rchild) : 0; 791 792 /* 793 * Now we insert the new page's first key into the parent page, which 794 * completes the split. The parent points to a PAGE and a page index 795 * offset, where the new key goes ONE AFTER the index, because we split 796 * to the right. 797 * 798 * XXX 799 * Some btree algorithms replace the key for the old page as well as 800 * the new page. We don't, as there's no reason to believe that the 801 * first key on the old page is any better than the key we have, and, 802 * in the case of a key being placed at index 0 causing the split, the 803 * key is unavailable. 804 */ 805 off = parent->indx + O_INDX; 806 if (LF_ISSET(BPI_REPLACE)) 807 oldsize = TYPE(ppage) == P_IRECNO ? RINTERNAL_PSIZE : 808 BINTERNAL_PSIZE(GET_BINTERNAL(dbp, ppage, off)->len); 809 else 810 oldsize = 0; 811 812 /* 813 * Calculate the space needed on the parent page. 814 * 815 * Prefix trees: space hack used when inserting into BINTERNAL pages. 
816 * Retain only what's needed to distinguish between the new entry and 817 * the LAST entry on the page to its left. If the keys compare equal, 818 * retain the entire key. We ignore overflow keys, and the entire key 819 * must be retained for the next-to-leftmost key on the leftmost page 820 * of each level, or the search will fail. Applicable ONLY to internal 821 * pages that have leaf pages as children. Further reduction of the 822 * key between pairs of internal pages loses too much information. 823 */ 824 switch (TYPE(child->page)) { 825 case P_IBTREE: 826 child_bi = GET_BINTERNAL(dbp, child->page, split); 827 nbytes = BINTERNAL_PSIZE(child_bi->len); 828 829 if (P_FREESPACE(dbp, ppage) + oldsize < nbytes) 830 return (DB_NEEDSPLIT); 831 if (LF_ISSET(BPI_SPACEONLY)) 832 return (0); 833 834 switch (B_TYPE(child_bi->type)) { 835 case B_KEYDATA: 836 /* Add a new record for the right page. */ 837 memset(&bi, 0, sizeof(bi)); 838 bi.len = child_bi->len; 839 B_TSET(bi.type, B_KEYDATA); 840 bi.pgno = rchild->pgno; 841 bi.nrecs = nrecs; 842 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 843 DB_SET_DBT(data, child_bi->data, child_bi->len); 844 size = BINTERNAL_SIZE(child_bi->len); 845 break; 846 case B_OVERFLOW: 847 /* Reuse the overflow key. 
*/ 848 child_bo = (BOVERFLOW *)child_bi->data; 849 memset(&bo, 0, sizeof(bo)); 850 bo.type = B_OVERFLOW; 851 bo.tlen = child_bo->tlen; 852 bo.pgno = child_bo->pgno; 853 bi.len = BOVERFLOW_SIZE; 854 B_TSET(bi.type, B_OVERFLOW); 855 bi.pgno = rchild->pgno; 856 bi.nrecs = nrecs; 857 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 858 DB_SET_DBT(data, &bo, BOVERFLOW_SIZE); 859 size = BINTERNAL_SIZE(BOVERFLOW_SIZE); 860 break; 861 case B_DUPLICATE: 862 default: 863 goto pgfmt; 864 } 865 break; 866 case P_LDUP: 867 case P_LBTREE: 868 child_bk = GET_BKEYDATA(dbp, child->page, split); 869 switch (B_TYPE(child_bk->type)) { 870 case B_KEYDATA: 871 nbytes = BINTERNAL_PSIZE(child_bk->len); 872 nksize = child_bk->len; 873 874 /* 875 * Prefix compression: 876 * We set t->bt_prefix to NULL if we have a comparison 877 * callback but no prefix compression callback. But, 878 * if we're splitting in an off-page duplicates tree, 879 * we still have to do some checking. If using the 880 * default off-page duplicates comparison routine we 881 * can use the default prefix compression callback. If 882 * not using the default off-page duplicates comparison 883 * routine, we can't do any kind of prefix compression 884 * as there's no way for an application to specify a 885 * prefix compression callback that corresponds to its 886 * comparison callback. 887 * 888 * No prefix compression if we don't have a compression 889 * function, or the key we'd compress isn't a normal 890 * key (for example, it references an overflow page). 891 * 892 * Generate a parent page key for the right child page 893 * from a comparison of the last key on the left child 894 * page and the first key on the right child page. 895 */ 896 if (F_ISSET(dbc, DBC_OPD)) { 897 if (dbp->dup_compare == __bam_defcmp) 898 func = __bam_defpfx; 899 else 900 func = NULL; 901 } else 902 func = t->bt_prefix; 903 if (func == NULL) 904 goto noprefix; 905 tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) - 906 (TYPE(lchild) == P_LDUP ? 
O_INDX : P_INDX)); 907 if (B_TYPE(tmp_bk->type) != B_KEYDATA) 908 goto noprefix; 909 DB_INIT_DBT(a, tmp_bk->data, tmp_bk->len); 910 DB_INIT_DBT(b, child_bk->data, child_bk->len); 911 nksize = (u_int32_t)func(dbp, &a, &b); 912 if ((n = BINTERNAL_PSIZE(nksize)) < nbytes) 913 nbytes = n; 914 else 915 nksize = child_bk->len; 916 917noprefix: if (P_FREESPACE(dbp, ppage) + oldsize < nbytes) 918 return (DB_NEEDSPLIT); 919 if (LF_ISSET(BPI_SPACEONLY)) 920 return (0); 921 922 memset(&bi, 0, sizeof(bi)); 923 bi.len = nksize; 924 B_TSET(bi.type, B_KEYDATA); 925 bi.pgno = rchild->pgno; 926 bi.nrecs = nrecs; 927 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 928 DB_SET_DBT(data, child_bk->data, nksize); 929 size = BINTERNAL_SIZE(nksize); 930 break; 931 case B_OVERFLOW: 932 nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE); 933 934 if (P_FREESPACE(dbp, ppage) + oldsize < nbytes) 935 return (DB_NEEDSPLIT); 936 if (LF_ISSET(BPI_SPACEONLY)) 937 return (0); 938 939 /* Copy the overflow key. */ 940 child_bo = (BOVERFLOW *)child_bk; 941 memset(&bo, 0, sizeof(bo)); 942 bo.type = B_OVERFLOW; 943 bo.tlen = child_bo->tlen; 944 memset(&hdr, 0, sizeof(hdr)); 945 if ((ret = __db_goff(dbc, &hdr, child_bo->tlen, 946 child_bo->pgno, &hdr.data, &hdr.size)) == 0) 947 ret = __db_poff(dbc, &hdr, &bo.pgno); 948 949 if (hdr.data != NULL) 950 __os_free(dbp->env, hdr.data); 951 if (ret != 0) 952 return (ret); 953 954 memset(&bi, 0, sizeof(bi)); 955 bi.len = BOVERFLOW_SIZE; 956 B_TSET(bi.type, B_OVERFLOW); 957 bi.pgno = rchild->pgno; 958 bi.nrecs = nrecs; 959 DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data)); 960 DB_SET_DBT(data, &bo, BOVERFLOW_SIZE); 961 size = BINTERNAL_SIZE(BOVERFLOW_SIZE); 962 963 break; 964 case B_DUPLICATE: 965 default: 966 goto pgfmt; 967 } 968 break; 969 case P_IRECNO: 970 case P_LRECNO: 971 nbytes = RINTERNAL_PSIZE; 972 973 if (P_FREESPACE(dbp, ppage) + oldsize < nbytes) 974 return (DB_NEEDSPLIT); 975 if (LF_ISSET(BPI_SPACEONLY)) 976 return (0); 977 978 /* Add a new record for the right page. 
*/ 979 DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE); 980 ri.pgno = rchild->pgno; 981 ri.nrecs = nrecs; 982 size = RINTERNAL_SIZE; 983 data.size = 0; 984 /* 985 * For now, we are locking internal recno nodes so 986 * use two steps. 987 */ 988 if (LF_ISSET(BPI_REPLACE)) { 989 if ((ret = __bam_ditem(dbc, ppage, off)) != 0) 990 return (ret); 991 LF_CLR(BPI_REPLACE); 992 } 993 break; 994 default: 995pgfmt: return (__db_pgfmt(dbp->env, PGNO(child->page))); 996 } 997 998 if (LF_ISSET(BPI_REPLACE)) { 999 DB_ASSERT(dbp->env, !LF_ISSET(BPI_NOLOGGING)); 1000 if ((ret = __bam_irep(dbc, ppage, 1001 off, &hdr, data.size != 0 ? &data : NULL)) != 0) 1002 return (ret); 1003 } else { 1004 if (LF_ISSET(BPI_NOLOGGING)) 1005 pitem = __db_pitem_nolog; 1006 else 1007 pitem = __db_pitem; 1008 1009 if ((ret = pitem(dbc, ppage, 1010 off, size, &hdr, data.size != 0 ? &data : NULL)) != 0) 1011 return (ret); 1012 } 1013 1014 /* 1015 * If a Recno or Btree with record numbers AM page, or an off-page 1016 * duplicates tree, adjust the parent page's left page record count. 1017 */ 1018 if (F_ISSET(cp, C_RECNUM) && !LF_ISSET(BPI_NORECNUM)) { 1019 /* Log the change. */ 1020 if (DBC_LOGGING(dbc)) { 1021 if ((ret = __bam_cadjust_log(dbp, dbc->txn, 1022 &LSN(ppage), 0, PGNO(ppage), &LSN(ppage), 1023 parent->indx, -(int32_t)nrecs, 0)) != 0) 1024 return (ret); 1025 } else 1026 LSN_NOT_LOGGED(LSN(ppage)); 1027 1028 /* Update the left page count. */ 1029 if (dbc->dbtype == DB_RECNO) 1030 GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs; 1031 else 1032 GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs; 1033 } 1034 1035 return (0); 1036} 1037 1038/* 1039 * __bam_psplit -- 1040 * Do the real work of splitting the page. 
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;			/* Page to split (page + parent index). */
	PAGE *lp, *rp;			/* Empty left and right target pages. */
	db_indx_t *splitret;		/* Returned split index. */
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, *inp, nbytes, off, splitp, top;
	int adjust, cnt, iflag, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	inp = P_INP(dbp, pp);
	/*
	 * Leaf Btree pages carry key/data pairs (P_INDX entries per record);
	 * every other page type carries one entry per record (O_INDX).
	 */
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  This is trivial when we
	 * are splitting a new page before the beginning of the tree, all of
	 * the interesting tests are against values of 0.
	 *
	 * Catching appends to the tree is harder.  In a simple append, we're
	 * inserting an item that sorts past the end of the tree; the cursor
	 * will point past the last element on the page.  But, in trees with
	 * duplicates, the cursor may point to the last entry on the page --
	 * in this case, the entry will also be the last element of a duplicate
	 * set (the last because the search call specified the SR_DUPLAST flag).
	 * The only way to differentiate between an insert immediately before
	 * the last item in a tree or an append after a duplicate set which is
	 * also the last item in the tree is to call the comparison function.
	 * When splitting internal pages during an append, the search code
	 * guarantees the cursor always points to the largest page item less
	 * than the new internal entry.  To summarize, we want to catch three
	 * possible index values:
	 *
	 *	NUM_ENT(page)		Btree/Recno leaf insert past end-of-tree
	 *	NUM_ENT(page) - O_INDX	Btree or Recno internal insert past EOT
	 *	NUM_ENT(page) - P_INDX	Btree leaf insert past EOT after a set
	 *				    of duplicates
	 *
	 * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
	 * near the end of the tree, and not after the end of the tree at all.
	 * Do a simple test which might be wrong because calling the comparison
	 * functions is expensive.  Regardless, it's not a big deal if we're
	 * wrong, we'll do the split the right way next time.
	 */
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
		/* Likely append: leave only the last record on the page. */
		off = NUM_ENT(pp) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		/* Likely prepend: move only the first record. */
		off = adjust;
	if (off != 0)
		goto sort;

	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	/* Accumulate on-page sizes until we cross the halfway byte count. */
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
				nbytes += BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/* Count the key, then fall through to count data. */
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
sort:	splitp = off;

	/*
	 * Splitp is either at or just past the optimum split point.  If the
	 * tree type is such that we're going to promote a key to an internal
	 * page, and our current choice is an overflow key, look for something
	 * close by that's smaller.
	 */
	switch (TYPE(pp)) {
	case P_IBTREE:
		iflag = 1;
		isbigkey =
		    B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
		break;
	case P_LBTREE:
	case P_LDUP:
		iflag = 0;
		isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
		    B_KEYDATA;
		break;
	default:
		/* Recno pages promote no keys; nothing to avoid. */
		iflag = isbigkey = 0;
	}
	/*
	 * Probe up to three entries on either side of the chosen split point
	 * for a non-overflow key, preferring the closest one found.
	 *
	 * NOTE(review): the upward probe's condition is shaped differently
	 * from the downward probe's "iflag ? internal : leaf" test -- when
	 * iflag is set and the internal entry is not B_KEYDATA, the second
	 * disjunct re-reads the same entry via GET_BKEYDATA.  Confirm against
	 * upstream that this is intentional/benign.
	 */
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((iflag && B_TYPE(
			    GET_BINTERNAL(dbp, pp,off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (iflag ? B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA) {
				splitp = off;
				break;
			}
		}

	/*
	 * We can't split in the middle a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 *
	 * Duplicate keys share a single on-page copy, so equal inp[] offsets
	 * identify members of the same duplicate set.
	 */
	if (TYPE(pp) == P_LBTREE &&
	    inp[splitp] == inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    inp[splitp] != inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (inp[splitp] != inp[off]) {
				/* +adjust: split after the set, not inside. */
				splitp = off + adjust;
				break;
			}
		}

	/* We're going to split at splitp: [0, splitp) left, rest right. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}

/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * Copies entries [nxt, stop) of source page pp onto the end of target page
 * cp, updating cp's entry count and free-space offset as it goes.  The first
 * entry copied onto an internal Btree page (when it is not also the first
 * entry of the source) is rewritten as an empty key, since the leftmost key
 * on an internal page is never compared against.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;			/* Source and target pages. */
	u_int32_t nxt, stop;		/* Source range: [nxt, stop). */
{
	BINTERNAL internal;
	db_indx_t *cinp, nbytes, off, *pinp;

	cinp = P_INP(dbp, cp);
	pinp = P_INP(dbp, pp);
	/*
	 * Nxt is the offset of the next record to be placed on the target page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		/* Compute the on-page size of the entry being copied. */
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (off == 0 && nxt != 0)
				/* First entry becomes a zero-length key. */
				nbytes = BINTERNAL_SIZE(0);
			else if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
				nbytes = BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pinp[nxt] == pinp[nxt - P_INDX]) {
				cinp[off] = cinp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
			    B_KEYDATA)
				nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
		/* Claim nbytes from the target's free space, record offset. */
		cinp[off] = HOFFSET(cp) -= nbytes;
		if (off == 0 && nxt != 0 && TYPE(pp) == P_IBTREE) {
			/*
			 * Build the empty-key internal entry in place of the
			 * source entry, keeping its page number/record count.
			 */
			internal.len = 0;
			UMRW_SET(internal.unused);
			internal.type = B_KEYDATA;
			internal.pgno = GET_BINTERNAL(dbp, pp, nxt)->pgno;
			internal.nrecs = GET_BINTERNAL(dbp, pp, nxt)->nrecs;
			memcpy(P_ENTRY(dbp, cp, off), &internal, nbytes);
		}
		else
			memcpy(P_ENTRY(dbp, cp, off),
			    P_ENTRY(dbp, pp, nxt), nbytes);
	}
	return (0);
}