1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996,2008 Oracle. All rights reserved. 5 */ 6/* 7 * Copyright (c) 1990, 1993, 1994, 1995, 1996 8 * Keith Bostic. All rights reserved. 9 */ 10/* 11 * Copyright (c) 1990, 1993, 1994, 1995 12 * The Regents of the University of California. All rights reserved. 13 * 14 * This code is derived from software contributed to Berkeley by 15 * Mike Olson. 16 * 17 * Redistribution and use in source and binary forms, with or without 18 * modification, are permitted provided that the following conditions 19 * are met: 20 * 1. Redistributions of source code must retain the above copyright 21 * notice, this list of conditions and the following disclaimer. 22 * 2. Redistributions in binary form must reproduce the above copyright 23 * notice, this list of conditions and the following disclaimer in the 24 * documentation and/or other materials provided with the distribution. 25 * 3. Neither the name of the University nor the names of its contributors 26 * may be used to endorse or promote products derived from this software 27 * without specific prior written permission. 28 * 29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND 30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 32 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 39 * SUCH DAMAGE. 40 * 41 * $Id: bt_put.c,v 12.29 2008/01/08 20:57:59 bostic Exp $ 42 */ 43 44#include "db_config.h" 45 46#include "db_int.h" 47#include "dbinc/db_page.h" 48#include "dbinc/btree.h" 49#include "dbinc/mp.h" 50 51static int __bam_build 52 __P((DBC *, u_int32_t, DBT *, PAGE *, u_int32_t, u_int32_t)); 53static int __bam_dup_check __P((DBC *, u_int32_t, 54 PAGE *, u_int32_t, u_int32_t, db_indx_t *)); 55static int __bam_dup_convert __P((DBC *, PAGE *, u_int32_t, u_int32_t)); 56static int __bam_ovput 57 __P((DBC *, u_int32_t, db_pgno_t, PAGE *, u_int32_t, DBT *)); 58static u_int32_t 59 __bam_partsize __P((DB *, u_int32_t, DBT *, PAGE *, u_int32_t)); 60 61/* 62 * __bam_iitem -- 63 * Insert an item into the tree. 64 * 65 * PUBLIC: int __bam_iitem __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t)); 66 */ 67int 68__bam_iitem(dbc, key, data, op, flags) 69 DBC *dbc; 70 DBT *key, *data; 71 u_int32_t op, flags; 72{ 73 BKEYDATA *bk, bk_tmp; 74 BTREE *t; 75 BTREE_CURSOR *cp; 76 DB *dbp; 77 DBT bk_hdr, tdbt; 78 DB_MPOOLFILE *mpf; 79 ENV *env; 80 PAGE *h; 81 db_indx_t cnt, indx; 82 u_int32_t data_size, have_bytes, need_bytes, needed, pages, pagespace; 83 int cmp, bigkey, bigdata, del, dupadjust; 84 int padrec, replace, ret, t_ret, was_deleted; 85 86 COMPQUIET(cnt, 0); 87 88 dbp = dbc->dbp; 89 env = dbp->env; 90 mpf = dbp->mpf; 91 cp = (BTREE_CURSOR *)dbc->internal; 92 t = dbp->bt_internal; 93 h = cp->page; 94 indx = cp->indx; 95 del = dupadjust = replace = was_deleted = 0; 96 97 /* 98 * Fixed-length records with partial puts: it's an error to specify 99 * anything other simple overwrite. 100 */ 101 if (F_ISSET(dbp, DB_AM_FIXEDLEN) && 102 F_ISSET(data, DB_DBT_PARTIAL) && data->size != data->dlen) 103 return (__db_rec_repl(env, data->size, data->dlen)); 104 105 /* 106 * Figure out how much space the data will take, including if it's a 107 * partial record. 108 * 109 * Fixed-length records: it's an error to specify a record that's 110 * longer than the fixed-length, and we never require less than 111 * the fixed-length record size. 112 */ 113 data_size = F_ISSET(data, DB_DBT_PARTIAL) ? 114 __bam_partsize(dbp, op, data, h, indx) : data->size; 115 padrec = 0; 116 if (F_ISSET(dbp, DB_AM_FIXEDLEN)) { 117 if (data_size > t->re_len) 118 return (__db_rec_toobig(env, data_size, t->re_len)); 119 120 /* Records that are deleted anyway needn't be padded out. */ 121 if (!LF_ISSET(BI_DELETED) && data_size < t->re_len) { 122 padrec = 1; 123 data_size = t->re_len; 124 } 125 } 126 127 /* 128 * Handle partial puts or short fixed-length records: build the 129 * real record. 130 */ 131 if (padrec || F_ISSET(data, DB_DBT_PARTIAL)) { 132 tdbt = *data; 133 if ((ret = 134 __bam_build(dbc, op, &tdbt, h, indx, data_size)) != 0) 135 return (ret); 136 data = &tdbt; 137 } 138 139 /* 140 * If the user has specified a duplicate comparison function, return 141 * an error if DB_CURRENT was specified and the replacement data 142 * doesn't compare equal to the current data. This stops apps from 143 * screwing up the duplicate sort order. We have to do this after 144 * we build the real record so that we're comparing the real items. 145 */ 146 if (op == DB_CURRENT && dbp->dup_compare != NULL) { 147 if ((ret = __bam_cmp(dbp, dbc->thread_info, dbc->txn, data, h, 148 indx + (TYPE(h) == P_LBTREE ? O_INDX : 0), 149 dbp->dup_compare, &cmp)) != 0) 150 return (ret); 151 if (cmp != 0) { 152 __db_errx(env, 153 "Existing data sorts differently from put data"); 154 return (EINVAL); 155 } 156 } 157 158 /* 159 * If the key or data item won't fit on a page, we'll have to store 160 * them on overflow pages. 161 */ 162 needed = 0; 163 bigdata = data_size > cp->ovflsize; 164 switch (op) { 165 case DB_KEYFIRST: 166 /* We're adding a new key and data pair. */ 167 bigkey = key->size > cp->ovflsize; 168 if (bigkey) 169 needed += BOVERFLOW_PSIZE; 170 else 171 needed += BKEYDATA_PSIZE(key->size); 172 if (bigdata) 173 needed += BOVERFLOW_PSIZE; 174 else 175 needed += BKEYDATA_PSIZE(data_size); 176 break; 177 case DB_AFTER: 178 case DB_BEFORE: 179 case DB_CURRENT: 180 /* 181 * We're either overwriting the data item of a key/data pair 182 * or we're creating a new on-page duplicate and only adding 183 * a data item. 184 * 185 * !!! 186 * We're not currently correcting for space reclaimed from 187 * already deleted items, but I don't think it's worth the 188 * complexity. 189 */ 190 bigkey = 0; 191 if (op == DB_CURRENT) { 192 bk = GET_BKEYDATA(dbp, h, 193 indx + (TYPE(h) == P_LBTREE ? O_INDX : 0)); 194 if (B_TYPE(bk->type) == B_KEYDATA) 195 have_bytes = BKEYDATA_PSIZE(bk->len); 196 else 197 have_bytes = BOVERFLOW_PSIZE; 198 need_bytes = 0; 199 } else { 200 have_bytes = 0; 201 need_bytes = sizeof(db_indx_t); 202 } 203 if (bigdata) 204 need_bytes += BOVERFLOW_PSIZE; 205 else 206 need_bytes += BKEYDATA_PSIZE(data_size); 207 208 if (have_bytes < need_bytes) 209 needed += need_bytes - have_bytes; 210 break; 211 default: 212 return (__db_unknown_flag(env, "DB->put", op)); 213 } 214 215 /* Split the page if there's not enough room. */ 216 if (P_FREESPACE(dbp, h) < needed) 217 return (DB_NEEDSPLIT); 218 219 /* 220 * Check to see if we will convert to off page duplicates -- if 221 * so, we'll need a page. 222 */ 223 if (F_ISSET(dbp, DB_AM_DUP) && 224 TYPE(h) == P_LBTREE && op != DB_KEYFIRST && 225 P_FREESPACE(dbp, h) - needed <= dbp->pgsize / 2 && 226 __bam_dup_check(dbc, op, h, indx, needed, &cnt)) { 227 pages = 1; 228 dupadjust = 1; 229 } else 230 pages = 0; 231 232 /* 233 * If we are not using transactions and there is a page limit 234 * set on the file, then figure out if things will fit before 235 * taking action. 236 */ 237 if (dbc->txn == NULL && mpf->mfp->maxpgno != 0) { 238 pagespace = P_MAXSPACE(dbp, dbp->pgsize); 239 if (bigdata) 240 pages += ((data_size - 1) / pagespace) + 1; 241 if (bigkey) 242 pages += ((key->size - 1) / pagespace) + 1; 243 244 if (pages > (mpf->mfp->maxpgno - mpf->mfp->last_pgno)) 245 return (__db_space_err(dbp)); 246 } 247 248 if ((ret = __memp_dirty(mpf, &h, 249 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 250 return (ret); 251 if (cp->csp->page == cp->page) 252 cp->csp->page = h; 253 cp->page = h; 254 255 /* 256 * The code breaks it up into five cases: 257 * 258 * 1. Insert a new key/data pair. 259 * 2. Append a new data item (a new duplicate). 260 * 3. Insert a new data item (a new duplicate). 261 * 4. Delete and re-add the data item (overflow item). 262 * 5. Overwrite the data item. 263 */ 264 switch (op) { 265 case DB_KEYFIRST: /* 1. Insert a new key/data pair. */ 266 if (bigkey) { 267 if ((ret = __bam_ovput(dbc, 268 B_OVERFLOW, PGNO_INVALID, h, indx, key)) != 0) 269 return (ret); 270 } else 271 if ((ret = __db_pitem(dbc, h, indx, 272 BKEYDATA_SIZE(key->size), NULL, key)) != 0) 273 return (ret); 274 275 if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0) 276 return (ret); 277 ++indx; 278 break; 279 case DB_AFTER: /* 2. Append a new data item. */ 280 if (TYPE(h) == P_LBTREE) { 281 /* Copy the key for the duplicate and adjust cursors. */ 282 if ((ret = 283 __bam_adjindx(dbc, h, indx + P_INDX, indx, 1)) != 0) 284 return (ret); 285 if ((ret = 286 __bam_ca_di(dbc, PGNO(h), indx + P_INDX, 1)) != 0) 287 return (ret); 288 289 indx += 3; 290 291 cp->indx += 2; 292 } else { 293 ++indx; 294 cp->indx += 1; 295 } 296 break; 297 case DB_BEFORE: /* 3. Insert a new data item. */ 298 if (TYPE(h) == P_LBTREE) { 299 /* Copy the key for the duplicate and adjust cursors. */ 300 if ((ret = __bam_adjindx(dbc, h, indx, indx, 1)) != 0) 301 return (ret); 302 if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0) 303 return (ret); 304 305 ++indx; 306 } 307 break; 308 case DB_CURRENT: 309 /* 310 * Clear the cursor's deleted flag. The problem is that if 311 * we deadlock or fail while deleting the overflow item or 312 * replacing the non-overflow item, a subsequent cursor close 313 * will try and remove the item because the cursor's delete 314 * flag is set. 315 */ 316 if ((ret = __bam_ca_delete(dbp, PGNO(h), indx, 0, NULL)) != 0) 317 return (ret); 318 319 if (TYPE(h) == P_LBTREE) 320 ++indx; 321 bk = GET_BKEYDATA(dbp, h, indx); 322 323 /* 324 * In a Btree deleted records aren't counted (deleted records 325 * are counted in a Recno because all accesses are based on 326 * record number). If it's a Btree and it's a DB_CURRENT 327 * operation overwriting a previously deleted record, increment 328 * the record count. 329 */ 330 if (TYPE(h) == P_LBTREE || TYPE(h) == P_LDUP) 331 was_deleted = B_DISSET(bk->type); 332 333 /* 334 * 4. Delete and re-add the data item. 335 * 336 * If we're changing the type of the on-page structure, or we 337 * are referencing offpage items, we have to delete and then 338 * re-add the item. We do not do any cursor adjustments here 339 * because we're going to immediately re-add the item into the 340 * same slot. 341 */ 342 if (bigdata || B_TYPE(bk->type) != B_KEYDATA) { 343 if ((ret = __bam_ditem(dbc, h, indx)) != 0) 344 return (ret); 345 del = 1; 346 break; 347 } 348 349 /* 5. Overwrite the data item. */ 350 replace = 1; 351 break; 352 default: 353 return (__db_unknown_flag(env, "DB->put", op)); 354 } 355 356 /* Add the data. */ 357 if (bigdata) { 358 /* 359 * We do not have to handle deleted (BI_DELETED) records 360 * in this case; the actual records should never be created. 361 */ 362 DB_ASSERT(env, !LF_ISSET(BI_DELETED)); 363 ret = __bam_ovput(dbc, 364 B_OVERFLOW, PGNO_INVALID, h, indx, data); 365 } else { 366 if (LF_ISSET(BI_DELETED)) { 367 B_TSET_DELETED(bk_tmp.type, B_KEYDATA); 368 bk_tmp.len = data->size; 369 bk_hdr.data = &bk_tmp; 370 bk_hdr.size = SSZA(BKEYDATA, data); 371 ret = __db_pitem(dbc, h, indx, 372 BKEYDATA_SIZE(data->size), &bk_hdr, data); 373 } else if (replace) 374 ret = __bam_ritem(dbc, h, indx, data); 375 else 376 ret = __db_pitem(dbc, h, indx, 377 BKEYDATA_SIZE(data->size), NULL, data); 378 } 379 if (ret != 0) { 380 if (del == 1 && (t_ret = 381 __bam_ca_di(dbc, PGNO(h), indx + 1, -1)) != 0) { 382 __db_err(env, t_ret, 383 "cursor adjustment after delete failed"); 384 return (__env_panic(env, t_ret)); 385 } 386 return (ret); 387 } 388 389 /* 390 * Re-position the cursors if necessary and reset the current cursor 391 * to point to the new item. 392 */ 393 if (op != DB_CURRENT) { 394 if ((ret = __bam_ca_di(dbc, PGNO(h), indx, 1)) != 0) 395 return (ret); 396 cp->indx = TYPE(h) == P_LBTREE ? indx - O_INDX : indx; 397 } 398 399 /* 400 * If we've changed the record count, update the tree. There's no 401 * need to adjust the count if the operation not performed on the 402 * current record or when the current record was previously deleted. 403 */ 404 if (F_ISSET(cp, C_RECNUM) && (op != DB_CURRENT || was_deleted)) 405 if ((ret = __bam_adjust(dbc, 1)) != 0) 406 return (ret); 407 408 /* 409 * If a Btree leaf page is at least 50% full and we may have added or 410 * modified a duplicate data item, see if the set of duplicates takes 411 * up at least 25% of the space on the page. If it does, move it onto 412 * its own page. 413 */ 414 if (dupadjust && 415 (ret = __bam_dup_convert(dbc, h, indx - O_INDX, cnt)) != 0) 416 return (ret); 417 418 /* If we've modified a recno file, set the flag. */ 419 if (dbc->dbtype == DB_RECNO) 420 t->re_modified = 1; 421 422 return (ret); 423} 424 425/* 426 * __bam_partsize -- 427 * Figure out how much space a partial data item is in total. 428 */ 429static u_int32_t 430__bam_partsize(dbp, op, data, h, indx) 431 DB *dbp; 432 u_int32_t op, indx; 433 DBT *data; 434 PAGE *h; 435{ 436 BKEYDATA *bk; 437 u_int32_t nbytes; 438 439 /* 440 * If the record doesn't already exist, it's simply the data we're 441 * provided. 442 */ 443 if (op != DB_CURRENT) 444 return (data->doff + data->size); 445 446 /* 447 * Otherwise, it's the data provided plus any already existing data 448 * that we're not replacing. 449 */ 450 bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ? O_INDX : 0)); 451 nbytes = 452 B_TYPE(bk->type) == B_OVERFLOW ? ((BOVERFLOW *)bk)->tlen : bk->len; 453 454 return (__db_partsize(nbytes, data)); 455} 456 457/* 458 * __bam_build -- 459 * Build the real record for a partial put, or short fixed-length record. 460 */ 461static int 462__bam_build(dbc, op, dbt, h, indx, nbytes) 463 DBC *dbc; 464 u_int32_t op, indx, nbytes; 465 DBT *dbt; 466 PAGE *h; 467{ 468 BKEYDATA *bk, tbk; 469 BOVERFLOW *bo; 470 BTREE *t; 471 DB *dbp; 472 DBT copy, *rdata; 473 u_int32_t len, tlen; 474 u_int8_t *p; 475 int ret; 476 477 COMPQUIET(bo, NULL); 478 479 dbp = dbc->dbp; 480 t = dbp->bt_internal; 481 482 /* We use the record data return memory, it's only a short-term use. */ 483 rdata = &dbc->my_rdata; 484 if (rdata->ulen < nbytes) { 485 if ((ret = __os_realloc(dbp->env, 486 nbytes, &rdata->data)) != 0) { 487 rdata->ulen = 0; 488 rdata->data = NULL; 489 return (ret); 490 } 491 rdata->ulen = nbytes; 492 } 493 494 /* 495 * We use nul or pad bytes for any part of the record that isn't 496 * specified; get it over with. 497 */ 498 memset(rdata->data, 499 F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_pad : 0, nbytes); 500 501 /* 502 * In the next clauses, we need to do three things: a) set p to point 503 * to the place at which to copy the user's data, b) set tlen to the 504 * total length of the record, not including the bytes contributed by 505 * the user, and c) copy any valid data from an existing record. If 506 * it's not a partial put (this code is called for both partial puts 507 * and fixed-length record padding) or it's a new key, we can cut to 508 * the chase. 509 */ 510 if (!F_ISSET(dbt, DB_DBT_PARTIAL) || op != DB_CURRENT) { 511 p = (u_int8_t *)rdata->data + dbt->doff; 512 tlen = dbt->doff; 513 goto user_copy; 514 } 515 516 /* Find the current record. */ 517 if (indx < NUM_ENT(h)) { 518 bk = GET_BKEYDATA(dbp, h, indx + (TYPE(h) == P_LBTREE ? 519 O_INDX : 0)); 520 bo = (BOVERFLOW *)bk; 521 } else { 522 bk = &tbk; 523 B_TSET(bk->type, B_KEYDATA); 524 bk->len = 0; 525 } 526 if (B_TYPE(bk->type) == B_OVERFLOW) { 527 /* 528 * In the case of an overflow record, we shift things around 529 * in the current record rather than allocate a separate copy. 530 */ 531 memset(©, 0, sizeof(copy)); 532 if ((ret = __db_goff(dbp, dbc->thread_info, dbc->txn, ©, 533 bo->tlen, bo->pgno, &rdata->data, &rdata->ulen)) != 0) 534 return (ret); 535 536 /* Skip any leading data from the original record. */ 537 tlen = dbt->doff; 538 p = (u_int8_t *)rdata->data + dbt->doff; 539 540 /* 541 * Copy in any trailing data from the original record. 542 * 543 * If the original record was larger than the original offset 544 * plus the bytes being deleted, there is trailing data in the 545 * original record we need to preserve. If we aren't deleting 546 * the same number of bytes as we're inserting, copy it up or 547 * down, into place. 548 * 549 * Use memmove(), the regions may overlap. 550 */ 551 if (bo->tlen > dbt->doff + dbt->dlen) { 552 len = bo->tlen - (dbt->doff + dbt->dlen); 553 if (dbt->dlen != dbt->size) 554 memmove(p + dbt->size, p + dbt->dlen, len); 555 tlen += len; 556 } 557 } else { 558 /* Copy in any leading data from the original record. */ 559 memcpy(rdata->data, 560 bk->data, dbt->doff > bk->len ? bk->len : dbt->doff); 561 tlen = dbt->doff; 562 p = (u_int8_t *)rdata->data + dbt->doff; 563 564 /* Copy in any trailing data from the original record. */ 565 len = dbt->doff + dbt->dlen; 566 if (bk->len > len) { 567 memcpy(p + dbt->size, bk->data + len, bk->len - len); 568 tlen += bk->len - len; 569 } 570 } 571 572user_copy: 573 /* 574 * Copy in the application provided data -- p and tlen must have been 575 * initialized above. 576 */ 577 memcpy(p, dbt->data, dbt->size); 578 tlen += dbt->size; 579 580 /* Set the DBT to reference our new record. */ 581 rdata->size = F_ISSET(dbp, DB_AM_FIXEDLEN) ? t->re_len : tlen; 582 rdata->dlen = 0; 583 rdata->doff = 0; 584 rdata->flags = 0; 585 *dbt = *rdata; 586 return (0); 587} 588 589/* 590 * __bam_ritem -- 591 * Replace an item on a page. 592 * 593 * PUBLIC: int __bam_ritem __P((DBC *, PAGE *, u_int32_t, DBT *)); 594 */ 595int 596__bam_ritem(dbc, h, indx, data) 597 DBC *dbc; 598 PAGE *h; 599 u_int32_t indx; 600 DBT *data; 601{ 602 BKEYDATA *bk; 603 DB *dbp; 604 DBT orig, repl; 605 db_indx_t cnt, lo, ln, min, off, prefix, suffix; 606 int32_t nbytes; 607 int ret; 608 db_indx_t *inp; 609 u_int8_t *p, *t; 610 611 dbp = dbc->dbp; 612 613 /* 614 * Replace a single item onto a page. The logic figuring out where 615 * to insert and whether it fits is handled in the caller. All we do 616 * here is manage the page shuffling. 617 */ 618 bk = GET_BKEYDATA(dbp, h, indx); 619 620 /* Log the change. */ 621 if (DBC_LOGGING(dbc)) { 622 /* 623 * We might as well check to see if the two data items share 624 * a common prefix and suffix -- it can save us a lot of log 625 * message if they're large. 626 */ 627 min = data->size < bk->len ? data->size : bk->len; 628 for (prefix = 0, 629 p = bk->data, t = data->data; 630 prefix < min && *p == *t; ++prefix, ++p, ++t) 631 ; 632 633 min -= prefix; 634 for (suffix = 0, 635 p = (u_int8_t *)bk->data + bk->len - 1, 636 t = (u_int8_t *)data->data + data->size - 1; 637 suffix < min && *p == *t; ++suffix, --p, --t) 638 ; 639 640 /* We only log the parts of the keys that have changed. */ 641 orig.data = (u_int8_t *)bk->data + prefix; 642 orig.size = bk->len - (prefix + suffix); 643 repl.data = (u_int8_t *)data->data + prefix; 644 repl.size = data->size - (prefix + suffix); 645 if ((ret = __bam_repl_log(dbp, dbc->txn, &LSN(h), 0, PGNO(h), 646 &LSN(h), (u_int32_t)indx, (u_int32_t)B_DISSET(bk->type), 647 &orig, &repl, (u_int32_t)prefix, (u_int32_t)suffix)) != 0) 648 return (ret); 649 } else 650 LSN_NOT_LOGGED(LSN(h)); 651 652 /* 653 * Set references to the first in-use byte on the page and the 654 * first byte of the item being replaced. 655 */ 656 inp = P_INP(dbp, h); 657 p = (u_int8_t *)h + HOFFSET(h); 658 t = (u_int8_t *)bk; 659 660 /* 661 * If the entry is growing in size, shift the beginning of the data 662 * part of the page down. If the entry is shrinking in size, shift 663 * the beginning of the data part of the page up. Use memmove(3), 664 * the regions overlap. 665 */ 666 lo = BKEYDATA_SIZE(bk->len); 667 ln = (db_indx_t)BKEYDATA_SIZE(data->size); 668 if (lo != ln) { 669 nbytes = lo - ln; /* Signed difference. */ 670 if (p == t) /* First index is fast. */ 671 inp[indx] += nbytes; 672 else { /* Else, shift the page. */ 673 memmove(p + nbytes, p, (size_t)(t - p)); 674 675 /* Adjust the indices' offsets. */ 676 off = inp[indx]; 677 for (cnt = 0; cnt < NUM_ENT(h); ++cnt) 678 if (inp[cnt] <= off) 679 inp[cnt] += nbytes; 680 } 681 682 /* Clean up the page and adjust the item's reference. */ 683 HOFFSET(h) += nbytes; 684 t += nbytes; 685 } 686 687 /* Copy the new item onto the page. */ 688 bk = (BKEYDATA *)t; 689 B_TSET(bk->type, B_KEYDATA); 690 bk->len = data->size; 691 memcpy(bk->data, data->data, data->size); 692 693 return (0); 694} 695 696/* 697 * __bam_dup_check -- 698 * Check to see if the duplicate set at indx should have its own page. 699 */ 700static int 701__bam_dup_check(dbc, op, h, indx, sz, cntp) 702 DBC *dbc; 703 u_int32_t op; 704 PAGE *h; 705 u_int32_t indx, sz; 706 db_indx_t *cntp; 707{ 708 BKEYDATA *bk; 709 DB *dbp; 710 db_indx_t cnt, first, *inp; 711 712 dbp = dbc->dbp; 713 inp = P_INP(dbp, h); 714 715 /* 716 * Count the duplicate records and calculate how much room they're 717 * using on the page. 718 */ 719 while (indx > 0 && inp[indx] == inp[indx - P_INDX]) 720 indx -= P_INDX; 721 722 /* Count the key once. */ 723 bk = GET_BKEYDATA(dbp, h, indx); 724 sz += B_TYPE(bk->type) == B_KEYDATA ? 725 BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE; 726 727 /* Sum up all the data items. */ 728 first = indx; 729 730 /* 731 * Account for the record being inserted. If we are replacing it, 732 * don't count it twice. 733 * 734 * We execute the loop with first == indx to get the size of the 735 * first record. 736 */ 737 cnt = op == DB_CURRENT ? 0 : 1; 738 for (first = indx; 739 indx < NUM_ENT(h) && inp[first] == inp[indx]; 740 ++cnt, indx += P_INDX) { 741 bk = GET_BKEYDATA(dbp, h, indx + O_INDX); 742 sz += B_TYPE(bk->type) == B_KEYDATA ? 743 BKEYDATA_PSIZE(bk->len) : BOVERFLOW_PSIZE; 744 } 745 746 /* 747 * We have to do these checks when the user is replacing the cursor's 748 * data item -- if the application replaces a duplicate item with a 749 * larger data item, it can increase the amount of space used by the 750 * duplicates, requiring this check. But that means we may have done 751 * this check when it wasn't a duplicate item after all. 752 */ 753 if (cnt == 1) 754 return (0); 755 756 /* 757 * If this set of duplicates is using more than 25% of the page, move 758 * them off. The choice of 25% is a WAG, but the value must be small 759 * enough that we can always split a page without putting duplicates 760 * on two different pages. 761 */ 762 if (sz < dbp->pgsize / 4) 763 return (0); 764 765 *cntp = cnt; 766 return (1); 767} 768 769/* 770 * __bam_dup_convert -- 771 * Move a set of duplicates off-page and into their own tree. 772 */ 773static int 774__bam_dup_convert(dbc, h, indx, cnt) 775 DBC *dbc; 776 PAGE *h; 777 u_int32_t indx, cnt; 778{ 779 BKEYDATA *bk; 780 DB *dbp; 781 DBT hdr; 782 DB_MPOOLFILE *mpf; 783 PAGE *dp; 784 db_indx_t cpindx, dindx, first, *inp; 785 int ret, t_ret; 786 787 dbp = dbc->dbp; 788 mpf = dbp->mpf; 789 inp = P_INP(dbp, h); 790 791 /* Move to the beginning of the dup set. */ 792 while (indx > 0 && inp[indx] == inp[indx - P_INDX]) 793 indx -= P_INDX; 794 795 /* Get a new page. */ 796 if ((ret = __db_new(dbc, 797 dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, &dp)) != 0) 798 return (ret); 799 P_INIT(dp, dbp->pgsize, dp->pgno, 800 PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp)); 801 802 /* 803 * Move this set of duplicates off the page. First points to the first 804 * key of the first duplicate key/data pair, cnt is the number of pairs 805 * we're dealing with. 806 */ 807 memset(&hdr, 0, sizeof(hdr)); 808 first = indx; 809 dindx = indx; 810 cpindx = 0; 811 do { 812 /* Move cursors referencing the old entry to the new entry. */ 813 if ((ret = __bam_ca_dup(dbc, first, 814 PGNO(h), indx, PGNO(dp), cpindx)) != 0) 815 goto err; 816 817 /* 818 * Copy the entry to the new page. If the off-duplicate page 819 * If the off-duplicate page is a Btree page (i.e. dup_compare 820 * will be non-NULL, we use Btree pages for sorted dups, 821 * and Recno pages for unsorted dups), move all entries 822 * normally, even deleted ones. If it's a Recno page, 823 * deleted entries are discarded (if the deleted entry is 824 * overflow, then free up those pages). 825 */ 826 bk = GET_BKEYDATA(dbp, h, dindx + 1); 827 hdr.data = bk; 828 hdr.size = B_TYPE(bk->type) == B_KEYDATA ? 829 BKEYDATA_SIZE(bk->len) : BOVERFLOW_SIZE; 830 if (dbp->dup_compare == NULL && B_DISSET(bk->type)) { 831 /* 832 * Unsorted dups, i.e. recno page, and we have 833 * a deleted entry, don't move it, but if it was 834 * an overflow entry, we need to free those pages. 835 */ 836 if (B_TYPE(bk->type) == B_OVERFLOW && 837 (ret = __db_doff(dbc, 838 (GET_BOVERFLOW(dbp, h, dindx + 1))->pgno)) != 0) 839 goto err; 840 } else { 841 if ((ret = __db_pitem( 842 dbc, dp, cpindx, hdr.size, &hdr, NULL)) != 0) 843 goto err; 844 ++cpindx; 845 } 846 /* Delete all but the last reference to the key. */ 847 if (cnt != 1) { 848 if ((ret = __bam_adjindx(dbc, 849 h, dindx, first + 1, 0)) != 0) 850 goto err; 851 } else 852 dindx++; 853 854 /* Delete the data item. */ 855 if ((ret = __db_ditem(dbc, h, dindx, hdr.size)) != 0) 856 goto err; 857 indx += P_INDX; 858 } while (--cnt); 859 860 /* Put in a new data item that points to the duplicates page. */ 861 if ((ret = __bam_ovput(dbc, 862 B_DUPLICATE, dp->pgno, h, first + 1, NULL)) != 0) 863 goto err; 864 865 /* Adjust cursors for all the above movements. */ 866 ret = __bam_ca_di(dbc, 867 PGNO(h), first + P_INDX, (int)(first + P_INDX - indx)); 868 869err: if ((t_ret = __memp_fput(mpf, 870 dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0) 871 ret = t_ret; 872 873 return (ret); 874} 875 876/* 877 * __bam_ovput -- 878 * Build an item for an off-page duplicates page or overflow page and 879 * insert it on the page. 880 */ 881static int 882__bam_ovput(dbc, type, pgno, h, indx, item) 883 DBC *dbc; 884 u_int32_t type, indx; 885 db_pgno_t pgno; 886 PAGE *h; 887 DBT *item; 888{ 889 BOVERFLOW bo; 890 DBT hdr; 891 int ret; 892 893 UMRW_SET(bo.unused1); 894 B_TSET(bo.type, type); 895 UMRW_SET(bo.unused2); 896 897 /* 898 * If we're creating an overflow item, do so and acquire the page 899 * number for it. If we're creating an off-page duplicates tree, 900 * we are giving the page number as an argument. 901 */ 902 if (type == B_OVERFLOW) { 903 if ((ret = __db_poff(dbc, item, &bo.pgno)) != 0) 904 return (ret); 905 bo.tlen = item->size; 906 } else { 907 bo.pgno = pgno; 908 bo.tlen = 0; 909 } 910 911 /* Store the new record on the page. */ 912 memset(&hdr, 0, sizeof(hdr)); 913 hdr.data = &bo; 914 hdr.size = BOVERFLOW_SIZE; 915 return (__db_pitem(dbc, h, indx, BOVERFLOW_SIZE, &hdr, NULL)); 916} 917