1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/db_am.h" 14#include "dbinc/lock.h" 15#include "dbinc/log.h" 16#include "dbinc/mp.h" 17#include "dbinc/txn.h" 18 19static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *)); 20static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *)); 21static int __rep_fire_newmaster __P((ENV *, u_int32_t, int)); 22static int __rep_fire_startupdone __P((ENV *, u_int32_t, int)); 23static int __rep_getnext __P((ENV *, DB_THREAD_INFO *)); 24static int __rep_lsn_cmp __P((const void *, const void *)); 25static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *)); 26static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *, 27 DBT *, db_timespec *, DB_LSN *)); 28static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *)); 29static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t)); 30 31/* Used to consistently designate which messages ought to be received where. */ 32 33#define MASTER_ONLY(rep, rp) do { \ 34 if (!F_ISSET(rep, REP_F_MASTER)) { \ 35 RPRINT(env, DB_VERB_REP_MSGS, \ 36 (env, "Master record received on client")); \ 37 REP_PRINT_MESSAGE(env, \ 38 eid, rp, "rep_process_message", 0); \ 39 /* Just skip/ignore it. */ \ 40 ret = 0; \ 41 goto errlock; \ 42 } \ 43} while (0) 44 45#define CLIENT_ONLY(rep, rp) do { \ 46 if (!F_ISSET(rep, REP_F_CLIENT)) { \ 47 RPRINT(env, DB_VERB_REP_MSGS, \ 48 (env, "Client record received on master")); \ 49 /* \ 50 * Only broadcast DUPMASTER if leases are not \ 51 * in effect. If I am an old master, using \ 52 * leases and I get a newer message, my leases \ 53 * had better all be expired. 
\ 54 */ \ 55 if (IS_USING_LEASES(env)) \ 56 DB_ASSERT(env, \ 57 __rep_lease_check(env, 0) == \ 58 DB_REP_LEASE_EXPIRED); \ 59 else { \ 60 REP_PRINT_MESSAGE(env, \ 61 eid, rp, "rep_process_message", 0); \ 62 (void)__rep_send_message(env, DB_EID_BROADCAST, \ 63 REP_DUPMASTER, NULL, NULL, 0, 0); \ 64 } \ 65 ret = DB_REP_DUPMASTER; \ 66 goto errlock; \ 67 } \ 68} while (0) 69 70/* 71 * If a client is attempting to service a request it does not have, 72 * call rep_skip_msg to skip this message and force a rerequest to the 73 * sender. We don't hold the mutex for the stats and may miscount. 74 */ 75#define CLIENT_REREQ do { \ 76 if (F_ISSET(rep, REP_F_CLIENT)) { \ 77 STAT(rep->stat.st_client_svc_req++); \ 78 if (ret == DB_NOTFOUND) { \ 79 STAT(rep->stat.st_client_svc_miss++); \ 80 ret = __rep_skip_msg(env, rep, eid, rp->rectype);\ 81 } \ 82 } \ 83} while (0) 84 85#define MASTER_UPDATE(env, renv) do { \ 86 REP_SYSTEM_LOCK(env); \ 87 F_SET((renv), DB_REGENV_REPLOCKED); \ 88 (void)time(&(renv)->op_timestamp); \ 89 REP_SYSTEM_UNLOCK(env); \ 90} while (0) 91 92#define RECOVERING_SKIP do { \ 93 if (IS_REP_CLIENT(env) && recovering) { \ 94 /* Not holding region mutex, may miscount */ \ 95 STAT(rep->stat.st_msgs_recover++); \ 96 ret = __rep_skip_msg(env, rep, eid, rp->rectype); \ 97 goto errlock; \ 98 } \ 99} while (0) 100 101/* 102 * If we're recovering the log we only want log records that are in the 103 * range we need to recover. Otherwise we can end up storing a huge 104 * number of "new" records, only to truncate the temp database later after 105 * we run recovery. If we are actively delaying a sync-up, we also skip 106 * all incoming log records until the application requests sync-up. 
107 */ 108#define RECOVERING_LOG_SKIP do { \ 109 if (F_ISSET(rep, REP_F_DELAY) || \ 110 rep->master_id == DB_EID_INVALID || \ 111 (recovering && \ 112 (!F_ISSET(rep, REP_F_RECOVER_LOG) || \ 113 LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0))) { \ 114 /* Not holding region mutex, may miscount */ \ 115 STAT(rep->stat.st_msgs_recover++); \ 116 ret = __rep_skip_msg(env, rep, eid, rp->rectype); \ 117 goto errlock; \ 118 } \ 119} while (0) 120 121#define ANYSITE(rep) 122 123/* 124 * __rep_process_message_pp -- 125 * 126 * This routine takes an incoming message and processes it. 127 * 128 * control: contains the control fields from the record 129 * rec: contains the actual record 130 * eid: the environment id of the sender of the message; 131 * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the 132 * lsn of the maximum permanent or current not permanent log record 133 * (respectively). 134 * 135 * PUBLIC: int __rep_process_message_pp 136 * PUBLIC: __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *)); 137 */ 138int 139__rep_process_message_pp(dbenv, control, rec, eid, ret_lsnp) 140 DB_ENV *dbenv; 141 DBT *control, *rec; 142 int eid; 143 DB_LSN *ret_lsnp; 144{ 145 ENV *env; 146 int ret; 147 148 env = dbenv->env; 149 ret = 0; 150 151 ENV_REQUIRES_CONFIG_XX( 152 env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP); 153 154 if (APP_IS_REPMGR(env)) { 155 __db_errx(env, "%s %s", "DB_ENV->rep_process_message:", 156 "cannot call from Replication Manager application"); 157 return (EINVAL); 158 } 159 160 /* Control argument must be non-Null. */ 161 if (control == NULL || control->size == 0) { 162 __db_errx(env, 163 "DB_ENV->rep_process_message: control argument must be specified"); 164 return (EINVAL); 165 } 166 167 /* 168 * Make sure site is a master or a client, which implies that 169 * replication has been started. 
170 */ 171 if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) { 172 __db_errx(env, 173 "Environment not configured as replication master or client"); 174 return (EINVAL); 175 } 176 177 if ((ret = __dbt_usercopy(env, control)) != 0 || 178 (ret = __dbt_usercopy(env, rec)) != 0) { 179 __dbt_userfree(env, control, rec, NULL); 180 __db_errx(env, 181 "DB_ENV->rep_process_message: error retrieving DBT contents"); 182 return ret; 183 } 184 185 ret = __rep_process_message_int(env, control, rec, eid, ret_lsnp); 186 187 return (ret); 188} 189 190/* 191 * __rep_process_message_int -- 192 * 193 * This routine performs the internal steps to process an incoming message. 194 * 195 * PUBLIC: int __rep_process_message_int 196 * PUBLIC: __P((ENV *, DBT *, DBT *, int, DB_LSN *)); 197 */ 198int 199__rep_process_message_int(env, control, rec, eid, ret_lsnp) 200 ENV *env; 201 DBT *control, *rec; 202 int eid; 203 DB_LSN *ret_lsnp; 204{ 205 DBT data_dbt; 206 DB_LOG *dblp; 207 DB_LSN last_lsn, lsn; 208 DB_REP *db_rep; 209 DB_THREAD_INFO *ip; 210 LOG *lp; 211 REGENV *renv; 212 REGINFO *infop; 213 REP *rep; 214 REP_46_CONTROL *rp46; 215 REP_OLD_CONTROL *orp; 216 __rep_control_args *rp, tmprp; 217 __rep_egen_args egen_arg; 218 size_t len; 219 u_int32_t gen, rep_version; 220 int cmp, do_sync, lockout, recovering, ret, t_ret; 221 time_t savetime; 222 u_int8_t buf[__REP_MAXMSG_SIZE]; 223 224 ret = 0; 225 do_sync = 0; 226 lockout = 0; 227 db_rep = env->rep_handle; 228 rep = db_rep->region; 229 dblp = env->lg_handle; 230 lp = dblp->reginfo.primary; 231 infop = env->reginfo; 232 renv = infop->primary; 233 /* 234 * Casting this to REP_OLD_CONTROL is just kind of stylistic: the 235 * rep_version field of course has to be in the same offset in all 236 * versions in order for this to work. 237 * 238 * We can look at the rep_version unswapped here because if we're 239 * talking to an old version, it will always be unswapped. 
If 240 * we're talking to a new version, the only issue is if it is 241 * swapped and we take one of the old version conditionals 242 * incorrectly. The rep_version would need to be very, very 243 * large for a swapped version to look like a small, older 244 * version. There is no problem here looking at it unswapped. 245 */ 246 rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version; 247 if (rep_version <= DB_REPVERSION_45) { 248 orp = (REP_OLD_CONTROL *)control->data; 249 if (rep_version == DB_REPVERSION_45 && 250 F_ISSET(orp, REPCTL_INIT_45)) { 251 F_CLR(orp, REPCTL_INIT_45); 252 F_SET(orp, REPCTL_INIT); 253 } 254 tmprp.rep_version = orp->rep_version; 255 tmprp.log_version = orp->log_version; 256 tmprp.lsn = orp->lsn; 257 tmprp.rectype = orp->rectype; 258 tmprp.gen = orp->gen; 259 tmprp.flags = orp->flags; 260 tmprp.msg_sec = 0; 261 tmprp.msg_nsec = 0; 262 } else if (rep_version == DB_REPVERSION_46) { 263 rp46 = (REP_46_CONTROL *)control->data; 264 tmprp.rep_version = rp46->rep_version; 265 tmprp.log_version = rp46->log_version; 266 tmprp.lsn = rp46->lsn; 267 tmprp.rectype = rp46->rectype; 268 tmprp.gen = rp46->gen; 269 tmprp.flags = rp46->flags; 270 tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec; 271 tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec; 272 } else 273 if ((ret = __rep_control_unmarshal(env, &tmprp, 274 control->data, control->size, NULL)) != 0) 275 return (ret); 276 rp = &tmprp; 277 if (ret_lsnp != NULL) 278 ZERO_LSN(*ret_lsnp); 279 280 ENV_ENTER(env, ip); 281 282 REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0); 283 /* 284 * Check the version number for both rep and log. If it is 285 * an old version we support, convert it. Otherwise complain. 
286 */ 287 if (rp->rep_version < DB_REPVERSION) { 288 if (rp->rep_version < DB_REPVERSION_MIN) { 289 __db_errx(env, 290 "unsupported old replication message version %lu, minimum version %d", 291 (u_long)rp->rep_version, DB_REPVERSION_MIN); 292 ret = EINVAL; 293 goto errlock; 294 } 295 RPRINT(env, DB_VERB_REP_MSGS, (env, 296 "Received record %lu with old rep version %lu", 297 (u_long)rp->rectype, (u_long)rp->rep_version)); 298 rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype); 299 DB_ASSERT(env, rp->rectype != REP_INVALID); 300 /* 301 * We should have a valid new record type for all the old 302 * versions. 303 */ 304 RPRINT(env, DB_VERB_REP_MSGS, (env, 305 "Converted to record %lu with old rep version %lu", 306 (u_long)rp->rectype, (u_long)rp->rep_version)); 307 } else if (rp->rep_version > DB_REPVERSION) { 308 __db_errx(env, 309 "unexpected replication message version %lu, expected %d", 310 (u_long)rp->rep_version, DB_REPVERSION); 311 ret = EINVAL; 312 goto errlock; 313 } 314 315 if (rp->log_version < DB_LOGVERSION) { 316 if (rp->log_version < DB_LOGVERSION_MIN) { 317 __db_errx(env, 318 "unsupported old replication log version %lu, minimum version %d", 319 (u_long)rp->log_version, DB_LOGVERSION_MIN); 320 ret = EINVAL; 321 goto errlock; 322 } 323 RPRINT(env, DB_VERB_REP_MSGS, (env, 324 "Received record %lu with old log version %lu", 325 (u_long)rp->rectype, (u_long)rp->log_version)); 326 } else if (rp->log_version > DB_LOGVERSION) { 327 __db_errx(env, 328 "unexpected log record version %lu, expected %d", 329 (u_long)rp->log_version, DB_LOGVERSION); 330 ret = EINVAL; 331 goto errlock; 332 } 333 334 /* 335 * Acquire the replication lock. 336 */ 337 REP_SYSTEM_LOCK(env); 338 if (F_ISSET(rep, REP_F_READY_MSG)) { 339 /* 340 * If we're racing with a thread in rep_start, then 341 * just ignore the message and return. 
342 */ 343 RPRINT(env, DB_VERB_REP_MSGS, (env, 344 "Racing replication msg lockout, ignore message.")); 345 if (F_ISSET(rp, REPCTL_PERM)) 346 ret = DB_REP_IGNORE; 347 REP_SYSTEM_UNLOCK(env); 348 /* 349 * If another client has sent a c2c request to us, it may be a 350 * long time before it resends the request (due to its dual data 351 * streams avoidance heuristic); let it know we can't serve the 352 * request just now. 353 */ 354 if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) { 355 STAT(rep->stat.st_client_svc_req++); 356 STAT(rep->stat.st_client_svc_miss++); 357 (void)__rep_send_message(env, 358 eid, REP_REREQUEST, NULL, NULL, 0, 0); 359 } 360 goto out; 361 } 362 rep->msg_th++; 363 gen = rep->gen; 364 recovering = F_ISSET(rep, REP_F_RECOVER_MASK); 365 savetime = renv->rep_timestamp; 366 367 STAT(rep->stat.st_msgs_processed++); 368 REP_SYSTEM_UNLOCK(env); 369 370 /* 371 * Check for lease configuration matching. Leases must be 372 * configured all or none. If I am a client and I receive a 373 * message requesting a lease, and I'm not using leases, that 374 * is an error. 375 */ 376 if (!IS_USING_LEASES(env) && 377 (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) { 378 __db_errx(env, 379 "Inconsistent lease configuration"); 380 RPRINT(env, DB_VERB_REP_MSGS, (env, 381 "Client received lease message and not using leases")); 382 ret = EINVAL; 383 ret = __env_panic(env, ret); 384 goto errlock; 385 } 386 387 /* 388 * Check for generation number matching. Ignore any old messages 389 * except requests that are indicative of a new client that needs 390 * to get in sync. 391 */ 392 if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ && 393 rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ && 394 rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) { 395 /* 396 * We don't hold the rep mutex, and could miscount if we race. 
397 */ 398 STAT(rep->stat.st_msgs_badgen++); 399 if (F_ISSET(rp, REPCTL_PERM)) 400 ret = DB_REP_IGNORE; 401 goto errlock; 402 } 403 404 if (rp->gen > gen) { 405 /* 406 * If I am a master and am out of date with a lower generation 407 * number, I am in bad shape and should downgrade. 408 */ 409 if (F_ISSET(rep, REP_F_MASTER)) { 410 STAT(rep->stat.st_dupmasters++); 411 ret = DB_REP_DUPMASTER; 412 /* 413 * Only broadcast DUPMASTER if leases are not 414 * in effect. If I am an old master, using 415 * leases and I get a newer message, my leases 416 * had better all be expired. 417 */ 418 if (IS_USING_LEASES(env)) 419 DB_ASSERT(env, 420 __rep_lease_check(env, 0) == 421 DB_REP_LEASE_EXPIRED); 422 else if (rp->rectype != REP_DUPMASTER) 423 (void)__rep_send_message(env, 424 DB_EID_BROADCAST, REP_DUPMASTER, 425 NULL, NULL, 0, 0); 426 goto errlock; 427 } 428 429 /* 430 * I am a client and am out of date. If this is an election, 431 * or a response from the first site I contacted, then I can 432 * accept the generation number and participate in future 433 * elections and communication. Otherwise, I need to hear about 434 * a new master and sync up. 435 */ 436 if (rp->rectype == REP_ALIVE || 437 rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { 438 REP_SYSTEM_LOCK(env); 439 RPRINT(env, DB_VERB_REP_MSGS, (env, 440 "Updating gen from %lu to %lu", 441 (u_long)gen, (u_long)rp->gen)); 442 rep->master_id = DB_EID_INVALID; 443 gen = rep->gen = rp->gen; 444 /* 445 * Updating of egen will happen when we process the 446 * message below for each message type. 447 */ 448 REP_SYSTEM_UNLOCK(env); 449 if (rp->rectype == REP_ALIVE) 450 (void)__rep_send_message(env, 451 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, 452 NULL, 0, 0); 453 } else if (rp->rectype != REP_NEWMASTER) { 454 /* 455 * Ignore this message, retransmit if needed. 
456 */ 457 if (__rep_check_doreq(env, rep)) 458 (void)__rep_send_message(env, 459 DB_EID_BROADCAST, REP_MASTER_REQ, 460 NULL, NULL, 0, 0); 461 goto errlock; 462 } 463 /* 464 * If you get here, then you're a client and either you're 465 * in an election or you have a NEWMASTER or an ALIVE message 466 * whose processing will do the right thing below. 467 */ 468 } 469 470 /* 471 * If the sender is part of an established group, so are we now. 472 */ 473 if (F_ISSET(rp, REPCTL_GROUP_ESTD)) { 474 REP_SYSTEM_LOCK(env); 475#ifdef DIAGNOSTIC 476 if (!F_ISSET(rep, REP_F_GROUP_ESTD)) 477 RPRINT(env, DB_VERB_REP_MSGS, (env, 478 "I am now part of an established group")); 479#endif 480 F_SET(rep, REP_F_GROUP_ESTD); 481 REP_SYSTEM_UNLOCK(env); 482 } 483 484 /* 485 * We need to check if we're in recovery and if we are 486 * then we need to ignore any messages except VERIFY*, VOTE*, 487 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*, 488 * PAGE* and FILE*. We need to also accept LOG messages 489 * if we're copying the log for recovery/backup. 490 */ 491 switch (rp->rectype) { 492 case REP_ALIVE: 493 /* 494 * Handle even if we're recovering. 495 */ 496 ANYSITE(rep); 497 if (rp->rep_version < DB_REPVERSION_47) 498 egen_arg.egen = *(u_int32_t *)rec->data; 499 else if ((ret = __rep_egen_unmarshal(env, &egen_arg, 500 rec->data, rec->size, NULL)) != 0) 501 return (ret); 502 REP_SYSTEM_LOCK(env); 503 RPRINT(env, DB_VERB_REP_MSGS, (env, 504 "Received ALIVE egen of %lu, mine %lu", 505 (u_long)egen_arg.egen, (u_long)rep->egen)); 506 if (egen_arg.egen > rep->egen) { 507 /* 508 * We're changing egen, need to clear out any old 509 * election information. We need to set the 510 * REP_F_EGENUPDATE flag here so that any thread 511 * waiting in rep_elect/rep_wait can distinguish 512 * this situation (and restart its election) from 513 * a current master saying it is still master and 514 * the egen getting incremented on that path. 
515 */ 516 __rep_elect_done(env, rep, 0); 517 rep->egen = egen_arg.egen; 518 F_SET(rep, REP_F_EGENUPDATE); 519 } 520 REP_SYSTEM_UNLOCK(env); 521 break; 522 case REP_ALIVE_REQ: 523 /* 524 * Handle even if we're recovering. 525 */ 526 ANYSITE(rep); 527 LOG_SYSTEM_LOCK(env); 528 lsn = lp->lsn; 529 LOG_SYSTEM_UNLOCK(env); 530#ifdef CONFIG_TEST 531 /* 532 * Send this first, before the ALIVE message because of the 533 * way the test suite and messaging is done sequentially. 534 * In some sequences it is possible to get into a situation 535 * where the test suite cannot get the later NEWMASTER because 536 * we break out of the messaging loop too early. 537 */ 538 if (F_ISSET(rep, REP_F_MASTER)) 539 (void)__rep_send_message(env, 540 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); 541#endif 542 REP_SYSTEM_LOCK(env); 543 egen_arg.egen = rep->egen; 544 REP_SYSTEM_UNLOCK(env); 545 if (rep->version < DB_REPVERSION_47) 546 DB_INIT_DBT(data_dbt, &egen_arg.egen, 547 sizeof(egen_arg.egen)); 548 else { 549 if ((ret = __rep_egen_marshal(env, 550 &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0) 551 goto errlock; 552 DB_INIT_DBT(data_dbt, buf, len); 553 } 554 (void)__rep_send_message(env, 555 eid, REP_ALIVE, &lsn, &data_dbt, 0, 0); 556 break; 557 case REP_ALL_REQ: 558 RECOVERING_SKIP; 559 ret = __rep_allreq(env, rp, eid); 560 CLIENT_REREQ; 561 break; 562 case REP_BULK_LOG: 563 RECOVERING_LOG_SKIP; 564 CLIENT_ONLY(rep, rp); 565 ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp); 566 break; 567 case REP_BULK_PAGE: 568 /* 569 * Handle even if we're recovering. 570 */ 571 CLIENT_ONLY(rep, rp); 572 ret = __rep_bulk_page(env, ip, eid, rp, rec); 573 break; 574 case REP_DUPMASTER: 575 /* 576 * Handle even if we're recovering. 
577 */ 578 if (F_ISSET(rep, REP_F_MASTER)) 579 ret = DB_REP_DUPMASTER; 580 break; 581#ifdef NOTYET 582 case REP_FILE: /* TODO */ 583 CLIENT_ONLY(rep, rp); 584 break; 585 case REP_FILE_REQ: 586 ret = __rep_send_file(env, rec, eid); 587 break; 588#endif 589 case REP_FILE_FAIL: 590 /* 591 * Handle even if we're recovering. 592 */ 593 CLIENT_ONLY(rep, rp); 594 /* 595 * Clean up any internal init that was in progress. 596 */ 597 if (eid == rep->master_id) { 598 REP_SYSTEM_LOCK(env); 599 /* 600 * If we're already locking out messages, give up. 601 */ 602 if (F_ISSET(rep, REP_F_READY_MSG)) 603 goto errhlk; 604 /* 605 * Lock out other messages to prevent race 606 * conditions. 607 */ 608 if ((ret = 609 __rep_lockout_msg(env, rep, 1)) != 0) { 610 goto errhlk; 611 } 612 lockout = 1; 613 /* 614 * Need mtx_clientdb to safely clean up 615 * page database in __rep_init_cleanup(). 616 */ 617 REP_SYSTEM_UNLOCK(env); 618 MUTEX_LOCK(env, rep->mtx_clientdb); 619 REP_SYSTEM_LOCK(env); 620 /* 621 * Clean up internal init if one was in progress. 622 */ 623 if (F_ISSET(rep, REP_F_READY_API | REP_F_READY_OP)) { 624 RPRINT(env, DB_VERB_REP_MSGS, (env, 625 "FILE_FAIL is cleaning up old internal init")); 626#ifdef CONFIG_TEST 627 STAT(rep->stat.st_filefail_cleanups++); 628#endif 629 ret = __rep_init_cleanup(env, rep, DB_FORCE); 630 F_CLR(rep, 631 REP_F_ABBREVIATED | REP_F_RECOVER_MASK); 632 } 633 MUTEX_UNLOCK(env, rep->mtx_clientdb); 634 if (ret != 0) { 635 RPRINT(env, DB_VERB_REP_MSGS, (env, 636 "FILE_FAIL error cleaning up internal init: %d", ret)); 637 goto errhlk; 638 } 639 F_CLR(rep, REP_F_READY_MSG); 640 lockout = 0; 641 /* 642 * Restart internal init, setting UPDATE flag and 643 * zeroing applicable LSNs. 
644 */ 645 F_SET(rep, REP_F_RECOVER_UPDATE); 646 ZERO_LSN(rep->first_lsn); 647 ZERO_LSN(rep->ckp_lsn); 648 REP_SYSTEM_UNLOCK(env); 649 (void)__rep_send_message(env, eid, REP_UPDATE_REQ, 650 NULL, NULL, 0, 0); 651 } 652 break; 653 case REP_LEASE_GRANT: 654 /* 655 * Handle even if we're recovering. 656 */ 657 MASTER_ONLY(rep, rp); 658 ret = __rep_lease_grant(env, rp, rec, eid); 659 break; 660 case REP_LOG: 661 case REP_LOG_MORE: 662 RECOVERING_LOG_SKIP; 663 CLIENT_ONLY(rep, rp); 664 ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp); 665 break; 666 case REP_LOG_REQ: 667 RECOVERING_SKIP; 668 if (F_ISSET(rp, REPCTL_INIT)) 669 MASTER_UPDATE(env, renv); 670 ret = __rep_logreq(env, rp, rec, eid); 671 CLIENT_REREQ; 672 break; 673 case REP_NEWSITE: 674 /* 675 * Handle even if we're recovering. 676 */ 677 /* We don't hold the rep mutex, and may miscount. */ 678 STAT(rep->stat.st_newsites++); 679 680 /* This is a rebroadcast; simply tell the application. */ 681 if (F_ISSET(rep, REP_F_MASTER)) { 682 dblp = env->lg_handle; 683 lp = dblp->reginfo.primary; 684 LOG_SYSTEM_LOCK(env); 685 lsn = lp->lsn; 686 LOG_SYSTEM_UNLOCK(env); 687 (void)__rep_send_message(env, 688 eid, REP_NEWMASTER, &lsn, NULL, 0, 0); 689 } 690 ret = DB_REP_NEWSITE; 691 break; 692 case REP_NEWCLIENT: 693 /* 694 * Handle even if we're recovering. 695 */ 696 /* 697 * This message was received and should have resulted in the 698 * application entering the machine ID in its machine table. 699 * We respond to this with an ALIVE to send relevant information 700 * to the new client (if we are a master, we'll send a 701 * NEWMASTER, so we only need to send the ALIVE if we're a 702 * client). But first, broadcast the new client's record to 703 * all the clients. 
704 */ 705 (void)__rep_send_message(env, 706 DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0); 707 708 ret = DB_REP_NEWSITE; 709 710 if (F_ISSET(rep, REP_F_CLIENT)) { 711 REP_SYSTEM_LOCK(env); 712 egen_arg.egen = rep->egen; 713 714 /* 715 * Clean up any previous master remnants by making 716 * master_id invalid and cleaning up any internal 717 * init that was in progress. 718 */ 719 if (eid == rep->master_id) { 720 rep->master_id = DB_EID_INVALID; 721 722 /* 723 * Already locking out messages, must be 724 * in sync-up recover or internal init, 725 * give up. 726 */ 727 if (F_ISSET(rep, REP_F_READY_MSG)) 728 goto errhlk; 729 730 /* 731 * Lock out other messages to prevent race 732 * conditions. 733 */ 734 if ((t_ret = 735 __rep_lockout_msg(env, rep, 1)) != 0) { 736 ret = t_ret; 737 goto errhlk; 738 } 739 lockout = 1; 740 741 /* 742 * Need mtx_clientdb to safely clean up 743 * page database in __rep_init_cleanup(). 744 */ 745 REP_SYSTEM_UNLOCK(env); 746 MUTEX_LOCK(env, rep->mtx_clientdb); 747 REP_SYSTEM_LOCK(env); 748 749 /* 750 * Clean up internal init if one was in 751 * progress. 
752 */ 753 if (F_ISSET(rep, REP_F_READY_API | 754 REP_F_READY_OP)) { 755 RPRINT(env, DB_VERB_REP_MSGS, (env, 756 "NEWCLIENT is cleaning up old internal init for invalid master")); 757 t_ret = __rep_init_cleanup(env, 758 rep, DB_FORCE); 759 F_CLR(rep, REP_F_ABBREVIATED | 760 REP_F_RECOVER_MASK); 761 } 762 MUTEX_UNLOCK(env, rep->mtx_clientdb); 763 if (t_ret != 0) { 764 ret = t_ret; 765 RPRINT(env, DB_VERB_REP_MSGS, (env, 766 "NEWCLIENT error cleaning up internal init for invalid master: %d", ret)); 767 goto errhlk; 768 } 769 F_CLR(rep, REP_F_READY_MSG); 770 lockout = 0; 771 } 772 REP_SYSTEM_UNLOCK(env); 773 if (rep->version < DB_REPVERSION_47) 774 DB_INIT_DBT(data_dbt, &egen_arg.egen, 775 sizeof(egen_arg.egen)); 776 else { 777 if ((ret = __rep_egen_marshal(env, &egen_arg, 778 buf, __REP_EGEN_SIZE, &len)) != 0) 779 goto errlock; 780 DB_INIT_DBT(data_dbt, buf, len); 781 } 782 (void)__rep_send_message(env, DB_EID_BROADCAST, 783 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); 784 break; 785 } 786 /* FALLTHROUGH */ 787 case REP_MASTER_REQ: 788 RECOVERING_SKIP; 789 if (F_ISSET(rep, REP_F_MASTER)) { 790 LOG_SYSTEM_LOCK(env); 791 lsn = lp->lsn; 792 LOG_SYSTEM_UNLOCK(env); 793 (void)__rep_send_message(env, 794 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); 795 if (IS_USING_LEASES(env)) 796 (void)__rep_lease_refresh(env); 797 } 798 /* 799 * If there is no master, then we could get into a state 800 * where an old client lost the initial ALIVE message and 801 * is calling an election under an old gen and can 802 * never get to the current gen. 
803 */ 804 if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) { 805 REP_SYSTEM_LOCK(env); 806 egen_arg.egen = rep->egen; 807 if (eid == rep->master_id) 808 rep->master_id = DB_EID_INVALID; 809 REP_SYSTEM_UNLOCK(env); 810 if (rep->version < DB_REPVERSION_47) 811 DB_INIT_DBT(data_dbt, &egen_arg.egen, 812 sizeof(egen_arg.egen)); 813 else { 814 if ((ret = __rep_egen_marshal(env, &egen_arg, 815 buf, __REP_EGEN_SIZE, &len)) != 0) 816 goto errlock; 817 DB_INIT_DBT(data_dbt, buf, len); 818 } 819 (void)__rep_send_message(env, eid, 820 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); 821 } 822 break; 823 case REP_NEWFILE: 824 RECOVERING_LOG_SKIP; 825 CLIENT_ONLY(rep, rp); 826 ret = __rep_apply(env, 827 ip, rp, rec, ret_lsnp, NULL, &last_lsn); 828 if (ret == DB_REP_LOGREADY) 829 ret = __rep_logready(env, rep, savetime, &last_lsn); 830 break; 831 case REP_NEWMASTER: 832 /* 833 * Handle even if we're recovering. 834 */ 835 ANYSITE(rep); 836 if (F_ISSET(rep, REP_F_MASTER) && 837 eid != rep->eid) { 838 /* We don't hold the rep mutex, and may miscount. */ 839 STAT(rep->stat.st_dupmasters++); 840 ret = DB_REP_DUPMASTER; 841 if (IS_USING_LEASES(env)) 842 DB_ASSERT(env, 843 __rep_lease_check(env, 0) == 844 DB_REP_LEASE_EXPIRED); 845 else 846 (void)__rep_send_message(env, 847 DB_EID_BROADCAST, REP_DUPMASTER, 848 NULL, NULL, 0, 0); 849 break; 850 } 851 if ((ret = 852 __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER) 853 ret = __rep_fire_newmaster(env, rp->gen, eid); 854 break; 855 case REP_PAGE: 856 case REP_PAGE_MORE: 857 /* 858 * Handle even if we're recovering. 859 */ 860 CLIENT_ONLY(rep, rp); 861 ret = __rep_page(env, ip, eid, rp, rec); 862 if (ret == DB_REP_PAGEDONE) 863 ret = 0; 864 break; 865 case REP_PAGE_FAIL: 866 /* 867 * Handle even if we're recovering. 
868 */ 869 CLIENT_ONLY(rep, rp); 870 ret = __rep_page_fail(env, ip, eid, rp, rec); 871 break; 872 case REP_PAGE_REQ: 873 RECOVERING_SKIP; 874 MASTER_UPDATE(env, renv); 875 ret = __rep_page_req(env, ip, eid, rp, rec); 876 CLIENT_REREQ; 877 break; 878 case REP_REREQUEST: 879 /* 880 * Handle even if we're recovering. Don't do a master 881 * check. 882 */ 883 CLIENT_ONLY(rep, rp); 884 /* 885 * Don't hold any mutex, may miscount. 886 */ 887 STAT(rep->stat.st_client_rerequests++); 888 ret = __rep_resend_req(env, 1); 889 break; 890 case REP_START_SYNC: 891 RECOVERING_SKIP; 892 MUTEX_LOCK(env, rep->mtx_clientdb); 893 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); 894 /* 895 * The comparison needs to be <= because the LSN in 896 * the message can be the LSN of the first outstanding 897 * txn, which may be the LSN immediately after the 898 * previous commit. The ready_lsn is the LSN of the 899 * next record expected. In that case, the LSNs 900 * could be equal and the client has the commit and 901 * wants to sync. [SR #15338] 902 */ 903 if (cmp <= 0) { 904 MUTEX_UNLOCK(env, rep->mtx_clientdb); 905 do_sync = 1; 906 } else { 907 STAT(rep->stat.st_startsync_delayed++); 908 /* 909 * There are cases where keeping the first ckp_lsn 910 * LSN is advantageous and cases where keeping 911 * a later LSN is better. If random, earlier 912 * log records are missing, keeping the later 913 * LSN seems to be better. That is what we'll 914 * do for now. 915 */ 916 if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0) 917 rep->ckp_lsn = rp->lsn; 918 RPRINT(env, DB_VERB_REP_MSGS, (env, 919 "Delayed START_SYNC memp_sync due to missing records.")); 920 RPRINT(env, DB_VERB_REP_MSGS, (env, 921 "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]", 922 (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset, 923 (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset)); 924 MUTEX_UNLOCK(env, rep->mtx_clientdb); 925 } 926 break; 927 case REP_UPDATE: 928 /* 929 * Handle even if we're recovering. 
930 */ 931 CLIENT_ONLY(rep, rp); 932 ret = __rep_update_setup(env, eid, rp, rec, savetime); 933 break; 934 case REP_UPDATE_REQ: 935 /* 936 * Handle even if we're recovering. 937 */ 938 MASTER_ONLY(rep, rp); 939 infop = env->reginfo; 940 renv = infop->primary; 941 MASTER_UPDATE(env, renv); 942 ret = __rep_update_req(env, rp, eid); 943 break; 944 case REP_VERIFY: 945 if (recovering) { 946 MUTEX_LOCK(env, rep->mtx_clientdb); 947 cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn); 948 MUTEX_UNLOCK(env, rep->mtx_clientdb); 949 /* 950 * If this is not the verify record I want, skip it. 951 */ 952 if (cmp != 0) { 953 ret = __rep_skip_msg( 954 env, rep, eid, rp->rectype); 955 break; 956 } 957 } 958 CLIENT_ONLY(rep, rp); 959 ret = __rep_verify(env, rp, rec, eid, savetime); 960 break; 961 case REP_VERIFY_FAIL: 962 /* 963 * Handle even if we're recovering. 964 */ 965 CLIENT_ONLY(rep, rp); 966 ret = __rep_verify_fail(env, rp); 967 break; 968 case REP_VERIFY_REQ: 969 RECOVERING_SKIP; 970 ret = __rep_verify_req(env, rp, eid); 971 CLIENT_REREQ; 972 break; 973 case REP_VOTE1: 974 /* 975 * Handle even if we're recovering. 976 */ 977 ret = __rep_vote1(env, rp, rec, eid); 978 break; 979 case REP_VOTE2: 980 /* 981 * Handle even if we're recovering. 982 */ 983 ret = __rep_vote2(env, rp, rec, eid); 984 break; 985 default: 986 __db_errx(env, 987 "DB_ENV->rep_process_message: unknown replication message: type %lu", 988 (u_long)rp->rectype); 989 ret = EINVAL; 990 break; 991 } 992 993errlock: 994 REP_SYSTEM_LOCK(env); 995errhlk: if (lockout) 996 F_CLR(rep, REP_F_READY_MSG); 997 rep->msg_th--; 998 REP_SYSTEM_UNLOCK(env); 999 if (do_sync) { 1000 MUTEX_LOCK(env, rep->mtx_ckp); 1001 lsn = rp->lsn; 1002 /* 1003 * This is the REP_START_SYNC sync, and so we permit it to be 1004 * interrupted. 
1005 */ 1006 ret = __memp_sync( 1007 env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn); 1008 MUTEX_UNLOCK(env, rep->mtx_ckp); 1009 RPRINT(env, DB_VERB_REP_MSGS, 1010 (env, "ALIVE: Completed sync [%lu][%lu]", 1011 (u_long)lsn.file, (u_long)lsn.offset)); 1012 } 1013out: 1014 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { 1015 if (ret_lsnp != NULL) 1016 *ret_lsnp = rp->lsn; 1017 ret = DB_REP_NOTPERM; 1018 } 1019 __dbt_userfree(env, control, rec, NULL); 1020 ENV_LEAVE(env, ip); 1021 return (ret); 1022} 1023 1024/* 1025 * __rep_apply -- 1026 * 1027 * Handle incoming log records on a client, applying when possible and 1028 * entering into the bookkeeping table otherwise. This routine manages 1029 * the state of the incoming message stream -- processing records, via 1030 * __rep_process_rec, when possible and enqueuing in the __db.rep.db 1031 * when necessary. As gaps in the stream are filled in, this is where 1032 * we try to process as much as possible from __db.rep.db to catch up. 1033 * 1034 * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *, 1035 * PUBLIC: DBT *, DB_LSN *, int *, DB_LSN *)); 1036 */ 1037int 1038__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp) 1039 ENV *env; 1040 DB_THREAD_INFO *ip; 1041 __rep_control_args *rp; 1042 DBT *rec; 1043 DB_LSN *ret_lsnp; 1044 int *is_dupp; 1045 DB_LSN *last_lsnp; 1046{ 1047 DB *dbp; 1048 DBT control_dbt, key_dbt; 1049 DBT rec_dbt; 1050 DB_LOG *dblp; 1051 DB_LSN max_lsn, save_lsn; 1052 DB_REP *db_rep; 1053 LOG *lp; 1054 REP *rep; 1055 db_timespec msg_time, max_ts; 1056 u_int32_t gen, rectype; 1057 int cmp, event, master, newfile_seen, ret, set_apply, t_ret; 1058 1059 COMPQUIET(gen, 0); 1060 COMPQUIET(master, DB_EID_INVALID); 1061 1062 db_rep = env->rep_handle; 1063 rep = db_rep->region; 1064 event = ret = set_apply = 0; 1065 memset(&control_dbt, 0, sizeof(control_dbt)); 1066 memset(&rec_dbt, 0, sizeof(rec_dbt)); 1067 ZERO_LSN(max_lsn); 1068 timespecclear(&max_ts); 1069 
timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); 1070 cmp = -2; /* OOB value that LOG_COMPARE can't return. */ 1071 1072 dblp = env->lg_handle; 1073 MUTEX_LOCK(env, rep->mtx_clientdb); 1074 /* 1075 * Lazily open the temp db. Always set the startup flag to 0 1076 * because it was initialized from rep_start. 1077 */ 1078 if (db_rep->rep_db == NULL && 1079 (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { 1080 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1081 goto out; 1082 } 1083 dbp = db_rep->rep_db; 1084 lp = dblp->reginfo.primary; 1085 newfile_seen = 0; 1086 REP_SYSTEM_LOCK(env); 1087 if (F_ISSET(rep, REP_F_RECOVER_LOG) && 1088 LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0) 1089 lp->ready_lsn = rep->first_lsn; 1090 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); 1091 /* 1092 * If we are going to skip or process any message other 1093 * than a duplicate, make note of it if we're in an 1094 * election so that the election can rerequest proactively. 1095 */ 1096 if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0) 1097 F_SET(rep, REP_F_SKIPPED_APPLY); 1098 1099 /* 1100 * If we're in the middle of processing a NEWFILE, we've dropped 1101 * the mutex and if this matches it is a duplicate record. We 1102 * do not want this call taking the "matching" code below because 1103 * we may then process later records in the temp db and the 1104 * original NEWFILE may not have the log file ready. It will 1105 * process those temp db items when it completes. 1106 */ 1107 if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0) 1108 cmp = -1; 1109 1110 if (cmp == 0) { 1111 /* 1112 * If we are in an election (i.e. we've sent a vote 1113 * with an LSN in it), then we drop the next record 1114 * we're expecting. When we find a master, we'll 1115 * either go into sync, or if it was an existing 1116 * master, rerequest this one record (later records 1117 * are accumulating in the temp db). 
1118 * 1119 * We can simply return here, and rep_process_message 1120 * will set NOTPERM if necessary for this record. 1121 */ 1122 if (F_ISSET(rep, REP_F_READY_APPLY)) { 1123 /* 1124 * We will simply return now. All special return 1125 * processing should be ignored because the special 1126 * values are just initialized. Variables like 1127 * max_lsn are still 0. 1128 */ 1129 RPRINT(env, DB_VERB_REP_MISC, (env, 1130 "rep_apply: In election. Ignoring [%lu][%lu]", 1131 (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); 1132 REP_SYSTEM_UNLOCK(env); 1133 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1134 goto out; 1135 } 1136 rep->apply_th++; 1137 set_apply = 1; 1138 RPRINT(env, DB_VERB_REP_MISC, (env, 1139 "rep_apply: Set apply_th %d", rep->apply_th)); 1140 REP_SYSTEM_UNLOCK(env); 1141 if (rp->rectype == REP_NEWFILE) 1142 newfile_seen = 1; 1143 if ((ret = __rep_process_rec(env, ip, 1144 rp, rec, &max_ts, &max_lsn)) != 0) 1145 goto err; 1146 /* 1147 * If we get the record we are expecting, reset 1148 * the count of records we've received and are applying 1149 * towards the request interval. 1150 */ 1151 __os_gettime(env, &lp->rcvd_ts, 1); 1152 ZERO_LSN(lp->max_wait_lsn); 1153 1154 /* 1155 * The __rep_remfirst() and __rep_getnext() functions each open, 1156 * use and then close a cursor on the temp db, each time through 1157 * the loop. Although this may seem excessive, it is necessary 1158 * to avoid locking problems with checkpoints. 1159 */ 1160 while (ret == 0 && 1161 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) { 1162 /* 1163 * We just filled in a gap in the log record stream. 1164 * Write subsequent records to the log. 
1165 */ 1166gap_check: 1167 if ((ret = __rep_remfirst(env, ip, 1168 &control_dbt, &rec_dbt)) != 0) 1169 goto err; 1170 1171 rp = (__rep_control_args *)control_dbt.data; 1172 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); 1173 rec = &rec_dbt; 1174 if (rp->rectype == REP_NEWFILE) 1175 newfile_seen = 1; 1176 if ((ret = __rep_process_rec(env, ip, 1177 rp, rec, &max_ts, &max_lsn)) != 0) 1178 goto err; 1179 1180 --rep->stat.st_log_queued; 1181 1182 /* 1183 * Since we just filled a gap in the log stream, and 1184 * we're writing subsequent records to the log, we want 1185 * to use rcvd_ts and wait_ts so that we will 1186 * request the next gap if we end up with a gap and 1187 * not so recent records in the temp db, but not 1188 * request if recent records are in the temp db and 1189 * likely to arrive on its own shortly. We want to 1190 * avoid requesting the record in that case. Also 1191 * reset max_wait_lsn because the next gap is a 1192 * fresh gap. 1193 */ 1194 lp->rcvd_ts = lp->last_ts; 1195 lp->wait_ts = rep->request_gap; 1196 if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) { 1197 __os_gettime(env, &lp->rcvd_ts, 1); 1198 ret = 0; 1199 break; 1200 } else if (ret != 0) 1201 goto err; 1202 } 1203 1204 /* 1205 * Check if we're at a gap in the table and if so, whether we 1206 * need to ask for any records. 1207 */ 1208 if (!IS_ZERO_LSN(lp->waiting_lsn) && 1209 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) { 1210 /* 1211 * We got a record and processed it, but we may 1212 * still be waiting for more records. If we 1213 * filled a gap we keep a count of how many other 1214 * records are in the temp database and if we should 1215 * request the next gap at this time. 
1216 */ 1217 if (__rep_check_doreq(env, rep) && (ret = 1218 __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0) 1219 goto err; 1220 } else { 1221 lp->wait_ts = rep->request_gap; 1222 ZERO_LSN(lp->max_wait_lsn); 1223 } 1224 1225 } else if (cmp > 0) { 1226 /* 1227 * The LSN is higher than the one we were waiting for. 1228 * This record isn't in sequence; add it to the temporary 1229 * database, update waiting_lsn if necessary, and perform 1230 * calculations to determine if we should issue requests 1231 * for new records. 1232 */ 1233 REP_SYSTEM_UNLOCK(env); 1234 memset(&key_dbt, 0, sizeof(key_dbt)); 1235 key_dbt.data = rp; 1236 key_dbt.size = sizeof(*rp); 1237 ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE); 1238 if (ret == 0) { 1239 rep->stat.st_log_queued++; 1240 __os_gettime(env, &lp->last_ts, 1); 1241#ifdef HAVE_STATISTICS 1242 STAT(rep->stat.st_log_queued_total++); 1243 if (rep->stat.st_log_queued_max < 1244 rep->stat.st_log_queued) 1245 rep->stat.st_log_queued_max = 1246 rep->stat.st_log_queued; 1247#endif 1248 } 1249 1250 if (ret == DB_KEYEXIST) 1251 ret = 0; 1252 if (ret != 0) 1253 goto done; 1254 1255 if (IS_ZERO_LSN(lp->waiting_lsn) || 1256 LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0) { 1257 /* 1258 * If this is a new gap, then reset the rcvd_ts so 1259 * that an out-of-order record after an idle period 1260 * does not (likely) immediately rerequest. 1261 */ 1262 if (IS_ZERO_LSN(lp->waiting_lsn)) 1263 __os_gettime(env, &lp->rcvd_ts, 1); 1264 lp->waiting_lsn = rp->lsn; 1265 } 1266 1267 if (__rep_check_doreq(env, rep) && 1268 (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0)) 1269 goto err; 1270 1271 /* 1272 * If this is permanent; let the caller know that we have 1273 * not yet written it to disk, but we've accepted it. 
1274 */ 1275 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { 1276 max_lsn = rp->lsn; 1277 ret = DB_REP_NOTPERM; 1278 } 1279 goto done; 1280 } else { 1281 STAT(rep->stat.st_log_duplicated++); 1282 REP_SYSTEM_UNLOCK(env); 1283 if (is_dupp != NULL) 1284 *is_dupp = 1; 1285 LOGCOPY_32(env, &rectype, rec->data); 1286 if (rectype == DB___txn_regop || rectype == DB___txn_ckp) 1287 max_lsn = lp->max_perm_lsn; 1288 /* 1289 * We check REPCTL_LEASE here, because this client may 1290 * have leases configured but the master may not (especially 1291 * in a mixed version group. If the master has leases 1292 * configured, all clients must also. 1293 */ 1294 if (IS_USING_LEASES(env) && 1295 F_ISSET(rp, REPCTL_LEASE) && 1296 timespecisset(&msg_time)) { 1297 if (timespeccmp(&msg_time, &lp->max_lease_ts, >)) 1298 max_ts = msg_time; 1299 else 1300 max_ts = lp->max_lease_ts; 1301 } 1302 goto done; 1303 } 1304 1305 /* Check if we need to go back into the table. */ 1306 if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) 1307 goto gap_check; 1308 1309done: 1310err: /* 1311 * In case of a race, to make sure only one thread can get 1312 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to 1313 * this point. 1314 */ 1315 REP_SYSTEM_LOCK(env); 1316 if (ret == 0 && 1317 F_ISSET(rep, REP_F_RECOVER_LOG) && 1318 !IS_ZERO_LSN(rep->last_lsn) && 1319 LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) { 1320 *last_lsnp = max_lsn; 1321 ZERO_LSN(rep->last_lsn); 1322 ZERO_LSN(max_lsn); 1323 ret = DB_REP_LOGREADY; 1324 } 1325 /* 1326 * Only decrement if we were actually applying log records. 1327 * We do not care if we processed a dup record or put one 1328 * in the temp db. 
1329 */ 1330 if (set_apply) { 1331 rep->apply_th--; 1332 RPRINT(env, DB_VERB_REP_MISC, (env, 1333 "rep_apply: Decrement apply_th %d [%lu][%lu]", 1334 rep->apply_th, (u_long)lp->ready_lsn.file, 1335 (u_long)lp->ready_lsn.offset)); 1336 } 1337 1338 if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) && 1339 !IS_ZERO_LSN(max_lsn)) { 1340 if (ret_lsnp != NULL) 1341 *ret_lsnp = max_lsn; 1342 ret = DB_REP_ISPERM; 1343 DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0); 1344 lp->max_perm_lsn = max_lsn; 1345 } 1346 1347 /* 1348 * Start-up is complete when we process (or have already processed) up 1349 * to the end of the replication group's log. In case we miss that 1350 * message, as a back-up, we also recognize start-up completion when we 1351 * actually process a live log record. Having cmp==0 here (with a good 1352 * "ret" value) implies we actually processed the record. 1353 */ 1354 if ((ret == 0 || ret == DB_REP_ISPERM) && 1355 rep->stat.st_startup_complete == 0 && 1356 !F_ISSET(rep, REP_F_RECOVER_LOG) && 1357 ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) || 1358 (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) { 1359 rep->stat.st_startup_complete = 1; 1360 event = 1; 1361 gen = rep->gen; 1362 master = rep->master_id; 1363 } 1364 REP_SYSTEM_UNLOCK(env); 1365 /* 1366 * If we've processed beyond the needed LSN for a pending 1367 * start sync, start it now. We can compare >= here 1368 * because ready_lsn is the next record we expect. 1369 * Since ckp_lsn can point to the last commit record itself, 1370 * but if it does and ready_lsn == commit (i.e. we haven't 1371 * written the commit yet), we can still start to sync 1372 * because we're guaranteed no additional buffers can 1373 * be dirtied. 
1374 */ 1375 if (!IS_ZERO_LSN(rep->ckp_lsn) && 1376 LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) { 1377 save_lsn = rep->ckp_lsn; 1378 ZERO_LSN(rep->ckp_lsn); 1379 } else 1380 ZERO_LSN(save_lsn); 1381 1382 /* 1383 * If this is a perm record, we are using leases, update the lease 1384 * grant. We must hold the clientdb mutex. We must not hold 1385 * the region mutex because rep_update_grant will acquire it. 1386 */ 1387 if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) && 1388 timespecisset(&max_ts)) { 1389 if ((t_ret = __rep_update_grant(env, &max_ts)) != 0) 1390 ret = t_ret; 1391 else if (timespeccmp(&max_ts, &lp->max_lease_ts, >)) 1392 lp->max_lease_ts = max_ts; 1393 } 1394 1395 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1396 if (!IS_ZERO_LSN(save_lsn)) { 1397 /* 1398 * Now call memp_sync holding only the ckp mutex. 1399 */ 1400 MUTEX_LOCK(env, rep->mtx_ckp); 1401 RPRINT(env, DB_VERB_REP_MISC, (env, 1402 "Starting delayed __memp_sync call [%lu][%lu]", 1403 (u_long)save_lsn.file, (u_long)save_lsn.offset)); 1404 t_ret = __memp_sync(env, 1405 DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &save_lsn); 1406 MUTEX_UNLOCK(env, rep->mtx_ckp); 1407 } 1408 if (event) { 1409 RPRINT(env, DB_VERB_REP_MISC, (env, 1410 "Start-up is done [%lu][%lu]", 1411 (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); 1412 1413 if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) { 1414 DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM); 1415 /* Failure trumps either of those values. 
*/ 1416 ret = t_ret; 1417 goto out; 1418 } 1419 } 1420 if ((ret == 0 || ret == DB_REP_ISPERM) && 1421 newfile_seen && lp->db_log_autoremove) 1422 __log_autoremove(env); 1423 if (control_dbt.data != NULL) 1424 __os_ufree(env, control_dbt.data); 1425 if (rec_dbt.data != NULL) 1426 __os_ufree(env, rec_dbt.data); 1427 1428out: 1429 switch (ret) { 1430 case 0: 1431 break; 1432 case DB_REP_ISPERM: 1433 RPRINT(env, DB_VERB_REP_MSGS, 1434 (env, "Returning ISPERM [%lu][%lu], cmp = %d", 1435 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1436 break; 1437 case DB_REP_LOGREADY: 1438 RPRINT(env, DB_VERB_REP_MSGS, (env, 1439 "Returning LOGREADY up to [%lu][%lu], cmp = %d", 1440 (u_long)last_lsnp->file, 1441 (u_long)last_lsnp->offset, cmp)); 1442 break; 1443 case DB_REP_NOTPERM: 1444 if (!F_ISSET(rep, REP_F_RECOVER_LOG) && 1445 !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL) 1446 *ret_lsnp = max_lsn; 1447 1448 RPRINT(env, DB_VERB_REP_MSGS, 1449 (env, "Returning NOTPERM [%lu][%lu], cmp = %d", 1450 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1451 break; 1452 default: 1453 RPRINT(env, DB_VERB_REP_MSGS, 1454 (env, "Returning %d [%lu][%lu], cmp = %d", ret, 1455 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1456 break; 1457 } 1458 1459 return (ret); 1460} 1461 1462/* 1463 * __rep_process_txn -- 1464 * 1465 * This is the routine that actually gets a transaction ready for 1466 * processing. 
*
 * Reads the commit (or prepare) record in rec, acquires the write locks
 * listed in that record, gathers and sorts the LSNs of the transaction
 * with __rep_collect_txn, and then dispatches each collected log record
 * with DB_TXN_APPLY.
 *
 * Returns 0 on success, and also when a regop record turns out not to be
 * a commit (nothing is applied in that case); otherwise the first error
 * encountered.
 *
 * PUBLIC: int __rep_process_txn __P((ENV *, DBT *));
 */
int
__rep_process_txn(env, rec)
	ENV *env;
	DBT *rec;
{
	DBT data_dbt, *lock_dbt;
	DB_LOCKER *locker;
	DB_LOCKREQ req, *lvp;
	DB_LOGC *logc;
	DB_LSN prev_lsn, *lsnp;
	DB_REP *db_rep;
	DB_THREAD_INFO *ip;
	DB_TXNHEAD *txninfo;
	LSN_COLLECTION lc;
	REP *rep;
	__txn_regop_args *txn_args;
	__txn_regop_42_args *txn42_args;
	__txn_prepare_args *prep_args;
	u_int32_t rectype;
	u_int i;
	int ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	/* NULL-initialize everything the shared cleanup code tests. */
	logc = NULL;
	txn_args = NULL;
	txn42_args = NULL;
	prep_args = NULL;
	txninfo = NULL;

	/*
	 * NOTE(review): no matching ENV_LEAVE appears on any path of this
	 * function -- confirm against the ENV_ENTER macro that this is
	 * intended.
	 */
	ENV_ENTER(env, ip);
	memset(&data_dbt, 0, sizeof(data_dbt));
	if (F_ISSET(env, ENV_THREAD))
		F_SET(&data_dbt, DB_DBT_REALLOC);

	/*
	 * There are two phases:  First, we have to traverse backwards through
	 * the log records gathering the list of all LSNs in the transaction.
	 * Once we have this information, we can loop through and then apply it.
	 *
	 * We may be passed a prepare (if we're restoring a prepare on upgrade)
	 * instead of a commit (the common case).  Check which it is and behave
	 * appropriately.
	 */
	LOGCOPY_32(env, &rectype, rec->data);
	memset(&lc, 0, sizeof(lc));
	if (rectype == DB___txn_regop) {
		/*
		 * We're the end of a transaction.  Make sure this is
		 * really a commit and not an abort!
		 */
		if (rep->version >= DB_REPVERSION_44) {
			if ((ret = __txn_regop_read(
			    env, rec->data, &txn_args)) != 0)
				return (ret);
			if (txn_args->opcode != TXN_COMMIT) {
				__os_free(env, txn_args);
				return (0);
			}
			prev_lsn = txn_args->prev_lsn;
			lock_dbt = &txn_args->locks;
		} else {
			/* Older replication versions use the 4.2 layout. */
			if ((ret = __txn_regop_42_read(
			    env, rec->data, &txn42_args)) != 0)
				return (ret);
			if (txn42_args->opcode != TXN_COMMIT) {
				__os_free(env, txn42_args);
				return (0);
			}
			prev_lsn = txn42_args->prev_lsn;
			lock_dbt = &txn42_args->locks;
		}
	} else {
		/* We're a prepare. */
		DB_ASSERT(env, rectype == DB___txn_prepare);

		if ((ret = __txn_prepare_read(
		    env, rec->data, &prep_args)) != 0)
			return (ret);
		prev_lsn = prep_args->prev_lsn;
		lock_dbt = &prep_args->locks;
	}

	/*
	 * Get locks.  A failure to obtain a locker id skips the lock
	 * release code (err1); every later failure goes through it (err).
	 */
	if ((ret = __lock_id(env, NULL, &locker)) != 0)
		goto err1;

	if ((ret =
	    __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
		goto err;

	/*
	 * Phase 1.  Get a list of the LSNs in this transaction, and sort it.
	 *
	 * NOTE(review): if prev_lsn is the zero LSN nothing is collected and
	 * lc.array is still NULL (with lc.nlsns == 0) when passed to qsort
	 * -- confirm that case cannot occur or is acceptable.
	 */
	if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0)
		goto err;
	qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);

	/*
	 * The set of records for a transaction may include dbreg_register
	 * records.  Create a txnlist so that they can keep track of file
	 * state between records.
	 */
	if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0)
		goto err;

	/* Phase 2: Apply updates. */
	if ((ret = __log_cursor(env, &logc)) != 0)
		goto err;
	for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
		if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
			__db_errx(env, "failed to read the log at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
		if ((ret = __db_dispatch(env, &env->recover_dtab,
		    &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) {
			__db_errx(env, "transaction failed at [%lu][%lu]",
			    (u_long)lsnp->file, (u_long)lsnp->offset);
			goto err;
		}
	}

	/* Release every lock held by the locker, then the locker id itself. */
err:	memset(&req, 0, sizeof(req));
	req.op = DB_LOCK_PUT_ALL;
	if ((t_ret =
	    __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0)
		ret = t_ret;

	if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0)
		ret = t_ret;

	/* Cleanup shared by all paths past record parsing. */
err1:	if (txn_args != NULL)
		__os_free(env, txn_args);
	if (txn42_args != NULL)
		__os_free(env, txn42_args);
	if (prep_args != NULL)
		__os_free(env, prep_args);
	if (lc.array != NULL)
		__os_free(env, lc.array);

	if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0)
		ret = t_ret;

	if (txninfo != NULL)
		__db_txnlist_end(env, txninfo);

	/* Only free the buffer if we asked the log cursor to allocate it. */
	if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
		__os_ufree(env, data_dbt.data);

#ifdef HAVE_STATISTICS
	if (ret == 0)
		/*
		 * We don't hold the rep mutex, and could miscount if we race.
		 */
		rep->stat.st_txns_applied++;
#endif

	return (ret);
}

/*
 * __rep_collect_txn
 *	Recursive function that will let us visit every entry in a transaction
 *	chain including all child transactions so that we can then apply
 *	the entire transaction family at once.
1634 */ 1635static int 1636__rep_collect_txn(env, lsnp, lc) 1637 ENV *env; 1638 DB_LSN *lsnp; 1639 LSN_COLLECTION *lc; 1640{ 1641 __txn_child_args *argp; 1642 DB_LOGC *logc; 1643 DB_LSN c_lsn; 1644 DBT data; 1645 u_int32_t rectype; 1646 u_int nalloc; 1647 int ret, t_ret; 1648 1649 memset(&data, 0, sizeof(data)); 1650 F_SET(&data, DB_DBT_REALLOC); 1651 1652 if ((ret = __log_cursor(env, &logc)) != 0) 1653 return (ret); 1654 1655 while (!IS_ZERO_LSN(*lsnp) && 1656 (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) { 1657 LOGCOPY_32(env, &rectype, data.data); 1658 if (rectype == DB___txn_child) { 1659 if ((ret = __txn_child_read( 1660 env, data.data, &argp)) != 0) 1661 goto err; 1662 c_lsn = argp->c_lsn; 1663 *lsnp = argp->prev_lsn; 1664 __os_free(env, argp); 1665 ret = __rep_collect_txn(env, &c_lsn, lc); 1666 } else { 1667 if (lc->nalloc < lc->nlsns + 1) { 1668 nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; 1669 if ((ret = __os_realloc(env, 1670 nalloc * sizeof(DB_LSN), &lc->array)) != 0) 1671 goto err; 1672 lc->nalloc = nalloc; 1673 } 1674 lc->array[lc->nlsns++] = *lsnp; 1675 1676 /* 1677 * Explicitly copy the previous lsn. The record 1678 * starts with a u_int32_t record type, a u_int32_t 1679 * txn id, and then the DB_LSN (prev_lsn) that we 1680 * want. We copy explicitly because we have no idea 1681 * what kind of record this is. 1682 */ 1683 LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data + 1684 sizeof(u_int32_t) + sizeof(u_int32_t)); 1685 } 1686 1687 if (ret != 0) 1688 goto err; 1689 } 1690 if (ret != 0) 1691 __db_errx(env, "collect failed at: [%lu][%lu]", 1692 (u_long)lsnp->file, (u_long)lsnp->offset); 1693 1694err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) 1695 ret = t_ret; 1696 if (data.data != NULL) 1697 __os_ufree(env, data.data); 1698 return (ret); 1699} 1700 1701/* 1702 * __rep_lsn_cmp -- 1703 * qsort-type-compatible wrapper for LOG_COMPARE. 
*/
static int
__rep_lsn_cmp(lsn1, lsn2)
	const void *lsn1, *lsn2;
{

	/* Both arguments are interpreted as pointers to DB_LSN. */
	return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2));
}

/*
 * __rep_newfile --
 *	NEWFILE messages have the LSN of the last record in the previous
 * log file.  When applying a NEWFILE message, make sure we haven't already
 * swapped files.  Assume caller hold mtx_clientdb.
 *
 * mtx_clientdb is released around the __log_newfile call and reacquired
 * before returning.  Returns 0 when the message is ignored (a NEWFILE is
 * already in progress, or the file has already been switched); otherwise
 * the result of unmarshalling the message or of __log_newfile.
 */
static int
__rep_newfile(env, rp, rec)
	ENV *env;
	__rep_control_args *rp;
	DBT *rec;
{
	DB_LOG *dblp;
	DB_LSN tmplsn;
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	__rep_newfile_args nf_args;
	int ret;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	db_rep = env->rep_handle;
	rep = db_rep->region;

	/*
	 * If a newfile is already in progress, just ignore.
	 */
	if (F_ISSET(rep, REP_F_NEWFILE))
		return (0);
	if (rp->lsn.file + 1 > lp->ready_lsn.file) {
		/*
		 * Pick the log version for the new file: from the control
		 * message when there is no payload, as a raw u_int32_t for
		 * senders older than DB_REPVERSION_47, and by unmarshalling
		 * the payload otherwise.
		 */
		if (rec == NULL || rec->size == 0) {
			RPRINT(env, DB_VERB_REP_MISC, (env,
"rep_newfile: Old-style NEWFILE msg. Use control msg log version: %lu",
			    (u_long) rp->log_version));
			nf_args.version = rp->log_version;
		} else if (rp->rep_version < DB_REPVERSION_47)
			nf_args.version = *(u_int32_t *)rec->data;
		else if ((ret = __rep_newfile_unmarshal(env, &nf_args,
		    rec->data, rec->size, NULL)) != 0)
			return (ret);
		RPRINT(env, DB_VERB_REP_MISC,
		    (env, "rep_newfile: File %lu vers %lu",
		    (u_long)rp->lsn.file + 1, (u_long)nf_args.version));

		/*
		 * We drop the mtx_clientdb mutex during
		 * the file operation, and then reacquire it when
		 * we're done.  We avoid colliding with new incoming
		 * log records because lp->ready_lsn is not getting
		 * updated and there is no real log record at this
		 * ready_lsn.  We avoid colliding with a duplicate
		 * NEWFILE message by setting an in-progress flag.
		 */
		REP_SYSTEM_LOCK(env);
		F_SET(rep, REP_F_NEWFILE);
		REP_SYSTEM_UNLOCK(env);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		LOG_SYSTEM_LOCK(env);
		ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version);
		LOG_SYSTEM_UNLOCK(env);
		MUTEX_LOCK(env, rep->mtx_clientdb);
		REP_SYSTEM_LOCK(env);
		F_CLR(rep, REP_F_NEWFILE);
		REP_SYSTEM_UNLOCK(env);
		/* ready_lsn only advances if the file switch succeeded. */
		if (ret == 0)
			lp->ready_lsn = tmplsn;
		return (ret);
	} else
		/* We've already applied this NEWFILE.  Just ignore it. */
		return (0);
}

/*
 * __rep_do_ckp --
 *	Perform the memp_sync necessary for this checkpoint without holding the
 * REP->mtx_clientdb.  Callers of this function must hold REP->mtx_clientdb
 * and must not be holding the region mutex.
 *
 * mtx_clientdb is released for the duration of the sync and reacquired
 * before returning.  A failed sync is reported and converted into an
 * environment panic.
 */
static int
__rep_do_ckp(env, rec, rp)
	ENV *env;
	DBT *rec;
	__rep_control_args *rp;
{
	DB_ENV *dbenv;
	__txn_ckp_args *ckp_args;
	DB_LSN ckp_lsn;
	REP *rep;
	int ret;

	dbenv = env->dbenv;

	/* Crack the log record and extract the checkpoint LSN. */
	if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0)
		return (ret);
	ckp_lsn = ckp_args->ckp_lsn;
	__os_free(env, ckp_args);

	rep = env->rep_handle->region;

	MUTEX_UNLOCK(env, rep->mtx_clientdb);
	DB_TEST_WAIT(env, env->test_check);

	/*
	 * Sync the memory pool.
	 *
	 * This is the real PERM lock record/ckp.  We cannot return ISPERM
	 * if we haven't truly completed the checkpoint, so we don't allow
	 * this call to be interrupted.
	 *
	 * We may be overlapping our log record with an in-progress startsync
	 * of this checkpoint; suppress the max_write settings on any running
	 * cache-flush operation so it completes quickly.
	 */
	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1);
	MUTEX_LOCK(env, rep->mtx_ckp);
	ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn);
	MUTEX_UNLOCK(env, rep->mtx_ckp);
	(void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0);

	/* Update the last_ckp in the txn region. */
	if (ret == 0)
		ret = __txn_updateckp(env, &rp->lsn);
	else {
		__db_errx(env, "Error syncing ckp [%lu][%lu]",
		    (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
		ret = __env_panic(env, ret);
	}

	/* Restore the caller's lock state on every path past the unlock. */
	MUTEX_LOCK(env, rep->mtx_clientdb);
	return (ret);
}

/*
 * __rep_remfirst --
 *	Remove the first entry from the __db.rep.db
 *
 * The removed key and data are returned in cntrl and rec.  Both DBTs are
 * flagged DB_DBT_REALLOC here, so the caller is responsible for releasing
 * their data buffers.
 */
static int
__rep_remfirst(env, ip, cntrl, rec)
	ENV *env;
	DB_THREAD_INFO *ip;
	DBT *cntrl;
	DBT *rec;
{
	DB *dbp;
	DBC *dbc;
	DB_REP *db_rep;
	int ret, t_ret;

	db_rep = env->rep_handle;
	dbp = db_rep->rep_db;
	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
		return (ret);

	/* The DBTs need to persist through another call. */
	F_SET(cntrl, DB_DBT_REALLOC);
	F_SET(rec, DB_DBT_REALLOC);
	/* Fetch the first entry and delete it only if the fetch worked. */
	if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
		ret = __dbc_del(dbc, 0);
	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;

	return (ret);
}

/*
 * __rep_getnext --
 *	Get the next record out of the __db.rep.db table.
 *
 * Only the LSN of the first remaining entry is used: waiting_lsn in the
 * log region is set to it, or zeroed when the table is empty, in which
 * case DB_NOTFOUND is returned.
 */
static int
__rep_getnext(env, ip)
	ENV *env;
	DB_THREAD_INFO *ip;
{
	DB *dbp;
	DBC *dbc;
	DBT lsn_dbt, nextrec_dbt;
	DB_LOG *dblp;
	DB_REP *db_rep;
	LOG *lp;
	__rep_control_args *rp;
	int ret, t_ret;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;

	db_rep = env->rep_handle;
	dbp = db_rep->rep_db;

	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
		return (ret);

	/*
	 * Update waiting_lsn.  We need to move it
	 * forward to the LSN of the next record
	 * in the queue.
	 *
	 * If the next item in the database is a log
	 * record--the common case--we're not
	 * interested in its contents, just in its LSN.
	 * Optimize by doing a partial get of the data item.
	 */
	memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
	F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
	nextrec_dbt.ulen = nextrec_dbt.dlen = 0;

	memset(&lsn_dbt, 0, sizeof(lsn_dbt));
	ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
	if (ret != DB_NOTFOUND && ret != 0)
		goto err;

	if (ret == DB_NOTFOUND) {
		ZERO_LSN(lp->waiting_lsn);
		/*
		 * Whether or not the current record is
		 * simple, there's no next one, and
		 * therefore we haven't got anything
		 * else to do right now.  Break out.
		 */
		goto err;
	}
	/* The key of each entry is the record's control structure. */
	rp = (__rep_control_args *)lsn_dbt.data;
	lp->waiting_lsn = rp->lsn;

err:	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * __rep_process_rec --
 *
 * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
 * potentially switching files.  In the case of a checkpoint, it means doing
 * the checkpoint, and in other cases, it means simply writing the record into
 * the log.
*/
static int
__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp)
	ENV *env;
	DB_THREAD_INFO *ip;
	__rep_control_args *rp;
	DBT *rec;
	db_timespec *ret_tsp;
	DB_LSN *ret_lsnp;
{
	DB *dbp;
	DBT control_dbt, key_dbt, rec_dbt;
	DB_REP *db_rep;
	REP *rep;
	db_timespec msg_time;
	u_int32_t rectype, txnid;
	int ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	dbp = db_rep->rep_db;
	ret = 0;

	/*
	 * NOTE(review): the result of __rep_newfile is stored in ret and
	 * then discarded -- 0 is returned unconditionally.  Confirm whether
	 * ignoring a failed file switch here is deliberate.
	 */
	if (rp->rectype == REP_NEWFILE) {
		ret = __rep_newfile(env, rp, rec);
		return (0);
	}

	LOGCOPY_32(env, &rectype, rec->data);
	memset(&control_dbt, 0, sizeof(control_dbt));
	memset(&rec_dbt, 0, sizeof(rec_dbt));
	timespecset(&msg_time, rp->msg_sec, rp->msg_nsec);

	/*
	 * We write all records except for checkpoint records here.
	 * All non-checkpoint records need to appear in the log before
	 * we take action upon them (i.e., we enforce write-ahead logging).
	 * However, we can't write the checkpoint record here until the
	 * data buffers are actually written to disk, else we are creating
	 * an invalid log -- one that says all data before a certain point
	 * has been written to disk.
	 *
	 * If two threads are both processing the same checkpoint record
	 * (because, for example, it was resent and the original finally
	 * arrived), we handle that below by checking for the existence of
	 * the log record when we add it to the replication database.
	 *
	 * Any log records that arrive while we are processing the checkpoint
	 * are added to the bookkeeping database because ready_lsn is not yet
	 * updated to point after the checkpoint record.
	 */
	if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
		if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0)
			return (ret);
		STAT(rep->stat.st_log_records++);
		/* During log recovery the record is only written, not acted on. */
		if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
			*ret_lsnp = rp->lsn;
			goto out;
		}
	}

	switch (rectype) {
	case DB___dbreg_register:
		/*
		 * DB opens occur in the context of a transaction, so we can
		 * simply handle them when we process the transaction.  Closes,
		 * however, are not transaction-protected, so we have to handle
		 * them here.
		 *
		 * It should be unsafe for the master to do a close of a file
		 * that was opened in an active transaction, so we should be
		 * guaranteed to get the ordering right.
		 *
		 * !!!
		 * The txn ID is the second 4-byte field of the log record.
		 * We should really be calling __dbreg_register_read() and
		 * working from the __dbreg_register_args structure, but this
		 * is considerably faster and the order of the fields won't
		 * change.
		 */
		LOGCOPY_32(env, &txnid,
		    (u_int8_t *)rec->data + sizeof(u_int32_t));
		if (txnid == TXN_INVALID)
			ret = __db_dispatch(env, &env->recover_dtab,
			    rec, &rp->lsn, DB_TXN_APPLY, NULL);
		break;
	case DB___txn_regop:
		/*
		 * If an application is doing app-specific recovery
		 * and acquires locks while applying a transaction,
		 * it can deadlock.  Any other locks held by this
		 * thread should have been discarded in the
		 * __rep_process_txn error path, so if we simply
		 * retry, we should eventually succeed.
		 */
		do {
			ret = 0;
			/* Open files once per handle before applying txns. */
			if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
				ret = __txn_openfiles(env, ip, NULL, 1);
				F_SET(db_rep, DBREP_OPENFILES);
			}
			if (ret == 0)
				ret = __rep_process_txn(env, rec);
		} while (ret == DB_LOCK_DEADLOCK || ret == DB_LOCK_NOTGRANTED);

		/* Now flush the log unless we're running TXN_NOSYNC. */
		if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC))
			ret = __log_flush(env, NULL);
		/* Any failure applying or flushing a txn panics the env. */
		if (ret != 0) {
			__db_errx(env, "Error processing txn [%lu][%lu]",
			    (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
			ret = __env_panic(env, ret);
		}
		*ret_lsnp = rp->lsn;
		break;
	case DB___txn_prepare:
		ret = __log_flush(env, NULL);
		/*
		 * Save the biggest prepared LSN we've seen.
		 */
		rep->max_prep_lsn = rp->lsn;
		RPRINT(env, DB_VERB_REP_MSGS,
		    (env, "process_rec: prepare at [%lu][%lu]",
		    (u_long)rep->max_prep_lsn.file,
		    (u_long)rep->max_prep_lsn.offset));
		break;
	case DB___txn_ckp:
		/*
		 * We do not want to hold the REP->mtx_clientdb mutex while
		 * syncing the mpool, so if we get a checkpoint record we are
		 * supposed to process, add it to the __db.rep.db, do the
		 * memp_sync and then go back and process it later, when the
		 * sync has finished.  If this record is already in the table,
		 * then some other thread will process it, so simply return
		 * REP_NOTPERM.
		 */
		memset(&key_dbt, 0, sizeof(key_dbt));
		key_dbt.data = rp;
		key_dbt.size = sizeof(*rp);

		/*
		 * We want to put this record into the tmp DB only if
		 * it doesn't exist, so use DB_NOOVERWRITE.
		 */
		ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE);
		if (ret == DB_KEYEXIST) {
			if (ret_lsnp != NULL)
				*ret_lsnp = rp->lsn;
			ret = DB_REP_NOTPERM;
		}
		if (ret != 0)
			break;

		/*
		 * Now, do the checkpoint.  Regardless of
		 * whether the checkpoint succeeds or not,
		 * we need to remove the record we just put
		 * in the temporary database.  If the
		 * checkpoint failed, return an error.  We
		 * will act like we never received the
		 * checkpoint.
		 */
		if ((ret = __rep_do_ckp(env, rec, rp)) == 0)
			ret = __log_rep_put(env, &rp->lsn, rec,
			    DB_LOG_CHKPNT);
		/* The removed copy lands in the local DBTs, freed at "out". */
		if ((t_ret = __rep_remfirst(env, ip,
		    &control_dbt, &rec_dbt)) != 0 && ret == 0)
			ret = t_ret;
		/*
		 * If we're successful putting the log record in the
		 * log, flush it for a checkpoint.
		 */
		if (ret == 0) {
			*ret_lsnp = rp->lsn;
			ret = __log_flush(env, NULL);
		}
		break;
	default:
		break;
	}

