1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_util.c,v 12.149 2008/03/13 16:21:04 mbrey Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/db_am.h" 14#include "dbinc/log.h" 15#include "dbinc/mp.h" 16#include "dbinc/txn.h" 17 18#ifdef REP_DIAGNOSTIC 19#include "dbinc/db_page.h" 20#include "dbinc/fop.h" 21#include "dbinc/btree.h" 22#include "dbinc/hash.h" 23#include "dbinc/qam.h" 24#endif 25 26/* 27 * rep_util.c: 28 * Miscellaneous replication-related utility functions, including 29 * those called by other subsystems. 30 */ 31#define TIMESTAMP_CHECK(env, ts, renv) do { \ 32 if (renv->op_timestamp != 0 && \ 33 renv->op_timestamp + DB_REGENV_TIMEOUT < ts) { \ 34 REP_SYSTEM_LOCK(env); \ 35 F_CLR(renv, DB_REGENV_REPLOCKED); \ 36 renv->op_timestamp = 0; \ 37 REP_SYSTEM_UNLOCK(env); \ 38 } \ 39} while (0) 40 41static int __rep_lockout_int __P((ENV *, REP *, u_int32_t *, u_int32_t, 42 const char *, u_int32_t)); 43static int __rep_newmaster_empty __P((ENV *, int)); 44#ifdef REP_DIAGNOSTIC 45static void __rep_print_logmsg __P((ENV *, const DBT *, DB_LSN *)); 46#endif 47 48/* 49 * __rep_bulk_message -- 50 * This is a wrapper for putting a record into a bulk buffer. Since 51 * we have different bulk buffers, the caller must hand us the information 52 * we need to put the record into the correct buffer. All bulk buffers 53 * are protected by the REP->mtx_clientdb. 54 * 55 * PUBLIC: int __rep_bulk_message __P((ENV *, REP_BULK *, REP_THROTTLE *, 56 * PUBLIC: DB_LSN *, const DBT *, u_int32_t)); 57 */ 58int 59__rep_bulk_message(env, bulk, repth, lsn, dbt, flags) 60 ENV *env; 61 REP_BULK *bulk; 62 REP_THROTTLE *repth; 63 DB_LSN *lsn; 64 const DBT *dbt; 65 u_int32_t flags; 66{ 67 DB_REP *db_rep; 68 REP *rep; 69 __rep_bulk_args b_args; 70 size_t len; 71 int ret; 72 u_int32_t recsize, typemore; 73 u_int8_t *p; 74 75 db_rep = env->rep_handle; 76 rep = db_rep->region; 77 ret = 0; 78 79 /* 80 * Figure out the total number of bytes needed for this record. 81 */ 82 recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size); 83 84 /* 85 * If *this* buffer is actively being transmitted, wait until 86 * we can use it. 87 */ 88 MUTEX_LOCK(env, rep->mtx_clientdb); 89 while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) { 90 MUTEX_UNLOCK(env, rep->mtx_clientdb); 91 __os_yield(env, 1, 0); 92 MUTEX_LOCK(env, rep->mtx_clientdb); 93 } 94 95 /* 96 * If the record is bigger than the buffer entirely, send the 97 * current buffer and then return DB_REP_BULKOVF so that this 98 * record is sent as a singleton. Do we have enough info to 99 * do that here? XXX 100 */ 101 if (recsize > bulk->len) { 102 RPRINT(env, DB_VERB_REP_MSGS, (env, 103 "bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x", 104 recsize, recsize, bulk->len)); 105 STAT(rep->stat.st_bulk_overflows++); 106 (void)__rep_send_bulk(env, bulk, flags); 107 /* 108 * XXX __rep_send_message... 109 */ 110 MUTEX_UNLOCK(env, rep->mtx_clientdb); 111 return (DB_REP_BULKOVF); 112 } 113 /* 114 * If this record doesn't fit, send the current buffer. 115 * Sending the buffer will reset the offset, but we will 116 * drop the mutex while sending so we need to keep checking 117 * if we're racing. 118 */ 119 while (recsize + *(bulk->offp) > bulk->len) { 120 RPRINT(env, DB_VERB_REP_MSGS, (env, 121 "bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.", 122 (u_long)recsize, (u_long)recsize, 123 (u_long)bulk->len, (u_long)bulk->len)); 124 STAT(rep->stat.st_bulk_fills++); 125 if ((ret = __rep_send_bulk(env, bulk, flags)) != 0) { 126 MUTEX_UNLOCK(env, rep->mtx_clientdb); 127 return (ret); 128 } 129 } 130 131 /* 132 * If we're using throttling, see if we are at the throttling 133 * limit before we do any more work here, by checking if the 134 * call to rep_send_throttle changed the repth->type to the 135 * *_MORE message type. If the throttling code hits the limit 136 * then we're done here. 137 */ 138 if (bulk->type == REP_BULK_LOG) 139 typemore = REP_LOG_MORE; 140 else 141 typemore = REP_PAGE_MORE; 142 if (repth != NULL) { 143 if ((ret = __rep_send_throttle(env, 144 bulk->eid, repth, REP_THROTTLE_ONLY, flags)) != 0) { 145 MUTEX_UNLOCK(env, rep->mtx_clientdb); 146 return (ret); 147 } 148 if (repth->type == typemore) { 149 RPRINT(env, DB_VERB_REP_MSGS, (env, 150 "bulk_msg: Record %lu (0x%lx) hit throttle limit.", 151 (u_long)recsize, (u_long)recsize)); 152 MUTEX_UNLOCK(env, rep->mtx_clientdb); 153 return (ret); 154 } 155 } 156 157 /* 158 * Now we own the buffer, and we know our record fits into it. 159 * The buffer is structured with the len, LSN and then the record. 160 * Copy the record into the buffer. Then if we need to, 161 * send the buffer. 162 */ 163 p = bulk->addr + *(bulk->offp); 164 b_args.len = dbt->size; 165 b_args.lsn = *lsn; 166 b_args.bulkdata = *dbt; 167 /* 168 * If we're the first record, we need to save the first 169 * LSN in the bulk structure. 170 */ 171 if (*(bulk->offp) == 0) 172 bulk->lsn = *lsn; 173 if (rep->version < DB_REPVERSION_47) { 174 len = 0; 175 memcpy(p, &dbt->size, sizeof(dbt->size)); 176 p += sizeof(dbt->size); 177 memcpy(p, lsn, sizeof(DB_LSN)); 178 p += sizeof(DB_LSN); 179 memcpy(p, dbt->data, dbt->size); 180 p += dbt->size; 181 } else if ((ret = __rep_bulk_marshal(env, &b_args, p, 182 bulk->len, &len)) != 0) 183 goto err; 184 *(bulk->offp) = (uintptr_t)p + (uintptr_t)len - (uintptr_t)bulk->addr; 185 STAT(rep->stat.st_bulk_records++); 186 /* 187 * Send the buffer if it is a perm record or a force. 188 */ 189 if (LF_ISSET(REPCTL_PERM)) { 190 RPRINT(env, DB_VERB_REP_MSGS, (env, 191 "bulk_msg: Send buffer after copy due to PERM")); 192 ret = __rep_send_bulk(env, bulk, flags); 193 } 194err: 195 MUTEX_UNLOCK(env, rep->mtx_clientdb); 196 return (ret); 197 198} 199 200/* 201 * __rep_send_bulk -- 202 * This function transmits the bulk buffer given. It assumes the 203 * caller holds the REP->mtx_clientdb. We may release it and reacquire 204 * it during this call. We will return with it held. 205 * 206 * PUBLIC: int __rep_send_bulk __P((ENV *, REP_BULK *, u_int32_t)); 207 */ 208int 209__rep_send_bulk(env, bulkp, ctlflags) 210 ENV *env; 211 REP_BULK *bulkp; 212 u_int32_t ctlflags; 213{ 214 DBT dbt; 215 DB_REP *db_rep; 216 REP *rep; 217 int ret; 218 219 /* 220 * If the offset is 0, we're done. There is nothing to send. 221 */ 222 if (*(bulkp->offp) == 0) 223 return (0); 224 225 db_rep = env->rep_handle; 226 rep = db_rep->region; 227 228 /* 229 * Set that this buffer is being actively transmitted. 230 */ 231 FLD_SET(*(bulkp->flagsp), BULK_XMIT); 232 DB_INIT_DBT(dbt, bulkp->addr, *(bulkp->offp)); 233 MUTEX_UNLOCK(env, rep->mtx_clientdb); 234 RPRINT(env, DB_VERB_REP_MSGS, (env, 235 "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size)); 236 237 /* 238 * Unlocked the mutex and now send the message. 239 */ 240 STAT(rep->stat.st_bulk_transfers++); 241 if ((ret = __rep_send_message(env, 242 bulkp->eid, bulkp->type, &bulkp->lsn, &dbt, ctlflags, 0)) != 0) 243 ret = DB_REP_UNAVAIL; 244 245 MUTEX_LOCK(env, rep->mtx_clientdb); 246 /* 247 * Ready the buffer for further records. 248 */ 249 *(bulkp->offp) = 0; 250 FLD_CLR(*(bulkp->flagsp), BULK_XMIT); 251 return (ret); 252} 253 254/* 255 * __rep_bulk_alloc -- 256 * This function allocates and initializes an internal bulk buffer. 257 * This is used by the master when fulfilling a request for a chunk of 258 * log records or a bunch of pages. 259 * 260 * PUBLIC: int __rep_bulk_alloc __P((ENV *, REP_BULK *, int, uintptr_t *, 261 * PUBLIC: u_int32_t *, u_int32_t)); 262 */ 263int 264__rep_bulk_alloc(env, bulkp, eid, offp, flagsp, type) 265 ENV *env; 266 REP_BULK *bulkp; 267 int eid; 268 uintptr_t *offp; 269 u_int32_t *flagsp, type; 270{ 271 int ret; 272 273 memset(bulkp, 0, sizeof(REP_BULK)); 274 *offp = *flagsp = 0; 275 bulkp->len = MEGABYTE; 276 if ((ret = __os_malloc(env, bulkp->len, &bulkp->addr)) != 0) 277 return (ret); 278 bulkp->offp = offp; 279 bulkp->type = type; 280 bulkp->eid = eid; 281 bulkp->flagsp = flagsp; 282 return (ret); 283} 284 285/* 286 * __rep_bulk_free -- 287 * This function sends the remainder of the bulk buffer and frees it. 288 * 289 * PUBLIC: int __rep_bulk_free __P((ENV *, REP_BULK *, u_int32_t)); 290 */ 291int 292__rep_bulk_free(env, bulkp, flags) 293 ENV *env; 294 REP_BULK *bulkp; 295 u_int32_t flags; 296{ 297 DB_REP *db_rep; 298 int ret; 299 300 db_rep = env->rep_handle; 301 302 MUTEX_LOCK(env, db_rep->region->mtx_clientdb); 303 ret = __rep_send_bulk(env, bulkp, flags); 304 MUTEX_UNLOCK(env, db_rep->region->mtx_clientdb); 305 __os_free(env, bulkp->addr); 306 return (ret); 307} 308 309/* 310 * __rep_send_message -- 311 * This is a wrapper for sending a message. It takes care of constructing 312 * the control structure and calling the user's specified send function. 313 * 314 * PUBLIC: int __rep_send_message __P((ENV *, int, 315 * PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t)); 316 */ 317int 318__rep_send_message(env, eid, rtype, lsnp, dbt, ctlflags, repflags) 319 ENV *env; 320 int eid; 321 u_int32_t rtype; 322 DB_LSN *lsnp; 323 const DBT *dbt; 324 u_int32_t ctlflags, repflags; 325{ 326 DBT cdbt, scrap_dbt; 327 DB_ENV *dbenv; 328 DB_LOG *dblp; 329 DB_REP *db_rep; 330 LOG *lp; 331 REP *rep; 332 REP_46_CONTROL cntrl46; 333 REP_OLD_CONTROL ocntrl; 334 __rep_control_args cntrl; 335 db_timespec msg_time; 336 int ret; 337 u_int32_t myflags, rectype; 338 u_int8_t buf[__REP_CONTROL_SIZE]; 339 size_t len; 340 341 dbenv = env->dbenv; 342 db_rep = env->rep_handle; 343 rep = db_rep->region; 344 dblp = env->lg_handle; 345 lp = dblp->reginfo.primary; 346 ret = 0; 347 348#if defined(DEBUG_ROP) || defined(DEBUG_WOP) 349 if (db_rep->send == NULL) 350 return (0); 351#endif 352 353 /* Set up control structure. */ 354 memset(&cntrl, 0, sizeof(cntrl)); 355 memset(&ocntrl, 0, sizeof(ocntrl)); 356 memset(&cntrl46, 0, sizeof(cntrl46)); 357 if (lsnp == NULL) 358 ZERO_LSN(cntrl.lsn); 359 else 360 cntrl.lsn = *lsnp; 361 /* 362 * Set the rectype based on the version we need to speak. 363 */ 364 if (rep->version == DB_REPVERSION) 365 cntrl.rectype = rtype; 366 else if (rep->version < DB_REPVERSION) { 367 cntrl.rectype = __rep_msg_to_old(rep->version, rtype); 368 RPRINT(env, DB_VERB_REP_MSGS, (env, 369 "rep_send_msg: rtype %lu to version %lu record %lu.", 370 (u_long)rtype, (u_long)rep->version, 371 (u_long)cntrl.rectype)); 372 if (cntrl.rectype == REP_INVALID) 373 return (ret); 374 } else { 375 __db_errx(env, 376 "rep_send_message: Unknown rep version %lu, my version %lu", 377 (u_long)rep->version, (u_long)DB_REPVERSION); 378 return (__env_panic(env, EINVAL)); 379 } 380 cntrl.flags = ctlflags; 381 cntrl.rep_version = rep->version; 382 cntrl.log_version = lp->persist.version; 383 cntrl.gen = rep->gen; 384 385 /* Don't assume the send function will be tolerant of NULL records. */ 386 if (dbt == NULL) { 387 memset(&scrap_dbt, 0, sizeof(DBT)); 388 dbt = &scrap_dbt; 389 } 390 391 /* 392 * There are several types of records: commit and checkpoint records 393 * that affect database durability, regular log records that might 394 * be buffered on the master before being transmitted, and control 395 * messages which don't require the guarantees of permanency, but 396 * should not be buffered. 397 * 398 * There are request records that can be sent anywhere, and there 399 * are rerequest records that the app might want to send to the master. 400 */ 401 myflags = repflags; 402 if (FLD_ISSET(ctlflags, REPCTL_PERM)) 403 myflags |= DB_REP_PERMANENT; 404 else if (rtype != REP_LOG || FLD_ISSET(ctlflags, REPCTL_RESEND)) 405 myflags |= DB_REP_NOBUFFER; 406 if (rtype == REP_LOG && !FLD_ISSET(ctlflags, REPCTL_PERM)) { 407 /* 408 * Check if this is a log record we just read that 409 * may need a REPCTL_PERM. This is of type REP_LOG, 410 * so we know that dbt is a log record. 411 */ 412 LOGCOPY_32(env, &rectype, dbt->data); 413 if (rectype == DB___txn_regop || rectype == DB___txn_ckp) 414 F_SET(&cntrl, REPCTL_PERM); 415 } 416 417 /* 418 * Let everyone know if we've been in an established group. 419 */ 420 if (F_ISSET(rep, REP_F_GROUP_ESTD)) 421 F_SET(&cntrl, REPCTL_GROUP_ESTD); 422 423 /* 424 * We're sending messages to some other version. We cannot 425 * assume DB_REP_ANYWHERE is available. Turn it off. 426 */ 427 if (rep->version != DB_REPVERSION) 428 FLD_CLR(myflags, DB_REP_ANYWHERE); 429 430 /* 431 * If we are a master sending a perm record, then set the 432 * REPCTL_LEASE flag to have the client reply. Also set 433 * the start time that the client will echo back to us. 434 * 435 * !!! If we are a master, using leases, we had better not be 436 * sending to an older version. 437 */ 438 if (IS_REP_MASTER(env) && IS_USING_LEASES(env) && 439 FLD_ISSET(ctlflags, REPCTL_PERM)) { 440 F_SET(&cntrl, REPCTL_LEASE); 441 DB_ASSERT(env, rep->version == DB_REPVERSION); 442 __os_gettime(env, &msg_time, 1); 443 cntrl.msg_sec = (u_int32_t)msg_time.tv_sec; 444 cntrl.msg_nsec = (u_int32_t)msg_time.tv_nsec; 445 } 446 447 REP_PRINT_MESSAGE(env, eid, &cntrl, "rep_send_message", myflags); 448#ifdef REP_DIAGNOSTIC 449 if (FLD_ISSET( 450 env->dbenv->verbose, DB_VERB_REP_MSGS) && rtype == REP_LOG) 451 __rep_print_logmsg(env, dbt, lsnp); 452#endif 453 454 /* 455 * If DB_REP_PERMANENT is set, the LSN better be non-zero. 456 */ 457 DB_ASSERT(env, !FLD_ISSET(myflags, DB_REP_PERMANENT) || 458 !IS_ZERO_LSN(cntrl.lsn)); 459 460 /* 461 * If we're talking to an old version, send an old control structure. 462 */ 463 memset(&cdbt, 0, sizeof(cdbt)); 464 if (rep->version <= DB_REPVERSION_45) { 465 if (rep->version == DB_REPVERSION_45 && 466 F_ISSET(&cntrl, REPCTL_INIT)) { 467 F_CLR(&cntrl, REPCTL_INIT); 468 F_SET(&cntrl, REPCTL_INIT_45); 469 } 470 ocntrl.rep_version = cntrl.rep_version; 471 ocntrl.log_version = cntrl.log_version; 472 ocntrl.lsn = cntrl.lsn; 473 ocntrl.rectype = cntrl.rectype; 474 ocntrl.gen = cntrl.gen; 475 ocntrl.flags = cntrl.flags; 476 cdbt.data = &ocntrl; 477 cdbt.size = sizeof(ocntrl); 478 } else if (rep->version == DB_REPVERSION_46) { 479 cntrl46.rep_version = cntrl.rep_version; 480 cntrl46.log_version = cntrl.log_version; 481 cntrl46.lsn = cntrl.lsn; 482 cntrl46.rectype = cntrl.rectype; 483 cntrl46.gen = cntrl.gen; 484 cntrl46.msg_time.tv_sec = (time_t)cntrl.msg_sec; 485 cntrl46.msg_time.tv_nsec = (long)cntrl.msg_nsec; 486 cntrl46.flags = cntrl.flags; 487 cdbt.data = &cntrl46; 488 cdbt.size = sizeof(cntrl46); 489 } else { 490 (void)__rep_control_marshal(env, &cntrl, buf, 491 __REP_CONTROL_SIZE, &len); 492 DB_INIT_DBT(cdbt, buf, len); 493 } 494 495 /* 496 * We set the LSN above to something valid. Give the master the 497 * actual LSN so that they can coordinate with permanent records from 498 * the client if they want to. 499 * 500 * !!! Even though we marshalled the control message for transmission, 501 * give the transport function the real LSN. 502 */ 503 ret = db_rep->send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags); 504 505 /* 506 * We don't hold the rep lock, so this could miscount if we race. 507 * I don't think it's worth grabbing the mutex for that bit of 508 * extra accuracy. 509 */ 510 if (ret != 0) { 511 RPRINT(env, DB_VERB_REP_MSGS, (env, 512 "rep_send_function returned: %d", ret)); 513#ifdef HAVE_STATISTICS 514 rep->stat.st_msgs_send_failures++; 515 } else 516 rep->stat.st_msgs_sent++; 517#else 518 } 519#endif 520 return (ret); 521} 522 523#ifdef REP_DIAGNOSTIC 524/* 525 * __rep_print_logmsg -- 526 * This is a debugging routine for printing out log records that 527 * we are about to transmit to a client. 528 */ 529static void 530__rep_print_logmsg(env, logdbt, lsnp) 531 ENV *env; 532 const DBT *logdbt; 533 DB_LSN *lsnp; 534{ 535 static int first = 1; 536 static DB_DISTAB dtab; 537 538 if (first) { 539 first = 0; 540 541 (void)__bam_init_print(env, &dtab); 542 (void)__crdel_init_print(env, &dtab); 543 (void)__db_init_print(env, &dtab); 544 (void)__dbreg_init_print(env, &dtab); 545 (void)__fop_init_print(env, &dtab); 546 (void)__ham_init_print(env, &dtab); 547 (void)__qam_init_print(env, &dtab); 548 (void)__txn_init_print(env, &dtab); 549 } 550 551 (void)__db_dispatch( 552 env, &dtab, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL); 553} 554#endif 555 556/* 557 * __rep_new_master -- 558 * Called after a master election to sync back up with a new master. 559 * It's possible that we already know of this new master in which case 560 * we don't need to do anything. 561 * 562 * This is written assuming that this message came from the master; we 563 * need to enforce that in __rep_process_record, but right now, we have 564 * no way to identify the master. 565 * 566 * PUBLIC: int __rep_new_master __P((ENV *, __rep_control_args *, int)); 567 */ 568int 569__rep_new_master(env, cntrl, eid) 570 ENV *env; 571 __rep_control_args *cntrl; 572 int eid; 573{ 574 DBT dbt; 575 DB_ENV *dbenv; 576 DB_LOG *dblp; 577 DB_LOGC *logc; 578 DB_LSN first_lsn, lsn; 579 DB_REP *db_rep; 580 DB_THREAD_INFO *ip; 581 LOG *lp; 582 REGENV *renv; 583 REGINFO *infop; 584 REP *rep; 585 db_timeout_t lease_to; 586 u_int32_t unused; 587 int change, do_req, lockout, ret, t_ret; 588 589 dbenv = env->dbenv; 590 db_rep = env->rep_handle; 591 rep = db_rep->region; 592 dblp = env->lg_handle; 593 lp = dblp->reginfo.primary; 594 ret = 0; 595 logc = NULL; 596 lockout = 0; 597 REP_SYSTEM_LOCK(env); 598 change = rep->gen != cntrl->gen || rep->master_id != eid; 599 if (change) { 600 /* 601 * If we are already locking out others, we're either 602 * in the middle of sync-up recovery or internal init 603 * when this newmaster comes in (we also lockout in 604 * rep_start, but we cannot be racing that because we 605 * don't allow rep_proc_msg when rep_start is going on). 606 * 607 * If we were in the middle of an internal initialization 608 * and we've discovered a new master instead, clean up 609 * our old internal init information. We need to clean 610 * up any flags and unlock our lockout. 611 */ 612 if (F_ISSET(rep, REP_F_READY_MSG)) 613 goto lckout; 614 615 if ((ret = __rep_lockout_msg(env, rep, 1)) != 0) 616 goto errlck; 617 618 lockout = 1; 619 /* 620 * We must wait any remaining lease time before accepting 621 * this new master. This must be after the lockout above 622 * so that no new message can be processed and re-grant 623 * the lease out from under us. 624 */ 625 if (IS_USING_LEASES(env) && 626 ((lease_to = __rep_lease_waittime(env)) != 0)) { 627 REP_SYSTEM_UNLOCK(env); 628 __os_yield(env, 0, (u_long)lease_to); 629 REP_SYSTEM_LOCK(env); 630 } 631 632 if ((ret = __env_init_rec(env, cntrl->log_version)) != 0) 633 goto errlck; 634 635 REP_SYSTEM_UNLOCK(env); 636 637 MUTEX_LOCK(env, rep->mtx_clientdb); 638 __os_gettime(env, &lp->rcvd_ts, 1); 639 lp->wait_ts = rep->request_gap; 640 ZERO_LSN(lp->verify_lsn); 641 ZERO_LSN(lp->waiting_lsn); 642 ZERO_LSN(lp->max_wait_lsn); 643 /* 644 * Open if we need to, in preparation for the truncate 645 * we'll do in a moment. 646 */ 647 if (db_rep->rep_db == NULL && 648 (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { 649 MUTEX_UNLOCK(env, rep->mtx_clientdb); 650 goto err; 651 } 652 653 REP_SYSTEM_LOCK(env); 654 if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) { 655 ret = __rep_init_cleanup(env, rep, DB_FORCE); 656 /* 657 * Note that if an in-progress internal init was indeed 658 * "cleaned up", clearing these flags now will allow the 659 * application to see a completely empty database 660 * environment for a moment (until the master responds 661 * to our ALL_REQ). 662 */ 663 F_CLR(rep, REP_F_RECOVER_MASK); 664 } 665 MUTEX_UNLOCK(env, rep->mtx_clientdb); 666 if (ret != 0) { 667 /* TODO: consider add'l error recovery steps. */ 668 goto errlck; 669 } 670 ENV_GET_THREAD_INFO(env, ip); 671 if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &unused)) 672 != 0) 673 goto errlck; 674 675 /* 676 * This needs to be performed under message lockout 677 * if we're actually changing master. 678 */ 679 __rep_elect_done(env, rep, 1); 680 RPRINT(env, DB_VERB_REP_MISC, (env, 681 "Updating gen from %lu to %lu from master %d", 682 (u_long)rep->gen, (u_long)cntrl->gen, eid)); 683 rep->gen = cntrl->gen; 684 (void)__rep_write_gen(env, rep->gen); 685 if (rep->egen <= rep->gen) 686 rep->egen = rep->gen + 1; 687 rep->master_id = eid; 688 STAT(rep->stat.st_master_changes++); 689 rep->stat.st_startup_complete = 0; 690 __log_set_version(env, cntrl->log_version); 691 rep->version = cntrl->rep_version; 692 RPRINT(env, DB_VERB_REP_MISC, (env, 693 "egen: %lu. rep version %lu", 694 (u_long)rep->egen, (u_long)rep->version)); 695 696 /* 697 * If we're delaying client sync-up, we know we have a 698 * new/changed master now, set flag indicating we are 699 * actively delaying. 700 */ 701 if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT)) 702 F_SET(rep, REP_F_DELAY); 703 F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY); 704 F_CLR(rep, REP_F_READY_MSG); 705 lockout = 0; 706 } else 707 __rep_elect_done(env, rep, 1); 708 REP_SYSTEM_UNLOCK(env); 709 710 MUTEX_LOCK(env, rep->mtx_clientdb); 711 lsn = lp->ready_lsn; 712 713 if (!change) { 714 ret = 0; 715 do_req = __rep_check_doreq(env, rep); 716 MUTEX_UNLOCK(env, rep->mtx_clientdb); 717 /* 718 * If there wasn't a change, we might still have some 719 * catching up or verification to do. 720 */ 721 if (do_req && 722 (F_ISSET(rep, REP_F_RECOVER_MASK) || 723 LOG_COMPARE(&lsn, &cntrl->lsn) < 0)) { 724 ret = __rep_resend_req(env, 0); 725 if (ret != 0) 726 RPRINT(env, DB_VERB_REP_MISC, (env, 727 "resend_req ret is %lu", (u_long)ret)); 728 } 729 /* 730 * If we're not in one of the recovery modes, we need to 731 * clear the NOARCHIVE flag. Elections set NOARCHIVE 732 * and if we called an election and found the same 733 * master, we need to clear NOARCHIVE here. 734 */ 735 if (!F_ISSET(rep, REP_F_RECOVER_MASK)) { 736 REP_SYSTEM_LOCK(env); 737 F_CLR(rep, REP_F_NOARCHIVE); 738 REP_SYSTEM_UNLOCK(env); 739 } 740 return (ret); 741 } 742 MUTEX_UNLOCK(env, rep->mtx_clientdb); 743 744 /* 745 * If the master changed, we need to start the process of 746 * figuring out what our last valid log record is. However, 747 * if both the master and we agree that the max LSN is 0,0, 748 * then there is no recovery to be done. If we are at 0 and 749 * the master is not, then we just need to request all the log 750 * records from the master. 751 */ 752 if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) { 753 if ((ret = __rep_newmaster_empty(env, eid)) != 0) 754 goto err; 755 (void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0); 756 return (DB_REP_NEWMASTER); 757 } 758 759 memset(&dbt, 0, sizeof(dbt)); 760 /* 761 * If this client is farther ahead on the log file than the master, see 762 * if there is any overlap in the logs. If not, the client is too 763 * far ahead of the master and we cannot determine they're part of 764 * the same replication group. 765 */ 766 if (cntrl->lsn.file < lsn.file) { 767 if ((ret = __log_cursor(env, &logc)) != 0) 768 goto err; 769 ret = __logc_get(logc, &first_lsn, &dbt, DB_FIRST); 770 if ((t_ret = __logc_close(logc)) != 0 && ret == 0) 771 ret = t_ret; 772 if (ret == DB_NOTFOUND) 773 goto notfound; 774 else if (ret != 0) 775 goto err; 776 if (cntrl->lsn.file < first_lsn.file) 777 goto notfound; 778 } 779 if ((ret = __log_cursor(env, &logc)) != 0) 780 goto err; 781 ret = __rep_log_backup(env, rep, logc, &lsn); 782 if ((t_ret = __logc_close(logc)) != 0 && ret == 0) 783 ret = t_ret; 784 if (ret == DB_NOTFOUND) 785 goto notfound; 786 else if (ret != 0) 787 goto err; 788 789 /* 790 * Finally, we have a record to ask for. 791 */ 792 MUTEX_LOCK(env, rep->mtx_clientdb); 793 lp->verify_lsn = lsn; 794 __os_gettime(env, &lp->rcvd_ts, 1); 795 lp->wait_ts = rep->request_gap; 796 MUTEX_UNLOCK(env, rep->mtx_clientdb); 797 if (!F_ISSET(rep, REP_F_DELAY)) 798 (void)__rep_send_message(env, 799 eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE); 800 801 (void)__memp_set_config(dbenv, DB_MEMP_SYNC_INTERRUPT, 0); 802 return (DB_REP_NEWMASTER); 803 804err: /* 805 * If we failed, we need to clear the flags we may have set above 806 * because we're not going to be setting the verify_lsn. 807 */ 808 REP_SYSTEM_LOCK(env); 809errlck: if (lockout) 810 F_CLR(rep, REP_F_READY_MSG); 811 F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY); 812lckout: REP_SYSTEM_UNLOCK(env); 813 return (ret); 814 815notfound: 816 /* 817 * If we don't have an identification record, we still 818 * might have some log records but we're discarding them 819 * to sync up with the master from the start. 820 * Therefore, truncate our log and treat it as if it 821 * were empty. In-memory logs can't be completely 822 * zeroed using __log_vtruncate, so just zero them out. 823 */ 824 if (lp->db_log_inmemory) 825 ZERO_LSN(lsn); 826 else 827 INIT_LSN(lsn); 828 RPRINT(env, DB_VERB_REP_MISC, 829 (env, "No commit or ckp found. Truncate log.")); 830 ret = lp->db_log_inmemory ? 831 __log_zero(env, &lsn) : 832 __log_vtruncate(env, &lsn, &lsn, NULL); 833 if (ret != 0 && ret != DB_NOTFOUND) 834 return (ret); 835 infop = env->reginfo; 836 renv = infop->primary; 837 REP_SYSTEM_LOCK(env); 838 (void)time(&renv->rep_timestamp); 839 REP_SYSTEM_UNLOCK(env); 840 if ((ret = __rep_newmaster_empty(env, eid)) != 0) 841 goto err; 842 return (DB_REP_NEWMASTER); 843} 844 845/* 846 * __rep_newmaster_empty 847 * Handle the case of a NEWMASTER message received when we have an empty 848 * log. This requires internal init. If we can't do that because of 849 * NOAUTOINIT, return JOIN_FAILURE. If F_DELAY is in effect, don't even 850 * consider NOAUTOINIT yet, because they could change it before rep_sync call. 851 */ 852static int 853__rep_newmaster_empty(env, eid) 854 ENV *env; 855 int eid; 856{ 857 DB_REP *db_rep; 858 LOG *lp; 859 REP *rep; 860 int msg, ret; 861 862 db_rep = env->rep_handle; 863 rep = db_rep->region; 864 lp = env->lg_handle->reginfo.primary; 865 msg = ret = 0; 866 867 MUTEX_LOCK(env, rep->mtx_clientdb); 868 REP_SYSTEM_LOCK(env); 869 lp->wait_ts = rep->request_gap; 870 871 /* Usual case is to skip to UPDATE state; we may revise this below. */ 872 F_CLR(rep, REP_F_RECOVER_VERIFY); 873 F_SET(rep, REP_F_RECOVER_UPDATE); 874 875 if (F_ISSET(rep, REP_F_DELAY)) { 876 /* 877 * Having properly set up wait_ts for later, nothing more to 878 * do now. 879 */ 880 } else if (FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) { 881 F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK); 882 ret = DB_REP_JOIN_FAILURE; 883 } else { 884 /* Normal case: neither DELAY nor NOAUTOINIT. */ 885 msg = 1; 886 } 887 REP_SYSTEM_UNLOCK(env); 888 MUTEX_UNLOCK(env, rep->mtx_clientdb); 889 890 if (msg) 891 (void)__rep_send_message(env, eid, REP_UPDATE_REQ, 892 NULL, NULL, 0, 0); 893 return (ret); 894} 895 896/* 897 * __rep_noarchive 898 * Used by log_archive to determine if it is okay to remove 899 * log files. 900 * 901 * PUBLIC: int __rep_noarchive __P((ENV *)); 902 */ 903int 904__rep_noarchive(env) 905 ENV *env; 906{ 907 DB_REP *db_rep; 908 REGENV *renv; 909 REGINFO *infop; 910 REP *rep; 911 time_t timestamp; 912 913 infop = env->reginfo; 914 renv = infop->primary; 915 916 /* 917 * This is tested before REP_ON below because we always need 918 * to obey if any replication process has disabled archiving. 919 * Everything is in the environment region that we need here. 920 */ 921 if (F_ISSET(renv, DB_REGENV_REPLOCKED)) { 922 (void)time(×tamp); 923 TIMESTAMP_CHECK(env, timestamp, renv); 924 /* 925 * Check if we're still locked out after checking 926 * the timestamp. 927 */ 928 if (F_ISSET(renv, DB_REGENV_REPLOCKED)) 929 return (EINVAL); 930 } 931 932 if (!REP_ON(env)) 933 return (0); 934 935 db_rep = env->rep_handle; 936 rep = db_rep->region; 937 return (F_ISSET(rep, REP_F_NOARCHIVE) ? 1 : 0); 938} 939 940/* 941 * __rep_send_vote 942 * Send this site's vote for the election. 943 * 944 * PUBLIC: void __rep_send_vote __P((ENV *, DB_LSN *, u_int32_t, u_int32_t, 945 * PUBLIC: u_int32_t, u_int32_t, u_int32_t, int, u_int32_t, u_int32_t)); 946 */ 947void 948__rep_send_vote(env, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype, flags) 949 ENV *env; 950 DB_LSN *lsnp; 951 int eid; 952 u_int32_t nsites, nvotes, pri; 953 u_int32_t flags, egen, tie, vtype; 954{ 955 DB_REP *db_rep; 956 DBT vote_dbt; 957 REP *rep; 958 REP_OLD_VOTE_INFO ovi; 959 __rep_vote_info_args vi; 960 u_int8_t buf[__REP_VOTE_INFO_SIZE]; 961 size_t len; 962 963 db_rep = env->rep_handle; 964 rep = db_rep->region; 965 966 memset(&vi, 0, sizeof(vi)); 967 memset(&vote_dbt, 0, sizeof(vote_dbt)); 968 969 /* 970 * In 4.7 we went to fixed sized fields. They may not be 971 * the same as the sizes in older versions. 972 */ 973 if (rep->version < DB_REPVERSION_47) { 974 memset(&ovi, 0, sizeof(ovi)); 975 ovi.egen = egen; 976 ovi.priority = (int) pri; 977 ovi.nsites = (int) nsites; 978 ovi.nvotes = (int) nvotes; 979 ovi.tiebreaker = tie; 980 vote_dbt.data = &ovi; 981 vote_dbt.size = sizeof(ovi); 982 } else { 983 vi.egen = egen; 984 vi.priority = pri; 985 vi.nsites = nsites; 986 vi.nvotes = nvotes; 987 vi.tiebreaker = tie; 988 (void)__rep_vote_info_marshal(env, &vi, buf, 989 __REP_VOTE_INFO_SIZE, &len); 990 DB_INIT_DBT(vote_dbt, buf, len); 991 } 992 993 (void)__rep_send_message(env, eid, vtype, lsnp, &vote_dbt, flags, 0); 994} 995 996/* 997 * __rep_elect_done 998 * Clear all election information for this site. Assumes the 999 * caller hold the region mutex. 1000 * 1001 * PUBLIC: void __rep_elect_done __P((ENV *, REP *, int)); 1002 */ 1003void 1004__rep_elect_done(env, rep, found_master) 1005 ENV *env; 1006 REP *rep; 1007 int found_master; 1008{ 1009 int inelect; 1010 db_timespec endtime; 1011 1012 inelect = IN_ELECTION(rep); 1013 F_CLR(rep, 1014 REP_F_EPHASE0 | REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY); 1015 /* 1016 * Finding a master trumps finding a new egen. 1017 */ 1018 if (found_master) 1019 F_CLR(rep, REP_F_EGENUPDATE); 1020 rep->sites = 0; 1021 rep->votes = 0; 1022 if (inelect) { 1023 if (timespecisset(&rep->etime)) { 1024 __os_gettime(env, &endtime, 1); 1025 timespecsub(&endtime, &rep->etime); 1026#ifdef HAVE_STATISTICS 1027 rep->stat.st_election_sec = (u_int32_t)endtime.tv_sec; 1028 rep->stat.st_election_usec = (u_int32_t) 1029 (endtime.tv_nsec / NS_PER_US); 1030#endif 1031 RPRINT(env, DB_VERB_REP_ELECT, (env, 1032 "Election finished in %lu.%09lu sec", 1033 (u_long)endtime.tv_sec, (u_long)endtime.tv_nsec)); 1034 timespecclear(&rep->etime); 1035 } 1036 rep->egen++; 1037 } 1038 RPRINT(env, DB_VERB_REP_ELECT, 1039 (env, "Election done; egen %lu", (u_long)rep->egen)); 1040} 1041 1042/* 1043 * __rep_grow_sites -- 1044 * Called to allocate more space in the election tally information. 1045 * Called with the rep mutex held. We need to call the region mutex, so 1046 * we need to make sure that we *never* acquire those mutexes in the 1047 * opposite order. 1048 * 1049 * PUBLIC: int __rep_grow_sites __P((ENV *, u_int32_t)); 1050 */ 1051int 1052__rep_grow_sites(env, nsites) 1053 ENV *env; 1054 u_int32_t nsites; 1055{ 1056 REGENV *renv; 1057 REGINFO *infop; 1058 REP *rep; 1059 int ret, *tally; 1060 u_int32_t nalloc; 1061 1062 rep = env->rep_handle->region; 1063 1064 /* 1065 * Allocate either twice the current allocation or nsites, 1066 * whichever is more. 1067 */ 1068 nalloc = 2 * rep->asites; 1069 if (nalloc < nsites) 1070 nalloc = nsites; 1071 1072 infop = env->reginfo; 1073 renv = infop->primary; 1074 MUTEX_LOCK(env, renv->mtx_regenv); 1075 1076 /* 1077 * We allocate 2 tally regions, one for tallying VOTE1's and 1078 * one for VOTE2's. Always grow them in tandem, because if we 1079 * get more VOTE1's we'll always expect more VOTE2's then too. 1080 */ 1081 if ((ret = __env_alloc(infop, 1082 (size_t)nalloc * sizeof(REP_VTALLY), &tally)) == 0) { 1083 if (rep->tally_off != INVALID_ROFF) 1084 __env_alloc_free( 1085 infop, R_ADDR(infop, rep->tally_off)); 1086 rep->tally_off = R_OFFSET(infop, tally); 1087 if ((ret = __env_alloc(infop, 1088 (size_t)nalloc * sizeof(REP_VTALLY), &tally)) == 0) { 1089 /* Success */ 1090 if (rep->v2tally_off != INVALID_ROFF) 1091 __env_alloc_free(infop, 1092 R_ADDR(infop, rep->v2tally_off)); 1093 rep->v2tally_off = R_OFFSET(infop, tally); 1094 rep->asites = nalloc; 1095 rep->nsites = nsites; 1096 } else { 1097 /* 1098 * We were unable to allocate both. So, we must 1099 * free the first one and reinitialize. If 1100 * v2tally_off is valid, it is from an old 1101 * allocation and we are clearing it all out due 1102 * to the error. 1103 */ 1104 if (rep->v2tally_off != INVALID_ROFF) 1105 __env_alloc_free(infop, 1106 R_ADDR(infop, rep->v2tally_off)); 1107 __env_alloc_free(infop, 1108 R_ADDR(infop, rep->tally_off)); 1109 rep->v2tally_off = rep->tally_off = INVALID_ROFF; 1110 rep->asites = 0; 1111 rep->nsites = 0; 1112 } 1113 } 1114 MUTEX_UNLOCK(env, renv->mtx_regenv); 1115 return (ret); 1116} 1117 1118/* 1119 * __env_rep_enter -- 1120 * 1121 * Check if we are in the middle of replication initialization and/or 1122 * recovery, and if so, disallow operations. If operations are allowed, 1123 * increment handle-counts, so that we do not start recovery while we 1124 * are operating in the library. 1125 * 1126 * PUBLIC: int __env_rep_enter __P((ENV *, int)); 1127 */ 1128int 1129__env_rep_enter(env, checklock) 1130 ENV *env; 1131 int checklock; 1132{ 1133 DB_REP *db_rep; 1134 REGENV *renv; 1135 REGINFO *infop; 1136 REP *rep; 1137 int cnt; 1138 time_t timestamp; 1139 1140 /* Check if locks have been globally turned off. */ 1141 if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) 1142 return (0); 1143 1144 db_rep = env->rep_handle; 1145 rep = db_rep->region; 1146 1147 infop = env->reginfo; 1148 renv = infop->primary; 1149 if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { 1150 (void)time(×tamp); 1151 TIMESTAMP_CHECK(env, timestamp, renv); 1152 /* 1153 * Check if we're still locked out after checking 1154 * the timestamp. 1155 */ 1156 if (F_ISSET(renv, DB_REGENV_REPLOCKED)) 1157 return (EINVAL); 1158 } 1159 1160 REP_SYSTEM_LOCK(env); 1161 for (cnt = 0; F_ISSET(rep, REP_F_READY_API);) { 1162 REP_SYSTEM_UNLOCK(env); 1163 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { 1164 __db_errx(env, 1165 "Operation locked out. Waiting for replication lockout to complete"); 1166 return (DB_REP_LOCKOUT); 1167 } 1168 __os_yield(env, 1, 0); 1169 REP_SYSTEM_LOCK(env); 1170 if (++cnt % 60 == 0) 1171 __db_errx(env, 1172 "DB_ENV handle waiting %d minutes for replication lockout to complete", 1173 cnt / 60); 1174 } 1175 rep->handle_cnt++; 1176 REP_SYSTEM_UNLOCK(env); 1177 1178 return (0); 1179} 1180 1181/* 1182 * __env_db_rep_exit -- 1183 * 1184 * Decrement handle count upon routine exit. 1185 * 1186 * PUBLIC: int __env_db_rep_exit __P((ENV *)); 1187 */ 1188int 1189__env_db_rep_exit(env) 1190 ENV *env; 1191{ 1192 DB_REP *db_rep; 1193 REP *rep; 1194 1195 /* Check if locks have been globally turned off. */ 1196 if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) 1197 return (0); 1198 1199 db_rep = env->rep_handle; 1200 rep = db_rep->region; 1201 1202 REP_SYSTEM_LOCK(env); 1203 rep->handle_cnt--; 1204 REP_SYSTEM_UNLOCK(env); 1205 1206 return (0); 1207} 1208 1209/* 1210 * __db_rep_enter -- 1211 * Called in replicated environments to keep track of in-use handles 1212 * and prevent any concurrent operation during recovery. If checkgen is 1213 * non-zero, then we verify that the dbp has the same handle as the env. 1214 * 1215 * If return_now is non-zero, we'll return DB_DEADLOCK immediately, else we'll 1216 * sleep before returning DB_DEADLOCK. Without the sleep, it is likely 1217 * the application will immediately try again and could reach a retry 1218 * limit before replication has a chance to finish. The sleep increases 1219 * the probability that an application retry will succeed. 1220 * 1221 * PUBLIC: int __db_rep_enter __P((DB *, int, int, int)); 1222 */ 1223int 1224__db_rep_enter(dbp, checkgen, checklock, return_now) 1225 DB *dbp; 1226 int checkgen, checklock, return_now; 1227{ 1228 DB_REP *db_rep; 1229 ENV *env; 1230 REGENV *renv; 1231 REGINFO *infop; 1232 REP *rep; 1233 time_t timestamp; 1234 1235 env = dbp->env; 1236 /* Check if locks have been globally turned off. */ 1237 if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) 1238 return (0); 1239 1240 db_rep = env->rep_handle; 1241 rep = db_rep->region; 1242 infop = env->reginfo; 1243 renv = infop->primary; 1244 1245 if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) { 1246 (void)time(×tamp); 1247 TIMESTAMP_CHECK(env, timestamp, renv); 1248 /* 1249 * Check if we're still locked out after checking 1250 * the timestamp. 1251 */ 1252 if (F_ISSET(renv, DB_REGENV_REPLOCKED)) 1253 return (EINVAL); 1254 } 1255 REP_SYSTEM_LOCK(env); 1256 /* 1257 * !!! 1258 * Note, we are checking REP_F_READY_OP, but we are 1259 * incrementing rep->handle_cnt. That seems like a mismatch, 1260 * but the intention is to return DEADLOCK to the application 1261 * which will cause them to abort the txn quickly and allow 1262 * the lockout to proceed. 1263 * 1264 * The correctness of doing this depends on the fact that 1265 * lockout of the API always sets REP_F_READY_OP first. 1266 */ 1267 if (F_ISSET(rep, REP_F_READY_OP)) { 1268 REP_SYSTEM_UNLOCK(env); 1269 if (!return_now) 1270 __os_yield(env, 5, 0); 1271 return (DB_LOCK_DEADLOCK); 1272 } 1273 1274 if (checkgen && dbp->timestamp != renv->rep_timestamp) { 1275 REP_SYSTEM_UNLOCK(env); 1276 __db_errx(env, "%s %s", 1277 "replication recovery unrolled committed transactions;", 1278 "open DB and DBcursor handles must be closed"); 1279 return (DB_REP_HANDLE_DEAD); 1280 } 1281 rep->handle_cnt++; 1282 REP_SYSTEM_UNLOCK(env); 1283 1284 return (0); 1285} 1286 1287/* 1288 * __op_rep_enter -- 1289 * 1290 * Check if we are in the middle of replication initialization and/or 1291 * recovery, and if so, disallow new multi-step operations, such as 1292 * transaction and memp gets. If operations are allowed, 1293 * increment the op_cnt, so that we do not start recovery while we have 1294 * active operations. 1295 * 1296 * PUBLIC: int __op_rep_enter __P((ENV *)); 1297 */ 1298int 1299__op_rep_enter(env) 1300 ENV *env; 1301{ 1302 DB_REP *db_rep; 1303 REP *rep; 1304 int cnt; 1305 1306 /* Check if locks have been globally turned off. */ 1307 if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) 1308 return (0); 1309 1310 db_rep = env->rep_handle; 1311 rep = db_rep->region; 1312 1313 REP_SYSTEM_LOCK(env); 1314 for (cnt = 0; F_ISSET(rep, REP_F_READY_OP);) { 1315 REP_SYSTEM_UNLOCK(env); 1316 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) { 1317 __db_errx(env, 1318 "Operation locked out. Waiting for replication lockout to complete"); 1319 return (DB_REP_LOCKOUT); 1320 } 1321 __os_yield(env, 5, 0); 1322 cnt += 5; 1323 REP_SYSTEM_LOCK(env); 1324 if (cnt % 60 == 0) 1325 __db_errx(env, 1326 "__op_rep_enter waiting %d minutes for lockout to complete", 1327 cnt / 60); 1328 } 1329 rep->op_cnt++; 1330 REP_SYSTEM_UNLOCK(env); 1331 1332 return (0); 1333} 1334 1335/* 1336 * __op_rep_exit -- 1337 * 1338 * Decrement op count upon transaction commit/abort/discard or 1339 * memp_fput. 1340 * 1341 * PUBLIC: int __op_rep_exit __P((ENV *)); 1342 */ 1343int 1344__op_rep_exit(env) 1345 ENV *env; 1346{ 1347 DB_REP *db_rep; 1348 REP *rep; 1349 1350 /* Check if locks have been globally turned off. */ 1351 if (F_ISSET(env->dbenv, DB_ENV_NOLOCKING)) 1352 return (0); 1353 1354 db_rep = env->rep_handle; 1355 rep = db_rep->region; 1356 1357 REP_SYSTEM_LOCK(env); 1358 DB_ASSERT(env, rep->op_cnt > 0); 1359 rep->op_cnt--; 1360 REP_SYSTEM_UNLOCK(env); 1361 1362 return (0); 1363} 1364 1365/* 1366 * __rep_lockout_api -- 1367 * Coordinate with other threads in the library and active txns so 1368 * that we can run single-threaded, for recovery or internal backup. 1369 * Assumes the caller holds the region mutex. 1370 * 1371 * PUBLIC: int __rep_lockout_api __P((ENV *, REP *)); 1372 */ 1373int 1374__rep_lockout_api(env, rep) 1375 ENV *env; 1376 REP *rep; 1377{ 1378 int ret; 1379 1380 /* 1381 * We must drain long-running operations first. We check 1382 * REP_F_READY_OP in __db_rep_enter in order to allow them 1383 * to abort existing txns quickly. Therefore, we must 1384 * always lockout REP_F_READY_OP first, then REP_F_READY_API. 1385 */ 1386 if ((ret = __rep_lockout_int(env, rep, &rep->op_cnt, 0, 1387 "op_cnt", REP_F_READY_OP)) != 0) 1388 return (ret); 1389 return (__rep_lockout_int(env, rep, &rep->handle_cnt, 0, 1390 "handle_cnt", REP_F_READY_API)); 1391} 1392 1393/* 1394 * __rep_lockout_apply -- 1395 * Coordinate with other threads processing messages so that 1396 * we can run single-threaded and know that no incoming 1397 * message can apply new log records. 1398 * This call should be short-term covering a specific critical 1399 * operation where we need to make sure no new records change 1400 * the log. Currently used to coordinate with elections. 1401 * Assumes the caller holds the region mutex. 1402 * 1403 * PUBLIC: int __rep_lockout_apply __P((ENV *, REP *, u_int32_t)); 1404 */ 1405int 1406__rep_lockout_apply(env, rep, apply_th) 1407 ENV *env; 1408 REP *rep; 1409 u_int32_t apply_th; 1410{ 1411 return (__rep_lockout_int(env, rep, &rep->apply_th, apply_th, 1412 "apply_th", REP_F_READY_APPLY)); 1413} 1414 1415/* 1416 * __rep_lockout_msg -- 1417 * Coordinate with other threads processing messages so that 1418 * we can run single-threaded and know that no incoming 1419 * message can change the world (i.e., like a NEWMASTER message). 1420 * This call should be short-term covering a specific critical 1421 * operation where we need to make sure no new messages arrive 1422 * in the middle and all message threads are out before we start it. 1423 * Assumes the caller holds the region mutex. 1424 * 1425 * PUBLIC: int __rep_lockout_msg __P((ENV *, REP *, u_int32_t)); 1426 */ 1427int 1428__rep_lockout_msg(env, rep, msg_th) 1429 ENV *env; 1430 REP *rep; 1431 u_int32_t msg_th; 1432{ 1433 return (__rep_lockout_int(env, rep, &rep->msg_th, msg_th, 1434 "msg_th", REP_F_READY_MSG)); 1435} 1436 1437/* 1438 * __rep_lockout_int -- 1439 * Internal common code for locking out and coordinating 1440 * with other areas of the code. 1441 * Assumes the caller holds the region mutex. 1442 * 1443 */ 1444static int 1445__rep_lockout_int(env, rep, fieldp, field_val, msg, lockout_flag) 1446 ENV *env; 1447 REP *rep; 1448 u_int32_t *fieldp; 1449 const char *msg; 1450 u_int32_t field_val, lockout_flag; 1451{ 1452 int wait_cnt; 1453 1454 F_SET(rep, lockout_flag); 1455 for (wait_cnt = 0; *fieldp > field_val;) { 1456 REP_SYSTEM_UNLOCK(env); 1457 __os_yield(env, 1, 0); 1458#ifdef DIAGNOSTIC 1459 if (wait_cnt == 5) 1460 __db_errx(env, 1461"Waiting for %s (%lu) to complete replication lockout", 1462 msg, (u_long)*fieldp); 1463 if (++wait_cnt % 60 == 0) 1464 __db_errx(env, 1465"Waiting for %s (%lu) to complete replication lockout for %d minutes", 1466 msg, (u_long)*fieldp, wait_cnt / 60); 1467#endif 1468 REP_SYSTEM_LOCK(env); 1469 } 1470 1471 COMPQUIET(msg, NULL); 1472 return (0); 1473} 1474 1475/* 1476 * __rep_send_throttle - 1477 * Send a record, throttling if necessary. Callers of this function 1478 * will throttle - breaking out of their loop, if the repth->type field 1479 * changes from the normal message type to the *_MORE message type. 1480 * This function will send the normal type unless throttling gets invoked. 1481 * Then it sets the type field and sends the _MORE message. 1482 * 1483 * Throttling is always only relevant in serving requests, so we always send 1484 * with REPCTL_RESEND. Additional desired flags can be passed in the ctlflags 1485 * argument. 1486 * 1487 * PUBLIC: int __rep_send_throttle __P((ENV *, int, REP_THROTTLE *, 1488 * PUBLIC: u_int32_t, u_int32_t)); 1489 */ 1490int 1491__rep_send_throttle(env, eid, repth, flags, ctlflags) 1492 ENV *env; 1493 int eid; 1494 REP_THROTTLE *repth; 1495 u_int32_t ctlflags, flags; 1496{ 1497 DB_REP *db_rep; 1498 REP *rep; 1499 u_int32_t size, typemore; 1500 int check_limit; 1501 1502 check_limit = repth->gbytes != 0 || repth->bytes != 0; 1503 /* 1504 * If we only want to do throttle processing and we don't have it 1505 * turned on, return immediately. 1506 */ 1507 if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY)) 1508 return (0); 1509 1510 db_rep = env->rep_handle; 1511 rep = db_rep->region; 1512 typemore = 0; 1513 if (repth->type == REP_LOG) 1514 typemore = REP_LOG_MORE; 1515 if (repth->type == REP_PAGE) 1516 typemore = REP_PAGE_MORE; 1517 DB_ASSERT(env, typemore != 0); 1518 1519 /* 1520 * data_dbt.size is only the size of the log 1521 * record; it doesn't count the size of the 1522 * control structure. Factor that in as well 1523 * so we're not off by a lot if our log records 1524 * are small. 1525 */ 1526 size = repth->data_dbt->size + sizeof(__rep_control_args); 1527 if (check_limit) { 1528 while (repth->bytes <= size) { 1529 if (repth->gbytes > 0) { 1530 repth->bytes += GIGABYTE; 1531 --(repth->gbytes); 1532 continue; 1533 } 1534 /* 1535 * We don't hold the rep mutex, 1536 * and may miscount. 1537 */ 1538 STAT(rep->stat.st_nthrottles++); 1539 repth->type = typemore; 1540 goto send; 1541 } 1542 repth->bytes -= size; 1543 } 1544 /* 1545 * Always send if it is typemore, otherwise send only if 1546 * REP_THROTTLE_ONLY is not set. 1547 * 1548 * NOTE: It is the responsibility of the caller to marshal, if 1549 * needed, the data_dbt. This function just sends what it is given. 1550 */ 1551send: if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) && 1552 (__rep_send_message(env, eid, repth->type, 1553 &repth->lsn, repth->data_dbt, (REPCTL_RESEND | ctlflags), 0) != 0)) 1554 return (DB_REP_UNAVAIL); 1555 return (0); 1556} 1557 1558/* 1559 * __rep_msg_to_old -- 1560 * Convert current message numbers to old message numbers. 1561 * 1562 * PUBLIC: u_int32_t __rep_msg_to_old __P((u_int32_t, u_int32_t)); 1563 */ 1564u_int32_t 1565__rep_msg_to_old(version, rectype) 1566 u_int32_t version, rectype; 1567{ 1568 /* 1569 * We need to convert from current message numbers to old numbers and 1570 * we need to convert from old numbers to current numbers. Offset by 1571 * one for more readable code. 1572 */ 1573 /* 1574 * Everything for version 0 is invalid, there is no version 0. 1575 */ 1576 static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = { 1577 /* There is no DB_REPVERSION 0. */ 1578 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1579 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1580 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1581 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1582 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1583 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1584 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1585 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1586 /* 1587 * 4.2/DB_REPVERSION 1 no longer supported. 1588 */ 1589 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1590 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1591 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1592 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1593 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1594 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1595 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1596 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1597 /* 1598 * 4.3/DB_REPVERSION 2 no longer supported. 1599 */ 1600 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1601 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1602 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1603 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1604 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1605 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1606 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1607 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1608 /* 1609 * From 4.7 message number To 4.4/4.5 message number 1610 */ 1611 { REP_INVALID, /* NO message 0 */ 1612 1, /* REP_ALIVE */ 1613 2, /* REP_ALIVE_REQ */ 1614 3, /* REP_ALL_REQ */ 1615 4, /* REP_BULK_LOG */ 1616 5, /* REP_BULK_PAGE */ 1617 6, /* REP_DUPMASTER */ 1618 7, /* REP_FILE */ 1619 8, /* REP_FILE_FAIL */ 1620 9, /* REP_FILE_REQ */ 1621 REP_INVALID, /* REP_LEASE_GRANT */ 1622 10, /* REP_LOG */ 1623 11, /* REP_LOG_MORE */ 1624 12, /* REP_LOG_REQ */ 1625 13, /* REP_MASTER_REQ */ 1626 14, /* REP_NEWCLIENT */ 1627 15, /* REP_NEWFILE */ 1628 16, /* REP_NEWMASTER */ 1629 17, /* REP_NEWSITE */ 1630 18, /* REP_PAGE */ 1631 19, /* REP_PAGE_FAIL */ 1632 20, /* REP_PAGE_MORE */ 1633 21, /* REP_PAGE_REQ */ 1634 22, /* REP_REREQUEST */ 1635 REP_INVALID, /* REP_START_SYNC */ 1636 23, /* REP_UPDATE */ 1637 24, /* REP_UPDATE_REQ */ 1638 25, /* REP_VERIFY */ 1639 26, /* REP_VERIFY_FAIL */ 1640 27, /* REP_VERIFY_REQ */ 1641 28, /* REP_VOTE1 */ 1642 29 /* REP_VOTE2 */ 1643 }, 1644 /* 1645 * From 4.7 message number To 4.6 message number. There are 1646 * NO message differences between 4.6 and 4.7. The 1647 * control structure changed. 1648 */ 1649 { REP_INVALID, /* NO message 0 */ 1650 1, /* REP_ALIVE */ 1651 2, /* REP_ALIVE_REQ */ 1652 3, /* REP_ALL_REQ */ 1653 4, /* REP_BULK_LOG */ 1654 5, /* REP_BULK_PAGE */ 1655 6, /* REP_DUPMASTER */ 1656 7, /* REP_FILE */ 1657 8, /* REP_FILE_FAIL */ 1658 9, /* REP_FILE_REQ */ 1659 10, /* REP_LEASE_GRANT */ 1660 11, /* REP_LOG */ 1661 12, /* REP_LOG_MORE */ 1662 13, /* REP_LOG_REQ */ 1663 14, /* REP_MASTER_REQ */ 1664 15, /* REP_NEWCLIENT */ 1665 16, /* REP_NEWFILE */ 1666 17, /* REP_NEWMASTER */ 1667 18, /* REP_NEWSITE */ 1668 19, /* REP_PAGE */ 1669 20, /* REP_PAGE_FAIL */ 1670 21, /* REP_PAGE_MORE */ 1671 22, /* REP_PAGE_REQ */ 1672 23, /* REP_REREQUEST */ 1673 24, /* REP_START_SYNC */ 1674 25, /* REP_UPDATE */ 1675 26, /* REP_UPDATE_REQ */ 1676 27, /* REP_VERIFY */ 1677 28, /* REP_VERIFY_FAIL */ 1678 29, /* REP_VERIFY_REQ */ 1679 30, /* REP_VOTE1 */ 1680 31 /* REP_VOTE2 */ 1681 } 1682 }; 1683 return (table[version][rectype]); 1684} 1685 1686/* 1687 * __rep_msg_from_old -- 1688 * Convert old message numbers to current message numbers. 1689 * 1690 * PUBLIC: u_int32_t __rep_msg_from_old __P((u_int32_t, u_int32_t)); 1691 */ 1692u_int32_t 1693__rep_msg_from_old(version, rectype) 1694 u_int32_t version, rectype; 1695{ 1696 /* 1697 * We need to convert from current message numbers to old numbers and 1698 * we need to convert from old numbers to current numbers. Offset by 1699 * one for more readable code. 1700 */ 1701 /* 1702 * Everything for version 0 is invalid, there is no version 0. 1703 */ 1704 static const u_int32_t table[DB_REPVERSION][REP_MAX_MSG+1] = { 1705 /* There is no DB_REPVERSION 0. */ 1706 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1707 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1708 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1709 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1710 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1711 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1712 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1713 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1714 /* 1715 * 4.2/DB_REPVERSION 1 no longer supported. 1716 */ 1717 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1718 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1719 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1720 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1721 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1722 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1723 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1724 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1725 /* 1726 * 4.3/DB_REPVERSION 2 no longer supported. 1727 */ 1728 { REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1729 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1730 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1731 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1732 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1733 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1734 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID, 1735 REP_INVALID, REP_INVALID, REP_INVALID, REP_INVALID }, 1736 /* 1737 * From 4.4/4.5 message number To 4.7 message number 1738 */ 1739 { REP_INVALID, /* NO message 0 */ 1740 1, /* 1, REP_ALIVE */ 1741 2, /* 2, REP_ALIVE_REQ */ 1742 3, /* 3, REP_ALL_REQ */ 1743 4, /* 4, REP_BULK_LOG */ 1744 5, /* 5, REP_BULK_PAGE */ 1745 6, /* 6, REP_DUPMASTER */ 1746 7, /* 7, REP_FILE */ 1747 8, /* 8, REP_FILE_FAIL */ 1748 9, /* 9, REP_FILE_REQ */ 1749 /* 10, REP_LEASE_GRANT doesn't exist */ 1750 11, /* 10, REP_LOG */ 1751 12, /* 11, REP_LOG_MORE */ 1752 13, /* 12, REP_LOG_REQ */ 1753 14, /* 13, REP_MASTER_REQ */ 1754 15, /* 14, REP_NEWCLIENT */ 1755 16, /* 15, REP_NEWFILE */ 1756 17, /* 16, REP_NEWMASTER */ 1757 18, /* 17, REP_NEWSITE */ 1758 19, /* 18, REP_PAGE */ 1759 20, /* 19, REP_PAGE_FAIL */ 1760 21, /* 20, REP_PAGE_MORE */ 1761 22, /* 21, REP_PAGE_REQ */ 1762 23, /* 22, REP_REREQUEST */ 1763 /* 24, REP_START_SYNC doesn't exist */ 1764 25, /* 23, REP_UPDATE */ 1765 26, /* 24, REP_UPDATE_REQ */ 1766 27, /* 25, REP_VERIFY */ 1767 28, /* 26, REP_VERIFY_FAIL */ 1768 29, /* 27, REP_VERIFY_REQ */ 1769 30, /* 28, REP_VOTE1 */ 1770 31, /* 29, REP_VOTE2 */ 1771 REP_INVALID, /* 30, 4.4/4.5 no message */ 1772 REP_INVALID /* 31, 4.4/4.5 no message */ 1773 }, 1774 /* 1775 * From 4.6 message number To 4.6 message number. There are 1776 * NO message differences between 4.6 and 4.7. The 1777 * control structure changed. 1778 */ 1779 { REP_INVALID, /* NO message 0 */ 1780 1, /* 1, REP_ALIVE */ 1781 2, /* 2, REP_ALIVE_REQ */ 1782 3, /* 3, REP_ALL_REQ */ 1783 4, /* 4, REP_BULK_LOG */ 1784 5, /* 5, REP_BULK_PAGE */ 1785 6, /* 6, REP_DUPMASTER */ 1786 7, /* 7, REP_FILE */ 1787 8, /* 8, REP_FILE_FAIL */ 1788 9, /* 9, REP_FILE_REQ */ 1789 10, /* 10, REP_LEASE_GRANT */ 1790 11, /* 11, REP_LOG */ 1791 12, /* 12, REP_LOG_MORE */ 1792 13, /* 13, REP_LOG_REQ */ 1793 14, /* 14, REP_MASTER_REQ */ 1794 15, /* 15, REP_NEWCLIENT */ 1795 16, /* 16, REP_NEWFILE */ 1796 17, /* 17, REP_NEWMASTER */ 1797 18, /* 18, REP_NEWSITE */ 1798 19, /* 19, REP_PAGE */ 1799 20, /* 20, REP_PAGE_FAIL */ 1800 21, /* 21, REP_PAGE_MORE */ 1801 22, /* 22, REP_PAGE_REQ */ 1802 23, /* 22, REP_REREQUEST */ 1803 24, /* 24, REP_START_SYNC */ 1804 25, /* 25, REP_UPDATE */ 1805 26, /* 26, REP_UPDATE_REQ */ 1806 27, /* 27, REP_VERIFY */ 1807 28, /* 28, REP_VERIFY_FAIL */ 1808 29, /* 29, REP_VERIFY_REQ */ 1809 30, /* 30, REP_VOTE1 */ 1810 31 /* 31, REP_VOTE2 */ 1811 } 1812 }; 1813 return (table[version][rectype]); 1814} 1815 1816/* 1817 * __rep_print -- 1818 * Optionally print a verbose message. 1819 * 1820 * PUBLIC: void __rep_print __P((ENV *, const char *, ...)) 1821 * PUBLIC: __attribute__ ((__format__ (__printf__, 2, 3))); 1822 */ 1823void 1824#ifdef STDC_HEADERS 1825__rep_print(ENV *env, const char *fmt, ...) 1826#else 1827__rep_print(env, fmt, va_alist) 1828 ENV *env; 1829 const char *fmt; 1830 va_dcl 1831#endif 1832{ 1833 va_list ap; 1834 DB_MSGBUF mb; 1835 REP *rep; 1836 const char *s; 1837 1838 DB_MSGBUF_INIT(&mb); 1839 1840 s = NULL; 1841 if (env->dbenv->db_errpfx != NULL) 1842 s = env->dbenv->db_errpfx; 1843 else if (REP_ON(env)) { 1844 rep = env->rep_handle->region; 1845 if (F_ISSET(rep, REP_F_CLIENT)) 1846 s = "CLIENT"; 1847 else if (F_ISSET(rep, REP_F_MASTER)) 1848 s = "MASTER"; 1849 } 1850 if (s == NULL) 1851 s = "REP_UNDEF"; 1852 __db_msgadd(env, &mb, "%s: ", s); 1853 1854#ifdef STDC_HEADERS 1855 va_start(ap, fmt); 1856#else 1857 va_start(ap); 1858#endif 1859 __db_msgadd_ap(env, &mb, fmt, ap); 1860 va_end(ap); 1861 1862 DB_MSGBUF_FLUSH(env, &mb); 1863} 1864 1865/* 1866 * PUBLIC: void __rep_print_message 1867 * PUBLIC: __P((ENV *, int, __rep_control_args *, char *, u_int32_t)); 1868 */ 1869void 1870__rep_print_message(env, eid, rp, str, flags) 1871 ENV *env; 1872 int eid; 1873 __rep_control_args *rp; 1874 char *str; 1875 u_int32_t flags; 1876{ 1877 u_int32_t ctlflags, rectype; 1878 char ftype[64], *type; 1879 1880 rectype = rp->rectype; 1881 ctlflags = rp->flags; 1882 if (rp->rep_version != DB_REPVERSION) 1883 rectype = __rep_msg_from_old(rp->rep_version, rectype); 1884 switch (rectype) { 1885 case REP_ALIVE: 1886 type = "alive"; 1887 break; 1888 case REP_ALIVE_REQ: 1889 type = "alive_req"; 1890 break; 1891 case REP_ALL_REQ: 1892 type = "all_req"; 1893 break; 1894 case REP_BULK_LOG: 1895 type = "bulk_log"; 1896 break; 1897 case REP_BULK_PAGE: 1898 type = "bulk_page"; 1899 break; 1900 case REP_DUPMASTER: 1901 type = "dupmaster"; 1902 break; 1903 case REP_FILE: 1904 type = "file"; 1905 break; 1906 case REP_FILE_FAIL: 1907 type = "file_fail"; 1908 break; 1909 case REP_FILE_REQ: 1910 type = "file_req"; 1911 break; 1912 case REP_LEASE_GRANT: 1913 type = "lease_grant"; 1914 break; 1915 case REP_LOG: 1916 type = "log"; 1917 break; 1918 case REP_LOG_MORE: 1919 type = "log_more"; 1920 break; 1921 case REP_LOG_REQ: 1922 type = "log_req"; 1923 break; 1924 case REP_MASTER_REQ: 1925 type = "master_req"; 1926 break; 1927 case REP_NEWCLIENT: 1928 type = "newclient"; 1929 break; 1930 case REP_NEWFILE: 1931 type = "newfile"; 1932 break; 1933 case REP_NEWMASTER: 1934 type = "newmaster"; 1935 break; 1936 case REP_NEWSITE: 1937 type = "newsite"; 1938 break; 1939 case REP_PAGE: 1940 type = "page"; 1941 break; 1942 case REP_PAGE_FAIL: 1943 type = "page_fail"; 1944 break; 1945 case REP_PAGE_MORE: 1946 type = "page_more"; 1947 break; 1948 case REP_PAGE_REQ: 1949 type = "page_req"; 1950 break; 1951 case REP_REREQUEST: 1952 type = "rerequest"; 1953 break; 1954 case REP_START_SYNC: 1955 type = "start_sync"; 1956 break; 1957 case REP_UPDATE: 1958 type = "update"; 1959 break; 1960 case REP_UPDATE_REQ: 1961 type = "update_req"; 1962 break; 1963 case REP_VERIFY: 1964 type = "verify"; 1965 break; 1966 case REP_VERIFY_FAIL: 1967 type = "verify_fail"; 1968 break; 1969 case REP_VERIFY_REQ: 1970 type = "verify_req"; 1971 break; 1972 case REP_VOTE1: 1973 type = "vote1"; 1974 break; 1975 case REP_VOTE2: 1976 type = "vote2"; 1977 break; 1978 default: 1979 type = "NOTYPE"; 1980 break; 1981 } 1982 1983 /* 1984 * !!! 1985 * If adding new flags to print out make sure the aggregate 1986 * length cannot overflow the buffer. 1987 */ 1988 ftype[0] = '\0'; 1989 if (LF_ISSET(DB_REP_ANYWHERE)) 1990 (void)strcat(ftype, " any"); /* 4 */ 1991 if (FLD_ISSET(ctlflags, REPCTL_FLUSH)) 1992 (void)strcat(ftype, " flush"); /* 10 */ 1993 /* 1994 * We expect most of the time the messages will indicate 1995 * group membership. Only print if we're not already 1996 * part of a group. 1997 */ 1998 if (!FLD_ISSET(ctlflags, REPCTL_GROUP_ESTD)) 1999 (void)strcat(ftype, " nogroup"); /* 18 */ 2000 if (FLD_ISSET(ctlflags, REPCTL_LEASE)) 2001 (void)strcat(ftype, " lease"); /* 24 */ 2002 if (LF_ISSET(DB_REP_NOBUFFER)) 2003 (void)strcat(ftype, " nobuf"); /* 30 */ 2004 if (LF_ISSET(DB_REP_PERMANENT)) 2005 (void)strcat(ftype, " perm"); /* 35 */ 2006 if (LF_ISSET(DB_REP_REREQUEST)) 2007 (void)strcat(ftype, " rereq"); /* 41 */ 2008 if (FLD_ISSET(ctlflags, REPCTL_RESEND)) 2009 (void)strcat(ftype, " resend"); /* 48 */ 2010 if (FLD_ISSET(ctlflags, REPCTL_LOG_END)) 2011 (void)strcat(ftype, " logend"); /* 55 */ 2012 RPRINT(env, DB_VERB_REP_MSGS, 2013 (env, 2014 "%s %s: msgv = %lu logv %lu gen = %lu eid %d, type %s, LSN [%lu][%lu] %s", 2015 env->db_home, str, 2016 (u_long)rp->rep_version, (u_long)rp->log_version, (u_long)rp->gen, 2017 eid, type, (u_long)rp->lsn.file, (u_long)rp->lsn.offset, ftype)); 2018 /* 2019 * Make sure the version is close, and not swapped 2020 * here. Check for current version, +/- a little bit. 2021 */ 2022 DB_ASSERT(env, rp->rep_version <= DB_REPVERSION+10); 2023 DB_ASSERT(env, rp->log_version <= DB_LOGVERSION+10); 2024} 2025 2026/* 2027 * PUBLIC: void __rep_fire_event __P((ENV *, u_int32_t, void *)); 2028 */ 2029void 2030__rep_fire_event(env, event, info) 2031 ENV *env; 2032 u_int32_t event; 2033 void *info; 2034{ 2035 int ret; 2036 2037 /* 2038 * Give repmgr first crack at handling all replication-related events. 2039 * If it can't (or chooses not to) handle the event fully, then pass it 2040 * along to the application. 2041 */ 2042 ret = __repmgr_handle_event(env, event, info); 2043 DB_ASSERT(env, ret == 0 || ret == DB_EVENT_NOT_HANDLED); 2044 2045 if (ret == DB_EVENT_NOT_HANDLED) 2046 DB_EVENT(env, event, info); 2047} 2048