1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996-2009 Oracle. All rights reserved. 5 */ 6/* 7 * Copyright (c) 1990, 1993, 1994 8 * The Regents of the University of California. All rights reserved. 9 * 10 * This code is derived from software contributed to Berkeley by 11 * Margo Seltzer. 12 * 13 * Redistribution and use in source and binary forms, with or without 14 * modification, are permitted provided that the following conditions 15 * are met: 16 * 1. Redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer. 18 * 2. Redistributions in binary form must reproduce the above copyright 19 * notice, this list of conditions and the following disclaimer in the 20 * documentation and/or other materials provided with the distribution. 21 * 3. Neither the name of the University nor the names of its contributors 22 * may be used to endorse or promote products derived from this software 23 * without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND 26 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 36 * 37 * $Id$ 38 */ 39 40/* 41 * PACKAGE: hashing 42 * 43 * DESCRIPTION: 44 * Manipulation of duplicates for the hash package. 45 */ 46 47#include "db_config.h" 48 49#include "db_int.h" 50#include "dbinc/db_page.h" 51#include "dbinc/hash.h" 52#include "dbinc/btree.h" 53#include "dbinc/mp.h" 54 55static int __hamc_chgpg __P((DBC *, 56 db_pgno_t, u_int32_t, db_pgno_t, u_int32_t)); 57static int __ham_check_move __P((DBC *, u_int32_t)); 58static int __ham_dcursor __P((DBC *, db_pgno_t, u_int32_t)); 59static int __ham_move_offpage __P((DBC *, PAGE *, u_int32_t, db_pgno_t)); 60 61/* 62 * Called from hash_access to add a duplicate key. nval is the new 63 * value that we want to add. The flags correspond to the flag values 64 * to cursor_put indicating where to add the new element. 65 * There are 4 cases. 66 * Case 1: The existing duplicate set already resides on a separate page. 67 * We return and let the common code handle this. 68 * Case 2: The element is small enough to just be added to the existing set. 69 * Case 3: The element is large enough to be a big item, so we're going to 70 * have to push the set onto a new page. 71 * Case 4: The element is large enough to push the duplicate set onto a 72 * separate page. 73 * 74 * PUBLIC: int __ham_add_dup __P((DBC *, DBT *, u_int32_t, db_pgno_t *)); 75 */ 76int 77__ham_add_dup(dbc, nval, flags, pgnop) 78 DBC *dbc; 79 DBT *nval; 80 u_int32_t flags; 81 db_pgno_t *pgnop; 82{ 83 DB *dbp; 84 DBT pval, tmp_val; 85 DB_MPOOLFILE *mpf; 86 ENV *env; 87 HASH_CURSOR *hcp; 88 u_int32_t add_bytes, new_size; 89 int cmp, ret; 90 u_int8_t *hk; 91 92 dbp = dbc->dbp; 93 env = dbp->env; 94 mpf = dbp->mpf; 95 hcp = (HASH_CURSOR *)dbc->internal; 96 97 DB_ASSERT(env, flags != DB_CURRENT); 98 99 add_bytes = nval->size + 100 (F_ISSET(nval, DB_DBT_PARTIAL) ? nval->doff : 0); 101 add_bytes = DUP_SIZE(add_bytes); 102 103 if ((ret = __ham_check_move(dbc, add_bytes)) != 0) 104 return (ret); 105 106 /* 107 * Check if resulting duplicate set is going to need to go 108 * onto a separate duplicate page. If so, convert the 109 * duplicate set and add the new one. After conversion, 110 * hcp->dndx is the first free ndx or the index of the 111 * current pointer into the duplicate set. 112 */ 113 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); 114 /* Add the len bytes to the current singleton. */ 115 if (HPAGE_PTYPE(hk) != H_DUPLICATE) 116 add_bytes += DUP_SIZE(0); 117 new_size = 118 LEN_HKEYDATA(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)) + 119 add_bytes; 120 121 /* 122 * We convert to off-page duplicates if the item is a big item, 123 * the addition of the new item will make the set large, or 124 * if there isn't enough room on this page to add the next item. 125 */ 126 if (HPAGE_PTYPE(hk) != H_OFFDUP && 127 (HPAGE_PTYPE(hk) == H_OFFPAGE || ISBIG(hcp, new_size) || 128 add_bytes > P_FREESPACE(dbp, hcp->page))) { 129 130 if ((ret = __ham_dup_convert(dbc)) != 0) 131 return (ret); 132 return (hcp->opd->am_put(hcp->opd, 133 NULL, nval, flags, NULL)); 134 } 135 136 /* There are two separate cases here: on page and off page. */ 137 if (HPAGE_PTYPE(hk) != H_OFFDUP) { 138 if (HPAGE_PTYPE(hk) != H_DUPLICATE) { 139 pval.flags = 0; 140 pval.data = HKEYDATA_DATA(hk); 141 pval.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, 142 hcp->indx); 143 if ((ret = __ham_make_dup(env, 144 &pval, &tmp_val, &dbc->my_rdata.data, 145 &dbc->my_rdata.ulen)) != 0 || (ret = 146 __ham_replpair(dbc, &tmp_val, 1)) != 0) 147 return (ret); 148 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); 149 HPAGE_PTYPE(hk) = H_DUPLICATE; 150 151 /* 152 * Update the cursor position since we now are in 153 * duplicates. 154 */ 155 F_SET(hcp, H_ISDUP); 156 hcp->dup_off = 0; 157 hcp->dup_len = pval.size; 158 hcp->dup_tlen = DUP_SIZE(hcp->dup_len); 159 } 160 161 /* Now make the new entry a duplicate. */ 162 if ((ret = __ham_make_dup(env, nval, 163 &tmp_val, &dbc->my_rdata.data, &dbc->my_rdata.ulen)) != 0) 164 return (ret); 165 166 tmp_val.dlen = 0; 167 switch (flags) { /* On page. */ 168 case DB_KEYFIRST: 169 case DB_KEYLAST: 170 case DB_NODUPDATA: 171 case DB_OVERWRITE_DUP: 172 if (dbp->dup_compare != NULL) { 173 __ham_dsearch(dbc, 174 nval, &tmp_val.doff, &cmp, flags); 175 176 /* 177 * Duplicate duplicates are not supported w/ 178 * sorted dups. We can either overwrite or 179 * return DB_KEYEXIST. 180 */ 181 if (cmp == 0) { 182 if (flags == DB_OVERWRITE_DUP) 183 return (__ham_overwrite(dbc, 184 nval, flags)); 185 return (__db_duperr(dbp, flags)); 186 } 187 } else { 188 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, 189 dbp->pgsize, hcp->indx); 190 hcp->dup_len = nval->size; 191 F_SET(hcp, H_ISDUP); 192 if (flags == DB_KEYFIRST) 193 hcp->dup_off = tmp_val.doff = 0; 194 else 195 hcp->dup_off = 196 tmp_val.doff = hcp->dup_tlen; 197 } 198 break; 199 case DB_BEFORE: 200 tmp_val.doff = hcp->dup_off; 201 break; 202 case DB_AFTER: 203 tmp_val.doff = hcp->dup_off + DUP_SIZE(hcp->dup_len); 204 break; 205 default: 206 return (__db_unknown_path(env, "__ham_add_dup")); 207 } 208 209 /* Add the duplicate. */ 210 if ((ret = __memp_dirty(mpf, &hcp->page, 211 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0 || 212 (ret = __ham_replpair(dbc, &tmp_val, 0)) != 0) 213 return (ret); 214 215 /* Now, update the cursor if necessary. */ 216 switch (flags) { 217 case DB_AFTER: 218 hcp->dup_off += DUP_SIZE(hcp->dup_len); 219 hcp->dup_len = nval->size; 220 hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size); 221 break; 222 case DB_BEFORE: 223 case DB_KEYFIRST: 224 case DB_KEYLAST: 225 case DB_NODUPDATA: 226 hcp->dup_tlen += (db_indx_t)DUP_SIZE(nval->size); 227 hcp->dup_len = nval->size; 228 break; 229 default: 230 return (__db_unknown_path(env, "__ham_add_dup")); 231 } 232 ret = __hamc_update(dbc, tmp_val.size, DB_HAM_CURADJ_ADD, 1); 233 return (ret); 234 } 235 236 /* 237 * If we get here, then we're on duplicate pages; set pgnop and 238 * return so the common code can handle it. 239 */ 240 memcpy(pgnop, HOFFDUP_PGNO(H_PAIRDATA(dbp, hcp->page, hcp->indx)), 241 sizeof(db_pgno_t)); 242 243 return (ret); 244} 245 246/* 247 * Convert an on-page set of duplicates to an offpage set of duplicates. 248 * 249 * PUBLIC: int __ham_dup_convert __P((DBC *)); 250 */ 251int 252__ham_dup_convert(dbc) 253 DBC *dbc; 254{ 255 BOVERFLOW bo; 256 DB *dbp; 257 DBC **hcs; 258 DBT dbt; 259 DB_LSN lsn; 260 DB_MPOOLFILE *mpf; 261 ENV *env; 262 HASH_CURSOR *hcp; 263 HOFFPAGE ho; 264 PAGE *dp; 265 db_indx_t i, len, off; 266 int c, ret, t_ret; 267 u_int8_t *p, *pend; 268 269 dbp = dbc->dbp; 270 env = dbp->env; 271 mpf = dbp->mpf; 272 hcp = (HASH_CURSOR *)dbc->internal; 273 274 /* 275 * Create a new page for the duplicates. 276 */ 277 if ((ret = __db_new(dbc, 278 dbp->dup_compare == NULL ? P_LRECNO : P_LDUP, NULL, &dp)) != 0) 279 return (ret); 280 P_INIT(dp, dbp->pgsize, 281 dp->pgno, PGNO_INVALID, PGNO_INVALID, LEAFLEVEL, TYPE(dp)); 282 283 /* 284 * Get the list of cursors that may need to be updated. 285 */ 286 if ((ret = __ham_get_clist(dbp, 287 PGNO(hcp->page), (u_int32_t)hcp->indx, &hcs)) != 0) 288 goto err; 289 290 /* 291 * Now put the duplicates onto the new page. 292 */ 293 dbt.flags = 0; 294 switch (HPAGE_PTYPE(H_PAIRDATA(dbp, hcp->page, hcp->indx))) { 295 case H_KEYDATA: 296 /* Simple case, one key on page; move it to dup page. */ 297 dbt.size = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx); 298 dbt.data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)); 299 ret = __db_pitem(dbc, 300 dp, 0, BKEYDATA_SIZE(dbt.size), NULL, &dbt); 301 goto finish; 302 case H_OFFPAGE: 303 /* Simple case, one key on page; move it to dup page. */ 304 memcpy(&ho, P_ENTRY(dbp, hcp->page, H_DATAINDEX(hcp->indx)), 305 HOFFPAGE_SIZE); 306 UMRW_SET(bo.unused1); 307 B_TSET(bo.type, ho.type); 308 UMRW_SET(bo.unused2); 309 bo.pgno = ho.pgno; 310 bo.tlen = ho.tlen; 311 dbt.size = BOVERFLOW_SIZE; 312 dbt.data = &bo; 313 314 ret = __db_pitem(dbc, dp, 0, dbt.size, &dbt, NULL); 315finish: if (ret == 0) { 316 /* Update any other cursors. */ 317 if (hcs != NULL && DBC_LOGGING(dbc) && 318 IS_SUBTRANSACTION(dbc->txn)) { 319 if ((ret = __ham_chgpg_log(dbp, dbc->txn, 320 &lsn, 0, DB_HAM_DUP, PGNO(hcp->page), 321 PGNO(dp), hcp->indx, 0)) != 0) 322 break; 323 } 324 for (c = 0; hcs != NULL && hcs[c] != NULL; c++) 325 if ((ret = __ham_dcursor(hcs[c], 326 PGNO(dp), 0)) != 0) 327 break; 328 } 329 break; 330 case H_DUPLICATE: 331 p = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)); 332 pend = p + 333 LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx); 334 335 /* 336 * We need to maintain the duplicate cursor position. 337 * Keep track of where we are in the duplicate set via 338 * the offset, and when it matches the one in the cursor, 339 * set the off-page duplicate cursor index to the current 340 * index. 341 */ 342 for (off = 0, i = 0; p < pend; i++) { 343 memcpy(&len, p, sizeof(db_indx_t)); 344 dbt.size = len; 345 p += sizeof(db_indx_t); 346 dbt.data = p; 347 p += len + sizeof(db_indx_t); 348 if ((ret = __db_pitem(dbc, dp, 349 i, BKEYDATA_SIZE(dbt.size), NULL, &dbt)) != 0) 350 break; 351 352 /* Update any other cursors */ 353 if (hcs != NULL && DBC_LOGGING(dbc) && 354 IS_SUBTRANSACTION(dbc->txn)) { 355 if ((ret = __ham_chgpg_log(dbp, dbc->txn, 356 &lsn, 0, DB_HAM_DUP, PGNO(hcp->page), 357 PGNO(dp), hcp->indx, i)) != 0) 358 break; 359 } 360 for (c = 0; hcs != NULL && hcs[c] != NULL; c++) 361 if (((HASH_CURSOR *)(hcs[c]->internal))->dup_off 362 == off && (ret = __ham_dcursor(hcs[c], 363 PGNO(dp), i)) != 0) 364 goto err; 365 off += len + 2 * sizeof(db_indx_t); 366 } 367 break; 368 default: 369 ret = __db_pgfmt(env, hcp->pgno); 370 break; 371 } 372 373 /* 374 * Now attach this to the source page in place of the old duplicate 375 * item. 376 */ 377 if (ret == 0) 378 ret = __memp_dirty(mpf, 379 &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0); 380 381 if (ret == 0) 382 ret = __ham_move_offpage(dbc, hcp->page, 383 (u_int32_t)H_DATAINDEX(hcp->indx), PGNO(dp)); 384 385err: if ((t_ret = __memp_fput(mpf, 386 dbc->thread_info, dp, dbc->priority)) != 0 && ret == 0) 387 ret = t_ret; 388 389 if (ret == 0) 390 hcp->dup_tlen = hcp->dup_off = hcp->dup_len = 0; 391 392 if (hcs != NULL) 393 __os_free(env, hcs); 394 395 return (ret); 396} 397 398/* 399 * __ham_make_dup 400 * 401 * Take a regular dbt and make it into a duplicate item with all the partial 402 * information set appropriately. If the incoming dbt is a partial, assume 403 * we are creating a new entry and make sure that we do any initial padding. 404 * 405 * PUBLIC: int __ham_make_dup __P((ENV *, 406 * PUBLIC: const DBT *, DBT *d, void **, u_int32_t *)); 407 */ 408int 409__ham_make_dup(env, notdup, duplicate, bufp, sizep) 410 ENV *env; 411 const DBT *notdup; 412 DBT *duplicate; 413 void **bufp; 414 u_int32_t *sizep; 415{ 416 db_indx_t tsize, item_size; 417 int ret; 418 u_int8_t *p; 419 420 item_size = (db_indx_t)notdup->size; 421 if (F_ISSET(notdup, DB_DBT_PARTIAL)) 422 item_size += notdup->doff; 423 424 tsize = DUP_SIZE(item_size); 425 if ((ret = __ham_init_dbt(env, duplicate, tsize, bufp, sizep)) != 0) 426 return (ret); 427 428 duplicate->dlen = 0; 429 duplicate->flags = notdup->flags; 430 F_SET(duplicate, DB_DBT_PARTIAL); 431 432 p = duplicate->data; 433 memcpy(p, &item_size, sizeof(db_indx_t)); 434 p += sizeof(db_indx_t); 435 if (F_ISSET(notdup, DB_DBT_PARTIAL)) { 436 memset(p, 0, notdup->doff); 437 p += notdup->doff; 438 } 439 memcpy(p, notdup->data, notdup->size); 440 p += notdup->size; 441 memcpy(p, &item_size, sizeof(db_indx_t)); 442 443 duplicate->doff = 0; 444 duplicate->dlen = notdup->size; 445 446 return (0); 447} 448 449/* 450 * __ham_check_move -- 451 * 452 * Check if we can do whatever we need to on this page. If not, 453 * then we'll have to move the current element to a new page. 454 */ 455static int 456__ham_check_move(dbc, add_len) 457 DBC *dbc; 458 u_int32_t add_len; 459{ 460 DB *dbp; 461 DBT k, d; 462 DB_LSN new_lsn; 463 DB_MPOOLFILE *mpf; 464 HASH_CURSOR *hcp; 465 PAGE *next_pagep; 466 db_pgno_t next_pgno; 467 u_int32_t new_datalen, old_len, rectype; 468 db_indx_t new_indx; 469 u_int8_t *hk; 470 int key_type, match, ret, t_ret; 471 472 dbp = dbc->dbp; 473 mpf = dbp->mpf; 474 hcp = (HASH_CURSOR *)dbc->internal; 475 476 hk = H_PAIRDATA(dbp, hcp->page, hcp->indx); 477 478 /* 479 * If the item is already off page duplicates or an offpage item, 480 * then we know we can do whatever we need to do in-place 481 */ 482 if (HPAGE_PTYPE(hk) == H_OFFDUP || HPAGE_PTYPE(hk) == H_OFFPAGE) 483 return (0); 484 485 old_len = 486 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_DATAINDEX(hcp->indx)); 487 new_datalen = (old_len - HKEYDATA_SIZE(0)) + add_len; 488 if (HPAGE_PTYPE(hk) != H_DUPLICATE) 489 new_datalen += DUP_SIZE(0); 490 491 /* 492 * We need to add a new page under two conditions: 493 * 1. The addition makes the total data length cross the BIG 494 * threshold and the OFFDUP structure won't fit on this page. 495 * 2. The addition does not make the total data cross the 496 * threshold, but the new data won't fit on the page. 497 * If neither of these is true, then we can return. 498 */ 499 if (ISBIG(hcp, new_datalen) && (old_len > HOFFDUP_SIZE || 500 HOFFDUP_SIZE - old_len <= P_FREESPACE(dbp, hcp->page))) 501 return (0); 502 503 if (!ISBIG(hcp, new_datalen) && 504 (new_datalen - old_len) <= P_FREESPACE(dbp, hcp->page)) 505 return (0); 506 507 /* 508 * If we get here, then we need to move the item to a new page. 509 * Check if there are more pages in the chain. We now need to 510 * update new_datalen to include the size of both the key and 511 * the data that we need to move. 512 */ 513 514 new_datalen = ISBIG(hcp, new_datalen) ? 515 HOFFDUP_SIZE : HKEYDATA_SIZE(new_datalen); 516 new_datalen += 517 LEN_HITEM(dbp, hcp->page, dbp->pgsize, H_KEYINDEX(hcp->indx)); 518 519 next_pagep = NULL; 520 for (next_pgno = NEXT_PGNO(hcp->page); next_pgno != PGNO_INVALID; 521 next_pgno = NEXT_PGNO(next_pagep)) { 522 if (next_pagep != NULL && (ret = __memp_fput(mpf, 523 dbc->thread_info, next_pagep, dbc->priority)) != 0) 524 return (ret); 525 526 if ((ret = __memp_fget(mpf, 527 &next_pgno, dbc->thread_info, dbc->txn, 528 DB_MPOOL_CREATE, &next_pagep)) != 0) 529 return (ret); 530 531 if (P_FREESPACE(dbp, next_pagep) >= new_datalen) 532 break; 533 } 534 535 /* No more pages, add one. */ 536 if ((ret = __memp_dirty(mpf, 537 &hcp->page, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 538 return (ret); 539 540 if (next_pagep == NULL && (ret = __ham_add_ovflpage(dbc, 541 hcp->page, 0, &next_pagep)) != 0) 542 return (ret); 543 544 /* Add new page at the end of the chain. */ 545 if ((ret = __memp_dirty(mpf, 546 &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) { 547 (void)__memp_fput(mpf, 548 dbc->thread_info, next_pagep, dbc->priority); 549 return (ret); 550 } 551 552 if (P_FREESPACE(dbp, next_pagep) < new_datalen && (ret = 553 __ham_add_ovflpage(dbc, next_pagep, 1, &next_pagep)) != 0) { 554 (void)__memp_fput(mpf, 555 dbc->thread_info, next_pagep, dbc->priority); 556 return (ret); 557 } 558 559 /* Copy the item to the new page. */ 560 if (DBC_LOGGING(dbc)) { 561 rectype = PUTPAIR; 562 k.flags = 0; 563 d.flags = 0; 564 if (HPAGE_PTYPE( 565 H_PAIRKEY(dbp, hcp->page, hcp->indx)) == H_OFFPAGE) { 566 rectype |= PAIR_KEYMASK; 567 k.data = H_PAIRKEY(dbp, hcp->page, hcp->indx); 568 k.size = HOFFPAGE_SIZE; 569 key_type = H_OFFPAGE; 570 } else { 571 k.data = 572 HKEYDATA_DATA(H_PAIRKEY(dbp, hcp->page, hcp->indx)); 573 k.size = 574 LEN_HKEY(dbp, hcp->page, dbp->pgsize, hcp->indx); 575 key_type = H_KEYDATA; 576 } 577 578 /* Resolve the insert index so it can be written to the log. */ 579 if ((ret = __ham_getindex(dbc, next_pagep, &k, 580 key_type, &match, &new_indx)) != 0) 581 return (ret); 582 583 if (HPAGE_PTYPE(hk) == H_OFFPAGE) { 584 rectype |= PAIR_DATAMASK; 585 d.data = H_PAIRDATA(dbp, hcp->page, hcp->indx); 586 d.size = HOFFPAGE_SIZE; 587 } else { 588 if (HPAGE_PTYPE(H_PAIRDATA(dbp, 589 hcp->page, hcp->indx)) == H_DUPLICATE) 590 rectype |= PAIR_DUPMASK; 591 d.data = HKEYDATA_DATA( 592 H_PAIRDATA(dbp, hcp->page, hcp->indx)); 593 d.size = LEN_HDATA(dbp, hcp->page, 594 dbp->pgsize, hcp->indx); 595 } 596 597 if ((ret = __ham_insdel_log(dbp, 598 dbc->txn, &new_lsn, 0, rectype, PGNO(next_pagep), 599 (u_int32_t)new_indx, &LSN(next_pagep), 600 &k, &d)) != 0) { 601 (void)__memp_fput(mpf, 602 dbc->thread_info, next_pagep, dbc->priority); 603 return (ret); 604 } 605 } else { 606 LSN_NOT_LOGGED(new_lsn); 607 /* 608 * Ensure that an invalid index is passed to __ham_copypair, so 609 * it knows to resolve the index. Resolving the insert index 610 * here would require creating a temporary DBT with the key, 611 * and calling __ham_getindex. Let __ham_copypair do the 612 * resolution using the final key DBT. 613 */ 614 new_indx = NDX_INVALID; 615 } 616 617 /* Move lsn onto page. */ 618 if ((ret = __memp_dirty(mpf, 619 &next_pagep, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) { 620 (void)__memp_fput(mpf, 621 dbc->thread_info, next_pagep, dbc->priority); 622 return (ret); 623 } 624 LSN(next_pagep) = new_lsn; /* Structure assignment. */ 625 626 if ((ret = __ham_copypair(dbc, hcp->page, 627 H_KEYINDEX(hcp->indx), next_pagep, &new_indx)) != 0) 628 goto out; 629 630 /* Update all cursors that used to point to this item. */ 631 if ((ret = __hamc_chgpg(dbc, PGNO(hcp->page), H_KEYINDEX(hcp->indx), 632 PGNO(next_pagep), new_indx)) != 0) 633 goto out; 634 635 /* Now delete the pair from the current page. */ 636 ret = __ham_del_pair(dbc, HAM_DEL_NO_RECLAIM); 637 638 /* 639 * __ham_del_pair decremented nelem. This is incorrect; we 640 * manually copied the element elsewhere, so the total number 641 * of elements hasn't changed. Increment it again. 642 * 643 * !!! 644 * Note that we still have the metadata page pinned, and 645 * __ham_del_pair dirtied it, so we don't need to set the dirty 646 * flag again. 647 */ 648 if (!STD_LOCKING(dbc)) 649 hcp->hdr->nelem++; 650 651out: if ((t_ret = __memp_fput(mpf, 652 dbc->thread_info, hcp->page, dbc->priority)) != 0 && ret == 0) 653 ret = t_ret; 654 hcp->page = next_pagep; 655 hcp->pgno = PGNO(hcp->page); 656 hcp->indx = new_indx; 657 F_SET(hcp, H_EXPAND); 658 F_CLR(hcp, H_DELETED); 659 660 return (ret); 661} 662 663/* 664 * __ham_move_offpage -- 665 * Replace an onpage set of duplicates with the OFFDUP structure 666 * that references the duplicate page. 667 * 668 * XXX 669 * This is really just a special case of __onpage_replace; we should 670 * probably combine them. 671 * 672 */ 673static int 674__ham_move_offpage(dbc, pagep, ndx, pgno) 675 DBC *dbc; 676 PAGE *pagep; 677 u_int32_t ndx; 678 db_pgno_t pgno; 679{ 680 DB *dbp; 681 DBT new_dbt; 682 DBT old_dbt; 683 HOFFDUP od; 684 db_indx_t i, *inp; 685 int32_t difflen; 686 u_int8_t *src; 687 int ret; 688 689 dbp = dbc->dbp; 690 od.type = H_OFFDUP; 691 UMRW_SET(od.unused[0]); 692 UMRW_SET(od.unused[1]); 693 UMRW_SET(od.unused[2]); 694 od.pgno = pgno; 695 ret = 0; 696 697 if (DBC_LOGGING(dbc)) { 698 new_dbt.data = &od; 699 new_dbt.size = HOFFDUP_SIZE; 700 old_dbt.data = P_ENTRY(dbp, pagep, ndx); 701 old_dbt.size = LEN_HITEM(dbp, pagep, dbp->pgsize, ndx); 702 if ((ret = __ham_replace_log(dbp, dbc->txn, &LSN(pagep), 0, 703 PGNO(pagep), (u_int32_t)ndx, &LSN(pagep), -1, 704 &old_dbt, &new_dbt, 0)) != 0) 705 return (ret); 706 } else 707 LSN_NOT_LOGGED(LSN(pagep)); 708 709 /* 710 * difflen is the difference in the lengths, and so may be negative. 711 * We know that the difference between two unsigned lengths from a 712 * database page will fit into an int32_t. 713 */ 714 difflen = 715 (int32_t)LEN_HITEM(dbp, pagep, dbp->pgsize, ndx) - 716 (int32_t)HOFFDUP_SIZE; 717 if (difflen != 0) { 718 /* Copy data. */ 719 inp = P_INP(dbp, pagep); 720 src = (u_int8_t *)(pagep) + HOFFSET(pagep); 721 memmove(src + difflen, src, inp[ndx] - HOFFSET(pagep)); 722 HOFFSET(pagep) += difflen; 723 724 /* Update index table. */ 725 for (i = ndx; i < NUM_ENT(pagep); i++) 726 inp[i] += difflen; 727 } 728 729 /* Now copy the offdup entry onto the page. */ 730 memcpy(P_ENTRY(dbp, pagep, ndx), &od, HOFFDUP_SIZE); 731 return (ret); 732} 733 734/* 735 * __ham_dsearch: 736 * Locate a particular duplicate in a duplicate set. Make sure that 737 * we exit with the cursor set appropriately. 738 * 739 * PUBLIC: void __ham_dsearch 740 * PUBLIC: __P((DBC *, DBT *, u_int32_t *, int *, u_int32_t)); 741 */ 742void 743__ham_dsearch(dbc, dbt, offp, cmpp, flags) 744 DBC *dbc; 745 DBT *dbt; 746 u_int32_t *offp, flags; 747 int *cmpp; 748{ 749 DB *dbp; 750 DBT cur; 751 HASH_CURSOR *hcp; 752 db_indx_t i, len; 753 int (*func) __P((DB *, const DBT *, const DBT *)); 754 u_int8_t *data; 755 756 dbp = dbc->dbp; 757 hcp = (HASH_CURSOR *)dbc->internal; 758 func = dbp->dup_compare == NULL ? __bam_defcmp : dbp->dup_compare; 759 760 i = F_ISSET(hcp, H_CONTINUE) ? hcp->dup_off: 0; 761 data = HKEYDATA_DATA(H_PAIRDATA(dbp, hcp->page, hcp->indx)) + i; 762 hcp->dup_tlen = LEN_HDATA(dbp, hcp->page, dbp->pgsize, hcp->indx); 763 len = hcp->dup_len; 764 while (i < hcp->dup_tlen) { 765 memcpy(&len, data, sizeof(db_indx_t)); 766 data += sizeof(db_indx_t); 767 DB_SET_DBT(cur, data, len); 768 769 /* 770 * If we find an exact match, we're done. If in a sorted 771 * duplicate set and the item is larger than our test item, 772 * we're done. In the latter case, if permitting partial 773 * matches, it's not a failure. 774 */ 775 *cmpp = func(dbp, dbt, &cur); 776 if (*cmpp == 0) 777 break; 778 if (*cmpp < 0 && dbp->dup_compare != NULL) { 779 if (flags == DB_GET_BOTH_RANGE) 780 *cmpp = 0; 781 break; 782 } 783 784 i += len + 2 * sizeof(db_indx_t); 785 data += len + sizeof(db_indx_t); 786 } 787 788 *offp = i; 789 hcp->dup_off = i; 790 hcp->dup_len = len; 791 F_SET(hcp, H_ISDUP); 792} 793 794/* 795 * __ham_dcursor -- 796 * 797 * Create an off page duplicate cursor for this cursor. 798 */ 799static int 800__ham_dcursor(dbc, pgno, indx) 801 DBC *dbc; 802 db_pgno_t pgno; 803 u_int32_t indx; 804{ 805 BTREE_CURSOR *dcp; 806 DB *dbp; 807 HASH_CURSOR *hcp; 808 int ret; 809 810 dbp = dbc->dbp; 811 hcp = (HASH_CURSOR *)dbc->internal; 812 813 if ((ret = __dbc_newopd(dbc, pgno, hcp->opd, &hcp->opd)) != 0) 814 return (ret); 815 816 dcp = (BTREE_CURSOR *)hcp->opd->internal; 817 dcp->pgno = pgno; 818 dcp->indx = indx; 819 820 if (dbp->dup_compare == NULL) { 821 /* 822 * Converting to off-page Recno trees is tricky. The 823 * record number for the cursor is the index + 1 (to 824 * convert to 1-based record numbers). 825 */ 826 dcp->recno = indx + 1; 827 } 828 829 /* 830 * Transfer the deleted flag from the top-level cursor to the 831 * created one. 832 */ 833 if (F_ISSET(hcp, H_DELETED)) { 834 F_SET(dcp, C_DELETED); 835 F_CLR(hcp, H_DELETED); 836 } 837 838 return (0); 839} 840 841/* 842 * __hamc_chgpg -- 843 * Adjust the cursors after moving an item to a new page. We only 844 * move cursors that are pointing at this one item and are not 845 * deleted; since we only touch non-deleted cursors, and since 846 * (by definition) no item existed at the pgno/indx we're moving the 847 * item to, we're guaranteed that all the cursors we affect here or 848 * on abort really do refer to this one item. 849 */ 850static int 851__hamc_chgpg(dbc, old_pgno, old_index, new_pgno, new_index) 852 DBC *dbc; 853 db_pgno_t old_pgno, new_pgno; 854 u_int32_t old_index, new_index; 855{ 856 DB *dbp, *ldbp; 857 DBC *cp; 858 DB_LSN lsn; 859 DB_TXN *my_txn; 860 ENV *env; 861 HASH_CURSOR *hcp; 862 int found, ret; 863 864 dbp = dbc->dbp; 865 env = dbp->env; 866 867 my_txn = IS_SUBTRANSACTION(dbc->txn) ? dbc->txn : NULL; 868 869 MUTEX_LOCK(env, env->mtx_dblist); 870 FIND_FIRST_DB_MATCH(env, dbp, ldbp); 871 for (found = 0; 872 ldbp != NULL && ldbp->adj_fileid == dbp->adj_fileid; 873 ldbp = TAILQ_NEXT(ldbp, dblistlinks)) { 874 MUTEX_LOCK(env, dbp->mutex); 875 TAILQ_FOREACH(cp, &ldbp->active_queue, links) { 876 if (cp == dbc || cp->dbtype != DB_HASH) 877 continue; 878 879 hcp = (HASH_CURSOR *)cp->internal; 880 881 /* 882 * If a cursor is deleted, it doesn't refer to this 883 * item--it just happens to have the same indx, but 884 * it points to a former neighbor. Don't move it. 885 */ 886 if (F_ISSET(hcp, H_DELETED)) 887 continue; 888 889 if (hcp->pgno == old_pgno && 890 hcp->indx == old_index && 891 !MVCC_SKIP_CURADJ(cp, old_pgno)) { 892 hcp->pgno = new_pgno; 893 hcp->indx = new_index; 894 if (my_txn != NULL && cp->txn != my_txn) 895 found = 1; 896 } 897 } 898 MUTEX_UNLOCK(env, dbp->mutex); 899 } 900 MUTEX_UNLOCK(env, env->mtx_dblist); 901 902 if (found != 0 && DBC_LOGGING(dbc)) { 903 if ((ret = __ham_chgpg_log(dbp, my_txn, &lsn, 0, DB_HAM_CHGPG, 904 old_pgno, new_pgno, old_index, new_index)) != 0) 905 return (ret); 906 } 907 return (0); 908} 909