/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996,2008 Oracle.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $Id: bt_split.c,v 12.29 2008/01/08 20:57:59 bostic Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/lock.h"
#include "dbinc/mp.h"
#include "dbinc/btree.h"

static int __bam_broot __P((DBC *, PAGE *, u_int32_t, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
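
/*
 * Call structure, as a reader's roadmap: __bam_split drives the split,
 * calling __bam_root when the page being split is the root and __bam_page
 * for any other page.  Both use __bam_psplit (which in turn uses __bam_copy)
 * to divide the old page's items across two new pages, and then fix up the
 * parent: __bam_page via __bam_pinsert, __bam_root by rewriting the old root
 * via __bam_broot (btree) or __ram_root (recno).
 */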

/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
 */
int
__bam_split(dbc, arg, root_pgnop)
	DBC *dbc;
	void *arg;
	db_pgno_t *root_pgnop;
{
	BTREE_CURSOR *cp;
	enum { UP, DOWN } dir;
	db_pgno_t root_pgno;
	int exact, level, ret;

	cp = (BTREE_CURSOR *)dbc->internal;
	root_pgno = cp->root;

	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're
	 * done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * XXX
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
		if ((ret = (dbc->dbtype == DB_BTREE ?
		    __bam_search(dbc, PGNO_INVALID,
			arg, SR_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbc,
			(db_recno_t *)arg, SR_WRPAIR, level, &exact))) != 0)
			break;

		if (root_pgnop != NULL)
			*root_pgnop = cp->csp[0].page->pgno == root_pgno ?
			    root_pgno : cp->csp[-1].page->pgno;
		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
		    <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
			(void)__bam_stkrel(dbc, STK_NOLOCK);
			break;
		}
		ret = cp->csp[0].page->pgno == root_pgno ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			goto err;
		}
	}

err:	if (root_pgnop != NULL)
		*root_pgnop = cp->root;
	return (ret);
}

/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
	DBC *dbc;
	EPG *cp;
{
	DB *dbp;
	DBT log_dbt;
	DB_LSN log_lsn;
	DB_MPOOLFILE *mpf;
	PAGE *lp, *rp;
	db_indx_t split;
	u_int32_t opflags;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	lp = rp = NULL;

	/* Yeah, right. */
	if (cp->page->level >= MAXBTREELEVEL) {
		__db_errx(dbp->env,
		    "Too many btree levels: %d", cp->page->level);
		ret = ENOSPC;
		goto err;
	}

	if ((ret = __memp_dirty(mpf,
	    &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;

	/* Create new left and right pages for the split. */
	if ((ret = __db_new(dbc, TYPE(cp->page), &lp)) != 0 ||
	    (ret = __db_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));
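
	/*
	 * Unlike __bam_page, which reuses the page being split as the new
	 * left sibling, a root split moves both halves to newly allocated
	 * pages: the root's page number must not change, since it's how the
	 * rest of the tree finds the database.  The old root page is then
	 * reinitialized as an internal page by __bam_broot or __ram_root
	 * below.  (The ISINTERNAL tests above reflect that only leaf pages
	 * are chained together by prev/next page numbers.)
	 */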
	/* Split the page. */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/* Log the change. */
	if (DBC_LOGGING(dbc)) {
		memset(&log_dbt, 0, sizeof(log_dbt));
		log_dbt.data = cp->page;
		log_dbt.size = dbp->pgsize;
		ZERO_LSN(log_lsn);
		opflags = F_ISSET(
		    (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
		if ((ret = __bam_split_log(dbp,
		    dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
		    &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
		    dbc->internal->root, &log_dbt, opflags)) != 0)
			goto err;
	} else
		LSN_NOT_LOGGED(LSN(cp->page));
	LSN(lp) = LSN(cp->page);
	LSN(rp) = LSN(cp->page);

	/* Clean up the new root page. */
	if ((ret = (dbc->dbtype == DB_RECNO ?
	    __ram_root(dbc, cp->page, lp, rp) :
	    __bam_broot(dbc, cp->page, split, lp, rp))) != 0)
		goto err;

	/* Adjust any cursors. */
	ret = __bam_ca_split(dbc, cp->page->pgno, lp->pgno, rp->pgno, split, 1);

	/* Success or error: release pages and locks. */
err:	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if (lp != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, lp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if (rp != NULL && (t_ret = __memp_fput(mpf,
	    dbc->thread_info, rp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;
{
	BTREE_CURSOR *bc;
	DB *dbp;
	DBT log_dbt;
	DB_LOCK rplock, tplock;
	DB_LSN log_lsn;
	DB_LSN save_lsn;
	DB_MPOOLFILE *mpf;
	PAGE *lp, *rp, *alloc_rp, *tp;
	db_indx_t split;
	u_int32_t opflags;
	int ret, t_ret;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	alloc_rp = lp = rp = tp = NULL;
	LOCK_INIT(rplock);
	LOCK_INIT(tplock);
	ret = -1;

	/*
	 * Create new left page for the split, and fill in everything
	 * except its LSN and next-page page number.
	 *
	 * Create a new right page for the split, and fill in everything
	 * except its LSN and page number.
	 *
	 * We malloc space for both the left and right pages, so we don't get
	 * a new page from the underlying buffer pool until we know the split
	 * is going to succeed.  The reason is that we can't release locks
	 * acquired during the get-a-new-page process because metadata page
	 * locks can't be discarded on failure since we may have modified the
	 * free list.  So, if you assume that we're holding a write lock on the
	 * leaf page which ran out of space and started this split (e.g., we
	 * have already written records to the page, or we retrieved a record
	 * from it with the DB_RMW flag set), failing in a split with both a
	 * leaf page locked and the metadata page locked can potentially lock
	 * up the tree badly, because we've violated the rule of always locking
	 * down the tree, and never up.
	 */
	if ((ret = __os_malloc(dbp->env, dbp->pgsize * 2, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PREV_PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : 0,
	    cp->page->level, TYPE(cp->page));

	rp = (PAGE *)((u_int8_t *)lp + dbp->pgsize);
	P_INIT(rp, dbp->pgsize, 0,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
	    ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
	    cp->page->level, TYPE(cp->page));
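
	/*
	 * Note the page numbers used above: lp is built with the original
	 * page's number because, on success, it's copied back over cp->page
	 * in place; rp's page number (and lp's next-page link, for a leaf)
	 * is left as 0 until the right page is actually allocated, when both
	 * are set to PGNO(alloc_rp) below.
	 */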

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to a new right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/*
	 * Test to see if we are going to be able to insert the new pages into
	 * the parent page.  The interesting failure here is that the parent
	 * page can't hold the new keys, and has to be split in turn, in which
	 * case we want to release all the locks we can.
	 */
	if ((ret = __bam_pinsert(dbc, pp, split, lp, rp, BPI_SPACEONLY)) != 0)
		goto err;

	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * There are interesting deadlock situations here as we try to
	 * write-lock a page that's not in our direct ancestry.  Consider a
	 * cursor walking backward through the leaf pages, that has our
	 * following page locked, and is waiting on a lock for the page we're
	 * splitting.  In that case we're going to deadlock here.  It's
	 * probably OK; stepping backward through the tree isn't a common
	 * operation.
	 */
	if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
		if ((ret = __db_lget(dbc,
		    0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
			goto err;
		if ((ret = __memp_fget(mpf, &NEXT_PGNO(cp->page),
		    dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &tp)) != 0)
			goto err;
	}

	/*
	 * We've got everything locked down we need, and we know the split
	 * is going to succeed.  Go and get the additional page we'll need.
	 */
	if ((ret = __db_new(dbc, TYPE(cp->page), &alloc_rp)) != 0)
		goto err;

	/*
	 * Lock the new page.  We need to do this for two reasons: first, the
	 * fast-lookup code might have a reference to this page in bt_lpgno if
	 * the page was recently deleted from the tree, and that code doesn't
	 * walk the tree and so won't encounter the parent's page lock.
	 * Second, a dirty reader could get to this page via the parent or old
	 * page after the split is done but before the transaction is committed
	 * or aborted.
	 */
	if ((ret = __db_lget(dbc,
	    0, PGNO(alloc_rp), DB_LOCK_WRITE, 0, &rplock)) != 0)
		goto err;

	/*
	 * Fix up the page numbers we didn't have before.  We have to do this
	 * before calling __bam_pinsert because it may copy a page number onto
	 * the parent page and it takes the page number from its page argument.
	 */
	PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);

	if ((ret = __memp_dirty(mpf,
	    &cp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
		goto err;

	/* Actually update the parent page. */
	if ((ret = __memp_dirty(mpf,
	    &pp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 ||
	    (ret = __bam_pinsert(dbc, pp, split, lp, rp, 0)) != 0)
		goto err;

	bc = (BTREE_CURSOR *)dbc->internal;
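
	/*
	 * The split record written below logs the entire pre-split page
	 * image (log_dbt), so recovery has enough information in a single
	 * record to rebuild both the new left and the new right pages.
	 */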
	/* Log the change. */
	if (DBC_LOGGING(dbc)) {
		memset(&log_dbt, 0, sizeof(log_dbt));
		log_dbt.data = cp->page;
		log_dbt.size = dbp->pgsize;
		if (tp == NULL)
			ZERO_LSN(log_lsn);
		opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
		if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
		    PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
		    &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &log_lsn : &LSN(tp),
		    PGNO_INVALID, &log_dbt, opflags)) != 0)
			goto err;

	} else
		LSN_NOT_LOGGED(LSN(cp->page));

	/* Update the LSNs for all involved pages. */
	LSN(alloc_rp) = LSN(cp->page);
	LSN(lp) = LSN(cp->page);
	LSN(rp) = LSN(cp->page);
	if (tp != NULL)
		LSN(tp) = LSN(cp->page);

	/*
	 * Copy the left and right pages into place.  There are two paths
	 * through here: if we are logging, the LSNs were just set in the
	 * logging path; if we are not logging, we do not have valid LSNs on
	 * lp or rp.  The correct LSNs to use are the ones on the page we got
	 * from __db_new or the one that was originally on cp->page.  In both
	 * cases, we save the LSN from the real database page (not a malloc'd
	 * one) and reapply it after we do the copy.
	 */
	save_lsn = alloc_rp->lsn;
	memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
	memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
	    (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
	alloc_rp->lsn = save_lsn;

	save_lsn = cp->page->lsn;
	memcpy(cp->page, lp, LOFFSET(dbp, lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	cp->page->lsn = save_lsn;

	/* Fix up the next-page link. */
	if (tp != NULL)
		PREV_PGNO(tp) = PGNO(rp);

	/* Adjust any cursors. */
	if ((ret = __bam_ca_split(dbc,
	    PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
		goto err;

	__os_free(dbp->env, lp);

	/*
	 * Success -- write the real pages back to the store.  We release the
	 * new page before releasing locks on the pages that reference it.
	 * We're finished modifying the page so it's not really necessary, but
	 * it's neater.
	 */
	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, alloc_rp, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, rplock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, pp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, pp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, cp->page, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;
	if (tp != NULL) {
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, tp, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
		if ((t_ret = __TLPUT(dbc, tplock)) != 0 && ret == 0)
			ret = t_ret;
	}
	return (ret);
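
	/*
	 * Error path.  If the failure is DB_NEEDSPLIT, the caller is going
	 * to retry the split one level higher, so the parent and child locks
	 * are explicitly discarded (__LPUT) rather than handed to the normal
	 * transactional release path (__TLPUT), which would hold them until
	 * commit.
	 */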
err:	if (lp != NULL)
		__os_free(dbp->env, lp);
	if (alloc_rp != NULL)
		(void)__memp_fput(mpf,
		    dbc->thread_info, alloc_rp, dbc->priority);
	if (tp != NULL)
		(void)__memp_fput(mpf, dbc->thread_info, tp, dbc->priority);

	/* We never updated the new or next pages, we can release them. */
	(void)__LPUT(dbc, rplock);
	(void)__LPUT(dbc, tplock);

	(void)__memp_fput(mpf, dbc->thread_info, pp->page, dbc->priority);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, pp->lock);
	else
		(void)__TLPUT(dbc, pp->lock);

	(void)__memp_fput(mpf, dbc->thread_info, cp->page, dbc->priority);
	if (ret == DB_NEEDSPLIT)
		(void)__LPUT(dbc, cp->lock);
	else
		(void)__TLPUT(dbc, cp->lock);

	return (ret);
}

/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 */
static int
__bam_broot(dbc, rootp, split, lp, rp)
	DBC *dbc;
	u_int32_t split;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, bi0, *child_bi;
	BKEYDATA *child_bk;
	BOVERFLOW bo, *child_bo;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT hdr, hdr0, data;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	child_bo = NULL;
	data.data = NULL;
	memset(&bi, 0, sizeof(bi));

	switch (TYPE(rootp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(dbp, rootp, split);
		switch (B_TYPE(child_bi->type)) {
		case B_KEYDATA:
			bi.len = child_bi->len;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rp->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			if ((ret = __os_malloc(dbp->env,
			    child_bi->len, &data.data)) != 0)
				return (ret);
			memcpy(data.data, child_bi->data, child_bi->len);
			data.size = child_bi->len;
			break;
		case B_OVERFLOW:
			/* Reuse the overflow key. */
			child_bo = (BOVERFLOW *)child_bi->data;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			bo.pgno = child_bo->pgno;
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rp->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_LDUP:
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(dbp, rootp, split);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rp->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			if ((ret = __os_malloc(dbp->env,
			    child_bk->len, &data.data)) != 0)
				return (ret);
			memcpy(data.data, child_bk->data, child_bk->len);
			data.size = child_bk->len;
			break;
		case B_OVERFLOW:
			/* Copy the overflow key. */
			child_bo = (BOVERFLOW *)child_bk;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			memset(&hdr, 0, sizeof(hdr));
			if ((ret = __db_goff(dbp, dbc->thread_info,
			    dbc->txn, &hdr, child_bo->tlen,
			    child_bo->pgno, &hdr.data, &hdr.size)) == 0)
				ret = __db_poff(dbc, &hdr, &bo.pgno);

			if (hdr.data != NULL)
				__os_free(dbp->env, hdr.data);
			if (ret != 0)
				return (ret);

			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rp->pgno;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	default:
pgfmt:		return (__db_pgfmt(dbp->env, rp->pgno));
	}
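
	/*
	 * At this point hdr and data describe the internal entry that will
	 * point to the right child; the entry for the left child (bi0,
	 * below) carries no key at all.
	 */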

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	root_pgno = cp->root;
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * internal btree page is never used, so it doesn't need to be filled
	 * in.  Set the record count if necessary.
	 */
	memset(&bi0, 0, sizeof(bi0));
	B_TSET(bi0.type, B_KEYDATA);
	bi0.pgno = lp->pgno;
	if (F_ISSET(cp, C_RECNUM)) {
		bi0.nrecs = __bam_total(dbp, lp);
		RE_NREC_SET(rootp, bi0.nrecs);
		bi.nrecs = __bam_total(dbp, rp);
		RE_NREC_ADJ(rootp, bi.nrecs);
	}
	DB_SET_DBT(hdr0, &bi0, SSZA(BINTERNAL, data));
	if ((ret = __db_pitem(dbc,
	    rootp, 0, BINTERNAL_SIZE(0), &hdr0, NULL)) != 0)
		goto err;
	ret = __db_pitem(dbc, rootp, 1, BINTERNAL_SIZE(data.size), &hdr, &data);

err:	if (data.data != NULL && child_bo == NULL)
		__os_free(dbp->env, data.data);
	return (ret);
}

/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	db_pgno_t root_pgno;
	int ret;

	dbp = dbc->dbp;
	root_pgno = dbc->internal->root;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);

	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(dbp, lp);
	if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(dbp, rp);
	if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}
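
/*
 * Recno internal entries (RINTERNAL) carry only a child page number and a
 * record count -- there are no keys to promote -- which is why __ram_root is
 * so much simpler than its btree counterpart, __bam_broot.
 */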

/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 *
 * PUBLIC: int __bam_pinsert
 * PUBLIC:     __P((DBC *, EPG *, u_int32_t, PAGE *, PAGE *, int));
 */
int
__bam_pinsert(dbc, parent, split, lchild, rchild, flags)
	DBC *dbc;
	EPG *parent;
	u_int32_t split;
	PAGE *lchild, *rchild;
	int flags;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BOVERFLOW bo, *child_bo;
	BTREE *t;
	BTREE_CURSOR *cp;
	DB *dbp;
	DBT a, b, hdr, data;
	EPG *child;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	size_t (*func) __P((DB *, const DBT *, const DBT *));
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	cp = (BTREE_CURSOR *)dbc->internal;
	t = dbp->bt_internal;
	ppage = parent->page;
	child = parent + 1;

	/* If handling record numbers, count records split to the right page. */
	nrecs = F_ISSET(cp, C_RECNUM) &&
	    !LF_ISSET(BPI_SPACEONLY) ? __bam_total(dbp, rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
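	/*
	 * For example, with the default prefix routine: if the last key on
	 * the left child is "apple" and the first key on the right child is
	 * "application", only "appli" -- the shared prefix plus one byte --
	 * needs to be promoted into the parent to separate the two pages.
	 */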
	switch (TYPE(child->page)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(dbp, child->page, split);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(dbp, ppage) < nbytes)
			return (DB_NEEDSPLIT);
		if (LF_ISSET(BPI_SPACEONLY))
			return (0);

		switch (B_TYPE(child_bi->type)) {
		case B_KEYDATA:
			/* Add a new record for the right page. */
			memset(&bi, 0, sizeof(bi));
			bi.len = child_bi->len;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, child_bi->data, child_bi->len);
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_OVERFLOW:
			/* Reuse the overflow key. */
			child_bo = (BOVERFLOW *)child_bi->data;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			bo.pgno = child_bo->pgno;
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_LDUP:
	case P_LBTREE:
		child_bk = GET_BKEYDATA(dbp, child->page, split);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;

			/*
			 * Prefix compression:
			 * We set t->bt_prefix to NULL if we have a comparison
			 * callback but no prefix compression callback.  But,
			 * if we're splitting in an off-page duplicates tree,
			 * we still have to do some checking.  If using the
			 * default off-page duplicates comparison routine we
			 * can use the default prefix compression callback.  If
			 * not using the default off-page duplicates comparison
			 * routine, we can't do any kind of prefix compression
			 * as there's no way for an application to specify a
			 * prefix compression callback that corresponds to its
			 * comparison callback.
			 *
			 * No prefix compression if we don't have a compression
			 * function, or the key we'd compress isn't a normal
			 * key (for example, it references an overflow page).
			 *
			 * Generate a parent page key for the right child page
			 * from a comparison of the last key on the left child
			 * page and the first key on the right child page.
			 */
			if (F_ISSET(dbc, DBC_OPD)) {
				if (dbp->dup_compare == __bam_defcmp)
					func = __bam_defpfx;
				else
					func = NULL;
			} else
				func = t->bt_prefix;
			if (func == NULL)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
			    (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			DB_SET_DBT(a, tmp_bk->data, tmp_bk->len);
			DB_SET_DBT(b, child_bk->data, child_bk->len);
			nksize = (u_int32_t)func(dbp, &a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
				nksize = child_bk->len;

noprefix:		if (P_FREESPACE(dbp, ppage) < nbytes)
				return (DB_NEEDSPLIT);
			if (LF_ISSET(BPI_SPACEONLY))
				return (0);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, B_KEYDATA);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, child_bk->data, nksize);
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(dbp, ppage) < nbytes)
				return (DB_NEEDSPLIT);
			if (LF_ISSET(BPI_SPACEONLY))
				return (0);

			/* Copy the overflow key. */
			child_bo = (BOVERFLOW *)child_bk;
			memset(&bo, 0, sizeof(bo));
			bo.type = B_OVERFLOW;
			bo.tlen = child_bo->tlen;
			memset(&hdr, 0, sizeof(hdr));
			if ((ret = __db_goff(dbp, dbc->thread_info,
			    dbc->txn, &hdr, child_bo->tlen,
			    child_bo->pgno, &hdr.data, &hdr.size)) == 0)
				ret = __db_poff(dbc, &hdr, &bo.pgno);

			if (hdr.data != NULL)
				__os_free(dbp->env, hdr.data);
			if (ret != 0)
				return (ret);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, B_OVERFLOW);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			DB_SET_DBT(hdr, &bi, SSZA(BINTERNAL, data));
			DB_SET_DBT(data, &bo, BOVERFLOW_SIZE);
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			break;
		case B_DUPLICATE:
		default:
			goto pgfmt;
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(dbp, ppage) < nbytes)
			return (DB_NEEDSPLIT);
		if (LF_ISSET(BPI_SPACEONLY))
			return (0);

		/* Add a new record for the right page. */
		DB_SET_DBT(hdr, &ri, RINTERNAL_SIZE);
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = __db_pitem(dbc,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
pgfmt:		return (__db_pgfmt(dbp->env, PGNO(child->page)));
	}

	/*
	 * If this is a Recno tree, a Btree with record numbers, or an
	 * off-page duplicates tree, adjust the parent page's left page
	 * record count.
	 */
	if (F_ISSET(cp, C_RECNUM) && !LF_ISSET(BPI_NORECNUM)) {
		/* Log the change. */
		if (DBC_LOGGING(dbc)) {
			if ((ret = __bam_cadjust_log(dbp, dbc->txn,
			    &LSN(ppage), 0, PGNO(ppage), &LSN(ppage),
			    parent->indx, -(int32_t)nrecs, 0)) != 0)
				return (ret);
		} else
			LSN_NOT_LOGGED(LSN(ppage));

		/* Update the left page count. */
		if (dbc->dbtype == DB_RECNO)
			GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}
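
/*
 * Note that __bam_page calls __bam_pinsert twice: once with BPI_SPACEONLY
 * set, to verify that the parent can absorb the new key before any page is
 * allocated from the buffer pool, and then again with a flags value of 0 to
 * do the real insert.
 */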

/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;
	PAGE *lp, *rp;
	db_indx_t *splitret;
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, *inp, nbytes, off, splitp, top;
	int adjust, cnt, iflag, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	inp = P_INP(dbp, pp);
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  This is trivial when we
	 * are splitting a new page before the beginning of the tree; all of
	 * the interesting tests are against values of 0.
	 *
	 * Catching appends to the tree is harder.  In a simple append, we're
	 * inserting an item that sorts past the end of the tree; the cursor
	 * will point past the last element on the page.  But, in trees with
	 * duplicates, the cursor may point to the last entry on the page --
	 * in this case, the entry will also be the last element of a duplicate
	 * set (the last because the search call specified the SR_DUPLAST flag).
	 * The only way to differentiate between an insert immediately before
	 * the last item in a tree or an append after a duplicate set which is
	 * also the last item in the tree is to call the comparison function.
	 * When splitting internal pages during an append, the search code
	 * guarantees the cursor always points to the largest page item less
	 * than the new internal entry.  To summarize, we want to catch three
	 * possible index values:
	 *
	 *	NUM_ENT(page)		Btree/Recno leaf insert past end-of-tree
	 *	NUM_ENT(page) - O_INDX	Btree or Recno internal insert past EOT
	 *	NUM_ENT(page) - P_INDX	Btree leaf insert past EOT after a set
	 *				of duplicates
	 *
	 * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
	 * near the end of the tree, and not after the end of the tree at all.
	 * We do a simple test, which might be wrong, because calling the
	 * comparison functions is expensive.  Regardless, it's not a big deal
	 * if we're wrong; we'll do the split the right way next time.
	 */
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
		off = NUM_ENT(pp) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;
	if (off != 0)
		goto sort;

	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
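	/*
	 * (The loop below walks entries until roughly half of the bytes in
	 * use have been accounted for: the split point is chosen by space
	 * consumed, not by entry count, since item sizes can vary widely.)
	 */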
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
				nbytes += BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)
				nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
sort:	splitp = off;

	/*
	 * Splitp is either at or just past the optimum split point.  If the
	 * tree type is such that we're going to promote a key to an internal
	 * page, and our current choice is an overflow key, look for something
	 * close by that's smaller.
	 */
	switch (TYPE(pp)) {
	case P_IBTREE:
		iflag = 1;
		isbigkey =
		    B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
		break;
	case P_LBTREE:
	case P_LDUP:
		iflag = 0;
		isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
		    B_KEYDATA;
		break;
	default:
		iflag = isbigkey = 0;
	}
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((iflag && B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (iflag ? B_TYPE(
			    GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
			    B_KEYDATA) {
				splitp = off;
				break;
			}
		}

	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
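	/*
	 * (Duplicate keys on a P_LBTREE page are stored once and shared, so
	 * two adjacent index entries referencing the same on-page offset --
	 * inp[splitp] == inp[splitp - adjust] -- identify members of the
	 * same duplicate set.)
	 */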
	if (TYPE(pp) == P_LBTREE &&
	    inp[splitp] == inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    inp[splitp] != inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (inp[splitp] != inp[off]) {
				splitp = off + adjust;
				break;
			}
		}

	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}

/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	BINTERNAL internal;
	db_indx_t *cinp, nbytes, off, *pinp;

	cinp = P_INP(dbp, cp);
	pinp = P_INP(dbp, pp);

	/*
	 * Nxt is the offset of the next record to be placed on the target
	 * page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			/*
			 * The left-most key on an internal page is never
			 * used, so when the first entry copied to the new
			 * page isn't the first entry of the old page, only
			 * the page number and record count are kept -- see
			 * the zero-length BINTERNAL built below.
			 */
			if (off == 0 && nxt != 0)
				nbytes = BINTERNAL_SIZE(0);
			else if (B_TYPE(
			    GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
				nbytes = BINTERNAL_SIZE(
				    GET_BINTERNAL(dbp, pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pinp[nxt] == pinp[nxt - P_INDX]) {
				cinp[off] = cinp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LDUP:
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
			    B_KEYDATA)
				nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
				    pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp->env, pp->pgno));
		}
		cinp[off] = HOFFSET(cp) -= nbytes;
		if (off == 0 && nxt != 0 && TYPE(pp) == P_IBTREE) {
			internal.len = 0;
			internal.type = B_KEYDATA;
			internal.pgno = GET_BINTERNAL(dbp, pp, nxt)->pgno;
			internal.nrecs = GET_BINTERNAL(dbp, pp, nxt)->nrecs;
			memcpy(P_ENTRY(dbp, cp, off), &internal, nbytes);
		} else
			memcpy(P_ENTRY(dbp, cp, off),
			    P_ENTRY(dbp, pp, nxt), nbytes);
	}
	return (0);
}