1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1999,2008 Oracle. All rights reserved. 5 * 6 * $Id: qam.c,v 12.59 2008/03/13 15:44:50 mbrey Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/btree.h" 14#include "dbinc/lock.h" 15#include "dbinc/log.h" 16#include "dbinc/mp.h" 17#include "dbinc/qam.h" 18 19static int __qam_bulk __P((DBC *, DBT *, u_int32_t)); 20static int __qamc_close __P((DBC *, db_pgno_t, int *)); 21static int __qamc_del __P((DBC *)); 22static int __qamc_destroy __P((DBC *)); 23static int __qamc_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 24static int __qamc_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *)); 25static int __qam_consume __P((DBC *, QMETA *, db_recno_t)); 26static int __qam_getno __P((DB *, const DBT *, db_recno_t *)); 27 28#define DONT_NEED_LOCKS(dbc) ((dbc)->txn == NULL || \ 29 F_ISSET(dbc, DBC_READ_COMMITTED | DBC_READ_UNCOMMITTED)) 30 31/* 32 * __qam_position -- 33 * Position a queued access method cursor at a record. This returns 34 * the page locked. *exactp will be set if the record is valid. 35 * PUBLIC: int __qam_position 36 * PUBLIC: __P((DBC *, db_recno_t *, db_lockmode_t, u_int32_t, int *)); 37 */ 38int 39__qam_position(dbc, recnop, lock_mode, get_mode, exactp) 40 DBC *dbc; /* open cursor */ 41 db_recno_t *recnop; /* pointer to recno to find */ 42 db_lockmode_t lock_mode;/* locking: read or write */ 43 u_int32_t get_mode; /* flags to __memp_fget */ 44 int *exactp; /* indicate if it was found */ 45{ 46 DB *dbp; 47 QAMDATA *qp; 48 QUEUE_CURSOR *cp; 49 db_pgno_t pg; 50 int ret, t_ret; 51 52 dbp = dbc->dbp; 53 cp = (QUEUE_CURSOR *)dbc->internal; 54 55 /* Fetch the page for this recno. 
*/ 56 pg = QAM_RECNO_PAGE(dbp, *recnop); 57 58 if ((ret = __db_lget(dbc, 0, pg, lock_mode, 0, &cp->lock)) != 0) 59 return (ret); 60 cp->page = NULL; 61 *exactp = 0; 62 if ((ret = __qam_fget(dbc, &pg, get_mode, &cp->page)) != 0) { 63 if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE) && 64 (ret == DB_PAGE_NOTFOUND || ret == ENOENT)) 65 ret = 0; 66 67 /* We did not fetch it, we can release the lock. */ 68 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 69 ret = t_ret; 70 return (ret); 71 } 72 cp->pgno = pg; 73 cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop); 74 75 if (PGNO(cp->page) == 0) { 76 /* 77 * We have read an uninitialized page: set the page number if 78 * we're creating the page. Otherwise, we know that the record 79 * doesn't exist yet. 80 */ 81 if (!FLD_ISSET(get_mode, DB_MPOOL_CREATE)) { 82 *exactp = 0; 83 return (0); 84 } 85 DB_ASSERT(dbp->env, FLD_ISSET(get_mode, DB_MPOOL_CREATE)); 86 PGNO(cp->page) = pg; 87 TYPE(cp->page) = P_QAMDATA; 88 } 89 90 qp = QAM_GET_RECORD(dbp, cp->page, cp->indx); 91 *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0; 92 93 return (ret); 94} 95 96/* 97 * __qam_pitem -- 98 * Put an item on a queue page. Copy the data to the page and set the 99 * VALID and SET bits. If logging and the record was previously set, 100 * log that data, otherwise just log the new data. 
101 * 102 * pagep must be write locked 103 * 104 * PUBLIC: int __qam_pitem 105 * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *)); 106 */ 107int 108__qam_pitem(dbc, pagep, indx, recno, data) 109 DBC *dbc; 110 QPAGE *pagep; 111 u_int32_t indx; 112 db_recno_t recno; 113 DBT *data; 114{ 115 DB *dbp; 116 DBT olddata, pdata, *datap; 117 ENV *env; 118 QAMDATA *qp; 119 QUEUE *t; 120 u_int8_t *dest, *p; 121 int allocated, ret; 122 123 dbp = dbc->dbp; 124 env = dbp->env; 125 t = (QUEUE *)dbp->q_internal; 126 allocated = ret = 0; 127 128 if (data->size > t->re_len) 129 return (__db_rec_toobig(env, data->size, t->re_len)); 130 qp = QAM_GET_RECORD(dbp, pagep, indx); 131 132 p = qp->data; 133 datap = data; 134 if (F_ISSET(data, DB_DBT_PARTIAL)) { 135 if (data->doff + data->dlen > t->re_len) { 136 __db_errx(env, 137 "%s: data offset plus length larger than record size of %lu", 138 "Record length error", (u_long)t->re_len); 139 return (EINVAL); 140 } 141 142 if (data->size != data->dlen) 143 return (__db_rec_repl(env, data->size, data->dlen)); 144 145 if (data->size == t->re_len) 146 goto no_partial; 147 148 /* 149 * If we are logging, then we have to build the record 150 * first, otherwise, we can simply drop the change 151 * directly on the page. After this clause, make 152 * sure that datap and p are set up correctly so that 153 * copying datap into p does the right thing. 154 * 155 * Note, I am changing this so that if the existing 156 * record is not valid, we create a complete record 157 * to log so that both this and the recovery code is simpler. 158 */ 159 160 if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) { 161 datap = &pdata; 162 memset(datap, 0, sizeof(*datap)); 163 164 if ((ret = __os_malloc(env, 165 t->re_len, &datap->data)) != 0) 166 return (ret); 167 allocated = 1; 168 datap->size = t->re_len; 169 170 /* 171 * Construct the record if it's valid, otherwise set it 172 * all to the pad character. 
173 */ 174 dest = datap->data; 175 if (F_ISSET(qp, QAM_VALID)) 176 memcpy(dest, p, t->re_len); 177 else 178 memset(dest, (int)t->re_pad, t->re_len); 179 180 dest += data->doff; 181 memcpy(dest, data->data, data->size); 182 } else { 183 datap = data; 184 p += data->doff; 185 } 186 } 187 188no_partial: 189 if (DBC_LOGGING(dbc)) { 190 olddata.size = 0; 191 if (F_ISSET(qp, QAM_SET)) { 192 olddata.data = qp->data; 193 olddata.size = t->re_len; 194 } 195 if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep), 196 0, &LSN(pagep), pagep->pgno, 197 indx, recno, datap, qp->flags, 198 olddata.size == 0 ? NULL : &olddata)) != 0) 199 goto err; 200 } else if (!F_ISSET((dbc), DBC_RECOVER)) 201 LSN_NOT_LOGGED(LSN(pagep)); 202 203 F_SET(qp, QAM_VALID | QAM_SET); 204 memcpy(p, datap->data, datap->size); 205 if (!F_ISSET(data, DB_DBT_PARTIAL)) 206 memset(p + datap->size, 207 (int)t->re_pad, t->re_len - datap->size); 208 209err: if (allocated) 210 __os_free(env, datap->data); 211 212 return (ret); 213} 214/* 215 * __qamc_put 216 * Cursor put for queued access method. 217 * BEFORE and AFTER cannot be specified. 218 */ 219static int 220__qamc_put(dbc, key, data, flags, pgnop) 221 DBC *dbc; 222 DBT *key, *data; 223 u_int32_t flags; 224 db_pgno_t *pgnop; 225{ 226 DB *dbp; 227 DB_LOCK lock; 228 DB_MPOOLFILE *mpf; 229 ENV *env; 230 QMETA *meta; 231 QUEUE_CURSOR *cp; 232 db_pgno_t pg; 233 db_recno_t new_cur, new_first; 234 u_int32_t opcode; 235 int exact, ret, t_ret, writelock; 236 237 dbp = dbc->dbp; 238 env = dbp->env; 239 mpf = dbp->mpf; 240 if (pgnop != NULL) 241 *pgnop = PGNO_INVALID; 242 243 cp = (QUEUE_CURSOR *)dbc->internal; 244 245 switch (flags) { 246 case DB_KEYFIRST: 247 case DB_KEYLAST: 248 case DB_NOOVERWRITE: 249 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) 250 return (ret); 251 /* FALLTHROUGH */ 252 case DB_CURRENT: 253 break; 254 default: 255 /* The interface shouldn't let anything else through. 
*/ 256 return (__db_ferr(env, "DBC->put", 0)); 257 } 258 259 /* Write lock the record. */ 260 if ((ret = __db_lget(dbc, LCK_COUPLE, 261 cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) 262 return (ret); 263 264 lock = cp->lock; 265 266 if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE, 267 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) { 268 /* We could not get the page, we can release the record lock. */ 269 (void)__LPUT(dbc, lock); 270 return (ret); 271 } 272 273 if (exact != 0 && flags == DB_NOOVERWRITE) 274 ret = DB_KEYEXIST; 275 else 276 /* Put the item on the page. */ 277 ret = __qam_pitem(dbc, 278 (QPAGE *)cp->page, cp->indx, cp->recno, data); 279 280 /* Doing record locking, release the page lock */ 281 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 282 ret = t_ret; 283 if ((t_ret = __qam_fput(dbc, 284 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 285 ret = t_ret; 286 cp->page = NULL; 287 cp->lock = lock; 288 cp->lock_mode = DB_LOCK_WRITE; 289 if (ret != 0) 290 return (ret); 291 292 /* We may need to reset the head or tail of the queue. */ 293 pg = ((QUEUE *)dbp->q_internal)->q_meta; 294 295 /* 296 * Get the meta page first, we don't want to lock it while trying 297 * to pin it. 298 */ 299 writelock = 0; 300 if ((ret = __memp_fget(mpf, &pg, 301 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 302 return (ret); 303 if ((ret = __db_lget(dbc, LCK_COUPLE, 304 pg, DB_LOCK_READ, 0, &cp->lock)) != 0) { 305 (void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority); 306 return (ret); 307 } 308 309 opcode = 0; 310 new_cur = new_first = 0; 311 312 /* 313 * If the put address is outside the queue, adjust the head and 314 * tail of the queue. If the order is inverted we move 315 * the one which is closer. The first case is when the 316 * queue is empty, move first and current to where the new 317 * insert is. 
318 */ 319 320recheck: 321 if (meta->first_recno == meta->cur_recno) { 322 new_first = cp->recno; 323 new_cur = cp->recno + 1; 324 if (new_cur == RECNO_OOB) 325 new_cur++; 326 opcode |= QAM_SETFIRST; 327 opcode |= QAM_SETCUR; 328 } else { 329 if (QAM_BEFORE_FIRST(meta, cp->recno)) { 330 new_first = cp->recno; 331 opcode |= QAM_SETFIRST; 332 } 333 334 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 335 new_cur = cp->recno + 1; 336 if (new_cur == RECNO_OOB) 337 new_cur++; 338 opcode |= QAM_SETCUR; 339 } 340 } 341 342 if (opcode == 0) 343 goto done; 344 345 /* Drop the read lock and get the a write lock on the meta page. */ 346 if (writelock == 0 && (ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, 347 pg, DB_LOCK_WRITE, 0, &cp->lock)) != 0) { 348 (void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority); 349 return (ret); 350 } 351 if (writelock++ == 0) 352 goto recheck; 353 354 if (((ret = __memp_dirty(mpf, &meta, 355 dbc->thread_info, dbc->txn, dbc->priority, DB_MPOOL_DIRTY)) != 0 || 356 (DBC_LOGGING(dbc) && 357 (ret = __qam_mvptr_log(dbp, dbc->txn, 358 &meta->dbmeta.lsn, 0, opcode, meta->first_recno, 359 new_first, meta->cur_recno, new_cur, 360 &meta->dbmeta.lsn, PGNO_BASE_MD)) != 0))) 361 opcode = 0; 362 363 if (opcode & QAM_SETCUR) 364 meta->cur_recno = new_cur; 365 if (opcode & QAM_SETFIRST) 366 meta->first_recno = new_first; 367 368done: if ((t_ret = __memp_fput(mpf, 369 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 370 ret = t_ret; 371 372 /* Don't hold the meta page long term. */ 373 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 374 ret = t_ret; 375 return (ret); 376} 377 378/* 379 * __qam_append -- 380 * Perform a put(DB_APPEND) in queue. 
381 * 382 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *)); 383 */ 384int 385__qam_append(dbc, key, data) 386 DBC *dbc; 387 DBT *key, *data; 388{ 389 DB *dbp; 390 DB_LOCK lock; 391 DB_MPOOLFILE *mpf; 392 QMETA *meta; 393 QPAGE *page; 394 QUEUE *qp; 395 QUEUE_CURSOR *cp; 396 db_pgno_t pg; 397 db_recno_t recno; 398 int ret, t_ret; 399 400 dbp = dbc->dbp; 401 mpf = dbp->mpf; 402 cp = (QUEUE_CURSOR *)dbc->internal; 403 404 pg = ((QUEUE *)dbp->q_internal)->q_meta; 405 /* 406 * Get the meta page first, we don't want to write lock it while 407 * trying to pin it. 408 */ 409 if ((ret = __memp_fget(mpf, &pg, 410 dbc->thread_info, dbc->txn, DB_MPOOL_DIRTY, &meta)) != 0) 411 return (ret); 412 /* Write lock the meta page. */ 413 if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) { 414 (void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority); 415 return (ret); 416 } 417 418 /* Get the next record number. */ 419 recno = meta->cur_recno; 420 meta->cur_recno++; 421 if (meta->cur_recno == RECNO_OOB) 422 meta->cur_recno++; 423 if (meta->cur_recno == meta->first_recno) { 424 meta->cur_recno--; 425 if (meta->cur_recno == RECNO_OOB) 426 meta->cur_recno--; 427 ret = __LPUT(dbc, lock); 428 429 if (ret == 0) 430 ret = EFBIG; 431 goto err; 432 } 433 434 if (QAM_BEFORE_FIRST(meta, recno)) 435 meta->first_recno = recno; 436 437 /* Lock the record and release meta page lock. */ 438 ret = __db_lget(dbc, LCK_COUPLE_ALWAYS, 439 recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock); 440 441 /* 442 * The application may modify the data based on the selected record 443 * number. We always want to call this even if we ultimately end 444 * up aborting, because we are allocating a record number, regardless. 445 */ 446 if (dbc->dbp->db_append_recno != NULL && 447 (t_ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0 && 448 ret == 0) 449 ret = t_ret; 450 451 /* 452 * Capture errors from either the lock couple or the call to 453 * dbp->db_append_recno. 
454 */ 455 if (ret != 0) { 456 (void)__LPUT(dbc, lock); 457 goto err; 458 } 459 460 cp->lock = lock; 461 cp->lock_mode = DB_LOCK_WRITE; 462 463 pg = QAM_RECNO_PAGE(dbp, recno); 464 465 /* Fetch and write lock the data page. */ 466 if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) 467 goto err; 468 if ((ret = __qam_fget(dbc, &pg, 469 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &page)) != 0) { 470 /* We did not fetch it, we can release the lock. */ 471 (void)__LPUT(dbc, lock); 472 goto err; 473 } 474 475 /* See if this is a new page. */ 476 if (page->pgno == 0) { 477 page->pgno = pg; 478 page->type = P_QAMDATA; 479 } 480 481 /* Put the item on the page and log it. */ 482 ret = __qam_pitem(dbc, page, 483 QAM_RECNO_INDEX(dbp, pg, recno), recno, data); 484 485 /* Doing record locking, release the page lock */ 486 if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) 487 ret = t_ret; 488 489 if ((t_ret = __qam_fput(dbc, 490 pg, page, dbc->priority)) != 0 && ret == 0) 491 ret = t_ret; 492 493 /* Return the record number to the user. */ 494 if (ret == 0 && key != NULL) 495 ret = __db_retcopy(dbp->env, key, 496 &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen); 497 498 /* Position the cursor on this record. */ 499 cp->recno = recno; 500 501 /* See if we are leaving the extent. */ 502 qp = (QUEUE *) dbp->q_internal; 503 if (qp->page_ext != 0 && 504 (recno % (qp->page_ext * qp->rec_page) == 0 || 505 recno == UINT32_MAX)) { 506 if ((ret = __db_lget(dbc, 507 0, ((QUEUE *)dbp->q_internal)->q_meta, 508 DB_LOCK_WRITE, 0, &lock)) != 0) 509 goto err; 510 if (!QAM_AFTER_CURRENT(meta, recno)) 511 ret = __qam_fclose(dbp, pg); 512 if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0) 513 ret = t_ret; 514 } 515 516err: /* Release the meta page. 
*/ 517 if ((t_ret = __memp_fput(mpf, 518 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 519 ret = t_ret; 520 521 return (ret); 522} 523 524/* 525 * __qamc_del -- 526 * Qam cursor->am_del function 527 */ 528static int 529__qamc_del(dbc) 530 DBC *dbc; 531{ 532 DB *dbp; 533 DBT data; 534 DB_LOCK lock, metalock; 535 DB_MPOOLFILE *mpf; 536 PAGE *pagep; 537 QAMDATA *qp; 538 QMETA *meta; 539 QUEUE_CURSOR *cp; 540 db_pgno_t pg; 541 int exact, ret, t_ret; 542 543 dbp = dbc->dbp; 544 mpf = dbp->mpf; 545 cp = (QUEUE_CURSOR *)dbc->internal; 546 LOCK_INIT(lock); 547 548 pg = ((QUEUE *)dbp->q_internal)->q_meta; 549 /* 550 * Get the meta page first, we don't want to write lock it while 551 * trying to pin it. 552 */ 553 if ((ret = __memp_fget(mpf, &pg, 554 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 555 return (ret); 556 /* Write lock the meta page. */ 557 if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &metalock)) != 0) { 558 (void)__memp_fput(mpf, dbc->thread_info, meta, dbc->priority); 559 return (ret); 560 } 561 562 if (QAM_NOT_VALID(meta, cp->recno)) 563 ret = DB_NOTFOUND; 564 565 /* Don't hold the meta page long term. */ 566 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 567 ret = t_ret; 568 569 if (ret != 0) 570 goto err; 571 572 if ((ret = __db_lget(dbc, LCK_COUPLE, 573 cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &cp->lock)) != 0) 574 goto err; 575 cp->lock_mode = DB_LOCK_WRITE; 576 lock = cp->lock; 577 578 /* Find the record ; delete only deletes exact matches. 
*/ 579 if ((ret = __qam_position(dbc, &cp->recno, DB_LOCK_WRITE, 580 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &exact)) != 0) 581 goto err; 582 583 if (!exact) { 584 ret = DB_NOTFOUND; 585 goto err; 586 } 587 588 pagep = cp->page; 589 qp = QAM_GET_RECORD(dbp, pagep, cp->indx); 590 591 if (DBC_LOGGING(dbc)) { 592 if (((QUEUE *)dbp->q_internal)->page_ext == 0 || 593 ((QUEUE *)dbp->q_internal)->re_len == 0) { 594 if ((ret = __qam_del_log(dbp, 595 dbc->txn, &LSN(pagep), 0, &LSN(pagep), 596 pagep->pgno, cp->indx, cp->recno)) != 0) 597 goto err; 598 } else { 599 data.size = ((QUEUE *)dbp->q_internal)->re_len; 600 data.data = qp->data; 601 if ((ret = __qam_delext_log(dbp, 602 dbc->txn, &LSN(pagep), 0, &LSN(pagep), 603 pagep->pgno, cp->indx, cp->recno, &data)) != 0) 604 goto err; 605 } 606 } else 607 LSN_NOT_LOGGED(LSN(pagep)); 608 609 F_CLR(qp, QAM_VALID); 610 611 /* 612 * Peek at the first_recno before locking the meta page. 613 * Other threads cannot move first_recno past 614 * our position while we have the record locked. 615 * If it's pointing at the deleted record then lock 616 * the metapage and check again as lower numbered 617 * record may have been inserted. 
618 */ 619 if (cp->recno == meta->first_recno) { 620 pg = ((QUEUE *)dbp->q_internal)->q_meta; 621 if ((ret = 622 __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &metalock)) != 0) 623 goto err; 624 if (cp->recno == meta->first_recno) 625 ret = __qam_consume(dbc, meta, meta->first_recno); 626 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 627 ret = t_ret; 628 } 629 630err: if ((t_ret = __memp_fput(mpf, dbc->thread_info, 631 meta, dbc->priority)) != 0 && ret == 0) 632 ret = t_ret; 633 if (cp->page != NULL && 634 (t_ret = __qam_fput(dbc, 635 cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0) 636 ret = t_ret; 637 cp->page = NULL; 638 639 /* Doing record locking, release the page lock */ 640 if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0) 641 ret = t_ret; 642 cp->lock = lock; 643 644 return (ret); 645} 646 647#ifdef DEBUG_WOP 648#define QDEBUG 649#endif 650 651/* 652 * __qamc_get -- 653 * Queue DBC->get function. 654 */ 655static int 656__qamc_get(dbc, key, data, flags, pgnop) 657 DBC *dbc; 658 DBT *key, *data; 659 u_int32_t flags; 660 db_pgno_t *pgnop; 661{ 662 DB *dbp; 663 DBC *dbcdup; 664 DBT tmp; 665 DB_LOCK lock, pglock, metalock; 666 DB_MPOOLFILE *mpf; 667 ENV *env; 668 PAGE *pg; 669 QAMDATA *qp; 670 QMETA *meta; 671 QUEUE *t; 672 QUEUE_CURSOR *cp; 673 db_lockmode_t lock_mode, meta_mode; 674 db_pgno_t metapno; 675 db_recno_t first; 676 int exact, inorder, is_first, locked, ret, t_ret, wait, with_delete; 677 int retrying; 678 679 dbp = dbc->dbp; 680 env = dbp->env; 681 mpf = dbp->mpf; 682 cp = (QUEUE_CURSOR *)dbc->internal; 683 LOCK_INIT(lock); 684 LOCK_INIT(pglock); 685 686 lock_mode = F_ISSET(dbc, DBC_RMW) ? 
DB_LOCK_WRITE : DB_LOCK_READ; 687 meta_mode = DB_LOCK_READ; 688 meta = NULL; 689 *pgnop = 0; 690 pg = NULL; 691 retrying = t_ret = wait = with_delete = 0; 692 693 if (flags == DB_CONSUME_WAIT) { 694 wait = 1; 695 flags = DB_CONSUME; 696 } 697 if (flags == DB_CONSUME) { 698 with_delete = 1; 699 flags = DB_FIRST; 700 meta_mode = lock_mode = DB_LOCK_WRITE; 701 } 702 inorder = F_ISSET(dbp, DB_AM_INORDER) && with_delete; 703 704 DEBUG_LREAD(dbc, dbc->txn, "qamc_get", 705 flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); 706 707 /* Make lint and friends happy. */ 708 locked = 0; 709 710 is_first = 0; 711 first = 0; 712 713 t = (QUEUE *)dbp->q_internal; 714 metapno = t->q_meta; 715 LOCK_INIT(metalock); 716 717 /* 718 * Get the meta page first, we don't want to write lock it while 719 * trying to pin it. This is because someone my have it pinned 720 * but not locked. 721 */ 722 if ((ret = __memp_fget(mpf, &metapno, 723 dbc->thread_info, dbc->txn, 0, &meta)) != 0) 724 return (ret); 725 726get_next: 727 switch (flags) { 728 case DB_NEXT: 729 case DB_NEXT_NODUP: 730 case DB_FIRST: 731 case DB_PREV: 732 case DB_PREV_NODUP: 733 case DB_LAST: 734 if ((ret = __db_lget(dbc, 735 0, metapno, meta_mode, 0, &metalock)) != 0) 736 goto err; 737 locked = 1; 738 break; 739 default: 740 break; 741 } 742 743 /* Release any previous lock if not in a transaction. */ 744 if ((ret = __TLPUT(dbc, cp->lock)) != 0) 745 goto err; 746 747retry: /* Update the record number. */ 748 switch (flags) { 749 case DB_CURRENT: 750 break; 751 case DB_NEXT_DUP: 752 case DB_PREV_DUP: 753 ret = DB_NOTFOUND; 754 goto err; 755 /* NOTREACHED */ 756 case DB_NEXT: 757 case DB_NEXT_NODUP: 758 if (cp->recno != RECNO_OOB) { 759 ++cp->recno; 760 /* Wrap around, skipping zero. */ 761 if (cp->recno == RECNO_OOB) 762 cp->recno++; 763 /* 764 * Check to see if we are out of data. 
765 */ 766 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 767 pg = NULL; 768 if (!wait) { 769 ret = DB_NOTFOUND; 770 goto err; 771 } 772 flags = DB_FIRST; 773 /* 774 * If first is not set, then we skipped 775 * a locked record, go back and find it. 776 * If we find a locked record again 777 * wait for it. 778 */ 779 if (first == 0) { 780 retrying = 1; 781 goto retry; 782 } 783 784 if (CDB_LOCKING(env)) { 785 /* Drop the metapage before we wait. */ 786 ret = __memp_fput(mpf, dbc->thread_info, 787 meta, dbc->priority); 788 meta = NULL; 789 if (ret != 0) 790 goto err; 791 if ((ret = __lock_get( 792 env, dbc->locker, 793 DB_LOCK_SWITCH, &dbc->lock_dbt, 794 DB_LOCK_WAIT, &dbc->mylock)) != 0) 795 goto err; 796 797 if ((ret = __memp_fget(mpf, &metapno, 798 dbc->thread_info, 799 dbc->txn, 0, &meta)) != 0) 800 goto err; 801 if ((ret = __lock_get( 802 env, dbc->locker, 803 DB_LOCK_UPGRADE, &dbc->lock_dbt, 804 DB_LOCK_WRITE, &dbc->mylock)) != 0) 805 goto err; 806 goto retry; 807 } 808 /* 809 * Wait for someone to update the meta page. 810 * This will probably mean there is something 811 * in the queue. We then go back up and 812 * try again. 813 */ 814 if (locked == 0) { 815 if ((ret = __db_lget(dbc, 0, metapno, 816 meta_mode, 0, &metalock)) != 0) 817 goto err; 818 locked = 1; 819 if (cp->recno != RECNO_OOB && 820 !QAM_AFTER_CURRENT(meta, cp->recno)) 821 goto retry; 822 } 823 /* Drop the metapage before we wait. 
*/ 824 ret = __memp_fput(mpf, 825 dbc->thread_info, meta, dbc->priority); 826 meta = NULL; 827 if (ret != 0) 828 goto err; 829 if ((ret = __db_lget(dbc, 830 0, metapno, DB_LOCK_WAIT, 831 DB_LOCK_SWITCH, &metalock)) != 0) { 832 if (ret == DB_LOCK_DEADLOCK) 833 ret = DB_LOCK_NOTGRANTED; 834 goto err; 835 } 836 if ((ret = __memp_fget(mpf, 837 &metapno, dbc->thread_info, dbc->txn, 838 0, &meta)) != 0) 839 goto err; 840 if ((ret = __db_lget(dbc, 0, 841 PGNO_INVALID, DB_LOCK_WRITE, 842 DB_LOCK_UPGRADE, &metalock)) != 0) { 843 if (ret == DB_LOCK_DEADLOCK) 844 ret = DB_LOCK_NOTGRANTED; 845 goto err; 846 } 847 locked = 1; 848 goto retry; 849 } 850 break; 851 } 852 /* FALLTHROUGH */ 853 case DB_FIRST: 854 flags = DB_NEXT; 855 is_first = 1; 856 857 /* get the first record number */ 858 cp->recno = first = meta->first_recno; 859 860 break; 861 case DB_PREV: 862 case DB_PREV_NODUP: 863 if (cp->recno != RECNO_OOB) { 864 if (cp->recno == meta->first_recno || 865 QAM_BEFORE_FIRST(meta, cp->recno)) { 866 ret = DB_NOTFOUND; 867 goto err; 868 } 869 --cp->recno; 870 /* Wrap around, skipping zero. */ 871 if (cp->recno == RECNO_OOB) 872 --cp->recno; 873 break; 874 } 875 /* FALLTHROUGH */ 876 case DB_LAST: 877 if (meta->first_recno == meta->cur_recno) { 878 ret = DB_NOTFOUND; 879 goto err; 880 } 881 cp->recno = meta->cur_recno - 1; 882 if (cp->recno == RECNO_OOB) 883 cp->recno--; 884 break; 885 case DB_SET: 886 case DB_SET_RANGE: 887 case DB_GET_BOTH: 888 case DB_GET_BOTH_RANGE: 889 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0) 890 goto err; 891 break; 892 default: 893 ret = __db_unknown_flag(env, "__qamc_get", flags); 894 goto err; 895 } 896 897 /* Don't hold the meta page long term. */ 898 if (locked) { 899 if ((ret = __LPUT(dbc, metalock)) != 0) 900 goto err; 901 locked = 0; 902 } 903 904 /* Lock the record. */ 905 if (((ret = __db_lget(dbc, LCK_COUPLE, cp->recno, lock_mode, 906 (with_delete && !inorder && !retrying) ? 
907 DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD, 908 &lock)) == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED) && 909 with_delete) { 910#ifdef QDEBUG 911 if (DBC_LOGGING(dbc)) 912 (void)__log_printf(env, 913 dbc->txn, "Queue S: %x %d %d %d", 914 dbc->locker ? dbc->locker->id : 0, 915 cp->recno, first, meta->first_recno); 916#endif 917 first = 0; 918 if ((ret = 919 __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 920 goto err; 921 locked = 1; 922 goto retry; 923 } 924 925 if (ret != 0) 926 goto err; 927 928 /* 929 * In the DB_FIRST or DB_LAST cases we must wait and then start over 930 * since the first/last may have moved while we slept. If we are 931 * reading in order and the first record was not there, we can skip it 932 * as it must have been aborted was was skipped by a non-queue insert 933 * or we could not have gotten its lock. If we have the wrong 934 * record we release our locks and try again. 935 */ 936 switch (flags) { 937 default: 938 if (inorder) { 939 if (first != cp->recno) 940 break; 941 } else if (with_delete || !is_first) 942 break; 943 /* FALLTHROUGH */ 944 case DB_SET: 945 case DB_SET_RANGE: 946 case DB_GET_BOTH: 947 case DB_GET_BOTH_RANGE: 948 case DB_LAST: 949 if ((ret = 950 __db_lget(dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 951 goto lerr; 952 locked = 1; 953 if ((is_first && cp->recno != meta->first_recno) || 954 (flags == DB_LAST && cp->recno != meta->cur_recno - 1)) { 955 if ((ret = __LPUT(dbc, lock)) != 0) 956 goto err; 957 if (is_first) 958 flags = DB_FIRST; 959 goto retry; 960 } else if (!is_first && flags != DB_LAST) { 961 if (QAM_BEFORE_FIRST(meta, cp->recno)) { 962 if (flags == DB_SET_RANGE || 963 flags == DB_GET_BOTH_RANGE) { 964 if ((ret = __LPUT(dbc, metalock)) != 0) 965 goto err; 966 locked = 0; 967 cp->lock = lock; 968 LOCK_INIT(lock); 969 goto release_retry; 970 } 971 ret = DB_NOTFOUND; 972 goto lerr; 973 } 974 if (QAM_AFTER_CURRENT(meta, cp->recno)) { 975 ret = DB_NOTFOUND; 976 goto lerr; 977 } 978 } 979 
/* Don't hold the meta page long term. */ 980 if ((ret = __LPUT(dbc, metalock)) != 0) 981 goto err; 982 locked = 0; 983 } 984 985 /* Position the cursor on the record. */ 986 if ((ret = __qam_position(dbc, &cp->recno, 987 lock_mode, 0, &exact)) != 0) { 988 /* We cannot get the page, release the record lock. */ 989 (void)__LPUT(dbc, lock); 990 goto err; 991 } 992 993 pg = cp->page; 994 pglock = cp->lock; 995 cp->lock = lock; 996 cp->lock_mode = lock_mode; 997 LOCK_INIT(lock); 998 999 if (!exact) { 1000release_retry: /* Release locks and retry, if possible. */ 1001 if (pg != NULL) 1002 (void)__qam_fput(dbc, cp->pgno, pg, dbc->priority); 1003 cp->page = pg = NULL; 1004 if ((ret = __LPUT(dbc, pglock)) != 0) 1005 goto err1; 1006 if (with_delete) { 1007 if ((ret = __LPUT(dbc, cp->lock)) != 0) 1008 goto err1; 1009 } else if ((ret = __TLPUT(dbc, cp->lock)) != 0) 1010 goto err1; 1011 1012 /* 1013 * If we don't need locks and we are out of range 1014 * then we can just skip to the FIRST/LAST record 1015 * otherwise we must iterate to lock the records 1016 * and get serializability. 1017 */ 1018 switch (flags) { 1019 case DB_NEXT: 1020 case DB_NEXT_NODUP: 1021 if (!with_delete) 1022 is_first = 0; 1023 if (QAM_BEFORE_FIRST(meta, cp->recno) && 1024 DONT_NEED_LOCKS(dbc)) 1025 flags = DB_FIRST; 1026 break; 1027 case DB_LAST: 1028 case DB_PREV: 1029 case DB_PREV_NODUP: 1030 if (QAM_AFTER_CURRENT(meta, cp->recno) && 1031 DONT_NEED_LOCKS(dbc)) 1032 flags = DB_LAST; 1033 else 1034 flags = DB_PREV; 1035 break; 1036 1037 case DB_GET_BOTH_RANGE: 1038 case DB_SET_RANGE: 1039 if (QAM_BEFORE_FIRST(meta, cp->recno) && 1040 DONT_NEED_LOCKS(dbc)) 1041 flags = DB_FIRST; 1042 else 1043 flags = DB_NEXT; 1044 break; 1045 1046 default: 1047 /* this is for the SET and GET_BOTH cases */ 1048 ret = DB_KEYEMPTY; 1049 goto err1; 1050 } 1051 retrying = 0; 1052 goto get_next; 1053 } 1054 1055 qp = QAM_GET_RECORD(dbp, pg, cp->indx); 1056 1057 /* Return the data item. 
*/ 1058 if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) { 1059 /* 1060 * Need to compare 1061 */ 1062 tmp.data = qp->data; 1063 tmp.size = t->re_len; 1064 if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) { 1065 if (flags == DB_GET_BOTH_RANGE) 1066 goto release_retry; 1067 ret = DB_NOTFOUND; 1068 goto err1; 1069 } 1070 } 1071 1072 /* Return the key if the user didn't give us one. */ 1073 if (key != NULL && !F_ISSET(key, DB_DBT_ISSET)) { 1074 if ((ret = __db_retcopy(dbp->env, 1075 key, &cp->recno, sizeof(cp->recno), 1076 &dbc->rkey->data, &dbc->rkey->ulen)) != 0) 1077 goto err1; 1078 F_SET(key, DB_DBT_ISSET); 1079 } 1080 1081 if (data != NULL && 1082 !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) && 1083 !F_ISSET(data, DB_DBT_ISSET)) { 1084 if ((ret = __db_retcopy(dbp->env, data, qp->data, t->re_len, 1085 &dbc->rdata->data, &dbc->rdata->ulen)) != 0) 1086 goto err1; 1087 F_SET(data, DB_DBT_ISSET); 1088 } 1089 1090 /* Finally, if we are doing DB_CONSUME mark the record. */ 1091 if (with_delete) { 1092 /* 1093 * Assert that we're not a secondary index. Doing a DB_CONSUME 1094 * on a secondary makes very little sense, since one can't 1095 * DB_APPEND there; attempting one should be forbidden by 1096 * the interface. 1097 */ 1098 DB_ASSERT(env, !F_ISSET(dbp, DB_AM_SECONDARY)); 1099 1100 if ((ret = __qam_dirty(dbc, 1101 cp->pgno, &cp->page, dbc->priority)) != 0) 1102 goto err1; 1103 pg = cp->page; 1104 1105 /* 1106 * Check and see if we *have* any secondary indices. 1107 * If we do, we're a primary, so call __dbc_del_primary 1108 * to delete the references to the item we're about to 1109 * delete. 1110 * 1111 * Note that we work on a duplicated cursor, since the 1112 * __db_ret work has already been done, so it's not safe 1113 * to perform any additional ops on this cursor. 
1114 */ 1115 if (LIST_FIRST(&dbp->s_secondaries) != NULL) { 1116 if ((ret = __dbc_idup(dbc, 1117 &dbcdup, DB_POSITION)) != 0) 1118 goto err1; 1119 1120 if ((ret = __dbc_del_primary(dbcdup)) != 0) { 1121 /* 1122 * The __dbc_del_primary return is more 1123 * interesting. 1124 */ 1125 (void)__dbc_close(dbcdup); 1126 goto err1; 1127 } 1128 1129 if ((ret = __dbc_close(dbcdup)) != 0) 1130 goto err1; 1131 } 1132 1133 if (DBC_LOGGING(dbc)) { 1134 if (t->page_ext == 0 || t->re_len == 0) { 1135 if ((ret = __qam_del_log(dbp, dbc->txn, 1136 &LSN(pg), 0, &LSN(pg), 1137 pg->pgno, cp->indx, cp->recno)) != 0) 1138 goto err1; 1139 } else { 1140 tmp.data = qp->data; 1141 tmp.size = t->re_len; 1142 if ((ret = __qam_delext_log(dbp, 1143 dbc->txn, &LSN(pg), 0, &LSN(pg), 1144 pg->pgno, cp->indx, cp->recno, &tmp)) != 0) 1145 goto err1; 1146 } 1147 } else 1148 LSN_NOT_LOGGED(LSN(pg)); 1149 1150 F_CLR(qp, QAM_VALID); 1151 1152 if ((ret = __LPUT(dbc, pglock)) != 0) 1153 goto err1; 1154 1155 /* 1156 * Now we need to update the metapage 1157 * first pointer. If we have deleted 1158 * the record that is pointed to by 1159 * first_recno then we move it as far 1160 * forward as we can without blocking. 1161 * The metapage lock must be held for 1162 * the whole scan otherwise someone could 1163 * do a random insert behind where we are 1164 * looking. 1165 */ 1166 1167 if (locked == 0 && (ret = __db_lget( 1168 dbc, 0, metapno, meta_mode, 0, &metalock)) != 0) 1169 goto err1; 1170 locked = 1; 1171 1172#ifdef QDEBUG 1173 if (DBC_LOGGING(dbc)) 1174 (void)__log_printf(env, 1175 dbc->txn, "Queue D: %x %d %d %d", 1176 dbc->locker ? dbc->locker->id : 0, 1177 cp->recno, first, meta->first_recno); 1178#endif 1179 /* 1180 * See if we deleted the "first" record. If 1181 * first is zero then we skipped something, 1182 * see if first_recno has been move passed 1183 * that to the record that we deleted. 
*/
			if (first == 0)
				first = cp->recno;
			if (first != meta->first_recno)
				goto done;

			if ((ret = __qam_consume(dbc, meta, first)) != 0)
				goto err1;
		}

done:
err1:	if (cp->page != NULL) {
		if ((t_ret = __qam_fput(dbc,
		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;

		/* Doing record locking, release the page lock */
		if ((t_ret = __LPUT(dbc, pglock)) != 0 && ret == 0)
			ret = t_ret;
		cp->page = NULL;
	}
	if (0) {
lerr:		(void)__LPUT(dbc, lock);
	}

err:	if (meta) {
		/* Release the meta page. */
		if ((t_ret = __memp_fput(mpf,
		    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;

		/* Don't hold the meta page long term. */
		if (locked)
			if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
				ret = t_ret;
	}
	DB_ASSERT(env, !LOCK_ISSET(metalock));

	return ((ret == DB_LOCK_NOTGRANTED && !F_ISSET(env->dbenv,
	    DB_ENV_TIME_NOTGRANTED)) ? DB_LOCK_DEADLOCK : ret);
}

/*
 * __qam_consume -- try to reset the head of the queue.
 *
 *	dbc	cursor used for the scan; its pgno, indx, recno and lock
 *		are saved on entry and put back after the scan loop.
 *	meta	queue meta page; first_recno is advanced on it.
 *	first	record number at which to start looking.
 *
 *	Starting at "first", step forward one record at a time until
 *	__qam_position reports a valid record, a record lock cannot be
 *	obtained without waiting, or meta->cur_recno is reached.  When the
 *	scan steps off the last record of an extent, the extent is removed
 *	with __qam_fremove.  If the scan ends without error and "first" has
 *	moved, the meta page is dirtied, the move is logged (when the cursor
 *	is logging) and meta->first_recno is set to the new value.
 *
 *	Returns 0, or the error from the first failing call.  A record lock
 *	that is not granted (DB_LOCK_NOTGRANTED / DB_LOCK_DEADLOCK) is not
 *	an error: the scan simply stops there.
 */
static int
__qam_consume(dbc, meta, first)
	DBC *dbc;
	QMETA *meta;
	db_recno_t first;
{
	DB *dbp;
	DB_LOCK lock, save_lock;
	DB_MPOOLFILE *mpf;
	QUEUE_CURSOR *cp;
	db_indx_t save_indx;
	db_pgno_t save_page;
	db_recno_t current, save_recno;
	u_int32_t rec_extent;
	int exact, ret, t_ret, wrapped;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (QUEUE_CURSOR *)dbc->internal;
	ret = 0;

	/* Remember where the cursor is; __qam_position below moves it. */
	save_page = cp->pgno;
	save_indx = cp->indx;
	save_recno = cp->recno;
	save_lock = cp->lock;

	/*
	 * If we skipped some deleted records, we need to
	 * reposition on the first one.  Get a lock
	 * in case someone is trying to put it back.
	 *
	 * NOTE(review): the "goto done" exits in this block return without
	 * running the restore of cp->pgno/indx/recno/lock further down, and
	 * the __qam_fput failure path leaves "lock" held -- confirm the
	 * caller's cleanup covers both.
	 */
	if (first != cp->recno) {
		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
		/* Someone else holds the record: give up quietly. */
		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
			ret = 0;
			goto done;
		}
		if (ret != 0)
			goto done;
		if ((ret =
		    __qam_fput(dbc, cp->pgno, cp->page, dbc->priority)) != 0)
			goto done;
		cp->page = NULL;
		/* A valid record at "first" means the head cannot move. */
		if ((ret = __qam_position(dbc,
		    &first, DB_LOCK_READ, 0, &exact)) != 0 || exact != 0) {
			(void)__LPUT(dbc, lock);
			goto done;
		}
		if ((ret =__LPUT(dbc, lock)) != 0)
			goto done;
		if ((ret = __LPUT(dbc, cp->lock)) != 0)
			goto done;
	}

	current = meta->cur_recno;
	/*
	 * first > current: the head is numerically beyond the tail, so the
	 * scan has to pass RECNO_OOB before "first >= current" can be used
	 * as the stop test.
	 */
	wrapped = 0;
	if (first > current)
		wrapped = 1;
	/*
	 * Records per extent.  Zero when meta->page_ext is zero, which
	 * disables the extent handling below (presumably: no extent files
	 * in use -- TODO confirm).
	 */
	rec_extent = meta->page_ext * meta->rec_page;

	/* Loop until we find a record or hit current */
	for (;;) {
		/*
		 * Check to see if we are moving off the extent
		 * and remove the extent.
		 * If we are moving off a page we need to
		 * get rid of the buffer.
		 * Wait for the lagging readers to move off the
		 * page.
		 *
		 * "exact" is reused here as the "last record of an extent"
		 * flag; it is only read inside the branch, after the
		 * assignment in the condition has been evaluated.
		 */
		if (cp->page != NULL && rec_extent != 0 &&
		    ((exact = (first % rec_extent == 0)) ||
		    (first % meta->rec_page == 0) ||
		    first == UINT32_MAX)) {
			if (exact == 1 && (ret = __db_lget(dbc,
			    0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
				break;
#ifdef QDEBUG
			if (DBC_LOGGING(dbc))
				(void)__log_printf(dbp->env, dbc->txn,
				    "Queue R: %x %d %d %d",
				    dbc->locker ? dbc->locker->id : 0,
				    cp->pgno, first, meta->first_recno);
#endif
			/* We are done with this page: return it at low priority. */
			if ((ret = __qam_fput(dbc,
			    cp->pgno, cp->page, DB_PRIORITY_VERY_LOW)) != 0)
				break;
			cp->page = NULL;

			if (exact == 1) {
				ret = __qam_fremove(dbp, cp->pgno);
				/* Drop the write lock even if the remove failed. */
				if ((t_ret =
				    __LPUT(dbc, cp->lock)) != 0 && ret == 0)
					ret = t_ret;
			}
			if (ret != 0)
				break;
		} else if (cp->page != NULL && (ret = __qam_fput(dbc,
		    cp->pgno, cp->page, dbc->priority)) != 0)
			break;
		cp->page = NULL;
		first++;
		/* Record number zero is not a record: step over it. */
		if (first == RECNO_OOB) {
			wrapped = 0;
			first++;
		}

		/*
		 * LOOP EXIT when we have moved up to the current
		 * pointer.
		 */
		if (!wrapped && first >= current)
			break;

		ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
		    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
		/* Record is busy: stop here, "first" stays on it. */
		if (ret == DB_LOCK_NOTGRANTED || ret == DB_LOCK_DEADLOCK) {
			ret = 0;
			break;
		}
		if (ret != 0)
			break;

		if ((ret = __qam_position(dbc,
		    &first, DB_LOCK_READ, 0, &exact)) != 0) {
			(void)__LPUT(dbc, lock);
			break;
		}
		/*
		 * Release the record lock and the page lock taken by
		 * __qam_position.  Stop on a lock error or on a valid
		 * record; in both cases give the page back first.
		 */
		if ((ret =__LPUT(dbc, lock)) != 0 ||
		    (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
			if ((t_ret = __qam_fput(dbc, cp->pgno,
			    cp->page, dbc->priority)) != 0 && ret == 0)
				ret = t_ret;
			cp->page = NULL;
			break;
		}
	}

	/* Put the cursor back where the caller left it. */
	cp->pgno = save_page;
	cp->indx = save_indx;
	cp->recno = save_recno;
	cp->lock = save_lock;

	/*
	 * We have advanced as far as we can.
	 * Advance first_recno to this point.
	 */
	if (ret == 0 && meta->first_recno != first) {
		if ((ret = __memp_dirty(mpf,
		    &meta, dbc->thread_info, dbc->txn, dbc->priority, 0)) != 0)
			goto done;
#ifdef QDEBUG
		if (DBC_LOGGING(dbc))
			(void)__log_printf(dbp->env, dbc->txn,
			    "Queue M: %x %d %d %d",
			    dbc->locker ? dbc->locker->id : 0,
			    cp->recno, first, meta->first_recno);
#endif
		if (DBC_LOGGING(dbc)) {
			if ((ret = __qam_incfirst_log(dbp,
			    dbc->txn, &meta->dbmeta.lsn, 0,
			    cp->recno, PGNO_BASE_MD)) != 0)
				goto done;
		} else
			LSN_NOT_LOGGED(meta->dbmeta.lsn);
		meta->first_recno = first;
	}

done:
	return (ret);
}

/*
 * __qam_bulk --
 *	Bulk-get routine for queue cursors.
 *
 *	dbc	cursor positioned on the first record to return; cp->recno
 *		is advanced as records are consumed.
 *	data	user buffer: data->data / data->ulen describe the space,
 *		data->size is set only when DB_BUFFER_SMALL is returned.
 *	flags	DB_MULTIPLE_KEY selects the key-returning table layout.
 *
 *	Page contents (everything after the QPAGE_SZ header) are copied to
 *	the front of the buffer, one page image at a time.  A table of
 *	u_int32_t entries is built from the end of the buffer downward:
 *	for each valid record, the record number (DB_MULTIPLE_KEY only),
 *	the offset of the record's data within the buffer, and re_len.
 *	Without DB_MULTIPLE_KEY, a record that is not valid gets a pair of
 *	zero entries.  The table is terminated with RECNO_OOB
 *	(DB_MULTIPLE_KEY) or (u_int32_t)-1.
 *
 *	Returns 0, DB_BUFFER_SMALL if not even the first record fits, or
 *	the error from a failing call.
 */
static int
__qam_bulk(dbc, data, flags)
	DBC *dbc;
	DBT *data;
	u_int32_t flags;
{
	DB *dbp;
	DB_LOCK metalock, rlock;
	DB_MPOOLFILE *mpf;
	PAGE *pg;
	QAMDATA *qp;
	QMETA *meta;
	QUEUE_CURSOR *cp;
	db_indx_t indx;
	db_lockmode_t lkmode;
	db_pgno_t metapno;
	u_int32_t *endp, *offp;
	u_int32_t pagesize, re_len, recs;
	u_int8_t *dbuf, *dp, *np;
	int exact, ret, t_ret, valid;
	int is_key, need_pg, size, space;

	dbp = dbc->dbp;
	mpf = dbp->mpf;
	cp = (QUEUE_CURSOR *)dbc->internal;

	lkmode = F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;

	pagesize = dbp->pgsize;
	re_len = ((QUEUE *)dbp->q_internal)->re_len;
	recs = ((QUEUE *)dbp->q_internal)->rec_page;
	metapno = ((QUEUE *)dbp->q_internal)->q_meta;

	is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
	size = 0;

	/* The meta page stays locked and pinned for the whole call. */
	if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
		return (ret);
	if ((ret = __memp_fget(mpf, &metapno,
	    dbc->thread_info, dbc->txn, 0, &meta)) != 0) {
		/* We did not fetch it, we can release the lock. */
		(void)__LPUT(dbc, metalock);
		return (ret);
	}

	/*
	 * dbuf: start of the user buffer.
	 * np:   where the next page image will be copied.
	 * dp:   start of the page image currently being indexed.
	 */
	dbuf = data->data;
	np = dp = dbuf;

	/* Keep track of space that is left.  There is a termination entry. */
	space = (int)data->ulen;
	space -= (int)sizeof(*offp);

	/* Build the offset/size table from the end up. */
	endp = (u_int32_t *)((u_int8_t *)dbuf + data->ulen);
	endp--;
	offp = endp;
	/* Save the lock on the current position of the cursor. */
	rlock = cp->lock;
	LOCK_INIT(cp->lock);

next_pg:
	/* Wrap around, skipping zero. */
	if (cp->recno == RECNO_OOB)
		cp->recno++;
	if ((ret = __qam_position(dbc, &cp->recno, lkmode, 0, &exact)) != 0)
		goto done;

	pg = cp->page;
	indx = cp->indx;
	/* The page image is copied lazily, on the first valid record. */
	need_pg = 1;

	do {
		/*
		 * If this page is a nonexistent page at the end of an
		 * extent, pg may be NULL.  A NULL page has no valid records,
		 * so just keep looping as though qp exists and isn't QAM_VALID;
		 * calling QAM_GET_RECORD is unsafe.
		 */
		valid = 0;

		if (pg != NULL) {
			/*
			 * NOTE(review): this "goto done" jumps past the code
			 * after the loop that drops cp->lock and returns
			 * cp->page -- confirm the caller cleans those up.
			 */
			if ((ret = __db_lget(dbc, LCK_COUPLE,
			    cp->recno, lkmode, DB_LOCK_RECORD, &rlock)) != 0)
				goto done;
			qp = QAM_GET_RECORD(dbp, pg, indx);
			if (F_ISSET(qp, QAM_VALID)) {
				valid = 1;
				/* Table entries: offset, length and maybe recno. */
				space -= (int)
				     ((is_key ? 3 : 2) * sizeof(*offp));
				if (space < 0)
					goto get_space;
				if (need_pg) {
					dp = np;
					size = (int)pagesize - QPAGE_SZ(dbp);
					if (space < size) {
get_space:
						/*
						 * Nothing returned yet: tell
						 * the caller how big a buffer
						 * to come back with.
						 */
						if (offp == endp) {
							data->size = (u_int32_t)
							    DB_ALIGN((u_int32_t)
							    size + pagesize,
							    sizeof(u_int32_t));
							ret = DB_BUFFER_SMALL;
							break;
						}
						/*
						 * Back up so the loop's
						 * increments leave us on the
						 * record that did not fit.
						 */
						if (indx != 0)
							indx--;
						cp->recno--;
						space = 0;
						break;
					}
					memcpy(dp,
					    (u_int8_t *)pg + QPAGE_SZ(dbp),
					    (u_int)size);
					need_pg = 0;
					space -= size;
					np += size;
				}
				if (is_key)
					*offp-- = cp->recno;
				/*
				 * Offset of the record data in the user
				 * buffer: position of qp within the copied
				 * page image, plus where that image starts,
				 * plus the offset of the data field.
				 */
				*offp-- = (u_int32_t)((((u_int8_t *)qp -
				    (u_int8_t *)pg) - QPAGE_SZ(dbp)) +
				    (dp - dbuf) + SSZA(QAMDATA, data));
				*offp-- = re_len;
			}
		}
		/*
		 * Placeholder pair for a record that is not valid.
		 * NOTE(review): these two entries are written without
		 * reducing "space" -- confirm the table cannot run into
		 * the copied page data.
		 */
		if (!valid && is_key == 0) {
			*offp-- = 0;
			*offp-- = 0;
		}
		cp->recno++;
	} while (++indx < recs && cp->recno != RECNO_OOB &&
	    !QAM_AFTER_CURRENT(meta, cp->recno));

	/* Drop the page lock. */
	if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
		ret = t_ret;

	if (cp->page != NULL) {
		if ((t_ret = __qam_fput(dbc,
		    cp->pgno, cp->page, dbc->priority)) != 0 && ret == 0)
			ret = t_ret;
		cp->page = NULL;
	}

	/* Room left and more records in the queue: take the next page. */
	if (ret == 0 && space > 0 &&
	    (indx >= recs || cp->recno == RECNO_OOB) &&
	    !QAM_AFTER_CURRENT(meta, cp->recno))
		goto next_pg;

	/*
	 * Correct recno in two cases:
	 * 1) If we just wrapped fetch must start at record 1 not a FIRST.
	 * 2) We ran out of space exactly at the end of a page.
	 */
	if (cp->recno == RECNO_OOB || (space == 0 && indx == recs))
		cp->recno--;

	/* Terminate the table. */
	if (is_key == 1)
		*offp = RECNO_OOB;
	else
		*offp = (u_int32_t)-1;

done:	/* Release the meta page. */
	if ((t_ret = __memp_fput(mpf,
	    dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0)
		ret = t_ret;
	if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
		ret = t_ret;

	/* The cursor keeps the record lock, not the page lock. */
	cp->lock = rlock;

	return (ret);
}

/*
 * __qamc_close --
 *	Close down the cursor from a single use.
 *
 *	root_pgno and rmroot are unused by the queue access method.
 *	Returns the result of releasing the cursor's lock.
 */
static int
__qamc_close(dbc, root_pgno, rmroot)
	DBC *dbc;
	db_pgno_t root_pgno;
	int *rmroot;
{
	QUEUE_CURSOR *cp;
	int ret;

	COMPQUIET(root_pgno, 0);
	COMPQUIET(rmroot, NULL);

	cp = (QUEUE_CURSOR *)dbc->internal;

	/* Discard any locks not acquired inside of a transaction. */
	ret = __TLPUT(dbc, cp->lock);

	/* Reset the cursor to the unpositioned state. */
	LOCK_INIT(cp->lock);
	cp->page = NULL;
	cp->pgno = PGNO_INVALID;
	cp->indx = 0;
	cp->lock_mode = DB_LOCK_NG;
	cp->recno = RECNO_OOB;
	cp->flags = 0;

	return (ret);
}

/*
 * __qamc_dup --
 *	Duplicate a queue cursor, such that the new one holds appropriate
 *	locks for the position of the original.
1613 * 1614 * PUBLIC: int __qamc_dup __P((DBC *, DBC *)); 1615 */ 1616int 1617__qamc_dup(orig_dbc, new_dbc) 1618 DBC *orig_dbc, *new_dbc; 1619{ 1620 QUEUE_CURSOR *orig, *new; 1621 1622 orig = (QUEUE_CURSOR *)orig_dbc->internal; 1623 new = (QUEUE_CURSOR *)new_dbc->internal; 1624 1625 new->recno = orig->recno; 1626 1627 return (0); 1628} 1629 1630/* 1631 * __qamc_init 1632 * 1633 * PUBLIC: int __qamc_init __P((DBC *)); 1634 */ 1635int 1636__qamc_init(dbc) 1637 DBC *dbc; 1638{ 1639 DB *dbp; 1640 QUEUE_CURSOR *cp; 1641 int ret; 1642 1643 dbp = dbc->dbp; 1644 1645 /* Allocate the internal structure. */ 1646 cp = (QUEUE_CURSOR *)dbc->internal; 1647 if (cp == NULL) { 1648 if ((ret = 1649 __os_calloc(dbp->env, 1, sizeof(QUEUE_CURSOR), &cp)) != 0) 1650 return (ret); 1651 dbc->internal = (DBC_INTERNAL *)cp; 1652 } 1653 1654 /* Initialize methods. */ 1655 dbc->close = dbc->c_close = __dbc_close_pp; 1656 dbc->count = dbc->c_count = __dbc_count_pp; 1657 dbc->del = dbc->c_del = __dbc_del_pp; 1658 dbc->dup = dbc->c_dup = __dbc_dup_pp; 1659 dbc->get = dbc->c_get = __dbc_get_pp; 1660 dbc->pget = dbc->c_pget = __dbc_pget_pp; 1661 dbc->put = dbc->c_put = __dbc_put_pp; 1662 dbc->am_bulk = __qam_bulk; 1663 dbc->am_close = __qamc_close; 1664 dbc->am_del = __qamc_del; 1665 dbc->am_destroy = __qamc_destroy; 1666 dbc->am_get = __qamc_get; 1667 dbc->am_put = __qamc_put; 1668 dbc->am_writelock = NULL; 1669 1670 return (0); 1671} 1672 1673/* 1674 * __qamc_destroy -- 1675 * Close a single cursor -- internal version. 1676 */ 1677static int 1678__qamc_destroy(dbc) 1679 DBC *dbc; 1680{ 1681 /* Discard the structures. */ 1682 __os_free(dbc->env, dbc->internal); 1683 1684 return (0); 1685} 1686 1687/* 1688 * __qam_getno -- 1689 * Check the user's record number. 
1690 */ 1691static int 1692__qam_getno(dbp, key, rep) 1693 DB *dbp; 1694 const DBT *key; 1695 db_recno_t *rep; 1696{ 1697 /* If passed an empty DBT from Java, key->data may be NULL */ 1698 if (key->size != sizeof(db_recno_t)) { 1699 __db_errx(dbp->env, "illegal record number size"); 1700 return (EINVAL); 1701 } 1702 1703 if ((*rep = *(db_recno_t *)key->data) == 0) { 1704 __db_errx(dbp->env, "illegal record number of 0"); 1705 return (EINVAL); 1706 } 1707 return (0); 1708} 1709 1710/* 1711 * __qam_truncate -- 1712 * Truncate a queue database 1713 * 1714 * PUBLIC: int __qam_truncate __P((DBC *, u_int32_t *)); 1715 */ 1716int 1717__qam_truncate(dbc, countp) 1718 DBC *dbc; 1719 u_int32_t *countp; 1720{ 1721 DB *dbp; 1722 DB_LOCK metalock; 1723 DB_MPOOLFILE *mpf; 1724 QMETA *meta; 1725 db_pgno_t metapno; 1726 u_int32_t count; 1727 int ret, t_ret; 1728 1729 dbp = dbc->dbp; 1730 1731 /* Walk the queue, counting rows. */ 1732 for (count = 0; 1733 (ret = __qamc_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0;) 1734 count++; 1735 if (ret != DB_NOTFOUND) 1736 return (ret); 1737 1738 /* Update the meta page. */ 1739 metapno = ((QUEUE *)dbp->q_internal)->q_meta; 1740 if ((ret = 1741 __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0) 1742 return (ret); 1743 1744 mpf = dbp->mpf; 1745 if ((ret = __memp_fget(mpf, &metapno, dbc->thread_info, dbc->txn, 1746 DB_MPOOL_DIRTY, &meta)) != 0) { 1747 /* We did not fetch it, we can release the lock. */ 1748 (void)__LPUT(dbc, metalock); 1749 return (ret); 1750 } 1751 /* Remove the last extent file. 
*/ 1752 if (meta->cur_recno > 1 && ((QUEUE *)dbp->q_internal)->page_ext != 0) { 1753 if ((ret = __qam_fremove(dbp, 1754 QAM_RECNO_PAGE(dbp, meta->cur_recno - 1))) != 0) 1755 return (ret); 1756 } 1757 1758 if (DBC_LOGGING(dbc)) { 1759 ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0, 1760 QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno, 1761 1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD); 1762 } else 1763 LSN_NOT_LOGGED(meta->dbmeta.lsn); 1764 if (ret == 0) 1765 meta->first_recno = meta->cur_recno = 1; 1766 1767 if ((t_ret = __memp_fput(mpf, 1768 dbc->thread_info, meta, dbc->priority)) != 0 && ret == 0) 1769 ret = t_ret; 1770 if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0) 1771 ret = t_ret; 1772 1773 if (countp != NULL) 1774 *countp = count; 1775 1776 return (ret); 1777} 1778 1779/* 1780 * __qam_delete -- 1781 * Queue fast delete function. 1782 * 1783 * PUBLIC: int __qam_delete __P((DBC *, DBT *)); 1784 */ 1785int 1786__qam_delete(dbc, key) 1787 DBC *dbc; 1788 DBT *key; 1789{ 1790 QUEUE_CURSOR *cp; 1791 int ret; 1792 1793 cp = (QUEUE_CURSOR *)dbc->internal; 1794 if ((ret = __qam_getno(dbc->dbp, key, &cp->recno)) != 0) 1795 goto err; 1796 1797 ret = __qamc_del(dbc); 1798 1799err: return (ret); 1800} 1801