1/* 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1998,2008 Oracle. All rights reserved. 5 * 6 * $Id: db_join.c,v 12.27 2008/01/08 20:58:10 bostic Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/db_join.h" 14#include "dbinc/btree.h" 15#include "dbinc/lock.h" 16 17static int __db_join_close_pp __P((DBC *)); 18static int __db_join_cmp __P((const void *, const void *)); 19static int __db_join_del __P((DBC *, u_int32_t)); 20static int __db_join_get __P((DBC *, DBT *, DBT *, u_int32_t)); 21static int __db_join_get_pp __P((DBC *, DBT *, DBT *, u_int32_t)); 22static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t, u_int32_t)); 23static int __db_join_primget __P((DB *, DB_THREAD_INFO *, 24 DB_TXN *, DB_LOCKER *, DBT *, DBT *, u_int32_t)); 25static int __db_join_put __P((DBC *, DBT *, DBT *, u_int32_t)); 26 27/* 28 * Check to see if the Nth secondary cursor of join cursor jc is pointing 29 * to a sorted duplicate set. 30 */ 31#define SORTED_SET(jc, n) ((jc)->j_curslist[(n)]->dbp->dup_compare != NULL) 32 33/* 34 * This is the duplicate-assisted join functionality. Right now we're 35 * going to write it such that we return one item at a time, although 36 * I think we may need to optimize it to return them all at once. 37 * It should be easier to get it working this way, and I believe that 38 * changing it should be fairly straightforward. 39 * 40 * We optimize the join by sorting cursors from smallest to largest 41 * cardinality. In most cases, this is indeed optimal. However, if 42 * a cursor with large cardinality has very few data in common with the 43 * first cursor, it is possible that the join will be made faster by 44 * putting it earlier in the cursor list. Since we have no way to detect 45 * cases like this, we simply provide a flag, DB_JOIN_NOSORT, which retains 46 * the sort order specified by the caller, who may know more about the 47 * structure of the data. 48 * 49 * The first cursor moves sequentially through the duplicate set while 50 * the others search explicitly for the duplicate in question. 51 * 52 */ 53 54/* 55 * __db_join -- 56 * This is the interface to the duplicate-assisted join functionality. 57 * In the same way that cursors mark a position in a database, a cursor 58 * can mark a position in a join. While most cursors are created by the 59 * cursor method of a DB, join cursors are created through an explicit 60 * call to DB->join. 61 * 62 * The curslist is an array of existing, initialized cursors and primary 63 * is the DB of the primary file. The data item that joins all the 64 * cursors in the curslist is used as the key into the primary and that 65 * key and data are returned. When no more items are left in the join 66 * set, the c_next operation off the join cursor will return DB_NOTFOUND. 67 * 68 * PUBLIC: int __db_join __P((DB *, DBC **, DBC **, u_int32_t)); 69 */ 70int 71__db_join(primary, curslist, dbcp, flags) 72 DB *primary; 73 DBC **curslist, **dbcp; 74 u_int32_t flags; 75{ 76 DBC *dbc; 77 ENV *env; 78 JOIN_CURSOR *jc; 79 size_t ncurs, nslots; 80 u_int32_t i; 81 int ret; 82 83 env = primary->env; 84 dbc = NULL; 85 jc = NULL; 86 87 if ((ret = __os_calloc(env, 1, sizeof(DBC), &dbc)) != 0) 88 goto err; 89 90 if ((ret = __os_calloc(env, 1, sizeof(JOIN_CURSOR), &jc)) != 0) 91 goto err; 92 93 if ((ret = __os_malloc(env, 256, &jc->j_key.data)) != 0) 94 goto err; 95 jc->j_key.ulen = 256; 96 F_SET(&jc->j_key, DB_DBT_USERMEM); 97 98 F_SET(&jc->j_rdata, DB_DBT_REALLOC); 99 100 for (jc->j_curslist = curslist; 101 *jc->j_curslist != NULL; jc->j_curslist++) 102 ; 103 104 /* 105 * The number of cursor slots we allocate is one greater than 106 * the number of cursors involved in the join, because the 107 * list is NULL-terminated. 108 */ 109 ncurs = (size_t)(jc->j_curslist - curslist); 110 nslots = ncurs + 1; 111 112 /* 113 * !!! -- A note on the various lists hanging off jc. 114 * 115 * j_curslist is the initial NULL-terminated list of cursors passed 116 * into __db_join. The original cursors are not modified; pristine 117 * copies are required because, in databases with unsorted dups, we 118 * must reset all of the secondary cursors after the first each 119 * time the first one is incremented, or else we will lose data 120 * which happen to be sorted differently in two different cursors. 121 * 122 * j_workcurs is where we put those copies that we're planning to 123 * work with. They're lazily c_dup'ed from j_curslist as we need 124 * them, and closed when the join cursor is closed or when we need 125 * to reset them to their original values (in which case we just 126 * c_dup afresh). 127 * 128 * j_fdupcurs is an array of cursors which point to the first 129 * duplicate in the duplicate set that contains the data value 130 * we're currently interested in. We need this to make 131 * __db_join_get correctly return duplicate duplicates; i.e., if a 132 * given data value occurs twice in the set belonging to cursor #2, 133 * and thrice in the set belonging to cursor #3, and once in all 134 * the other cursors, successive calls to __db_join_get need to 135 * return that data item six times. To make this happen, each time 136 * cursor N is allowed to advance to a new datum, all cursors M 137 * such that M > N have to be reset to the first duplicate with 138 * that datum, so __db_join_get will return all the dup-dups again. 139 * We could just reset them to the original cursor from j_curslist, 140 * but that would be a bit slower in the unsorted case and a LOT 141 * slower in the sorted one. 142 * 143 * j_exhausted is a list of boolean values which represent 144 * whether or not their corresponding cursors are "exhausted", 145 * i.e. whether the datum under the corresponding cursor has 146 * been found not to exist in any unreturned combinations of 147 * later secondary cursors, in which case they are ready to be 148 * incremented. 149 */ 150 151 /* We don't want to free regions whose callocs have failed. */ 152 jc->j_curslist = NULL; 153 jc->j_workcurs = NULL; 154 jc->j_fdupcurs = NULL; 155 jc->j_exhausted = NULL; 156 157 if ((ret = __os_calloc(env, nslots, sizeof(DBC *), 158 &jc->j_curslist)) != 0) 159 goto err; 160 if ((ret = __os_calloc(env, nslots, sizeof(DBC *), 161 &jc->j_workcurs)) != 0) 162 goto err; 163 if ((ret = __os_calloc(env, nslots, sizeof(DBC *), 164 &jc->j_fdupcurs)) != 0) 165 goto err; 166 if ((ret = __os_calloc(env, nslots, sizeof(u_int8_t), 167 &jc->j_exhausted)) != 0) 168 goto err; 169 for (i = 0; curslist[i] != NULL; i++) { 170 jc->j_curslist[i] = curslist[i]; 171 jc->j_workcurs[i] = NULL; 172 jc->j_fdupcurs[i] = NULL; 173 jc->j_exhausted[i] = 0; 174 } 175 jc->j_ncurs = (u_int32_t)ncurs; 176 177 /* 178 * If DB_JOIN_NOSORT is not set, optimize secondary cursors by 179 * sorting in order of increasing cardinality. 180 */ 181 if (!LF_ISSET(DB_JOIN_NOSORT)) 182 qsort(jc->j_curslist, ncurs, sizeof(DBC *), __db_join_cmp); 183 184 /* 185 * We never need to reset the 0th cursor, so there's no 186 * solid reason to use workcurs[0] rather than curslist[0] in 187 * join_get. Nonetheless, it feels cleaner to do it for symmetry, 188 * and this is the most logical place to copy it. 189 * 190 * !!! 191 * There's no need to close the new cursor if we goto err only 192 * because this is the last thing that can fail. Modifier of this 193 * function beware! 194 */ 195 if ((ret = 196 __dbc_dup(jc->j_curslist[0], jc->j_workcurs, DB_POSITION)) != 0) 197 goto err; 198 199 dbc->close = dbc->c_close = __db_join_close_pp; 200 dbc->del = dbc->c_del = __db_join_del; 201 dbc->get = dbc->c_get = __db_join_get_pp; 202 dbc->put = dbc->c_put = __db_join_put; 203 dbc->internal = (DBC_INTERNAL *)jc; 204 dbc->dbp = primary; 205 jc->j_primary = primary; 206 207 /* Stash the first cursor's transaction here for easy access. */ 208 dbc->txn = curslist[0]->txn; 209 210 *dbcp = dbc; 211 212 MUTEX_LOCK(env, primary->mutex); 213 TAILQ_INSERT_TAIL(&primary->join_queue, dbc, links); 214 MUTEX_UNLOCK(env, primary->mutex); 215 216 return (0); 217 218err: if (jc != NULL) { 219 if (jc->j_curslist != NULL) 220 __os_free(env, jc->j_curslist); 221 if (jc->j_workcurs != NULL) { 222 if (jc->j_workcurs[0] != NULL) 223 (void)__dbc_close(jc->j_workcurs[0]); 224 __os_free(env, jc->j_workcurs); 225 } 226 if (jc->j_fdupcurs != NULL) 227 __os_free(env, jc->j_fdupcurs); 228 if (jc->j_exhausted != NULL) 229 __os_free(env, jc->j_exhausted); 230 __os_free(env, jc); 231 } 232 if (dbc != NULL) 233 __os_free(env, dbc); 234 return (ret); 235} 236 237/* 238 * __db_join_close_pp -- 239 * DBC->close pre/post processing for join cursors. 240 */ 241static int 242__db_join_close_pp(dbc) 243 DBC *dbc; 244{ 245 DB *dbp; 246 DB_THREAD_INFO *ip; 247 ENV *env; 248 int handle_check, ret, t_ret; 249 250 dbp = dbc->dbp; 251 env = dbp->env; 252 253 ENV_ENTER(env, ip); 254 255 handle_check = IS_ENV_REPLICATED(env); 256 if (handle_check && 257 (ret = __db_rep_enter(dbp, 1, 0, dbc->txn != NULL)) != 0) { 258 handle_check = 0; 259 goto err; 260 } 261 262 ret = __db_join_close(dbc); 263 264 if (handle_check && (t_ret = __env_db_rep_exit(env)) != 0 && ret == 0) 265 ret = t_ret; 266 267err: ENV_LEAVE(env, ip); 268 return (ret); 269} 270 271static int 272__db_join_put(dbc, key, data, flags) 273 DBC *dbc; 274 DBT *key; 275 DBT *data; 276 u_int32_t flags; 277{ 278 COMPQUIET(dbc, NULL); 279 COMPQUIET(key, NULL); 280 COMPQUIET(data, NULL); 281 COMPQUIET(flags, 0); 282 return (EINVAL); 283} 284 285static int 286__db_join_del(dbc, flags) 287 DBC *dbc; 288 u_int32_t flags; 289{ 290 COMPQUIET(dbc, NULL); 291 COMPQUIET(flags, 0); 292 return (EINVAL); 293} 294 295/* 296 * __db_join_get_pp -- 297 * DBjoin->get pre/post processing. 298 */ 299static int 300__db_join_get_pp(dbc, key, data, flags) 301 DBC *dbc; 302 DBT *key, *data; 303 u_int32_t flags; 304{ 305 DB *dbp; 306 DB_THREAD_INFO *ip; 307 ENV *env; 308 u_int32_t handle_check, save_flags; 309 int ret, t_ret; 310 311 dbp = dbc->dbp; 312 env = dbp->env; 313 314 /* Save the original flags value. */ 315 save_flags = flags; 316 317 if (LF_ISSET(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW)) { 318 if (!LOCKING_ON(env)) 319 return (__db_fnl(env, "DBC->get")); 320 LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW); 321 } 322 323 switch (flags) { 324 case 0: 325 case DB_JOIN_ITEM: 326 break; 327 default: 328 return (__db_ferr(env, "DBC->get", 0)); 329 } 330 331 /* 332 * A partial get of the key of a join cursor don't make much sense; 333 * the entire key is necessary to query the primary database 334 * and find the datum, and so regardless of the size of the key 335 * it would not be a performance improvement. Since it would require 336 * special handling, we simply disallow it. 337 * 338 * A partial get of the data, however, potentially makes sense (if 339 * all possible data are a predictable large structure, for instance) 340 * and causes us no headaches, so we permit it. 341 */ 342 if (F_ISSET(key, DB_DBT_PARTIAL)) { 343 __db_errx(env, 344 "DB_DBT_PARTIAL may not be set on key during join_get"); 345 return (EINVAL); 346 } 347 348 ENV_ENTER(env, ip); 349 350 handle_check = IS_ENV_REPLICATED(env); 351 if (handle_check && 352 (ret = __db_rep_enter(dbp, 1, 0, dbc->txn != NULL)) != 0) { 353 handle_check = 0; 354 goto err; 355 } 356 357 /* Restore the original flags value. */ 358 flags = save_flags; 359 360 ret = __db_join_get(dbc, key, data, flags); 361 362 if (handle_check && (t_ret = __env_db_rep_exit(env)) != 0 && ret == 0) 363 ret = t_ret; 364 365err: ENV_LEAVE(env, ip); 366 __dbt_userfree(env, key, NULL, NULL); 367 return (ret); 368} 369 370static int 371__db_join_get(dbc, key_arg, data_arg, flags) 372 DBC *dbc; 373 DBT *key_arg, *data_arg; 374 u_int32_t flags; 375{ 376 DB *dbp; 377 DBC *cp; 378 DBT *key_n, key_n_mem; 379 ENV *env; 380 JOIN_CURSOR *jc; 381 int db_manage_data, ret; 382 u_int32_t i, j, operation, opmods; 383 384 dbp = dbc->dbp; 385 env = dbp->env; 386 jc = (JOIN_CURSOR *)dbc->internal; 387 388 operation = LF_ISSET(DB_OPFLAGS_MASK); 389 390 /* !!! 391 * If the set of flags here changes, check that __db_join_primget 392 * is updated to handle them properly. 393 */ 394 opmods = LF_ISSET(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW); 395 396 /* 397 * Since we are fetching the key as a datum in the secondary indices, 398 * we must be careful of caller-specified DB_DBT_* memory 399 * management flags. If necessary, use a stack-allocated DBT; 400 * we'll appropriately copy and/or allocate the data later. 401 */ 402 if (F_ISSET(key_arg, 403 DB_DBT_MALLOC | DB_DBT_USERCOPY | DB_DBT_USERMEM)) { 404 /* We just use the default buffer; no need to go malloc. */ 405 key_n = &key_n_mem; 406 memset(key_n, 0, sizeof(DBT)); 407 } else { 408 /* 409 * Either DB_DBT_REALLOC or the default buffer will work 410 * fine if we have to reuse it, as we do. 411 */ 412 key_n = key_arg; 413 } 414 if (F_ISSET(key_arg, DB_DBT_USERCOPY)) 415 key_arg->data = NULL; 416 417 /* 418 * If our last attempt to do a get on the primary key failed, 419 * short-circuit the join and try again with the same key. 420 */ 421 if (F_ISSET(jc, JOIN_RETRY)) 422 goto samekey; 423 F_CLR(jc, JOIN_RETRY); 424 425retry: ret = __dbc_get(jc->j_workcurs[0], &jc->j_key, key_n, 426 opmods | (jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT)); 427 428 if (ret == DB_BUFFER_SMALL) { 429 jc->j_key.ulen <<= 1; 430 if ((ret = __os_realloc(env, 431 jc->j_key.ulen, &jc->j_key.data)) != 0) 432 goto mem_err; 433 goto retry; 434 } 435 436 /* 437 * If ret == DB_NOTFOUND, we're out of elements of the first 438 * secondary cursor. This is how we finally finish the join 439 * if all goes well. 440 */ 441 if (ret != 0) 442 goto err; 443 444 /* 445 * If jc->j_exhausted[0] == 1, we've just advanced the first cursor, 446 * and we're going to want to advance all the cursors that point to 447 * the first member of a duplicate duplicate set (j_fdupcurs[1..N]). 448 * Close all the cursors in j_fdupcurs; we'll reopen them the 449 * first time through the upcoming loop. 450 */ 451 for (i = 1; i < jc->j_ncurs; i++) { 452 if (jc->j_fdupcurs[i] != NULL && 453 (ret = __dbc_close(jc->j_fdupcurs[i])) != 0) 454 goto err; 455 jc->j_fdupcurs[i] = NULL; 456 } 457 458 /* 459 * If jc->j_curslist[1] == NULL, we have only one cursor in the join. 460 * Thus, we can safely increment that one cursor on each call 461 * to __db_join_get, and we signal this by setting jc->j_exhausted[0] 462 * right away. 463 * 464 * Otherwise, reset jc->j_exhausted[0] to 0, so that we don't 465 * increment it until we know we're ready to. 466 */ 467 if (jc->j_curslist[1] == NULL) 468 jc->j_exhausted[0] = 1; 469 else 470 jc->j_exhausted[0] = 0; 471 472 /* We have the first element; now look for it in the other cursors. */ 473 for (i = 1; i < jc->j_ncurs; i++) { 474 DB_ASSERT(env, jc->j_curslist[i] != NULL); 475 if (jc->j_workcurs[i] == NULL) 476 /* If this is NULL, we need to dup curslist into it. */ 477 if ((ret = __dbc_dup(jc->j_curslist[i], 478 &jc->j_workcurs[i], DB_POSITION)) != 0) 479 goto err; 480 481retry2: cp = jc->j_workcurs[i]; 482 483 if ((ret = __db_join_getnext(cp, &jc->j_key, key_n, 484 jc->j_exhausted[i], opmods)) == DB_NOTFOUND) { 485 /* 486 * jc->j_workcurs[i] has no more of the datum we're 487 * interested in. Go back one cursor and get 488 * a new dup. We can't just move to a new 489 * element of the outer relation, because that way 490 * we might miss duplicate duplicates in cursor i-1. 491 * 492 * If this takes us back to the first cursor, 493 * -then- we can move to a new element of the outer 494 * relation. 495 */ 496 --i; 497 jc->j_exhausted[i] = 1; 498 499 if (i == 0) { 500 for (j = 1; jc->j_workcurs[j] != NULL; j++) { 501 /* 502 * We're moving to a new element of 503 * the first secondary cursor. If 504 * that cursor is sorted, then any 505 * other sorted cursors can be safely 506 * reset to the first duplicate 507 * duplicate in the current set if we 508 * have a pointer to it (we can't just 509 * leave them be, or we'll miss 510 * duplicate duplicates in the outer 511 * relation). 512 * 513 * If the first cursor is unsorted, or 514 * if cursor j is unsorted, we can 515 * make no assumptions about what 516 * we're looking for next or where it 517 * will be, so we reset to the very 518 * beginning (setting workcurs NULL 519 * will achieve this next go-round). 520 * 521 * XXX: This is likely to break 522 * horribly if any two cursors are 523 * both sorted, but have different 524 * specified sort functions. For, 525 * now, we dismiss this as pathology 526 * and let strange things happen--we 527 * can't make rope childproof. 528 */ 529 if ((ret = __dbc_close( 530 jc->j_workcurs[j])) != 0) 531 goto err; 532 if (!SORTED_SET(jc, 0) || 533 !SORTED_SET(jc, j) || 534 jc->j_fdupcurs[j] == NULL) 535 /* 536 * Unsafe conditions; 537 * reset fully. 538 */ 539 jc->j_workcurs[j] = NULL; 540 else 541 /* Partial reset suffices. */ 542 if ((__dbc_dup( 543 jc->j_fdupcurs[j], 544 &jc->j_workcurs[j], 545 DB_POSITION)) != 0) 546 goto err; 547 jc->j_exhausted[j] = 0; 548 } 549 goto retry; 550 /* NOTREACHED */ 551 } 552 553 /* 554 * We're about to advance the cursor and need to 555 * reset all of the workcurs[j] where j>i, so that 556 * we don't miss any duplicate duplicates. 557 */ 558 for (j = i + 1; 559 jc->j_workcurs[j] != NULL; 560 j++) { 561 if ((ret = 562 __dbc_close(jc->j_workcurs[j])) != 0) 563 goto err; 564 jc->j_exhausted[j] = 0; 565 if (jc->j_fdupcurs[j] == NULL) 566 jc->j_workcurs[j] = NULL; 567 else if ((ret = __dbc_dup(jc->j_fdupcurs[j], 568 &jc->j_workcurs[j], DB_POSITION)) != 0) 569 goto err; 570 } 571 goto retry2; 572 /* NOTREACHED */ 573 } 574 575 if (ret == DB_BUFFER_SMALL) { 576 jc->j_key.ulen <<= 1; 577 if ((ret = __os_realloc(env, jc->j_key.ulen, 578 &jc->j_key.data)) != 0) { 579mem_err: __db_errx(env, 580 "Allocation failed for join key, len = %lu", 581 (u_long)jc->j_key.ulen); 582 goto err; 583 } 584 goto retry2; 585 } 586 587 if (ret != 0) 588 goto err; 589 590 /* 591 * If we made it this far, we've found a matching 592 * datum in cursor i. Mark the current cursor 593 * unexhausted, so we don't miss any duplicate 594 * duplicates the next go-round--unless this is the 595 * very last cursor, in which case there are none to 596 * miss, and we'll need that exhausted flag to finally 597 * get a DB_NOTFOUND and move on to the next datum in 598 * the outermost cursor. 599 */ 600 if (i + 1 != jc->j_ncurs) 601 jc->j_exhausted[i] = 0; 602 else 603 jc->j_exhausted[i] = 1; 604 605 /* 606 * If jc->j_fdupcurs[i] is NULL and the ith cursor's dups are 607 * sorted, then we're here for the first time since advancing 608 * cursor 0, and we have a new datum of interest. 609 * jc->j_workcurs[i] points to the beginning of a set of 610 * duplicate duplicates; store this into jc->j_fdupcurs[i]. 611 */ 612 if (SORTED_SET(jc, i) && jc->j_fdupcurs[i] == NULL && (ret = 613 __dbc_dup(cp, &jc->j_fdupcurs[i], DB_POSITION)) != 0) 614 goto err; 615 } 616 617err: if (ret != 0) 618 return (ret); 619 620 if (0) { 621samekey: /* 622 * Get the key we tried and failed to return last time; 623 * it should be the current datum of all the secondary cursors. 624 */ 625 if ((ret = __dbc_get(jc->j_workcurs[0], 626 &jc->j_key, key_n, DB_CURRENT | opmods)) != 0) 627 return (ret); 628 F_CLR(jc, JOIN_RETRY); 629 } 630 631 /* 632 * ret == 0; we have a key to return. 633 * 634 * If DB_DBT_USERMEM or DB_DBT_MALLOC is set, we need to copy the key 635 * back into the dbt we were given for the key; call __db_retcopy. 636 * Otherwise, assert that we do not need to copy anything and proceed. 637 */ 638 DB_ASSERT(env, F_ISSET(key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC | 639 DB_DBT_USERCOPY) || key_n == key_arg); 640 641 if ((F_ISSET(key_arg, DB_DBT_USERMEM | DB_DBT_MALLOC | 642 DB_DBT_USERCOPY)) && 643 (ret = __db_retcopy(env, 644 key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) { 645 /* 646 * The retcopy failed, most commonly because we have a user 647 * buffer for the key which is too small. Set things up to 648 * retry next time, and return. 649 */ 650 F_SET(jc, JOIN_RETRY); 651 return (ret); 652 } 653 654 /* 655 * If DB_JOIN_ITEM is set, we return it; otherwise we do the lookup 656 * in the primary and then return. 657 */ 658 if (operation == DB_JOIN_ITEM) 659 return (0); 660 661 /* 662 * If data_arg->flags == 0--that is, if DB is managing the 663 * data DBT's memory--it's not safe to just pass the DBT 664 * through to the primary get call, since we don't want that 665 * memory to belong to the primary DB handle (and if the primary 666 * is free-threaded, it can't anyway). 667 * 668 * Instead, use memory that is managed by the join cursor, in 669 * jc->j_rdata. 670 */ 671 if (!F_ISSET(data_arg, DB_DBT_MALLOC | DB_DBT_REALLOC | 672 DB_DBT_USERMEM | DB_DBT_USERCOPY)) 673 db_manage_data = 1; 674 else 675 db_manage_data = 0; 676 if ((ret = __db_join_primget(jc->j_primary, dbc->thread_info, 677 jc->j_curslist[0]->txn, jc->j_curslist[0]->locker, key_n, 678 db_manage_data ? &jc->j_rdata : data_arg, opmods)) != 0) { 679 if (ret == DB_NOTFOUND) { 680 if (LF_ISSET(DB_READ_UNCOMMITTED) || 681 (jc->j_curslist[0]->txn != NULL && F_ISSET( 682 jc->j_curslist[0]->txn, TXN_READ_UNCOMMITTED))) 683 goto retry; 684 /* 685 * If ret == DB_NOTFOUND, the primary and secondary 686 * are out of sync; every item in each secondary 687 * should correspond to something in the primary, 688 * or we shouldn't have done the join this way. 689 * Wail. 690 */ 691 ret = __db_secondary_corrupt(jc->j_primary); 692 } else 693 /* 694 * The get on the primary failed for some other 695 * reason, most commonly because we're using a user 696 * buffer that's not big enough. Flag our failure 697 * so we can return the same key next time. 698 */ 699 F_SET(jc, JOIN_RETRY); 700 } 701 if (db_manage_data && ret == 0) { 702 data_arg->data = jc->j_rdata.data; 703 data_arg->size = jc->j_rdata.size; 704 } 705 706 return (ret); 707} 708 709/* 710 * __db_join_close -- 711 * DBC->close for join cursors. 712 * 713 * PUBLIC: int __db_join_close __P((DBC *)); 714 */ 715int 716__db_join_close(dbc) 717 DBC *dbc; 718{ 719 DB *dbp; 720 DB_THREAD_INFO *ip; 721 ENV *env; 722 JOIN_CURSOR *jc; 723 int ret, t_ret; 724 u_int32_t i; 725 726 jc = (JOIN_CURSOR *)dbc->internal; 727 dbp = dbc->dbp; 728 env = dbp->env; 729 ret = t_ret = 0; 730 731 /* 732 * Remove from active list of join cursors. Note that this 733 * must happen before any action that can fail and return, or else 734 * __db_close may loop indefinitely. 735 */ 736 MUTEX_LOCK(env, dbp->mutex); 737 TAILQ_REMOVE(&dbp->join_queue, dbc, links); 738 MUTEX_UNLOCK(env, dbp->mutex); 739 740 ENV_ENTER(env, ip); 741 /* 742 * Close any open scratch cursors. In each case, there may 743 * not be as many outstanding as there are cursors in 744 * curslist, but we want to close whatever's there. 745 * 746 * If any close fails, there's no reason not to close everything else; 747 * we'll just return the error code of the last one to fail. There's 748 * not much the caller can do anyway, since these cursors only exist 749 * hanging off a db-internal data structure that they shouldn't be 750 * mucking with. 751 */ 752 for (i = 0; i < jc->j_ncurs; i++) { 753 if (jc->j_workcurs[i] != NULL && 754 (t_ret = __dbc_close(jc->j_workcurs[i])) != 0) 755 ret = t_ret; 756 if (jc->j_fdupcurs[i] != NULL && 757 (t_ret = __dbc_close(jc->j_fdupcurs[i])) != 0) 758 ret = t_ret; 759 } 760 ENV_LEAVE(env, ip); 761 762 __os_free(env, jc->j_exhausted); 763 __os_free(env, jc->j_curslist); 764 __os_free(env, jc->j_workcurs); 765 __os_free(env, jc->j_fdupcurs); 766 __os_free(env, jc->j_key.data); 767 if (jc->j_rdata.data != NULL) 768 __os_ufree(env, jc->j_rdata.data); 769 __os_free(env, jc); 770 __os_free(env, dbc); 771 772 return (ret); 773} 774 775/* 776 * __db_join_getnext -- 777 * This function replaces the DBC_CONTINUE and DBC_KEYSET 778 * functionality inside the various cursor get routines. 779 * 780 * If exhausted == 0, we're not done with the current datum; 781 * return it if it matches "matching", otherwise search 782 * using DB_GET_BOTHC (which is faster than iteratively doing 783 * DB_NEXT_DUP) forward until we find one that does. 784 * 785 * If exhausted == 1, we are done with the current datum, so just 786 * leap forward to searching NEXT_DUPs. 787 * 788 * If no matching datum exists, returns DB_NOTFOUND, else 0. 789 */ 790static int 791__db_join_getnext(dbc, key, data, exhausted, opmods) 792 DBC *dbc; 793 DBT *key, *data; 794 u_int32_t exhausted, opmods; 795{ 796 int ret, cmp; 797 DB *dbp; 798 DBT ldata; 799 int (*func) __P((DB *, const DBT *, const DBT *)); 800 801 dbp = dbc->dbp; 802 func = (dbp->dup_compare == NULL) ? __bam_defcmp : dbp->dup_compare; 803 804 switch (exhausted) { 805 case 0: 806 /* 807 * We don't want to step on data->data; use a new 808 * DBT and malloc so we don't step on dbc's rdata memory. 809 */ 810 memset(&ldata, 0, sizeof(DBT)); 811 F_SET(&ldata, DB_DBT_MALLOC); 812 if ((ret = __dbc_get(dbc, 813 key, &ldata, opmods | DB_CURRENT)) != 0) 814 break; 815 cmp = func(dbp, data, &ldata); 816 if (cmp == 0) { 817 /* 818 * We have to return the real data value. Copy 819 * it into data, then free the buffer we malloc'ed 820 * above. 821 */ 822 if ((ret = __db_retcopy(dbp->env, data, ldata.data, 823 ldata.size, &data->data, &data->size)) != 0) 824 return (ret); 825 __os_ufree(dbp->env, ldata.data); 826 return (0); 827 } 828 829 /* 830 * Didn't match--we want to fall through and search future 831 * dups. We just forget about ldata and free 832 * its buffer--data contains the value we're searching for. 833 */ 834 __os_ufree(dbp->env, ldata.data); 835 /* FALLTHROUGH */ 836 case 1: 837 ret = __dbc_get(dbc, key, data, opmods | DB_GET_BOTHC); 838 break; 839 default: 840 ret = EINVAL; 841 break; 842 } 843 844 return (ret); 845} 846 847/* 848 * __db_join_cmp -- 849 * Comparison function for sorting DBCs in cardinality order. 850 */ 851static int 852__db_join_cmp(a, b) 853 const void *a, *b; 854{ 855 DBC *dbca, *dbcb; 856 db_recno_t counta, countb; 857 858 dbca = *((DBC * const *)a); 859 dbcb = *((DBC * const *)b); 860 861 if (__dbc_count(dbca, &counta) != 0 || 862 __dbc_count(dbcb, &countb) != 0) 863 return (0); 864 865 return ((long)counta - (long)countb); 866} 867 868/* 869 * __db_join_primget -- 870 * Perform a DB->get in the primary, being careful not to use a new 871 * locker ID if we're doing CDB locking. 872 */ 873static int 874__db_join_primget(dbp, ip, txn, locker, key, data, flags) 875 DB *dbp; 876 DB_THREAD_INFO *ip; 877 DB_TXN *txn; 878 DB_LOCKER *locker; 879 DBT *key, *data; 880 u_int32_t flags; 881{ 882 DBC *dbc; 883 u_int32_t rmw; 884 int ret, t_ret; 885 886 if ((ret = __db_cursor_int(dbp, ip, 887 txn, dbp->type, PGNO_INVALID, 0, locker, &dbc)) != 0) 888 return (ret); 889 890 /* 891 * The only allowable flags here are the two flags copied into "opmods" 892 * in __db_join_get, DB_RMW and DB_READ_UNCOMMITTED. The former is an 893 * op on the c_get call, the latter on the cursor call. It's a DB bug 894 * if we allow any other flags down in here. 895 */ 896 rmw = LF_ISSET(DB_RMW); 897 if (LF_ISSET(DB_READ_UNCOMMITTED) || 898 (txn != NULL && F_ISSET(txn, TXN_READ_UNCOMMITTED))) 899 F_SET(dbc, DBC_READ_UNCOMMITTED); 900 901 if (LF_ISSET(DB_READ_COMMITTED) || 902 (txn != NULL && F_ISSET(txn, TXN_READ_COMMITTED))) 903 F_SET(dbc, DBC_READ_COMMITTED); 904 905 LF_CLR(DB_READ_COMMITTED | DB_READ_UNCOMMITTED | DB_RMW); 906 DB_ASSERT(dbp->env, flags == 0); 907 908 F_SET(dbc, DBC_TRANSIENT); 909 910 /* 911 * This shouldn't be necessary, thanks to the fact that join cursors 912 * swap in their own DB_DBT_REALLOC'ed buffers, but just for form's 913 * sake, we mirror what __db_get does. 914 */ 915 SET_RET_MEM(dbc, dbp); 916 917 ret = __dbc_get(dbc, key, data, DB_SET | rmw); 918 919 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) 920 ret = t_ret; 921 922 return (ret); 923} 924 925/* 926 * __db_secondary_corrupt -- 927 * Report primary/secondary inconsistencies. 928 * 929 * PUBLIC: int __db_secondary_corrupt __P((DB *)); 930 */ 931int 932__db_secondary_corrupt(dbp) 933 DB *dbp; 934{ 935 __db_err(dbp->env, DB_SECONDARY_BAD, "%s%s%s", 936 dbp->fname == NULL ? "unnamed" : dbp->fname, 937 dbp->dname == NULL ? "" : "/", 938 dbp->dname == NULL ? "" : dbp->dname); 939 return (DB_SECONDARY_BAD); 940} 941