out:
	if (ret == 0 && F_ISSET(rp, REPCTL_PERM))
		*ret_lsnp = rp->lsn;
	if (IS_USING_LEASES(env) &&
	    F_ISSET(rp, REPCTL_LEASE))
		*ret_tsp = msg_time;
	/*
	 * Set ret_lsnp before flushing the log because if the
	 * flush fails, we've still written the record to the
	 * log and the LSN has been entered.
	 */
	if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH))
		ret = __log_flush(env, NULL);
	if (control_dbt.data != NULL)
		__os_ufree(env, control_dbt.data);
	if (rec_dbt.data != NULL)
		__os_ufree(env, rec_dbt.data);

	return (ret);
}

/*
 * __rep_resend_req --
 *	We might have dropped a message, we need to resend our request.
 *	The request we send is dependent on what recovery state we're in.
 *	The caller holds no locks.
 *
 *	rereq is non-zero to mark gap requests as re-requests
 *	(REP_GAP_REREQUEST).  Returns 0, or the result of the page/log gap
 *	request helper; the results of direct sends are ignored.
 *
 * PUBLIC: int __rep_resend_req __P((ENV *, int));
 */
int
__rep_resend_req(env, rereq)
	ENV *env;
	int rereq;
{
	DB_LOG *dblp;
	DB_LSN lsn, *lsnp;
	DB_REP *db_rep;
	LOG *lp;
	REP *rep;
	int master, ret;
	u_int32_t gapflags, msgtype, repflags, sendflags;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	ret = 0;
	lsnp = NULL;
	msgtype = REP_INVALID;
	sendflags = 0;

	/* Work from a snapshot of the flags, taken without a lock. */
	repflags = rep->flags;
	/*
	 * If we are delayed we do not rerequest anything.
	 */
	if (FLD_ISSET(repflags, REP_F_DELAY))
		return (ret);
	gapflags = rereq ? REP_GAP_REREQUEST : 0;

	if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
		MUTEX_LOCK(env, rep->mtx_clientdb);
		lsn = lp->verify_lsn;
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
		/* Nothing is sent when there is no verify LSN outstanding. */
		if (!IS_ZERO_LSN(lsn)) {
			msgtype = REP_VERIFY_REQ;
			lsnp = &lsn;
			sendflags = DB_REP_REREQUEST;
		}
	} else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
		/*
		 * UPDATE_REQ only goes to the master.
		 */
		msgtype = REP_UPDATE_REQ;
	} else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
		REP_SYSTEM_LOCK(env);
		ret = __rep_pggap_req(env, rep, NULL, gapflags);
		REP_SYSTEM_UNLOCK(env);
	} else {
		MUTEX_LOCK(env, rep->mtx_clientdb);
		ret = __rep_loggap_req(env, rep, NULL, gapflags);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
	}

	/*
	 * For the message types chosen above, send to the master if we
	 * know one; otherwise broadcast a request for the master instead.
	 */
	if (msgtype != REP_INVALID) {
		master = rep->master_id;
		if (master == DB_EID_INVALID)
			(void)__rep_send_message(env,
			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
		else
			(void)__rep_send_message(env,
			    master, msgtype, lsnp, NULL, 0, sendflags);
	}

	return (ret);
}

