1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996,2008 Oracle. All rights reserved. 5 */ 6/* 7 * Copyright (c) 1990, 1993, 1994, 1995, 1996 8 * Keith Bostic. All rights reserved. 9 */ 10/* 11 * Copyright (c) 1990, 1993, 1994, 1995 12 * The Regents of the University of California. All rights reserved. 13 * 14 * This code is derived from software contributed to Berkeley by 15 * Mike Olson. 16 * 17 * Redistribution and use in source and binary forms, with or without 18 * modification, are permitted provided that the following conditions 19 * are met: 20 * 1. Redistributions of source code must retain the above copyright 21 * notice, this list of conditions and the following disclaimer. 22 * 2. Redistributions in binary form must reproduce the above copyright 23 * notice, this list of conditions and the following disclaimer in the 24 * documentation and/or other materials provided with the distribution. 25 * 3. Neither the name of the University nor the names of its contributors 26 * may be used to endorse or promote products derived from this software 27 * without specific prior written permission. 28 * 29 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND 30 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 31 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 32 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 33 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 34 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 35 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 37 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 38 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 39 * SUCH DAMAGE. 40 * 41 * $Id: db_overflow.c,v 12.26 2008/03/12 20:32:32 mbrey Exp $ 42 */ 43 44#include "db_config.h" 45 46#include "db_int.h" 47#include "dbinc/db_page.h" 48#include "dbinc/db_am.h" 49#include "dbinc/mp.h" 50 51/* 52 * Big key/data code. 53 * 54 * Big key and data entries are stored on linked lists of pages. The initial 55 * reference is a structure with the total length of the item and the page 56 * number where it begins. Each entry in the linked list contains a pointer 57 * to the next page of data, and so on. 58 */ 59 60/* 61 * __db_goff -- 62 * Get an offpage item. 63 * 64 * PUBLIC: int __db_goff __P((DB *, DB_THREAD_INFO *, DB_TXN *, DBT *, 65 * PUBLIC: u_int32_t, db_pgno_t, void **, u_int32_t *)); 66 */ 67int 68__db_goff(dbp, ip, txn, dbt, tlen, pgno, bpp, bpsz) 69 DB *dbp; 70 DB_THREAD_INFO *ip; 71 DB_TXN *txn; 72 DBT *dbt; 73 u_int32_t tlen; 74 db_pgno_t pgno; 75 void **bpp; 76 u_int32_t *bpsz; 77{ 78 DB_MPOOLFILE *mpf; 79 ENV *env; 80 PAGE *h; 81 db_indx_t bytes; 82 u_int32_t curoff, needed, start; 83 u_int8_t *p, *src; 84 int ret; 85 86 env = dbp->env; 87 mpf = dbp->mpf; 88 89 /* 90 * Check if the buffer is big enough; if it is not and we are 91 * allowed to malloc space, then we'll malloc it. If we are 92 * not (DB_DBT_USERMEM), then we'll set the dbt and return 93 * appropriately. 94 */ 95 if (F_ISSET(dbt, DB_DBT_PARTIAL)) { 96 start = dbt->doff; 97 if (start > tlen) 98 needed = 0; 99 else if (dbt->dlen > tlen - start) 100 needed = tlen - start; 101 else 102 needed = dbt->dlen; 103 } else { 104 start = 0; 105 needed = tlen; 106 } 107 108 if (F_ISSET(dbt, DB_DBT_USERCOPY)) 109 goto skip_alloc; 110 111 /* Allocate any necessary memory. */ 112 if (F_ISSET(dbt, DB_DBT_USERMEM)) { 113 if (needed > dbt->ulen) { 114 dbt->size = needed; 115 return (DB_BUFFER_SMALL); 116 } 117 } else if (F_ISSET(dbt, DB_DBT_MALLOC)) { 118 if ((ret = __os_umalloc(env, needed, &dbt->data)) != 0) 119 return (ret); 120 } else if (F_ISSET(dbt, DB_DBT_REALLOC)) { 121 if ((ret = __os_urealloc(env, needed, &dbt->data)) != 0) 122 return (ret); 123 } else if (bpsz != NULL && (*bpsz == 0 || *bpsz < needed)) { 124 if ((ret = __os_realloc(env, needed, bpp)) != 0) 125 return (ret); 126 *bpsz = needed; 127 dbt->data = *bpp; 128 } else if (bpp != NULL) 129 dbt->data = *bpp; 130 else { 131 DB_ASSERT(env, 132 F_ISSET(dbt, 133 DB_DBT_USERMEM | DB_DBT_MALLOC | DB_DBT_REALLOC) || 134 bpsz != NULL || bpp != NULL); 135 return (DB_BUFFER_SMALL); 136 } 137 138skip_alloc: 139 /* 140 * Step through the linked list of pages, copying the data on each 141 * one into the buffer. Never copy more than the total data length. 142 */ 143 dbt->size = needed; 144 for (curoff = 0, p = dbt->data; pgno != PGNO_INVALID && needed > 0;) { 145 if ((ret = __memp_fget(mpf, &pgno, ip, txn, 0, &h)) != 0) 146 return (ret); 147 DB_ASSERT(env, TYPE(h) == P_OVERFLOW); 148 149 /* Check if we need any bytes from this page. */ 150 if (curoff + OV_LEN(h) >= start) { 151 bytes = OV_LEN(h); 152 src = (u_int8_t *)h + P_OVERHEAD(dbp); 153 if (start > curoff) { 154 src += start - curoff; 155 bytes -= start - curoff; 156 } 157 if (bytes > needed) 158 bytes = needed; 159 if (F_ISSET(dbt, DB_DBT_USERCOPY)) { 160 /* 161 * The offset into the DBT is the total size 162 * less the amount of data still needed. Care 163 * needs to be taken if doing a partial copy 164 * beginning at an offset other than 0. 165 */ 166 if ((ret = env->dbt_usercopy( 167 dbt, dbt->size - needed, 168 src, bytes, DB_USERCOPY_SETDATA)) != 0) { 169 (void)__memp_fput(mpf, ip, 170 h, dbp->priority); 171 return (ret); 172 } 173 } else 174 memcpy(p, src, bytes); 175 p += bytes; 176 needed -= bytes; 177 } 178 curoff += OV_LEN(h); 179 pgno = h->next_pgno; 180 (void)__memp_fput(mpf, ip, h, dbp->priority); 181 } 182 return (0); 183} 184 185/* 186 * __db_poff -- 187 * Put an offpage item. 188 * 189 * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *)); 190 */ 191int 192__db_poff(dbc, dbt, pgnop) 193 DBC *dbc; 194 const DBT *dbt; 195 db_pgno_t *pgnop; 196{ 197 DB *dbp; 198 DBT tmp_dbt; 199 DB_LSN new_lsn, null_lsn; 200 DB_MPOOLFILE *mpf; 201 PAGE *pagep, *lastp; 202 db_indx_t pagespace; 203 u_int32_t sz; 204 u_int8_t *p; 205 int ret, t_ret; 206 207 /* 208 * Allocate pages and copy the key/data item into them. Calculate the 209 * number of bytes we get for pages we fill completely with a single 210 * item. 211 */ 212 dbp = dbc->dbp; 213 mpf = dbp->mpf; 214 pagespace = P_MAXSPACE(dbp, dbp->pgsize); 215 216 ret = 0; 217 lastp = NULL; 218 for (p = dbt->data, 219 sz = dbt->size; sz > 0; p += pagespace, sz -= pagespace) { 220 /* 221 * Reduce pagespace so we terminate the loop correctly and 222 * don't copy too much data. 223 */ 224 if (sz < pagespace) 225 pagespace = sz; 226 227 /* 228 * Allocate and initialize a new page and copy all or part of 229 * the item onto the page. If sz is less than pagespace, we 230 * have a partial record. 231 */ 232 if ((ret = __db_new(dbc, P_OVERFLOW, &pagep)) != 0) 233 break; 234 if (DBC_LOGGING(dbc)) { 235 tmp_dbt.data = p; 236 tmp_dbt.size = pagespace; 237 ZERO_LSN(null_lsn); 238 if ((ret = __db_big_log(dbp, dbc->txn, 239 &new_lsn, 0, DB_ADD_BIG, PGNO(pagep), 240 lastp ? PGNO(lastp) : PGNO_INVALID, 241 PGNO_INVALID, &tmp_dbt, &LSN(pagep), 242 lastp == NULL ? &null_lsn : &LSN(lastp), 243 &null_lsn)) != 0) { 244 if (lastp != NULL) 245 (void)__memp_fput(mpf, dbc->thread_info, 246 lastp, dbc->priority); 247 lastp = pagep; 248 break; 249 } 250 } else 251 LSN_NOT_LOGGED(new_lsn); 252 253 /* Move LSN onto page. */ 254 if (lastp != NULL) 255 LSN(lastp) = new_lsn; 256 LSN(pagep) = new_lsn; 257 258 OV_LEN(pagep) = pagespace; 259 OV_REF(pagep) = 1; 260 memcpy((u_int8_t *)pagep + P_OVERHEAD(dbp), p, pagespace); 261 262 /* 263 * If this is the first entry, update the user's info. 264 * Otherwise, update the entry on the last page filled 265 * in and release that page. 266 */ 267 if (lastp == NULL) 268 *pgnop = PGNO(pagep); 269 else { 270 lastp->next_pgno = PGNO(pagep); 271 pagep->prev_pgno = PGNO(lastp); 272 (void)__memp_fput(mpf, 273 dbc->thread_info, lastp, dbc->priority); 274 } 275 lastp = pagep; 276 } 277 if (lastp != NULL && (t_ret = __memp_fput(mpf, 278 dbc->thread_info, lastp, dbc->priority)) != 0 && ret == 0) 279 ret = t_ret; 280 return (ret); 281} 282 283/* 284 * __db_ovref -- 285 * Decrement the reference count on an overflow page. 286 * 287 * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t)); 288 */ 289int 290__db_ovref(dbc, pgno) 291 DBC *dbc; 292 db_pgno_t pgno; 293{ 294 DB *dbp; 295 DB_MPOOLFILE *mpf; 296 PAGE *h; 297 int ret; 298 299 dbp = dbc->dbp; 300 mpf = dbp->mpf; 301 302 if ((ret = __memp_fget(mpf, &pgno, 303 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &h)) != 0) 304 return (ret); 305 306 if (DBC_LOGGING(dbc)) { 307 if ((ret = __db_ovref_log(dbp, 308 dbc->txn, &LSN(h), 0, h->pgno, -1, &LSN(h))) != 0) { 309 (void)__memp_fput(mpf, 310 dbc->thread_info, h, dbc->priority); 311 return (ret); 312 } 313 } else 314 LSN_NOT_LOGGED(LSN(h)); 315 316 /* 317 * In BDB releases before 4.5, the overflow reference counts were 318 * incremented when an overflow item was split onto an internal 319 * page. There was a lock race in that code, and rather than fix 320 * the race, we changed BDB to copy overflow items when splitting 321 * them onto internal pages. The code to decrement reference 322 * counts remains so databases already in the field continue to 323 * work. 324 */ 325 --OV_REF(h); 326 327 return (__memp_fput(mpf, dbc->thread_info, h, dbc->priority)); 328} 329 330/* 331 * __db_doff -- 332 * Delete an offpage chain of overflow pages. 333 * 334 * PUBLIC: int __db_doff __P((DBC *, db_pgno_t)); 335 */ 336int 337__db_doff(dbc, pgno) 338 DBC *dbc; 339 db_pgno_t pgno; 340{ 341 DB *dbp; 342 DBT tmp_dbt; 343 DB_LSN null_lsn; 344 DB_MPOOLFILE *mpf; 345 PAGE *pagep; 346 int ret; 347 348 dbp = dbc->dbp; 349 mpf = dbp->mpf; 350 351 do { 352 if ((ret = __memp_fget(mpf, &pgno, 353 dbc->thread_info, dbc->txn, 0, &pagep)) != 0) 354 return (ret); 355 356 DB_ASSERT(dbp->env, TYPE(pagep) == P_OVERFLOW); 357 /* 358 * If it's referenced by more than one key/data item, 359 * decrement the reference count and return. 360 */ 361 if (OV_REF(pagep) > 1) { 362 (void)__memp_fput(mpf, 363 dbc->thread_info, pagep, dbc->priority); 364 return (__db_ovref(dbc, pgno)); 365 } 366 367 if ((ret = __memp_dirty(mpf, &pagep, 368 dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) { 369 (void)__memp_fput(mpf, 370 dbc->thread_info, pagep, dbc->priority); 371 return (ret); 372 } 373 374 if (DBC_LOGGING(dbc)) { 375 tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD(dbp); 376 tmp_dbt.size = OV_LEN(pagep); 377 ZERO_LSN(null_lsn); 378 if ((ret = __db_big_log(dbp, dbc->txn, 379 &LSN(pagep), 0, DB_REM_BIG, 380 PGNO(pagep), PREV_PGNO(pagep), 381 NEXT_PGNO(pagep), &tmp_dbt, 382 &LSN(pagep), &null_lsn, &null_lsn)) != 0) { 383 (void)__memp_fput(mpf, 384 dbc->thread_info, pagep, dbc->priority); 385 return (ret); 386 } 387 } else 388 LSN_NOT_LOGGED(LSN(pagep)); 389 pgno = pagep->next_pgno; 390 OV_LEN(pagep) = 0; 391 if ((ret = __db_free(dbc, pagep)) != 0) 392 return (ret); 393 } while (pgno != PGNO_INVALID); 394 395 return (0); 396} 397 398/* 399 * __db_moff -- 400 * Match on overflow pages. 401 * 402 * Given a starting page number and a key, return <0, 0, >0 to indicate if the 403 * key on the page is less than, equal to or greater than the key specified. 404 * We optimize this by doing chunk at a time comparison unless the user has 405 * specified a comparison function. In this case, we need to materialize 406 * the entire object and call their comparison routine. 407 * 408 * __db_moff and __db_coff are generic functions useful in searching and 409 * ordering off page items. __db_moff matches an overflow DBT with an offpage 410 * item. __db_coff compares two offpage items for lexicographic sort order. 411 * 412 * PUBLIC: int __db_moff __P((DB *, 413 * PUBLIC: DB_THREAD_INFO *, DB_TXN *, const DBT *, db_pgno_t, 414 * PUBLIC: u_int32_t, int (*)(DB *, const DBT *, const DBT *), int *)); 415 */ 416int 417__db_moff(dbp, ip, txn, dbt, pgno, tlen, cmpfunc, cmpp) 418 DB *dbp; 419 DB_THREAD_INFO *ip; 420 DB_TXN *txn; 421 const DBT *dbt; 422 db_pgno_t pgno; 423 u_int32_t tlen; 424 int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp; 425{ 426 DBT local_dbt; 427 DB_MPOOLFILE *mpf; 428 PAGE *pagep; 429 void *buf; 430 u_int32_t bufsize, cmp_bytes, key_left; 431 u_int8_t *p1, *p2; 432 int ret; 433 434 mpf = dbp->mpf; 435 436 /* 437 * If there is a user-specified comparison function, build a 438 * contiguous copy of the key, and call it. 439 */ 440 if (cmpfunc != NULL) { 441 memset(&local_dbt, 0, sizeof(local_dbt)); 442 buf = NULL; 443 bufsize = 0; 444 445 if ((ret = __db_goff(dbp, ip, txn, 446 &local_dbt, tlen, pgno, &buf, &bufsize)) != 0) 447 return (ret); 448 /* Pass the key as the first argument */ 449 *cmpp = cmpfunc(dbp, dbt, &local_dbt); 450 __os_free(dbp->env, buf); 451 return (0); 452 } 453 454 /* While there are both keys to compare. */ 455 for (*cmpp = 0, p1 = dbt->data, 456 key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) { 457 if ((ret = __memp_fget(mpf, &pgno, ip, txn, 0, &pagep)) != 0) 458 return (ret); 459 460 cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left; 461 tlen -= cmp_bytes; 462 key_left -= cmp_bytes; 463 for (p2 = (u_int8_t *)pagep + P_OVERHEAD(dbp); 464 cmp_bytes-- > 0; ++p1, ++p2) 465 if (*p1 != *p2) { 466 *cmpp = (long)*p1 - (long)*p2; 467 break; 468 } 469 pgno = NEXT_PGNO(pagep); 470 if ((ret = __memp_fput(mpf, ip, pagep, dbp->priority)) != 0) 471 return (ret); 472 if (*cmpp != 0) 473 return (0); 474 } 475 if (key_left > 0) /* DBT is longer than the page key. */ 476 *cmpp = 1; 477 else if (tlen > 0) /* DBT is shorter than the page key. */ 478 *cmpp = -1; 479 else 480 *cmpp = 0; 481 482 return (0); 483} 484 485/* 486 * __db_coff -- 487 * Match two offpage dbts. 488 * 489 * The DBTs must both refer to offpage items. 490 * The match happens a chunk (page) at a time unless a user defined comparison 491 * function exists. It is not possible to optimize this comparison away when 492 * a lexicographic sort order is required on mismatch. 493 * 494 * NOTE: For now this function only works for H_OFFPAGE type items. It would 495 * be simple to extend it for use with B_OVERFLOW type items. It would only 496 * require extracting the total length, and page number, dependent on the 497 * DBT type. 498 * 499 * PUBLIC: int __db_coff __P((DB *, DB_THREAD_INFO *, DB_TXN *, const DBT *, 500 * PUBLIC: const DBT *, int (*)(DB *, const DBT *, const DBT *), int *)); 501 */ 502int 503__db_coff(dbp, ip, txn, dbt, match, cmpfunc, cmpp) 504 DB *dbp; 505 DB_THREAD_INFO *ip; 506 DB_TXN *txn; 507 const DBT *dbt, *match; 508 int (*cmpfunc) __P((DB *, const DBT *, const DBT *)), *cmpp; 509{ 510 DBT local_key, local_match; 511 DB_MPOOLFILE *mpf; 512 PAGE *dbt_pagep, *match_pagep; 513 db_pgno_t dbt_pgno, match_pgno; 514 u_int32_t cmp_bytes, dbt_bufsz, dbt_len, match_bufsz; 515 u_int32_t match_len, max_data, page_sz; 516 u_int8_t *p1, *p2; 517 int ret; 518 void *dbt_buf, *match_buf; 519 520 mpf = dbp->mpf; 521 page_sz = dbp->pgsize; 522 *cmpp = 0; 523 dbt_buf = match_buf = NULL; 524 525 DB_ASSERT(dbp->env, HPAGE_PTYPE(dbt->data) == H_OFFPAGE); 526 DB_ASSERT(dbp->env, HPAGE_PTYPE(match->data) == H_OFFPAGE); 527 528 /* Extract potentially unaligned length and pgno fields from DBTs */ 529 memcpy(&dbt_len, HOFFPAGE_TLEN(dbt->data), sizeof(u_int32_t)); 530 memcpy(&dbt_pgno, HOFFPAGE_PGNO(dbt->data), sizeof(db_pgno_t)); 531 memcpy(&match_len, HOFFPAGE_TLEN(match->data), sizeof(u_int32_t)); 532 memcpy(&match_pgno, HOFFPAGE_PGNO(match->data), sizeof(db_pgno_t)); 533 max_data = (dbt_len < match_len ? dbt_len : match_len); 534 535 /* 536 * If there is a custom comparator, fully resolve both DBTs. 537 * Then call the users comparator. 538 */ 539 if (cmpfunc != NULL) { 540 memset(&local_key, 0, sizeof(local_key)); 541 memset(&local_match, 0, sizeof(local_match)); 542 dbt_buf = match_buf = NULL; 543 dbt_bufsz = match_bufsz = 0; 544 545 if ((ret = __db_goff(dbp, ip, txn, &local_key, dbt_len, 546 dbt_pgno, &dbt_buf, &dbt_bufsz)) != 0) 547 goto err1; 548 if ((ret = __db_goff(dbp, ip, txn, &local_match, match_len, 549 match_pgno, &match_buf, &match_bufsz)) != 0) 550 goto err1; 551 /* The key needs to be the first argument for sort order */ 552 *cmpp = cmpfunc(dbp, &local_key, &local_match); 553 554err1: if (dbt_buf != NULL) 555 __os_free(dbp->env, dbt_buf); 556 if (match_buf != NULL) 557 __os_free(dbp->env, match_buf); 558 return (ret); 559 } 560 561 /* Match the offpage DBTs a page at a time. */ 562 while (dbt_pgno != PGNO_INVALID && match_pgno != PGNO_INVALID) { 563 if ((ret = 564 __memp_fget(mpf, &dbt_pgno, ip, txn, 0, &dbt_pagep)) != 0) 565 return (ret); 566 if ((ret = 567 __memp_fget(mpf, &match_pgno, 568 ip, txn, 0, &match_pagep)) != 0) { 569 (void)__memp_fput( 570 mpf, ip, dbt_pagep, DB_PRIORITY_UNCHANGED); 571 return (ret); 572 } 573 cmp_bytes = page_sz < max_data ? page_sz : max_data; 574 for (p1 = (u_int8_t *)dbt_pagep + P_OVERHEAD(dbp), 575 p2 = (u_int8_t *)match_pagep + P_OVERHEAD(dbp); 576 cmp_bytes-- > 0; ++p1, ++p2) 577 if (*p1 != *p2) { 578 *cmpp = (long)*p1 - (long)*p2; 579 break; 580 } 581 582 dbt_pgno = NEXT_PGNO(dbt_pagep); 583 match_pgno = NEXT_PGNO(match_pagep); 584 max_data -= page_sz; 585 if ((ret = __memp_fput(mpf, 586 ip, dbt_pagep, DB_PRIORITY_UNCHANGED)) != 0) { 587 (void)__memp_fput(mpf, 588 ip, match_pagep, DB_PRIORITY_UNCHANGED); 589 return (ret); 590 } 591 if ((ret = __memp_fput(mpf, 592 ip, match_pagep, DB_PRIORITY_UNCHANGED)) != 0) 593 return (ret); 594 if (*cmpp != 0) 595 return (0); 596 } 597 598 /* If a lexicographic mismatch was found, then the result has already 599 * been returned. If the DBTs matched, consider the lengths of the 600 * items, and return appropriately. 601 */ 602 if (dbt_len > match_len) /* DBT is longer than the match key. */ 603 *cmpp = 1; 604 else if (match_len > dbt_len) /* DBT is shorter than the match key. */ 605 *cmpp = -1; 606 else 607 *cmpp = 0; 608 609 return (0); 610 611} 612