1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996,2008 Oracle. All rights reserved. 5 */ 6/* 7 * Copyright (c) 1990, 1993, 1994 8 * Margo Seltzer. All rights reserved. 9 */ 10/* 11 * Copyright (c) 1990, 1993, 1994 12 * The Regents of the University of California. All rights reserved. 13 * 14 * This code is derived from software contributed to Berkeley by 15 * Margo Seltzer. 16 * 17 * Redistribution and use in source and binary forms, with or without 18 * modification, are permitted provided that the following conditions 19 * are met: 20 * 1. Redistributions of source code must retain the above copyright 21 * notice, this list of conditions and the following disclaimer. 22 * 2. Redistributions in binary form must reproduce the above copyright 23 * notice, this list of conditions and the following disclaimer in the 24 * documentation and/or other materials provided with the distribution. 25 * 3. Neither the name of the University nor the names of its contributors 26 * may be used to endorse or promote products derived from this software 27 * without specific prior written permission. 28 * 29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND 30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 32 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 39 * SUCH DAMAGE. 40 * 41 * $Id: hash_page.c,v 12.53 2008/05/07 12:27:34 bschmeck Exp $ 42 */ 43 44/* 45 * PACKAGE: hashing 46 * 47 * DESCRIPTION: 48 * Page manipulation for hashing package. 49 */ 50 51#include "db_config.h" 52 53#include "db_int.h" 54#include "dbinc/db_page.h" 55#include "dbinc/hash.h" 56#include "dbinc/lock.h" 57#include "dbinc/mp.h" 58 59static int __hamc_delpg 60 __P((DBC *, db_pgno_t, db_pgno_t, u_int32_t, db_ham_mode, u_int32_t *)); 61static int __ham_getindex_sorted 62 __P((DBC *, PAGE *, const DBT *, int, int *, db_indx_t *)); 63static int __ham_getindex_unsorted 64 __P((DBC *, PAGE *, const DBT *, int *, db_indx_t *)); 65static int __ham_sort_page_cursor __P((DBC *, PAGE *)); 66 67/* 68 * PUBLIC: int __ham_item __P((DBC *, db_lockmode_t, db_pgno_t *)); 69 */ 70int 71__ham_item(dbc, mode, pgnop) 72 DBC *dbc; 73 db_lockmode_t mode; 74 db_pgno_t *pgnop; 75{ 76 DB *dbp; 77 HASH_CURSOR *hcp; 78 db_pgno_t next_pgno; 79 int ret; 80 81 dbp = dbc->dbp; 82 hcp = (HASH_CURSOR *)dbc->internal; 83 84 if (F_ISSET(hcp, H_DELETED)) { 85 __db_errx(dbp->env, "Attempt to return a deleted item"); 86 return (EINVAL); 87 } 88 F_CLR(hcp, H_OK | H_NOMORE); 89 90 /* Check if we need to get a page for this cursor. */ 91 if ((ret = __ham_get_cpage(dbc, mode)) != 0) 92 return (ret); 93 94recheck: 95 /* Check if we are looking for space in which to insert an item. */ 96 if (hcp->seek_size != 0 && hcp->seek_found_page == PGNO_INVALID && 97 hcp->seek_size < P_FREESPACE(dbp, hcp->page)) { 98 hcp->seek_found_page = hcp->pgno; 99 hcp->seek_found_indx = NDX_INVALID; 100 } 101 102 /* Check for off-page duplicates. */ 103 if (hcp->indx < NUM_ENT(hcp->page) && 104 HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP) { 105 memcpy(pgnop, 106 HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)), 107 sizeof(db_pgno_t)); 108 F_SET(hcp, H_OK); 109 return (0); 110 } 111 112 /* Check if we need to go on to the next page. */ 113 if (F_ISSET(hcp, H_ISDUP)) 114 /* 115 * ISDUP is set, and offset is at the beginning of the datum. 116 * We need to grab the length of the datum, then set the datum 117 * pointer to be the beginning of the datum. 118 */ 119 memcpy(&hcp->dup_len, 120 HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + 121 hcp->dup_off, sizeof(db_indx_t)); 122 123 if (hcp->indx >= (db_indx_t)NUM_ENT(hcp->page)) { 124 /* Fetch next page. */ 125 if (NEXT_PGNO(hcp->page) == PGNO_INVALID) { 126 F_SET(hcp, H_NOMORE); 127 return (DB_NOTFOUND); 128 } 129 next_pgno = NEXT_PGNO(hcp->page); 130 hcp->indx = 0; 131 if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0) 132 return (ret); 133 goto recheck; 134 } 135 136 F_SET(hcp, H_OK); 137 return (0); 138} 139 140/* 141 * PUBLIC: int __ham_item_reset __P((DBC *)); 142 */ 143int 144__ham_item_reset(dbc) 145 DBC *dbc; 146{ 147 DB *dbp; 148 DB_MPOOLFILE *mpf; 149 HASH_CURSOR *hcp; 150 int ret, t_ret; 151 152 dbp = dbc->dbp; 153 mpf = dbp->mpf; 154 hcp = (HASH_CURSOR *)dbc->internal; 155 156 ret = 0; 157 if (hcp->page != NULL) 158 ret = __memp_fput(mpf, 159 dbc->thread_info, hcp->page, dbc->priority); 160 161 if ((t_ret = __ham_item_init(dbc)) != 0 && ret == 0) 162 ret = t_ret; 163 164 return (ret); 165} 166 167/* 168 * PUBLIC: int __ham_item_init __P((DBC *)); 169 */ 170int 171__ham_item_init(dbc) 172 DBC *dbc; 173{ 174 HASH_CURSOR *hcp; 175 int ret; 176 177 hcp = (HASH_CURSOR *)dbc->internal; 178 179 /* 180 * If this cursor still holds any locks, we must release them if 181 * we are not running with transactions. 182 */ 183 ret = __TLPUT(dbc, hcp->lock); 184 185 /* 186 * The following fields must *not* be initialized here because they 187 * may have meaning across inits. 188 * hlock, hdr, split_buf, stats 189 */ 190 hcp->bucket = BUCKET_INVALID; 191 hcp->lbucket = BUCKET_INVALID; 192 LOCK_INIT(hcp->lock); 193 hcp->lock_mode = DB_LOCK_NG; 194 hcp->dup_off = 0; 195 hcp->dup_len = 0; 196 hcp->dup_tlen = 0; 197 hcp->seek_size = 0; 198 hcp->seek_found_page = PGNO_INVALID; 199 hcp->seek_found_indx = NDX_INVALID; 200 hcp->flags = 0; 201 202 hcp->pgno = PGNO_INVALID; 203 hcp->indx = NDX_INVALID; 204 hcp->page = NULL; 205 206 return (ret); 207} 208 209/* 210 * Returns the last item in a bucket. 211 * 212 * PUBLIC: int __ham_item_last __P((DBC *, db_lockmode_t, db_pgno_t *)); 213 */ 214int 215__ham_item_last(dbc, mode, pgnop) 216 DBC *dbc; 217 db_lockmode_t mode; 218 db_pgno_t *pgnop; 219{ 220 HASH_CURSOR *hcp; 221 int ret; 222 223 hcp = (HASH_CURSOR *)dbc->internal; 224 if ((ret = __ham_item_reset(dbc)) != 0) 225 return (ret); 226 227 hcp->bucket = hcp->hdr->max_bucket; 228 hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket); 229 F_SET(hcp, H_OK); 230 return (__ham_item_prev(dbc, mode, pgnop)); 231} 232 233/* 234 * PUBLIC: int __ham_item_first __P((DBC *, db_lockmode_t, db_pgno_t *)); 235 */ 236int 237__ham_item_first(dbc, mode, pgnop) 238 DBC *dbc; 239 db_lockmode_t mode; 240 db_pgno_t *pgnop; 241{ 242 HASH_CURSOR *hcp; 243 int ret; 244 245 hcp = (HASH_CURSOR *)dbc->internal; 246 if ((ret = __ham_item_reset(dbc)) != 0) 247 return (ret); 248 F_SET(hcp, H_OK); 249 hcp->bucket = 0; 250 hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket); 251 return (__ham_item_next(dbc, mode, pgnop)); 252} 253 254/* 255 * __ham_item_prev -- 256 * Returns a pointer to key/data pair on a page. In the case of 257 * bigkeys, just returns the page number and index of the bigkey 258 * pointer pair. 259 * 260 * PUBLIC: int __ham_item_prev __P((DBC *, db_lockmode_t, db_pgno_t *)); 261 */ 262int 263__ham_item_prev(dbc, mode, pgnop) 264 DBC *dbc; 265 db_lockmode_t mode; 266 db_pgno_t *pgnop; 267{ 268 DB *dbp; 269 HASH_CURSOR *hcp; 270 db_pgno_t next_pgno; 271 int ret; 272 273 hcp = (HASH_CURSOR *)dbc->internal; 274 dbp = dbc->dbp; 275 276 /* 277 * There are 5 cases for backing up in a hash file. 278 * Case 1: In the middle of a page, no duplicates, just dec the index. 279 * Case 2: In the middle of a duplicate set, back up one. 280 * Case 3: At the beginning of a duplicate set, get out of set and 281 * back up to next key. 282 * Case 4: At the beginning of a page; go to previous page. 283 * Case 5: At the beginning of a bucket; go to prev bucket. 284 */ 285 F_CLR(hcp, H_OK | H_NOMORE | H_DELETED); 286 287 if ((ret = __ham_get_cpage(dbc, mode)) != 0) 288 return (ret); 289 290 /* 291 * First handle the duplicates. Either you'll get the key here 292 * or you'll exit the duplicate set and drop into the code below 293 * to handle backing up through keys. 294 */ 295 if (!F_ISSET(hcp, H_NEXT_NODUP) && F_ISSET(hcp, H_ISDUP)) { 296 if (HPAGE_TYPE(dbp, hcp->page, H_DATAINDEX(hcp->indx)) == 297 H_OFFDUP) { 298 memcpy(pgnop, 299 HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)), 300 sizeof(db_pgno_t)); 301 F_SET(hcp, H_OK); 302 return (0); 303 } 304 305 /* Duplicates are on-page. */ 306 if (hcp->dup_off != 0) { 307 memcpy(&hcp->dup_len, HKEYDATA_DATA( 308 H_PAIRDATA(dbp, hcp->page, hcp->indx)) 309 + hcp->dup_off - sizeof(db_indx_t), 310 sizeof(db_indx_t)); 311 hcp->dup_off -= 312 DUP_SIZE(hcp->dup_len); 313 return (__ham_item(dbc, mode, pgnop)); 314 } 315 } 316 317 /* 318 * If we get here, we are not in a duplicate set, and just need 319 * to back up the cursor. There are still three cases: 320 * midpage, beginning of page, beginning of bucket. 321 */ 322 323 if (F_ISSET(hcp, H_DUPONLY)) { 324 F_CLR(hcp, H_OK); 325 F_SET(hcp, H_NOMORE); 326 return (0); 327 } else 328 /* 329 * We are no longer in a dup set; flag this so the dup code 330 * will reinitialize should we stumble upon another one. 331 */ 332 F_CLR(hcp, H_ISDUP); 333 334 if (hcp->indx == 0) { /* Beginning of page. */ 335 hcp->pgno = PREV_PGNO(hcp->page); 336 if (hcp->pgno == PGNO_INVALID) { 337 /* Beginning of bucket. */ 338 F_SET(hcp, H_NOMORE); 339 return (DB_NOTFOUND); 340 } else if ((ret = 341 __ham_next_cpage(dbc, hcp->pgno)) != 0) 342 return (ret); 343 else 344 hcp->indx = NUM_ENT(hcp->page); 345 } 346 347 /* 348 * Either we've got the cursor set up to be decremented, or we 349 * have to find the end of a bucket. 350 */ 351 if (hcp->indx == NDX_INVALID) { 352 DB_ASSERT(dbp->env, hcp->page != NULL); 353 354 hcp->indx = NUM_ENT(hcp->page); 355 for (next_pgno = NEXT_PGNO(hcp->page); 356 next_pgno != PGNO_INVALID; 357 next_pgno = NEXT_PGNO(hcp->page)) { 358 if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0) 359 return (ret); 360 hcp->indx = NUM_ENT(hcp->page); 361 } 362 363 if (hcp->indx == 0) { 364 /* Bucket was empty. */ 365 F_SET(hcp, H_NOMORE); 366 return (DB_NOTFOUND); 367 } 368 } 369 370 hcp->indx -= 2; 371 372 return (__ham_item(dbc, mode, pgnop)); 373} 374 375/* 376 * Sets the cursor to the next key/data pair on a page. 377 * 378 * PUBLIC: int __ham_item_next __P((DBC *, db_lockmode_t, db_pgno_t *)); 379 */ 380int 381__ham_item_next(dbc, mode, pgnop) 382 DBC *dbc; 383 db_lockmode_t mode; 384 db_pgno_t *pgnop; 385{ 386 HASH_CURSOR *hcp; 387 int ret; 388 389 hcp = (HASH_CURSOR *)dbc->internal; 390 391 if ((ret = __ham_get_cpage(dbc, mode)) != 0) 392 return (ret); 393 394 /* 395 * Deleted on-page duplicates are a weird case. If we delete the last 396 * one, then our cursor is at the very end of a duplicate set and 397 * we actually need to go on to the next key. 398 */ 399 if (F_ISSET(hcp, H_DELETED)) { 400 if (hcp->indx != NDX_INVALID && 401 F_ISSET(hcp, H_ISDUP) && 402 HPAGE_TYPE(dbc->dbp, hcp->page, H_DATAINDEX(hcp->indx)) 403 == H_DUPLICATE && hcp->dup_tlen == hcp->dup_off) { 404 if (F_ISSET(hcp, H_DUPONLY)) { 405 F_CLR(hcp, H_OK); 406 F_SET(hcp, H_NOMORE); 407 return (0); 408 } else { 409 F_CLR(hcp, H_ISDUP); 410 hcp->indx += 2; 411 } 412 } else if (!F_ISSET(hcp, H_ISDUP) && F_ISSET(hcp, H_DUPONLY)) { 413 F_CLR(hcp, H_OK); 414 F_SET(hcp, H_NOMORE); 415 return (0); 416 } else if (F_ISSET(hcp, H_ISDUP) && 417 F_ISSET(hcp, H_NEXT_NODUP)) { 418 F_CLR(hcp, H_ISDUP); 419 hcp->indx += 2; 420 } 421 F_CLR(hcp, H_DELETED); 422 } else if (hcp->indx == NDX_INVALID) { 423 hcp->indx = 0; 424 F_CLR(hcp, H_ISDUP); 425 } else if (F_ISSET(hcp, H_NEXT_NODUP)) { 426 hcp->indx += 2; 427 F_CLR(hcp, H_ISDUP); 428 } else if (F_ISSET(hcp, H_ISDUP) && hcp->dup_tlen != 0) { 429 if (hcp->dup_off + DUP_SIZE(hcp->dup_len) >= 430 hcp->dup_tlen && F_ISSET(hcp, H_DUPONLY)) { 431 F_CLR(hcp, H_OK); 432 F_SET(hcp, H_NOMORE); 433 return (0); 434 } 435 hcp->dup_off += DUP_SIZE(hcp->dup_len); 436 if (hcp->dup_off >= hcp->dup_tlen) { 437 F_CLR(hcp, H_ISDUP); 438 hcp->indx += 2; 439 } 440 } else if (F_ISSET(hcp, H_DUPONLY)) { 441 F_CLR(hcp, H_OK); 442 F_SET(hcp, H_NOMORE); 443 return (0); 444 } else { 445 hcp->indx += 2; 446 F_CLR(hcp, H_ISDUP); 447 } 448 449 return (__ham_item(dbc, mode, pgnop)); 450} 451 452/* 453 * __ham_insertpair -- 454 * 455 * Used for adding a pair of elements to a sorted page. We are guaranteed that 456 * the pair will fit on this page. 457 * 458 * If an index is provided, then use it, otherwise lookup the index using 459 * __ham_getindex. This saves a getindex call when inserting using a cursor. 460 * 461 * We're overloading the meaning of the H_OFFPAGE type here, which is a little 462 * bit sleazy. When we recover deletes, we have the entire entry instead of 463 * having only the DBT, so we'll pass type H_OFFPAGE to mean "copy the whole 464 * entry" as opposed to constructing an H_KEYDATA around it. In the recovery 465 * case it is assumed that a valid index is passed in, since a lookup using 466 * the overloaded H_OFFPAGE key will be incorrect. 467 * 468 * PUBLIC: int __ham_insertpair __P((DBC *, 469 * PUBLIC: PAGE *p, db_indx_t *indxp, const DBT *, const DBT *, int, int)); 470 */ 471int 472__ham_insertpair(dbc, p, indxp, key_dbt, data_dbt, key_type, data_type) 473 DBC *dbc; 474 PAGE *p; 475 db_indx_t *indxp; 476 const DBT *key_dbt, *data_dbt; 477 int key_type, data_type; 478{ 479 DB *dbp; 480 u_int16_t n, indx; 481 db_indx_t *inp; 482 u_int32_t ksize, dsize, increase, distance; 483 u_int8_t *offset; 484 int i, match, ret; 485 486 dbp = dbc->dbp; 487 n = NUM_ENT(p); 488 inp = P_INP(dbp, p); 489 ksize = (key_type == H_OFFPAGE) ? 490 key_dbt->size : HKEYDATA_SIZE(key_dbt->size); 491 dsize = (data_type == H_OFFPAGE) ? 492 data_dbt->size : HKEYDATA_SIZE(data_dbt->size); 493 increase = ksize + dsize; 494 495 if (indxp != NULL && *indxp != NDX_INVALID) 496 indx = *indxp; 497 else { 498 if ((ret = __ham_getindex(dbc, p, key_dbt, 499 key_type, &match, &indx)) != 0) 500 return (ret); 501 /* Save the index for the caller */ 502 if (indxp != NULL) 503 *indxp = indx; 504 /* It is an error to insert a duplicate key */ 505 DB_ASSERT(dbp->env, match != 0); 506 } 507 508 /* Special case if the page is empty or inserting at end of page.*/ 509 if (n == 0 || indx == n) { 510 inp[indx] = HOFFSET(p) - ksize; 511 inp[indx+1] = HOFFSET(p) - increase; 512 } else { 513 /* 514 * Shuffle the data elements. 515 * 516 * For example, inserting an element that sorts between items 517 * 2 and 3 on a page: 518 * The copy starts from the beginning of the second item. 519 * 520 * --------------------------- 521 * |pgheader.. 522 * |__________________________ 523 * ||1|2|3|4|... 524 * |-------------------------- 525 * | 526 * |__________________________ 527 * | ...|4|3|2|1| 528 * |-------------------------- 529 * --------------------------- 530 * 531 * Becomes: 532 * 533 * --------------------------- 534 * |pgheader.. 535 * |__________________________ 536 * ||1|2|2a|3|4|... 537 * |-------------------------- 538 * | 539 * |__________________________ 540 * | ...|4|3|2a|2|1| 541 * |-------------------------- 542 * --------------------------- 543 * 544 * Index's 3,4 etc move down the page. 545 * The data for 3,4,etc moves up the page by sizeof(2a) 546 * The index pointers in 3,4 etc are updated to point at the 547 * relocated data. 548 * It is necessary to move the data (not just adjust the index) 549 * since the hash format uses consecutive data items to 550 * dynamically calculate the item size. 551 * An item in this example is a key/data pair. 552 */ 553 offset = (u_int8_t *)p + HOFFSET(p); 554 if (indx == 0) 555 distance = dbp->pgsize - HOFFSET(p); 556 else 557 distance = (u_int32_t) 558 (P_ENTRY(dbp, p, indx - 1) - offset); 559 memmove(offset - increase, offset, distance); 560 561 /* Shuffle the index array */ 562 memmove(&inp[indx + 2], &inp[indx], 563 (n - indx) * sizeof(db_indx_t)); 564 565 /* update the index array */ 566 for (i = indx + 2; i < n + 2; i++) 567 inp[i] -= increase; 568 569 /* set the new index elements. */ 570 inp[indx] = (HOFFSET(p) - increase) + distance + dsize; 571 inp[indx + 1] = (HOFFSET(p) - increase) + distance; 572 } 573 574 HOFFSET(p) -= increase; 575 /* insert the new elements */ 576 if (key_type == H_OFFPAGE) 577 memcpy(P_ENTRY(dbp, p, indx), key_dbt->data, key_dbt->size); 578 else 579 PUT_HKEYDATA(P_ENTRY(dbp, p, indx), key_dbt->data, 580 key_dbt->size, key_type); 581 if (data_type == H_OFFPAGE) 582 memcpy(P_ENTRY(dbp, p, indx+1), data_dbt->data, 583 data_dbt->size); 584 else 585 PUT_HKEYDATA(P_ENTRY(dbp, p, indx+1), data_dbt->data, 586 data_dbt->size, data_type); 587 NUM_ENT(p) += 2; 588 589 /* 590 * If debugging a sorted hash page problem, this is a good place to 591 * insert a call to __ham_verify_sorted_page. 592 * It used to be called when diagnostic mode was enabled, but that 593 * causes problems in recovery if a custom comparator was used. 594 */ 595 return (0); 596} 597 598/* 599 * __hame_getindex -- 600 * 601 * The key_type parameter overloads the entry type to allow for comparison of 602 * a key DBT that contains off-page data. A key that is not of type H_OFFPAGE 603 * might contain data larger than the page size, since this routine can be 604 * called with user-provided DBTs. 605 * 606 * PUBLIC: int __ham_getindex __P((DBC *, 607 * PUBLIC: PAGE *, const DBT *, int, int *, db_indx_t *)); 608 */ 609int 610__ham_getindex(dbc, p, key, key_type, match, indx) 611 DBC *dbc; 612 PAGE *p; 613 const DBT *key; 614 int key_type, *match; 615 db_indx_t *indx; 616{ 617 /* Since all entries are key/data pairs. */ 618 DB_ASSERT(dbc->env, NUM_ENT(p)%2 == 0 ); 619 620 /* Support pre 4.6 unsorted hash pages. */ 621 if (p->type == P_HASH_UNSORTED) 622 return (__ham_getindex_unsorted(dbc, p, key, match, indx)); 623 else 624 return (__ham_getindex_sorted(dbc, 625 p, key, key_type, match, indx)); 626} 627 628#undef min 629#define min(a, b) (((a) < (b)) ? (a) : (b)) 630 631/* 632 * Perform a linear search of an unsorted (pre 4.6 format) hash page. 633 * 634 * This routine is never used to generate an index for insertion, because any 635 * unsorted page is sorted before we insert. 636 * 637 * Returns 0 if an exact match is found, with indx set to requested elem. 638 * Returns 1 if the item did not exist, indx is set to the last element on the 639 * page. 640 */ 641static int 642__ham_getindex_unsorted(dbc, p, key, match, indx) 643 DBC *dbc; 644 PAGE *p; 645 const DBT *key; 646 int *match; 647 db_indx_t *indx; 648{ 649 DB *dbp; 650 DBT pg_dbt; 651 DB_THREAD_INFO *ip; 652 DB_TXN *txn; 653 HASH *t; 654 db_pgno_t pgno; 655 int i, n, res, ret; 656 u_int32_t tlen; 657 u_int8_t *hk; 658 659 dbp = dbc->dbp; 660 txn = dbc->txn; 661 ip = dbc->thread_info; 662 n = NUM_ENT(p); 663 t = dbp->h_internal; 664 res = 1; 665 666 /* Do a linear search over the page looking for an exact match */ 667 for (i = 0; i < n; i+=2) { 668 hk = H_PAIRKEY(dbp, p, i); 669 switch (HPAGE_PTYPE(hk)) { 670 case H_OFFPAGE: 671 /* extract item length from possibly unaligned DBT */ 672 memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); 673 if (tlen == key->size) { 674 memcpy(&pgno, 675 HOFFPAGE_PGNO(hk), sizeof(db_pgno_t)); 676 if ((ret = __db_moff(dbp, ip, txn, key, 677 pgno, tlen, t->h_compare, &res)) != 0) 678 return (ret); 679 } 680 break; 681 case H_KEYDATA: 682 if (t->h_compare != NULL) { 683 DB_INIT_DBT(pg_dbt, 684 HKEYDATA_DATA(hk), key->size); 685 if (t->h_compare( 686 dbp, key, &pg_dbt) != 0) 687 break; 688 } else if (key->size == 689 LEN_HKEY(dbp, p, dbp->pgsize, i)) 690 res = memcmp(key->data, HKEYDATA_DATA(hk), 691 key->size); 692 break; 693 case H_DUPLICATE: 694 case H_OFFDUP: 695 /* 696 * These are errors because keys are never duplicated. 697 */ 698 /* FALLTHROUGH */ 699 default: 700 return (__db_pgfmt(dbp->env, PGNO(p))); 701 } 702 if (res == 0) 703 break; 704 } 705 *indx = i; 706 *match = (res == 0 ? 0 : 1); 707 return (0); 708} 709 710/* 711 * Perform a binary search of a sorted hash page for a key. 712 * Return 0 if an exact match is found, with indx set to requested elem. 713 * Return 1 if the item did not exist, indx will be set to the first element 714 * greater than the requested item. 715 */ 716static int 717__ham_getindex_sorted(dbc, p, key, key_type, match, indxp) 718 DBC *dbc; 719 PAGE *p; 720 const DBT *key; 721 int key_type, *match; 722 db_indx_t *indxp; 723{ 724 DB *dbp; 725 DBT tmp_dbt; 726 DB_THREAD_INFO *ip; 727 DB_TXN *txn; 728 HASH *t; 729 HOFFPAGE *offp; 730 db_indx_t indx; 731 db_pgno_t off_pgno, koff_pgno; 732 u_int32_t base, itemlen, lim, off_len; 733 u_int8_t *entry; 734 int res, ret; 735 void *data; 736 737 dbp = dbc->dbp; 738 txn = dbc->txn; 739 ip = dbc->thread_info; 740 DB_ASSERT(dbp->env, p->type == P_HASH ); 741 742 t = dbp->h_internal; 743 /* Initialize so the return params are correct for empty pages. */ 744 res = indx = 0; 745 746 /* Do a binary search for the element. */ 747 DB_BINARY_SEARCH_FOR(base, lim, p, 2) { 748 DB_BINARY_SEARCH_INCR(indx, base, lim, 2); 749 data = HKEYDATA_DATA(H_PAIRKEY(dbp, p, indx)); 750 /* 751 * There are 4 cases here: 752 * 1) Off page key, off page match 753 * 2) Off page key, on page match 754 * 3) On page key, off page match 755 * 4) On page key, on page match 756 */ 757 entry = P_ENTRY(dbp, p, indx); 758 if (*entry == H_OFFPAGE) { 759 offp = (HOFFPAGE*)P_ENTRY(dbp, p, indx); 760 (void)__ua_memcpy(&itemlen, HOFFPAGE_TLEN(offp), 761 sizeof(u_int32_t)); 762 if (key_type == H_OFFPAGE) { 763 /* 764 * Case 1. 765 * 766 * If both key and cmp DBTs refer to different 767 * offpage items, it is necessary to compare 768 * the content of the entries, in order to be 769 * able to maintain a valid lexicographic sort 770 * order. 771 */ 772 (void)__ua_memcpy(&koff_pgno, 773 HOFFPAGE_PGNO(key->data), 774 sizeof(db_pgno_t)); 775 (void)__ua_memcpy(&off_pgno, HOFFPAGE_PGNO(offp), 776 sizeof(db_pgno_t)); 777 if (koff_pgno == off_pgno) 778 res = 0; 779 else { 780 memset(&tmp_dbt, 0, sizeof(tmp_dbt)); 781 tmp_dbt.size = HOFFPAGE_SIZE; 782 tmp_dbt.data = offp; 783 if ((ret = __db_coff(dbp, ip, txn, key, 784 &tmp_dbt, t->h_compare, &res)) 785 != 0) 786 return (ret); 787 } 788 } else { 789 /* Case 2 */ 790 (void)__ua_memcpy(&off_pgno, 791 HOFFPAGE_PGNO(offp), sizeof(db_pgno_t)); 792 if ((ret = __db_moff(dbp, ip, txn, key, 793 off_pgno, itemlen, t->h_compare, &res)) 794 != 0) 795 return (ret); 796 } 797 } else { 798 itemlen = LEN_HKEYDATA(dbp, p, dbp->pgsize, indx); 799 if (key_type == H_OFFPAGE) { 800 /* Case 3 */ 801 tmp_dbt.data = data; 802 tmp_dbt.size = itemlen; 803 offp = (HOFFPAGE *)key->data; 804 (void)__ua_memcpy(&off_pgno, 805 HOFFPAGE_PGNO(offp), sizeof(db_pgno_t)); 806 (void)__ua_memcpy(&off_len, HOFFPAGE_TLEN(offp), 807 sizeof(u_int32_t)); 808 if ((ret = __db_moff(dbp, ip, txn, &tmp_dbt, 809 off_pgno, off_len, t->h_compare, 810 &res)) != 0) 811 return (ret); 812 /* 813 * Since we switched the key/match parameters 814 * in the __db_moff call, the result needs to 815 * be inverted. 816 */ 817 res = -res; 818 } else if (t->h_compare != NULL) { 819 /* Case 4, with a user comparison func */ 820 DB_INIT_DBT(tmp_dbt, data, itemlen); 821 res = t->h_compare(dbp, key, &tmp_dbt); 822 } else { 823 /* Case 4, without a user comparison func */ 824 if ((res = memcmp(key->data, data, 825 min(key->size, itemlen))) == 0) 826 res = itemlen > key->size ? 1 : 827 (itemlen < key->size ? -1 : 0); 828 } 829 } 830 if (res == 0) { 831 /* Found a match */ 832 *indxp = indx; 833 *match = 0; 834 return (0); 835 } else if (res > 0) 836 DB_BINARY_SEARCH_SHIFT_BASE(indx, base, lim, 2); 837 } 838 /* 839 * If no match was found, and the comparison indicates that the 840 * closest match was lexicographically less than the input key adjust 841 * the insertion index to be after the index of the closest match. 842 */ 843 if (res > 0) 844 indx += 2; 845 *indxp = indx; 846 *match = 1; 847 return (0); 848} 849 850/* 851 * PUBLIC: int __ham_verify_sorted_page __P((DB *, 852 * PUBLIC: DB_THREAD_INFO *, DB_TXN *, PAGE *)); 853 * 854 * The__ham_verify_sorted_page function is used to determine the correctness 855 * of sorted hash pages. The checks are used by verification, they are 856 * implemented in the hash code because they are also useful debugging aids. 857 */ 858int 859__ham_verify_sorted_page (dbp, ip, txn, p) 860 DB *dbp; 861 DB_THREAD_INFO *ip; 862 DB_TXN *txn; 863 PAGE *p; 864{ 865 DBT prev_dbt, curr_dbt; 866 ENV *env; 867 HASH *t; 868 db_pgno_t tpgno; 869 u_int32_t curr_len, prev_len, tlen; 870 u_int16_t *indxp; 871 db_indx_t i, n; 872 int res, ret; 873 char *prev, *curr; 874 875 /* Validate that next, prev pointers are OK */ 876 n = NUM_ENT(p); 877 DB_ASSERT(dbp->env, n%2 == 0 ); 878 879 env = dbp->env; 880 t = dbp->h_internal; 881 882 /* Disable verification if a custom comparator is supplied */ 883 if (t->h_compare != NULL) 884 return (0); 885 886 /* Iterate through page, ensuring order */ 887 prev = (char *)HKEYDATA_DATA(H_PAIRKEY(dbp, p, 0)); 888 prev_len = LEN_HKEYDATA(dbp, p, dbp->pgsize, 0); 889 for (i = 2; i < n; i+=2) { 890 curr = (char *)HKEYDATA_DATA(H_PAIRKEY(dbp, p, i)); 891 curr_len = LEN_HKEYDATA(dbp, p, dbp->pgsize, i); 892 893 if (HPAGE_TYPE(dbp, p, i-2) == H_OFFPAGE && 894 HPAGE_TYPE(dbp, p, i) == H_OFFPAGE) { 895 memset(&prev_dbt, 0, sizeof(prev_dbt)); 896 memset(&curr_dbt, 0, sizeof(curr_dbt)); 897 prev_dbt.size = curr_dbt.size = HOFFPAGE_SIZE; 898 prev_dbt.data = H_PAIRKEY(dbp, p, i-2); 899 curr_dbt.data = H_PAIRKEY(dbp, p, i); 900 if ((ret = __db_coff(dbp, ip, txn, 901 &prev_dbt, &curr_dbt, t->h_compare, &res)) != 0) 902 return (ret); 903 } else if (HPAGE_TYPE(dbp, p, i-2) == H_OFFPAGE) { 904 memset(&curr_dbt, 0, sizeof(curr_dbt)); 905 curr_dbt.size = curr_len; 906 curr_dbt.data = H_PAIRKEY(dbp, p, i); 907 memcpy(&tlen, HOFFPAGE_TLEN(H_PAIRKEY(dbp, p, i-2)), 908 sizeof(u_int32_t)); 909 memcpy(&tpgno, HOFFPAGE_PGNO(H_PAIRKEY(dbp, p, i-2)), 910 sizeof(db_pgno_t)); 911 if ((ret = __db_moff(dbp, ip, txn, 912 &curr_dbt, tpgno, tlen, t->h_compare, &res)) != 0) 913 return (ret); 914 } else if (HPAGE_TYPE(dbp, p, i) == H_OFFPAGE) { 915 memset(&prev_dbt, 0, sizeof(prev_dbt)); 916 prev_dbt.size = prev_len; 917 prev_dbt.data = H_PAIRKEY(dbp, p, i); 918 memcpy(&tlen, HOFFPAGE_TLEN(H_PAIRKEY(dbp, p, i)), 919 sizeof(u_int32_t)); 920 memcpy(&tpgno, HOFFPAGE_PGNO(H_PAIRKEY(dbp, p, i)), 921 sizeof(db_pgno_t)); 922 if ((ret = __db_moff(dbp, ip, txn, 923 &prev_dbt, tpgno, tlen, t->h_compare, &res)) != 0) 924 return (ret); 925 } else 926 res = memcmp(prev, curr, min(curr_len, prev_len)); 927 928 if (res == 0 && curr_len > prev_len) 929 res = 1; 930 else if (res == 0 && curr_len < prev_len) 931 res = -1; 932 933 if (res >= 0) { 934 __db_msg(env, "key1: %s, key2: %s, len: %lu\n", 935 (char *)prev, (char *)curr, 936 (u_long)min(curr_len, prev_len)); 937 __db_msg(env, "curroffset %lu\n", (u_long)i); 938 __db_msg(env, "indexes: "); 939 for (i = 0; i < n; i++) { 940 indxp = P_INP(dbp, p) + i; 941 __db_msg(env, "%04X, ", *indxp); 942 } 943 __db_msg(env, "\n"); 944#ifdef HAVE_STATISTICS 945 if ((ret = __db_prpage(dbp, p, DB_PR_PAGE)) != 0) 946 return (ret); 947#endif 948 DB_ASSERT(dbp->env, res < 0); 949 } 950 951 prev = curr; 952 prev_len = curr_len; 953 } 954 return (0); 955} 956 957/* 958 * A wrapper for the __ham_sort_page function. Implements logging and cursor 959 * adjustments associated with sorting a page outside of recovery/upgrade. 960 */ 961static int 962__ham_sort_page_cursor(dbc, page) 963 DBC *dbc; 964 PAGE *page; 965{ 966 DB *dbp; 967 DBT page_dbt; 968 DB_LSN new_lsn; 969 HASH_CURSOR *hcp; 970 int ret; 971 972 dbp = dbc->dbp; 973 hcp = (HASH_CURSOR *)dbc->internal; 974 975 if (DBC_LOGGING(dbc)) { 976 page_dbt.size = dbp->pgsize; 977 page_dbt.data = page; 978 if ((ret = __ham_splitdata_log(dbp, dbc->txn, 979 &new_lsn, 0, SORTPAGE, PGNO(page), 980 &page_dbt, &LSN(page))) != 0) 981 return (ret); 982 } else 983 LSN_NOT_LOGGED(new_lsn); 984 /* Move lsn onto page. */ 985 LSN(page) = new_lsn; /* Structure assignment. */ 986 987 /* 988 * Invalidate the saved index, it needs to be retrieved 989 * again once the page is sorted. 990 */ 991 hcp->seek_found_indx = NDX_INVALID; 992 hcp->seek_found_page = PGNO_INVALID; 993 994 return (__ham_sort_page(dbc, &hcp->split_buf, page)); 995} 996 997/* 998 * PUBLIC: int __ham_sort_page __P((DBC *, PAGE **, PAGE *)); 999 * 1000 * Convert a page from P_HASH_UNSORTED into the sorted format P_HASH. 1001 * 1002 * All locking and logging is carried out be the caller. A user buffer can 1003 * optionally be passed in to save allocating a page size buffer for sorting. 1004 * This is allows callers to re-use the buffer pre-allocated for page splits 1005 * in the hash cursor. The buffer is optional since no cursor exists when in 1006 * the recovery or upgrade code paths. 1007 */ 1008int 1009__ham_sort_page(dbc, tmp_buf, page) 1010 DBC *dbc; 1011 PAGE **tmp_buf; 1012 PAGE *page; 1013{ 1014 DB *dbp; 1015 PAGE *temp_pagep; 1016 db_indx_t i; 1017 int ret; 1018 1019 dbp = dbc->dbp; 1020 DB_ASSERT(dbp->env, page->type == P_HASH_UNSORTED); 1021 1022 ret = 0; 1023 if (tmp_buf != NULL) 1024 temp_pagep = *tmp_buf; 1025 else if ((ret = __os_malloc(dbp->env, dbp->pgsize, &temp_pagep)) != 0) 1026 return (ret); 1027 1028 memcpy(temp_pagep, page, dbp->pgsize); 1029 1030 /* Re-initialize the page. */ 1031 P_INIT(page, dbp->pgsize, 1032 page->pgno, page->prev_pgno, page->next_pgno, 0, P_HASH); 1033 1034 for (i = 0; i < NUM_ENT(temp_pagep); i += 2) 1035 if ((ret = 1036 __ham_copypair(dbc, temp_pagep, i, page, NULL)) != 0) 1037 break; 1038 1039 if (tmp_buf == NULL) 1040 __os_free(dbp->env, temp_pagep); 1041 1042 return (ret); 1043} 1044 1045/* 1046 * PUBLIC: int __ham_del_pair __P((DBC *, int)); 1047 */ 1048int 1049__ham_del_pair(dbc, flags) 1050 DBC *dbc; 1051 int flags; 1052{ 1053 DB *dbp; 1054 DBT data_dbt, key_dbt; 1055 DB_LSN new_lsn, *n_lsn, tmp_lsn; 1056 DB_MPOOLFILE *mpf; 1057 HASH_CURSOR *hcp; 1058 PAGE *n_pagep, *nn_pagep, *p, *p_pagep; 1059 db_ham_mode op; 1060 db_indx_t ndx; 1061 db_pgno_t chg_pgno, pgno, tmp_pgno; 1062 u_int32_t order; 1063 int ret, t_ret; 1064 1065 dbp = dbc->dbp; 1066 mpf = dbp->mpf; 1067 hcp = (HASH_CURSOR *)dbc->internal; 1068 n_pagep = p_pagep = nn_pagep = NULL; 1069 ndx = hcp->indx; 1070 1071 if (hcp->page == NULL && 1072 (ret = __memp_fget(mpf, &hcp->pgno, dbc->thread_info, dbc->txn, 1073 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &hcp->page)) != 0) 1074 return (ret); 1075 p = hcp->page; 1076 1077 /* 1078 * We optimize for the normal case which is when neither the key nor 1079 * the data are large. In this case, we write a single log record 1080 * and do the delete. If either is large, we'll call __big_delete 1081 * to remove the big item and then update the page to remove the 1082 * entry referring to the big item. 1083 */ 1084 if (HPAGE_PTYPE(H_PAIRKEY(dbp, p, ndx)) == H_OFFPAGE) { 1085 memcpy(&pgno, HOFFPAGE_PGNO(P_ENTRY(dbp, p, H_KEYINDEX(ndx))), 1086 sizeof(db_pgno_t)); 1087 ret = __db_doff(dbc, pgno); 1088 } else 1089 ret = 0; 1090 1091 if (ret == 0) 1092 switch (HPAGE_PTYPE(H_PAIRDATA(dbp, p, ndx))) { 1093 case H_OFFPAGE: 1094 memcpy(&pgno, 1095 HOFFPAGE_PGNO(P_ENTRY(dbp, p, H_DATAINDEX(ndx))), 1096 sizeof(db_pgno_t)); 1097 ret = __db_doff(dbc, pgno); 1098 break; 1099 case H_OFFDUP: 1100 case H_DUPLICATE: 1101 /* 1102 * If we delete a pair that is/was a duplicate, then 1103 * we had better clear the flag so that we update the 1104 * cursor appropriately. 1105 */ 1106 F_CLR(hcp, H_ISDUP); 1107 break; 1108 default: 1109 /* No-op */ 1110 break; 1111 } 1112 1113 if (ret) 1114 return (ret); 1115 1116 /* Now log the delete off this page. */ 1117 if (DBC_LOGGING(dbc)) { 1118 key_dbt.data = P_ENTRY(dbp, p, H_KEYINDEX(ndx)); 1119 key_dbt.size = LEN_HITEM(dbp, p, dbp->pgsize, H_KEYINDEX(ndx)); 1120 data_dbt.data = P_ENTRY(dbp, p, H_DATAINDEX(ndx)); 1121 data_dbt.size = 1122 LEN_HITEM(dbp, p, dbp->pgsize, H_DATAINDEX(ndx)); 1123 1124 if ((ret = __ham_insdel_log(dbp, 1125 dbc->txn, &new_lsn, 0, DELPAIR, PGNO(p), (u_int32_t)ndx, 1126 &LSN(p), &key_dbt, &data_dbt)) != 0) 1127 return (ret); 1128 } else 1129 LSN_NOT_LOGGED(new_lsn); 1130 1131 /* Move lsn onto page. */ 1132 LSN(p) = new_lsn; 1133 /* Do the delete. */ 1134 __ham_dpair(dbp, p, ndx); 1135 1136 /* 1137 * Mark item deleted so that we don't try to return it, and 1138 * so that we update the cursor correctly on the next call 1139 * to next. 1140 */ 1141 F_SET(hcp, H_DELETED); 1142 F_CLR(hcp, H_OK); 1143 1144 /* 1145 * If we are locking, we will not maintain this, because it is 1146 * a hot spot. 1147 * 1148 * XXX 1149 * Perhaps we can retain incremental numbers and apply them later. 1150 */ 1151 if (!STD_LOCKING(dbc)) { 1152 if ((ret = __ham_dirty_meta(dbc, 0)) != 0) 1153 return (ret); 1154 --hcp->hdr->nelem; 1155 } 1156 1157 /* The HAM_DEL_NO_CURSOR flag implies HAM_DEL_NO_RECLAIM. */ 1158 if (LF_ISSET(HAM_DEL_NO_CURSOR)) 1159 return (0); 1160 /* 1161 * Update cursors that are on the page where the delete happened. 1162 */ 1163 if ((ret = __hamc_update(dbc, 0, DB_HAM_CURADJ_DEL, 0)) != 0) 1164 return (ret); 1165 1166 /* 1167 * If we need to reclaim the page, then check if the page is empty. 1168 * There are two cases. If it's empty and it's not the first page 1169 * in the bucket (i.e., the bucket page) then we can simply remove 1170 * it. If it is the first chain in the bucket, then we need to copy 1171 * the second page into it and remove the second page. 1172 * If its the only page in the bucket we leave it alone. 1173 */ 1174 if (LF_ISSET(HAM_DEL_NO_RECLAIM) || 1175 NUM_ENT(p) != 0 || 1176 (PREV_PGNO(p) == PGNO_INVALID && NEXT_PGNO(p) == PGNO_INVALID)) 1177 return (0); 1178 1179 if (PREV_PGNO(p) == PGNO_INVALID) { 1180 /* 1181 * First page in chain is empty and we know that there 1182 * are more pages in the chain. 1183 */ 1184 if ((ret = __memp_fget(mpf, 1185 &NEXT_PGNO(p), dbc->thread_info, dbc->txn, 1186 DB_MPOOL_DIRTY, &n_pagep)) != 0) 1187 return (ret); 1188 1189 if (NEXT_PGNO(n_pagep) != PGNO_INVALID && 1190 (ret = __memp_fget(mpf, &NEXT_PGNO(n_pagep), 1191 dbc->thread_info, dbc->txn, 1192 DB_MPOOL_DIRTY, &nn_pagep)) != 0) 1193 goto err; 1194 1195 if (DBC_LOGGING(dbc)) { 1196 key_dbt.data = n_pagep; 1197 key_dbt.size = dbp->pgsize; 1198 if ((ret = __ham_copypage_log(dbp, 1199 dbc->txn, &new_lsn, 0, PGNO(p), 1200 &LSN(p), PGNO(n_pagep), &LSN(n_pagep), 1201 NEXT_PGNO(n_pagep), 1202 nn_pagep == NULL ? NULL : &LSN(nn_pagep), 1203 &key_dbt)) != 0) 1204 goto err; 1205 } else 1206 LSN_NOT_LOGGED(new_lsn); 1207 1208 /* Move lsn onto page. */ 1209 LSN(p) = new_lsn; /* Structure assignment. */ 1210 LSN(n_pagep) = new_lsn; 1211 if (NEXT_PGNO(n_pagep) != PGNO_INVALID) 1212 LSN(nn_pagep) = new_lsn; 1213 1214 if (nn_pagep != NULL) { 1215 PREV_PGNO(nn_pagep) = PGNO(p); 1216 if ((ret = __memp_fput(mpf, 1217 dbc->thread_info, nn_pagep, dbc->priority)) != 0) { 1218 nn_pagep = NULL; 1219 goto err; 1220 } 1221 } 1222 1223 tmp_pgno = PGNO(p); 1224 tmp_lsn = LSN(p); 1225 memcpy(p, n_pagep, dbp->pgsize); 1226 PGNO(p) = tmp_pgno; 1227 LSN(p) = tmp_lsn; 1228 PREV_PGNO(p) = PGNO_INVALID; 1229 1230 /* 1231 * Update cursors to reflect the fact that records 1232 * on the second page have moved to the first page. 1233 */ 1234 if ((ret = __hamc_delpg(dbc, PGNO(n_pagep), 1235 PGNO(p), 0, DB_HAM_DELFIRSTPG, &order)) != 0) 1236 goto err; 1237 1238 /* 1239 * Update the cursor to reflect its new position. 1240 */ 1241 hcp->indx = 0; 1242 hcp->pgno = PGNO(p); 1243 hcp->order += order; 1244 1245 if ((ret = __db_free(dbc, n_pagep)) != 0) { 1246 n_pagep = NULL; 1247 goto err; 1248 } 1249 } else { 1250 if ((ret = __memp_fget(mpf, 1251 &PREV_PGNO(p), dbc->thread_info, dbc->txn, 1252 DB_MPOOL_DIRTY, &p_pagep)) != 0) 1253 goto err; 1254 1255 if (NEXT_PGNO(p) != PGNO_INVALID) { 1256 if ((ret = __memp_fget(mpf, &NEXT_PGNO(p), 1257 dbc->thread_info, dbc->txn, 1258 DB_MPOOL_DIRTY, &n_pagep)) != 0) 1259 goto err; 1260 n_lsn = &LSN(n_pagep); 1261 } else { 1262 n_pagep = NULL; 1263 n_lsn = NULL; 1264 } 1265 1266 NEXT_PGNO(p_pagep) = NEXT_PGNO(p); 1267 if (n_pagep != NULL) 1268 PREV_PGNO(n_pagep) = PGNO(p_pagep); 1269 1270 if (DBC_LOGGING(dbc)) { 1271 if ((ret = __ham_newpage_log(dbp, dbc->txn, 1272 &new_lsn, 0, DELOVFL, PREV_PGNO(p), &LSN(p_pagep), 1273 PGNO(p), &LSN(p), NEXT_PGNO(p), n_lsn)) != 0) 1274 goto err; 1275 } else 1276 LSN_NOT_LOGGED(new_lsn); 1277 1278 /* Move lsn onto page. */ 1279 LSN(p_pagep) = new_lsn; /* Structure assignment. */ 1280 if (n_pagep) 1281 LSN(n_pagep) = new_lsn; 1282 LSN(p) = new_lsn; 1283 1284 if (NEXT_PGNO(p) == PGNO_INVALID) { 1285 /* 1286 * There is no next page; put the cursor on the 1287 * previous page as if we'd deleted the last item 1288 * on that page, with index after the last valid 1289 * entry. 1290 * 1291 * The deleted flag was set up above. 1292 */ 1293 hcp->pgno = PGNO(p_pagep); 1294 hcp->indx = NUM_ENT(p_pagep); 1295 op = DB_HAM_DELLASTPG; 1296 } else { 1297 /* 1298 * There is a next page, so put the cursor at 1299 * the beginning of it. 1300 */ 1301 hcp->pgno = NEXT_PGNO(p); 1302 hcp->indx = 0; 1303 op = DB_HAM_DELMIDPG; 1304 } 1305 1306 /* 1307 * Since we are about to delete the cursor page and we have 1308 * just moved the cursor, we need to make sure that the 1309 * old page pointer isn't left hanging around in the cursor. 1310 */ 1311 hcp->page = NULL; 1312 chg_pgno = PGNO(p); 1313 ret = __db_free(dbc, p); 1314 if ((t_ret = __memp_fput(mpf, dbc->thread_info, 1315 p_pagep, dbc->priority)) != 0 && ret == 0) 1316 ret = t_ret; 1317 if (n_pagep != NULL && (t_ret = __memp_fput(mpf, 1318 dbc->thread_info, n_pagep, dbc->priority)) != 0 && ret == 0) 1319 ret = t_ret; 1320 if (ret != 0) 1321 return (ret); 1322 if ((ret = __hamc_delpg(dbc, 1323 chg_pgno, hcp->pgno, hcp->indx, op, &order)) != 0) 1324 return (ret); 1325 hcp->order += order; 1326 } 1327 return (ret); 1328 1329err: /* Clean up any pages. */ 1330 if (n_pagep != NULL) 1331 (void)__memp_fput(mpf, 1332 dbc->thread_info, n_pagep, dbc->priority); 1333 if (nn_pagep != NULL) 1334 (void)__memp_fput(mpf, 1335 dbc->thread_info, nn_pagep, dbc->priority); 1336 if (p_pagep != NULL) 1337 (void)__memp_fput(mpf, 1338 dbc->thread_info, p_pagep, dbc->priority); 1339 return (ret); 1340} 1341 1342/* 1343 * __ham_replpair -- 1344 * Given the key data indicated by the cursor, replace part/all of it 1345 * according to the fields in the dbt. 1346 * 1347 * PUBLIC: int __ham_replpair __P((DBC *, DBT *, u_int32_t)); 1348 */ 1349int 1350__ham_replpair(dbc, dbt, make_dup) 1351 DBC *dbc; 1352 DBT *dbt; 1353 u_int32_t make_dup; 1354{ 1355 DB *dbp; 1356 DBC **carray, *dbc_n; 1357 DBT old_dbt, tdata, tmp, *new_dbt; 1358 DB_LSN new_lsn; 1359 ENV *env; 1360 HASH_CURSOR *hcp, *cp; 1361 db_indx_t orig_indx; 1362 db_pgno_t orig_pgno; 1363 u_int32_t change; 1364 u_int32_t dup_flag, len, memsize, newlen; 1365 int beyond_eor, is_big, is_plus, ret, type, i, found, t_ret; 1366 u_int8_t *beg, *dest, *end, *hk, *src; 1367 void *memp; 1368 1369 /* 1370 * Items that were already offpage (ISBIG) were handled before 1371 * we get in here. So, we need only handle cases where the old 1372 * key is on a regular page. That leaves us 6 cases: 1373 * 1. Original data onpage; new data is smaller 1374 * 2. Original data onpage; new data is the same size 1375 * 3. Original data onpage; new data is bigger, but not ISBIG, 1376 * fits on page 1377 * 4. Original data onpage; new data is bigger, but not ISBIG, 1378 * does not fit on page 1379 * 5. Original data onpage; New data is an off-page item. 1380 * 6. Original data was offpage; new item is smaller. 1381 * 1382 * Cases 1-3 are essentially the same (and should be the common case). 1383 * We handle 4-6 as delete and add. 1384 */ 1385 dbp = dbc->dbp; 1386 env = dbp->env; 1387 hcp = (HASH_CURSOR *)dbc->internal; 1388 found = 0; 1389 dbc_n = memp = NULL; 1390 carray = NULL; 1391 1392 /* 1393 * We need to compute the number of bytes that we are adding or 1394 * removing from the entry. Normally, we can simply subtract 1395 * the number of bytes we are replacing (dbt->dlen) from the 1396 * number of bytes we are inserting (dbt->size). However, if 1397 * we are doing a partial put off the end of a record, then this 1398 * formula doesn't work, because we are essentially adding 1399 * new bytes. 1400 */ 1401 if (dbt->size > dbt->dlen) { 1402 change = dbt->size - dbt->dlen; 1403 is_plus = 1; 1404 } else { 1405 change = dbt->dlen - dbt->size; 1406 is_plus = 0; 1407 } 1408 1409 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); 1410 is_big = HPAGE_PTYPE(hk) == H_OFFPAGE; 1411 1412 if (is_big) 1413 memcpy(&len, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); 1414 else 1415 len = LEN_HKEYDATA(dbp, hcp->page, 1416 dbp->pgsize, H_DATAINDEX(hcp->indx)); 1417 1418 beyond_eor = dbt->doff + dbt->dlen > len; 1419 if (beyond_eor) { 1420 /* 1421 * The change is beyond the end of record. If change 1422 * is a positive number, we can simply add the extension 1423 * to it. However, if change is negative, then we need 1424 * to figure out if the extension is larger than the 1425 * negative change. 1426 */ 1427 if (is_plus) 1428 change += dbt->doff + dbt->dlen - len; 1429 else if (dbt->doff + dbt->dlen - len > change) { 1430 /* Extension bigger than change */ 1431 is_plus = 1; 1432 change = (dbt->doff + dbt->dlen - len) - change; 1433 } else /* Extension is smaller than change. */ 1434 change -= (dbt->doff + dbt->dlen - len); 1435 } 1436 1437 newlen = (is_plus ? len + change : len - change); 1438 if (ISBIG(hcp, newlen) || 1439 (is_plus && change > P_FREESPACE(dbp, hcp->page)) || 1440 beyond_eor || is_big) { 1441 /* 1442 * If we are in cases 4 or 5 then is_plus will be true. 1443 * If we don't have a transaction then we cannot roll back, 1444 * make sure there is enough room for the new page. 1445 */ 1446 if (is_plus && dbc->txn == NULL && 1447 dbp->mpf->mfp->maxpgno != 0 && 1448 dbp->mpf->mfp->maxpgno == dbp->mpf->mfp->last_pgno) 1449 return (__db_space_err(dbp)); 1450 /* 1451 * Cases 4-6 -- two subcases. 1452 * A. This is not really a partial operation, but an overwrite. 1453 * Simple del and add works. 1454 * B. This is a partial and we need to construct the data that 1455 * we are really inserting (yuck). 1456 * In both cases, we need to grab the key off the page (in 1457 * some cases we could do this outside of this routine; for 1458 * cleanliness we do it here. If you happen to be on a big 1459 * key, this could be a performance hit). 1460 */ 1461 memset(&tmp, 0, sizeof(tmp)); 1462 if ((ret = __db_ret(dbp, dbc->thread_info, dbc->txn, 1463 hcp->page, H_KEYINDEX(hcp->indx), &tmp, 1464 &dbc->my_rkey.data, &dbc->my_rkey.ulen)) != 0) 1465 return (ret); 1466 1467 /* 1468 * In cases 4-6, a delete and insert works, but we need to 1469 * track and update any cursors pointing to the item being 1470 * moved. 1471 */ 1472 orig_pgno = PGNO(hcp->page); 1473 orig_indx = hcp->indx; 1474 if ((ret = __ham_get_clist(dbp, 1475 orig_pgno, orig_indx, &carray)) != 0) 1476 goto err; 1477 1478 /* Preserve duplicate info. */ 1479 dup_flag = F_ISSET(hcp, H_ISDUP); 1480 if (dbt->doff == 0 && dbt->dlen == len) { 1481 type = (dup_flag ? H_DUPLICATE : H_KEYDATA); 1482 new_dbt = dbt; 1483 } else { /* Case B */ 1484 type = HPAGE_PTYPE(hk) != H_OFFPAGE ? 1485 HPAGE_PTYPE(hk) : H_KEYDATA; 1486 memset(&tdata, 0, sizeof(tdata)); 1487 memsize = 0; 1488 if ((ret = __db_ret(dbp, dbc->thread_info, 1489 dbc->txn, hcp->page, H_DATAINDEX(hcp->indx), 1490 &tdata, &memp, &memsize)) != 0) 1491 goto err; 1492 1493 /* Now shift old data around to make room for new. */ 1494 if (is_plus) { 1495 if ((ret = __os_realloc(env, 1496 tdata.size + change, &tdata.data)) != 0) 1497 return (ret); 1498 memp = tdata.data; 1499 memsize = tdata.size + change; 1500 memset((u_int8_t *)tdata.data + tdata.size, 1501 0, change); 1502 } 1503 end = (u_int8_t *)tdata.data + tdata.size; 1504 1505 src = (u_int8_t *)tdata.data + dbt->doff + dbt->dlen; 1506 if (src < end && tdata.size > dbt->doff + dbt->dlen) { 1507 len = tdata.size - (dbt->doff + dbt->dlen); 1508 if (is_plus) 1509 dest = src + change; 1510 else 1511 dest = src - change; 1512 memmove(dest, src, len); 1513 } 1514 memcpy((u_int8_t *)tdata.data + dbt->doff, 1515 dbt->data, dbt->size); 1516 if (is_plus) 1517 tdata.size += change; 1518 else 1519 tdata.size -= change; 1520 new_dbt = &tdata; 1521 } 1522 if ((ret = __ham_del_pair(dbc, HAM_DEL_NO_CURSOR)) != 0) 1523 goto err; 1524 /* 1525 * Save the state of the cursor after the delete, so that we 1526 * can adjust any cursors impacted by the delete. Don't just 1527 * update the cursors now, to avoid ambiguity in reversing the 1528 * adjustments during abort. 1529 */ 1530 if ((ret = __dbc_dup(dbc, &dbc_n, DB_POSITION)) != 0) 1531 goto err; 1532 if ((ret = __ham_add_el(dbc, &tmp, new_dbt, type)) != 0) 1533 goto err; 1534 F_SET(hcp, dup_flag); 1535 1536 /* 1537 * If the delete/insert pair caused the item to be moved 1538 * to another location (which is possible for duplicate sets 1539 * that are moved onto another page in the bucket), then update 1540 * any impacted cursors. 1541 */ 1542 if (((HASH_CURSOR*)dbc_n->internal)->pgno != hcp->pgno || 1543 ((HASH_CURSOR*)dbc_n->internal)->indx != hcp->indx) { 1544 /* 1545 * Set any cursors pointing to items in the moved 1546 * duplicate set to the destination location and reset 1547 * the deleted flag. This can't be done earlier, since 1548 * the insert location is not computed until the actual 1549 * __ham_add_el call is made. 1550 */ 1551 if (carray != NULL) { 1552 for (i = 0; carray[i] != NULL; i++) { 1553 cp = (HASH_CURSOR*)carray[i]->internal; 1554 cp->pgno = hcp->pgno; 1555 cp->indx = hcp->indx; 1556 F_CLR(cp, H_DELETED); 1557 found = 1; 1558 } 1559 /* 1560 * Only log the update once, since the recovery 1561 * code iterates through all open cursors and 1562 * applies the change to all matching cursors. 1563 */ 1564 if (found && DBC_LOGGING(dbc) && 1565 IS_SUBTRANSACTION(dbc->txn)) { 1566 if ((ret = 1567 __ham_chgpg_log(dbp, 1568 dbc->txn, &new_lsn, 0, 1569 DB_HAM_CHGPG, orig_pgno, hcp->pgno, 1570 orig_indx, hcp->indx)) != 0) 1571 goto err; 1572 } 1573 } 1574 /* 1575 * Update any cursors impacted by the delete. Do this 1576 * after chgpg log so that recovery does not re-bump 1577 * cursors pointing to the deleted item. 1578 */ 1579 ret = __hamc_update(dbc_n, 0, DB_HAM_CURADJ_DEL, 0); 1580 } 1581 1582err: if (dbc_n != NULL && (t_ret = __dbc_close(dbc_n)) != 0 && 1583 ret == 0) 1584 ret = t_ret; 1585 if (carray != NULL) 1586 __os_free(env, carray); 1587 if (memp != NULL) 1588 __os_free(env, memp); 1589 return (ret); 1590 } 1591 1592 /* 1593 * Set up pointer into existing data. Do it before the log 1594 * message so we can use it inside of the log setup. 1595 */ 1596 beg = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)); 1597 beg += dbt->doff; 1598 1599 /* 1600 * If we are going to have to move bytes at all, figure out 1601 * all the parameters here. Then log the call before moving 1602 * anything around. 1603 */ 1604 if (DBC_LOGGING(dbc)) { 1605 old_dbt.data = beg; 1606 old_dbt.size = dbt->dlen; 1607 if ((ret = __ham_replace_log(dbp, 1608 dbc->txn, &new_lsn, 0, PGNO(hcp->page), 1609 (u_int32_t)H_DATAINDEX(hcp->indx), &LSN(hcp->page), 1610 (int32_t)dbt->doff, &old_dbt, dbt, make_dup)) != 0) 1611 return (ret); 1612 } else 1613 LSN_NOT_LOGGED(new_lsn); 1614 1615 LSN(hcp->page) = new_lsn; /* Structure assignment. */ 1616 1617 __ham_onpage_replace(dbp, hcp->page, (u_int32_t)H_DATAINDEX(hcp->indx), 1618 (int32_t)dbt->doff, change, is_plus, dbt); 1619 1620 return (0); 1621} 1622 1623/* 1624 * Replace data on a page with new data, possibly growing or shrinking what's 1625 * there. This is called on two different occasions. On one (from replpair) 1626 * we are interested in changing only the data. On the other (from recovery) 1627 * we are replacing the entire data (header and all) with a new element. In 1628 * the latter case, the off argument is negative. 1629 * pagep: the page that we're changing 1630 * ndx: page index of the element that is growing/shrinking. 1631 * off: Offset at which we are beginning the replacement. 1632 * change: the number of bytes (+ or -) that the element is growing/shrinking. 1633 * dbt: the new data that gets written at beg. 1634 * 1635 * PUBLIC: void __ham_onpage_replace __P((DB *, PAGE *, u_int32_t, 1636 * PUBLIC: int32_t, u_int32_t, int, DBT *)); 1637 */ 1638void 1639__ham_onpage_replace(dbp, pagep, ndx, off, change, is_plus, dbt) 1640 DB *dbp; 1641 PAGE *pagep; 1642 u_int32_t ndx; 1643 int32_t off; 1644 u_int32_t change; 1645 int is_plus; 1646 DBT *dbt; 1647{ 1648 db_indx_t i, *inp; 1649 int32_t len; 1650 size_t pgsize; 1651 u_int8_t *src, *dest; 1652 int zero_me; 1653 1654 pgsize = dbp->pgsize; 1655 inp = P_INP(dbp, pagep); 1656 if (change != 0) { 1657 zero_me = 0; 1658 src = (u_int8_t *)(pagep) + HOFFSET(pagep); 1659 if (off < 0) 1660 len = inp[ndx] - HOFFSET(pagep); 1661 else if ((u_int32_t)off >= 1662 LEN_HKEYDATA(dbp, pagep, pgsize, ndx)) { 1663 len = (int32_t)(HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx)) 1664 + LEN_HKEYDATA(dbp, pagep, pgsize, ndx) - src); 1665 zero_me = 1; 1666 } else 1667 len = (int32_t)( 1668 (HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx)) + off) - 1669 src); 1670 if (is_plus) 1671 dest = src - change; 1672 else 1673 dest = src + change; 1674 memmove(dest, src, (size_t)len); 1675 if (zero_me) 1676 memset(dest + len, 0, change); 1677 1678 /* Now update the indices. */ 1679 for (i = ndx; i < NUM_ENT(pagep); i++) { 1680 if (is_plus) 1681 inp[i] -= change; 1682 else 1683 inp[i] += change; 1684 } 1685 if (is_plus) 1686 HOFFSET(pagep) -= change; 1687 else 1688 HOFFSET(pagep) += change; 1689 } 1690 if (off >= 0) 1691 memcpy(HKEYDATA_DATA(P_ENTRY(dbp, pagep, ndx)) + off, 1692 dbt->data, dbt->size); 1693 else 1694 memcpy(P_ENTRY(dbp, pagep, ndx), dbt->data, dbt->size); 1695} 1696 1697/* 1698 * PUBLIC: int __ham_split_page __P((DBC *, u_int32_t, u_int32_t)); 1699 */ 1700int 1701__ham_split_page(dbc, obucket, nbucket) 1702 DBC *dbc; 1703 u_int32_t obucket, nbucket; 1704{ 1705 DB *dbp; 1706 DBC **carray, *tmp_dbc; 1707 DBT key, page_dbt; 1708 DB_LOCK block; 1709 DB_LSN new_lsn; 1710 DB_MPOOLFILE *mpf; 1711 ENV *env; 1712 HASH_CURSOR *hcp, *cp; 1713 PAGE **pp, *old_pagep, *temp_pagep, *new_pagep; 1714 db_indx_t n, dest_indx; 1715 db_pgno_t bucket_pgno, npgno, next_pgno; 1716 u_int32_t big_len, len; 1717 int found, i, ret, t_ret; 1718 void *big_buf; 1719 1720 dbp = dbc->dbp; 1721 carray = NULL; 1722 env = dbp->env; 1723 mpf = dbp->mpf; 1724 hcp = (HASH_CURSOR *)dbc->internal; 1725 temp_pagep = old_pagep = new_pagep = NULL; 1726 npgno = PGNO_INVALID; 1727 LOCK_INIT(block); 1728 1729 bucket_pgno = BUCKET_TO_PAGE(hcp, obucket); 1730 if ((ret = __db_lget(dbc, 1731 0, bucket_pgno, DB_LOCK_WRITE, 0, &block)) != 0) 1732 goto err; 1733 if ((ret = __memp_fget(mpf, &bucket_pgno, dbc->thread_info, dbc->txn, 1734 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &old_pagep)) != 0) 1735 goto err; 1736 1737 /* Sort any unsorted pages before doing a hash split. */ 1738 if (old_pagep->type == P_HASH_UNSORTED) 1739 if ((ret = __ham_sort_page_cursor(dbc, old_pagep)) != 0) 1740 return (ret); 1741 1742 /* Properly initialize the new bucket page. */ 1743 npgno = BUCKET_TO_PAGE(hcp, nbucket); 1744 if ((ret = __memp_fget(mpf, &npgno, dbc->thread_info, dbc->txn, 1745 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &new_pagep)) != 0) 1746 goto err; 1747 P_INIT(new_pagep, 1748 dbp->pgsize, npgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH); 1749 1750 temp_pagep = hcp->split_buf; 1751 memcpy(temp_pagep, old_pagep, dbp->pgsize); 1752 1753 if (DBC_LOGGING(dbc)) { 1754 page_dbt.size = dbp->pgsize; 1755 page_dbt.data = old_pagep; 1756 if ((ret = __ham_splitdata_log(dbp, 1757 dbc->txn, &new_lsn, 0, SPLITOLD, 1758 PGNO(old_pagep), &page_dbt, &LSN(old_pagep))) != 0) 1759 goto err; 1760 } else 1761 LSN_NOT_LOGGED(new_lsn); 1762 1763 LSN(old_pagep) = new_lsn; /* Structure assignment. */ 1764 1765 P_INIT(old_pagep, dbp->pgsize, PGNO(old_pagep), PGNO_INVALID, 1766 PGNO_INVALID, 0, P_HASH); 1767 1768 big_len = 0; 1769 big_buf = NULL; 1770 memset(&key, 0, sizeof(key)); 1771 while (temp_pagep != NULL) { 1772 if ((ret = __ham_get_clist(dbp, 1773 PGNO(temp_pagep), NDX_INVALID, &carray)) != 0) 1774 goto err; 1775 1776 for (n = 0; n < (db_indx_t)NUM_ENT(temp_pagep); n += 2) { 1777 if ((ret = __db_ret(dbp, dbc->thread_info, 1778 dbc->txn, temp_pagep, H_KEYINDEX(n), 1779 &key, &big_buf, &big_len)) != 0) 1780 goto err; 1781 1782 if (__ham_call_hash(dbc, key.data, key.size) == obucket) 1783 pp = &old_pagep; 1784 else 1785 pp = &new_pagep; 1786 1787 /* 1788 * Figure out how many bytes we need on the new 1789 * page to store the key/data pair. 1790 */ 1791 len = LEN_HITEM(dbp, temp_pagep, dbp->pgsize, 1792 H_DATAINDEX(n)) + 1793 LEN_HITEM(dbp, temp_pagep, dbp->pgsize, 1794 H_KEYINDEX(n)) + 1795 2 * sizeof(db_indx_t); 1796 1797 if (P_FREESPACE(dbp, *pp) < len) { 1798 if (DBC_LOGGING(dbc)) { 1799 page_dbt.size = dbp->pgsize; 1800 page_dbt.data = *pp; 1801 if ((ret = __ham_splitdata_log(dbp, 1802 dbc->txn, &new_lsn, 0, 1803 SPLITNEW, PGNO(*pp), &page_dbt, 1804 &LSN(*pp))) != 0) 1805 goto err; 1806 } else 1807 LSN_NOT_LOGGED(new_lsn); 1808 LSN(*pp) = new_lsn; 1809 if ((ret = 1810 __ham_add_ovflpage(dbc, *pp, 1, pp)) != 0) 1811 goto err; 1812 } 1813 1814 dest_indx = NDX_INVALID; 1815 if ((ret = __ham_copypair(dbc, temp_pagep, 1816 H_KEYINDEX(n), *pp, &dest_indx)) != 0) 1817 goto err; 1818 1819 /* 1820 * Update any cursors that were pointing to items 1821 * shuffled because of this insert. 1822 * Use __hamc_update, since the cursor adjustments are 1823 * the same as those required for an insert. The 1824 * overhead of creating a cursor is worthwhile to save 1825 * replicating the adjustment functionality. 1826 * Adjusting shuffled cursors needs to be done prior to 1827 * adjusting any cursors that were pointing to the 1828 * moved item. 1829 * All pages in a bucket are sorted, but the items are 1830 * not sorted across pages within a bucket. This means 1831 * that splitting the first page in a bucket into two 1832 * new buckets won't require any cursor shuffling, 1833 * since all inserts will be appends. Splitting of the 1834 * second etc page from the initial bucket could 1835 * cause an item to be inserted at any location on a 1836 * page (since items already inserted from page 1 of 1837 * the initial bucket may overlap), so only adjust 1838 * cursors for the second etc pages within a bucket. 1839 */ 1840 if (PGNO(temp_pagep) != bucket_pgno) { 1841 if ((ret = __db_cursor_int(dbp, 1842 dbc->thread_info, dbc->txn, dbp->type, 1843 PGNO_INVALID, 0, DB_LOCK_INVALIDID, 1844 &tmp_dbc)) != 0) 1845 goto err; 1846 hcp = (HASH_CURSOR*)tmp_dbc->internal; 1847 hcp->pgno = PGNO(*pp); 1848 hcp->indx = dest_indx; 1849 hcp->dup_off = 0; 1850 hcp->order = 0; 1851 if ((ret = __hamc_update( 1852 tmp_dbc, len, DB_HAM_CURADJ_ADD, 0)) != 0) 1853 goto err; 1854 if ((ret = __dbc_close(tmp_dbc)) != 0) 1855 goto err; 1856 } 1857 /* Update any cursors pointing at the moved item. */ 1858 if (carray != NULL) { 1859 found = 0; 1860 for (i = 0; carray[i] != NULL; i++) { 1861 cp = 1862 (HASH_CURSOR *)carray[i]->internal; 1863 if (cp->pgno == PGNO(temp_pagep) && 1864 cp->indx == n) { 1865 cp->pgno = PGNO(*pp); 1866 cp->indx = dest_indx; 1867 found = 1; 1868 } 1869 } 1870 /* 1871 * Only log the update once, since the recovery 1872 * code iterates through all open cursors and 1873 * applies the change to all matching cursors. 1874 */ 1875 if (found && DBC_LOGGING(dbc) && 1876 IS_SUBTRANSACTION(dbc->txn)) { 1877 if ((ret = 1878 __ham_chgpg_log(dbp, 1879 dbc->txn, &new_lsn, 0, 1880 DB_HAM_SPLIT, PGNO(temp_pagep), 1881 PGNO(*pp), n, dest_indx)) != 0) 1882 goto err; 1883 } 1884 } 1885 } 1886 next_pgno = NEXT_PGNO(temp_pagep); 1887 1888 /* Clear temp_page; if it's a link overflow page, free it. */ 1889 if (PGNO(temp_pagep) != bucket_pgno && (ret = 1890 __db_free(dbc, temp_pagep)) != 0) { 1891 temp_pagep = NULL; 1892 goto err; 1893 } 1894 1895 if (next_pgno == PGNO_INVALID) 1896 temp_pagep = NULL; 1897 else if ((ret = __memp_fget(mpf, 1898 &next_pgno, dbc->thread_info, dbc->txn, 1899 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &temp_pagep)) != 0) 1900 goto err; 1901 1902 if (temp_pagep != NULL) { 1903 if (DBC_LOGGING(dbc)) { 1904 page_dbt.size = dbp->pgsize; 1905 page_dbt.data = temp_pagep; 1906 if ((ret = __ham_splitdata_log(dbp, 1907 dbc->txn, &new_lsn, 0, 1908 SPLITOLD, PGNO(temp_pagep), 1909 &page_dbt, &LSN(temp_pagep))) != 0) 1910 goto err; 1911 } else 1912 LSN_NOT_LOGGED(new_lsn); 1913 LSN(temp_pagep) = new_lsn; 1914 } 1915 1916 if (carray != NULL) /* We never knew its size. */ 1917 __os_free(env, carray); 1918 carray = NULL; 1919 } 1920 if (big_buf != NULL) 1921 __os_free(env, big_buf); 1922 1923 /* 1924 * If the original bucket spanned multiple pages, then we've got 1925 * a pointer to a page that used to be on the bucket chain. It 1926 * should be deleted. 1927 */ 1928 if (temp_pagep != NULL && PGNO(temp_pagep) != bucket_pgno && 1929 (ret = __db_free(dbc, temp_pagep)) != 0) { 1930 temp_pagep = NULL; 1931 goto err; 1932 } 1933 1934 /* 1935 * Write new buckets out. 1936 */ 1937 if (DBC_LOGGING(dbc)) { 1938 page_dbt.size = dbp->pgsize; 1939 page_dbt.data = old_pagep; 1940 if ((ret = __ham_splitdata_log(dbp, dbc->txn, 1941 &new_lsn, 0, SPLITNEW, PGNO(old_pagep), &page_dbt, 1942 &LSN(old_pagep))) != 0) 1943 goto err; 1944 LSN(old_pagep) = new_lsn; 1945 1946 page_dbt.data = new_pagep; 1947 if ((ret = __ham_splitdata_log(dbp, dbc->txn, &new_lsn, 0, 1948 SPLITNEW, PGNO(new_pagep), &page_dbt, 1949 &LSN(new_pagep))) != 0) 1950 goto err; 1951 LSN(new_pagep) = new_lsn; 1952 } else { 1953 LSN_NOT_LOGGED(LSN(old_pagep)); 1954 LSN_NOT_LOGGED(LSN(new_pagep)); 1955 } 1956 1957 ret = __memp_fput(mpf, dbc->thread_info, old_pagep, dbc->priority); 1958 if ((t_ret = __memp_fput(mpf, 1959 dbc->thread_info, new_pagep, dbc->priority)) != 0 && ret == 0) 1960 ret = t_ret; 1961 1962 if (0) { 1963err: if (old_pagep != NULL) 1964 (void)__memp_fput(mpf, 1965 dbc->thread_info, old_pagep, dbc->priority); 1966 if (new_pagep != NULL) { 1967 P_INIT(new_pagep, dbp->pgsize, 1968 npgno, PGNO_INVALID, PGNO_INVALID, 0, P_HASH); 1969 (void)__memp_fput(mpf, 1970 dbc->thread_info, new_pagep, dbc->priority); 1971 } 1972 if (temp_pagep != NULL && PGNO(temp_pagep) != bucket_pgno) 1973 (void)__memp_fput(mpf, 1974 dbc->thread_info, temp_pagep, dbc->priority); 1975 } 1976 if ((t_ret = __TLPUT(dbc, block)) != 0 && ret == 0) 1977 ret = t_ret; 1978 if (carray != NULL) /* We never knew its size. */ 1979 __os_free(env, carray); 1980 return (ret); 1981} 1982 1983/* 1984 * Add the given pair to the page. The page in question may already be 1985 * held (i.e. it was already gotten). If it is, then the page is passed 1986 * in via the pagep parameter. On return, pagep will contain the page 1987 * to which we just added something. This allows us to link overflow 1988 * pages and return the new page having correctly put the last page. 1989 * 1990 * PUBLIC: int __ham_add_el __P((DBC *, const DBT *, const DBT *, int)); 1991 */ 1992int 1993__ham_add_el(dbc, key, val, type) 1994 DBC *dbc; 1995 const DBT *key, *val; 1996 int type; 1997{ 1998 const DBT *pkey, *pdata; 1999 DB *dbp; 2000 DBT key_dbt, data_dbt; 2001 DB_LSN new_lsn; 2002 DB_MPOOLFILE *mpf; 2003 HASH_CURSOR *hcp; 2004 HOFFPAGE doff, koff; 2005 db_pgno_t next_pgno, pgno; 2006 u_int32_t data_size, key_size; 2007 u_int32_t pages, pagespace, pairsize, rectype; 2008 int do_expand, data_type, is_keybig, is_databig, key_type, match, ret; 2009 2010 dbp = dbc->dbp; 2011 mpf = dbp->mpf; 2012 hcp = (HASH_CURSOR *)dbc->internal; 2013 do_expand = 0; 2014 2015 pgno = hcp->seek_found_page != PGNO_INVALID ? 2016 hcp->seek_found_page : hcp->pgno; 2017 if (hcp->page == NULL && (ret = __memp_fget(mpf, &pgno, 2018 dbc->thread_info, dbc->txn, DB_MPOOL_CREATE, &hcp->page)) != 0) 2019 return (ret); 2020 2021 key_size = HKEYDATA_PSIZE(key->size); 2022 data_size = HKEYDATA_PSIZE(val->size); 2023 is_keybig = ISBIG(hcp, key->size); 2024 is_databig = ISBIG(hcp, val->size); 2025 if (is_keybig) 2026 key_size = HOFFPAGE_PSIZE; 2027 if (is_databig) 2028 data_size = HOFFPAGE_PSIZE; 2029 2030 pairsize = key_size + data_size; 2031 2032 /* Advance to first page in chain with room for item. */ 2033 while (H_NUMPAIRS(hcp->page) && NEXT_PGNO(hcp->page) != PGNO_INVALID) { 2034 /* 2035 * This may not be the end of the chain, but the pair may fit 2036 * anyway. Check if it's a bigpair that fits or a regular 2037 * pair that fits. 2038 */ 2039 if (P_FREESPACE(dbp, hcp->page) >= pairsize) 2040 break; 2041 next_pgno = NEXT_PGNO(hcp->page); 2042 if ((ret = __ham_next_cpage(dbc, next_pgno)) != 0) 2043 return (ret); 2044 } 2045 2046 /* 2047 * Check if we need to allocate a new page. 2048 */ 2049 if (P_FREESPACE(dbp, hcp->page) < pairsize) { 2050 do_expand = 1; 2051 if ((ret = __memp_dirty(mpf, &hcp->page, 2052 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 2053 return (ret); 2054 if ((ret = __ham_add_ovflpage(dbc, 2055 (PAGE *)hcp->page, 1, (PAGE **)&hcp->page)) != 0) 2056 return (ret); 2057 hcp->pgno = PGNO(hcp->page); 2058 } 2059 2060 /* 2061 * If we don't have a transaction then make sure we will not 2062 * run out of file space before updating the key or data. 2063 */ 2064 if (dbc->txn == NULL && 2065 dbp->mpf->mfp->maxpgno != 0 && (is_keybig || is_databig)) { 2066 pagespace = P_MAXSPACE(dbp, dbp->pgsize); 2067 pages = 0; 2068 if (is_databig) 2069 pages = ((data_size - 1) / pagespace) + 1; 2070 if (is_keybig) { 2071 pages += ((key->size - 1) / pagespace) + 1; 2072 if (pages > 2073 (dbp->mpf->mfp->maxpgno - dbp->mpf->mfp->last_pgno)) 2074 return (__db_space_err(dbp)); 2075 } 2076 } 2077 2078 if ((ret = __memp_dirty(mpf, 2079 &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 2080 return (ret); 2081 2082 /* 2083 * Update cursor. 2084 */ 2085 hcp->indx = hcp->seek_found_indx; 2086 F_CLR(hcp, H_DELETED); 2087 if (is_keybig) { 2088 koff.type = H_OFFPAGE; 2089 UMRW_SET(koff.unused[0]); 2090 UMRW_SET(koff.unused[1]); 2091 UMRW_SET(koff.unused[2]); 2092 if ((ret = __db_poff(dbc, key, &koff.pgno)) != 0) 2093 return (ret); 2094 koff.tlen = key->size; 2095 key_dbt.data = &koff; 2096 key_dbt.size = sizeof(koff); 2097 pkey = &key_dbt; 2098 key_type = H_OFFPAGE; 2099 } else { 2100 pkey = key; 2101 key_type = H_KEYDATA; 2102 } 2103 2104 if (is_databig) { 2105 doff.type = H_OFFPAGE; 2106 UMRW_SET(doff.unused[0]); 2107 UMRW_SET(doff.unused[1]); 2108 UMRW_SET(doff.unused[2]); 2109 if ((ret = __db_poff(dbc, val, &doff.pgno)) != 0) 2110 return (ret); 2111 doff.tlen = val->size; 2112 data_dbt.data = &doff; 2113 data_dbt.size = sizeof(doff); 2114 pdata = &data_dbt; 2115 data_type = H_OFFPAGE; 2116 } else { 2117 pdata = val; 2118 data_type = type; 2119 } 2120 2121 /* Sort any unsorted pages before doing the insert. */ 2122 if (((PAGE *)hcp->page)->type == P_HASH_UNSORTED) 2123 if ((ret = __ham_sort_page_cursor(dbc, hcp->page)) != 0) 2124 return (ret); 2125 2126 /* 2127 * If inserting on the page found initially, then use the saved index. 2128 * If inserting on a different page resolve the index now so it can be 2129 * logged. 2130 * The page might be different, if P_FREESPACE constraint failed (due 2131 * to a partial put that increases the data size). 2132 */ 2133 if (PGNO(hcp->page) != hcp->seek_found_page) { 2134 if ((ret = __ham_getindex(dbc, hcp->page, pkey, 2135 key_type, &match, &hcp->seek_found_indx)) != 0) 2136 return (ret); 2137 hcp->seek_found_page = PGNO(hcp->page); 2138 2139 DB_ASSERT(dbp->env, hcp->seek_found_indx <= NUM_ENT(hcp->page)); 2140 } 2141 2142 if (DBC_LOGGING(dbc)) { 2143 rectype = PUTPAIR; 2144 if (is_databig) 2145 rectype |= PAIR_DATAMASK; 2146 if (is_keybig) 2147 rectype |= PAIR_KEYMASK; 2148 if (type == H_DUPLICATE) 2149 rectype |= PAIR_DUPMASK; 2150 2151 if ((ret = __ham_insdel_log(dbp, dbc->txn, &new_lsn, 0, 2152 rectype, PGNO(hcp->page), (u_int32_t)hcp->seek_found_indx, 2153 &LSN(hcp->page), pkey, pdata)) != 0) 2154 return (ret); 2155 } else 2156 LSN_NOT_LOGGED(new_lsn); 2157 2158 /* Move lsn onto page. */ 2159 LSN(hcp->page) = new_lsn; /* Structure assignment. */ 2160 2161 if ((ret = __ham_insertpair(dbc, hcp->page, 2162 &hcp->seek_found_indx, pkey, pdata, key_type, data_type)) != 0) 2163 return (ret); 2164 2165 /* 2166 * Adjust any cursors that were pointing at items whose indices were 2167 * shuffled due to the insert. 2168 */ 2169 if ((ret = __hamc_update(dbc, pairsize, DB_HAM_CURADJ_ADD, 0)) != 0) 2170 return (ret); 2171 2172 /* 2173 * For splits, we are going to update item_info's page number 2174 * field, so that we can easily return to the same page the 2175 * next time we come in here. For other operations, this doesn't 2176 * matter, since this is the last thing that happens before we return 2177 * to the user program. 2178 */ 2179 hcp->pgno = PGNO(hcp->page); 2180 /* 2181 * When moving an item from one page in a bucket to another, due to an 2182 * expanding on page duplicate set, or a partial put that increases the 2183 * size of an item. The destination index needs to be saved so that the 2184 * __ham_replpair code can update any cursors impacted by the move. For 2185 * other operations, this does not matter, since this is the last thing 2186 * that happens before we return to the user program. 2187 */ 2188 hcp->indx = hcp->seek_found_indx; 2189 2190 /* 2191 * XXX 2192 * Maybe keep incremental numbers here. 2193 */ 2194 if (!STD_LOCKING(dbc)) { 2195 if ((ret = __ham_dirty_meta(dbc, 0)) != 0) 2196 return (ret); 2197 hcp->hdr->nelem++; 2198 } 2199 2200 if (do_expand || (hcp->hdr->ffactor != 0 && 2201 (u_int32_t)H_NUMPAIRS(hcp->page) > hcp->hdr->ffactor)) 2202 F_SET(hcp, H_EXPAND); 2203 return (0); 2204} 2205 2206/* 2207 * Special insert pair call -- copies a key/data pair from one page to 2208 * another. Works for all types of hash entries (H_OFFPAGE, H_KEYDATA, 2209 * H_DUPLICATE, H_OFFDUP). Since we log splits at a high level, we 2210 * do not need to do any logging here. 2211 * 2212 * dest_indx is an optional parameter, it serves several purposes: 2213 * * ignored if NULL 2214 * * Used as an insert index if non-null and not NDX_INVALID 2215 * * Populated with the insert index if non-null and NDX_INVALID 2216 * 2217 * PUBLIC: int __ham_copypair __P((DBC *, PAGE *, u_int32_t, 2218 * PUBLIC: PAGE *, db_indx_t *)); 2219 */ 2220int 2221__ham_copypair(dbc, src_page, src_ndx, dest_page, dest_indx) 2222 DBC *dbc; 2223 PAGE *src_page; 2224 u_int32_t src_ndx; 2225 PAGE *dest_page; 2226 db_indx_t *dest_indx; 2227{ 2228 DB *dbp; 2229 DBT tkey, tdata; 2230 db_indx_t kindx, dindx; 2231 int ktype, dtype, ret; 2232 2233 dbp = dbc->dbp; 2234 ret = 0; 2235 memset(&tkey, 0, sizeof(tkey)); 2236 memset(&tdata, 0, sizeof(tdata)); 2237 2238 ktype = HPAGE_TYPE(dbp, src_page, H_KEYINDEX(src_ndx)); 2239 dtype = HPAGE_TYPE(dbp, src_page, H_DATAINDEX(src_ndx)); 2240 kindx = H_KEYINDEX(src_ndx); 2241 dindx = H_DATAINDEX(src_ndx); 2242 if (ktype == H_OFFPAGE) { 2243 tkey.data = P_ENTRY(dbp, src_page, kindx); 2244 tkey.size = LEN_HITEM(dbp, src_page, dbp->pgsize, kindx); 2245 } else { 2246 tkey.data = HKEYDATA_DATA(P_ENTRY(dbp, src_page, kindx)); 2247 tkey.size = LEN_HKEYDATA(dbp, src_page, dbp->pgsize, kindx); 2248 } 2249 if (dtype == H_OFFPAGE) { 2250 tdata.data = P_ENTRY(dbp, src_page, dindx); 2251 tdata.size = LEN_HITEM(dbp, src_page, dbp->pgsize, dindx); 2252 } else { 2253 tdata.data = HKEYDATA_DATA(P_ENTRY(dbp, src_page, dindx)); 2254 tdata.size = LEN_HKEYDATA(dbp, src_page, dbp->pgsize, dindx); 2255 } 2256 if ((ret = __ham_insertpair(dbc, dest_page, dest_indx, 2257 &tkey, &tdata, ktype, dtype)) != 0) 2258 return (ret); 2259 2260 return (ret); 2261} 2262 2263/* 2264 * __ham_add_ovflpage -- 2265 * 2266 * Returns: 2267 * 0 on success: pp points to new page; !0 on error, pp not valid. 2268 * 2269 * PUBLIC: int __ham_add_ovflpage __P((DBC *, PAGE *, int, PAGE **)); 2270 */ 2271int 2272__ham_add_ovflpage(dbc, pagep, release, pp) 2273 DBC *dbc; 2274 PAGE *pagep; 2275 int release; 2276 PAGE **pp; 2277{ 2278 DB *dbp; 2279 DB_LSN new_lsn; 2280 DB_MPOOLFILE *mpf; 2281 PAGE *new_pagep; 2282 int ret; 2283 2284 dbp = dbc->dbp; 2285 mpf = dbp->mpf; 2286 2287 DB_ASSERT(dbp->env, IS_DIRTY(pagep)); 2288 2289 if ((ret = __db_new(dbc, P_HASH, &new_pagep)) != 0) 2290 return (ret); 2291 2292 if (DBC_LOGGING(dbc)) { 2293 if ((ret = __ham_newpage_log(dbp, dbc->txn, &new_lsn, 0, 2294 PUTOVFL, PGNO(pagep), &LSN(pagep), PGNO(new_pagep), 2295 &LSN(new_pagep), PGNO_INVALID, NULL)) != 0) { 2296 (void)__memp_fput(mpf, 2297 dbc->thread_info, pagep, dbc->priority); 2298 return (ret); 2299 } 2300 } else 2301 LSN_NOT_LOGGED(new_lsn); 2302 2303 /* Move lsn onto page. */ 2304 LSN(pagep) = LSN(new_pagep) = new_lsn; 2305 NEXT_PGNO(pagep) = PGNO(new_pagep); 2306 2307 PREV_PGNO(new_pagep) = PGNO(pagep); 2308 2309 if (release) 2310 ret = __memp_fput(mpf, dbc->thread_info, pagep, dbc->priority); 2311 2312 *pp = new_pagep; 2313 return (ret); 2314} 2315 2316/* 2317 * PUBLIC: int __ham_get_cpage __P((DBC *, db_lockmode_t)); 2318 */ 2319int 2320__ham_get_cpage(dbc, mode) 2321 DBC *dbc; 2322 db_lockmode_t mode; 2323{ 2324 DB *dbp; 2325 DB_LOCK tmp_lock; 2326 DB_MPOOLFILE *mpf; 2327 HASH_CURSOR *hcp; 2328 int ret; 2329 2330 dbp = dbc->dbp; 2331 mpf = dbp->mpf; 2332 hcp = (HASH_CURSOR *)dbc->internal; 2333 ret = 0; 2334 2335 /* 2336 * There are four cases with respect to buckets and locks. 2337 * 1. If there is no lock held, then if we are locking, we should 2338 * get the lock. 2339 * 2. If there is a lock held, it's for the current bucket, and it's 2340 * for the right mode, we don't need to do anything. 2341 * 3. If there is a lock held for the current bucket but it's not 2342 * strong enough, we need to upgrade. 2343 * 4. If there is a lock, but it's for a different bucket, then we need 2344 * to release the existing lock and get a new lock. 2345 */ 2346 LOCK_INIT(tmp_lock); 2347 if (STD_LOCKING(dbc)) { 2348 if (hcp->lbucket != hcp->bucket) { /* Case 4 */ 2349 if ((ret = __TLPUT(dbc, hcp->lock)) != 0) 2350 return (ret); 2351 LOCK_INIT(hcp->lock); 2352 } 2353 2354 /* 2355 * See if we have the right lock. If we are doing 2356 * dirty reads we assume the write lock has been downgraded. 2357 */ 2358 if ((LOCK_ISSET(hcp->lock) && 2359 ((hcp->lock_mode == DB_LOCK_READ || 2360 F_ISSET(dbp, DB_AM_READ_UNCOMMITTED)) && 2361 mode == DB_LOCK_WRITE))) { 2362 /* Case 3. */ 2363 tmp_lock = hcp->lock; 2364 LOCK_INIT(hcp->lock); 2365 } 2366 2367 /* Acquire the lock. */ 2368 if (!LOCK_ISSET(hcp->lock)) 2369 /* Cases 1, 3, and 4. */ 2370 if ((ret = __ham_lock_bucket(dbc, mode)) != 0) 2371 return (ret); 2372 2373 if (ret == 0) { 2374 hcp->lock_mode = mode; 2375 hcp->lbucket = hcp->bucket; 2376 /* Case 3: release the original lock. */ 2377 if ((ret = __ENV_LPUT(dbp->env, tmp_lock)) != 0) 2378 return (ret); 2379 } else if (LOCK_ISSET(tmp_lock)) 2380 hcp->lock = tmp_lock; 2381 } 2382 2383 if (ret == 0 && hcp->page == NULL) { 2384 if (hcp->pgno == PGNO_INVALID) 2385 hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket); 2386 if ((ret = __memp_fget(mpf, 2387 &hcp->pgno, dbc->thread_info, dbc->txn, 2388 DB_MPOOL_CREATE, &hcp->page)) != 0) 2389 return (ret); 2390 } 2391 return (0); 2392} 2393 2394/* 2395 * Get a new page at the cursor, putting the last page if necessary. 2396 * If the flag is set to H_ISDUP, then we are talking about the 2397 * duplicate page, not the main page. 2398 * 2399 * PUBLIC: int __ham_next_cpage __P((DBC *, db_pgno_t)); 2400 */ 2401int 2402__ham_next_cpage(dbc, pgno) 2403 DBC *dbc; 2404 db_pgno_t pgno; 2405{ 2406 DB *dbp; 2407 DB_MPOOLFILE *mpf; 2408 HASH_CURSOR *hcp; 2409 PAGE *p; 2410 int ret; 2411 2412 dbp = dbc->dbp; 2413 mpf = dbp->mpf; 2414 hcp = (HASH_CURSOR *)dbc->internal; 2415 2416 if (hcp->page != NULL && (ret = __memp_fput(mpf, 2417 dbc->thread_info, hcp->page, dbc->priority)) != 0) 2418 return (ret); 2419 hcp->page = NULL; 2420 2421 if ((ret = __memp_fget(mpf, &pgno, dbc->thread_info, dbc->txn, 2422 DB_MPOOL_CREATE, &p)) != 0) 2423 return (ret); 2424 2425 hcp->page = p; 2426 hcp->pgno = pgno; 2427 hcp->indx = 0; 2428 2429 return (0); 2430} 2431 2432/* 2433 * __ham_lock_bucket -- 2434 * Get the lock on a particular bucket. 2435 * 2436 * PUBLIC: int __ham_lock_bucket __P((DBC *, db_lockmode_t)); 2437 */ 2438int 2439__ham_lock_bucket(dbc, mode) 2440 DBC *dbc; 2441 db_lockmode_t mode; 2442{ 2443 HASH_CURSOR *hcp; 2444 db_pgno_t pgno; 2445 int gotmeta, ret; 2446 2447 hcp = (HASH_CURSOR *)dbc->internal; 2448 gotmeta = hcp->hdr == NULL ? 1 : 0; 2449 if (gotmeta) 2450 if ((ret = __ham_get_meta(dbc)) != 0) 2451 return (ret); 2452 pgno = BUCKET_TO_PAGE(hcp, hcp->bucket); 2453 if (gotmeta) 2454 if ((ret = __ham_release_meta(dbc)) != 0) 2455 return (ret); 2456 2457 ret = __db_lget(dbc, 0, pgno, mode, 0, &hcp->lock); 2458 2459 hcp->lock_mode = mode; 2460 return (ret); 2461} 2462 2463/* 2464 * __ham_dpair -- 2465 * Delete a pair on a page, paying no attention to what the pair 2466 * represents. The caller is responsible for freeing up duplicates 2467 * or offpage entries that might be referenced by this pair. 2468 * 2469 * Recovery assumes that this may be called without the metadata 2470 * page pinned. 2471 * 2472 * PUBLIC: void __ham_dpair __P((DB *, PAGE *, u_int32_t)); 2473 */ 2474void 2475__ham_dpair(dbp, p, indx) 2476 DB *dbp; 2477 PAGE *p; 2478 u_int32_t indx; 2479{ 2480 db_indx_t delta, n, *inp; 2481 u_int8_t *dest, *src; 2482 2483 inp = P_INP(dbp, p); 2484 /* 2485 * Compute "delta", the amount we have to shift all of the 2486 * offsets. To find the delta, we just need to calculate 2487 * the size of the pair of elements we are removing. 2488 */ 2489 delta = H_PAIRSIZE(dbp, p, dbp->pgsize, indx); 2490 2491 /* 2492 * The hard case: we want to remove something other than 2493 * the last item on the page. We need to shift data and 2494 * offsets down. 2495 */ 2496 if ((db_indx_t)indx != NUM_ENT(p) - 2) { 2497 /* 2498 * Move the data: src is the first occupied byte on 2499 * the page. (Length is delta.) 2500 */ 2501 src = (u_int8_t *)p + HOFFSET(p); 2502 2503 /* 2504 * Destination is delta bytes beyond src. This might 2505 * be an overlapping copy, so we have to use memmove. 2506 */ 2507 dest = src + delta; 2508 memmove(dest, src, inp[H_DATAINDEX(indx)] - HOFFSET(p)); 2509 } 2510 2511 /* Adjust page metadata. */ 2512 HOFFSET(p) = HOFFSET(p) + delta; 2513 NUM_ENT(p) = NUM_ENT(p) - 2; 2514 2515 /* Adjust the offsets. */ 2516 for (n = (db_indx_t)indx; n < (db_indx_t)(NUM_ENT(p)); n++) 2517 inp[n] = inp[n + 2] + delta; 2518 2519} 2520 2521/* 2522 * __hamc_delpg -- 2523 * 2524 * Adjust the cursors after we've emptied a page in a bucket, taking 2525 * care that when we move cursors pointing to deleted items, their 2526 * orders don't collide with the orders of cursors on the page we move 2527 * them to (since after this function is called, cursors with the same 2528 * index on the two pages will be otherwise indistinguishable--they'll 2529 * all have pgno new_pgno). There are three cases: 2530 * 2531 * 1) The emptied page is the first page in the bucket. In this 2532 * case, we've copied all the items from the second page into the 2533 * first page, so the first page is new_pgno and the second page is 2534 * old_pgno. new_pgno is empty, but can have deleted cursors 2535 * pointing at indx 0, so we need to be careful of the orders 2536 * there. This is DB_HAM_DELFIRSTPG. 2537 * 2538 * 2) The page is somewhere in the middle of a bucket. Our caller 2539 * can just delete such a page, so it's old_pgno. old_pgno is 2540 * empty, but may have deleted cursors pointing at indx 0, so we 2541 * need to be careful of indx 0 when we move those cursors to 2542 * new_pgno. This is DB_HAM_DELMIDPG. 2543 * 2544 * 3) The page is the last in a bucket. Again the empty page is 2545 * old_pgno, and again it should only have cursors that are deleted 2546 * and at indx == 0. This time, though, there's no next page to 2547 * move them to, so we set them to indx == num_ent on the previous 2548 * page--and indx == num_ent is the index whose cursors we need to 2549 * be careful of. This is DB_HAM_DELLASTPG. 2550 */ 2551static int 2552__hamc_delpg(dbc, old_pgno, new_pgno, num_ent, op, orderp) 2553 DBC *dbc; 2554 db_pgno_t old_pgno, new_pgno; 2555 u_int32_t num_ent; 2556 db_ham_mode op; 2557 u_int32_t *orderp; 2558{ 2559 DB *dbp, *ldbp; 2560 DBC *cp; 2561 DB_LSN lsn; 2562 DB_TXN *my_txn; 2563 ENV *env; 2564 HASH_CURSOR *hcp; 2565 db_indx_t indx; 2566 u_int32_t order; 2567 int found, ret; 2568 2569 /* Which is the worrisome index? */ 2570 indx = (op == DB_HAM_DELLASTPG) ? num_ent : 0; 2571 2572 dbp = dbc->dbp; 2573 env = dbp->env; 2574 2575 my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL; 2576 MUTEX_LOCK(env, env->mtx_dblist); 2577 /* 2578 * Find the highest order of any cursor our movement 2579 * may collide with. 2580 */ 2581 FIND_FIRST_DB_MATCH(env, dbp, ldbp); 2582 for (order = 1; 2583 ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid; 2584 ldbp = TAILQ_NEXT(ldbp, dblistlinks)) { 2585 MUTEX_LOCK(env, dbp->mutex); 2586 TAILQ_FOREACH(cp, &ldbp->active_queue, links) { 2587 if (cp == dbc || cp->dbtype != DB_HASH) 2588 continue; 2589 hcp = (HASH_CURSOR *)cp->internal; 2590 if (hcp->pgno == new_pgno && 2591 !MVCC_SKIP_CURADJ(cp, new_pgno)) { 2592 if (hcp->indx == indx && 2593 F_ISSET(hcp, H_DELETED) && 2594 hcp->order >= order) 2595 order = hcp->order + 1; 2596 DB_ASSERT(env, op != DB_HAM_DELFIRSTPG || 2597 hcp->indx == NDX_INVALID || 2598 (hcp->indx == 0 && 2599 F_ISSET(hcp, H_DELETED))); 2600 } 2601 } 2602 MUTEX_UNLOCK(env, dbp->mutex); 2603 } 2604 2605 FIND_FIRST_DB_MATCH(env, dbp, ldbp); 2606 for (found = 0; 2607 ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid; 2608 ldbp = TAILQ_NEXT(ldbp, dblistlinks)) { 2609 MUTEX_LOCK(env, dbp->mutex); 2610 TAILQ_FOREACH(cp, &ldbp->active_queue, links) { 2611 if (cp == dbc || cp->dbtype != DB_HASH) 2612 continue; 2613 2614 hcp = (HASH_CURSOR *)cp->internal; 2615 2616 if (hcp->pgno == old_pgno && 2617 !MVCC_SKIP_CURADJ(cp, old_pgno)) { 2618 switch (op) { 2619 case DB_HAM_DELFIRSTPG: 2620 /* 2621 * We're moving all items, 2622 * regardless of index. 2623 */ 2624 hcp->pgno = new_pgno; 2625 2626 /* 2627 * But we have to be careful of 2628 * the order values. 2629 */ 2630 if (hcp->indx == indx) 2631 hcp->order += order; 2632 break; 2633 case DB_HAM_DELMIDPG: 2634 hcp->pgno = new_pgno; 2635 DB_ASSERT(env, hcp->indx == 0 && 2636 F_ISSET(hcp, H_DELETED)); 2637 hcp->order += order; 2638 break; 2639 case DB_HAM_DELLASTPG: 2640 hcp->pgno = new_pgno; 2641 DB_ASSERT(env, hcp->indx == 0 && 2642 F_ISSET(hcp, H_DELETED)); 2643 hcp->indx = indx; 2644 hcp->order += order; 2645 break; 2646 default: 2647 return (__db_unknown_path( 2648 env, "__hamc_delpg")); 2649 } 2650 if (my_txn != NULL && cp->txn != my_txn) 2651 found = 1; 2652 } 2653 } 2654 MUTEX_UNLOCK(env, dbp->mutex); 2655 } 2656 MUTEX_UNLOCK(env, env->mtx_dblist); 2657 2658 if (found != 0 && DBC_LOGGING(dbc)) { 2659 if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, op, 2660 old_pgno, new_pgno, indx, order)) != 0) 2661 return (ret); 2662 } 2663 *orderp = order; 2664 return (0); 2665} 2666