/*
 * __rep_check_doreq --
 * PUBLIC: int __rep_check_doreq __P((ENV *, REP *));
 *
 * Check if we need to send another request.  If so, compare with
 * the request limits the user might have set.  This assumes the
 * caller holds the REP->mtx_clientdb mutex.  Returns 1 if a request
 * needs to be made, and 0 if it does not.
 *
 * When a request is due, wait_ts is doubled (capped at rep->max_gap) and
 * rcvd_ts is reset to the current time.
 */
int
__rep_check_doreq(env, rep)
	ENV *env;
	REP *rep;
{

	DB_LOG *dblp;
	LOG *lp;
	db_timespec now;
	int req;

	dblp = env->lg_handle;
	lp = dblp->reginfo.primary;
	/* "now" becomes the time elapsed since rcvd_ts. */
	__os_gettime(env, &now, 1);
	timespecsub(&now, &lp->rcvd_ts);
	req = timespeccmp(&now, &lp->wait_ts, >=);
	if (req) {
		/*
		 * Add wait_ts to itself to double it.
		 */
		timespecadd(&lp->wait_ts, &lp->wait_ts);
		if (timespeccmp(&lp->wait_ts, &rep->max_gap, >))
			lp->wait_ts = rep->max_gap;
		__os_gettime(env, &lp->rcvd_ts, 1);
	}
	return (req);
}

