/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
 *
 * $Id: rep_record.c,v 12.143 2008/03/13 16:21:04 mbrey Exp $
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"

static int __rep_collect_txn __P((ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_do_ckp __P((ENV *, DBT *, __rep_control_args *));
static int __rep_fire_newmaster __P((ENV *, u_int32_t, int));
static int __rep_fire_startupdone __P((ENV *, u_int32_t, int));
static int __rep_getnext __P((ENV *, DB_THREAD_INFO *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((ENV *, __rep_control_args *, DBT *));
static int __rep_process_rec __P((ENV *, DB_THREAD_INFO *, __rep_control_args *,
    DBT *, db_timespec *, DB_LSN *));
static int __rep_remfirst __P((ENV *, DB_THREAD_INFO *, DBT *, DBT *));
static int __rep_skip_msg __P((ENV *, REP *, int, u_int32_t));

/*
 * Used to consistently designate which messages ought to be received where.
 *
 * NOTE: the macros below are not self-contained.  They read and write
 * identifiers that are not macro parameters (env, eid, ret, and in some
 * cases rep, rp and recovering) and several of them jump to a label named
 * errlock, so they can only be expanded inside a function that provides
 * all of those.
 */

/*
 * MASTER_ONLY --
 *	If REP_F_MASTER is not set on rep, log the message, set ret to
 *	EINVAL and jump to errlock.
 */
#define	MASTER_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_MASTER)) {				\
		RPRINT(env, DB_VERB_REP_MSGS,				\
		    (env, "Master record received on client"));		\
		REP_PRINT_MESSAGE(env,					\
		    eid, rp, "rep_process_message", 0);			\
		ret = EINVAL;						\
		goto errlock;						\
	}								\
} while (0)

/*
 * CLIENT_ONLY --
 *	If REP_F_CLIENT is not set on rep, set ret to DB_REP_DUPMASTER and
 *	jump to errlock.  REP_DUPMASTER is broadcast first unless leases are
 *	in use, in which case the lease state is only asserted.
 */
#define	CLIENT_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_CLIENT)) {				\
		RPRINT(env, DB_VERB_REP_MSGS,				\
		    (env, "Client record received on master"));		\
		/*							\
		 * Only broadcast DUPMASTER if leases are not		\
		 * in effect.  If I am an old master, using		\
		 * leases and I get a newer message, my leases		\
		 * had better all be expired.				\
		 */							\
		if (IS_USING_LEASES(env))				\
			DB_ASSERT(env,					\
			    __rep_lease_check(env, 0) ==		\
			    DB_REP_LEASE_EXPIRED);			\
		else {							\
			REP_PRINT_MESSAGE(env,				\
			    eid, rp, "rep_process_message", 0);		\
			(void)__rep_send_message(env, DB_EID_BROADCAST, \
			    REP_DUPMASTER, NULL, NULL, 0, 0);		\
		}							\
		ret = DB_REP_DUPMASTER;					\
		goto errlock;						\
	}								\
} while (0)

/*
 * If a client is attempting to service a request it does not have,
 * call rep_skip_msg to skip this message and force a rerequest to the
 * sender.  We don't hold the mutex for the stats and may miscount.
 *
 * The macro inspects the current value of ret: only DB_NOTFOUND triggers
 * the skip, and ret is then replaced by the result of __rep_skip_msg.
 * Unlike the other macros here it does not jump anywhere.
 */
#define	CLIENT_REREQ do {						\
	if (F_ISSET(rep, REP_F_CLIENT)) {				\
		STAT(rep->stat.st_client_svc_req++);			\
		if (ret == DB_NOTFOUND) {				\
			STAT(rep->stat.st_client_svc_miss++);		\
			ret = __rep_skip_msg(env, rep, eid, rp->rectype);\
		}							\
	}								\
} while (0)

/*
 * MASTER_UPDATE --
 *	While holding the rep system lock, set DB_REGENV_REPLOCKED in renv
 *	and store the current time in renv->op_timestamp.
 */
#define	MASTER_UPDATE(env, renv) do {					\
	REP_SYSTEM_LOCK(env);						\
	F_SET((renv), DB_REGENV_REPLOCKED);				\
	(void)time(&(renv)->op_timestamp);				\
	REP_SYSTEM_UNLOCK(env);						\
} while (0)

/*
 * RECOVERING_SKIP --
 *	If this site is a client and the local variable recovering is
 *	non-zero, count the message, hand it to __rep_skip_msg and jump
 *	to errlock with that call's return value in ret.
 */
#define	RECOVERING_SKIP do {						\
	if (IS_REP_CLIENT(env) && recovering) {				\
		/* Not holding region mutex, may miscount */		\
		STAT(rep->stat.st_msgs_recover++);			\
		ret = __rep_skip_msg(env, rep, eid, rp->rectype);	\
		goto errlock;						\
	}								\
} while (0)

