1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_method.c,v 12.122 2008/04/28 18:18:36 sue Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/btree.h" 14#include "dbinc/log.h" 15#include "dbinc/mp.h" 16#include "dbinc/txn.h" 17 18static int __rep_abort_prepared __P((ENV *)); 19static int __rep_bt_cmp __P((DB *, const DBT *, const DBT *)); 20static void __rep_config_map __P((ENV *, u_int32_t *, u_int32_t *)); 21static u_int32_t __rep_conv_vers __P((ENV *, u_int32_t)); 22static int __rep_restore_prepared __P((ENV *)); 23 24/* 25 * __rep_env_create -- 26 * Replication-specific initialization of the ENV structure. 27 * 28 * PUBLIC: int __rep_env_create __P((DB_ENV *)); 29 */ 30int 31__rep_env_create(dbenv) 32 DB_ENV *dbenv; 33{ 34 DB_REP *db_rep; 35 ENV *env; 36 int ret; 37 38 env = dbenv->env; 39 40 if ((ret = __os_calloc(env, 1, sizeof(DB_REP), &db_rep)) != 0) 41 return (ret); 42 43 db_rep->eid = DB_EID_INVALID; 44 db_rep->bytes = REP_DEFAULT_THROTTLE; 45 DB_TIMEOUT_TO_TIMESPEC(DB_REP_REQUEST_GAP, &db_rep->request_gap); 46 DB_TIMEOUT_TO_TIMESPEC(DB_REP_MAX_GAP, &db_rep->max_gap); 47 db_rep->elect_timeout = 2 * US_PER_SEC; /* 2 seconds */ 48 db_rep->chkpt_delay = 30 * US_PER_SEC; /* 30 seconds */ 49 db_rep->my_priority = DB_REP_DEFAULT_PRIORITY; 50 /* 51 * Make no clock skew the default. Setting both fields 52 * to the same non-zero value means no skew. 53 */ 54 db_rep->clock_skew = 1; 55 db_rep->clock_base = 1; 56 57#ifdef HAVE_REPLICATION_THREADS 58 if ((ret = __repmgr_env_create(env, db_rep)) != 0) { 59 __os_free(env, db_rep); 60 return (ret); 61 } 62#endif 63 64 env->rep_handle = db_rep; 65 return (0); 66} 67 68/* 69 * __rep_env_destroy -- 70 * Replication-specific destruction of the ENV structure. 71 * 72 * PUBLIC: void __rep_env_destroy __P((DB_ENV *)); 73 */ 74void 75__rep_env_destroy(dbenv) 76 DB_ENV *dbenv; 77{ 78 ENV *env; 79 80 env = dbenv->env; 81 82 if (env->rep_handle != NULL) { 83#ifdef HAVE_REPLICATION_THREADS 84 __repmgr_env_destroy(env, env->rep_handle); 85#endif 86 __os_free(env, env->rep_handle); 87 env->rep_handle = NULL; 88 } 89} 90 91/* 92 * __rep_get_config -- 93 * Return the replication subsystem configuration. 94 * 95 * PUBLIC: int __rep_get_config __P((DB_ENV *, u_int32_t, int *)); 96 */ 97int 98__rep_get_config(dbenv, which, onp) 99 DB_ENV *dbenv; 100 u_int32_t which; 101 int *onp; 102{ 103 DB_REP *db_rep; 104 ENV *env; 105 REP *rep; 106 u_int32_t mapped; 107 108 env = dbenv->env; 109 110#undef OK_FLAGS 111#define OK_FLAGS \ 112 (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | \ 113 DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT) 114 115 if (FLD_ISSET(which, ~OK_FLAGS)) 116 return (__db_ferr(env, "DB_ENV->rep_get_config", 0)); 117 118 db_rep = env->rep_handle; 119 ENV_NOT_CONFIGURED( 120 env, db_rep->region, "DB_ENV->rep_get_config", DB_INIT_REP); 121 122 mapped = 0; 123 __rep_config_map(env, &which, &mapped); 124 if (REP_ON(env)) { 125 rep = db_rep->region; 126 if (FLD_ISSET(rep->config, mapped)) 127 *onp = 1; 128 else 129 *onp = 0; 130 } else { 131 if (FLD_ISSET(db_rep->config, mapped)) 132 *onp = 1; 133 else 134 *onp = 0; 135 } 136 return (0); 137} 138 139/* 140 * __rep_set_config -- 141 * Configure the replication subsystem. 142 * 143 * PUBLIC: int __rep_set_config __P((DB_ENV *, u_int32_t, int)); 144 */ 145int 146__rep_set_config(dbenv, which, on) 147 DB_ENV *dbenv; 148 u_int32_t which; 149 int on; 150{ 151 DB_LOG *dblp; 152 DB_REP *db_rep; 153 DB_THREAD_INFO *ip; 154 ENV *env; 155 LOG *lp; 156 REP *rep; 157 REP_BULK bulk; 158 u_int32_t mapped, orig; 159 int ret; 160 161 env = dbenv->env; 162 db_rep = env->rep_handle; 163 ret = 0; 164 165#undef OK_FLAGS 166#define OK_FLAGS \ 167 (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_LEASE | \ 168 DB_REP_CONF_NOAUTOINIT | DB_REP_CONF_NOWAIT | DB_REPMGR_CONF_2SITE_STRICT) 169 170 ENV_NOT_CONFIGURED( 171 env, db_rep->region, "DB_ENV->rep_set_config", DB_INIT_REP); 172 173 if (FLD_ISSET(which, ~OK_FLAGS)) 174 return (__db_ferr(env, "DB_ENV->rep_set_config", 0)); 175 176 mapped = 0; 177 __rep_config_map(env, &which, &mapped); 178 179 if (REP_ON(env)) { 180 ENV_ENTER(env, ip); 181 182 rep = db_rep->region; 183 /* 184 * Leases must be turned on before calling rep_start. 185 * Leases can never be turned off once they're turned on. 186 */ 187 if (FLD_ISSET(mapped, REP_C_LEASE)) { 188 if (F_ISSET(rep, REP_F_START_CALLED)) { 189 __db_errx(env, 190"DB_ENV->rep_set_config: leases must be configured before DB_ENV->rep_start"); 191 ret = EINVAL; 192 } 193 if (on == 0) { 194 __db_errx(env, 195 "DB_ENV->rep_set_config: leases cannot be turned off"); 196 ret = EINVAL; 197 } 198 if (ret != 0) 199 return (ret); 200 } 201 MUTEX_LOCK(env, rep->mtx_clientdb); 202 REP_SYSTEM_LOCK(env); 203 orig = rep->config; 204 if (on) 205 FLD_SET(rep->config, mapped); 206 else 207 FLD_CLR(rep->config, mapped); 208 209 /* 210 * Bulk transfer requires special processing if it is getting 211 * toggled. 212 */ 213 dblp = env->lg_handle; 214 lp = dblp->reginfo.primary; 215 if (FLD_ISSET(rep->config, REP_C_BULK) && 216 !FLD_ISSET(orig, REP_C_BULK)) 217 db_rep->bulk = R_ADDR(&dblp->reginfo, lp->bulk_buf); 218 REP_SYSTEM_UNLOCK(env); 219 220 /* 221 * If turning bulk off and it was on, send out whatever is in 222 * the buffer already. 223 */ 224 if (FLD_ISSET(orig, REP_C_BULK) && 225 !FLD_ISSET(rep->config, REP_C_BULK) && lp->bulk_off != 0) { 226 memset(&bulk, 0, sizeof(bulk)); 227 if (db_rep->bulk == NULL) 228 bulk.addr = 229 R_ADDR(&dblp->reginfo, lp->bulk_buf); 230 else 231 bulk.addr = db_rep->bulk; 232 bulk.offp = &lp->bulk_off; 233 bulk.len = lp->bulk_len; 234 bulk.type = REP_BULK_LOG; 235 bulk.eid = DB_EID_BROADCAST; 236 bulk.flagsp = &lp->bulk_flags; 237 ret = __rep_send_bulk(env, &bulk, 0); 238 } 239 MUTEX_UNLOCK(env, rep->mtx_clientdb); 240 241 ENV_LEAVE(env, ip); 242 } else { 243 if (on) 244 FLD_SET(db_rep->config, mapped); 245 else 246 FLD_CLR(db_rep->config, mapped); 247 } 248 return (ret); 249} 250 251static void 252__rep_config_map(env, inflagsp, outflagsp) 253 ENV *env; 254 u_int32_t *inflagsp, *outflagsp; 255{ 256 COMPQUIET(env, NULL); 257 258 if (FLD_ISSET(*inflagsp, DB_REP_CONF_BULK)) { 259 FLD_SET(*outflagsp, REP_C_BULK); 260 FLD_CLR(*inflagsp, DB_REP_CONF_BULK); 261 } 262 if (FLD_ISSET(*inflagsp, DB_REP_CONF_DELAYCLIENT)) { 263 FLD_SET(*outflagsp, REP_C_DELAYCLIENT); 264 FLD_CLR(*inflagsp, DB_REP_CONF_DELAYCLIENT); 265 } 266 if (FLD_ISSET(*inflagsp, DB_REP_CONF_LEASE)) { 267 FLD_SET(*outflagsp, REP_C_LEASE); 268 FLD_CLR(*inflagsp, DB_REP_CONF_LEASE); 269 } 270 if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOAUTOINIT)) { 271 FLD_SET(*outflagsp, REP_C_NOAUTOINIT); 272 FLD_CLR(*inflagsp, DB_REP_CONF_NOAUTOINIT); 273 } 274 if (FLD_ISSET(*inflagsp, DB_REP_CONF_NOWAIT)) { 275 FLD_SET(*outflagsp, REP_C_NOWAIT); 276 FLD_CLR(*inflagsp, DB_REP_CONF_NOWAIT); 277 } 278 if (FLD_ISSET(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT)) { 279 FLD_SET(*outflagsp, REP_C_2SITE_STRICT); 280 FLD_CLR(*inflagsp, DB_REPMGR_CONF_2SITE_STRICT); 281 } 282} 283 284/* 285 * __rep_start -- 286 * Become a master or client, and start sending messages to participate 287 * in the replication environment. Must be called after the environment 288 * is open. 289 * 290 * We must protect rep_start, which may change the world, with the rest 291 * of the DB library. Each API interface will count itself as it enters 292 * the library. Rep_start checks the following: 293 * 294 * rep->msg_th - this is the count of threads currently in rep_process_message 295 * rep->handle_cnt - number of threads actively using a dbp in library. 296 * rep->txn_cnt - number of active txns. 297 * REP_F_READY_* - Replication flag that indicates that we wish to run 298 * recovery, and want to prohibit new transactions from entering and cause 299 * existing ones to return immediately (with a DB_LOCK_DEADLOCK error). 300 * 301 * There is also the renv->rep_timestamp which is updated whenever significant 302 * events (i.e., new masters, log rollback, etc). Upon creation, a handle 303 * is associated with the current timestamp. Each time a handle enters the 304 * library it must check if the handle timestamp is the same as the one 305 * stored in the replication region. This prevents the use of handles on 306 * clients that reference non-existent files whose creation was backed out 307 * during a synchronizing recovery. 308 * 309 * PUBLIC: int __rep_start __P((DB_ENV *, DBT *, u_int32_t)); 310 */ 311int 312__rep_start(dbenv, dbt, flags) 313 DB_ENV *dbenv; 314 DBT *dbt; 315 u_int32_t flags; 316{ 317 DB *dbp; 318 DB_LOG *dblp; 319 DB_LSN lsn; 320 DB_REP *db_rep; 321 DB_THREAD_INFO *ip; 322 DB_TXNREGION *region; 323 ENV *env; 324 LOG *lp; 325 REGINFO *infop; 326 REP *rep; 327 db_timeout_t tmp; 328 u_int32_t oldvers, pending_event, repflags, role; 329 int announce, locked, ret, role_chg; 330 int t_ret; 331 332 env = dbenv->env; 333 334 ENV_REQUIRES_CONFIG_XX( 335 env, rep_handle, "DB_ENV->rep_start", DB_INIT_REP); 336 337 db_rep = env->rep_handle; 338 rep = db_rep->region; 339 infop = env->reginfo; 340 locked = 0; 341 pending_event = DB_EVENT_NO_SUCH_EVENT; 342 343 switch (role = LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) { 344 case DB_REP_CLIENT: 345 case DB_REP_MASTER: 346 break; 347 default: 348 __db_errx(env, 349 "DB_ENV->rep_start: must specify DB_REP_CLIENT or DB_REP_MASTER"); 350 return (EINVAL); 351 } 352 353 /* We need a transport function. */ 354 if (db_rep->send == NULL) { 355 __db_errx(env, 356 "DB_ENV->rep_set_transport must be called before DB_ENV->rep_start"); 357 return (EINVAL); 358 } 359 360 /* 361 * If we're using master leases, check that all needed 362 * setup has been done. 363 */ 364 if (IS_USING_LEASES(env) && rep->lease_timeout == 0) { 365 __db_errx(env, 366"DB_ENV->rep_start: must call DB_ENV->rep_set_timeout for leases first"); 367 return (EINVAL); 368 } 369 370 ENV_ENTER(env, ip); 371 372 /* 373 * In order to correctly check log files for old versions, we 374 * need to flush the logs. 375 */ 376 if ((ret = __log_flush(env, NULL)) != 0) 377 goto out; 378 379 REP_SYSTEM_LOCK(env); 380 /* 381 * We only need one thread to start-up replication, so if 382 * there is another thread in rep_start, we'll let it finish 383 * its work and have this thread simply return. Similarly, 384 * if a thread is in a critical lockout section we return. 385 */ 386 if (F_ISSET(rep, REP_F_READY_MSG)) { 387 /* 388 * There is already someone in lockout. Return. 389 */ 390 RPRINT(env, DB_VERB_REP_MISC, 391 (env, "Thread already in lockout")); 392 REP_SYSTEM_UNLOCK(env); 393 goto out; 394 } else if ((ret = __rep_lockout_msg(env, rep, 0)) != 0) 395 goto errunlock; 396 397 role_chg = (!F_ISSET(rep, REP_F_MASTER) && role == DB_REP_MASTER) || 398 (!F_ISSET(rep, REP_F_CLIENT) && role == DB_REP_CLIENT); 399 400 /* 401 * Wait for any active txns or mpool ops to complete, and 402 * prevent any new ones from occurring, only if we're 403 * changing roles. 404 */ 405 if (role_chg) { 406 if ((ret = __rep_lockout_api(env, rep)) != 0) 407 goto errunlock; 408 locked = 1; 409 } 410 411 dblp = env->lg_handle; 412 lp = dblp->reginfo.primary; 413 if (role == DB_REP_MASTER) { 414 if (role_chg) { 415 /* 416 * If we're upgrading from having been a client, 417 * preclose, so that we close our temporary database 418 * and any files we opened while doing a rep_apply. 419 * If we don't we can infinitely leak file ids if 420 * the master crashed with files open (the likely 421 * case). If we don't close them we can run into 422 * problems if we try to remove that file or long 423 * running applications end up with an unbounded 424 * number of used fileids, each getting written 425 * on checkpoint. Just close them. 426 * Then invalidate all files open in the logging 427 * region. These are files open by other processes 428 * attached to the environment. They must be 429 * closed by the other processes when they notice 430 * the change in role. 431 */ 432 if ((ret = __rep_preclose(env)) != 0) 433 goto errunlock; 434 435 rep->gen++; 436 /* 437 * There could have been any number of failed 438 * elections, so jump the gen if we need to now. 439 */ 440 if (rep->egen > rep->gen) 441 rep->gen = rep->egen; 442 if (IS_USING_LEASES(env) && 443 !F_ISSET(rep, REP_F_MASTERELECT)) { 444 __db_errx(env, 445 "rep_start: Cannot become master without being elected when using leases."); 446 ret = EINVAL; 447 goto errunlock; 448 } 449 if (F_ISSET(rep, REP_F_MASTERELECT)) { 450 __rep_elect_done(env, rep, 0); 451 F_CLR(rep, REP_F_MASTERELECT); 452 } 453 if (rep->egen <= rep->gen) 454 rep->egen = rep->gen + 1; 455 RPRINT(env, DB_VERB_REP_MISC, (env, 456 "New master gen %lu, egen %lu", 457 (u_long)rep->gen, (u_long)rep->egen)); 458 if ((ret = __rep_write_gen(env, rep->gen)) != 0) 459 goto errunlock; 460 } 461 /* 462 * Set lease duration assuming clients have faster clock. 463 * Master needs to compensate so that clients do not 464 * expire their grant while the master thinks it is valid. 465 */ 466 if (IS_USING_LEASES(env) && 467 (role_chg || !F_ISSET(rep, REP_F_START_CALLED))) { 468 /* 469 * If we have already granted our lease, we 470 * cannot become master. 471 */ 472 if ((ret = __rep_islease_granted(env))) { 473 __db_errx(env, 474 "rep_start: Cannot become master with outstanding lease granted."); 475 ret = EINVAL; 476 goto errunlock; 477 } 478 /* 479 * Simply compute the larger ratio for the lease. 480 */ 481 tmp = (db_timeout_t)((double)rep->lease_timeout / 482 ((double)rep->clock_skew / 483 (double)rep->clock_base)); 484 DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration); 485 /* 486 * Keep track of last perm LSN on master for 487 * lease refresh. 488 */ 489 INIT_LSN(lp->max_perm_lsn); 490 if ((ret = __rep_lease_table_alloc(env, 491 rep->nsites)) != 0) 492 goto errunlock; 493 } 494 rep->master_id = rep->eid; 495 496 /* 497 * Clear out almost everything, and then set MASTER. Leave 498 * READY_* alone in case we did a lockout above; 499 * we'll clear it in a moment (below), once we've written 500 * the txn_recycle into the log. 501 */ 502 repflags = F_ISSET(rep, REP_F_READY_API | REP_F_READY_MSG | 503 REP_F_READY_OP); 504#ifdef DIAGNOSTIC 505 if (!F_ISSET(rep, REP_F_GROUP_ESTD)) 506 RPRINT(env, DB_VERB_REP_MISC, (env, 507 "Establishing group as master.")); 508#endif 509 FLD_SET(repflags, REP_F_MASTER | REP_F_GROUP_ESTD); 510 rep->flags = repflags; 511 512 /* 513 * We're master. Set the versions to the current ones. 514 */ 515 oldvers = lp->persist.version; 516 /* 517 * If we're moving forward to the current version, we need 518 * to force the log file to advance and reset the 519 * recovery table since it contains pointers to old 520 * recovery functions. 521 */ 522 RPRINT(env, DB_VERB_REP_MISC, (env, 523 "rep_start: Old log version was %lu", (u_long)oldvers)); 524 if (lp->persist.version != DB_LOGVERSION) { 525 if ((ret = __env_init_rec(env, DB_LOGVERSION)) != 0) 526 goto errunlock; 527 } 528 rep->version = DB_REPVERSION; 529 F_CLR(rep, REP_F_READY_MSG); 530 REP_SYSTEM_UNLOCK(env); 531 LOG_SYSTEM_LOCK(env); 532 lsn = lp->lsn; 533 LOG_SYSTEM_UNLOCK(env); 534 535 /* 536 * Send the NEWMASTER message first so that clients know 537 * subsequent messages are coming from the right master. 538 * We need to perform all actions below no matter what 539 * regarding errors. 540 */ 541 (void)__rep_send_message(env, 542 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); 543 ret = 0; 544 if (role_chg) { 545 pending_event = DB_EVENT_REP_MASTER; 546 /* 547 * If prepared transactions have not been restored 548 * look to see if there are any. If there are, 549 * then mark the open files, otherwise close them. 550 */ 551 region = env->tx_handle->reginfo.primary; 552 if (region->stat.st_nrestores == 0 && 553 (t_ret = __rep_restore_prepared(env)) != 0 && 554 ret == 0) 555 ret = t_ret; 556 if (region->stat.st_nrestores != 0) { 557 if ((t_ret = __dbreg_mark_restored(env)) != 0 && 558 ret == 0) 559 ret = t_ret; 560 } else { 561 ret = __dbreg_invalidate_files(env, 0); 562 if ((t_ret = __rep_closefiles( 563 env, 0)) != 0 && ret == 0) 564 ret = t_ret; 565 } 566 if ((t_ret = __txn_recycle_id(env)) != 0 && ret == 0) 567 ret = t_ret; 568 REP_SYSTEM_LOCK(env); 569 F_CLR(rep, REP_F_READY_API | REP_F_READY_OP); 570 locked = 0; 571 REP_SYSTEM_UNLOCK(env); 572 (void)__memp_set_config( 573 dbenv, DB_MEMP_SYNC_INTERRUPT, 0); 574 } 575 } else { 576 announce = role_chg || rep->master_id == DB_EID_INVALID; 577 578 if (role_chg) 579 rep->master_id = DB_EID_INVALID; 580 /* Zero out everything except recovery and tally flags. */ 581 repflags = F_ISSET(rep, REP_F_NOARCHIVE | REP_F_READY_MSG | 582 REP_F_RECOVER_MASK | REP_F_TALLY); 583 FLD_SET(repflags, REP_F_CLIENT); 584 if (role_chg) { 585 if ((ret = __log_get_oldversion(env, &oldvers)) != 0) 586 goto errunlock; 587 RPRINT(env, DB_VERB_REP_MISC, (env, 588 "rep_start: Found old version log %d", oldvers)); 589 if (oldvers >= DB_LOGVERSION_MIN) { 590 __log_set_version(env, oldvers); 591 oldvers = __rep_conv_vers(env, oldvers); 592 DB_ASSERT( 593 env, oldvers != DB_REPVERSION_INVALID); 594 rep->version = oldvers; 595 } 596 } 597 rep->flags = repflags; 598 /* 599 * On a client, compute the lease duration on the 600 * assumption that the client has a fast clock. 601 * Expire any existing leases we might have held as 602 * a master. 603 */ 604 if (IS_USING_LEASES(env) && 605 (role_chg || !F_ISSET(rep, REP_F_START_CALLED))) { 606 if ((ret = __rep_lease_expire(env, 1)) != 0) 607 goto errunlock; 608 /* 609 * Since the master is also compensating on its 610 * side as well, we're being doubly conservative 611 * to compensate on the client side. Theoretically, 612 * this compensation is not necessary, as it is 613 * effectively doubling the skew compensation. 614 * But we are making guarantees based on time and 615 * skews across machines. So we are being extra 616 * cautious. 617 */ 618 tmp = (db_timeout_t)((double)rep->lease_timeout * 619 ((double)rep->clock_skew / 620 (double)rep->clock_base)); 621 DB_TIMEOUT_TO_TIMESPEC(tmp, &rep->lease_duration); 622 if (rep->lease_off != INVALID_ROFF) { 623 __env_alloc_free(infop, 624 R_ADDR(infop, rep->lease_off)); 625 rep->lease_off = INVALID_ROFF; 626 } 627 } 628 REP_SYSTEM_UNLOCK(env); 629 630 /* 631 * Abort any prepared transactions that were restored 632 * by recovery. We won't be able to create any txns of 633 * our own until they're resolved, but we can't resolve 634 * them ourselves; the master has to. If any get 635 * resolved as commits, we'll redo them when commit 636 * records come in. Aborts will simply be ignored. 637 */ 638 if ((ret = __rep_abort_prepared(env)) != 0) 639 goto errlock; 640 641 /* 642 * If we're changing roles we need to init the db. 643 */ 644 if (role_chg) { 645 if ((ret = db_create(&dbp, dbenv, 0)) != 0) 646 goto errlock; 647 /* 648 * Ignore errors, because if the file doesn't exist, 649 * this is perfectly OK. 650 */ 651 MUTEX_LOCK(env, rep->mtx_clientdb); 652 (void)__db_remove(dbp, ip, NULL, REPDBNAME, 653 NULL, DB_FORCE); 654 MUTEX_UNLOCK(env, rep->mtx_clientdb); 655 /* 656 * Set pending_event after calls that can fail. 657 */ 658 pending_event = DB_EVENT_REP_CLIENT; 659 } 660 REP_SYSTEM_LOCK(env); 661 F_CLR(rep, REP_F_READY_MSG); 662 if (locked) { 663 F_CLR(rep, REP_F_READY_API | REP_F_READY_OP); 664 locked = 0; 665 } 666 REP_SYSTEM_UNLOCK(env); 667 668 /* 669 * If this client created a newly replicated environment, 670 * then announce the existence of this client. The master 671 * should respond with a message that will tell this client 672 * the current generation number and the current LSN. This 673 * will allow the client to either perform recovery or 674 * simply join in. 675 */ 676 if (announce) { 677 /* 678 * If we think we're a new client, and we have a 679 * private env, set our gen number down to 0. 680 * Otherwise, we can restart and think 681 * we're ready to accept a new record (because our 682 * gen is okay), but really this client needs to 683 * sync with the master. So, if we are announcing 684 * ourselves force ourselves to find the master 685 * and sync up. 686 */ 687 if (F_ISSET(env, ENV_PRIVATE)) 688 rep->gen = 0; 689 if ((ret = __dbt_usercopy(env, dbt)) != 0) 690 goto out; 691 (void)__rep_send_message(env, 692 DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0); 693 } else 694 (void)__rep_send_message(env, 695 DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0, 0); 696 } 697 698 if (0) { 699 /* 700 * We have separate labels for errors. If we're returning an 701 * error before we've set REP_F_READY_MSG, we use 'err'. If 702 * we are erroring while holding the region mutex, then we use 703 * 'errunlock' label. If we error without holding the rep 704 * mutex we must use 'errlock'. 705 */ 706errlock: REP_SYSTEM_LOCK(env); 707errunlock: F_CLR(rep, REP_F_READY_MSG); 708 if (locked) 709 F_CLR(rep, REP_F_READY_API | REP_F_READY_OP); 710 REP_SYSTEM_UNLOCK(env); 711 } 712out: 713 if (ret == 0) { 714 REP_SYSTEM_LOCK(env); 715 F_SET(rep, REP_F_START_CALLED); 716 REP_SYSTEM_UNLOCK(env); 717 } 718 if (pending_event != DB_EVENT_NO_SUCH_EVENT) 719 __rep_fire_event(env, pending_event, NULL); 720 __dbt_userfree(env, dbt, NULL, NULL); 721 ENV_LEAVE(env, ip); 722 return (ret); 723} 724 725/* 726 * __rep_client_dbinit -- 727 * 728 * Initialize the LSN database on the client side. This is called from the 729 * client initialization code. The startup flag value indicates if 730 * this is the first thread/process starting up and therefore should create 731 * the LSN database. This routine must be called once by each process acting 732 * as a client. 733 * 734 * Assumes caller holds appropriate mutex. 735 * 736 * PUBLIC: int __rep_client_dbinit __P((ENV *, int, repdb_t)); 737 */ 738int 739__rep_client_dbinit(env, startup, which) 740 ENV *env; 741 int startup; 742 repdb_t which; 743{ 744 DB *dbp, **rdbpp; 745 DB_ENV *dbenv; 746 DB_REP *db_rep; 747 DB_THREAD_INFO *ip; 748 REP *rep; 749 int ret, t_ret; 750 u_int32_t flags; 751 const char *name; 752 753 dbenv = env->dbenv; 754 db_rep = env->rep_handle; 755 rep = db_rep->region; 756 dbp = NULL; 757 758 if (which == REP_DB) { 759 name = REPDBNAME; 760 rdbpp = &db_rep->rep_db; 761 } else { 762 name = REPPAGENAME; 763 rdbpp = &rep->file_dbp; 764 } 765 /* Check if this has already been called on this environment. */ 766 if (*rdbpp != NULL) 767 return (0); 768 769 ENV_GET_THREAD_INFO(env, ip); 770 771 if (startup) { 772 if ((ret = db_create(&dbp, dbenv, 0)) != 0) 773 goto err; 774 /* 775 * Ignore errors, because if the file doesn't exist, this 776 * is perfectly OK. 777 */ 778 (void)__db_remove(dbp, ip, NULL, name, NULL, DB_FORCE); 779 } 780 781 if ((ret = db_create(&dbp, dbenv, 0)) != 0) 782 goto err; 783 if (which == REP_DB && 784 (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0) 785 goto err; 786 787 /* Don't write log records on the client. */ 788 if ((ret = __db_set_flags(dbp, DB_TXN_NOT_DURABLE)) != 0) 789 goto err; 790 791 flags = DB_NO_AUTO_COMMIT | DB_CREATE | 792 (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0); 793 794 if ((ret = __db_open(dbp, ip, NULL, name, NULL, 795 (which == REP_DB ? DB_BTREE : DB_RECNO), 796 flags, 0, PGNO_BASE_MD)) != 0) 797 goto err; 798 799 *rdbpp = dbp; 800 801 if (0) { 802err: if (dbp != NULL && 803 (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) 804 ret = t_ret; 805 *rdbpp = NULL; 806 } 807 808 return (ret); 809} 810 811/* 812 * __rep_bt_cmp -- 813 * 814 * Comparison function for the LSN table. We use the entire control 815 * structure as a key (for simplicity, so we don't have to merge the 816 * other fields in the control with the data field), but really only 817 * care about the LSNs. 818 */ 819static int 820__rep_bt_cmp(dbp, dbt1, dbt2) 821 DB *dbp; 822 const DBT *dbt1, *dbt2; 823{ 824 DB_LSN lsn1, lsn2; 825 __rep_control_args *rp1, *rp2; 826 827 COMPQUIET(dbp, NULL); 828 829 rp1 = dbt1->data; 830 rp2 = dbt2->data; 831 832 (void)__ua_memcpy(&lsn1, &rp1->lsn, sizeof(DB_LSN)); 833 (void)__ua_memcpy(&lsn2, &rp2->lsn, sizeof(DB_LSN)); 834 835 if (lsn1.file > lsn2.file) 836 return (1); 837 838 if (lsn1.file < lsn2.file) 839 return (-1); 840 841 if (lsn1.offset > lsn2.offset) 842 return (1); 843 844 if (lsn1.offset < lsn2.offset) 845 return (-1); 846 847 return (0); 848} 849 850/* 851 * __rep_abort_prepared -- 852 * Abort any prepared transactions that recovery restored. 853 * 854 * This is used by clients that have just run recovery, since 855 * they cannot/should not call txn_recover and handle prepared transactions 856 * themselves. 857 */ 858static int 859__rep_abort_prepared(env) 860 ENV *env; 861{ 862#define PREPLISTSIZE 50 863 DB_LOG *dblp; 864 DB_PREPLIST prep[PREPLISTSIZE], *p; 865 DB_TXNMGR *mgr; 866 DB_TXNREGION *region; 867 LOG *lp; 868 int ret; 869 long count, i; 870 u_int32_t op; 871 872 mgr = env->tx_handle; 873 region = mgr->reginfo.primary; 874 dblp = env->lg_handle; 875 lp = dblp->reginfo.primary; 876 877 if (region->stat.st_nrestores == 0) 878 return (0); 879 880 op = DB_FIRST; 881 do { 882 if ((ret = __txn_recover(env, 883 prep, PREPLISTSIZE, &count, op)) != 0) 884 return (ret); 885 for (i = 0; i < count; i++) { 886 p = &prep[i]; 887 if ((ret = __txn_abort(p->txn)) != 0) 888 return (ret); 889 env->rep_handle->region->op_cnt--; 890 env->rep_handle->region->max_prep_lsn = lp->lsn; 891 region->stat.st_nrestores--; 892 } 893 op = DB_NEXT; 894 } while (count == PREPLISTSIZE); 895 896 return (0); 897} 898 899/* 900 * __rep_restore_prepared -- 901 * Restore to a prepared state any prepared but not yet committed 902 * transactions. 903 * 904 * This performs, in effect, a "mini-recovery"; it is called from 905 * __rep_start by newly upgraded masters. There may be transactions that an 906 * old master prepared but did not resolve, which we need to restore to an 907 * active state. 908 */ 909static int 910__rep_restore_prepared(env) 911 ENV *env; 912{ 913 DBT rec; 914 DB_LOGC *logc; 915 DB_LSN ckp_lsn, lsn; 916 DB_REP *db_rep; 917 DB_TXNHEAD *txninfo; 918 REP *rep; 919 __txn_ckp_args *ckp_args; 920 __txn_regop_args *regop_args; 921 __txn_xa_regop_args *prep_args; 922 int ret, t_ret; 923 u_int32_t hi_txn, low_txn, rectype, status, txnid, txnop; 924 925 db_rep = env->rep_handle; 926 rep = db_rep->region; 927 if (IS_ZERO_LSN(rep->max_prep_lsn)) { 928 RPRINT(env, DB_VERB_REP_MISC, 929 (env, "restore_prep: No prepares. Skip.")); 930 return (0); 931 } 932 txninfo = NULL; 933 ckp_args = NULL; 934 prep_args = NULL; 935 regop_args = NULL; 936 ZERO_LSN(ckp_lsn); 937 ZERO_LSN(lsn); 938 939 if ((ret = __log_cursor(env, &logc)) != 0) 940 return (ret); 941 942 /* 943 * Get our first LSN to see if the prepared LSN is still 944 * available. If so, it might be unresolved. If not, 945 * then it is guaranteed to be resolved. 946 */ 947 memset(&rec, 0, sizeof(DBT)); 948 if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0) { 949 __db_errx(env, "First record not found"); 950 goto err; 951 } 952 /* 953 * If the max_prep_lsn is no longer available, we're sure 954 * that txn has been resolved. We're done. 955 */ 956 if (rep->max_prep_lsn.file < lsn.file) { 957 RPRINT(env, DB_VERB_REP_MISC, 958 (env, "restore_prep: Prepare resolved. Skip")); 959 ZERO_LSN(rep->max_prep_lsn); 960 goto done; 961 } 962 /* 963 * We need to consider the set of records between the most recent 964 * checkpoint LSN and the end of the log; any txn in that 965 * range, and only txns in that range, could still have been 966 * active, and thus prepared but not yet committed (PBNYC), 967 * when the old master died. 968 * 969 * Find the most recent checkpoint LSN, and get the record there. 970 * If there is no checkpoint in the log, start off by getting 971 * the very first record in the log instead. 972 */ 973 if ((ret = __txn_getckp(env, &lsn)) == 0) { 974 if ((ret = __logc_get(logc, &lsn, &rec, DB_SET)) != 0) { 975 __db_errx(env, 976 "Checkpoint record at LSN [%lu][%lu] not found", 977 (u_long)lsn.file, (u_long)lsn.offset); 978 goto err; 979 } 980 981 if ((ret = __txn_ckp_read( 982 env, rec.data, &ckp_args)) == 0) { 983 ckp_lsn = ckp_args->ckp_lsn; 984 __os_free(env, ckp_args); 985 } 986 if (ret != 0) { 987 __db_errx(env, 988 "Invalid checkpoint record at [%lu][%lu]", 989 (u_long)lsn.file, (u_long)lsn.offset); 990 goto err; 991 } 992 993 if ((ret = __logc_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) { 994 __db_errx(env, 995 "Checkpoint LSN record [%lu][%lu] not found", 996 (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset); 997 goto err; 998 } 999 } else if ((ret = __logc_get(logc, &lsn, &rec, DB_FIRST)) != 0) { 1000 if (ret == DB_NOTFOUND) { 1001 /* An empty log means no PBNYC txns. */ 1002 ret = 0; 1003 goto done; 1004 } 1005 __db_errx(env, "Attempt to get first log record failed"); 1006 goto err; 1007 } 1008 1009 /* 1010 * We use the same txnlist infrastructure that recovery does; 1011 * it demands an estimate of the high and low txnids for 1012 * initialization. 1013 * 1014 * First, the low txnid. 1015 */ 1016 do { 1017 /* txnid is after rectype, which is a u_int32. */ 1018 LOGCOPY_32(env, &low_txn, 1019 (u_int8_t *)rec.data + sizeof(u_int32_t)); 1020 if (low_txn != 0) 1021 break; 1022 } while ((ret = __logc_get(logc, &lsn, &rec, DB_NEXT)) == 0); 1023 1024 /* If there are no txns, there are no PBNYC txns. */ 1025 if (ret == DB_NOTFOUND) { 1026 ret = 0; 1027 goto done; 1028 } else if (ret != 0) 1029 goto err; 1030 1031 /* Now, the high txnid. */ 1032 if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0) { 1033 /* 1034 * Note that DB_NOTFOUND is unacceptable here because we 1035 * had to have looked at some log record to get this far. 1036 */ 1037 __db_errx(env, "Final log record not found"); 1038 goto err; 1039 } 1040 do { 1041 /* txnid is after rectype, which is a u_int32. */ 1042 LOGCOPY_32(env, &hi_txn, 1043 (u_int8_t *)rec.data + sizeof(u_int32_t)); 1044 if (hi_txn != 0) 1045 break; 1046 } while ((ret = __logc_get(logc, &lsn, &rec, DB_PREV)) == 0); 1047 if (ret == DB_NOTFOUND) { 1048 ret = 0; 1049 goto done; 1050 } else if (ret != 0) 1051 goto err; 1052 1053 /* We have a high and low txnid. Initialise the txn list. */ 1054 if ((ret = __db_txnlist_init(env, 1055 NULL, low_txn, hi_txn, NULL, &txninfo)) != 0) 1056 goto err; 1057 1058 /* 1059 * Now, walk backward from the end of the log to ckp_lsn. Any 1060 * prepares that we hit without first hitting a commit or 1061 * abort belong to PBNYC txns, and we need to apply them and 1062 * restore them to a prepared state. 1063 * 1064 * Note that we wind up applying transactions out of order. 1065 * Since all PBNYC txns still held locks on the old master and 1066 * were isolated, this should be safe. 1067 */ 1068 F_SET(env->lg_handle, DBLOG_RECOVER); 1069 for (ret = __logc_get(logc, &lsn, &rec, DB_LAST); 1070 ret == 0 && LOG_COMPARE(&lsn, &ckp_lsn) > 0; 1071 ret = __logc_get(logc, &lsn, &rec, DB_PREV)) { 1072 LOGCOPY_32(env, &rectype, rec.data); 1073 switch (rectype) { 1074 case DB___txn_regop: 1075 /* 1076 * It's a commit or abort--but we don't care 1077 * which! Just add it to the list of txns 1078 * that are resolved. 1079 */ 1080 if ((ret = __txn_regop_read( 1081 env, rec.data, ®op_args)) != 0) 1082 goto err; 1083 txnid = regop_args->txnp->txnid; 1084 txnop = regop_args->opcode; 1085 __os_free(env, regop_args); 1086 1087 ret = __db_txnlist_find(env, 1088 txninfo, txnid, &status); 1089 if (ret == DB_NOTFOUND) 1090 ret = __db_txnlist_add(env, txninfo, 1091 txnid, txnop, &lsn); 1092 else if (ret != 0) 1093 goto err; 1094 break; 1095 case DB___txn_xa_regop: 1096 /* 1097 * It's a prepare. If its not aborted and 1098 * we haven't put the txn on our list yet, it 1099 * hasn't been resolved, so apply and restore it. 1100 */ 1101 if ((ret = __txn_xa_regop_read( 1102 env, rec.data, &prep_args)) != 0) 1103 goto err; 1104 ret = __db_txnlist_find(env, txninfo, 1105 prep_args->txnp->txnid, &status); 1106 if (ret == DB_NOTFOUND) { 1107 if (prep_args->opcode == TXN_ABORT) 1108 ret = __db_txnlist_add(env, txninfo, 1109 prep_args->txnp->txnid, 1110 prep_args->opcode, &lsn); 1111 else if ((ret = 1112 __rep_process_txn(env, &rec)) == 0) { 1113 /* 1114 * We are guaranteed to be single 1115 * threaded here. We need to 1116 * account for this newly 1117 * instantiated txn in the op_cnt 1118 * so that it is counted when it is 1119 * resolved. 1120 */ 1121 rep->op_cnt++; 1122 ret = __txn_restore_txn(env, 1123 &lsn, prep_args); 1124 } 1125 } else if (ret != 0) 1126 goto err; 1127 __os_free(env, prep_args); 1128 break; 1129 default: 1130 continue; 1131 } 1132 } 1133 1134 /* It's not an error to have hit the beginning of the log. */ 1135 if (ret == DB_NOTFOUND) 1136 ret = 0; 1137 1138done: 1139err: t_ret = __logc_close(logc); 1140 F_CLR(env->lg_handle, DBLOG_RECOVER); 1141 1142 if (txninfo != NULL) 1143 __db_txnlist_end(env, txninfo); 1144 1145 return (ret == 0 ? t_ret : ret); 1146} 1147 1148/* 1149 * __rep_get_limit -- 1150 * Get the limit on the amount of data that will be sent during a single 1151 * invocation of __rep_process_message. 1152 * 1153 * PUBLIC: int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *)); 1154 */ 1155int 1156__rep_get_limit(dbenv, gbytesp, bytesp) 1157 DB_ENV *dbenv; 1158 u_int32_t *gbytesp, *bytesp; 1159{ 1160 DB_REP *db_rep; 1161 DB_THREAD_INFO *ip; 1162 ENV *env; 1163 REP *rep; 1164 1165 env = dbenv->env; 1166 db_rep = env->rep_handle; 1167 1168 ENV_NOT_CONFIGURED( 1169 env, db_rep->region, "DB_ENV->rep_get_limit", DB_INIT_REP); 1170 1171 if (REP_ON(env)) { 1172 rep = db_rep->region; 1173 ENV_ENTER(env, ip); 1174 REP_SYSTEM_LOCK(env); 1175 if (gbytesp != NULL) 1176 *gbytesp = rep->gbytes; 1177 if (bytesp != NULL) 1178 *bytesp = rep->bytes; 1179 REP_SYSTEM_UNLOCK(env); 1180 ENV_LEAVE(env, ip); 1181 } else { 1182 if (gbytesp != NULL) 1183 *gbytesp = db_rep->gbytes; 1184 if (bytesp != NULL) 1185 *bytesp = db_rep->bytes; 1186 } 1187 1188 return (0); 1189} 1190 1191/* 1192 * __rep_set_limit -- 1193 * Set a limit on the amount of data that will be sent during a single 1194 * invocation of __rep_process_message. 1195 * 1196 * PUBLIC: int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t)); 1197 */ 1198int 1199__rep_set_limit(dbenv, gbytes, bytes) 1200 DB_ENV *dbenv; 1201 u_int32_t gbytes, bytes; 1202{ 1203 DB_REP *db_rep; 1204 DB_THREAD_INFO *ip; 1205 ENV *env; 1206 REP *rep; 1207 1208 env = dbenv->env; 1209 db_rep = env->rep_handle; 1210 1211 ENV_NOT_CONFIGURED( 1212 env, db_rep->region, "DB_ENV->rep_set_limit", DB_INIT_REP); 1213 1214 if (bytes > GIGABYTE) { 1215 gbytes += bytes / GIGABYTE; 1216 bytes = bytes % GIGABYTE; 1217 } 1218 1219 if (REP_ON(env)) { 1220 rep = db_rep->region; 1221 ENV_ENTER(env, ip); 1222 REP_SYSTEM_LOCK(env); 1223 rep->gbytes = gbytes; 1224 rep->bytes = bytes; 1225 REP_SYSTEM_UNLOCK(env); 1226 ENV_LEAVE(env, ip); 1227 } else { 1228 db_rep->gbytes = gbytes; 1229 db_rep->bytes = bytes; 1230 } 1231 1232 return (0); 1233} 1234 1235/* 1236 * PUBLIC: int __rep_set_nsites __P((DB_ENV *, u_int32_t)); 1237 */ 1238int 1239__rep_set_nsites(dbenv, n) 1240 DB_ENV *dbenv; 1241 u_int32_t n; 1242{ 1243 DB_REP *db_rep; 1244 ENV *env; 1245 REP *rep; 1246 1247 env = dbenv->env; 1248 db_rep = env->rep_handle; 1249 1250 ENV_NOT_CONFIGURED( 1251 env, db_rep->region, "DB_ENV->rep_set_nsites", DB_INIT_REP); 1252 1253 if (REP_ON(env)) { 1254 rep = db_rep->region; 1255 if (rep != NULL && F_ISSET(rep, REP_F_START_CALLED)) { 1256 __db_errx(env, 1257 "DB_ENV->rep_set_nsites: must be called before DB_ENV->rep_start"); 1258 return (EINVAL); 1259 } 1260 rep->config_nsites = n; 1261 } else 1262 db_rep->config_nsites = n; 1263 return (0); 1264} 1265 1266/* 1267 * PUBLIC: int __rep_get_nsites __P((DB_ENV *, u_int32_t *)); 1268 */ 1269int 1270__rep_get_nsites(dbenv, n) 1271 DB_ENV *dbenv; 1272 u_int32_t *n; 1273{ 1274 DB_REP *db_rep; 1275 ENV *env; 1276 REP *rep; 1277 1278 env = dbenv->env; 1279 db_rep = env->rep_handle; 1280 1281 /* TODO: ENV_REQUIRES_CONFIG(... ) and/or ENV_NOT_CONFIGURED (?) */ 1282 1283 if (REP_ON(env)) { 1284 rep = db_rep->region; 1285 *n = rep->config_nsites; 1286 } else 1287 *n = db_rep->config_nsites; 1288 1289 return (0); 1290} 1291 1292/* 1293 * PUBLIC: int __rep_set_priority __P((DB_ENV *, u_int32_t)); 1294 */ 1295int 1296__rep_set_priority(dbenv, priority) 1297 DB_ENV *dbenv; 1298 u_int32_t priority; 1299{ 1300 DB_REP *db_rep; 1301 ENV *env; 1302 REP *rep; 1303 1304 env = dbenv->env; 1305 db_rep = env->rep_handle; 1306 1307 if (REP_ON(env)) { 1308 rep = db_rep->region; 1309 rep->priority = priority; 1310 } else 1311 db_rep->my_priority = priority; 1312 return (0); 1313} 1314 1315/* 1316 * PUBLIC: int __rep_get_priority __P((DB_ENV *, u_int32_t *)); 1317 */ 1318int 1319__rep_get_priority(dbenv, priority) 1320 DB_ENV *dbenv; 1321 u_int32_t *priority; 1322{ 1323 DB_REP *db_rep; 1324 ENV *env; 1325 REP *rep; 1326 1327 env = dbenv->env; 1328 db_rep = env->rep_handle; 1329 1330 if (REP_ON(env)) { 1331 rep = db_rep->region; 1332 *priority = rep->priority; 1333 } else 1334 *priority = db_rep->my_priority; 1335 return (0); 1336} 1337 1338/* 1339 * PUBLIC: int __rep_set_timeout __P((DB_ENV *, int, db_timeout_t)); 1340 */ 1341int 1342__rep_set_timeout(dbenv, which, timeout) 1343 DB_ENV *dbenv; 1344 int which; 1345 db_timeout_t timeout; 1346{ 1347 DB_REP *db_rep; 1348 ENV *env; 1349 REP *rep; 1350 int ret; 1351 1352 env = dbenv->env; 1353 db_rep = env->rep_handle; 1354 rep = db_rep->region; 1355 ret = 0; 1356 1357 switch (which) { 1358 case DB_REP_CHECKPOINT_DELAY: 1359 if (REP_ON(env)) 1360 rep->chkpt_delay = timeout; 1361 else 1362 db_rep->chkpt_delay = timeout; 1363 break; 1364 case DB_REP_ELECTION_TIMEOUT: 1365 if (REP_ON(env)) 1366 rep->elect_timeout = timeout; 1367 else 1368 db_rep->elect_timeout = timeout; 1369 break; 1370 case DB_REP_FULL_ELECTION_TIMEOUT: 1371 if (REP_ON(env)) 1372 rep->full_elect_timeout = timeout; 1373 else 1374 db_rep->full_elect_timeout = timeout; 1375 break; 1376 case DB_REP_LEASE_TIMEOUT: 1377 if (REP_ON(env) && F_ISSET(rep, REP_F_START_CALLED)) { 1378 ret = EINVAL; 1379 __db_errx(env, 1380"DB_ENV->rep_set_timeout: lease timeout must be set before DB_ENV->rep_start."); 1381 goto out; 1382 } 1383 if (REP_ON(env)) 1384 rep->lease_timeout = timeout; 1385 else 1386 db_rep->lease_timeout = timeout; 1387 break; 1388#ifdef HAVE_REPLICATION_THREADS 1389 case DB_REP_ACK_TIMEOUT: 1390 db_rep->ack_timeout = timeout; 1391 break; 1392 case DB_REP_CONNECTION_RETRY: 1393 db_rep->connection_retry_wait = timeout; 1394 break; 1395 case DB_REP_ELECTION_RETRY: 1396 db_rep->election_retry_wait = timeout; 1397 break; 1398 case DB_REP_HEARTBEAT_MONITOR: 1399 db_rep->heartbeat_monitor_timeout = timeout; 1400 break; 1401 case DB_REP_HEARTBEAT_SEND: 1402 db_rep->heartbeat_frequency = timeout; 1403 break; 1404#endif 1405 default: 1406 __db_errx(env, 1407 "Unknown timeout type argument to DB_ENV->rep_set_timeout"); 1408 ret = EINVAL; 1409 } 1410 1411out: 1412 return (ret); 1413} 1414 1415/* 1416 * PUBLIC: int __rep_get_timeout __P((DB_ENV *, int, db_timeout_t *)); 1417 */ 1418int 1419__rep_get_timeout(dbenv, which, timeout) 1420 DB_ENV *dbenv; 1421 int which; 1422 db_timeout_t *timeout; 1423{ 1424 DB_REP *db_rep; 1425 ENV *env; 1426 REP *rep; 1427 1428 env = dbenv->env; 1429 db_rep = env->rep_handle; 1430 rep = db_rep->region; 1431 1432 switch (which) { 1433 case DB_REP_CHECKPOINT_DELAY: 1434 *timeout = REP_ON(env) ? 1435 rep->chkpt_delay : db_rep->chkpt_delay; 1436 break; 1437 case DB_REP_ELECTION_TIMEOUT: 1438 *timeout = REP_ON(env) ? 1439 rep->elect_timeout : db_rep->elect_timeout; 1440 break; 1441 case DB_REP_FULL_ELECTION_TIMEOUT: 1442 *timeout = REP_ON(env) ? 1443 rep->full_elect_timeout : db_rep->full_elect_timeout; 1444 break; 1445 case DB_REP_LEASE_TIMEOUT: 1446 *timeout = REP_ON(env) ? 1447 rep->lease_timeout : db_rep->lease_timeout; 1448 break; 1449#ifdef HAVE_REPLICATION_THREADS 1450 case DB_REP_ACK_TIMEOUT: 1451 *timeout = db_rep->ack_timeout; 1452 break; 1453 case DB_REP_ELECTION_RETRY: 1454 *timeout = db_rep->election_retry_wait; 1455 break; 1456 case DB_REP_CONNECTION_RETRY: 1457 *timeout = db_rep->connection_retry_wait; 1458 break; 1459#endif 1460 default: 1461 __db_errx(env, 1462 "unknown timeout type argument to DB_ENV->rep_get_timeout"); 1463 return (EINVAL); 1464 } 1465 1466 return (0); 1467} 1468 1469/* 1470 * __rep_get_request -- 1471 * Get the minimum and maximum number of log records that we wait 1472 * before retransmitting. 1473 * 1474 * PUBLIC: int __rep_get_request 1475 * PUBLIC: __P((DB_ENV *, db_timeout_t *, db_timeout_t *)); 1476 */ 1477int 1478__rep_get_request(dbenv, minp, maxp) 1479 DB_ENV *dbenv; 1480 db_timeout_t *minp, *maxp; 1481{ 1482 DB_REP *db_rep; 1483 DB_THREAD_INFO *ip; 1484 ENV *env; 1485 REP *rep; 1486 1487 env = dbenv->env; 1488 db_rep = env->rep_handle; 1489 1490 ENV_NOT_CONFIGURED( 1491 env, db_rep->region, "DB_ENV->rep_get_request", DB_INIT_REP); 1492 1493 if (REP_ON(env)) { 1494 rep = db_rep->region; 1495 ENV_ENTER(env, ip); 1496 /* 1497 * We acquire the mtx_region or mtx_clientdb mutexes as needed. 1498 */ 1499 REP_SYSTEM_LOCK(env); 1500 if (minp != NULL) 1501 DB_TIMESPEC_TO_TIMEOUT((*minp), &rep->request_gap, 0); 1502 if (maxp != NULL) 1503 DB_TIMESPEC_TO_TIMEOUT((*maxp), &rep->max_gap, 0); 1504 REP_SYSTEM_UNLOCK(env); 1505 ENV_LEAVE(env, ip); 1506 } else { 1507 if (minp != NULL) 1508 DB_TIMESPEC_TO_TIMEOUT((*minp), 1509 &db_rep->request_gap, 0); 1510 if (maxp != NULL) 1511 DB_TIMESPEC_TO_TIMEOUT((*maxp), &db_rep->max_gap, 0); 1512 } 1513 1514 return (0); 1515} 1516 1517/* 1518 * __rep_set_request -- 1519 * Set the minimum and maximum number of log records that we wait 1520 * before retransmitting. 1521 * 1522 * PUBLIC: int __rep_set_request __P((DB_ENV *, db_timeout_t, db_timeout_t)); 1523 */ 1524int 1525__rep_set_request(dbenv, min, max) 1526 DB_ENV *dbenv; 1527 db_timeout_t min, max; 1528{ 1529 DB_LOG *dblp; 1530 DB_REP *db_rep; 1531 DB_THREAD_INFO *ip; 1532 ENV *env; 1533 LOG *lp; 1534 REP *rep; 1535 1536 env = dbenv->env; 1537 db_rep = env->rep_handle; 1538 1539 ENV_NOT_CONFIGURED( 1540 env, db_rep->region, "DB_ENV->rep_set_request", DB_INIT_REP); 1541 1542 if (min == 0 || max < min) { 1543 __db_errx(env, 1544 "DB_ENV->rep_set_request: Invalid min or max values"); 1545 return (EINVAL); 1546 } 1547 if (REP_ON(env)) { 1548 rep = db_rep->region; 1549 ENV_ENTER(env, ip); 1550 /* 1551 * We acquire the mtx_region or mtx_clientdb mutexes as needed. 1552 */ 1553 REP_SYSTEM_LOCK(env); 1554 DB_TIMEOUT_TO_TIMESPEC(min, &rep->request_gap); 1555 DB_TIMEOUT_TO_TIMESPEC(max, &rep->max_gap); 1556 REP_SYSTEM_UNLOCK(env); 1557 1558 MUTEX_LOCK(env, rep->mtx_clientdb); 1559 dblp = env->lg_handle; 1560 if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) { 1561 DB_TIMEOUT_TO_TIMESPEC(min, &lp->wait_ts); 1562 } 1563 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1564 ENV_LEAVE(env, ip); 1565 } else { 1566 DB_TIMEOUT_TO_TIMESPEC(min, &db_rep->request_gap); 1567 DB_TIMEOUT_TO_TIMESPEC(max, &db_rep->max_gap); 1568 } 1569 1570 return (0); 1571} 1572 1573/* 1574 * __rep_set_transport -- 1575 * Set the transport function for replication. 1576 * 1577 * PUBLIC: int __rep_set_transport __P((DB_ENV *, int, 1578 * PUBLIC: int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *, 1579 * PUBLIC: int, u_int32_t))); 1580 */ 1581int 1582__rep_set_transport(dbenv, eid, f_send) 1583 DB_ENV *dbenv; 1584 int eid; 1585 int (*f_send) __P((DB_ENV *, 1586 const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); 1587{ 1588 DB_REP *db_rep; 1589 DB_THREAD_INFO *ip; 1590 ENV *env; 1591 REP *rep; 1592 1593 env = dbenv->env; 1594 1595 if (f_send == NULL) { 1596 __db_errx(env, 1597 "DB_ENV->rep_set_transport: no send function specified"); 1598 return (EINVAL); 1599 } 1600 1601 if (eid < 0) { 1602 __db_errx(env, 1603 "DB_ENV->rep_set_transport: eid must be greater than or equal to 0"); 1604 return (EINVAL); 1605 } 1606 1607 db_rep = env->rep_handle; 1608 db_rep->send = f_send; 1609 1610 if (REP_ON(env)) { 1611 rep = db_rep->region; 1612 ENV_ENTER(env, ip); 1613 REP_SYSTEM_LOCK(env); 1614 rep->eid = eid; 1615 REP_SYSTEM_UNLOCK(env); 1616 ENV_LEAVE(env, ip); 1617 } else 1618 db_rep->eid = eid; 1619 return (0); 1620} 1621 1622/* 1623 * PUBLIC: int __rep_get_clockskew __P((DB_ENV *, u_int32_t *, u_int32_t *)); 1624 */ 1625int 1626__rep_get_clockskew(dbenv, fast_clockp, slow_clockp) 1627 DB_ENV *dbenv; 1628 u_int32_t *fast_clockp, *slow_clockp; 1629{ 1630 DB_REP *db_rep; 1631 ENV *env; 1632 REP *rep; 1633 1634 env = dbenv->env; 1635 db_rep = env->rep_handle; 1636 1637 if (REP_ON(env)) { 1638 rep = db_rep->region; 1639 *fast_clockp = rep->clock_skew; 1640 *slow_clockp = rep->clock_base; 1641 } else { 1642 *fast_clockp = db_rep->clock_skew; 1643 *slow_clockp = db_rep->clock_base; 1644 } 1645 1646 return (0); 1647} 1648 1649/* 1650 * PUBLIC: int __rep_set_clockskew __P((DB_ENV *, u_int32_t, u_int32_t)); 1651 */ 1652int 1653__rep_set_clockskew(dbenv, fast_clock, slow_clock) 1654 DB_ENV *dbenv; 1655 u_int32_t fast_clock, slow_clock; 1656{ 1657 DB_REP *db_rep; 1658 DB_THREAD_INFO *ip; 1659 ENV *env; 1660 REP *rep; 1661 int ret; 1662 1663 env = dbenv->env; 1664 db_rep = env->rep_handle; 1665 ret = 0; 1666 1667 ENV_NOT_CONFIGURED( 1668 env, db_rep->region, "DB_ENV->rep_set_clockskew", DB_INIT_REP); 1669 1670 /* 1671 * Check for valid values. The fast clock should be a larger 1672 * number than the slow clock. We use the slow clock value as 1673 * our base for adjustment - therefore, a 2% difference should 1674 * be fast == 102, slow == 100. Check for values being 0. If 1675 * they are, then set them both to 1 internally. 1676 * 1677 * We will use these numbers to compute the larger ratio to be 1678 * most conservative about the user's intention. 1679 */ 1680 if (fast_clock == 0 || slow_clock == 0) { 1681 /* 1682 * If one value is zero, reject if both aren't zero. 1683 */ 1684 if (slow_clock != 0 || fast_clock != 0) { 1685 __db_errx(env, 1686"DB_ENV->rep_set_clockskew: Zero only valid for when used for both arguments"); 1687 return (EINVAL); 1688 } 1689 fast_clock = 1; 1690 slow_clock = 1; 1691 } 1692 if (fast_clock < slow_clock) { 1693 __db_errx(env, 1694"DB_ENV->rep_set_clockskew: slow_clock value is larger than fast_clock_value"); 1695 return (EINVAL); 1696 } 1697 if (REP_ON(env)) { 1698 rep = db_rep->region; 1699 if (F_ISSET(rep, REP_F_START_CALLED)) { 1700 __db_errx(env, 1701 "DB_ENV->rep_set_clockskew: must be called before DB_ENV->rep_start"); 1702 return (EINVAL); 1703 } 1704 ENV_ENTER(env, ip); 1705 REP_SYSTEM_LOCK(env); 1706 rep->clock_skew = fast_clock; 1707 rep->clock_base = slow_clock; 1708 REP_SYSTEM_UNLOCK(env); 1709 ENV_LEAVE(env, ip); 1710 } else { 1711 db_rep->clock_skew = fast_clock; 1712 db_rep->clock_base = slow_clock; 1713 } 1714 return (ret); 1715} 1716 1717/* 1718 * __rep_flush -- 1719 * Re-push the last log record to all clients, in case they've lost 1720 * messages and don't know it. 1721 * 1722 * PUBLIC: int __rep_flush __P((DB_ENV *)); 1723 */ 1724int 1725__rep_flush(dbenv) 1726 DB_ENV *dbenv; 1727{ 1728 DBT rec; 1729 DB_LOGC *logc; 1730 DB_LSN lsn; 1731 DB_THREAD_INFO *ip; 1732 ENV *env; 1733 int ret, t_ret; 1734 1735 env = dbenv->env; 1736 1737 ENV_REQUIRES_CONFIG_XX( 1738 env, rep_handle, "DB_ENV->rep_flush", DB_INIT_REP); 1739 ENV_ENTER(env, ip); 1740 1741 if ((ret = __log_cursor(env, &logc)) != 0) 1742 return (ret); 1743 1744 memset(&rec, 0, sizeof(rec)); 1745 memset(&lsn, 0, sizeof(lsn)); 1746 1747 if ((ret = __logc_get(logc, &lsn, &rec, DB_LAST)) != 0) 1748 goto err; 1749 1750 (void)__rep_send_message(env, 1751 DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0, 0); 1752 1753err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) 1754 ret = t_ret; 1755 ENV_LEAVE(env, ip); 1756 return (ret); 1757} 1758 1759/* 1760 * __rep_sync -- 1761 * Force a synchronization to occur between this client and the master. 1762 * This is the other half of configuring DELAYCLIENT. 1763 * 1764 * PUBLIC: int __rep_sync __P((DB_ENV *, u_int32_t)); 1765 */ 1766int 1767__rep_sync(dbenv, flags) 1768 DB_ENV *dbenv; 1769 u_int32_t flags; 1770{ 1771 DB_LOG *dblp; 1772 DB_LSN lsn; 1773 DB_REP *db_rep; 1774 DB_THREAD_INFO *ip; 1775 ENV *env; 1776 LOG *lp; 1777 REP *rep; 1778 int master, ret; 1779 u_int32_t repflags, type; 1780 1781 env = dbenv->env; 1782 1783 COMPQUIET(flags, 0); 1784 1785 ENV_REQUIRES_CONFIG_XX( 1786 env, rep_handle, "DB_ENV->rep_sync", DB_INIT_REP); 1787 1788 dblp = env->lg_handle; 1789 lp = dblp->reginfo.primary; 1790 db_rep = env->rep_handle; 1791 rep = db_rep->region; 1792 ret = 0; 1793 1794 ENV_ENTER(env, ip); 1795 1796 /* 1797 * Simple cases. If we're not in the DELAY state we have nothing 1798 * to do. If we don't know who the master is, send a MASTER_REQ. 1799 */ 1800 MUTEX_LOCK(env, rep->mtx_clientdb); 1801 lsn = lp->verify_lsn; 1802 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1803 REP_SYSTEM_LOCK(env); 1804 master = rep->master_id; 1805 if (master == DB_EID_INVALID) { 1806 REP_SYSTEM_UNLOCK(env); 1807 (void)__rep_send_message(env, DB_EID_BROADCAST, 1808 REP_MASTER_REQ, NULL, NULL, 0, 0); 1809 goto out; 1810 } 1811 /* 1812 * We want to hold the rep mutex to test and then clear the 1813 * DELAY flag. Racing threads in here could otherwise result 1814 * in dual data streams. 1815 */ 1816 if (!F_ISSET(rep, REP_F_DELAY)) { 1817 REP_SYSTEM_UNLOCK(env); 1818 goto out; 1819 } 1820 1821 DB_ASSERT(env, 1822 !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); 1823 1824 /* 1825 * If we get here, we clear the delay flag and kick off a 1826 * synchronization. From this point forward, we will 1827 * synchronize until the next time the master changes. 1828 */ 1829 F_CLR(rep, REP_F_DELAY); 1830 if (IS_ZERO_LSN(lsn) && FLD_ISSET(rep->config, REP_C_NOAUTOINIT)) { 1831 F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK); 1832 ret = DB_REP_JOIN_FAILURE; 1833 REP_SYSTEM_UNLOCK(env); 1834 goto out; 1835 } 1836 REP_SYSTEM_UNLOCK(env); 1837 /* 1838 * When we set REP_F_DELAY, we set verify_lsn to the real verify lsn if 1839 * we need to verify, or we zeroed it out if this is a client that needs 1840 * internal init. So, send the type of message now that 1841 * __rep_new_master delayed sending. 1842 */ 1843 if (IS_ZERO_LSN(lsn)) { 1844 DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_UPDATE)); 1845 type = REP_UPDATE_REQ; 1846 repflags = 0; 1847 } else { 1848 DB_ASSERT(env, F_ISSET(rep, REP_F_RECOVER_VERIFY)); 1849 type = REP_VERIFY_REQ; 1850 repflags = DB_REP_ANYWHERE; 1851 } 1852 (void)__rep_send_message(env, master, type, &lsn, NULL, 0, repflags); 1853 1854out: ENV_LEAVE(env, ip); 1855 return (ret); 1856} 1857 1858/* 1859 * __rep_conv_vers -- 1860 * Convert from a log version to the replication message version 1861 * that release used. 1862 */ 1863static u_int32_t 1864__rep_conv_vers(env, log_ver) 1865 ENV *env; 1866 u_int32_t log_ver; 1867{ 1868 COMPQUIET(env, NULL); 1869 1870 /* 1871 * We can't use a switch statement, some of the DB_LOGVERSION_XX 1872 * constants are the same 1873 */ 1874 if (log_ver == DB_LOGVERSION_44) 1875 return (DB_REPVERSION_44); 1876 if (log_ver == DB_LOGVERSION_45) 1877 return (DB_REPVERSION_45); 1878 if (log_ver == DB_LOGVERSION_46) 1879 return (DB_REPVERSION_46); 1880 if (log_ver == DB_LOGVERSION_47) 1881 return (DB_REPVERSION_47); 1882 return (DB_REPVERSION_INVALID); 1883} 1884