/*
 * __rep_skip_msg -
 *
 *	If we're in recovery we want to skip/ignore the message, but
 *	we also need to see if we need to re-request any retransmissions.
 */
static int
__rep_skip_msg(env, rep, eid, rectype)
	ENV *env;
	REP *rep;
	int eid;
	u_int32_t rectype;
{
	int do_req, ret;

	ret = 0;
	/*
	 * If we have a request message from a client then immediately
	 * send a REP_REREQUEST back to that client since we're skipping it.
	 */
	if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype))
		do_req = 1;
	else {
		/* Check for need to retransmit. */
		MUTEX_LOCK(env, rep->mtx_clientdb);
		do_req = __rep_check_doreq(env, rep);
		MUTEX_UNLOCK(env, rep->mtx_clientdb);
	}
	/*
	 * Don't respond to a MASTER_REQ with
	 * a MASTER_REQ or REREQUEST.
	 */
	if (do_req && rectype != REP_MASTER_REQ) {
		/*
		 * There are three cases:
		 * 1.  If we don't know who the master is, then send MASTER_REQ.
		 * 2.  If the message we're skipping came from the master,
		 * then we need to rerequest.
		 * 3.  If the message didn't come from a master (i.e. client
		 * to client), then send a rerequest back to the sender so
		 * the sender can rerequest it elsewhere, if we are a client.
		 */
		if (rep->master_id == DB_EID_INVALID)	/* Case 1. */
			(void)__rep_send_message(env,
			    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
		else if (eid == rep->master_id)		/* Case 2. */
			ret = __rep_resend_req(env, 0);
		else if (F_ISSET(rep, REP_F_CLIENT))	/* Case 3.
*/ 2312 (void)__rep_send_message(env, 2313 eid, REP_REREQUEST, NULL, NULL, 0, 0); 2314 } 2315 return (ret); 2316} 2317 2318static int 2319__rep_fire_newmaster(env, gen, master) 2320 ENV *env; 2321 u_int32_t gen; 2322 int master; 2323{ 2324 DB_REP *db_rep; 2325 REP *rep; 2326 2327 db_rep = env->rep_handle; 2328 rep = db_rep->region; 2329 2330 REP_EVENT_LOCK(env); 2331 /* 2332 * The firing of this event should be idempotent with respect to a 2333 * particular generation number. 2334 */ 2335 if (rep->newmaster_event_gen < gen) { 2336 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); 2337 rep->newmaster_event_gen = gen; 2338 } 2339 REP_EVENT_UNLOCK(env); 2340 return (0); 2341} 2342 2343static int 2344__rep_fire_startupdone(env, gen, master) 2345 ENV *env; 2346 u_int32_t gen; 2347 int master; 2348{ 2349 DB_REP *db_rep; 2350 REP *rep; 2351 2352 db_rep = env->rep_handle; 2353 rep = db_rep->region; 2354 2355 REP_EVENT_LOCK(env); 2356 /* 2357 * Usually NEWMASTER will already have been fired. But if not, fire 2358 * it here now, to ensure the application receives events in the 2359 * expected order. 2360 */ 2361 if (rep->newmaster_event_gen < gen) { 2362 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); 2363 rep->newmaster_event_gen = gen; 2364 } 2365 2366 /* 2367 * Caller already ensures that it only tries to fire STARTUPDONE once 2368 * per generation. If we did not want to rely on that, we could add a 2369 * simple boolean flag (to the set of data protected by the mtx_event). 2370 * The precise meaning of that flag would be "STARTUPDONE has been fired 2371 * for the generation value stored in `newmaster_event_gen'". Then the 2372 * more accurate test here would be simply to check that flag, and fire 2373 * the event (and set the flag) if it were not already set. 2374 */ 2375 if (rep->newmaster_event_gen == gen) 2376 __rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL); 2377 REP_EVENT_UNLOCK(env); 2378 return (0); 2379} 2380