1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1999-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/btree.h" 14#include "dbinc/lock.h" 15#include "dbinc/log.h" 16#include "dbinc/mp.h" 17#include "dbinc/qam.h" 18 19static int __qam_bulk __P((DBC *, DBT *, u_int32_t)); 20static int __qamc_close __P((DBC *, db_pgno_t, int *)); 21static int __qamc_del __P((DBC *, u_int32_t)); 22static int __qamc_destroy __P((DBC *)); 23static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 24static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 25static int __qam_consume __P((DBC *, QMETA *, db_recno_t)); 26static int __qam_getno __P((DB *, const DBT *, db_recno_t *)); 27 28#define DONT_NEED_LOCKS(dbc) ((dbc)->txn == NULL || \ 29 F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED)) 30 31/* 32 * __qam_position -- 33 * Position a queued access method cursor at a record. This returns 34 * the page locked. *exactp will be set if the record is valid. 35 * PUBLIC: int __qam_position 36 * PUBLIC: __P((DBC *, db_recno_t *, u_int32_t, int *)); 37 */ 38int 39__qam_position(dbc, recnop, get_mode, exactp) 40 DBC *dbc; /* open cursor */ 41 db_recno_t *recnop; /* pointer to recno to find */ 42 u_int32_t get_mode; /* flags to __memp_fget */ 43 int *exactp; /* indicate if it was found */ 44{ 45 DB *dbp; 46 QAMDATA *qp; 47 QUEUE_CURSOR *cp; 48 db_pgno_t pg; 49 int ret; 50 51 dbp = dbc->dbp; 52 cp = (QUEUE_CURSOR *)dbc->internal; 53 54 /* Fetch the page for this recno. */ 55 cp->pgno = pg = QAM_RECNO_PAGE(dbp, *recnop); 56 57 cp->page = NULL; 58 *exactp = 0; 59 if ((ret = __qam_fget(dbc, &pg, get_mode, &cp->page)) != 0) { 60 if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) && 61 (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) 62 ret = 0; 63 return (ret); 64 } 65 cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop); 66 67 if (PGNO(cp->page) == 0) { 68 /* 69 * We have read an uninitialized page: set the page number if 70 * we're creating the page. Otherwise, we know that the record 71 * doesn't exist yet. 72 */ 73 if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) { 74 *exactp = 0; 75 return (0); 76 } 77 DB_ASSERT(dbp->env, FLD_ISSET(get_mode, DB_MPOOL_CREATE)); 78 PGNO(cp->page) = pg; 79 TYPE(cp->page) = P_QAMDATA; 80 } 81 82 qp = QAM_GET_RECORD(dbp, cp->page, cp->indx); 83 *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0; 84 85 return (ret); 86} 87 88/* 89 * __qam_pitem -- 90 * Put an item on a queue page. Copy the data to the page and set the 91 * VALID and SET bits. If logging and the record was previously set, 92 * log that data, otherwise just log the new data. 93 * 94 * pagep must be write locked 95 * 96 * PUBLIC: int __qam_pitem 97 * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *)); 98 */ 99int 100__qam_pitem(dbc, pagep, indx, recno, data) 101 DBC *dbc; 102 QPAGE *pagep; 103 u_int32_t indx; 104 db_recno_t recno; 105 DBT *data; 106{ 107 DB *dbp; 108 DBT olddata, pdata, *datap; 109 ENV *env; 110 QAMDATA *qp; 111 QUEUE *t; 112 u_int8_t *dest, *p; 113 int allocated, ret; 114 115 dbp = dbc->dbp; 116 env = dbp->env; 117 t = (QUEUE *)dbp->q_internal; 118 allocated = ret = 0; 119 120 if (data->size > t->re_len) 121 return (__db_rec_toobig(env, data->size, t->re_len)); 122 qp = QAM_GET_RECORD(dbp, pagep, indx); 123 124 p = qp->data; 125 datap = data; 126 if (F_ISSET(data, DB_DBT_PARTIAL)) { 127 if (data->doff + data->dlen > t->re_len) { 128 __db_errx(env, 129 "%s: data offset plus length larger than record size of %lu", 130 "Record length error", (u_long)t->re_len); 131 return (EINVAL); 132 } 133 134 if (data->size != data->dlen) 135 return (__db_rec_repl(env, data->size, data->dlen)); 136 137 if (data->size == t->re_len) 138 goto no_partial; 139 140 /* 141 * If we are logging, then we have to build the record 142 * first, otherwise, we can simply drop the change 143 * directly on the page. After this clause, make 144 * sure that datap and p are set up correctly so that 145 * copying datap into p does the right thing. 146 * 147 * Note, I am changing this so that if the existing 148 * record is not valid, we create a complete record 149 * to log so that both this and the recovery code is simpler. 150 */ 151 152 if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { 153 datap = &pdata; 154 memset(datap, 0, sizeof(*datap)); 155 156 if ((ret = __os_malloc(env, 157 t->re_len, &datap->data)) != 0) 158 return (ret); 159 allocated = 1; 160 datap->size = t->re_len; 161 162 /* 163 * Construct the record if it's valid, otherwise set it 164 * all to the pad character. 165 */ 166 dest = datap->data; 167 if (F_ISSET(qp, QAM_VALID)) 168 memcpy(dest, p, t->re_len); 169 else 170 memset(dest, (int)t->re_pad, t->re_len); 171 172 dest += data->doff; 173 memcpy(dest, data->data, data->size); 174 } else { 175 datap = data; 176 p += data->doff; 177 } 178 } 179 180no_partial: 181 if (DBC_LOGGING(dbc)) { 182 olddata.size = 0; 183 if (F_ISSET(qp, QAM_SET)) { 184 olddata.data = qp->data; 185 olddata.size = t->re_len; 186 } 187 if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep), 188 0, &LSN(pagep), pagep->pgno, 189 indx, recno, datap, qp->flags, 190 olddata.size == 0 ? NULL : &olddata)) != 0) 191 goto err; 192 } else if (!F_ISSET((dbc), DBC_RECOVER)) 193 LSN_NOT_LOGGED(LSN(pagep)); 194 195 F_SET(qp, QAM_VALID | QAM_SET); 196 memcpy(p, datap->data, datap->size); 197 if (!F_ISSET(data, DB_DBT_PARTIAL)) 198 memset(p + datap->size, 199 (int)t->re_pad, t->re_len - datap->size); 200 201err: if (allocated) 202 __os_free(env, datap->data); 203 204 return (ret); 205} 206/* 207 * __qamc_put 208 * Cursor put for queued access method. 209 * BEFORE and AFTER cannot be specified. 210 */ 211static int 212__qamc_put(dbc, key, data, flags, pgnop) 213 DBC *dbc; 214 DBT *key, *data; 215 u_int32_t flags; 216 db_pgno_t *pgnop; 217{ 218 DB *dbp; 219 DB_MPOOLFILE *mpf; 220 ENV *env; 221 QMETA *meta; 222 QUEUE_CURSOR *cp; 223 db_pgno_t pg; 224 db_recno_t new_cur, new_first; 225 u_int32_t opcode; 226 int exact, ret, t_ret, writelock; 227 228 dbp = dbc->dbp; 229 env = dbp->env; 230 mpf = dbp->mpf; 231 if (pgnop != NULL) 232 *pgnop = PGNO_INVALID; 233 234 cp = (QUEUE_CURSOR *)dbc->internal; 235 236 switch (flags) { 237 case DB_KEYFIRST: 238 case DB_KEYLAST: 239 case DB_NOOVERWRITE: 240 case DB_OVERWRITE_DUP: 241 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) 242 return (ret); 243 /* FALLTHROUGH */ 244 case DB_CURRENT: 245 break; 246 default: 247 /* The interface shouldn't let anything else through. */ 248 return (__db_ferr(env, "DBC->put", 0)); 249 } 250 251 /* Write lock the record. */ 252 if ((ret = __db_lget(dbc, LCK_COUPLE, 253 cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) 254 return (ret); 255 256 if ((ret = __qam_position(dbc, &cp->recno, 257 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) { 258 /* We could not get the page, we can release the record lock. */ 259 (void)__LPUT(dbc, cp->lock); 260 return (ret); 261 } 262 263 if (exact != 0 && flags == DB_NOOVERWRITE) 264 ret = DB_KEYEXIST; 265 else 266 /* Put the item on the page. */ 267 ret = __qam_pitem(dbc, 268 (QPAGE *)cp->page, cp->indx, cp->recno, data); 269 270 if ((t_ret = __qam_fput(dbc, 271 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 272 ret = t_ret; 273 cp->page = NULL; 274 cp->lock_mode = DB_LOCK_WRITE; 275 if (ret != 0) 276 return (ret); 277 278 /* We may need to reset the head or tail of the queue. */ 279 pg = ((QUEUE *)dbp->q_internal)->q_meta; 280 281 writelock = 0; 282 if ((ret = __db_lget(dbc, LCK_COUPLE, 283 pg, DB_LOCK_READ, 0, &cp->lock)) != 0) 284 return (ret); 285 if ((ret = __memp_fget(mpf, &pg, 286 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 287 goto err; 288 289 opcode = 0; 290 new_cur = new_first = 0; 291 292 /* 293 * If the put address is outside the queue, adjust the head and 294 * tail of the queue. If the order is inverted we move 295 * the one which is closer. The first case is when the 296 * queue is empty, move first and current to where the new 297 * insert is. 298 */ 299 300recheck: 301 if (meta->first_recno == meta->cur_recno) { 302 new_first = cp->recno; 303 new_cur = cp->recno + 1; 304 if (new_cur == RECNO_OOB) 305 new_cur++; 306 opcode |= QAM_SETFIRST; 307 opcode |= QAM_SETCUR; 308 } else { 309 if (QAM_BEFORE_FIRST(meta, cp->recno)) { 310 new_first = cp->recno; 311 opcode |= QAM_SETFIRST; 312 } 313 314 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 315 new_cur = cp->recno + 1; 316 if (new_cur == RECNO_OOB) 317 new_cur++; 318 opcode |= QAM_SETCUR; 319 } 320 } 321 322 if (opcode == 0) 323 goto done; 324 325 /* Drop the read lock and get the a write lock on the meta page. */ 326 if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, 327 pg, DB_LOCK_WRITE, 0, &cp->lock)) != 0) 328 goto done; 329 if (writelock++ == 0) 330 goto recheck; 331 332 if (((ret = __memp_dirty(mpf, &meta, 333 dbc->thread_info, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 || 334 (DBC_LOGGING(dbc) && 335 (ret = __qam_mvptr_log(dbp, dbc->txn, 336 &meta->dbmeta.lsn, 0, opcode, meta->first_recno, 337 new_first, meta->cur_recno, new_cur, 338 &meta->dbmeta.lsn, PGNO_BASE_MD)) != 0))) 339 opcode = 0; 340 341 if (opcode & QAM_SETCUR) 342 meta->cur_recno = new_cur; 343 if (opcode & QAM_SETFIRST) 344 meta->first_recno = new_first; 345 346done: if (meta != NULL && (t_ret = __memp_fput(mpf, 347 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 348 ret = t_ret; 349 350err: /* Don't hold the meta page long term. */ 351 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 352 ret = t_ret; 353 return (ret); 354} 355 356/* 357 * __qam_append -- 358 * Perform a put(DB_APPEND) in queue. 359 * 360 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *)); 361 */ 362int 363__qam_append(dbc, key, data) 364 DBC *dbc; 365 DBT *key, *data; 366{ 367 DB *dbp; 368 DB_LOCK lock; 369 DB_MPOOLFILE *mpf; 370 QMETA *meta; 371 QPAGE *page; 372 QUEUE *qp; 373 QUEUE_CURSOR *cp; 374 db_pgno_t pg, metapg; 375 db_recno_t recno; 376 int ret, t_ret; 377 378 dbp = dbc->dbp; 379 mpf = dbp->mpf; 380 cp = (QUEUE_CURSOR *)dbc->internal; 381 382 /* Write lock the meta page. */ 383 metapg = ((QUEUE *)dbp->q_internal)->q_meta; 384 if ((ret = __db_lget(dbc, 0, metapg, DB_LOCK_WRITE, 0, &lock)) != 0) 385 return (ret); 386 if ((ret = __memp_fget(mpf, &metapg, 387 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0) 388 return (ret); 389 390 /* Get the next record number. */ 391 recno = meta->cur_recno; 392 meta->cur_recno++; 393 if (meta->cur_recno == RECNO_OOB) 394 meta->cur_recno++; 395 if (meta->cur_recno == meta->first_recno) { 396 meta->cur_recno--; 397 if (meta->cur_recno == RECNO_OOB) 398 meta->cur_recno--; 399 400 if (ret == 0) 401 ret = EFBIG; 402 goto err; 403 } 404 405 if (QAM_BEFORE_FIRST(meta, recno)) 406 meta->first_recno = recno; 407 408 /* Lock the record and release meta page lock. */ 409 ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, 410 recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock); 411 /* Release the meta page. */ 412 if ((t_ret = __memp_fput(mpf, 413 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 414 ret = t_ret; 415 meta = NULL; 416 417 /* 418 * The application may modify the data based on the selected record 419 * number. We always want to call this even if we ultimately end 420 * up aborting, because we are allocating a record number, regardless. 421 */ 422 if (dbc->dbp->db_append_recno != NULL && 423 (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 && 424 ret == 0) 425 ret = t_ret; 426 427 /* 428 * Capture errors from either the lock couple or the call to 429 * dbp->db_append_recno. 430 */ 431 if (ret != 0) 432 goto err; 433 434 cp->lock = lock; 435 cp->lock_mode = DB_LOCK_WRITE; 436 LOCK_INIT(lock); 437 438 pg = QAM_RECNO_PAGE(dbp, recno); 439 440 /* Fetch for write the data page. */ 441 if ((ret = __qam_fget(dbc, &pg, 442 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0) 443 goto err; 444 445 /* See if this is a new page. */ 446 if (page->pgno == 0) { 447 page->pgno = pg; 448 page->type = P_QAMDATA; 449 } 450 451 /* Put the item on the page and log it. */ 452 ret = __qam_pitem(dbc, page, 453 QAM_RECNO_INDEX(dbp, pg, recno), recno, data); 454 455 if ((t_ret = __qam_fput(dbc, 456 pg, page, dbc->priority)) != 0 && ret == 0) 457 ret = t_ret; 458 459 /* Return the record number to the user. */ 460 if (ret == 0 && key != NULL) 461 ret = __db_retcopy(dbp->env, key, 462 &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen); 463 464 /* Position the cursor on this record. */ 465 cp->recno = recno; 466 467 /* See if we are leaving the extent. */ 468 qp = (QUEUE *) dbp->q_internal; 469 if (qp->page_ext != 0 && 470 (recno % (qp->page_ext * qp->rec_page) == 0 || 471 recno == UINT32_MAX)) { 472 if ((ret = __db_lget(dbc, 473 0, metapg, DB_LOCK_READ, 0, &lock)) != 0) 474 goto err; 475 if ((ret = __memp_fget(mpf, &metapg, 476 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 477 goto err; 478 if (!QAM_AFTER_CURRENT(meta, recno)) 479 ret = __qam_fclose(dbp, pg); 480 } 481 482err: /* Release the meta page. */ 483 if (meta != NULL && (t_ret = __memp_fput(mpf, 484 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 485 ret = t_ret; 486 if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) 487 ret = t_ret; 488 489 return (ret); 490} 491 492/* 493 * __qamc_del -- 494 * Qam cursor->am_del function 495 */ 496static int 497__qamc_del(dbc, flags) 498 DBC *dbc; 499 u_int32_t flags; 500{ 501 DB *dbp; 502 DBT data; 503 DB_LOCK metalock; 504 DB_MPOOLFILE *mpf; 505 PAGE *pagep; 506 QAMDATA *qp; 507 QMETA *meta; 508 QUEUE_CURSOR *cp; 509 db_pgno_t pg; 510 db_recno_t first; 511 int exact, ret, t_ret; 512 513 dbp = dbc->dbp; 514 mpf = dbp->mpf; 515 cp = (QUEUE_CURSOR *)dbc->internal; 516 517 pg = ((QUEUE *)dbp->q_internal)->q_meta; 518 /* Read lock the meta page. */ 519 if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &metalock)) != 0) 520 return (ret); 521 522 if ((ret = __memp_fget(mpf, &pg, 523 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 524 return (ret); 525 526 if (QAM_NOT_VALID(meta, cp->recno)) { 527 ret = DB_NOTFOUND; 528 goto err; 529 } 530 first = meta->first_recno; 531 532 /* Don't hold the meta page long term. */ 533 if ((ret = __memp_fput(mpf, 534 dbc->thread_info, meta, dbc->priority)) != 0) 535 goto err; 536 meta = NULL; 537 if ((ret = __LPUT(dbc, metalock)) != 0) 538 goto err; 539 540 /* Get the record. */ 541 if ((ret = __db_lget(dbc, LCK_COUPLE, 542 cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) 543 goto err; 544 cp->lock_mode = DB_LOCK_WRITE; 545 546 /* Find the record; delete only deletes exact matches. */ 547 if ((ret = __qam_position(dbc, &cp->recno, 548 DB_MPOOL_DIRTY, &exact)) != 0) 549 goto err; 550 551 if (!exact) { 552 ret = DB_NOTFOUND; 553 goto err; 554 } 555 556 pagep = cp->page; 557 qp = QAM_GET_RECORD(dbp, pagep, cp->indx); 558 559 if (DBC_LOGGING(dbc)) { 560 if (((QUEUE *)dbp->q_internal)->page_ext == 0 || 561 ((QUEUE *)dbp->q_internal)->re_len == 0) { 562 if ((ret = __qam_del_log(dbp, 563 dbc->txn, &LSN(pagep), 0, &LSN(pagep), 564 pagep->pgno, cp->indx, cp->recno)) != 0) 565 goto err; 566 } else { 567 data.size = ((QUEUE *)dbp->q_internal)->re_len; 568 data.data = qp->data; 569 if ((ret = __qam_delext_log(dbp, 570 dbc->txn, &LSN(pagep), 0, &LSN(pagep), 571 pagep->pgno, cp->indx, cp->recno, &data)) != 0) 572 goto err; 573 } 574 } else 575 LSN_NOT_LOGGED(LSN(pagep)); 576 577 F_CLR(qp, QAM_VALID); 578 if ((ret = __qam_fput(dbc, 579 cp->pgno, cp->page, dbc->priority)) != 0) 580 goto err; 581 cp->page = NULL; 582 583 /* 584 * Other threads cannot move first_recno past 585 * our position while we have the record locked. 586 * If it's pointing at the deleted record then lock 587 * the metapage and check again as lower numbered 588 * record may have been inserted. 589 */ 590 if (LF_ISSET(DB_CONSUME) || cp->recno == first) { 591 pg = ((QUEUE *)dbp->q_internal)->q_meta; 592 if ((ret = 593 __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &metalock)) != 0) 594 goto err; 595 if ((ret = __memp_fget(mpf, &pg, 596 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 597 goto err; 598 if (LF_ISSET(DB_CONSUME) || cp->recno == meta->first_recno) 599 ret = __qam_consume(dbc, meta, meta->first_recno); 600 } 601 602err: if (meta != NULL && (t_ret = __memp_fput(mpf, dbc->thread_info, 603 meta, dbc->priority)) != 0 && ret == 0) 604 ret = t_ret; 605 /* Don't hold the meta page long term. */ 606 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 607 ret = t_ret; 608 609 if (cp->page != NULL && 610 (t_ret = __qam_fput(dbc, 611 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 612 ret = t_ret; 613 cp->page = NULL; 614 615 return (ret); 616} 617 618#ifdef DEBUG_WOP 619#define QDEBUG 620#endif 621 622/* 623 * __qamc_get -- 624 * Queue DBC->get function. 625 */ 626static int 627__qamc_get(dbc, key, data, flags, pgnop) 628 DBC *dbc; 629 DBT *key, *data; 630 u_int32_t flags; 631 db_pgno_t *pgnop; 632{ 633 DB *dbp; 634 DBC *dbcdup; 635 DBT tmp; 636 DB_LOCK lock, metalock; 637 DB_MPOOLFILE *mpf; 638 ENV *env; 639 PAGE *pg; 640 QAMDATA *qp; 641 QMETA *meta; 642 QUEUE *t; 643 QUEUE_CURSOR *cp; 644 db_lockmode_t lock_mode, meta_mode; 645 db_pgno_t metapno; 646 db_recno_t first; 647 int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete; 648 int retrying; 649 650 dbp = dbc->dbp; 651 env = dbp->env; 652 mpf = dbp->mpf; 653 cp = (QUEUE_CURSOR *)dbc->internal; 654 LOCK_INIT(lock); 655 656 lock_mode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; 657 meta_mode = DB_LOCK_READ; 658 meta = NULL; 659 *pgnop = 0; 660 pg = NULL; 661 retrying = t_ret = wait = with_delete = 0; 662 663 if (flags == DB_CONSUME_WAIT) { 664 wait = 1; 665 flags = DB_CONSUME; 666 } 667 if (flags == DB_CONSUME) { 668 with_delete = 1; 669 flags = DB_FIRST; 670 meta_mode = lock_mode = DB_LOCK_WRITE; 671 } 672 inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete; 673 674 DEBUG_LREAD(dbc, dbc->txn, "qamc_get", 675 flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); 676 677 /* Make lint and friends happy. */ 678 locked = 0; 679 680 is_first = 0; 681 first = 0; 682 683 t = (QUEUE *)dbp->q_internal; 684 metapno = t->q_meta; 685 LOCK_INIT(metalock); 686 687 /* 688 * Get the meta page first 689 */ 690 if (locked == 0 && (ret = __db_lget(dbc, 691 0, metapno, meta_mode, 0, &metalock)) != 0) 692 goto err; 693 locked = 1; 694 if ((ret = __memp_fget(mpf, &metapno, 695 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 696 return (ret); 697 698 /* Release any previous lock if not in a transaction. */ 699 if ((ret = __TLPUT(dbc, cp->lock)) != 0) 700 goto err; 701 702retry: /* Update the record number. */ 703 switch (flags) { 704 case DB_CURRENT: 705 break; 706 case DB_NEXT_DUP: 707 case DB_PREV_DUP: 708 ret = DB_NOTFOUND; 709 goto err; 710 /* NOTREACHED */ 711 case DB_NEXT: 712 case DB_NEXT_NODUP: 713 if (cp->recno != RECNO_OOB) { 714 ++cp->recno; 715 /* Wrap around, skipping zero. */ 716 if (cp->recno == RECNO_OOB) 717 cp->recno++; 718 /* 719 * Check to see if we are out of data. 720 */ 721 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 722 pg = NULL; 723 if (!wait) { 724 ret = DB_NOTFOUND; 725 goto err; 726 } 727 flags = DB_FIRST; 728 /* 729 * If first is not set, then we skipped 730 * a locked record, go back and find it. 731 * If we find a locked record again 732 * wait for it. 733 */ 734 if (first == 0) { 735 retrying = 1; 736 goto retry; 737 } 738 739 if (CDB_LOCKING(env)) { 740 /* Drop the metapage before we wait. */ 741 ret = __memp_fput(mpf, dbc->thread_info, 742 meta, dbc->priority); 743 meta = NULL; 744 if (ret != 0) 745 goto err; 746 if ((ret = __lock_get( 747 env, dbc->locker, 748 DB_LOCK_SWITCH, &dbc->lock_dbt, 749 DB_LOCK_WAIT, &dbc->mylock)) != 0) 750 goto err; 751 752 if ((ret = __lock_get( 753 env, dbc->locker, 754 DB_LOCK_UPGRADE, &dbc->lock_dbt, 755 DB_LOCK_WRITE, &dbc->mylock)) != 0) 756 goto err; 757 if ((ret = __memp_fget(mpf, &metapno, 758 dbc->thread_info, 759 dbc->txn, 0, &meta)) != 0) 760 goto err; 761 goto retry; 762 } 763 /* 764 * Wait for someone to update the meta page. 765 * This will probably mean there is something 766 * in the queue. We then go back up and 767 * try again. 768 */ 769 if (locked == 0) { 770 if ((ret = __db_lget(dbc, 0, metapno, 771 meta_mode, 0, &metalock)) != 0) 772 goto err; 773 locked = 1; 774 if (cp->recno != RECNO_OOB && 775 !QAM_AFTER_CURRENT(meta, cp->recno)) 776 goto retry; 777 } 778 /* Drop the metapage before we wait. */ 779 ret = __memp_fput(mpf, 780 dbc->thread_info, meta, dbc->priority); 781 meta = NULL; 782 if (ret != 0) 783 goto err; 784 if ((ret = __db_lget(dbc, 785 0, metapno, DB_LOCK_WAIT, 786 DB_LOCK_SWITCH, &metalock)) != 0) { 787 if (ret == DB_LOCK_DEADLOCK) 788 ret = DB_LOCK_NOTGRANTED; 789 goto err; 790 } 791 if ((ret = __db_lget(dbc, 0, 792 PGNO_INVALID, DB_LOCK_WRITE, 793 DB_LOCK_UPGRADE, &metalock)) != 0) { 794 if (ret == DB_LOCK_DEADLOCK) 795 ret = DB_LOCK_NOTGRANTED; 796 goto err; 797 } 798 if ((ret = __memp_fget(mpf, 799 &metapno, dbc->thread_info, dbc->txn, 800 0, &meta)) != 0) 801 goto err; 802 locked = 1; 803 goto retry; 804 } 805 break; 806 } 807 /* FALLTHROUGH */ 808 case DB_FIRST: 809 flags = DB_NEXT; 810 is_first = 1; 811 812 /* get the first record number */ 813 cp->recno = first = meta->first_recno; 814 815 break; 816 case DB_PREV: 817 case DB_PREV_NODUP: 818 if (cp->recno != RECNO_OOB) { 819 if (cp->recno == meta->first_recno || 820 QAM_BEFORE_FIRST(meta, cp->recno)) { 821 ret = DB_NOTFOUND; 822 goto err; 823 } 824 --cp->recno; 825 /* Wrap around, skipping zero. */ 826 if (cp->recno == RECNO_OOB) 827 --cp->recno; 828 break; 829 } 830 /* FALLTHROUGH */ 831 case DB_LAST: 832 if (meta->first_recno == meta->cur_recno) { 833 ret = DB_NOTFOUND; 834 goto err; 835 } 836 cp->recno = meta->cur_recno - 1; 837 if (cp->recno == RECNO_OOB) 838 cp->recno--; 839 break; 840 case DB_SET: 841 case DB_SET_RANGE: 842 case DB_GET_BOTH: 843 case DB_GET_BOTH_RANGE: 844 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) 845 goto err; 846 break; 847 default: 848 ret = __db_unknown_flag(env, "__qamc_get", flags); 849 goto err; 850 } 851 852 /* Don't hold the meta page long term. */ 853 if (locked) { 854 if ((ret = __LPUT(dbc, metalock)) != 0) 855 goto err; 856 locked = 0; 857 } 858 859 if ((ret = __memp_fput(mpf, 860 dbc->thread_info, meta, dbc->priority)) != 0) 861 goto err; 862 meta = NULL; 863 864 /* Lock the record. */ 865 if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode, 866 (with_delete && !inorder && !retrying) ? 867 DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD, 868 &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) && 869 with_delete) { 870#ifdef QDEBUG 871 if (DBC_LOGGING(dbc)) 872 (void)__log_printf(env, 873 dbc->txn, "Queue S: %x %d %d", 874 dbc->locker ? dbc->locker->id : 0, 875 cp->recno, first); 876#endif 877 first = 0; 878 if ((ret = 879 __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 880 goto err; 881 locked = 1; 882 if ((ret = __memp_fget(mpf, &metapno, 883 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 884 goto err; 885 goto retry; 886 } 887 888 if (ret != 0) 889 goto err; 890 891 /* 892 * In the DB_FIRST or DB_LAST cases we must wait and then start over 893 * since the first/last may have moved while we slept. If we are 894 * reading in order and the first record was not there, we can skip it 895 * as it must have been aborted was was skipped by a non-queue insert 896 * or we could not have gotten its lock. If we have the wrong 897 * record we release our locks and try again. 898 */ 899 switch (flags) { 900 default: 901 if (inorder) { 902 if (first != cp->recno) 903 break; 904 } else if (with_delete || !is_first) 905 break; 906 /* FALLTHROUGH */ 907 case DB_SET: 908 case DB_SET_RANGE: 909 case DB_GET_BOTH: 910 case DB_GET_BOTH_RANGE: 911 case DB_LAST: 912 if ((ret = 913 __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 914 goto lerr; 915 locked = 1; 916 if ((ret = __memp_fget(mpf, &metapno, 917 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 918 goto lerr; 919 if ((is_first && cp->recno != meta->first_recno) || 920 (flags == DB_LAST && cp->recno != meta->cur_recno - 1)) { 921 if ((ret = __LPUT(dbc, lock)) != 0) 922 goto err; 923 if (is_first) 924 flags = DB_FIRST; 925 goto retry; 926 } else if (!is_first && flags != DB_LAST) { 927 if (QAM_BEFORE_FIRST(meta, cp->recno)) { 928 if (flags == DB_SET_RANGE || 929 flags == DB_GET_BOTH_RANGE) { 930 if ((ret = __LPUT(dbc, metalock)) != 0) 931 goto err; 932 locked = 0; 933 cp->lock = lock; 934 LOCK_INIT(lock); 935 goto release_retry; 936 } 937 ret = DB_NOTFOUND; 938 goto lerr; 939 } 940 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 941 ret = DB_NOTFOUND; 942 goto lerr; 943 } 944 } 945 /* Don't hold the meta page long term. */ 946 if ((ret = __LPUT(dbc, metalock)) != 0) 947 goto err; 948 locked = 0; 949 if ((ret = __memp_fput(mpf, 950 dbc->thread_info, meta, dbc->priority)) != 0) 951 goto err; 952 meta = NULL; 953 } 954 955 /* Position the cursor on the record. */ 956 if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0) { 957 /* We cannot get the page, release the record lock. */ 958 (void)__LPUT(dbc, lock); 959 goto err; 960 } 961 962 pg = cp->page; 963 cp->lock = lock; 964 cp->lock_mode = lock_mode; 965 LOCK_INIT(lock); 966 967 if (!exact) { 968release_retry: /* Release locks and retry, if possible. */ 969 if (pg != NULL) 970 (void)__qam_fput(dbc, cp->pgno, pg, dbc->priority); 971 cp->page = pg = NULL; 972 if (with_delete) { 973 if ((ret = __LPUT(dbc, cp->lock)) != 0) 974 goto err1; 975 } else if ((ret = __TLPUT(dbc, cp->lock)) != 0) 976 goto err1; 977 978 if (locked == 0 && (ret = 979 __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 980 goto err1; 981 locked = 1; 982 if (meta == NULL && (ret = __memp_fget(mpf, &metapno, 983 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 984 goto err1; 985 /* 986 * If we don't need locks and we are out of range 987 * then we can just skip to the FIRST/LAST record 988 * otherwise we must iterate to lock the records 989 * and get serializability. 990 */ 991 switch (flags) { 992 case DB_NEXT: 993 case DB_NEXT_NODUP: 994 if (!with_delete) 995 is_first = 0; 996 if (QAM_BEFORE_FIRST(meta, cp->recno) && 997 DONT_NEED_LOCKS(dbc)) 998 flags = DB_FIRST; 999 break; 1000 case DB_LAST: 1001 case DB_PREV: 1002 case DB_PREV_NODUP: 1003 if (QAM_AFTER_CURRENT(meta, cp->recno) && 1004 DONT_NEED_LOCKS(dbc)) 1005 flags = DB_LAST; 1006 else 1007 flags = DB_PREV; 1008 break; 1009 1010 case DB_GET_BOTH_RANGE: 1011 case DB_SET_RANGE: 1012 if (QAM_BEFORE_FIRST(meta, cp->recno) && 1013 DONT_NEED_LOCKS(dbc)) 1014 flags = DB_FIRST; 1015 else 1016 flags = DB_NEXT; 1017 break; 1018 1019 default: 1020 /* this is for the SET and GET_BOTH cases */ 1021 ret = DB_KEYEMPTY; 1022 goto err1; 1023 } 1024 retrying = 0; 1025 goto retry; 1026 } 1027 1028 qp = QAM_GET_RECORD(dbp, pg, cp->indx); 1029 1030 /* Return the data item. */ 1031 if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) { 1032 /* 1033 * Need to compare 1034 */ 1035 tmp.data = qp->data; 1036 tmp.size = t->re_len; 1037 if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) { 1038 if (flags == DB_GET_BOTH_RANGE) 1039 goto release_retry; 1040 ret = DB_NOTFOUND; 1041 goto err1; 1042 } 1043 } 1044 1045 /* Return the key if the user didn't give us one. */ 1046 if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) { 1047 if ((ret = __db_retcopy(dbp->env, 1048 key, &cp->recno, sizeof(cp->recno), 1049 &dbc->rkey->data, &dbc->rkey->ulen)) != 0) 1050 goto err1; 1051 F_SET(key, DB_DBT_ISSET); 1052 } 1053 1054 if (data != NULL && 1055 !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) && 1056 !F_ISSET(data, DB_DBT_ISSET)) { 1057 if ((ret = __db_retcopy(dbp->env, data, qp->data, t->re_len, 1058 &dbc->rdata->data, &dbc->rdata->ulen)) != 0) 1059 goto err1; 1060 F_SET(data, DB_DBT_ISSET); 1061 } 1062 1063 /* Finally, if we are doing DB_CONSUME mark the record. */ 1064 if (with_delete) { 1065 /* 1066 * Assert that we're not a secondary index. Doing a DB_CONSUME 1067 * on a secondary makes very little sense, since one can't 1068 * DB_APPEND there; attempting one should be forbidden by 1069 * the interface. 1070 */ 1071 DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY)); 1072 1073 /* 1074 * If we have any secondary indices, call __dbc_del_primary to 1075 * delete the references to the item we're about to delete. 1076 * 1077 * Note that we work on a duplicated cursor, since the 1078 * __db_ret work has already been done, so it's not safe 1079 * to perform any additional ops on this cursor. 1080 */ 1081 if (DB_IS_PRIMARY(dbp)) { 1082 if ((ret = __dbc_idup(dbc, 1083 &dbcdup, DB_POSITION)) != 0) 1084 goto err1; 1085 1086 if ((ret = __qam_fput(dbc, 1087 cp->pgno, cp->page, dbc->priority)) != 0) 1088 goto err1; 1089 cp->page = NULL; 1090 if (meta != NULL && 1091 (ret = __memp_fput(mpf, 1092 dbc->thread_info, meta, dbc->priority)) != 0) 1093 goto err1; 1094 meta = NULL; 1095 if ((ret = __dbc_del_primary(dbcdup)) != 0) { 1096 /* 1097 * The __dbc_del_primary return is more 1098 * interesting. 1099 */ 1100 (void)__dbc_close(dbcdup); 1101 goto err1; 1102 } 1103 1104 if ((ret = __dbc_close(dbcdup)) != 0) 1105 goto err1; 1106 if ((ret = __qam_fget(dbc, 1107 &cp->pgno, DB_MPOOL_DIRTY, &cp->page)) != 0) 1108 goto err; 1109 } else if ((ret = __qam_dirty(dbc, 1110 cp->pgno, &cp->page, dbc->priority)) != 0) 1111 goto err1; 1112 1113 pg = cp->page; 1114 1115 if (DBC_LOGGING(dbc)) { 1116 if (t->page_ext == 0 || t->re_len == 0) { 1117 if ((ret = __qam_del_log(dbp, dbc->txn, 1118 &LSN(pg), 0, &LSN(pg), 1119 pg->pgno, cp->indx, cp->recno)) != 0) 1120 goto err1; 1121 } else { 1122 tmp.data = qp->data; 1123 tmp.size = t->re_len; 1124 if ((ret = __qam_delext_log(dbp, 1125 dbc->txn, &LSN(pg), 0, &LSN(pg), 1126 pg->pgno, cp->indx, cp->recno, &tmp)) != 0) 1127 goto err1; 1128 } 1129 } else 1130 LSN_NOT_LOGGED(LSN(pg)); 1131 1132 F_CLR(qp, QAM_VALID); 1133 if ((ret = __qam_fput(dbc, 1134 cp->pgno, cp->page, dbc->priority)) != 0) 1135 goto err; 1136 cp->page = NULL; 1137 1138 /* 1139 * Now we need to update the metapage 1140 * first pointer. If we have deleted 1141 * the record that is pointed to by 1142 * first_recno then we move it as far 1143 * forward as we can without blocking. 1144 * The metapage lock must be held for 1145 * the whole scan otherwise someone could 1146 * do a random insert behind where we are 1147 * looking. 1148 */ 1149 1150 if (locked == 0 && (ret = __db_lget( 1151 dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 1152 goto err1; 1153 locked = 1; 1154 if (meta == NULL && 1155 (ret = __memp_fget(mpf, 1156 &metapno, dbc->thread_info, dbc->txn, 0, &meta)) != 0) 1157 goto err1; 1158 1159#ifdef QDEBUG 1160 if (DBC_LOGGING(dbc)) 1161 (void)__log_printf(env, 1162 dbc->txn, "Queue D: %x %d %d %d", 1163 dbc->locker ? dbc->locker->id : 0, 1164 cp->recno, first, meta->first_recno); 1165#endif 1166 /* 1167 * See if we deleted the "first" record. If 1168 * first is zero then we skipped something, 1169 * see if first_recno has been move passed 1170 * that to the record that we deleted. 1171 */ 1172 if (first == 0) 1173 first = cp->recno; 1174 if (first != meta->first_recno) 1175 goto done; 1176 1177 if ((ret = __qam_consume(dbc, meta, first)) != 0) 1178 goto err1; 1179 } 1180 1181done: 1182err1: if (cp->page != NULL) { 1183 if ((t_ret = __qam_fput(dbc, 1184 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 1185 ret = t_ret; 1186 1187 cp->page = NULL; 1188 } 1189 if (0) { 1190lerr: (void)__LPUT(dbc, lock); 1191 } 1192 1193err: if (meta) { 1194 /* Release the meta page. */ 1195 if ((t_ret = __memp_fput(mpf, 1196 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 1197 ret = t_ret; 1198 1199 /* Don't hold the meta page long term. */ 1200 if (locked) 1201 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 1202 ret = t_ret; 1203 } 1204 DB_ASSERT(env, !LOCK_ISSET(metalock)); 1205 1206 return ((ret == DB_LOCK_NOTGRANTED && !F_ISSET(env->dbenv, 1207 DB_ENV_TIME_NOTGRANTED)) ? DB_LOCK_DEADLOCK : ret); 1208} 1209 1210/* 1211 * __qam_consume -- try to reset the head of the queue. 1212 * 1213 */ 1214static int 1215__qam_consume(dbc, meta, first) 1216 DBC *dbc; 1217 QMETA *meta; 1218 db_recno_t first; 1219{ 1220 DB *dbp; 1221 DB_LOCK lock, save_lock; 1222 DB_MPOOLFILE *mpf; 1223 QUEUE_CURSOR *cp; 1224 db_indx_t save_indx; 1225 db_pgno_t save_page; 1226 db_recno_t current, save_recno; 1227 u_int32_t rec_extent; 1228 int exact, ret, t_ret, wrapped; 1229 1230 dbp = dbc->dbp; 1231 mpf = dbp->mpf; 1232 cp = (QUEUE_CURSOR *)dbc->internal; 1233 ret = 0; 1234 1235 save_page = cp->pgno; 1236 save_indx = cp->indx; 1237 save_recno = cp->recno; 1238 save_lock = cp->lock; 1239 1240 /* 1241 * If we skipped some deleted records, we need to 1242 * reposition on the first one. Get a lock 1243 * in case someone is trying to put it back. 1244 */ 1245 if (first != cp->recno) { 1246 ret = __db_lget(dbc, 0, first, DB_LOCK_READ, 1247 DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); 1248 if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) { 1249 ret = 0; 1250 goto done; 1251 } 1252 if (ret != 0) 1253 goto done; 1254 if (cp->page != NULL && (ret = 1255 __qam_fput(dbc, cp->pgno, cp->page, dbc->priority)) != 0) 1256 goto done; 1257 cp->page = NULL; 1258 if ((ret = __qam_position(dbc, 1259 &first, 0, &exact)) != 0 || exact != 0) { 1260 (void)__LPUT(dbc, lock); 1261 goto done; 1262 } 1263 if ((ret =__LPUT(dbc, lock)) != 0) 1264 goto done; 1265 } 1266 1267 current = meta->cur_recno; 1268 wrapped = 0; 1269 if (first > current) 1270 wrapped = 1; 1271 rec_extent = meta->page_ext * meta->rec_page; 1272 1273 /* Loop until we find a record or hit current */ 1274 for (;;) { 1275 /* 1276 * Check to see if we are moving off the extent 1277 * and remove the extent. 1278 * If we are moving off a page we need to 1279 * get rid of the buffer. 1280 * Wait for the lagging readers to move off the 1281 * page. 1282 */ 1283 if (rec_extent != 0 && 1284 ((exact = (first % rec_extent == 0)) || 1285 (first % meta->rec_page == 0) || 1286 first == UINT32_MAX)) { 1287 if (exact == 1 && (ret = __db_lget(dbc, 1288 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0) 1289 break; 1290#ifdef QDEBUG 1291 if (DBC_LOGGING(dbc)) 1292 (void)__log_printf(dbp->env, dbc->txn, 1293 "Queue R: %x %d %d %d", 1294 dbc->locker ? dbc->locker->id : 0, 1295 cp->pgno, first, meta->first_recno); 1296#endif 1297 if (cp->page != NULL && (ret = __qam_fput(dbc, 1298 cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0) 1299 break; 1300 cp->page = NULL; 1301 1302 if (exact == 1) { 1303 ret = __qam_fremove(dbp, cp->pgno); 1304 if ((t_ret = 1305 __LPUT(dbc, cp->lock)) != 0 && ret == 0) 1306 ret = t_ret; 1307 } 1308 if (ret != 0) 1309 break; 1310 } else if (cp->page != NULL && (ret = __qam_fput(dbc, 1311 cp->pgno, cp->page, dbc->priority)) != 0) 1312 break; 1313 cp->page = NULL; 1314 first++; 1315 if (first == RECNO_OOB) { 1316 wrapped = 0; 1317 first++; 1318 } 1319 1320 /* 1321 * LOOP EXIT when we come move to the current 1322 * pointer. 1323 */ 1324 if (!wrapped && first >= current) 1325 break; 1326 1327 ret = __db_lget(dbc, 0, first, DB_LOCK_READ, 1328 DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock); 1329 if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) { 1330 ret = 0; 1331 break; 1332 } 1333 if (ret != 0) 1334 break; 1335 1336 if ((ret = __qam_position(dbc, &first, 0, &exact)) != 0) { 1337 (void)__LPUT(dbc, lock); 1338 break; 1339 } 1340 if ((ret =__LPUT(dbc, lock)) != 0 || exact) { 1341 if ((t_ret = __qam_fput(dbc, cp->pgno, 1342 cp->page, dbc->priority)) != 0 && ret == 0) 1343 ret = t_ret; 1344 cp->page = NULL; 1345 break; 1346 } 1347 } 1348 1349 cp->pgno = save_page; 1350 cp->indx = save_indx; 1351 cp->recno = save_recno; 1352 cp->lock = save_lock; 1353 1354 /* 1355 * We have advanced as far as we can. 1356 * Advance first_recno to this point. 1357 */ 1358 if (ret == 0 && meta->first_recno != first) { 1359 if ((ret = __memp_dirty(mpf, 1360 &meta, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0) 1361 goto done; 1362#ifdef QDEBUG 1363 if (DBC_LOGGING(dbc)) 1364 (void)__log_printf(dbp->env, dbc->txn, 1365 "Queue M: %x %d %d %d", 1366 dbc->locker ? dbc->locker->id : 0, 1367 cp->recno, first, meta->first_recno); 1368#endif 1369 if (DBC_LOGGING(dbc)) { 1370 if ((ret = __qam_incfirst_log(dbp, 1371 dbc->txn, &meta->dbmeta.lsn, 0, 1372 cp->recno, PGNO_BASE_MD)) != 0) 1373 goto done; 1374 } else 1375 LSN_NOT_LOGGED(meta->dbmeta.lsn); 1376 meta->first_recno = first; 1377 } 1378 1379done: 1380 return (ret); 1381} 1382 1383static int 1384__qam_bulk(dbc, data, flags) 1385 DBC *dbc; 1386 DBT *data; 1387 u_int32_t flags; 1388{ 1389 DB *dbp; 1390 DB_LOCK metalock, rlock; 1391 DB_MPOOLFILE *mpf; 1392 PAGE *pg; 1393 QAMDATA *qp; 1394 QMETA *meta; 1395 QUEUE_CURSOR *cp; 1396 db_indx_t indx; 1397 db_lockmode_t lkmode; 1398 db_pgno_t metapno; 1399 u_int32_t *endp, *offp; 1400 u_int32_t pagesize, re_len, recs; 1401 u_int8_t *dbuf, *dp, *np; 1402 int exact, ret, t_ret, valid; 1403 int is_key, need_pg, size, space; 1404 1405 dbp = dbc->dbp; 1406 mpf = dbp->mpf; 1407 cp = (QUEUE_CURSOR *)dbc->internal; 1408 1409 lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ; 1410 1411 pagesize = dbp->pgsize; 1412 re_len = ((QUEUE *)dbp->q_internal)->re_len; 1413 recs = ((QUEUE *)dbp->q_internal)->rec_page; 1414 metapno = ((QUEUE *)dbp->q_internal)->q_meta; 1415 1416 is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0; 1417 size = 0; 1418 1419 if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0) 1420 return (ret); 1421 if ((ret = __memp_fget(mpf, &metapno, 1422 dbc->thread_info, dbc->txn, 0, &meta)) != 0) { 1423 /* We did not fetch it, we can release the lock. */ 1424 (void)__LPUT(dbc, metalock); 1425 return (ret); 1426 } 1427 1428 dbuf = data->data; 1429 np = dp = dbuf; 1430 1431 /* Keep track of space that is left. There is an termination entry */ 1432 space = (int)data->ulen; 1433 space -= (int)sizeof(*offp); 1434 1435 /* Build the offset/size table from the end up. */ 1436 endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen); 1437 endp--; 1438 offp = endp; 1439 /* Save the lock on the current position of the cursor. */ 1440 rlock = cp->lock; 1441 LOCK_INIT(cp->lock); 1442 1443next_pg: 1444 /* Wrap around, skipping zero. */ 1445 if (cp->recno == RECNO_OOB) 1446 cp->recno++; 1447 if ((ret = __qam_position(dbc, &cp->recno, 0, &exact)) != 0) 1448 goto done; 1449 1450 pg = cp->page; 1451 indx = cp->indx; 1452 need_pg = 1; 1453 1454 do { 1455 /* 1456 * If this page is a nonexistent page at the end of an 1457 * extent, pg may be NULL. A NULL page has no valid records, 1458 * so just keep looping as though qp exists and isn't QAM_VALID; 1459 * calling QAM_GET_RECORD is unsafe. 1460 */ 1461 valid = 0; 1462 1463 if (pg != NULL) { 1464 if ((ret = __db_lget(dbc, LCK_COUPLE, 1465 cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0) 1466 goto done; 1467 qp = QAM_GET_RECORD(dbp, pg, indx); 1468 if (F_ISSET(qp, QAM_VALID)) { 1469 valid = 1; 1470 space -= (int) 1471 ((is_key ? 3 : 2) * sizeof(*offp)); 1472 if (space < 0) 1473 goto get_space; 1474 if (need_pg) { 1475 dp = np; 1476 size = (int)pagesize - QPAGE_SZ(dbp); 1477 if (space < size) { 1478get_space: 1479 if (offp == endp) { 1480 data->size = (u_int32_t) 1481 DB_ALIGN((u_int32_t) 1482 size + pagesize, 1483 sizeof(u_int32_t)); 1484 ret = DB_BUFFER_SMALL; 1485 break; 1486 } 1487 if (indx != 0) 1488 indx--; 1489 cp->recno--; 1490 space = 0; 1491 break; 1492 } 1493 memcpy(dp, 1494 (u_int8_t *)pg + QPAGE_SZ(dbp), 1495 (u_int)size); 1496 need_pg = 0; 1497 space -= size; 1498 np += size; 1499 } 1500 if (is_key) 1501 *offp-- = cp->recno; 1502 *offp-- = (u_int32_t)((((u_int8_t *)qp - 1503 (u_int8_t *)pg) - QPAGE_SZ(dbp)) + 1504 (dp - dbuf) + SSZA(QAMDATA, data)); 1505 *offp-- = re_len; 1506 } 1507 } 1508 if (!valid && is_key == 0) { 1509 *offp-- = 0; 1510 *offp-- = 0; 1511 } 1512 cp->recno++; 1513 } while (++indx < recs && cp->recno != RECNO_OOB && 1514 !QAM_AFTER_CURRENT(meta, cp->recno)); 1515 1516 if (cp->page != NULL) { 1517 if ((t_ret = __qam_fput(dbc, 1518 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 1519 ret = t_ret; 1520 cp->page = NULL; 1521 } 1522 1523 if (ret == 0 && space > 0 && 1524 (indx >= recs || cp->recno == RECNO_OOB) && 1525 !QAM_AFTER_CURRENT(meta, cp->recno)) 1526 goto next_pg; 1527 1528 /* 1529 * Correct recno in two cases: 1530 * 1) If we just wrapped fetch must start at record 1 not a FIRST. 1531 * 2) We ran out of space exactly at the end of a page. 1532 */ 1533 if (cp->recno == RECNO_OOB || (space == 0 && indx == recs)) 1534 cp->recno--; 1535 1536 if (is_key == 1) 1537 *offp = RECNO_OOB; 1538 else 1539 *offp = (u_int32_t)-1; 1540 1541done: /* Release the meta page. */ 1542 if ((t_ret = __memp_fput(mpf, 1543 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 1544 ret = t_ret; 1545 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 1546 ret = t_ret; 1547 1548 cp->lock = rlock; 1549 1550 return (ret); 1551} 1552 1553/* 1554 * __qamc_close -- 1555 * Close down the cursor from a single use. 1556 */ 1557static int 1558__qamc_close(dbc, root_pgno, rmroot) 1559 DBC *dbc; 1560 db_pgno_t root_pgno; 1561 int *rmroot; 1562{ 1563 QUEUE_CURSOR *cp; 1564 int ret; 1565 1566 COMPQUIET(root_pgno, 0); 1567 COMPQUIET(rmroot, NULL); 1568 1569 cp = (QUEUE_CURSOR *)dbc->internal; 1570 1571 /* Discard any locks not acquired inside of a transaction. */ 1572 ret = __TLPUT(dbc, cp->lock); 1573 1574 LOCK_INIT(cp->lock); 1575 cp->page = NULL; 1576 cp->pgno = PGNO_INVALID; 1577 cp->indx = 0; 1578 cp->lock_mode = DB_LOCK_NG; 1579 cp->recno = RECNO_OOB; 1580 cp->flags = 0; 1581 1582 return (ret); 1583} 1584 1585/* 1586 * __qamc_dup -- 1587 * Duplicate a queue cursor, such that the new one holds appropriate 1588 * locks for the position of the original. 1589 * 1590 * PUBLIC: int __qamc_dup __P((DBC *, DBC *)); 1591 */ 1592int 1593__qamc_dup(orig_dbc, new_dbc) 1594 DBC *orig_dbc, *new_dbc; 1595{ 1596 QUEUE_CURSOR *orig, *new; 1597 1598 orig = (QUEUE_CURSOR *)orig_dbc->internal; 1599 new = (QUEUE_CURSOR *)new_dbc->internal; 1600 1601 new->recno = orig->recno; 1602 1603 return (0); 1604} 1605 1606/* 1607 * __qamc_init 1608 * 1609 * PUBLIC: int __qamc_init __P((DBC *)); 1610 */ 1611int 1612__qamc_init(dbc) 1613 DBC *dbc; 1614{ 1615 DB *dbp; 1616 QUEUE_CURSOR *cp; 1617 int ret; 1618 1619 dbp = dbc->dbp; 1620 1621 /* Allocate the internal structure. */ 1622 cp = (QUEUE_CURSOR *)dbc->internal; 1623 if (cp == NULL) { 1624 if ((ret = 1625 __os_calloc(dbp->env, 1, sizeof(QUEUE_CURSOR), &cp)) != 0) 1626 return (ret); 1627 dbc->internal = (DBC_INTERNAL *)cp; 1628 } 1629 1630 /* Initialize methods. */ 1631 dbc->close = dbc->c_close = __dbc_close_pp; 1632 dbc->cmp = __dbc_cmp_pp; 1633 dbc->count = dbc->c_count = __dbc_count_pp; 1634 dbc->del = dbc->c_del = __dbc_del_pp; 1635 dbc->dup = dbc->c_dup = __dbc_dup_pp; 1636 dbc->get = dbc->c_get = __dbc_get_pp; 1637 dbc->pget = dbc->c_pget = __dbc_pget_pp; 1638 dbc->put = dbc->c_put = __dbc_put_pp; 1639 dbc->am_bulk = __qam_bulk; 1640 dbc->am_close = __qamc_close; 1641 dbc->am_del = __qamc_del; 1642 dbc->am_destroy = __qamc_destroy; 1643 dbc->am_get = __qamc_get; 1644 dbc->am_put = __qamc_put; 1645 dbc->am_writelock = NULL; 1646 1647 return (0); 1648} 1649 1650/* 1651 * __qamc_destroy -- 1652 * Close a single cursor -- internal version. 1653 */ 1654static int 1655__qamc_destroy(dbc) 1656 DBC *dbc; 1657{ 1658 /* Discard the structures. */ 1659 __os_free(dbc->env, dbc->internal); 1660 1661 return (0); 1662} 1663 1664/* 1665 * __qam_getno -- 1666 * Check the user's record number. 1667 */ 1668static int 1669__qam_getno(dbp, key, rep) 1670 DB *dbp; 1671 const DBT *key; 1672 db_recno_t *rep; 1673{ 1674 /* If passed an empty DBT from Java, key->data may be NULL */ 1675 if (key->size != sizeof(db_recno_t)) { 1676 __db_errx(dbp->env, "illegal record number size"); 1677 return (EINVAL); 1678 } 1679 1680 if ((*rep = *(db_recno_t *)key->data) == 0) { 1681 __db_errx(dbp->env, "illegal record number of 0"); 1682 return (EINVAL); 1683 } 1684 return (0); 1685} 1686 1687/* 1688 * __qam_truncate -- 1689 * Truncate a queue database 1690 * 1691 * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *)); 1692 */ 1693int 1694__qam_truncate(dbc, countp) 1695 DBC *dbc; 1696 u_int32_t *countp; 1697{ 1698 DB *dbp; 1699 DB_LOCK metalock; 1700 DB_MPOOLFILE *mpf; 1701 QMETA *meta; 1702 db_pgno_t metapno; 1703 u_int32_t count; 1704 int ret, t_ret; 1705 1706 dbp = dbc->dbp; 1707 1708 /* Walk the queue, counting rows. */ 1709 for (count = 0; 1710 (ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;) 1711 count++; 1712 if (ret != DB_NOTFOUND) 1713 return (ret); 1714 1715 /* Update the meta page. */ 1716 metapno = ((QUEUE *)dbp->q_internal)->q_meta; 1717 if ((ret = 1718 __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0) 1719 return (ret); 1720 1721 mpf = dbp->mpf; 1722 if ((ret = __memp_fget(mpf, &metapno, dbc->thread_info, dbc->txn, 1723 DB_MPOOL_DIRTY, &meta)) != 0) { 1724 /* We did not fetch it, we can release the lock. */ 1725 (void)__LPUT(dbc, metalock); 1726 return (ret); 1727 } 1728 /* Remove the last extent file. */ 1729 if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) { 1730 if ((ret = __qam_fremove(dbp, 1731 QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0) 1732 goto err; 1733 } 1734 1735 if (DBC_LOGGING(dbc)) { 1736 ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, 1737 QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno, 1738 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD); 1739 } else 1740 LSN_NOT_LOGGED(meta->dbmeta.lsn); 1741 if (ret == 0) 1742 meta->first_recno = meta->cur_recno = 1; 1743 1744err: if ((t_ret = __memp_fput(mpf, 1745 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 1746 ret = t_ret; 1747 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 1748 ret = t_ret; 1749 1750 if (countp != NULL) 1751 *countp = count; 1752 1753 return (ret); 1754} 1755 1756/* 1757 * __qam_delete -- 1758 * Queue fast delete function. 1759 * 1760 * PUBLIC: int __qam_delete __P((DBC *, DBT *, u_int32_t)); 1761 */ 1762int 1763__qam_delete(dbc, key, flags) 1764 DBC *dbc; 1765 DBT *key; 1766 u_int32_t flags; 1767{ 1768 QUEUE_CURSOR *cp; 1769 int ret; 1770 1771 cp = (QUEUE_CURSOR *)dbc->internal; 1772 if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0) 1773 goto err; 1774 1775 ret = __qamc_del(dbc, flags); 1776 1777err: return (ret); 1778} 1779