/*
 * If we're recovering the log we only want log records that are in the
 * range we need to recover.  Otherwise we can end up storing a huge
 * number of "new" records, only to truncate the temp database later after
 * we run recovery.  If we are actively delaying a sync-up, we also skip
 * all incoming log records until the application requests sync-up.
106 */ 107#define RECOVERING_LOG_SKIP do { \ 108 if (F_ISSET(rep, REP_F_DELAY) || \ 109 rep->master_id == DB_EID_INVALID || \ 110 (recovering && \ 111 (!F_ISSET(rep, REP_F_RECOVER_LOG) || \ 112 LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0))) { \ 113 /* Not holding region mutex, may miscount */ \ 114 STAT(rep->stat.st_msgs_recover++); \ 115 ret = __rep_skip_msg(env, rep, eid, rp->rectype); \ 116 goto errlock; \ 117 } \ 118} while (0) 119 120#define ANYSITE(rep) 121 122/* 123 * __rep_process_message -- 124 * 125 * This routine takes an incoming message and processes it. 126 * 127 * control: contains the control fields from the record 128 * rec: contains the actual record 129 * eid: the environment id of the sender of the message; 130 * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the 131 * lsn of the maximum permanent or current not permanent log record 132 * (respectively). 133 * 134 * PUBLIC: int __rep_process_message 135 * PUBLIC: __P((DB_ENV *, DBT *, DBT *, int, DB_LSN *)); 136 */ 137int 138__rep_process_message(dbenv, control, rec, eid, ret_lsnp) 139 DB_ENV *dbenv; 140 DBT *control, *rec; 141 int eid; 142 DB_LSN *ret_lsnp; 143{ 144 DBT data_dbt; 145 DB_LOG *dblp; 146 DB_LSN last_lsn, lsn; 147 DB_REP *db_rep; 148 DB_THREAD_INFO *ip; 149 ENV *env; 150 LOG *lp; 151 REGENV *renv; 152 REGINFO *infop; 153 REP *rep; 154 REP_46_CONTROL *rp46; 155 REP_OLD_CONTROL *orp; 156 __rep_control_args *rp, tmprp; 157 __rep_egen_args egen_arg; 158 size_t len; 159 u_int32_t gen, rep_version; 160 int cmp, do_sync, lockout, recovering, ret, t_ret; 161 time_t savetime; 162 u_int8_t buf[__REP_MAXMSG_SIZE]; 163 164 env = dbenv->env; 165 166 ENV_REQUIRES_CONFIG_XX( 167 env, rep_handle, "DB_ENV->rep_process_message", DB_INIT_REP); 168 169 /* Control argument must be non-Null. 
*/ 170 if (control == NULL || control->size == 0) { 171 __db_errx(env, 172 "DB_ENV->rep_process_message: control argument must be specified"); 173 return (EINVAL); 174 } 175 176 if (!IS_REP_MASTER(env) && !IS_REP_CLIENT(env)) { 177 __db_errx(env, 178 "Environment not configured as replication master or client"); 179 return (EINVAL); 180 } 181 182 if ((ret = __dbt_usercopy(env, control)) != 0 || 183 (ret = __dbt_usercopy(env, rec)) != 0) { 184 __dbt_userfree(env, control, rec, NULL); 185 __db_errx(env, 186 "DB_ENV->rep_process_message: error retrieving DBT contents"); 187 return ret; 188 } 189 190 ret = 0; 191 do_sync = 0; 192 lockout = 0; 193 db_rep = env->rep_handle; 194 rep = db_rep->region; 195 dblp = env->lg_handle; 196 lp = dblp->reginfo.primary; 197 infop = env->reginfo; 198 renv = infop->primary; 199 /* 200 * Casting this to REP_OLD_CONTROL is just kind of stylistic: the 201 * rep_version field of course has to be in the same offset in all 202 * versions in order for this to work. 203 * 204 * We can look at the rep_version unswapped here because if we're 205 * talking to an old version, it will always be unswapped. If 206 * we're talking to a new version, the only issue is if it is 207 * swapped and we take one of the old version conditionals 208 * incorrectly. The rep_version would need to be very, very 209 * large for a swapped version to look like a small, older 210 * version. There is no problem here looking at it unswapped. 
211 */ 212 rep_version = ((REP_OLD_CONTROL *)control->data)->rep_version; 213 if (rep_version <= DB_REPVERSION_45) { 214 orp = (REP_OLD_CONTROL *)control->data; 215 if (rep_version == DB_REPVERSION_45 && 216 F_ISSET(orp, REPCTL_INIT_45)) { 217 F_CLR(orp, REPCTL_INIT_45); 218 F_SET(orp, REPCTL_INIT); 219 } 220 tmprp.rep_version = orp->rep_version; 221 tmprp.log_version = orp->log_version; 222 tmprp.lsn = orp->lsn; 223 tmprp.rectype = orp->rectype; 224 tmprp.gen = orp->gen; 225 tmprp.flags = orp->flags; 226 tmprp.msg_sec = 0; 227 tmprp.msg_nsec = 0; 228 } else if (rep_version == DB_REPVERSION_46) { 229 rp46 = (REP_46_CONTROL *)control->data; 230 tmprp.rep_version = rp46->rep_version; 231 tmprp.log_version = rp46->log_version; 232 tmprp.lsn = rp46->lsn; 233 tmprp.rectype = rp46->rectype; 234 tmprp.gen = rp46->gen; 235 tmprp.flags = rp46->flags; 236 tmprp.msg_sec = (u_int32_t)rp46->msg_time.tv_sec; 237 tmprp.msg_nsec = (u_int32_t)rp46->msg_time.tv_nsec; 238 } else 239 if ((ret = __rep_control_unmarshal(env, &tmprp, 240 control->data, control->size, NULL)) != 0) 241 return (ret); 242 rp = &tmprp; 243 if (ret_lsnp != NULL) 244 ZERO_LSN(*ret_lsnp); 245 246 ENV_ENTER(env, ip); 247 248 REP_PRINT_MESSAGE(env, eid, rp, "rep_process_message", 0); 249 /* 250 * Check the version number for both rep and log. If it is 251 * an old version we support, convert it. Otherwise complain. 
252 */ 253 if (rp->rep_version < DB_REPVERSION) { 254 if (rp->rep_version < DB_REPVERSION_MIN) { 255 __db_errx(env, 256 "unsupported old replication message version %lu, minimum version %d", 257 (u_long)rp->rep_version, DB_REPVERSION_MIN); 258 ret = EINVAL; 259 goto errlock; 260 } 261 RPRINT(env, DB_VERB_REP_MSGS, (env, 262 "Received record %lu with old rep version %lu", 263 (u_long)rp->rectype, (u_long)rp->rep_version)); 264 rp->rectype = __rep_msg_from_old(rp->rep_version, rp->rectype); 265 DB_ASSERT(env, rp->rectype != REP_INVALID); 266 /* 267 * We should have a valid new record type for all the old 268 * versions. 269 */ 270 RPRINT(env, DB_VERB_REP_MSGS, (env, 271 "Converted to record %lu with old rep version %lu", 272 (u_long)rp->rectype, (u_long)rp->rep_version)); 273 } else if (rp->rep_version > DB_REPVERSION) { 274 __db_errx(env, 275 "unexpected replication message version %lu, expected %d", 276 (u_long)rp->rep_version, DB_REPVERSION); 277 ret = EINVAL; 278 goto errlock; 279 } 280 281 if (rp->log_version < DB_LOGVERSION) { 282 if (rp->log_version < DB_LOGVERSION_MIN) { 283 __db_errx(env, 284 "unsupported old replication log version %lu, minimum version %d", 285 (u_long)rp->log_version, DB_LOGVERSION_MIN); 286 ret = EINVAL; 287 goto errlock; 288 } 289 RPRINT(env, DB_VERB_REP_MSGS, (env, 290 "Received record %lu with old log version %lu", 291 (u_long)rp->rectype, (u_long)rp->log_version)); 292 } else if (rp->log_version > DB_LOGVERSION) { 293 __db_errx(env, 294 "unexpected log record version %lu, expected %d", 295 (u_long)rp->log_version, DB_LOGVERSION); 296 ret = EINVAL; 297 goto errlock; 298 } 299 300 /* 301 * Acquire the replication lock. 302 */ 303 REP_SYSTEM_LOCK(env); 304 if (F_ISSET(rep, REP_F_READY_MSG)) { 305 /* 306 * If we're racing with a thread in rep_start, then 307 * just ignore the message and return. 
308 */ 309 RPRINT(env, DB_VERB_REP_MSGS, (env, 310 "Racing replication msg lockout, ignore message.")); 311 if (F_ISSET(rp, REPCTL_PERM)) 312 ret = DB_REP_IGNORE; 313 REP_SYSTEM_UNLOCK(env); 314 /* 315 * If another client has sent a c2c request to us, it may be a 316 * long time before it resends the request (due to its dual data 317 * streams avoidance heuristic); let it know we can't serve the 318 * request just now. 319 */ 320 if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rp->rectype)) { 321 STAT(rep->stat.st_client_svc_req++); 322 STAT(rep->stat.st_client_svc_miss++); 323 (void)__rep_send_message(env, 324 eid, REP_REREQUEST, NULL, NULL, 0, 0); 325 } 326 goto out; 327 } 328 rep->msg_th++; 329 gen = rep->gen; 330 recovering = F_ISSET(rep, REP_F_RECOVER_MASK); 331 savetime = renv->rep_timestamp; 332 333 STAT(rep->stat.st_msgs_processed++); 334 REP_SYSTEM_UNLOCK(env); 335 336 /* 337 * Check for lease configuration matching. Leases must be 338 * configured all or none. If I am a client and I receive a 339 * message requesting a lease, and I'm not using leases, that 340 * is an error. 341 */ 342 if (!IS_USING_LEASES(env) && 343 (F_ISSET(rp, REPCTL_LEASE) || rp->rectype == REP_LEASE_GRANT)) { 344 __db_errx(env, 345 "Inconsistent lease configuration"); 346 RPRINT(env, DB_VERB_REP_MSGS, (env, 347 "Client received lease message and not using leases")); 348 ret = EINVAL; 349 ret = __env_panic(env, ret); 350 goto errlock; 351 } 352 353 /* 354 * Check for generation number matching. Ignore any old messages 355 * except requests that are indicative of a new client that needs 356 * to get in sync. 357 */ 358 if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ && 359 rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ && 360 rp->rectype != REP_DUPMASTER && rp->rectype != REP_VOTE1) { 361 /* 362 * We don't hold the rep mutex, and could miscount if we race. 
363 */ 364 STAT(rep->stat.st_msgs_badgen++); 365 if (F_ISSET(rp, REPCTL_PERM)) 366 ret = DB_REP_IGNORE; 367 goto errlock; 368 } 369 370 if (rp->gen > gen) { 371 /* 372 * If I am a master and am out of date with a lower generation 373 * number, I am in bad shape and should downgrade. 374 */ 375 if (F_ISSET(rep, REP_F_MASTER)) { 376 STAT(rep->stat.st_dupmasters++); 377 ret = DB_REP_DUPMASTER; 378 /* 379 * Only broadcast DUPMASTER if leases are not 380 * in effect. If I am an old master, using 381 * leases and I get a newer message, my leases 382 * had better all be expired. 383 */ 384 if (IS_USING_LEASES(env)) 385 DB_ASSERT(env, 386 __rep_lease_check(env, 0) == 387 DB_REP_LEASE_EXPIRED); 388 else if (rp->rectype != REP_DUPMASTER) 389 (void)__rep_send_message(env, 390 DB_EID_BROADCAST, REP_DUPMASTER, 391 NULL, NULL, 0, 0); 392 goto errlock; 393 } 394 395 /* 396 * I am a client and am out of date. If this is an election, 397 * or a response from the first site I contacted, then I can 398 * accept the generation number and participate in future 399 * elections and communication. Otherwise, I need to hear about 400 * a new master and sync up. 401 */ 402 if (rp->rectype == REP_ALIVE || 403 rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) { 404 REP_SYSTEM_LOCK(env); 405 RPRINT(env, DB_VERB_REP_MSGS, (env, 406 "Updating gen from %lu to %lu", 407 (u_long)gen, (u_long)rp->gen)); 408 rep->master_id = DB_EID_INVALID; 409 gen = rep->gen = rp->gen; 410 /* 411 * Updating of egen will happen when we process the 412 * message below for each message type. 413 */ 414 REP_SYSTEM_UNLOCK(env); 415 if (rp->rectype == REP_ALIVE) 416 (void)__rep_send_message(env, 417 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, 418 NULL, 0, 0); 419 } else if (rp->rectype != REP_NEWMASTER) { 420 /* 421 * Ignore this message, retransmit if needed. 
422 */ 423 if (__rep_check_doreq(env, rep)) 424 (void)__rep_send_message(env, 425 DB_EID_BROADCAST, REP_MASTER_REQ, 426 NULL, NULL, 0, 0); 427 goto errlock; 428 } 429 /* 430 * If you get here, then you're a client and either you're 431 * in an election or you have a NEWMASTER or an ALIVE message 432 * whose processing will do the right thing below. 433 */ 434 } 435 436 /* 437 * If the sender is part of an established group, so are we now. 438 */ 439 if (F_ISSET(rp, REPCTL_GROUP_ESTD)) { 440 REP_SYSTEM_LOCK(env); 441#ifdef DIAGNOSTIC 442 if (!F_ISSET(rep, REP_F_GROUP_ESTD)) 443 RPRINT(env, DB_VERB_REP_MSGS, (env, 444 "I am now part of an established group")); 445#endif 446 F_SET(rep, REP_F_GROUP_ESTD); 447 REP_SYSTEM_UNLOCK(env); 448 } 449 450 /* 451 * We need to check if we're in recovery and if we are 452 * then we need to ignore any messages except VERIFY*, VOTE*, 453 * NEW* and ALIVE_REQ, or backup related messages: UPDATE*, 454 * PAGE* and FILE*. We need to also accept LOG messages 455 * if we're copying the log for recovery/backup. 456 */ 457 switch (rp->rectype) { 458 case REP_ALIVE: 459 /* 460 * Handle even if we're recovering. 461 */ 462 ANYSITE(rep); 463 if (rp->rep_version < DB_REPVERSION_47) 464 egen_arg.egen = *(u_int32_t *)rec->data; 465 else if ((ret = __rep_egen_unmarshal(env, &egen_arg, 466 rec->data, rec->size, NULL)) != 0) 467 return (ret); 468 REP_SYSTEM_LOCK(env); 469 RPRINT(env, DB_VERB_REP_MSGS, (env, 470 "Received ALIVE egen of %lu, mine %lu", 471 (u_long)egen_arg.egen, (u_long)rep->egen)); 472 if (egen_arg.egen > rep->egen) { 473 /* 474 * We're changing egen, need to clear out any old 475 * election information. We need to set the 476 * REP_F_EGENUPDATE flag here so that any thread 477 * waiting in rep_elect/rep_wait can distinguish 478 * this situation (and restart its election) from 479 * a current master saying it is still master and 480 * the egen getting incremented on that path. 
481 */ 482 __rep_elect_done(env, rep, 0); 483 rep->egen = egen_arg.egen; 484 F_SET(rep, REP_F_EGENUPDATE); 485 } 486 REP_SYSTEM_UNLOCK(env); 487 break; 488 case REP_ALIVE_REQ: 489 /* 490 * Handle even if we're recovering. 491 */ 492 ANYSITE(rep); 493 LOG_SYSTEM_LOCK(env); 494 lsn = lp->lsn; 495 LOG_SYSTEM_UNLOCK(env); 496#ifdef CONFIG_TEST 497 /* 498 * Send this first, before the ALIVE message because of the 499 * way the test suite and messaging is done sequentially. 500 * In some sequences it is possible to get into a situation 501 * where the test suite cannot get the later NEWMASTER because 502 * we break out of the messaging loop too early. 503 */ 504 if (F_ISSET(rep, REP_F_MASTER)) 505 (void)__rep_send_message(env, 506 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); 507#endif 508 REP_SYSTEM_LOCK(env); 509 egen_arg.egen = rep->egen; 510 REP_SYSTEM_UNLOCK(env); 511 if (rep->version < DB_REPVERSION_47) 512 DB_INIT_DBT(data_dbt, &egen_arg.egen, 513 sizeof(egen_arg.egen)); 514 else { 515 if ((ret = __rep_egen_marshal(env, 516 &egen_arg, buf, __REP_EGEN_SIZE, &len)) != 0) 517 goto errlock; 518 DB_INIT_DBT(data_dbt, buf, len); 519 } 520 (void)__rep_send_message(env, 521 eid, REP_ALIVE, &lsn, &data_dbt, 0, 0); 522 break; 523 case REP_ALL_REQ: 524 RECOVERING_SKIP; 525 ret = __rep_allreq(env, rp, eid); 526 CLIENT_REREQ; 527 break; 528 case REP_BULK_LOG: 529 RECOVERING_LOG_SKIP; 530 CLIENT_ONLY(rep, rp); 531 ret = __rep_bulk_log(env, ip, rp, rec, savetime, ret_lsnp); 532 break; 533 case REP_BULK_PAGE: 534 /* 535 * Handle even if we're recovering. 536 */ 537 CLIENT_ONLY(rep, rp); 538 ret = __rep_bulk_page(env, ip, eid, rp, rec); 539 break; 540 case REP_DUPMASTER: 541 /* 542 * Handle even if we're recovering. 
543 */ 544 if (F_ISSET(rep, REP_F_MASTER)) 545 ret = DB_REP_DUPMASTER; 546 break; 547#ifdef NOTYET 548 case REP_FILE: /* TODO */ 549 CLIENT_ONLY(rep, rp); 550 break; 551 case REP_FILE_REQ: 552 ret = __rep_send_file(env, rec, eid); 553 break; 554#endif 555 case REP_FILE_FAIL: 556 /* 557 * Handle even if we're recovering. 558 */ 559 CLIENT_ONLY(rep, rp); 560 /* 561 * XXX 562 */ 563 break; 564 case REP_LEASE_GRANT: 565 /* 566 * Handle even if we're recovering. 567 */ 568 MASTER_ONLY(rep, rp); 569 ret = __rep_lease_grant(env, rp, rec, eid); 570 break; 571 case REP_LOG: 572 case REP_LOG_MORE: 573 RECOVERING_LOG_SKIP; 574 CLIENT_ONLY(rep, rp); 575 ret = __rep_log(env, ip, rp, rec, savetime, ret_lsnp); 576 break; 577 case REP_LOG_REQ: 578 RECOVERING_SKIP; 579 if (F_ISSET(rp, REPCTL_INIT)) 580 MASTER_UPDATE(env, renv); 581 ret = __rep_logreq(env, rp, rec, eid); 582 CLIENT_REREQ; 583 break; 584 case REP_NEWSITE: 585 /* 586 * Handle even if we're recovering. 587 */ 588 /* We don't hold the rep mutex, and may miscount. */ 589 STAT(rep->stat.st_newsites++); 590 591 /* This is a rebroadcast; simply tell the application. */ 592 if (F_ISSET(rep, REP_F_MASTER)) { 593 dblp = env->lg_handle; 594 lp = dblp->reginfo.primary; 595 LOG_SYSTEM_LOCK(env); 596 lsn = lp->lsn; 597 LOG_SYSTEM_UNLOCK(env); 598 (void)__rep_send_message(env, 599 eid, REP_NEWMASTER, &lsn, NULL, 0, 0); 600 if (IS_USING_LEASES(env)) 601 (void)__rep_lease_refresh(env); 602 } 603 ret = DB_REP_NEWSITE; 604 break; 605 case REP_NEWCLIENT: 606 /* 607 * Handle even if we're recovering. 608 */ 609 /* 610 * This message was received and should have resulted in the 611 * application entering the machine ID in its machine table. 612 * We respond to this with an ALIVE to send relevant information 613 * to the new client (if we are a master, we'll send a 614 * NEWMASTER, so we only need to send the ALIVE if we're a 615 * client). But first, broadcast the new client's record to 616 * all the clients. 
617 */ 618 (void)__rep_send_message(env, 619 DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0); 620 621 ret = DB_REP_NEWSITE; 622 623 if (F_ISSET(rep, REP_F_CLIENT)) { 624 REP_SYSTEM_LOCK(env); 625 egen_arg.egen = rep->egen; 626 627 /* 628 * Clean up any previous master remnants by making 629 * master_id invalid and cleaning up any internal 630 * init that was in progress. 631 */ 632 if (eid == rep->master_id) { 633 rep->master_id = DB_EID_INVALID; 634 635 /* 636 * Already locking out messages, must be 637 * in sync-up recover or internal init, 638 * give up. 639 */ 640 if (F_ISSET(rep, REP_F_READY_MSG)) 641 goto errhlk; 642 643 /* 644 * Lock out other messages to prevent race 645 * conditions. 646 */ 647 if ((t_ret = 648 __rep_lockout_msg(env, rep, 1)) != 0) { 649 ret = t_ret; 650 goto errhlk; 651 } 652 lockout = 1; 653 654 /* 655 * Need mtx_clientdb to safely clean up 656 * page database in __rep_init_cleanup(). 657 */ 658 REP_SYSTEM_UNLOCK(env); 659 MUTEX_LOCK(env, rep->mtx_clientdb); 660 REP_SYSTEM_LOCK(env); 661 662 /* 663 * Clean up internal init if one was in 664 * progress. 
665 */ 666 if (F_ISSET(rep, REP_F_READY_API | 667 REP_F_READY_OP)) { 668 RPRINT(env, DB_VERB_REP_MSGS, (env, 669 "NEWCLIENT is cleaning up old internal init for invalid master")); 670 t_ret = __rep_init_cleanup(env, 671 rep, DB_FORCE); 672 F_CLR(rep, REP_F_RECOVER_MASK); 673 } 674 MUTEX_UNLOCK(env, rep->mtx_clientdb); 675 if (t_ret != 0) { 676 ret = t_ret; 677 RPRINT(env, DB_VERB_REP_MSGS, (env, 678 "NEWCLIENT error cleaning up internal init for invalid master: %d", ret)); 679 goto errhlk; 680 } 681 F_CLR(rep, REP_F_READY_MSG); 682 lockout = 0; 683 } 684 REP_SYSTEM_UNLOCK(env); 685 if (rep->version < DB_REPVERSION_47) 686 DB_INIT_DBT(data_dbt, &egen_arg.egen, 687 sizeof(egen_arg.egen)); 688 else { 689 if ((ret = __rep_egen_marshal(env, &egen_arg, 690 buf, __REP_EGEN_SIZE, &len)) != 0) 691 goto errlock; 692 DB_INIT_DBT(data_dbt, buf, len); 693 } 694 (void)__rep_send_message(env, DB_EID_BROADCAST, 695 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); 696 break; 697 } 698 /* FALLTHROUGH */ 699 case REP_MASTER_REQ: 700 RECOVERING_SKIP; 701 if (F_ISSET(rep, REP_F_MASTER)) { 702 LOG_SYSTEM_LOCK(env); 703 lsn = lp->lsn; 704 LOG_SYSTEM_UNLOCK(env); 705 (void)__rep_send_message(env, 706 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0); 707 if (IS_USING_LEASES(env)) 708 (void)__rep_lease_refresh(env); 709 } 710 /* 711 * If there is no master, then we could get into a state 712 * where an old client lost the initial ALIVE message and 713 * is calling an election under an old gen and can 714 * never get to the current gen. 
715 */ 716 if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) { 717 REP_SYSTEM_LOCK(env); 718 egen_arg.egen = rep->egen; 719 if (eid == rep->master_id) 720 rep->master_id = DB_EID_INVALID; 721 REP_SYSTEM_UNLOCK(env); 722 if (rep->version < DB_REPVERSION_47) 723 DB_INIT_DBT(data_dbt, &egen_arg.egen, 724 sizeof(egen_arg.egen)); 725 else { 726 if ((ret = __rep_egen_marshal(env, &egen_arg, 727 buf, __REP_EGEN_SIZE, &len)) != 0) 728 goto errlock; 729 DB_INIT_DBT(data_dbt, buf, len); 730 } 731 (void)__rep_send_message(env, eid, 732 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0); 733 } 734 break; 735 case REP_NEWFILE: 736 RECOVERING_LOG_SKIP; 737 CLIENT_ONLY(rep, rp); 738 ret = __rep_apply(env, 739 ip, rp, rec, ret_lsnp, NULL, &last_lsn); 740 if (ret == DB_REP_LOGREADY) 741 ret = __rep_logready(env, rep, savetime, &last_lsn); 742 break; 743 case REP_NEWMASTER: 744 /* 745 * Handle even if we're recovering. 746 */ 747 ANYSITE(rep); 748 if (F_ISSET(rep, REP_F_MASTER) && 749 eid != rep->eid) { 750 /* We don't hold the rep mutex, and may miscount. */ 751 STAT(rep->stat.st_dupmasters++); 752 ret = DB_REP_DUPMASTER; 753 if (IS_USING_LEASES(env)) 754 DB_ASSERT(env, 755 __rep_lease_check(env, 0) == 756 DB_REP_LEASE_EXPIRED); 757 else 758 (void)__rep_send_message(env, 759 DB_EID_BROADCAST, REP_DUPMASTER, 760 NULL, NULL, 0, 0); 761 break; 762 } 763 if ((ret = 764 __rep_new_master(env, rp, eid)) == DB_REP_NEWMASTER) 765 ret = __rep_fire_newmaster(env, rp->gen, eid); 766 break; 767 case REP_PAGE: 768 case REP_PAGE_MORE: 769 /* 770 * Handle even if we're recovering. 771 */ 772 CLIENT_ONLY(rep, rp); 773 ret = __rep_page(env, ip, eid, rp, rec); 774 if (ret == DB_REP_PAGEDONE) 775 ret = 0; 776 break; 777 case REP_PAGE_FAIL: 778 /* 779 * Handle even if we're recovering. 
780 */ 781 CLIENT_ONLY(rep, rp); 782 ret = __rep_page_fail(env, ip, eid, rp, rec); 783 break; 784 case REP_PAGE_REQ: 785 RECOVERING_SKIP; 786 MASTER_UPDATE(env, renv); 787 ret = __rep_page_req(env, ip, eid, rp, rec); 788 CLIENT_REREQ; 789 break; 790 case REP_REREQUEST: 791 /* 792 * Handle even if we're recovering. Don't do a master 793 * check. 794 */ 795 CLIENT_ONLY(rep, rp); 796 /* 797 * Don't hold any mutex, may miscount. 798 */ 799 STAT(rep->stat.st_client_rerequests++); 800 ret = __rep_resend_req(env, 1); 801 break; 802 case REP_START_SYNC: 803 RECOVERING_SKIP; 804 MUTEX_LOCK(env, rep->mtx_clientdb); 805 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); 806 /* 807 * The comparison needs to be <= because the LSN in 808 * the message can be the LSN of the first outstanding 809 * txn, which may be the LSN immediately after the 810 * previous commit. The ready_lsn is the LSN of the 811 * next record expected. In that case, the LSNs 812 * could be equal and the client has the commit and 813 * wants to sync. [SR #15338] 814 */ 815 if (cmp <= 0) { 816 MUTEX_UNLOCK(env, rep->mtx_clientdb); 817 do_sync = 1; 818 } else { 819 STAT(rep->stat.st_startsync_delayed++); 820 /* 821 * There are cases where keeping the first ckp_lsn 822 * LSN is advantageous and cases where keeping 823 * a later LSN is better. If random, earlier 824 * log records are missing, keeping the later 825 * LSN seems to be better. That is what we'll 826 * do for now. 827 */ 828 if (LOG_COMPARE(&rp->lsn, &rep->ckp_lsn) > 0) 829 rep->ckp_lsn = rp->lsn; 830 RPRINT(env, DB_VERB_REP_MSGS, (env, 831 "Delayed START_SYNC memp_sync due to missing records.")); 832 RPRINT(env, DB_VERB_REP_MSGS, (env, 833 "ready LSN [%lu][%lu], ckp_lsn [%lu][%lu]", 834 (u_long)lp->ready_lsn.file, (u_long)lp->ready_lsn.offset, 835 (u_long)rep->ckp_lsn.file, (u_long)rep->ckp_lsn.offset)); 836 MUTEX_UNLOCK(env, rep->mtx_clientdb); 837 } 838 break; 839 case REP_UPDATE: 840 /* 841 * Handle even if we're recovering. 
842 */ 843 CLIENT_ONLY(rep, rp); 844 ret = __rep_update_setup(env, eid, rp, rec); 845 break; 846 case REP_UPDATE_REQ: 847 /* 848 * Handle even if we're recovering. 849 */ 850 MASTER_ONLY(rep, rp); 851 infop = env->reginfo; 852 renv = infop->primary; 853 MASTER_UPDATE(env, renv); 854 ret = __rep_update_req(env, rp, eid); 855 break; 856 case REP_VERIFY: 857 if (recovering) { 858 MUTEX_LOCK(env, rep->mtx_clientdb); 859 cmp = LOG_COMPARE(&lp->verify_lsn, &rp->lsn); 860 MUTEX_UNLOCK(env, rep->mtx_clientdb); 861 /* 862 * If this is not the verify record I want, skip it. 863 */ 864 if (cmp != 0) { 865 ret = __rep_skip_msg( 866 env, rep, eid, rp->rectype); 867 break; 868 } 869 } 870 CLIENT_ONLY(rep, rp); 871 ret = __rep_verify(env, rp, rec, eid, savetime); 872 break; 873 case REP_VERIFY_FAIL: 874 /* 875 * Handle even if we're recovering. 876 */ 877 CLIENT_ONLY(rep, rp); 878 ret = __rep_verify_fail(env, rp, eid); 879 break; 880 case REP_VERIFY_REQ: 881 RECOVERING_SKIP; 882 ret = __rep_verify_req(env, rp, eid); 883 CLIENT_REREQ; 884 break; 885 case REP_VOTE1: 886 /* 887 * Handle even if we're recovering. 888 */ 889 ret = __rep_vote1(env, rp, rec, eid); 890 break; 891 case REP_VOTE2: 892 /* 893 * Handle even if we're recovering. 894 */ 895 ret = __rep_vote2(env, rp, rec, eid); 896 break; 897 default: 898 __db_errx(env, 899 "DB_ENV->rep_process_message: unknown replication message: type %lu", 900 (u_long)rp->rectype); 901 ret = EINVAL; 902 break; 903 } 904 905errlock: 906 REP_SYSTEM_LOCK(env); 907errhlk: if (lockout) 908 F_CLR(rep, REP_F_READY_MSG); 909 rep->msg_th--; 910 REP_SYSTEM_UNLOCK(env); 911 if (do_sync) { 912 MUTEX_LOCK(env, rep->mtx_ckp); 913 lsn = rp->lsn; 914 /* 915 * This is the REP_START_SYNC sync, and so we permit it to be 916 * interrupted. 
917 */ 918 ret = __memp_sync( 919 env, DB_SYNC_CHECKPOINT | DB_SYNC_INTERRUPT_OK, &lsn); 920 MUTEX_UNLOCK(env, rep->mtx_ckp); 921 RPRINT(env, DB_VERB_REP_MSGS, 922 (env, "ALIVE: Completed sync [%lu][%lu]", 923 (u_long)lsn.file, (u_long)lsn.offset)); 924 } 925out: 926 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { 927 if (ret_lsnp != NULL) 928 *ret_lsnp = rp->lsn; 929 ret = DB_REP_NOTPERM; 930 } 931 __dbt_userfree(env, control, rec, NULL); 932 ENV_LEAVE(env, ip); 933 return (ret); 934} 935 936/* 937 * __rep_apply -- 938 * 939 * Handle incoming log records on a client, applying when possible and 940 * entering into the bookkeeping table otherwise. This routine manages 941 * the state of the incoming message stream -- processing records, via 942 * __rep_process_rec, when possible and enqueuing in the __db.rep.db 943 * when necessary. As gaps in the stream are filled in, this is where 944 * we try to process as much as possible from __db.rep.db to catch up. 945 * 946 * PUBLIC: int __rep_apply __P((ENV *, DB_THREAD_INFO *, __rep_control_args *, 947 * PUBLIC: DBT *, DB_LSN *, int *, DB_LSN *)); 948 */ 949int 950__rep_apply(env, ip, rp, rec, ret_lsnp, is_dupp, last_lsnp) 951 ENV *env; 952 DB_THREAD_INFO *ip; 953 __rep_control_args *rp; 954 DBT *rec; 955 DB_LSN *ret_lsnp; 956 int *is_dupp; 957 DB_LSN *last_lsnp; 958{ 959 DB *dbp; 960 DBT control_dbt, key_dbt; 961 DBT rec_dbt; 962 DB_LOG *dblp; 963 DB_LSN max_lsn, save_lsn; 964 DB_REP *db_rep; 965 LOG *lp; 966 REP *rep; 967 db_timespec msg_time, max_ts; 968 u_int32_t gen; 969 int cmp, event, master, ret, set_apply, t_ret; 970 971 COMPQUIET(gen, 0); 972 COMPQUIET(master, DB_EID_INVALID); 973 974 db_rep = env->rep_handle; 975 rep = db_rep->region; 976 event = ret = set_apply = 0; 977 memset(&control_dbt, 0, sizeof(control_dbt)); 978 memset(&rec_dbt, 0, sizeof(rec_dbt)); 979 ZERO_LSN(max_lsn); 980 timespecclear(&max_ts); 981 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); 982 cmp = -2; /* OOB value that LOG_COMPARE can't 
return. */ 983 984 dblp = env->lg_handle; 985 MUTEX_LOCK(env, rep->mtx_clientdb); 986 /* 987 * Lazily open the temp db. Always set the startup flag to 0 988 * because it was initialized from rep_start. 989 */ 990 if (db_rep->rep_db == NULL && 991 (ret = __rep_client_dbinit(env, 0, REP_DB)) != 0) { 992 MUTEX_UNLOCK(env, rep->mtx_clientdb); 993 goto out; 994 } 995 dbp = db_rep->rep_db; 996 lp = dblp->reginfo.primary; 997 REP_SYSTEM_LOCK(env); 998 if (F_ISSET(rep, REP_F_RECOVER_LOG) && 999 LOG_COMPARE(&lp->ready_lsn, &rep->first_lsn) < 0) 1000 lp->ready_lsn = rep->first_lsn; 1001 cmp = LOG_COMPARE(&rp->lsn, &lp->ready_lsn); 1002 /* 1003 * If we are going to skip or process any message other 1004 * than a duplicate, make note of it if we're in an 1005 * election so that the election can rerequest proactively. 1006 */ 1007 if (F_ISSET(rep, REP_F_READY_APPLY) && cmp >= 0) 1008 F_SET(rep, REP_F_SKIPPED_APPLY); 1009 1010 /* 1011 * If we're in the middle of processing a NEWFILE, we've dropped 1012 * the mutex and if this matches it is a duplicate record. We 1013 * do not want this call taking the "matching" code below because 1014 * we may then process later records in the temp db and the 1015 * original NEWFILE may not have the log file ready. It will 1016 * process those temp db items when it completes. 1017 */ 1018 if (F_ISSET(rep, REP_F_NEWFILE) && cmp == 0) 1019 cmp = -1; 1020 1021 if (cmp == 0) { 1022 /* 1023 * If we are in an election (i.e. we've sent a vote 1024 * with an LSN in it), then we drop the next record 1025 * we're expecting. When we find a master, we'll 1026 * either go into sync, or if it was an existing 1027 * master, rerequest this one record (later records 1028 * are accumulating in the temp db). 1029 * 1030 * We can simply return here, and rep_process_message 1031 * will set NOTPERM if necessary for this record. 1032 */ 1033 if (F_ISSET(rep, REP_F_READY_APPLY)) { 1034 /* 1035 * We will simply return now. 
All special return 1036 * processing should be ignored because the special 1037 * values are just initialized. Variables like 1038 * max_lsn are still 0. 1039 */ 1040 RPRINT(env, DB_VERB_REP_MISC, (env, 1041 "rep_apply: In election. Ignoring [%lu][%lu]", 1042 (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); 1043 REP_SYSTEM_UNLOCK(env); 1044 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1045 goto out; 1046 } 1047 rep->apply_th++; 1048 set_apply = 1; 1049 RPRINT(env, DB_VERB_REP_MISC, (env, 1050 "rep_apply: Set apply_th %d", rep->apply_th)); 1051 REP_SYSTEM_UNLOCK(env); 1052 if ((ret = __rep_process_rec(env, ip, 1053 rp, rec, &max_ts, &max_lsn)) != 0) 1054 goto err; 1055 /* 1056 * If we get the record we are expecting, reset 1057 * the count of records we've received and are applying 1058 * towards the request interval. 1059 */ 1060 __os_gettime(env, &lp->rcvd_ts, 1); 1061 ZERO_LSN(lp->max_wait_lsn); 1062 1063 while (ret == 0 && 1064 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) { 1065 /* 1066 * We just filled in a gap in the log record stream. 1067 * Write subsequent records to the log. 1068 */ 1069gap_check: 1070 if ((ret = __rep_remfirst(env, ip, 1071 &control_dbt, &rec_dbt)) != 0) 1072 goto err; 1073 1074 rp = (__rep_control_args *)control_dbt.data; 1075 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); 1076 rec = &rec_dbt; 1077 if ((ret = __rep_process_rec(env, ip, 1078 rp, rec, &max_ts, &max_lsn)) != 0) 1079 goto err; 1080 1081 --rep->stat.st_log_queued; 1082 1083 /* 1084 * Since we just filled a gap in the log stream, and 1085 * we're writing subsequent records to the log, we want 1086 * to use rcvd_ts and wait_ts so that we will 1087 * request the next gap if we end up with a gap and 1088 * not so recent records in the temp db, but not 1089 * request if recent records are in the temp db and 1090 * likely to arrive on its own shortly. We want to 1091 * avoid requesting the record in that case. 
Also 1092 * reset max_wait_lsn because the next gap is a 1093 * fresh gap. 1094 */ 1095 lp->rcvd_ts = lp->last_ts; 1096 lp->wait_ts = rep->request_gap; 1097 if ((ret = __rep_getnext(env, ip)) == DB_NOTFOUND) { 1098 __os_gettime(env, &lp->rcvd_ts, 1); 1099 ret = 0; 1100 break; 1101 } else if (ret != 0) 1102 goto err; 1103 } 1104 1105 /* 1106 * Check if we're at a gap in the table and if so, whether we 1107 * need to ask for any records. 1108 */ 1109 if (!IS_ZERO_LSN(lp->waiting_lsn) && 1110 LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) != 0) { 1111 /* 1112 * We got a record and processed it, but we may 1113 * still be waiting for more records. If we 1114 * filled a gap we keep a count of how many other 1115 * records are in the temp database and if we should 1116 * request the next gap at this time. 1117 */ 1118 if (__rep_check_doreq(env, rep) && (ret = 1119 __rep_loggap_req(env, rep, &rp->lsn, 0)) != 0) 1120 goto err; 1121 } else { 1122 lp->wait_ts = rep->request_gap; 1123 ZERO_LSN(lp->max_wait_lsn); 1124 } 1125 1126 } else if (cmp > 0) { 1127 /* 1128 * The LSN is higher than the one we were waiting for. 1129 * This record isn't in sequence; add it to the temporary 1130 * database, update waiting_lsn if necessary, and perform 1131 * calculations to determine if we should issue requests 1132 * for new records. 
1133 */ 1134 REP_SYSTEM_UNLOCK(env); 1135 memset(&key_dbt, 0, sizeof(key_dbt)); 1136 key_dbt.data = rp; 1137 key_dbt.size = sizeof(*rp); 1138 ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE); 1139 if (ret == 0) { 1140 rep->stat.st_log_queued++; 1141 __os_gettime(env, &lp->last_ts, 1); 1142#ifdef HAVE_STATISTICS 1143 STAT(rep->stat.st_log_queued_total++); 1144 if (rep->stat.st_log_queued_max < 1145 rep->stat.st_log_queued) 1146 rep->stat.st_log_queued_max = 1147 rep->stat.st_log_queued; 1148#endif 1149 } 1150 1151 if (ret == DB_KEYEXIST) 1152 ret = 0; 1153 if (ret != 0) 1154 goto done; 1155 1156 if (IS_ZERO_LSN(lp->waiting_lsn) || 1157 LOG_COMPARE(&rp->lsn, &lp->waiting_lsn) < 0) 1158 lp->waiting_lsn = rp->lsn; 1159 1160 if (__rep_check_doreq(env, rep) && 1161 (ret = __rep_loggap_req(env, rep, &rp->lsn, 0) != 0)) 1162 goto err; 1163 1164 /* 1165 * If this is permanent; let the caller know that we have 1166 * not yet written it to disk, but we've accepted it. 1167 */ 1168 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) { 1169 max_lsn = rp->lsn; 1170 ret = DB_REP_NOTPERM; 1171 } 1172 goto done; 1173 } else { 1174 STAT(rep->stat.st_log_duplicated++); 1175 REP_SYSTEM_UNLOCK(env); 1176 if (is_dupp != NULL) 1177 *is_dupp = 1; 1178 if (F_ISSET(rp, REPCTL_PERM)) 1179 max_lsn = lp->max_perm_lsn; 1180 /* 1181 * We check REPCTL_LEASE here, because this client may 1182 * have leases configured but the master may not (especially 1183 * in a mixed version group. If the master has leases 1184 * configured, all clients must also. 1185 */ 1186 if (IS_USING_LEASES(env) && 1187 F_ISSET(rp, REPCTL_LEASE) && 1188 timespecisset(&msg_time)) { 1189 if (timespeccmp(&msg_time, &lp->max_lease_ts, >)) 1190 max_ts = msg_time; 1191 else 1192 max_ts = lp->max_lease_ts; 1193 } 1194 goto done; 1195 } 1196 1197 /* Check if we need to go back into the table. 
*/ 1198 if (ret == 0 && LOG_COMPARE(&lp->ready_lsn, &lp->waiting_lsn) == 0) 1199 goto gap_check; 1200 1201done: 1202err: /* 1203 * In case of a race, to make sure only one thread can get 1204 * DB_REP_LOGREADY, zero out rep->last_lsn to show that we've gotten to 1205 * this point. 1206 */ 1207 REP_SYSTEM_LOCK(env); 1208 if (ret == 0 && 1209 F_ISSET(rep, REP_F_RECOVER_LOG) && 1210 !IS_ZERO_LSN(rep->last_lsn) && 1211 LOG_COMPARE(&lp->ready_lsn, &rep->last_lsn) >= 0) { 1212 *last_lsnp = max_lsn; 1213 ZERO_LSN(rep->last_lsn); 1214 ZERO_LSN(max_lsn); 1215 ret = DB_REP_LOGREADY; 1216 } 1217 /* 1218 * Only decrement if we were actually applying log records. 1219 * We do not care if we processed a dup record or put one 1220 * in the temp db. 1221 */ 1222 if (set_apply) { 1223 rep->apply_th--; 1224 RPRINT(env, DB_VERB_REP_MISC, (env, 1225 "rep_apply: Decrement apply_th %d", rep->apply_th)); 1226 } 1227 1228 if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) && 1229 !IS_ZERO_LSN(max_lsn)) { 1230 if (ret_lsnp != NULL) 1231 *ret_lsnp = max_lsn; 1232 ret = DB_REP_ISPERM; 1233 DB_ASSERT(env, LOG_COMPARE(&max_lsn, &lp->max_perm_lsn) >= 0); 1234 lp->max_perm_lsn = max_lsn; 1235 } 1236 1237 /* 1238 * Start-up is complete when we process (or have already processed) up 1239 * to the end of the replication group's log. In case we miss that 1240 * message, as a back-up, we also recognize start-up completion when we 1241 * actually process a live log record. Having cmp==0 here (with a good 1242 * "ret" value) implies we actually processed the record. 
1243 */ 1244 if ((ret == 0 || ret == DB_REP_ISPERM) && 1245 rep->stat.st_startup_complete == 0 && 1246 !F_ISSET(rep, REP_F_RECOVER_LOG) && 1247 ((cmp <= 0 && F_ISSET(rp, REPCTL_LOG_END)) || 1248 (cmp == 0 && !F_ISSET(rp, REPCTL_RESEND)))) { 1249 rep->stat.st_startup_complete = 1; 1250 event = 1; 1251 gen = rep->gen; 1252 master = rep->master_id; 1253 } 1254 REP_SYSTEM_UNLOCK(env); 1255 /* 1256 * If we've processed beyond the needed LSN for a pending 1257 * start sync, start it now. We can compare >= here 1258 * because ready_lsn is the next record we expect. 1259 * Since ckp_lsn can point to the last commit record itself, 1260 * but if it does and ready_lsn == commit (i.e. we haven't 1261 * written the commit yet), we can still start to sync 1262 * because we're guaranteed no additional buffers can 1263 * be dirtied. 1264 */ 1265 if (!IS_ZERO_LSN(rep->ckp_lsn) && 1266 LOG_COMPARE(&lp->ready_lsn, &rep->ckp_lsn) >= 0) { 1267 save_lsn = rep->ckp_lsn; 1268 ZERO_LSN(rep->ckp_lsn); 1269 } else 1270 ZERO_LSN(save_lsn); 1271 1272 /* 1273 * If this is a perm record, we are using leases, update the lease 1274 * grant. We must hold the clientdb mutex. We must not hold 1275 * the region mutex because rep_update_grant will acquire it. 1276 */ 1277 if (ret == DB_REP_ISPERM && IS_USING_LEASES(env) && 1278 timespecisset(&max_ts)) { 1279 if ((t_ret = __rep_update_grant(env, &max_ts)) != 0) 1280 ret = t_ret; 1281 else if (timespeccmp(&max_ts, &lp->max_lease_ts, >)) 1282 lp->max_lease_ts = max_ts; 1283 } 1284 1285 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1286 if (!IS_ZERO_LSN(save_lsn)) { 1287 /* 1288 * Now call memp_sync holding only the ckp mutex. 
1289 */ 1290 MUTEX_LOCK(env, rep->mtx_ckp); 1291 RPRINT(env, DB_VERB_REP_MISC, (env, 1292 "Starting delayed __memp_sync call [%lu][%lu]", 1293 (u_long)save_lsn.file, (u_long)save_lsn.offset)); 1294 t_ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &save_lsn); 1295 MUTEX_UNLOCK(env, rep->mtx_ckp); 1296 } 1297 if (event) { 1298 RPRINT(env, DB_VERB_REP_MISC, (env, 1299 "Start-up is done [%lu][%lu]", 1300 (u_long)rp->lsn.file, (u_long)rp->lsn.offset)); 1301 1302 if ((t_ret = __rep_fire_startupdone(env, gen, master)) != 0) { 1303 DB_ASSERT(env, ret == 0 || ret == DB_REP_ISPERM); 1304 /* Failure trumps either of those values. */ 1305 ret = t_ret; 1306 goto out; 1307 } 1308 } 1309 if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove) 1310 __log_autoremove(env); 1311 if (control_dbt.data != NULL) 1312 __os_ufree(env, control_dbt.data); 1313 if (rec_dbt.data != NULL) 1314 __os_ufree(env, rec_dbt.data); 1315 1316out: 1317 switch (ret) { 1318 case 0: 1319 break; 1320 case DB_REP_ISPERM: 1321 RPRINT(env, DB_VERB_REP_MSGS, 1322 (env, "Returning ISPERM [%lu][%lu], cmp = %d", 1323 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1324 break; 1325 case DB_REP_LOGREADY: 1326 RPRINT(env, DB_VERB_REP_MSGS, (env, 1327 "Returning LOGREADY up to [%lu][%lu], cmp = %d", 1328 (u_long)last_lsnp->file, 1329 (u_long)last_lsnp->offset, cmp)); 1330 break; 1331 case DB_REP_NOTPERM: 1332 if (!F_ISSET(rep, REP_F_RECOVER_LOG) && 1333 !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL) 1334 *ret_lsnp = max_lsn; 1335 1336 RPRINT(env, DB_VERB_REP_MSGS, 1337 (env, "Returning NOTPERM [%lu][%lu], cmp = %d", 1338 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1339 break; 1340 default: 1341 RPRINT(env, DB_VERB_REP_MSGS, 1342 (env, "Returning %d [%lu][%lu], cmp = %d", ret, 1343 (u_long)max_lsn.file, (u_long)max_lsn.offset, cmp)); 1344 break; 1345 } 1346 1347 return (ret); 1348} 1349 1350/* 1351 * __rep_process_txn -- 1352 * 1353 * This is the routine that actually gets a transaction ready for 1354 
* processing. 1355 * 1356 * PUBLIC: int __rep_process_txn __P((ENV *, DBT *)); 1357 */ 1358int 1359__rep_process_txn(env, rec) 1360 ENV *env; 1361 DBT *rec; 1362{ 1363 DBT data_dbt, *lock_dbt; 1364 DB_LOCKER *locker; 1365 DB_LOCKREQ req, *lvp; 1366 DB_LOGC *logc; 1367 DB_LSN prev_lsn, *lsnp; 1368 DB_REP *db_rep; 1369 DB_THREAD_INFO *ip; 1370 DB_TXNHEAD *txninfo; 1371 LSN_COLLECTION lc; 1372 REP *rep; 1373 __txn_regop_args *txn_args; 1374 __txn_regop_42_args *txn42_args; 1375 __txn_xa_regop_args *prep_args; 1376 u_int32_t rectype; 1377 u_int i; 1378 int ret, t_ret; 1379 1380 db_rep = env->rep_handle; 1381 rep = db_rep->region; 1382 logc = NULL; 1383 txn_args = NULL; 1384 txn42_args = NULL; 1385 prep_args = NULL; 1386 txninfo = NULL; 1387 1388 ENV_ENTER(env, ip); 1389 memset(&data_dbt, 0, sizeof(data_dbt)); 1390 if (F_ISSET(env, ENV_THREAD)) 1391 F_SET(&data_dbt, DB_DBT_REALLOC); 1392 1393 /* 1394 * There are two phases: First, we have to traverse backwards through 1395 * the log records gathering the list of all LSNs in the transaction. 1396 * Once we have this information, we can loop through and then apply it. 1397 * 1398 * We may be passed a prepare (if we're restoring a prepare on upgrade) 1399 * instead of a commit (the common case). Check which it is and behave 1400 * appropriately. 1401 */ 1402 LOGCOPY_32(env, &rectype, rec->data); 1403 memset(&lc, 0, sizeof(lc)); 1404 if (rectype == DB___txn_regop) { 1405 /* 1406 * We're the end of a transaction. Make sure this is 1407 * really a commit and not an abort! 
1408 */ 1409 if (rep->version >= DB_REPVERSION_44) { 1410 if ((ret = __txn_regop_read( 1411 env, rec->data, &txn_args)) != 0) 1412 return (ret); 1413 if (txn_args->opcode != TXN_COMMIT) { 1414 __os_free(env, txn_args); 1415 return (0); 1416 } 1417 prev_lsn = txn_args->prev_lsn; 1418 lock_dbt = &txn_args->locks; 1419 } else { 1420 if ((ret = __txn_regop_42_read( 1421 env, rec->data, &txn42_args)) != 0) 1422 return (ret); 1423 if (txn42_args->opcode != TXN_COMMIT) { 1424 __os_free(env, txn42_args); 1425 return (0); 1426 } 1427 prev_lsn = txn42_args->prev_lsn; 1428 lock_dbt = &txn42_args->locks; 1429 } 1430 } else { 1431 /* We're a prepare. */ 1432 DB_ASSERT(env, rectype == DB___txn_xa_regop); 1433 1434 if ((ret = __txn_xa_regop_read( 1435 env, rec->data, &prep_args)) != 0) 1436 return (ret); 1437 prev_lsn = prep_args->prev_lsn; 1438 lock_dbt = &prep_args->locks; 1439 } 1440 1441 /* Get locks. */ 1442 if ((ret = __lock_id(env, NULL, &locker)) != 0) 1443 goto err1; 1444 1445 if ((ret = 1446 __lock_get_list(env, locker, 0, DB_LOCK_WRITE, lock_dbt)) != 0) 1447 goto err; 1448 1449 /* Phase 1. Get a list of the LSNs in this transaction, and sort it. */ 1450 if ((ret = __rep_collect_txn(env, &prev_lsn, &lc)) != 0) 1451 goto err; 1452 qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp); 1453 1454 /* 1455 * The set of records for a transaction may include dbreg_register 1456 * records. Create a txnlist so that they can keep track of file 1457 * state between records. 1458 */ 1459 if ((ret = __db_txnlist_init(env, ip, 0, 0, NULL, &txninfo)) != 0) 1460 goto err; 1461 1462 /* Phase 2: Apply updates. 
*/ 1463 if ((ret = __log_cursor(env, &logc)) != 0) 1464 goto err; 1465 for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) { 1466 if ((ret = __logc_get(logc, lsnp, &data_dbt, DB_SET)) != 0) { 1467 __db_errx(env, "failed to read the log at [%lu][%lu]", 1468 (u_long)lsnp->file, (u_long)lsnp->offset); 1469 goto err; 1470 } 1471 if ((ret = __db_dispatch(env, &env->recover_dtab, 1472 &data_dbt, lsnp, DB_TXN_APPLY, txninfo)) != 0) { 1473 __db_errx(env, "transaction failed at [%lu][%lu]", 1474 (u_long)lsnp->file, (u_long)lsnp->offset); 1475 goto err; 1476 } 1477 } 1478 1479err: memset(&req, 0, sizeof(req)); 1480 req.op = DB_LOCK_PUT_ALL; 1481 if ((t_ret = 1482 __lock_vec(env, locker, 0, &req, 1, &lvp)) != 0 && ret == 0) 1483 ret = t_ret; 1484 1485 if ((t_ret = __lock_id_free(env, locker)) != 0 && ret == 0) 1486 ret = t_ret; 1487 1488err1: if (txn_args != NULL) 1489 __os_free(env, txn_args); 1490 if (txn42_args != NULL) 1491 __os_free(env, txn42_args); 1492 if (prep_args != NULL) 1493 __os_free(env, prep_args); 1494 if (lc.array != NULL) 1495 __os_free(env, lc.array); 1496 1497 if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0) 1498 ret = t_ret; 1499 1500 if (txninfo != NULL) 1501 __db_txnlist_end(env, txninfo); 1502 1503 if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL) 1504 __os_ufree(env, data_dbt.data); 1505 1506#ifdef HAVE_STATISTICS 1507 if (ret == 0) 1508 /* 1509 * We don't hold the rep mutex, and could miscount if we race. 1510 */ 1511 rep->stat.st_txns_applied++; 1512#endif 1513 1514 return (ret); 1515} 1516 1517/* 1518 * __rep_collect_txn 1519 * Recursive function that will let us visit every entry in a transaction 1520 * chain including all child transactions so that we can then apply 1521 * the entire transaction family at once. 
1522 */ 1523static int 1524__rep_collect_txn(env, lsnp, lc) 1525 ENV *env; 1526 DB_LSN *lsnp; 1527 LSN_COLLECTION *lc; 1528{ 1529 __txn_child_args *argp; 1530 DB_LOGC *logc; 1531 DB_LSN c_lsn; 1532 DBT data; 1533 u_int32_t rectype; 1534 u_int nalloc; 1535 int ret, t_ret; 1536 1537 memset(&data, 0, sizeof(data)); 1538 F_SET(&data, DB_DBT_REALLOC); 1539 1540 if ((ret = __log_cursor(env, &logc)) != 0) 1541 return (ret); 1542 1543 while (!IS_ZERO_LSN(*lsnp) && 1544 (ret = __logc_get(logc, lsnp, &data, DB_SET)) == 0) { 1545 LOGCOPY_32(env, &rectype, data.data); 1546 if (rectype == DB___txn_child) { 1547 if ((ret = __txn_child_read( 1548 env, data.data, &argp)) != 0) 1549 goto err; 1550 c_lsn = argp->c_lsn; 1551 *lsnp = argp->prev_lsn; 1552 __os_free(env, argp); 1553 ret = __rep_collect_txn(env, &c_lsn, lc); 1554 } else { 1555 if (lc->nalloc < lc->nlsns + 1) { 1556 nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2; 1557 if ((ret = __os_realloc(env, 1558 nalloc * sizeof(DB_LSN), &lc->array)) != 0) 1559 goto err; 1560 lc->nalloc = nalloc; 1561 } 1562 lc->array[lc->nlsns++] = *lsnp; 1563 1564 /* 1565 * Explicitly copy the previous lsn. The record 1566 * starts with a u_int32_t record type, a u_int32_t 1567 * txn id, and then the DB_LSN (prev_lsn) that we 1568 * want. We copy explicitly because we have no idea 1569 * what kind of record this is. 1570 */ 1571 LOGCOPY_TOLSN(env, lsnp, (u_int8_t *)data.data + 1572 sizeof(u_int32_t) + sizeof(u_int32_t)); 1573 } 1574 1575 if (ret != 0) 1576 goto err; 1577 } 1578 if (ret != 0) 1579 __db_errx(env, "collect failed at: [%lu][%lu]", 1580 (u_long)lsnp->file, (u_long)lsnp->offset); 1581 1582err: if ((t_ret = __logc_close(logc)) != 0 && ret == 0) 1583 ret = t_ret; 1584 if (data.data != NULL) 1585 __os_ufree(env, data.data); 1586 return (ret); 1587} 1588 1589/* 1590 * __rep_lsn_cmp -- 1591 * qsort-type-compatible wrapper for LOG_COMPARE. 
1592 */ 1593static int 1594__rep_lsn_cmp(lsn1, lsn2) 1595 const void *lsn1, *lsn2; 1596{ 1597 1598 return (LOG_COMPARE((DB_LSN *)lsn1, (DB_LSN *)lsn2)); 1599} 1600 1601/* 1602 * __rep_newfile -- 1603 * NEWFILE messages have the LSN of the last record in the previous 1604 * log file. When applying a NEWFILE message, make sure we haven't already 1605 * swapped files. Assume caller hold mtx_clientdb. 1606 */ 1607static int 1608__rep_newfile(env, rp, rec) 1609 ENV *env; 1610 __rep_control_args *rp; 1611 DBT *rec; 1612{ 1613 DB_LOG *dblp; 1614 DB_LSN tmplsn; 1615 DB_REP *db_rep; 1616 LOG *lp; 1617 REP *rep; 1618 __rep_newfile_args nf_args; 1619 int ret; 1620 1621 dblp = env->lg_handle; 1622 lp = dblp->reginfo.primary; 1623 db_rep = env->rep_handle; 1624 rep = db_rep->region; 1625 1626 /* 1627 * If a newfile is already in progress, just ignore. 1628 */ 1629 if (F_ISSET(rep, REP_F_NEWFILE)) 1630 return (0); 1631 if (rp->lsn.file + 1 > lp->ready_lsn.file) { 1632 if (rec == NULL || rec->size == 0) { 1633 RPRINT(env, DB_VERB_REP_MISC, (env, 1634"rep_newfile: Old-style NEWFILE msg. Use control msg log version: %lu", 1635 (u_long) rp->log_version)); 1636 nf_args.version = rp->log_version; 1637 } else if (rp->rep_version < DB_REPVERSION_47) 1638 nf_args.version = *(u_int32_t *)rec->data; 1639 else if ((ret = __rep_newfile_unmarshal(env, &nf_args, 1640 rec->data, rec->size, NULL)) != 0) 1641 return (ret); 1642 RPRINT(env, DB_VERB_REP_MISC, 1643 (env, "rep_newfile: File %lu vers %lu", 1644 (u_long)rp->lsn.file + 1, (u_long)nf_args.version)); 1645 1646 /* 1647 * We drop the mtx_clientdb mutex during 1648 * the file operation, and then reacquire it when 1649 * we're done. We avoid colliding with new incoming 1650 * log records because lp->ready_lsn is not getting 1651 * updated and there is no real log record at this 1652 * ready_lsn. We avoid colliding with a duplicate 1653 * NEWFILE message by setting an in-progress flag. 
1654 */ 1655 REP_SYSTEM_LOCK(env); 1656 F_SET(rep, REP_F_NEWFILE); 1657 REP_SYSTEM_UNLOCK(env); 1658 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1659 LOG_SYSTEM_LOCK(env); 1660 ret = __log_newfile(dblp, &tmplsn, 0, nf_args.version); 1661 LOG_SYSTEM_UNLOCK(env); 1662 MUTEX_LOCK(env, rep->mtx_clientdb); 1663 REP_SYSTEM_LOCK(env); 1664 F_CLR(rep, REP_F_NEWFILE); 1665 REP_SYSTEM_UNLOCK(env); 1666 if (ret == 0) 1667 lp->ready_lsn = tmplsn; 1668 return (ret); 1669 } else 1670 /* We've already applied this NEWFILE. Just ignore it. */ 1671 return (0); 1672} 1673 1674/* 1675 * __rep_do_ckp -- 1676 * Perform the memp_sync necessary for this checkpoint without holding the 1677 * REP->mtx_clientdb. Callers of this function must hold REP->mtx_clientdb 1678 * and must not be holding the region mutex. 1679 */ 1680static int 1681__rep_do_ckp(env, rec, rp) 1682 ENV *env; 1683 DBT *rec; 1684 __rep_control_args *rp; 1685{ 1686 DB_ENV *dbenv; 1687 __txn_ckp_args *ckp_args; 1688 DB_LSN ckp_lsn; 1689 REP *rep; 1690 int ret; 1691 1692 dbenv = env->dbenv; 1693 1694 /* Crack the log record and extract the checkpoint LSN. */ 1695 if ((ret = __txn_ckp_read(env, rec->data, &ckp_args)) != 0) 1696 return (ret); 1697 ckp_lsn = ckp_args->ckp_lsn; 1698 __os_free(env, ckp_args); 1699 1700 rep = env->rep_handle->region; 1701 1702 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1703 DB_TEST_WAIT(env, env->test_check); 1704 1705 /* 1706 * Sync the memory pool. 1707 * 1708 * This is the real PERM lock record/ckp. We cannot return ISPERM 1709 * if we haven't truly completed the checkpoint, so we don't allow 1710 * this call to be interrupted. 1711 * 1712 * We may be overlapping our log record with an in-progress startsync 1713 * of this checkpoint; suppress the max_write settings on any running 1714 * cache-flush operation so it completes quickly. 
1715 */ 1716 (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 1); 1717 MUTEX_LOCK(env, rep->mtx_ckp); 1718 ret = __memp_sync(env, DB_SYNC_CHECKPOINT, &ckp_lsn); 1719 MUTEX_UNLOCK(env, rep->mtx_ckp); 1720 (void)__memp_set_config(dbenv, DB_MEMP_SUPPRESS_WRITE, 0); 1721 1722 /* Update the last_ckp in the txn region. */ 1723 if (ret == 0) 1724 ret = __txn_updateckp(env, &rp->lsn); 1725 else { 1726 __db_errx(env, "Error syncing ckp [%lu][%lu]", 1727 (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset); 1728 ret = __env_panic(env, ret); 1729 } 1730 1731 MUTEX_LOCK(env, rep->mtx_clientdb); 1732 return (ret); 1733} 1734 1735/* 1736 * __rep_remfirst -- 1737 * Remove the first entry from the __db.rep.db 1738 */ 1739static int 1740__rep_remfirst(env, ip, cntrl, rec) 1741 ENV *env; 1742 DB_THREAD_INFO *ip; 1743 DBT *cntrl; 1744 DBT *rec; 1745{ 1746 DB *dbp; 1747 DBC *dbc; 1748 DB_REP *db_rep; 1749 int ret, t_ret; 1750 1751 db_rep = env->rep_handle; 1752 dbp = db_rep->rep_db; 1753 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) 1754 return (ret); 1755 1756 /* The DBTs need to persist through another call. */ 1757 F_SET(cntrl, DB_DBT_REALLOC); 1758 F_SET(rec, DB_DBT_REALLOC); 1759 if ((ret = __dbc_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0) 1760 ret = __dbc_del(dbc, 0); 1761 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) 1762 ret = t_ret; 1763 1764 return (ret); 1765} 1766 1767/* 1768 * __rep_getnext -- 1769 * Get the next record out of the __db.rep.db table. 1770 */ 1771static int 1772__rep_getnext(env, ip) 1773 ENV *env; 1774 DB_THREAD_INFO *ip; 1775{ 1776 DB *dbp; 1777 DBC *dbc; 1778 DBT lsn_dbt, nextrec_dbt; 1779 DB_LOG *dblp; 1780 DB_REP *db_rep; 1781 LOG *lp; 1782 __rep_control_args *rp; 1783 int ret, t_ret; 1784 1785 dblp = env->lg_handle; 1786 lp = dblp->reginfo.primary; 1787 1788 db_rep = env->rep_handle; 1789 dbp = db_rep->rep_db; 1790 1791 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) 1792 return (ret); 1793 1794 /* 1795 * Update waiting_lsn. 
We need to move it 1796 * forward to the LSN of the next record 1797 * in the queue. 1798 * 1799 * If the next item in the database is a log 1800 * record--the common case--we're not 1801 * interested in its contents, just in its LSN. 1802 * Optimize by doing a partial get of the data item. 1803 */ 1804 memset(&nextrec_dbt, 0, sizeof(nextrec_dbt)); 1805 F_SET(&nextrec_dbt, DB_DBT_PARTIAL); 1806 nextrec_dbt.ulen = nextrec_dbt.dlen = 0; 1807 1808 memset(&lsn_dbt, 0, sizeof(lsn_dbt)); 1809 ret = __dbc_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST); 1810 if (ret != DB_NOTFOUND && ret != 0) 1811 goto err; 1812 1813 if (ret == DB_NOTFOUND) { 1814 ZERO_LSN(lp->waiting_lsn); 1815 /* 1816 * Whether or not the current record is 1817 * simple, there's no next one, and 1818 * therefore we haven't got anything 1819 * else to do right now. Break out. 1820 */ 1821 goto err; 1822 } 1823 rp = (__rep_control_args *)lsn_dbt.data; 1824 lp->waiting_lsn = rp->lsn; 1825 1826err: if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0) 1827 ret = t_ret; 1828 return (ret); 1829} 1830 1831/* 1832 * __rep_process_rec -- 1833 * 1834 * Given a record in 'rp', process it. In the case of a NEWFILE, that means 1835 * potentially switching files. In the case of a checkpoint, it means doing 1836 * the checkpoint, and in other cases, it means simply writing the record into 1837 * the log. 
1838 */ 1839static int 1840__rep_process_rec(env, ip, rp, rec, ret_tsp, ret_lsnp) 1841 ENV *env; 1842 DB_THREAD_INFO *ip; 1843 __rep_control_args *rp; 1844 DBT *rec; 1845 db_timespec *ret_tsp; 1846 DB_LSN *ret_lsnp; 1847{ 1848 DB *dbp; 1849 DBT control_dbt, key_dbt, rec_dbt; 1850 DB_REP *db_rep; 1851 REP *rep; 1852 db_timespec msg_time; 1853 u_int32_t rectype, txnid; 1854 int ret, t_ret; 1855 1856 db_rep = env->rep_handle; 1857 rep = db_rep->region; 1858 dbp = db_rep->rep_db; 1859 ret = 0; 1860 1861 if (rp->rectype == REP_NEWFILE) { 1862 ret = __rep_newfile(env, rp, rec); 1863 return (0); 1864 } 1865 1866 LOGCOPY_32(env, &rectype, rec->data); 1867 memset(&control_dbt, 0, sizeof(control_dbt)); 1868 memset(&rec_dbt, 0, sizeof(rec_dbt)); 1869 timespecset(&msg_time, rp->msg_sec, rp->msg_nsec); 1870 1871 /* 1872 * We write all records except for checkpoint records here. 1873 * All non-checkpoint records need to appear in the log before 1874 * we take action upon them (i.e., we enforce write-ahead logging). 1875 * However, we can't write the checkpoint record here until the 1876 * data buffers are actually written to disk, else we are creating 1877 * an invalid log -- one that says all data before a certain point 1878 * has been written to disk. 1879 * 1880 * If two threads are both processing the same checkpoint record 1881 * (because, for example, it was resent and the original finally 1882 * arrived), we handle that below by checking for the existence of 1883 * the log record when we add it to the replication database. 1884 * 1885 * Any log records that arrive while we are processing the checkpoint 1886 * are added to the bookkeeping database because ready_lsn is not yet 1887 * updated to point after the checkpoint record. 
1888 */ 1889 if (rectype != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) { 1890 if ((ret = __log_rep_put(env, &rp->lsn, rec, 0)) != 0) 1891 return (ret); 1892 STAT(rep->stat.st_log_records++); 1893 if (F_ISSET(rep, REP_F_RECOVER_LOG)) { 1894 *ret_lsnp = rp->lsn; 1895 goto out; 1896 } 1897 } 1898 1899 switch (rectype) { 1900 case DB___dbreg_register: 1901 /* 1902 * DB opens occur in the context of a transaction, so we can 1903 * simply handle them when we process the transaction. Closes, 1904 * however, are not transaction-protected, so we have to handle 1905 * them here. 1906 * 1907 * It should be unsafe for the master to do a close of a file 1908 * that was opened in an active transaction, so we should be 1909 * guaranteed to get the ordering right. 1910 * 1911 * !!! 1912 * The txn ID is the second 4-byte field of the log record. 1913 * We should really be calling __dbreg_register_read() and 1914 * working from the __dbreg_register_args structure, but this 1915 * is considerably faster and the order of the fields won't 1916 * change. 1917 */ 1918 LOGCOPY_32(env, &txnid, 1919 (u_int8_t *)rec->data + sizeof(u_int32_t)); 1920 if (txnid == TXN_INVALID) 1921 ret = __db_dispatch(env, &env->recover_dtab, 1922 rec, &rp->lsn, DB_TXN_APPLY, NULL); 1923 break; 1924 case DB___txn_regop: 1925 /* 1926 * If an application is doing app-specific recovery 1927 * and acquires locks while applying a transaction, 1928 * it can deadlock. Any other locks held by this 1929 * thread should have been discarded in the 1930 * __rep_process_txn error path, so if we simply 1931 * retry, we should eventually succeed. 1932 */ 1933 do { 1934 ret = 0; 1935 if (!F_ISSET(db_rep, DBREP_OPENFILES)) { 1936 ret = __txn_openfiles(env, ip, NULL, 1); 1937 F_SET(db_rep, DBREP_OPENFILES); 1938 } 1939 if (ret == 0) 1940 ret = __rep_process_txn(env, rec); 1941 } while (ret == DB_LOCK_DEADLOCK); 1942 1943 /* Now flush the log unless we're running TXN_NOSYNC. 
*/ 1944 if (ret == 0 && !F_ISSET(env->dbenv, DB_ENV_TXN_NOSYNC)) 1945 ret = __log_flush(env, NULL); 1946 if (ret != 0) { 1947 __db_errx(env, "Error processing txn [%lu][%lu]", 1948 (u_long)rp->lsn.file, (u_long)rp->lsn.offset); 1949 ret = __env_panic(env, ret); 1950 } 1951 break; 1952 case DB___txn_xa_regop: 1953 ret = __log_flush(env, NULL); 1954 /* 1955 * Save the biggest prepared LSN we've seen. 1956 */ 1957 rep->max_prep_lsn = rp->lsn; 1958 RPRINT(env, DB_VERB_REP_MSGS, 1959 (env, "process_rec: prepare at [%lu][%lu]", 1960 (u_long)rep->max_prep_lsn.file, 1961 (u_long)rep->max_prep_lsn.offset)); 1962 break; 1963 case DB___txn_ckp: 1964 /* 1965 * We do not want to hold the REP->mtx_clientdb mutex while 1966 * syncing the mpool, so if we get a checkpoint record we are 1967 * supposed to process, add it to the __db.rep.db, do the 1968 * memp_sync and then go back and process it later, when the 1969 * sync has finished. If this record is already in the table, 1970 * then some other thread will process it, so simply return 1971 * REP_NOTPERM. 1972 */ 1973 memset(&key_dbt, 0, sizeof(key_dbt)); 1974 key_dbt.data = rp; 1975 key_dbt.size = sizeof(*rp); 1976 1977 /* 1978 * We want to put this record into the tmp DB only if 1979 * it doesn't exist, so use DB_NOOVERWRITE. 1980 */ 1981 ret = __db_put(dbp, ip, NULL, &key_dbt, rec, DB_NOOVERWRITE); 1982 if (ret == DB_KEYEXIST) { 1983 if (ret_lsnp != NULL) 1984 *ret_lsnp = rp->lsn; 1985 ret = DB_REP_NOTPERM; 1986 } 1987 if (ret != 0) 1988 break; 1989 1990 /* 1991 * Now, do the checkpoint. Regardless of 1992 * whether the checkpoint succeeds or not, 1993 * we need to remove the record we just put 1994 * in the temporary database. If the 1995 * checkpoint failed, return an error. We 1996 * will act like we never received the 1997 * checkpoint. 
1998 */ 1999 if ((ret = __rep_do_ckp(env, rec, rp)) == 0) 2000 ret = __log_rep_put(env, &rp->lsn, rec, 2001 DB_LOG_CHKPNT); 2002 if ((t_ret = __rep_remfirst(env, ip, 2003 &control_dbt, &rec_dbt)) != 0 && ret == 0) 2004 ret = t_ret; 2005 /* 2006 * If we're successful putting the log record in the 2007 * log, flush it for a checkpoint. 2008 */ 2009 if (ret == 0) 2010 ret = __log_flush(env, NULL); 2011 break; 2012 default: 2013 break; 2014 } 2015 2016out: 2017 if (ret == 0 && F_ISSET(rp, REPCTL_PERM)) 2018 *ret_lsnp = rp->lsn; 2019 if (IS_USING_LEASES(env) && 2020 F_ISSET(rp, REPCTL_LEASE)) 2021 *ret_tsp = msg_time; 2022 /* 2023 * Set ret_lsnp before flushing the log because if the 2024 * flush fails, we've still written the record to the 2025 * log and the LSN has been entered. 2026 */ 2027 if (ret == 0 && F_ISSET(rp, REPCTL_FLUSH)) 2028 ret = __log_flush(env, NULL); 2029 if (control_dbt.data != NULL) 2030 __os_ufree(env, control_dbt.data); 2031 if (rec_dbt.data != NULL) 2032 __os_ufree(env, rec_dbt.data); 2033 2034 return (ret); 2035} 2036 2037/* 2038 * __rep_resend_req -- 2039 * We might have dropped a message, we need to resend our request. 2040 * The request we send is dependent on what recovery state we're in. 2041 * The caller holds no locks. 2042 * 2043 * PUBLIC: int __rep_resend_req __P((ENV *, int)); 2044 */ 2045int 2046__rep_resend_req(env, rereq) 2047 ENV *env; 2048 int rereq; 2049{ 2050 DB_LOG *dblp; 2051 DB_LSN lsn; 2052 DB_REP *db_rep; 2053 LOG *lp; 2054 REP *rep; 2055 int ret; 2056 u_int32_t gapflags, repflags; 2057 2058 db_rep = env->rep_handle; 2059 rep = db_rep->region; 2060 dblp = env->lg_handle; 2061 lp = dblp->reginfo.primary; 2062 ret = 0; 2063 2064 repflags = rep->flags; 2065 /* 2066 * If we are delayed we do not rerequest anything. 2067 */ 2068 if (FLD_ISSET(repflags, REP_F_DELAY)) 2069 return (ret); 2070 gapflags = rereq ? 
REP_GAP_REREQUEST : 0; 2071 2072 if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) { 2073 MUTEX_LOCK(env, rep->mtx_clientdb); 2074 lsn = lp->verify_lsn; 2075 MUTEX_UNLOCK(env, rep->mtx_clientdb); 2076 if (!IS_ZERO_LSN(lsn)) 2077 (void)__rep_send_message(env, rep->master_id, 2078 REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_REREQUEST); 2079 } else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) { 2080 /* 2081 * UPDATE_REQ only goes to the master. 2082 */ 2083 (void)__rep_send_message(env, rep->master_id, 2084 REP_UPDATE_REQ, NULL, NULL, 0, 0); 2085 } else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) { 2086 REP_SYSTEM_LOCK(env); 2087 ret = __rep_pggap_req(env, rep, NULL, gapflags); 2088 REP_SYSTEM_UNLOCK(env); 2089 } else { 2090 MUTEX_LOCK(env, rep->mtx_clientdb); 2091 ret = __rep_loggap_req(env, rep, NULL, gapflags); 2092 MUTEX_UNLOCK(env, rep->mtx_clientdb); 2093 } 2094 2095 return (ret); 2096} 2097 2098/* 2099 * __rep_check_doreq -- 2100 * PUBLIC: int __rep_check_doreq __P((ENV *, REP *)); 2101 * 2102 * Check if we need to send another request. If so, compare with 2103 * the request limits the user might have set. This assumes the 2104 * caller holds the REP->mtx_clientdb mutex. Returns 1 if a request 2105 * needs to be made, and 0 if it does not. 2106 */ 2107int 2108__rep_check_doreq(env, rep) 2109 ENV *env; 2110 REP *rep; 2111{ 2112 2113 DB_LOG *dblp; 2114 LOG *lp; 2115 db_timespec now; 2116 int req; 2117 2118 dblp = env->lg_handle; 2119 lp = dblp->reginfo.primary; 2120 __os_gettime(env, &now, 1); 2121 timespecsub(&now, &lp->rcvd_ts); 2122 req = timespeccmp(&now, &lp->wait_ts, >=); 2123 if (req) { 2124 /* 2125 * Add wait_ts to itself to double it. 
2126 */ 2127 timespecadd(&lp->wait_ts, &lp->wait_ts); 2128 if (timespeccmp(&lp->wait_ts, &rep->max_gap, >)) 2129 lp->wait_ts = rep->max_gap; 2130 __os_gettime(env, &lp->rcvd_ts, 1); 2131 } 2132 return (req); 2133} 2134 2135/* 2136 * __rep_skip_msg - 2137 * 2138 * If we're in recovery we want to skip/ignore the message, but 2139 * we also need to see if we need to re-request any retransmissions. 2140 */ 2141static int 2142__rep_skip_msg(env, rep, eid, rectype) 2143 ENV *env; 2144 REP *rep; 2145 int eid; 2146 u_int32_t rectype; 2147{ 2148 int do_req, ret; 2149 2150 ret = 0; 2151 /* 2152 * If we have a request message from a client then immediately 2153 * send a REP_REREQUEST back to that client since we're skipping it. 2154 */ 2155 if (F_ISSET(rep, REP_F_CLIENT) && REP_MSG_REQ(rectype)) 2156 do_req = 1; 2157 else { 2158 /* Check for need to retransmit. */ 2159 MUTEX_LOCK(env, rep->mtx_clientdb); 2160 do_req = __rep_check_doreq(env, rep); 2161 MUTEX_UNLOCK(env, rep->mtx_clientdb); 2162 } 2163 /* 2164 * Don't respond to a MASTER_REQ with 2165 * a MASTER_REQ or REREQUEST. 2166 */ 2167 if (do_req && rectype != REP_MASTER_REQ) { 2168 /* 2169 * There are three cases: 2170 * 1. If we don't know who the master is, then send MASTER_REQ. 2171 * 2. If the message we're skipping came from the master, 2172 * then we need to rerequest. 2173 * 3. If the message didn't come from a master (i.e. client 2174 * to client), then send a rerequest back to the sender so 2175 * the sender can rerequest it elsewhere, if we are a client. 2176 */ 2177 if (rep->master_id == DB_EID_INVALID) /* Case 1. */ 2178 (void)__rep_send_message(env, 2179 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0); 2180 else if (eid == rep->master_id) /* Case 2. */ 2181 ret = __rep_resend_req(env, 0); 2182 else if (F_ISSET(rep, REP_F_CLIENT)) /* Case 3. 
*/ 2183 (void)__rep_send_message(env, 2184 eid, REP_REREQUEST, NULL, NULL, 0, 0); 2185 } 2186 return (ret); 2187} 2188 2189static int 2190__rep_fire_newmaster(env, gen, master) 2191 ENV *env; 2192 u_int32_t gen; 2193 int master; 2194{ 2195 DB_REP *db_rep; 2196 REP *rep; 2197 2198 db_rep = env->rep_handle; 2199 rep = db_rep->region; 2200 2201 REP_EVENT_LOCK(env); 2202 /* 2203 * The firing of this event should be idempotent with respect to a 2204 * particular generation number. 2205 */ 2206 if (rep->newmaster_event_gen < gen) { 2207 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); 2208 rep->newmaster_event_gen = gen; 2209 } 2210 REP_EVENT_UNLOCK(env); 2211 return (0); 2212} 2213 2214static int 2215__rep_fire_startupdone(env, gen, master) 2216 ENV *env; 2217 u_int32_t gen; 2218 int master; 2219{ 2220 DB_REP *db_rep; 2221 REP *rep; 2222 2223 db_rep = env->rep_handle; 2224 rep = db_rep->region; 2225 2226 REP_EVENT_LOCK(env); 2227 /* 2228 * Usually NEWMASTER will already have been fired. But if not, fire 2229 * it here now, to ensure the application receives events in the 2230 * expected order. 2231 */ 2232 if (rep->newmaster_event_gen < gen) { 2233 __rep_fire_event(env, DB_EVENT_REP_NEWMASTER, &master); 2234 rep->newmaster_event_gen = gen; 2235 } 2236 2237 /* 2238 * Caller already ensures that it only tries to fire STARTUPDONE once 2239 * per generation. If we did not want to rely on that, we could add a 2240 * simple boolean flag (to the set of data protected by the mtx_event). 2241 * The precise meaning of that flag would be "STARTUPDONE has been fired 2242 * for the generation value stored in `newmaster_event_gen'". Then the 2243 * more accurate test here would be simply to check that flag, and fire 2244 * the event (and set the flag) if it were not already set. 2245 */ 2246 if (rep->newmaster_event_gen == gen) 2247 __rep_fire_event(env, DB_EVENT_REP_STARTUPDONE, NULL); 2248 REP_EVENT_UNLOCK(env); 2249 return (0); 2250} 2251