1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2004,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_backup.c,v 12.153 2008/05/05 17:47:02 sue Exp $ 7 */ 8 9#include "db_config.h" 10 11#include "db_int.h" 12#include "dbinc/db_page.h" 13#include "dbinc/db_am.h" 14#include "dbinc/fop.h" 15#include "dbinc/lock.h" 16#include "dbinc/log.h" 17#include "dbinc/mp.h" 18#include "dbinc/qam.h" 19#include "dbinc/txn.h" 20 21static int __rep_check_uid __P((ENV *, u_int8_t *, u_int8_t *, u_int32_t, 22 u_int8_t *)); 23static int __rep_filedone __P((ENV *, DB_THREAD_INFO *ip, int, 24 REP *, __rep_fileinfo_args *, u_int32_t)); 25static int __rep_find_dbs __P((ENV *, u_int32_t, u_int8_t **, size_t *, 26 size_t *, u_int32_t *)); 27static int __rep_get_fileinfo __P((ENV *, const char *, 28 const char *, __rep_fileinfo_args *, u_int8_t *, u_int32_t *)); 29static int __rep_get_file_list __P((ENV *, 30 DB_FH *, u_int32_t, u_int32_t *, DBT *)); 31static int __rep_log_setup __P((ENV *, 32 REP *, u_int32_t, u_int32_t, DB_LSN *)); 33static int __rep_mpf_open __P((ENV *, DB_MPOOLFILE **, 34 __rep_fileinfo_args *, u_int32_t)); 35static int __rep_nextfile __P((ENV *, int, REP *)); 36static int __rep_page_gap __P((ENV *, 37 REP *, __rep_fileinfo_args *, u_int32_t)); 38static int __rep_page_sendpages __P((ENV *, DB_THREAD_INFO *, int, 39 __rep_control_args *, __rep_fileinfo_args *, DB_MPOOLFILE *, DB *)); 40static int __rep_queue_filedone __P((ENV *, 41 DB_THREAD_INFO *, REP *, __rep_fileinfo_args *)); 42static int __rep_remove_all __P((ENV *, u_int32_t, DBT *)); 43static int __rep_remove_file __P((ENV *, u_int8_t *, const char *, 44 u_int32_t, u_int32_t)); 45static int __rep_remove_logs __P((ENV *)); 46static int __rep_remove_by_list __P((ENV *, u_int32_t, 47 u_int8_t *, u_int32_t, u_int32_t)); 48static int __rep_remove_by_prefix __P((ENV *, const char *, const char *, 49 size_t, APPNAME)); 50static int __rep_walk_dir __P((ENV *, const char *, u_int32_t, u_int8_t **, 51 u_int8_t *, size_t *, size_t *, u_int32_t *)); 52static int __rep_write_page __P((ENV *, 53 DB_THREAD_INFO *, REP *, __rep_fileinfo_args *)); 54 55/* 56 * __rep_update_req - 57 * Process an update_req and send the file information to the client. 58 * 59 * PUBLIC: int __rep_update_req __P((ENV *, __rep_control_args *, int)); 60 */ 61int 62__rep_update_req(env, rp, eid) 63 ENV *env; 64 __rep_control_args *rp; 65 int eid; 66{ 67 DBT updbt, vdbt; 68 DB_LOG *dblp; 69 DB_LOGC *logc; 70 DB_LSN lsn; 71 __rep_update_args u_args; 72 size_t filelen, filesz, updlen; 73 u_int32_t filecnt, flag, version; 74 u_int8_t *buf, *fp; 75 int ret, t_ret; 76 77 /* 78 * Allocate enough for all currently open files and then some. 79 * Optimize for the common use of having most databases open. 80 * Allocate dbentry_cnt * 2 plus an estimated 60 bytes per 81 * file for the filename/path (or multiplied by 120). 82 * 83 * The data we send looks like this: 84 * __rep_update_args 85 * __rep_fileinfo_args 86 * __rep_fileinfo_args 87 * ... 88 */ 89 dblp = env->lg_handle; 90 logc = NULL; 91 filecnt = 0; 92 filelen = 0; 93 updlen = 0; 94 filesz = MEGABYTE; 95 if ((ret = __os_calloc(env, 1, filesz, &buf)) != 0) 96 return (ret); 97 98 /* 99 * First get our file information. Get in-memory files first 100 * then get on-disk files. 101 */ 102 fp = buf + __REP_UPDATE_SIZE; 103 if ((ret = __rep_find_dbs(env, rp->rep_version, 104 &fp, &filesz, &filelen, &filecnt)) != 0) 105 goto err; 106 107 /* 108 * Now get our first LSN. We send the lsn of the first 109 * non-archivable log file. 110 */ 111 flag = DB_SET; 112 if ((ret = __log_get_stable_lsn(env, &lsn)) != 0) { 113 if (ret != DB_NOTFOUND) 114 goto err; 115 /* 116 * If ret is DB_NOTFOUND then there is no checkpoint 117 * in this log, that is okay, just start at the beginning. 118 */ 119 ret = 0; 120 flag = DB_FIRST; 121 } 122 123 /* 124 * Now get the version number of the log file of that LSN. 125 */ 126 if ((ret = __log_cursor(env, &logc)) != 0) 127 goto err; 128 129 memset(&vdbt, 0, sizeof(vdbt)); 130 /* 131 * Set our log cursor on the LSN we are sending. Or 132 * to the first LSN if we have no stable LSN. 133 */ 134 if ((ret = __logc_get(logc, &lsn, &vdbt, flag)) != 0) { 135 /* 136 * We could be racing a fresh master starting up. If we 137 * have no log records, assume an initial LSN and current 138 * log version. 139 */ 140 if (ret != DB_NOTFOUND) 141 goto err; 142 INIT_LSN(lsn); 143 version = DB_LOGVERSION; 144 } else { 145 if ((ret = __logc_version(logc, &version)) != 0) 146 goto err; 147 } 148 /* 149 * Package up the update information. 150 */ 151 u_args.first_lsn = lsn; 152 u_args.first_vers = version; 153 u_args.num_files = filecnt; 154 if ((ret = __rep_update_marshal(env, rp->rep_version, 155 &u_args, buf, filesz, &updlen)) != 0) 156 goto err; 157 /* 158 * We have all the file information now. Send it to the client. 159 */ 160 DB_INIT_DBT(updbt, buf, filelen + updlen); 161 162 LOG_SYSTEM_LOCK(env); 163 lsn = ((LOG *)dblp->reginfo.primary)->lsn; 164 LOG_SYSTEM_UNLOCK(env); 165 (void)__rep_send_message( 166 env, eid, REP_UPDATE, &lsn, &updbt, 0, 0); 167 168err: __os_free(env, buf); 169 if (logc != NULL && (t_ret = __logc_close(logc)) != 0 && ret == 0) 170 ret = t_ret; 171 return (ret); 172} 173 174/* 175 * __rep_find_dbs - 176 * Walk through all the named files/databases including those in the 177 * environment or data_dirs and those that in named and in-memory. We 178 * need to open them, gather the necessary information and then close 179 * them. 180 * 181 * !!! 182 * The pointer *fp is expected to point into a buffer that may be used for an 183 * UPDATE message, at an offset equal to the size of __rep_update_args. This 184 * assumption is relied upon if the buffer is found to be too small and must be 185 * reallocated. 186 */ 187static int 188__rep_find_dbs(env, version, fp, fileszp, filelenp, filecntp) 189 ENV *env; 190 u_int32_t version; 191 u_int8_t **fp; 192 size_t *fileszp, *filelenp; 193 u_int32_t *filecntp; 194{ 195 DB_ENV *dbenv; 196 int ret; 197 char **ddir, *real_dir; 198 u_int8_t *origfp; 199 200 dbenv = env->dbenv; 201 ret = 0; 202 real_dir = NULL; 203 204 if (dbenv->db_data_dir == NULL) { 205 /* 206 * If we don't have a data dir, we have just the 207 * env home dir. 208 */ 209 ret = __rep_walk_dir(env, env->db_home, version, fp, NULL, 210 fileszp, filelenp, filecntp); 211 } else { 212 origfp = *fp; 213 for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) { 214 if ((ret = __db_appname(env, DB_APP_NONE, 215 *ddir, 0, NULL, &real_dir)) != 0) 216 break; 217 if ((ret = __rep_walk_dir(env, real_dir, version, fp, 218 origfp, fileszp, filelenp, filecntp)) != 0) 219 break; 220 __os_free(env, real_dir); 221 real_dir = NULL; 222 } 223 } 224 225 /* Now, collect any in-memory named databases. */ 226 if (ret == 0) 227 ret = __rep_walk_dir(env, NULL, version, 228 fp, NULL, fileszp, filelenp, filecntp); 229 230 if (real_dir != NULL) 231 __os_free(env, real_dir); 232 return (ret); 233} 234 235/* 236 * __rep_walk_dir -- 237 * 238 * This is the routine that walks a directory and fills in the structures 239 * that we use to generate messages to the client telling it what 240 * files are available. If the directory name is NULL, then we should 241 * walk the list of in-memory named files. 242 */ 243static int 244__rep_walk_dir(env, dir, version, fp, origfp, fileszp, filelenp, filecntp) 245 ENV *env; 246 const char *dir; 247 u_int32_t version; 248 u_int8_t **fp, *origfp; 249 size_t *fileszp, *filelenp; 250 u_int32_t *filecntp; 251{ 252 __rep_fileinfo_args tmpfp; 253 size_t len, offset; 254 int cnt, first_file, i, ret; 255 u_int8_t *rfp, uid[DB_FILE_ID_LEN]; 256 char *file, **names, *subdb; 257 258 if (dir == NULL) { 259 RPRINT(env, DB_VERB_REP_SYNC, (env, 260 "Walk_dir: Getting info for in-memory named files")); 261 if ((ret = __memp_inmemlist(env, &names, &cnt)) != 0) 262 return (ret); 263 } else { 264 RPRINT(env, DB_VERB_REP_SYNC, (env, 265 "Walk_dir: Getting info for dir: %s", dir)); 266 if ((ret = __os_dirlist(env, dir, 0, &names, &cnt)) != 0) 267 return (ret); 268 } 269 rfp = NULL; 270 if (fp != NULL) 271 rfp = *fp; 272 RPRINT(env, DB_VERB_REP_SYNC, (env, "Walk_dir: Dir %s has %d files", 273 (dir == NULL) ? "INMEM" : dir, cnt)); 274 first_file = 1; 275 for (i = 0; i < cnt; i++) { 276 RPRINT(env, DB_VERB_REP_SYNC, (env, 277 "Walk_dir: File %d name: %s", i, names[i])); 278 /* 279 * Skip DB-owned files: __db*, DB_CONFIG, log* 280 */ 281 if (strncmp(names[i], 282 DB_REGION_PREFIX, sizeof(DB_REGION_PREFIX) - 1) == 0) 283 continue; 284 if (strncmp(names[i], "DB_CONFIG", 9) == 0) 285 continue; 286 if (strncmp(names[i], "log.", 4) == 0) 287 continue; 288 /* 289 * We found a file to process. Check if we need 290 * to allocate more space. 291 */ 292 if (dir == NULL) { 293 file = NULL; 294 subdb = names[i]; 295 } else { 296 file = names[i]; 297 subdb = NULL; 298 } 299 if ((ret = __rep_get_fileinfo(env, 300 file, subdb, &tmpfp, uid, filecntp)) != 0) { 301 /* 302 * If we find a file that isn't a database, skip it. 303 */ 304 RPRINT(env, DB_VERB_REP_SYNC, (env, 305 "Walk_dir: File %d %s: returned error %s", 306 i, names[i], db_strerror(ret))); 307 ret = 0; 308 continue; 309 } 310 RPRINT(env, DB_VERB_REP_SYNC, (env, 311 "Walk_dir: File %d (of %d) %s at 0x%lx: pgsize %lu, max_pgno %lu", 312 tmpfp.filenum, *filecntp, names[i], P_TO_ULONG(rfp), 313 (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno)); 314 315 /* 316 * Check if we already have info on this file. Since we're 317 * walking directories, we only need to check the first 318 * file to discover if we have a duplicate data_dir. 319 */ 320 if (first_file && origfp != NULL) { 321 /* 322 * If we have any file info, check if we have this uid. 323 */ 324 if (rfp != origfp && 325 (ret = __rep_check_uid(env, origfp, 326 origfp + *filelenp, version, uid)) != 0) { 327 /* 328 * If we have this uid. Adjust the file 329 * count and stop processing this dir. 330 */ 331 if (ret == DB_KEYEXIST) { 332 ret = 0; 333 (*filecntp)--; 334 } 335 goto err; 336 } 337 first_file = 0; 338 } 339 340 DB_SET_DBT(tmpfp.info, names[i], strlen(names[i]) + 1); 341 DB_SET_DBT(tmpfp.uid, uid, DB_FILE_ID_LEN); 342retry: ret = __rep_fileinfo_marshal(env, version, 343 &tmpfp, rfp, *fileszp, &len); 344 if (ret == ENOMEM) { 345 offset = (size_t)(rfp - *fp); 346 *fileszp *= 2; 347 /* 348 * Need to account for update info on both sides 349 * of the allocation. 350 */ 351 *fp -= __REP_UPDATE_SIZE; 352 if ((ret = __os_realloc(env, *fileszp, *fp)) != 0) 353 break; 354 *fp += __REP_UPDATE_SIZE; 355 rfp = *fp + offset; 356 /* 357 * Now that we've reallocated the space, try to 358 * store it again. 359 */ 360 goto retry; 361 } 362 rfp += len; 363 *fp = rfp; 364 *filelenp += len; 365 } 366err: 367 __os_dirfree(env, names, cnt); 368 return (ret); 369} 370 371/* 372 * This function is called when we process the first file of any 373 * new directory for internal init. We walk the list of current 374 * files to see if we have already processed these files. This 375 * is to prevent transmitting the same file multiple times if the 376 * user calls env->set_data_dir on the same directory more than once. 377 */ 378static int 379__rep_check_uid(env, origfp, endfp, version, uid) 380 ENV *env; 381 u_int32_t version; 382 u_int8_t *origfp, *endfp, *uid; 383{ 384 __rep_fileinfo_args *rfp; 385 size_t filesz; 386 u_int8_t *fp, *fuid, *new_fp; 387 int ret; 388 389 ret = 0; 390 fp = origfp; 391 rfp = NULL; 392 /* 393 * We don't know how many fp's there are, so compute the maximum 394 * size based on the endfp and the first fp. 395 */ 396 filesz = (uintptr_t)endfp - (uintptr_t)origfp; 397 while (fp <= endfp) { 398 if ((ret = __rep_fileinfo_unmarshal(env, version, 399 &rfp, fp, filesz, &new_fp)) != 0) { 400 __db_errx(env, "rep_check_uid: Could not malloc"); 401 goto err; 402 } 403 filesz -= (u_int32_t)(new_fp - fp); 404 fp = new_fp; 405 fuid = (u_int8_t *)rfp->uid.data; 406 if (memcmp(fuid, uid, DB_FILE_ID_LEN) == 0) { 407 RPRINT(env, DB_VERB_REP_SYNC, (env, 408 "Check_uid: Found matching file.")); 409 ret = DB_KEYEXIST; 410 goto err; 411 } 412 __os_free(env, rfp); 413 rfp = NULL; 414 } 415err: 416 if (rfp != NULL) 417 __os_free(env, rfp); 418 return (ret); 419 420} 421 422static int 423__rep_get_fileinfo(env, file, subdb, rfp, uid, filecntp) 424 ENV *env; 425 const char *file, *subdb; 426 __rep_fileinfo_args *rfp; 427 u_int8_t *uid; 428 u_int32_t *filecntp; 429{ 430 DB *dbp; 431 DBC *dbc; 432 DBMETA *dbmeta; 433 DB_LOCK lk; 434 DB_MPOOLFILE *mpf; 435 DB_THREAD_INFO *ip; 436 PAGE *pagep; 437 int lorder, ret, t_ret; 438 439 dbp = NULL; 440 dbc = NULL; 441 pagep = NULL; 442 mpf = NULL; 443 LOCK_INIT(lk); 444 445 ENV_GET_THREAD_INFO(env, ip); 446 447 if ((ret = __db_create_internal(&dbp, env, 0)) != 0) 448 goto err; 449 if ((ret = __db_open(dbp, ip, NULL, file, subdb, DB_UNKNOWN, 450 DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0), 451 0, PGNO_BASE_MD)) != 0) 452 goto err; 453 454 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0) 455 goto err; 456 if ((ret = __db_lget( 457 dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0) 458 goto err; 459 if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, ip, dbc->txn, 460 0, &pagep)) != 0) 461 goto err; 462 /* 463 * We have the meta page. Set up our information. 464 */ 465 dbmeta = (DBMETA *)pagep; 466 rfp->pgno = 0; 467 /* 468 * Queue is a special-case. We need to set max_pgno to 0 so that 469 * the client can compute the pages from the meta-data. 470 */ 471 if (dbp->type == DB_QUEUE) 472 rfp->max_pgno = 0; 473 else 474 rfp->max_pgno = dbmeta->last_pgno; 475 rfp->pgsize = dbp->pgsize; 476 memcpy(uid, dbp->fileid, DB_FILE_ID_LEN); 477 rfp->filenum = (*filecntp)++; 478 rfp->type = (u_int32_t)dbp->type; 479 rfp->db_flags = dbp->flags; 480 rfp->finfo_flags = 0; 481 /* 482 * Send the lorder of this database. 483 */ 484 (void)__db_get_lorder(dbp, &lorder); 485 if (lorder == 1234) 486 FLD_SET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN); 487 else 488 FLD_CLR(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN); 489 490 ret = __memp_fput(dbp->mpf, ip, pagep, dbc->priority); 491 pagep = NULL; 492 if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0) 493 ret = t_ret; 494 if (ret != 0) 495 goto err; 496err: 497 if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0) 498 ret = t_ret; 499 if (pagep != NULL && (t_ret = 500 __memp_fput(mpf, ip, pagep, dbc->priority)) != 0 && ret == 0) 501 ret = t_ret; 502 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) 503 ret = t_ret; 504 if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0) 505 ret = t_ret; 506 return (ret); 507} 508 509/* 510 * __rep_page_req 511 * Process a page_req and send the page information to the client. 512 * 513 * PUBLIC: int __rep_page_req __P((ENV *, 514 * PUBLIC: DB_THREAD_INFO *, int, __rep_control_args *, DBT *)); 515 */ 516int 517__rep_page_req(env, ip, eid, rp, rec) 518 ENV *env; 519 DB_THREAD_INFO *ip; 520 int eid; 521 __rep_control_args *rp; 522 DBT *rec; 523{ 524 __rep_fileinfo_args *msgfp; 525 DB_MPOOLFILE *mpf; 526 DB_REP *db_rep; 527 REP *rep; 528 int ret, t_ret; 529 u_int8_t *next; 530 531 db_rep = env->rep_handle; 532 rep = db_rep->region; 533 534 if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, 535 &msgfp, rec->data, rec->size, &next)) != 0) 536 return (ret); 537 538 RPRINT(env, DB_VERB_REP_SYNC, 539 (env, "page_req: file %d page %lu to %lu", 540 msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno)); 541 542 /* 543 * We need to open the file and then send its pages. 544 * If we cannot open the file, we send REP_FILE_FAIL. 545 */ 546 RPRINT(env, DB_VERB_REP_SYNC, 547 (env, "page_req: Open %d via mpf_open", msgfp->filenum)); 548 if ((ret = __rep_mpf_open(env, &mpf, msgfp, 0)) != 0) { 549 RPRINT(env, DB_VERB_REP_SYNC, 550 (env, "page_req: Open %d failed", msgfp->filenum)); 551 if (F_ISSET(rep, REP_F_MASTER)) 552 (void)__rep_send_message(env, eid, REP_FILE_FAIL, 553 NULL, rec, 0, 0); 554 else 555 ret = DB_NOTFOUND; 556 goto err; 557 } 558 559 ret = __rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, NULL); 560 t_ret = __memp_fclose(mpf, 0); 561 if (ret == 0 && t_ret != 0) 562 ret = t_ret; 563err: 564 __os_free(env, msgfp); 565 return (ret); 566} 567 568static int 569__rep_page_sendpages(env, ip, eid, rp, msgfp, mpf, dbp) 570 ENV *env; 571 DB_THREAD_INFO *ip; 572 int eid; 573 __rep_control_args *rp; 574 __rep_fileinfo_args *msgfp; 575 DB_MPOOLFILE *mpf; 576 DB *dbp; 577{ 578 DB *qdbp; 579 DBC *qdbc; 580 DBT lockdbt, msgdbt; 581 DB_LOCK lock; 582 DB_LOCKER *locker; 583 DB_LOCK_ILOCK lock_obj; 584 DB_LOG *dblp; 585 DB_LSN lsn; 586 DB_REP *db_rep; 587 PAGE *pagep; 588 REP *rep; 589 REP_BULK bulk; 590 REP_THROTTLE repth; 591 db_pgno_t p; 592 uintptr_t bulkoff; 593 size_t len, msgsz; 594 u_int32_t bulkflags, use_bulk; 595 int opened, ret, t_ret; 596 u_int8_t *buf; 597 598 db_rep = env->rep_handle; 599 rep = db_rep->region; 600 locker = NULL; 601 opened = 0; 602 t_ret = 0; 603 qdbp = NULL; 604 qdbc = NULL; 605 buf = NULL; 606 bulk.addr = NULL; 607 use_bulk = FLD_ISSET(rep->config, REP_C_BULK); 608 if (msgfp->type == (u_int32_t)DB_QUEUE) { 609 if (dbp == NULL) { 610 if ((ret = __db_create_internal(&qdbp, env, 0)) != 0) 611 goto err; 612 /* 613 * We need to check whether this is in-memory so that 614 * we pass the name correctly as either the file or 615 * the database name. 616 */ 617 if ((ret = __db_open(qdbp, ip, NULL, 618 FLD_ISSET(msgfp->db_flags, DB_AM_INMEM) ? 619 NULL : msgfp->info.data, 620 FLD_ISSET(msgfp->db_flags, DB_AM_INMEM) ? 621 msgfp->info.data : NULL, 622 DB_UNKNOWN, 623 DB_RDONLY | (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0), 624 0, PGNO_BASE_MD)) != 0) 625 goto err; 626 opened = 1; 627 } else 628 qdbp = dbp; 629 if ((ret = __db_cursor(qdbp, ip, NULL, &qdbc, 0)) != 0) 630 goto err; 631 } 632 msgsz = __REP_FILEINFO_SIZE + DB_FILE_ID_LEN + msgfp->pgsize; 633 if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0) 634 goto err; 635 memset(&msgdbt, 0, sizeof(msgdbt)); 636 RPRINT(env, DB_VERB_REP_SYNC, 637 (env, "sendpages: file %d page %lu to %lu", 638 msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno)); 639 memset(&repth, 0, sizeof(repth)); 640 /* 641 * If we're doing bulk transfer, allocate a bulk buffer to put our 642 * pages in. We still need to initialize the throttle info 643 * because if we encounter a page larger than our entire bulk 644 * buffer, we need to send it as a singleton. 645 * 646 * Use a local var so that we don't need to worry if someone else 647 * turns on/off bulk in the middle of our call here. 648 */ 649 if (use_bulk && (ret = __rep_bulk_alloc(env, &bulk, eid, 650 &bulkoff, &bulkflags, REP_BULK_PAGE)) != 0) 651 goto err; 652 REP_SYSTEM_LOCK(env); 653 repth.gbytes = rep->gbytes; 654 repth.bytes = rep->bytes; 655 repth.type = REP_PAGE; 656 repth.data_dbt = &msgdbt; 657 REP_SYSTEM_UNLOCK(env); 658 659 /* 660 * Set up locking. 661 */ 662 LOCK_INIT(lock); 663 memset(&lock_obj, 0, sizeof(lock_obj)); 664 if ((ret = __lock_id(env, NULL, &locker)) != 0) 665 goto err; 666 memcpy(lock_obj.fileid, mpf->fileid, DB_FILE_ID_LEN); 667 lock_obj.type = DB_PAGE_LOCK; 668 669 memset(&lockdbt, 0, sizeof(lockdbt)); 670 lockdbt.data = &lock_obj; 671 lockdbt.size = sizeof(lock_obj); 672 673 for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) { 674 /* 675 * We're not waiting for the lock, if we cannot get 676 * the lock for this page, skip it. The gap 677 * code will rerequest it. 678 */ 679 lock_obj.pgno = p; 680 if ((ret = __lock_get(env, locker, DB_LOCK_NOWAIT, &lockdbt, 681 DB_LOCK_READ, &lock)) != 0) { 682 /* 683 * Continue if we couldn't get the lock. 684 */ 685 if (ret == DB_LOCK_NOTGRANTED) { 686 ret = 0; 687 continue; 688 } 689 /* 690 * Otherwise we have an error. 691 */ 692 goto err; 693 } 694 if (msgfp->type == (u_int32_t)DB_QUEUE && p != 0) 695#ifdef HAVE_QUEUE 696 ret = __qam_fget(qdbc, &p, DB_MPOOL_CREATE, &pagep); 697#else 698 ret = DB_PAGE_NOTFOUND; 699#endif 700 else 701 ret = __memp_fget(mpf, &p, ip, NULL, 702 DB_MPOOL_CREATE, &pagep); 703 msgfp->pgno = p; 704 if (ret == DB_PAGE_NOTFOUND) { 705 ZERO_LSN(lsn); 706 if (F_ISSET(rep, REP_F_MASTER)) { 707 ret = 0; 708 RPRINT(env, DB_VERB_REP_SYNC, (env, 709 "sendpages: PAGE_FAIL on page %lu", 710 (u_long)p)); 711 (void)__rep_send_message(env, eid, 712 REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0); 713 } else 714 ret = DB_NOTFOUND; 715 goto lockerr; 716 } else if (ret != 0) 717 goto lockerr; 718 else 719 DB_SET_DBT(msgfp->info, pagep, msgfp->pgsize); 720 len = 0; 721 /* 722 * Send along an indication of the byte order of this mpool 723 * page. Since mpool always keeps pages in the native byte 724 * order of the local environment, this is simply my 725 * environment's byte order. 726 * 727 * Since pages can be served from a variety of sites when using 728 * client-to-client synchronization, the receiving client needs 729 * to know the byte order of each page independently. 730 */ 731 if (F_ISSET(env, ENV_LITTLEENDIAN)) 732 FLD_SET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN); 733 else 734 FLD_CLR(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN); 735 RPRINT(env, DB_VERB_REP_SYNC, (env, 736 "sendpages: %lu, page lsn [%lu][%lu]", (u_long)p, 737 (u_long)pagep->lsn.file, (u_long)pagep->lsn.offset)); 738 ret = __rep_fileinfo_marshal(env, rp->rep_version, 739 msgfp, buf, msgsz, &len); 740 if (msgfp->type != (u_int32_t)DB_QUEUE || p == 0) 741 t_ret = __memp_fput(mpf, 742 ip, pagep, DB_PRIORITY_UNCHANGED); 743#ifdef HAVE_QUEUE 744 else 745 /* 746 * We don't need an #else for HAVE_QUEUE here because if 747 * we're not compiled with queue, then we're guaranteed 748 * to have set REP_PAGE_FAIL above. 749 */ 750 t_ret = __qam_fput(qdbc, p, pagep, qdbp->priority); 751#endif 752 if (t_ret != 0 && ret == 0) 753 ret = t_ret; 754 if ((t_ret = __ENV_LPUT(env, lock)) != 0 && ret == 0) 755 ret = t_ret; 756 if (ret != 0) 757 goto err; 758 759 DB_ASSERT(env, len <= msgsz); 760 DB_SET_DBT(msgdbt, buf, len); 761 762 dblp = env->lg_handle; 763 LOG_SYSTEM_LOCK(env); 764 repth.lsn = ((LOG *)dblp->reginfo.primary)->lsn; 765 LOG_SYSTEM_UNLOCK(env); 766 /* 767 * If we are configured for bulk, try to send this as a bulk 768 * request. If not configured, or it is too big for bulk 769 * then just send normally. 770 */ 771 if (use_bulk) 772 ret = __rep_bulk_message(env, &bulk, &repth, 773 &repth.lsn, &msgdbt, 0); 774 if (!use_bulk || ret == DB_REP_BULKOVF) 775 ret = __rep_send_throttle(env, eid, &repth, 0, 0); 776 RPRINT(env, DB_VERB_REP_SYNC, (env, 777 "sendpages: %lu, lsn [%lu][%lu]", (u_long)p, 778 (u_long)repth.lsn.file, (u_long)repth.lsn.offset)); 779 /* 780 * If we have REP_PAGE_MORE we need to break this loop. 781 * Otherwise, with REP_PAGE, we keep going. 782 */ 783 if (repth.type == REP_PAGE_MORE || ret != 0) { 784 /* Ignore send failure, except to break the loop. */ 785 if (ret == DB_REP_UNAVAIL) 786 ret = 0; 787 break; 788 } 789 } 790 791 if (0) { 792lockerr: if ((t_ret = __ENV_LPUT(env, lock)) != 0 && ret == 0) 793 ret = t_ret; 794 } 795err: 796 /* 797 * We're done, force out whatever remains in the bulk buffer and 798 * free it. 799 */ 800 if (use_bulk && bulk.addr != NULL && 801 (t_ret = __rep_bulk_free(env, &bulk, 0)) != 0 && ret == 0) 802 ret = t_ret; 803 if (qdbc != NULL && (t_ret = __dbc_close(qdbc)) != 0 && ret == 0) 804 ret = t_ret; 805 if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 && 806 ret == 0) 807 ret = t_ret; 808 if (buf != NULL) 809 __os_free(env, buf); 810 if (locker != NULL && (t_ret = __lock_id_free(env, 811 locker)) != 0 && ret == 0) 812 ret = t_ret; 813 return (ret); 814} 815 816/* 817 * __rep_update_setup 818 * Process and setup with this file information. 819 * 820 * PUBLIC: int __rep_update_setup __P((ENV *, int, __rep_control_args *, 821 * PUBLIC: DBT *)); 822 */ 823int 824__rep_update_setup(env, eid, rp, rec) 825 ENV *env; 826 int eid; 827 __rep_control_args *rp; 828 DBT *rec; 829{ 830 DB_LOG *dblp; 831 DB_REP *db_rep; 832 DB_THREAD_INFO *ip; 833 LOG *lp; 834 REGENV *renv; 835 REGINFO *infop; 836 REP *rep; 837 __rep_update_args *rup; 838 int ret; 839 u_int32_t count; 840 u_int8_t *next; 841 842 db_rep = env->rep_handle; 843 rep = db_rep->region; 844 dblp = env->lg_handle; 845 lp = dblp->reginfo.primary; 846 ret = 0; 847 848 REP_SYSTEM_LOCK(env); 849 if (!F_ISSET(rep, REP_F_RECOVER_UPDATE) || IN_ELECTION(rep)) { 850 REP_SYSTEM_UNLOCK(env); 851 return (0); 852 } 853 F_CLR(rep, REP_F_RECOVER_UPDATE); 854 /* 855 * We know we're the first to come in here due to the 856 * REP_F_RECOVER_UPDATE flag. 857 */ 858 F_SET(rep, REP_F_RECOVER_PAGE); 859 /* 860 * We should not ever be in internal init with a lease granted. 861 */ 862 DB_ASSERT(env, 863 !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); 864 865 /* 866 * We do not clear REP_F_READY_* in this code. 867 * We'll eventually call the normal __rep_verify_match recovery 868 * code and that will clear all the flags and allow others to 869 * proceed. We only need to lockout the API here. We do not 870 * need to lockout other message threads. 871 */ 872 if ((ret = __rep_lockout_api(env, rep)) != 0) 873 goto err; 874 /* 875 * We need to update the timestamp and kill any open handles 876 * on this client. The files are changing completely. 877 */ 878 infop = env->reginfo; 879 renv = infop->primary; 880 (void)time(&renv->rep_timestamp); 881 882 REP_SYSTEM_UNLOCK(env); 883 MUTEX_LOCK(env, rep->mtx_clientdb); 884 __os_gettime(env, &lp->rcvd_ts, 1); 885 lp->wait_ts = rep->request_gap; 886 ZERO_LSN(lp->ready_lsn); 887 ZERO_LSN(lp->verify_lsn); 888 ZERO_LSN(lp->waiting_lsn); 889 ZERO_LSN(lp->max_wait_lsn); 890 ZERO_LSN(lp->max_perm_lsn); 891 if (db_rep->rep_db == NULL) 892 ret = __rep_client_dbinit(env, 0, REP_DB); 893 MUTEX_UNLOCK(env, rep->mtx_clientdb); 894 if (ret != 0) 895 goto err_nolock; 896 if ((ret = __rep_update_unmarshal(env, rp->rep_version, 897 &rup, rec->data, rec->size, &next)) != 0) 898 goto err_nolock; 899 900 /* 901 * We need to empty out any old log records that might be in the 902 * temp database. 903 */ 904 ENV_GET_THREAD_INFO(env, ip); 905 if ((ret = __db_truncate(db_rep->rep_db, ip, NULL, &count)) != 0) 906 goto err_nolock; 907 908 /* 909 * We will remove all logs we have so we need to request 910 * from the master's beginning. 911 */ 912 REP_SYSTEM_LOCK(env); 913 rep->first_lsn = rup->first_lsn; 914 rep->first_vers = rup->first_vers; 915 rep->last_lsn = rp->lsn; 916 rep->nfiles = rup->num_files; 917 918 __os_free(env, rup); 919 920 RPRINT(env, DB_VERB_REP_SYNC, 921 (env, "Update setup for %d files.", rep->nfiles)); 922 RPRINT(env, DB_VERB_REP_SYNC, 923 (env, "Update setup: First LSN [%lu][%lu].", 924 (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset)); 925 RPRINT(env, DB_VERB_REP_SYNC, 926 (env, "Update setup: Last LSN [%lu][%lu]", 927 (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset)); 928 929 if (rep->nfiles > 0) { 930 rep->infoversion = rp->rep_version; 931 rep->originfolen = rep->infolen = 932 rec->size - __REP_UPDATE_SIZE; 933 if ((ret = __os_calloc(env, 1, rep->infolen, 934 &rep->originfo)) != 0) 935 goto err; 936 memcpy(rep->originfo, next, rep->infolen); 937 rep->nextinfo = rep->originfo; 938 } 939 940 /* 941 * We need to remove all logs and databases the client has prior to 942 * getting pages for current databases on the master. 943 */ 944 if ((ret = __rep_remove_all(env, rp->rep_version, rec)) != 0) 945 goto err; 946 947 rep->curfile = 0; 948 if ((ret = __rep_nextfile(env, eid, rep)) != 0) 949 goto err; 950 951 if (0) { 952err_nolock: REP_SYSTEM_LOCK(env); 953 } 954 955err: /* 956 * If we get an error, we cannot leave ourselves in the RECOVER_PAGE 957 * state because we have no file information. That also means undo'ing 958 * the rep_lockout. We need to move back to the RECOVER_UPDATE stage. 959 */ 960 if (ret != 0) { 961 if (rep->originfo != NULL) { 962 __os_free(env, rep->originfo); 963 rep->originfo = NULL; 964 } 965 RPRINT(env, DB_VERB_REP_SYNC, (env, 966 "Update_setup: Error: Clear PAGE, set UPDATE again. %s", 967 db_strerror(ret))); 968 F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY_API | 969 REP_F_READY_OP); 970 F_SET(rep, REP_F_RECOVER_UPDATE); 971 } 972 REP_SYSTEM_UNLOCK(env); 973 return (ret); 974} 975 976/* 977 * Removes all existing logs and databases, at the start of internal init. But 978 * before we do, write a list of the databases onto the init file, so that in 979 * case we crash in the middle, we'll know how to resume when we restart. 980 * Finally, also write into the init file the UPDATE message from the master (in 981 * the "rec" DBT), which includes the (new) list of databases we intend to 982 * request copies of (again, so that we know what to do if we crash in the 983 * middle). 984 * 985 * For the sake of simplicity, these database lists are in the form of an UPDATE 986 * message (since we already have the mechanisms in place), even though strictly 987 * speaking that contains more information than we really need to store. 988 */ 989static int 990__rep_remove_all(env, msg_version, rec) 991 ENV *env; 992 u_int32_t msg_version; 993 DBT *rec; 994{ 995 __rep_fileinfo_args *finfo; 996 __rep_update_args u_args; 997 DB_FH *fhp; 998 size_t cnt, filelen, filesz, updlen; 999 u_int32_t bufsz, filecnt, fvers, mvers, zero; 1000 u_int8_t *buf, *fp, *new_fp, *origfp; 1001 int ret, t_ret; 1002 char *fname; 1003 1004 finfo = NULL; 1005 fname = NULL; 1006 fhp = NULL; 1007 1008 /* 1009 * 1. Get list of databases currently present at this client, which we 1010 * intend to remove. 1011 */ 1012 filelen = 0; 1013 filecnt = 0; 1014 filesz = MEGABYTE; 1015 if ((ret = __os_calloc(env, 1, filesz, &buf)) != 0) 1016 return (ret); 1017 origfp = fp = buf + __REP_UPDATE_SIZE; 1018 filesz -= __REP_UPDATE_SIZE; 1019 if ((ret = __rep_find_dbs(env, DB_REPVERSION, 1020 &fp, &filesz, &filelen, &filecnt)) != 0) 1021 goto out; 1022 ZERO_LSN(u_args.first_lsn); 1023 u_args.first_vers = 0; 1024 u_args.num_files = filecnt; 1025 if ((ret = __rep_update_marshal(env, DB_REPVERSION, 1026 &u_args, buf, filesz, &updlen)) != 0) 1027 goto out; 1028 1029 /* 1030 * 2. Before removing anything, safe-store the database list, so that in 1031 * case we crash before we've removed them all, when we restart we 1032 * can clean up what we were doing. 1033 * 1034 * The original version of the file contains: 1035 * data1 size (4 bytes) 1036 * data1 1037 * data2 size (possibly) (4 bytes) 1038 * data2 (possibly) 1039 * 1040 * As of 4.7 the file has the following form: 1041 * 0 (4 bytes - to indicate a new style file) 1042 * file version (4 bytes) 1043 * data1 version (4 bytes) 1044 * data1 size (4 bytes) 1045 * data1 1046 * data2 version (possibly) (4 bytes) 1047 * data2 size (possibly) (4 bytes) 1048 * data2 (possibly) 1049 */ 1050 if ((ret = __db_appname( 1051 env, DB_APP_NONE, REP_INITNAME, 0, NULL, &fname)) != 0) 1052 goto out; 1053 /* Sanity check that the write size fits into 32 bits. */ 1054 DB_ASSERT(env, updlen + filelen == (u_int32_t)(updlen + filelen)); 1055 bufsz = (u_int32_t)(updlen + filelen); 1056 1057 /* 1058 * (Short writes aren't possible, so we don't have to verify 'cnt'.) 1059 * This first list is generated internally, so it is always in 1060 * the form of the current message version. 1061 */ 1062 zero = 0; 1063 fvers = REP_INITVERSION; 1064 mvers = DB_REPVERSION; 1065 if ((ret = __os_open(env, fname, 0, 1066 DB_OSO_CREATE | DB_OSO_TRUNC, DB_MODE_600, &fhp)) != 0 || 1067 (ret = __os_write(env, fhp, &zero, sizeof(zero), &cnt)) != 0 || 1068 (ret = __os_write(env, fhp, &fvers, sizeof(fvers), &cnt)) != 0 || 1069 (ret = __os_write(env, fhp, &mvers, sizeof(mvers), &cnt)) != 0 || 1070 (ret = __os_write(env, fhp, &bufsz, sizeof(bufsz), &cnt)) != 0 || 1071 (ret = __os_write(env, fhp, buf, bufsz, &cnt)) != 0 || 1072 (ret = __os_fsync(env, fhp)) != 0) { 1073 __db_err(env, ret, "%s", fname); 1074 goto out; 1075 } 1076 1077 /* 1078 * 3. Go ahead and remove logs and databases. The databases get removed 1079 * according to the list we just finished safe-storing. 1080 */ 1081 if ((ret = __rep_remove_logs(env)) != 0) 1082 goto out; 1083 if ((ret = __rep_closefiles(env, 0)) != 0) 1084 goto out; 1085 fp = origfp; 1086 while (filecnt-- > 0) { 1087 if ((ret = __rep_fileinfo_unmarshal(env, DB_REPVERSION, 1088 &finfo, fp, filesz, &new_fp)) != 0) 1089 goto out; 1090 if ((ret = __rep_remove_file(env, finfo->uid.data, 1091 finfo->info.data, finfo->type, finfo->db_flags)) != 0) 1092 goto out; 1093 filesz -= (u_int32_t)(new_fp - fp); 1094 fp = new_fp; 1095 __os_free(env, finfo); 1096 finfo = NULL; 1097 } 1098 1099 /* 1100 * 4. Safe-store the (new) list of database files we intend to copy from 1101 * the master (again, so that in case we crash before we're finished 1102 * doing so, we'll have enough information to clean up and start over 1103 * again). This list is the list from the master, so it uses 1104 * the message version. 1105 */ 1106 mvers = msg_version; 1107 if ((ret = __os_write(env, fhp, &mvers, sizeof(mvers), &cnt)) != 0 || 1108 (ret = __os_write(env, fhp, 1109 &rec->size, sizeof(rec->size), &cnt)) != 0 || 1110 (ret = __os_write(env, fhp, rec->data, rec->size, &cnt)) != 0 || 1111 (ret = __os_fsync(env, fhp)) != 0) { 1112 __db_err(env, ret, "%s", fname); 1113 goto out; 1114 } 1115 1116out: 1117 if (fhp != NULL && (t_ret = __os_closehandle(env, fhp)) && ret == 0) 1118 ret = t_ret; 1119 if (fname != NULL) 1120 __os_free(env, fname); 1121 if (finfo != NULL) 1122 __os_free(env, finfo); 1123 __os_free(env, buf); 1124 return (ret); 1125} 1126 1127/* 1128 * __rep_remove_logs - 1129 * Remove our logs to prepare for internal init. 1130 */ 1131static int 1132__rep_remove_logs(env) 1133 ENV *env; 1134{ 1135 DB_LOG *dblp; 1136 DB_LSN lsn; 1137 LOG *lp; 1138 u_int32_t fnum, lastfile; 1139 int ret; 1140 char *name; 1141 1142 dblp = env->lg_handle; 1143 lp = dblp->reginfo.primary; 1144 ret = 0; 1145 1146 /* 1147 * Call memp_sync to flush any pages that might be in the log buffers 1148 * and not on disk before we remove files on disk. If there were no 1149 * dirty pages, the log isn't flushed. Yet the log buffers could still 1150 * be dirty: __log_flush should take care of this rare situation. 1151 */ 1152 if ((ret = __memp_sync_int(env, 1153 NULL, 0, DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0) 1154 return (ret); 1155 if ((ret = __log_flush(env, NULL)) != 0) 1156 return (ret); 1157 /* 1158 * Forcibly remove existing log files or reset 1159 * the in-memory log space. 1160 */ 1161 if (lp->db_log_inmemory) { 1162 ZERO_LSN(lsn); 1163 if ((ret = __log_zero(env, &lsn)) != 0) 1164 return (ret); 1165 } else { 1166 lastfile = lp->lsn.file; 1167 for (fnum = 1; fnum <= lastfile; fnum++) { 1168 if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0) 1169 return (ret); 1170 (void)time(&lp->timestamp); 1171 (void)__os_unlink(env, name, 0); 1172 __os_free(env, name); 1173 } 1174 } 1175 return (0); 1176} 1177 1178/* 1179 * Removes a file during internal init. Assumes underlying subsystems are 1180 * active; therefore, this can't be used for internal init crash recovery. 1181 */ 1182static int 1183__rep_remove_file(env, uid, name, type, flags) 1184 ENV *env; 1185 u_int8_t *uid; 1186 const char *name; 1187 u_int32_t type, flags; 1188{ 1189 /* 1190 * Calling __fop_remove will both purge any matching 1191 * fileid from mpool and unlink it on disk. 1192 */ 1193#ifdef HAVE_QUEUE 1194 DB *dbp; 1195 int ret; 1196 1197 /* 1198 * Handle queue separately. __fop_remove will not 1199 * remove extent files. Use __qam_remove to remove 1200 * extent files that might exist under this name. Note that 1201 * in-memory queue databases can't have extent files. 1202 */ 1203 if (type == (u_int32_t)DB_QUEUE && !LF_ISSET(DB_AM_INMEM)) { 1204 if ((ret = __db_create_internal(&dbp, env, 0)) != 0) 1205 return (ret); 1206 1207 /* 1208 * At present, qam_remove expects the passed-in dbp to have a 1209 * locker allocated, and if not, db_open allocates a locker 1210 * which qam_remove then leaks. 1211 * 1212 * TODO: it would be better to avoid cobbling together this 1213 * sequence of low-level operations, if fileops provided some 1214 * API to allow us to remove a database without write-locking 1215 * its handle. 1216 */ 1217 if ((ret = __lock_id(env, NULL, &dbp->locker)) != 0) 1218 return (ret); 1219 1220 RPRINT(env, DB_VERB_REP_SYNC, 1221 (env, "QAM: Unlink %s via __qam_remove", name)); 1222 if ((ret = __qam_remove(dbp, NULL, name, NULL)) != 0) { 1223 RPRINT(env, DB_VERB_REP_SYNC, 1224 (env, "qam_remove returned %d", ret)); 1225 (void)__db_close(dbp, NULL, DB_NOSYNC); 1226 return (ret); 1227 } 1228 if ((ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0) 1229 return (ret); 1230 } 1231#else 1232 COMPQUIET(type, 0); 1233 COMPQUIET(flags, 0); 1234#endif 1235 /* 1236 * We call fop_remove even if we've called qam_remove. 1237 * That will only have removed extent files. Now 1238 * we need to deal with the actual file itself. 1239 */ 1240 return (__fop_remove(env, NULL, uid, name, DB_APP_DATA, 0)); 1241} 1242 1243/* 1244 * __rep_bulk_page 1245 * Process a bulk page message. 1246 * 1247 * PUBLIC: int __rep_bulk_page __P((ENV *, 1248 * PUBLIC: DB_THREAD_INFO *, int, __rep_control_args *, DBT *)); 1249 */ 1250int 1251__rep_bulk_page(env, ip, eid, rp, rec) 1252 ENV *env; 1253 DB_THREAD_INFO *ip; 1254 int eid; 1255 __rep_control_args *rp; 1256 DBT *rec; 1257{ 1258 __rep_control_args tmprp; 1259 __rep_bulk_args b_args; 1260 int ret; 1261 u_int8_t *p, *ep; 1262 1263 /* 1264 * We're going to be modifying the rp LSN contents so make 1265 * our own private copy to play with. We need to set the 1266 * rectype to REP_PAGE because we're calling through __rep_page 1267 * to process each page, and lower functions make decisions 1268 * based on the rectypes (for throttling/gap processing) 1269 */ 1270 memcpy(&tmprp, rp, sizeof(tmprp)); 1271 tmprp.rectype = REP_PAGE; 1272 ret = 0; 1273 for (ep = (u_int8_t *)rec->data + rec->size, p = (u_int8_t *)rec->data; 1274 p < ep;) { 1275 /* 1276 * First thing in the buffer is the length. Then the LSN 1277 * of this page, then the page info itself. 1278 */ 1279 if ((ret = __rep_bulk_unmarshal(env, 1280 &b_args, p, rec->size, &p)) != 0) 1281 return (ret); 1282 RPRINT(env, DB_VERB_REP_SYNC, (env, 1283 "rep_bulk_page: Processing LSN [%lu][%lu]", 1284 (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset)); 1285 RPRINT(env, DB_VERB_REP_SYNC, (env, 1286 "rep_bulk_page: p %#lx ep %#lx pgrec data %#lx, size %lu (%#lx)", 1287 P_TO_ULONG(p), P_TO_ULONG(ep), 1288 P_TO_ULONG(b_args.bulkdata.data), 1289 (u_long)b_args.bulkdata.size, 1290 (u_long)b_args.bulkdata.size)); 1291 /* 1292 * Now send the page info DBT to the page processing function. 1293 */ 1294 ret = __rep_page(env, ip, eid, &tmprp, &b_args.bulkdata); 1295 RPRINT(env, DB_VERB_REP_SYNC, (env, 1296 "rep_bulk_page: rep_page ret %d", ret)); 1297 1298 /* 1299 * If this set of pages is already done just return. 1300 */ 1301 if (ret != 0) { 1302 if (ret == DB_REP_PAGEDONE) 1303 ret = 0; 1304 break; 1305 } 1306 } 1307 return (ret); 1308} 1309 1310/* 1311 * __rep_page 1312 * Process a page message. 1313 * 1314 * PUBLIC: int __rep_page __P((ENV *, 1315 * PUBLIC: DB_THREAD_INFO *, int, __rep_control_args *, DBT *)); 1316 */ 1317int 1318__rep_page(env, ip, eid, rp, rec) 1319 ENV *env; 1320 DB_THREAD_INFO *ip; 1321 int eid; 1322 __rep_control_args *rp; 1323 DBT *rec; 1324{ 1325 1326 DB_REP *db_rep; 1327 DBT key, data; 1328 REP *rep; 1329 __rep_fileinfo_args *msgfp; 1330 db_recno_t recno; 1331 int ret; 1332 1333 ret = 0; 1334 db_rep = env->rep_handle; 1335 rep = db_rep->region; 1336 1337 if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) 1338 return (DB_REP_PAGEDONE); 1339 /* 1340 * If we restarted internal init, it is possible to receive 1341 * an old REP_PAGE message, while we're in the current 1342 * stage of recovering pages. Until we have some sort of 1343 * an init generation number, ignore any message that has 1344 * a message LSN that is before this internal init's first_lsn. 1345 */ 1346 if (LOG_COMPARE(&rp->lsn, &rep->first_lsn) < 0) { 1347 RPRINT(env, DB_VERB_REP_SYNC, (env, 1348 "PAGE: Old page: msg LSN [%lu][%lu] first_lsn [%lu][%lu]", 1349 (u_long)rp->lsn.file, (u_long)rp->lsn.offset, 1350 (u_long)rep->first_lsn.file, 1351 (u_long)rep->first_lsn.offset)); 1352 return (DB_REP_PAGEDONE); 1353 } 1354 if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, 1355 &msgfp, rec->data, rec->size, NULL)) != 0) 1356 return (ret); 1357 MUTEX_LOCK(env, rep->mtx_clientdb); 1358 REP_SYSTEM_LOCK(env); 1359 /* 1360 * We should not ever be in internal init with a lease granted. 1361 */ 1362 DB_ASSERT(env, 1363 !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); 1364 1365 RPRINT(env, DB_VERB_REP_SYNC, (env, 1366 "PAGE: Received page %lu from file %d", 1367 (u_long)msgfp->pgno, msgfp->filenum)); 1368 /* 1369 * Check if this page is from the file we're expecting. 1370 * This may be an old or delayed page message. 1371 */ 1372 /* 1373 * !!! 1374 * If we allow dbrename/dbremove on the master while a client 1375 * is updating, then we'd have to verify the file's uid here too. 1376 */ 1377 if (msgfp->filenum != rep->curfile) { 1378 RPRINT(env, DB_VERB_REP_SYNC, 1379 (env, "Msg file %d != curfile %d", 1380 msgfp->filenum, rep->curfile)); 1381 ret = DB_REP_PAGEDONE; 1382 goto err; 1383 } 1384 /* 1385 * We want to create/open our dbp to the database 1386 * where we'll keep our page information. 1387 */ 1388 if ((ret = __rep_client_dbinit(env, 1, REP_PG)) != 0) { 1389 RPRINT(env, DB_VERB_REP_SYNC, (env, 1390 "PAGE: Client_dbinit %s", db_strerror(ret))); 1391 goto err; 1392 } 1393 1394 memset(&key, 0, sizeof(key)); 1395 memset(&data, 0, sizeof(data)); 1396 recno = (db_recno_t)(msgfp->pgno + 1); 1397 key.data = &recno; 1398 key.ulen = key.size = sizeof(db_recno_t); 1399 key.flags = DB_DBT_USERMEM; 1400 1401 /* 1402 * If we already have this page, then we don't want to bother 1403 * rewriting it into the file. Otherwise, any other error 1404 * we want to return. 1405 */ 1406 ret = __db_put(rep->file_dbp, ip, NULL, &key, &data, DB_NOOVERWRITE); 1407 if (ret == DB_KEYEXIST) { 1408 RPRINT(env, DB_VERB_REP_SYNC, (env, 1409 "PAGE: Received duplicate page %lu from file %d", 1410 (u_long)msgfp->pgno, msgfp->filenum)); 1411 STAT(rep->stat.st_pg_duplicated++); 1412 ret = 0; 1413 goto err; 1414 } 1415 if (ret != 0) 1416 goto err; 1417 1418 RPRINT(env, DB_VERB_REP_SYNC, (env, 1419 "PAGE: Write page %lu into mpool", (u_long)msgfp->pgno)); 1420 /* 1421 * We put the page in the database file itself. 1422 */ 1423 ret = __rep_write_page(env, ip, rep, msgfp); 1424 if (ret != 0) { 1425 /* 1426 * We got an error storing the page, therefore, we need 1427 * remove this page marker from the page database too. 1428 * !!! 1429 * I'm ignoring errors from the delete because we want to 1430 * return the original error. If we cannot write the page 1431 * and we cannot delete the item we just put, what should 1432 * we do? Panic the env and return DB_RUNRECOVERY? 1433 */ 1434 (void)__db_del(rep->file_dbp, NULL, NULL, &key, 0); 1435 goto err; 1436 } 1437 STAT(rep->stat.st_pg_records++); 1438 rep->npages++; 1439 1440 /* 1441 * Now check the LSN on the page and save it if it is later 1442 * than the one we have. 1443 */ 1444 if (LOG_COMPARE(&rp->lsn, &rep->last_lsn) > 0) 1445 rep->last_lsn = rp->lsn; 1446 1447 /* 1448 * We've successfully written the page. Now we need to see if 1449 * we're done with this file. __rep_filedone will check if we 1450 * have all the pages expected and if so, set up for the next 1451 * file and send out a page request for the next file's pages. 1452 */ 1453 ret = __rep_filedone(env, ip, eid, rep, msgfp, rp->rectype); 1454 1455err: REP_SYSTEM_UNLOCK(env); 1456 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1457 1458 __os_free(env, msgfp); 1459 return (ret); 1460} 1461 1462/* 1463 * __rep_page_fail 1464 * Process a page fail message. 1465 * 1466 * PUBLIC: int __rep_page_fail __P((ENV *, 1467 * PUBLIC: DB_THREAD_INFO *, int, __rep_control_args *, DBT *)); 1468 */ 1469int 1470__rep_page_fail(env, ip, eid, rp, rec) 1471 ENV *env; 1472 DB_THREAD_INFO *ip; 1473 int eid; 1474 __rep_control_args *rp; 1475 DBT *rec; 1476{ 1477 1478 DB_REP *db_rep; 1479 REP *rep; 1480 __rep_fileinfo_args *msgfp, *rfp; 1481 int ret; 1482 1483 ret = 0; 1484 db_rep = env->rep_handle; 1485 rep = db_rep->region; 1486 1487 if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) 1488 return (0); 1489 if ((ret = __rep_fileinfo_unmarshal(env, rp->rep_version, 1490 &msgfp, rec->data, rec->size, NULL)) != 0) 1491 return (ret); 1492 /* 1493 * Check if this page is from the file we're expecting. 1494 * This may be an old or delayed page message. 1495 */ 1496 /* 1497 * !!! 1498 * If we allow dbrename/dbremove on the master while a client 1499 * is updating, then we'd have to verify the file's uid here too. 1500 */ 1501 MUTEX_LOCK(env, rep->mtx_clientdb); 1502 REP_SYSTEM_LOCK(env); 1503 /* 1504 * We should not ever be in internal init with a lease granted. 1505 */ 1506 DB_ASSERT(env, 1507 !IS_USING_LEASES(env) || __rep_islease_granted(env) == 0); 1508 1509 if (msgfp->filenum != rep->curfile) { 1510 RPRINT(env, DB_VERB_REP_SYNC, 1511 (env, "Msg file %d != curfile %d", 1512 msgfp->filenum, rep->curfile)); 1513 goto out; 1514 } 1515 rfp = rep->curinfo; 1516 if (rfp->type != (u_int32_t)DB_QUEUE) 1517 --rfp->max_pgno; 1518 else { 1519 /* 1520 * Queue is special. Pages at the beginning of the queue 1521 * may disappear, as well as at the end. Use msgfp->pgno 1522 * to adjust accordingly. 1523 */ 1524 RPRINT(env, DB_VERB_REP_SYNC, (env, 1525 "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d", 1526 (u_long)msgfp->pgno, (u_long)rep->ready_pg, 1527 (u_long)rfp->max_pgno, rep->npages)); 1528 if (msgfp->pgno == rfp->max_pgno) 1529 --rfp->max_pgno; 1530 if (msgfp->pgno >= rep->ready_pg) { 1531 rep->ready_pg = msgfp->pgno + 1; 1532 rep->npages = rep->ready_pg; 1533 } 1534 RPRINT(env, DB_VERB_REP_SYNC, (env, 1535 "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d", 1536 (u_long)msgfp->pgno, (u_long)rep->ready_pg, 1537 (u_long)rfp->max_pgno, rep->npages)); 1538 } 1539 1540 /* 1541 * We've lowered the number of pages expected. It is possible that 1542 * this was the last page we were expecting. Now we need to see if 1543 * we're done with this file. __rep_filedone will check if we have 1544 * all the pages expected and if so, set up for the next file and 1545 * send out a page request for the next file's pages. 1546 */ 1547 ret = __rep_filedone(env, ip, eid, rep, msgfp, REP_PAGE_FAIL); 1548out: 1549 REP_SYSTEM_UNLOCK(env); 1550 MUTEX_UNLOCK(env, rep->mtx_clientdb); 1551 __os_free(env, msgfp); 1552 return (ret); 1553} 1554 1555/* 1556 * __rep_write_page - 1557 * Write this page into a database. 1558 */ 1559static int 1560__rep_write_page(env, ip, rep, msgfp) 1561 ENV *env; 1562 DB_THREAD_INFO *ip; 1563 REP *rep; 1564 __rep_fileinfo_args *msgfp; 1565{ 1566 DB db; 1567 DBT pgcookie; 1568 DB_MPOOLFILE *mpf; 1569 DB_PGINFO *pginfo; 1570 __rep_fileinfo_args *rfp; 1571 int ret; 1572 void *dst; 1573 1574 rfp = NULL; 1575 1576 /* 1577 * If this is the first page we're putting in this database, we need 1578 * to create the mpool file. Otherwise call memp_fget to create the 1579 * page in mpool. Then copy the data to the page, and memp_fput the 1580 * page to give it back to mpool. 1581 * 1582 * We need to create the file, removing any existing file and associate 1583 * the correct file ID with the new one. 1584 */ 1585 rfp = rep->curinfo; 1586 if (rep->file_mpf == NULL) { 1587 if (!FLD_ISSET(rfp->db_flags, DB_AM_INMEM)) { 1588 /* 1589 * Recreate the file on disk. We'll be putting 1590 * the data into the file via mpool. 1591 */ 1592 RPRINT(env, DB_VERB_REP_SYNC, (env, 1593 "rep_write_page: Calling fop_create for %s", 1594 (char *)rfp->info.data)); 1595 if ((ret = __fop_create(env, NULL, NULL, 1596 rfp->info.data, DB_APP_DATA, 1597 env->db_mode, 0)) != 0) 1598 goto err; 1599 } 1600 1601 if ((ret = 1602 __rep_mpf_open(env, &rep->file_mpf, rep->curinfo, 1603 FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? DB_CREATE : 0)) != 0) 1604 goto err; 1605 } 1606 /* 1607 * Handle queue specially. If we're a QUEUE database, we need to 1608 * use the __qam_fget/put calls. We need to use rep->queue_dbc for 1609 * that. That dbp is opened after getting the metapage for the 1610 * queue database. Since the meta-page is always in the queue file, 1611 * we'll use the normal path for that first page. After that we 1612 * can assume the dbp is opened. 1613 */ 1614 if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) { 1615#ifdef HAVE_QUEUE 1616 ret = __qam_fget(rep->queue_dbc, &msgfp->pgno, 1617 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst); 1618#else 1619 /* 1620 * This always returns an error. 1621 */ 1622 ret = __db_no_queue_am(env); 1623#endif 1624 } else 1625 ret = __memp_fget(rep->file_mpf, &msgfp->pgno, ip, NULL, 1626 DB_MPOOL_CREATE | DB_MPOOL_DIRTY, &dst); 1627 1628 if (ret != 0) 1629 goto err; 1630 1631 /* 1632 * Before writing this page into our local mpool, see if its byte order 1633 * needs to be swapped. When in mpool the page should be in the native 1634 * byte order of our local environment. But the page image we've 1635 * received may be in the opposite order (as indicated in finfo_flags). 1636 */ 1637 if ((F_ISSET(env, ENV_LITTLEENDIAN) && 1638 !FLD_ISSET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN)) || 1639 (!F_ISSET(env, ENV_LITTLEENDIAN) && 1640 FLD_ISSET(msgfp->finfo_flags, REPINFO_PG_LITTLEENDIAN))) { 1641 RPRINT(env, DB_VERB_REP_SYNC, (env, 1642 "write_page: Page %d needs to be swapped", msgfp->pgno)); 1643 /* 1644 * Set up a dbp to pass into the swap functions. We need 1645 * only a few things: The environment and any special 1646 * dbp flags and some obvious basics like db type and 1647 * pagesize. Those flags were set back in rep_mpf_open 1648 * and are available in the pgcookie set up with the 1649 * mpoolfile associated with this database. 1650 */ 1651 memset(&db, 0, sizeof(db)); 1652 db.env = env; 1653 db.type = (DBTYPE)msgfp->type; 1654 db.pgsize = msgfp->pgsize; 1655 mpf = rep->file_mpf; 1656 if ((ret = __memp_get_pgcookie(mpf, &pgcookie)) != 0) 1657 goto err; 1658 pginfo = (DB_PGINFO *)pgcookie.data; 1659 db.flags = pginfo->flags; 1660 if ((ret = __db_pageswap(&db, msgfp->info.data, msgfp->pgsize, 1661 NULL, 1)) != 0) 1662 goto err; 1663 } 1664 1665 memcpy(dst, msgfp->info.data, msgfp->pgsize); 1666#ifdef HAVE_QUEUE 1667 if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) 1668 ret = __qam_fput(rep->queue_dbc, 1669 msgfp->pgno, dst, rep->queue_dbc->priority); 1670 else 1671#endif 1672 ret = __memp_fput(rep->file_mpf, 1673 ip, dst, rep->file_dbp->priority); 1674 1675err: return (ret); 1676} 1677 1678/* 1679 * __rep_page_gap - 1680 * After we've put the page into the database, we need to check if 1681 * we have a page gap and whether we need to request pages. 1682 */ 1683static int 1684__rep_page_gap(env, rep, msgfp, type) 1685 ENV *env; 1686 REP *rep; 1687 __rep_fileinfo_args *msgfp; 1688 u_int32_t type; 1689{ 1690 DBC *dbc; 1691 DBT data, key; 1692 DB_LOG *dblp; 1693 DB_THREAD_INFO *ip; 1694 LOG *lp; 1695 __rep_fileinfo_args *rfp; 1696 db_recno_t recno; 1697 int ret, t_ret; 1698 1699 dblp = env->lg_handle; 1700 lp = dblp->reginfo.primary; 1701 ret = 0; 1702 dbc = NULL; 1703 1704 /* 1705 * We've successfully put this page into our file. 1706 * Now we need to account for it and re-request new pages 1707 * if necessary. 1708 */ 1709 /* 1710 * We already hold both the db mutex and rep mutex. 1711 */ 1712 rfp = rep->curinfo; 1713 1714 /* 1715 * Make sure we're still talking about the same file. 1716 * If not, we're done here. 1717 */ 1718 if (rfp->filenum != msgfp->filenum) { 1719 ret = DB_REP_PAGEDONE; 1720 goto err; 1721 } 1722 1723 /* 1724 * We have 3 possible states: 1725 * 1. We receive a page we already have accounted for. 1726 * msg pgno < ready pgno 1727 * 2. We receive a page that is beyond a gap. 1728 * msg pgno > ready pgno 1729 * 3. We receive the page we're expecting next. 1730 * msg pgno == ready pgno 1731 */ 1732 /* 1733 * State 1. This can happen once we put our page record into the 1734 * database, but by the time we acquire the mutex other 1735 * threads have already accounted for this page and moved on. 1736 * We just want to return. 1737 */ 1738 if (msgfp->pgno < rep->ready_pg) { 1739 RPRINT(env, DB_VERB_REP_SYNC, (env, 1740 "PAGE_GAP: pgno %lu < ready %lu, waiting %lu", 1741 (u_long)msgfp->pgno, (u_long)rep->ready_pg, 1742 (u_long)rep->waiting_pg)); 1743 goto err; 1744 } 1745 1746 /* 1747 * State 2. This page is beyond the page we're expecting. 1748 * We need to update waiting_pg if this page is less than 1749 * (earlier) the current waiting_pg. There is nothing 1750 * to do but see if we need to request. 1751 */ 1752 RPRINT(env, DB_VERB_REP_SYNC, (env, 1753 "PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu", 1754 (u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg, 1755 (u_long)rep->waiting_pg, (u_long)rep->max_wait_pg)); 1756 if (msgfp->pgno > rep->ready_pg) { 1757 if (rep->waiting_pg == PGNO_INVALID || 1758 msgfp->pgno < rep->waiting_pg) 1759 rep->waiting_pg = msgfp->pgno; 1760 } else { 1761 /* 1762 * We received the page we're expecting. 1763 */ 1764 rep->ready_pg++; 1765 __os_gettime(env, &lp->rcvd_ts, 1); 1766 if (rep->ready_pg == rep->waiting_pg) { 1767 /* 1768 * If we get here we know we just filled a gap. 1769 * Move the cursor to that place and then walk 1770 * forward looking for the next gap, if it exists. 1771 */ 1772 lp->wait_ts = rep->request_gap; 1773 rep->max_wait_pg = PGNO_INVALID; 1774 /* 1775 * We need to walk the recno database looking for the 1776 * next page we need or expect. 1777 */ 1778 memset(&key, 0, sizeof(key)); 1779 memset(&data, 0, sizeof(data)); 1780 ENV_GET_THREAD_INFO(env, ip); 1781 if ((ret = __db_cursor(rep->file_dbp, ip, NULL, 1782 &dbc, 0)) != 0) 1783 goto err; 1784 /* 1785 * Set cursor to the first waiting page. 1786 * Page numbers/record numbers are offset by 1. 1787 */ 1788 recno = (db_recno_t)rep->waiting_pg + 1; 1789 key.data = &recno; 1790 key.ulen = key.size = sizeof(db_recno_t); 1791 key.flags = DB_DBT_USERMEM; 1792 /* 1793 * We know that page is there, this should 1794 * find the record. 1795 */ 1796 ret = __dbc_get(dbc, &key, &data, DB_SET); 1797 if (ret != 0) 1798 goto err; 1799 RPRINT(env, DB_VERB_REP_SYNC, (env, 1800 "PAGE_GAP: Set cursor for ready %lu, waiting %lu", 1801 (u_long)rep->ready_pg, (u_long)rep->waiting_pg)); 1802 } 1803 while (ret == 0 && rep->ready_pg == rep->waiting_pg) { 1804 rep->ready_pg++; 1805 ret = __dbc_get(dbc, &key, &data, DB_NEXT); 1806 /* 1807 * If we get to the end of the list, there are no 1808 * more gaps. Reset waiting_pg. 1809 */ 1810 if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY) { 1811 rep->waiting_pg = PGNO_INVALID; 1812 RPRINT(env, DB_VERB_REP_SYNC, (env, 1813 "PAGE_GAP: Next cursor No next - ready %lu, waiting %lu", 1814 (u_long)rep->ready_pg, 1815 (u_long)rep->waiting_pg)); 1816 break; 1817 } 1818 /* 1819 * Subtract 1 from waiting_pg because record numbers 1820 * are 1-based and pages are 0-based and we added 1 1821 * into the page number when we put it into the db. 1822 */ 1823 rep->waiting_pg = *(db_pgno_t *)key.data; 1824 rep->waiting_pg--; 1825 RPRINT(env, DB_VERB_REP_SYNC, (env, 1826 "PAGE_GAP: Next cursor ready %lu, waiting %lu", 1827 (u_long)rep->ready_pg, (u_long)rep->waiting_pg)); 1828 } 1829 } 1830 1831 /* 1832 * If we filled a gap and now have the entire file, there's 1833 * nothing to do. We're done when ready_pg is > max_pgno 1834 * because ready_pg is larger than the last page we received. 1835 */ 1836 if (rep->ready_pg > rfp->max_pgno) 1837 goto err; 1838 1839 /* 1840 * Check if we need to ask for more pages. 1841 */ 1842 if ((rep->waiting_pg != PGNO_INVALID && 1843 rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) { 1844 /* 1845 * We got a page but we may still be waiting for more. 1846 * If we got REP_PAGE_MORE we always want to ask for more. 1847 * We need to set rfp->pgno to the current page number 1848 * we will use to ask for more pages. 1849 */ 1850 if (type == REP_PAGE_MORE) 1851 rfp->pgno = msgfp->pgno; 1852 if ((__rep_check_doreq(env, rep) || type == REP_PAGE_MORE) && 1853 ((ret = __rep_pggap_req(env, rep, rfp, 1854 (type == REP_PAGE_MORE) ? REP_GAP_FORCE : 0)) != 0)) 1855 goto err; 1856 } else { 1857 lp->wait_ts = rep->request_gap; 1858 rep->max_wait_pg = PGNO_INVALID; 1859 } 1860 1861err: 1862 if (dbc != NULL && (t_ret = __dbc_close(dbc)) != 0 && ret == 0) 1863 ret = t_ret; 1864 1865 return (ret); 1866} 1867 1868/* 1869 * __rep_init_cleanup - 1870 * Clean up internal initialization pieces. 1871 * 1872 * !!! 1873 * Caller must hold client database mutex (mtx_clientdb) and REP_SYSTEM_LOCK. 1874 * 1875 * PUBLIC: int __rep_init_cleanup __P((ENV *, REP *, int)); 1876 */ 1877int 1878__rep_init_cleanup(env, rep, force) 1879 ENV *env; 1880 REP *rep; 1881 int force; 1882{ 1883 DB *queue_dbp; 1884 DB_LOG *dblp; 1885 LOG *lp; 1886 int cleanup_failure, ret, t_ret; 1887 1888 ret = 0; 1889 /* 1890 * 1. Close up the file data pointer we used. 1891 * 2. Close/reset the page database. 1892 * 3. Close/reset the queue database if we're forcing a cleanup. 1893 * 4. Free current file info. 1894 * 5. If we have all files or need to force, free original file info. 1895 */ 1896 if (rep->file_mpf != NULL) { 1897 ret = __memp_fclose(rep->file_mpf, 0); 1898 rep->file_mpf = NULL; 1899 } 1900 if (rep->file_dbp != NULL) { 1901 t_ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC); 1902 rep->file_dbp = NULL; 1903 if (t_ret != 0 && ret == 0) 1904 ret = t_ret; 1905 } 1906 if (force && rep->queue_dbc != NULL) { 1907 queue_dbp = rep->queue_dbc->dbp; 1908 if ((t_ret = __dbc_close(rep->queue_dbc)) != 0 && ret == 0) 1909 ret = t_ret; 1910 rep->queue_dbc = NULL; 1911 if ((t_ret = __db_close(queue_dbp, NULL, DB_NOSYNC)) != 0 && 1912 ret == 0) 1913 ret = t_ret; 1914 } 1915 if (rep->curinfo != NULL) { 1916 __os_free(env, rep->curinfo); 1917 rep->curinfo = NULL; 1918 } 1919 if (F_ISSET(rep, REP_F_INTERNAL_INIT_MASK) && force) { 1920 /* 1921 * Clean up files involved in an interrupted internal init. 1922 * 1923 * 1. logs 1924 * a) remove old log files 1925 * b) set up initial log file #1 1926 * 2. database files 1927 * 3. the "init file" 1928 * 1929 * Steps 1 and 2 can be attempted independently. Step 1b is 1930 * dependent on successful completion of 1a. Step 3 must not be 1931 * done if anything fails along the way, because the init file's 1932 * raison d'etre is to show that some files remain to be cleaned 1933 * up. 1934 */ 1935 RPRINT(env, DB_VERB_REP_SYNC, 1936 (env, "clean up interrupted internal init")); 1937 cleanup_failure = 0; 1938 1939 if ((t_ret = __rep_remove_logs(env)) == 0) { 1940 /* 1941 * Since we have no logs, recover by making it look like 1942 * the case when a new client first starts up, namely we 1943 * have nothing but a fresh log file #1. This is a 1944 * little wasteful, since we may soon remove this log 1945 * file again. But that's OK, because this is the 1946 * unusual case of NEWMASTER during internal init, and 1947 * the rest of internal init doubtless dwarfs this. 1948 */ 1949 dblp = env->lg_handle; 1950 lp = dblp->reginfo.primary; 1951 1952 if ((t_ret = __rep_log_setup(env, 1953 rep, 1, DB_LOGVERSION, &lp->ready_lsn)) != 0) { 1954 cleanup_failure = 1; 1955 if (ret == 0) 1956 ret = t_ret; 1957 } 1958 } else { 1959 cleanup_failure = 1; 1960 if (ret == 0) 1961 ret = t_ret; 1962 } 1963 1964 if ((t_ret = __rep_remove_by_list(env, rep->infoversion, 1965 rep->originfo, rep->originfolen, rep->nfiles)) != 0) { 1966 cleanup_failure = 1; 1967 if (ret == 0) 1968 ret = t_ret; 1969 } 1970 1971 if (!cleanup_failure && 1972 (t_ret = __rep_remove_init_file(env)) != 0) { 1973 if (ret == 0) 1974 ret = t_ret; 1975 } 1976 1977 if (rep->originfo != NULL) { 1978 __os_free(env, rep->originfo); 1979 rep->originfo = NULL; 1980 } 1981 } 1982 1983 return (ret); 1984} 1985 1986/* 1987 * __rep_filedone - 1988 * We need to check if we're done with the current file after 1989 * processing the current page. Stat the database to see if 1990 * we have all the pages. If so, we need to clean up/close 1991 * this one, set up for the next one, and ask for its pages, 1992 * or if this is the last file, request the log records and 1993 * move to the REP_RECOVER_LOG state. 1994 */ 1995static int 1996__rep_filedone(env, ip, eid, rep, msgfp, type) 1997 ENV *env; 1998 DB_THREAD_INFO *ip; 1999 int eid; 2000 REP *rep; 2001 __rep_fileinfo_args *msgfp; 2002 u_int32_t type; 2003{ 2004 __rep_fileinfo_args *rfp; 2005 int ret; 2006 2007 /* 2008 * We've put our page, now we need to do any gap processing 2009 * that might be needed to re-request pages. 2010 */ 2011 ret = __rep_page_gap(env, rep, msgfp, type); 2012 /* 2013 * The world changed while we were doing gap processing. 2014 * We're done here. 2015 */ 2016 if (ret == DB_REP_PAGEDONE) 2017 return (0); 2018 2019 rfp = rep->curinfo; 2020 /* 2021 * max_pgno is 0-based and npages is 1-based, so we don't have 2022 * all the pages until npages is > max_pgno. 2023 */ 2024 RPRINT(env, DB_VERB_REP_SYNC, 2025 (env, "FILEDONE: have %lu pages. Need %lu.", 2026 (u_long)rep->npages, (u_long)rfp->max_pgno + 1)); 2027 if (rep->npages <= rfp->max_pgno) 2028 return (0); 2029 2030 /* 2031 * If we're queue and we think we have all the pages for this file, 2032 * we need to do special queue processing. Queue is handled in 2033 * several stages. 2034 */ 2035 if (rfp->type == (u_int32_t)DB_QUEUE && 2036 ((ret = __rep_queue_filedone(env, ip, rep, rfp)) != 2037 DB_REP_PAGEDONE)) 2038 return (ret); 2039 /* 2040 * We have all the pages for this file. Clean up. 2041 */ 2042 if ((ret = __rep_init_cleanup(env, rep, 0)) != 0) 2043 goto err; 2044 2045 rep->curfile++; 2046 ret = __rep_nextfile(env, eid, rep); 2047err: 2048 return (ret); 2049} 2050 2051/* 2052 * Starts requesting pages for the next file in the list (if any), or if not, 2053 * proceeds to the next stage: requesting logs. 2054 * 2055 * !!! 2056 * Called with REP_SYSTEM_LOCK held. 2057 */ 2058static int 2059__rep_nextfile(env, eid, rep) 2060 ENV *env; 2061 int eid; 2062 REP *rep; 2063{ 2064 DBT dbt; 2065 __rep_logreq_args lr_args; 2066 int ret; 2067 u_int8_t *buf, lrbuf[__REP_LOGREQ_SIZE]; 2068 size_t len, msgsz; 2069 2070 /* 2071 * Always direct the next request to the master (at least nominally), 2072 * regardless of where the current response came from. The application 2073 * can always still redirect it to another client. 2074 */ 2075 if (rep->master_id != DB_EID_INVALID) 2076 eid = rep->master_id; 2077 if (rep->curfile == rep->nfiles) { 2078 RPRINT(env, DB_VERB_REP_SYNC, (env, 2079 "NEXTFILE: have %d files. RECOVER_LOG now", rep->nfiles)); 2080 /* 2081 * Move to REP_RECOVER_LOG state. 2082 * Request logs. 2083 */ 2084 /* 2085 * We need to do a sync here so that any later opens 2086 * can find the file and file id. We need to do it 2087 * before we clear REP_F_RECOVER_PAGE so that we do not 2088 * try to flush the log. 2089 */ 2090 if ((ret = __memp_sync_int(env, NULL, 0, 2091 DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0) 2092 return (ret); 2093 F_CLR(rep, REP_F_RECOVER_PAGE); 2094 F_SET(rep, REP_F_RECOVER_LOG); 2095 memset(&dbt, 0, sizeof(dbt)); 2096 lr_args.endlsn = rep->last_lsn; 2097 if ((ret = __rep_logreq_marshal(env, &lr_args, lrbuf, 2098 __REP_LOGREQ_SIZE, &len)) != 0) 2099 return (ret); 2100 DB_INIT_DBT(dbt, lrbuf, len); 2101 REP_SYSTEM_UNLOCK(env); 2102 if ((ret = __rep_log_setup(env, rep, 2103 rep->first_lsn.file, rep->first_vers, NULL)) != 0) 2104 return (ret); 2105 RPRINT(env, DB_VERB_REP_SYNC, (env, 2106 "NEXTFILE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]", 2107 (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset, 2108 (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset)); 2109 (void)__rep_send_message(env, eid, 2110 REP_LOG_REQ, &rep->first_lsn, &dbt, 2111 REPCTL_INIT, DB_REP_ANYWHERE); 2112 REP_SYSTEM_LOCK(env); 2113 return (0); 2114 } 2115 2116 /* 2117 * 4. If not, set curinfo to next file and request its pages. 2118 */ 2119 rep->finfo = rep->nextinfo; 2120 if ((ret = __rep_fileinfo_unmarshal(env, rep->infoversion, 2121 &rep->curinfo, rep->finfo, rep->infolen, &rep->nextinfo)) != 0) { 2122 RPRINT(env, DB_VERB_REP_SYNC, (env, 2123 "NEXTINFO: Fileinfo read: %s", db_strerror(ret))); 2124 return (ret); 2125 } 2126 DB_ASSERT(env, rep->curinfo->pgno == 0); 2127 rep->infolen -= (u_int32_t)(rep->nextinfo - rep->finfo); 2128 rep->ready_pg = 0; 2129 rep->npages = 0; 2130 rep->waiting_pg = PGNO_INVALID; 2131 rep->max_wait_pg = PGNO_INVALID; 2132 memset(&dbt, 0, sizeof(dbt)); 2133 RPRINT(env, DB_VERB_REP_SYNC, (env, 2134 "Next file %d: pgsize %lu, maxpg %lu", rep->curinfo->filenum, 2135 (u_long)rep->curinfo->pgsize, (u_long)rep->curinfo->max_pgno)); 2136 msgsz = __REP_FILEINFO_SIZE + 2137 rep->curinfo->uid.size + rep->curinfo->info.size; 2138 if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0) 2139 return (ret); 2140 if ((ret = __rep_fileinfo_marshal(env, rep->infoversion, 2141 rep->curinfo, buf, msgsz, &len)) != 0) 2142 return (ret); 2143 DB_INIT_DBT(dbt, buf, len); 2144 (void)__rep_send_message(env, eid, REP_PAGE_REQ, 2145 NULL, &dbt, 0, DB_REP_ANYWHERE); 2146 __os_free(env, buf); 2147 2148 return (0); 2149} 2150 2151/* 2152 * __rep_mpf_open - 2153 * Create and open the mpool file for a database. 2154 * Used by both master and client to bring files into mpool. 2155 */ 2156static int 2157__rep_mpf_open(env, mpfp, rfp, flags) 2158 ENV *env; 2159 DB_MPOOLFILE **mpfp; 2160 __rep_fileinfo_args *rfp; 2161 u_int32_t flags; 2162{ 2163 DB db; 2164 int ret; 2165 2166 if ((ret = __memp_fcreate(env, mpfp)) != 0) 2167 return (ret); 2168 2169 /* 2170 * We need a dbp to pass into to __env_mpool. Set up 2171 * only the parts that it needs. 2172 */ 2173 db.env = env; 2174 db.type = (DBTYPE)rfp->type; 2175 db.pgsize = rfp->pgsize; 2176 memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN); 2177 db.flags = rfp->db_flags; 2178 /* We need to make sure the dbp isn't marked open. */ 2179 F_CLR(&db, DB_AM_OPEN_CALLED); 2180 /* 2181 * The byte order of this database may be different from my local native 2182 * byte order. If so, set the swap bit so that the necessary swapping 2183 * will be done during file I/O. 2184 */ 2185 if ((F_ISSET(env, ENV_LITTLEENDIAN) && 2186 !FLD_ISSET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN)) || 2187 (!F_ISSET(env, ENV_LITTLEENDIAN) && 2188 FLD_ISSET(rfp->finfo_flags, REPINFO_DB_LITTLEENDIAN))) { 2189 RPRINT(env, DB_VERB_REP_SYNC, (env, 2190 "rep_mpf_open: Different endian database. Set swap bit.")); 2191 F_SET(&db, DB_AM_SWAP); 2192 } else 2193 F_CLR(&db, DB_AM_SWAP); 2194 2195 db.mpf = *mpfp; 2196 if (F_ISSET(&db, DB_AM_INMEM)) 2197 (void)__memp_set_flags(db.mpf, DB_MPOOL_NOFILE, 1); 2198 if ((ret = __env_mpool(&db, rfp->info.data, flags)) != 0) { 2199 (void)__memp_fclose(db.mpf, 0); 2200 *mpfp = NULL; 2201 } 2202 return (ret); 2203} 2204 2205/* 2206 * __rep_pggap_req - 2207 * Request a page gap. Assumes the caller holds the rep_mutex. 2208 * 2209 * PUBLIC: int __rep_pggap_req __P((ENV *, REP *, __rep_fileinfo_args *, 2210 * PUBLIC: u_int32_t)); 2211 */ 2212int 2213__rep_pggap_req(env, rep, reqfp, gapflags) 2214 ENV *env; 2215 REP *rep; 2216 __rep_fileinfo_args *reqfp; 2217 u_int32_t gapflags; 2218{ 2219 DBT max_pg_dbt; 2220 __rep_fileinfo_args *tmpfp, t; 2221 size_t len, msgsz; 2222 u_int32_t flags; 2223 int alloc, ret; 2224 u_int8_t *buf; 2225 2226 ret = 0; 2227 alloc = 0; 2228 /* 2229 * There is a window where we have to set REP_RECOVER_PAGE when 2230 * we receive the update information to transition from getting 2231 * file information to getting page information. However, that 2232 * thread does release and then reacquire mutexes. So, we might 2233 * try re-requesting before the original thread can get curinfo 2234 * setup. If curinfo isn't set up there is nothing to do. 2235 */ 2236 if (rep->curinfo == NULL) 2237 return (0); 2238 if (reqfp == NULL) { 2239 if ((ret = __rep_finfo_alloc(env, rep->curinfo, &tmpfp)) != 0) 2240 return (ret); 2241 alloc = 1; 2242 } else { 2243 t = *reqfp; 2244 tmpfp = &t; 2245 } 2246 2247 /* 2248 * If we've never requested this page, then 2249 * request everything between it and the first 2250 * page we have. If we have requested this page 2251 * then only request this record, not the entire gap. 2252 */ 2253 flags = 0; 2254 memset(&max_pg_dbt, 0, sizeof(max_pg_dbt)); 2255 /* 2256 * If this is a PAGE_MORE and we're forcing then we want to 2257 * force the request to ask for the next page after this one. 2258 */ 2259 if (FLD_ISSET(gapflags, REP_GAP_FORCE)) 2260 tmpfp->pgno++; 2261 else 2262 tmpfp->pgno = rep->ready_pg; 2263 msgsz = __REP_FILEINFO_SIZE + 2264 tmpfp->uid.size + tmpfp->info.size; 2265 if ((ret = __os_calloc(env, 1, msgsz, &buf)) != 0) 2266 goto err; 2267 if (rep->max_wait_pg == PGNO_INVALID || 2268 FLD_ISSET(gapflags, REP_GAP_FORCE | REP_GAP_REREQUEST)) { 2269 /* 2270 * Request the gap - set max to waiting_pg - 1 or if 2271 * there is no waiting_pg, just ask for one. 2272 */ 2273 if (rep->waiting_pg == PGNO_INVALID) { 2274 if (FLD_ISSET(gapflags, 2275 REP_GAP_FORCE | REP_GAP_REREQUEST)) 2276 rep->max_wait_pg = rep->curinfo->max_pgno; 2277 else 2278 rep->max_wait_pg = rep->ready_pg; 2279 } else { 2280 /* 2281 * If we're forcing, and waiting_pg is less than 2282 * the page we want to start this request at, then 2283 * we set max_wait_pg to the max pgno in the file. 2284 */ 2285 if (FLD_ISSET(gapflags, REP_GAP_FORCE) && 2286 rep->waiting_pg < tmpfp->pgno) 2287 rep->max_wait_pg = rep->curinfo->max_pgno; 2288 else 2289 rep->max_wait_pg = rep->waiting_pg - 1; 2290 } 2291 tmpfp->max_pgno = rep->max_wait_pg; 2292 /* 2293 * Gap requests are "new" and can go anywhere. 2294 */ 2295 if (FLD_ISSET(gapflags, REP_GAP_REREQUEST)) 2296 flags = DB_REP_REREQUEST; 2297 else 2298 flags = DB_REP_ANYWHERE; 2299 } else { 2300 /* 2301 * Request 1 page - set max to ready_pg. 2302 */ 2303 rep->max_wait_pg = rep->ready_pg; 2304 tmpfp->max_pgno = rep->ready_pg; 2305 /* 2306 * If we're dropping to singletons, this is a rerequest. 2307 */ 2308 flags = DB_REP_REREQUEST; 2309 } 2310 if (rep->master_id != DB_EID_INVALID) { 2311 STAT(rep->stat.st_pg_requested++); 2312 /* 2313 * We need to request the pages, but we need to get the 2314 * new info into rep->finfo. Assert that the sizes never 2315 * change. The only thing this should do is change 2316 * the pgno field. Everything else remains the same. 2317 */ 2318 if ((ret = __rep_fileinfo_marshal(env, rep->infoversion, 2319 tmpfp, buf, msgsz, &len)) == 0) { 2320 DB_INIT_DBT(max_pg_dbt, buf, len); 2321 DB_ASSERT(env, len == max_pg_dbt.size); 2322 (void)__rep_send_message(env, rep->master_id, 2323 REP_PAGE_REQ, NULL, &max_pg_dbt, 0, flags); 2324 } 2325 } else 2326 (void)__rep_send_message(env, DB_EID_BROADCAST, 2327 REP_MASTER_REQ, NULL, NULL, 0, 0); 2328 2329 __os_free(env, buf); 2330err: 2331 if (alloc) 2332 __os_free(env, tmpfp); 2333 return (ret); 2334} 2335 2336/* 2337 * __rep_finfo_alloc - 2338 * Allocate and initialize a fileinfo structure. 2339 * 2340 * PUBLIC: int __rep_finfo_alloc __P((ENV *, __rep_fileinfo_args *, 2341 * PUBLIC: __rep_fileinfo_args **)); 2342 */ 2343int 2344__rep_finfo_alloc(env, rfpsrc, rfpp) 2345 ENV *env; 2346 __rep_fileinfo_args *rfpsrc, **rfpp; 2347{ 2348 __rep_fileinfo_args *rfp; 2349 size_t size; 2350 int ret; 2351 void *uidp, *infop; 2352 2353 /* 2354 * Allocate enough for the structure and the two DBT data areas. 2355 */ 2356 size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size + 2357 rfpsrc->info.size; 2358 if ((ret = __os_malloc(env, size, &rfp)) != 0) 2359 return (ret); 2360 2361 /* 2362 * Copy the structure itself, and then set the DBT data pointers 2363 * to their space and copy the data itself as well. 2364 */ 2365 memcpy(rfp, rfpsrc, sizeof(__rep_fileinfo_args)); 2366 uidp = (u_int8_t *)rfp + sizeof(__rep_fileinfo_args); 2367 rfp->uid.data = uidp; 2368 memcpy(uidp, rfpsrc->uid.data, rfpsrc->uid.size); 2369 2370 infop = (u_int8_t *)uidp + rfpsrc->uid.size; 2371 rfp->info.data = infop; 2372 memcpy(infop, rfpsrc->info.data, rfpsrc->info.size); 2373 *rfpp = rfp; 2374 return (ret); 2375} 2376 2377/* 2378 * __rep_log_setup - 2379 * We know our first LSN and need to reset the log subsystem 2380 * to get our logs set up for the proper file. 2381 */ 2382static int 2383__rep_log_setup(env, rep, file, version, lsnp) 2384 ENV *env; 2385 REP *rep; 2386 u_int32_t file; 2387 u_int32_t version; 2388 DB_LSN *lsnp; 2389{ 2390 DB_LOG *dblp; 2391 DB_LSN lsn; 2392 DB_TXNMGR *mgr; 2393 DB_TXNREGION *region; 2394 LOG *lp; 2395 int ret; 2396 2397 dblp = env->lg_handle; 2398 lp = dblp->reginfo.primary; 2399 mgr = env->tx_handle; 2400 region = mgr->reginfo.primary; 2401 2402 /* 2403 * Set up the log starting at the file number of the first LSN we 2404 * need to get from the master. 2405 */ 2406 LOG_SYSTEM_LOCK(env); 2407 if ((ret = __log_newfile(dblp, &lsn, file, version)) == 0 && 2408 lsnp != NULL) 2409 *lsnp = lsn; 2410 LOG_SYSTEM_UNLOCK(env); 2411 2412 /* 2413 * We reset first_lsn to the lp->lsn. We were given the LSN of 2414 * the checkpoint and we now need the LSN for the beginning of 2415 * the file, which __log_newfile conveniently set up for us 2416 * in lp->lsn. 2417 */ 2418 rep->first_lsn = lp->lsn; 2419 TXN_SYSTEM_LOCK(env); 2420 ZERO_LSN(region->last_ckp); 2421 TXN_SYSTEM_UNLOCK(env); 2422 return (ret); 2423} 2424 2425/* 2426 * __rep_queue_filedone - 2427 * Determine if we're really done getting the pages for a queue file. 2428 * Queue is handled in several steps. 2429 * 1. First we get the meta page only. 2430 * 2. We use the meta-page information to figure out first and last 2431 * page numbers (and if queue wraps, first can be > last. 2432 * 3. If first < last, we do a REP_PAGE_REQ for all pages. 2433 * 4. If first > last, we REP_PAGE_REQ from first -> max page number. 2434 * Then we'll ask for page 1 -> last. 2435 * 2436 * This function can return several things: 2437 * DB_REP_PAGEDONE - if we're done with this file. 2438 * 0 - if we're not done with this file. 2439 * error - if we get an error doing some operations. 2440 * 2441 * This function will open a dbp handle to the queue file. This is needed 2442 * by most of the QAM macros. We'll open it on the first pass through 2443 * here and we'll close it whenever we decide we're done. 2444 */ 2445static int 2446__rep_queue_filedone(env, ip, rep, rfp) 2447 ENV *env; 2448 DB_THREAD_INFO *ip; 2449 REP *rep; 2450 __rep_fileinfo_args *rfp; 2451{ 2452#ifndef HAVE_QUEUE 2453 COMPQUIET(ip, NULL); 2454 COMPQUIET(rep, NULL); 2455 COMPQUIET(rfp, NULL); 2456 return (__db_no_queue_am(env)); 2457#else 2458 DB *queue_dbp; 2459 db_pgno_t first, last; 2460 u_int32_t flags; 2461 int empty, ret, t_ret; 2462 2463 ret = 0; 2464 queue_dbp = NULL; 2465 if (rep->queue_dbc == NULL) { 2466 /* 2467 * We need to do a sync here so that the open 2468 * can find the file and file id. 2469 */ 2470 if ((ret = __memp_sync_int(env, NULL, 0, 2471 DB_SYNC_CACHE | DB_SYNC_INTERRUPT_OK, NULL, NULL)) != 0) 2472 goto out; 2473 if ((ret = 2474 __db_create_internal(&queue_dbp, env, 0)) != 0) 2475 goto out; 2476 flags = DB_NO_AUTO_COMMIT | 2477 (F_ISSET(env, ENV_THREAD) ? DB_THREAD : 0); 2478 /* 2479 * We need to check whether this is in-memory so that we pass 2480 * the name correctly as either the file or the database name. 2481 */ 2482 if ((ret = __db_open(queue_dbp, ip, NULL, 2483 FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? NULL : 2484 rfp->info.data, 2485 FLD_ISSET(rfp->db_flags, DB_AM_INMEM) ? rfp->info.data : 2486 NULL, 2487 DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0) 2488 goto out; 2489 2490 if ((ret = __db_cursor(queue_dbp, 2491 ip, NULL, &rep->queue_dbc, 0)) != 0) 2492 goto out; 2493 } else 2494 queue_dbp = rep->queue_dbc->dbp; 2495 2496 if ((ret = __queue_pageinfo(queue_dbp, 2497 &first, &last, &empty, 0, 0)) != 0) 2498 goto out; 2499 RPRINT(env, DB_VERB_REP_SYNC, (env, 2500 "Queue fileinfo: first %lu, last %lu, empty %d", 2501 (u_long)first, (u_long)last, empty)); 2502 /* 2503 * We can be at the end of 3 possible states. 2504 * 1. We have received the meta-page and now need to get the 2505 * rest of the pages in the database. 2506 * 2. We have received from first -> max_pgno. We might be done, 2507 * or we might need to ask for wrapped pages. 2508 * 3. We have received all pages in the file. We're done. 2509 */ 2510 if (rfp->max_pgno == 0) { 2511 /* 2512 * We have just received the meta page. Set up the next 2513 * pages to ask for and check if the file is empty. 2514 */ 2515 if (empty) 2516 goto out; 2517 if (first > last) { 2518 rfp->max_pgno = 2519 QAM_RECNO_PAGE(rep->queue_dbc->dbp, UINT32_MAX); 2520 } else 2521 rfp->max_pgno = last; 2522 RPRINT(env, DB_VERB_REP_SYNC, (env, 2523 "Queue fileinfo: First req: first %lu, last %lu", 2524 (u_long)first, (u_long)rfp->max_pgno)); 2525 goto req; 2526 } else if (rfp->max_pgno != last) { 2527 /* 2528 * If max_pgno != last that means we're dealing with a 2529 * wrapped situation. Request next batch of pages. 2530 * Set npages to 1 because we already have page 0, the 2531 * meta-page, now we need pages 1-max_pgno. 2532 */ 2533 first = 1; 2534 rfp->max_pgno = last; 2535 RPRINT(env, DB_VERB_REP_SYNC, (env, 2536 "Queue fileinfo: Wrap req: first %lu, last %lu", 2537 (u_long)first, (u_long)last)); 2538req: 2539 /* 2540 * Since we're simulating a "gap" to resend new PAGE_REQ 2541 * for this file, we need to set waiting page to last + 1 2542 * so that we'll ask for all from ready_pg -> last. 2543 */ 2544 rep->npages = first; 2545 rep->ready_pg = first; 2546 rep->waiting_pg = rfp->max_pgno + 1; 2547 rep->max_wait_pg = PGNO_INVALID; 2548 ret = __rep_pggap_req(env, rep, rfp, 0); 2549 return (ret); 2550 } 2551 /* 2552 * max_pgno == last 2553 * If we get here, we have all the pages we need. 2554 * Close the dbp and return. 2555 */ 2556out: 2557 if (rep->queue_dbc != NULL && 2558 (t_ret = __dbc_close(rep->queue_dbc)) != 0 && ret == 0) 2559 ret = t_ret; 2560 rep->queue_dbc = NULL; 2561 2562 if (queue_dbp != NULL && 2563 (t_ret = __db_close(queue_dbp, NULL, DB_NOSYNC)) != 0 && ret == 0) 2564 ret = t_ret; 2565 if (ret == 0) 2566 ret = DB_REP_PAGEDONE; 2567 return (ret); 2568#endif 2569} 2570 2571/* 2572 * PUBLIC: int __rep_remove_init_file __P((ENV *)); 2573 */ 2574int 2575__rep_remove_init_file(env) 2576 ENV *env; 2577{ 2578 int ret; 2579 char *name; 2580 2581 if ((ret = __db_appname( 2582 env, DB_APP_NONE, REP_INITNAME, 0, NULL, &name)) != 0) 2583 return (ret); 2584 (void)__os_unlink(env, name, 0); 2585 __os_free(env, name); 2586 return (0); 2587} 2588 2589/* 2590 * Checks for the existence of the internal init flag file. If it exists, we 2591 * remove all logs and databases, and then remove the flag file. This is 2592 * intended to force the internal init to start over again, and thus affords 2593 * protection against a client crashing during internal init. This function 2594 * must be called before normal recovery in order to be properly effective. 2595 * 2596 * !!! 2597 * This function should only be called during initial set-up of the environment, 2598 * before various subsystems are initialized. It doesn't rely on the 2599 * subsystems' code having been initialized, and it summarily deletes files "out 2600 * from under" them, which might disturb the subsystems if they were up. 2601 * 2602 * PUBLIC: int __rep_reset_init __P((ENV *)); 2603 */ 2604int 2605__rep_reset_init(env) 2606 ENV *env; 2607{ 2608 DB_FH *fhp; 2609 __rep_update_args *rup; 2610 DBT dbt; 2611 char *allocated_dir, *dir, *init_name; 2612 size_t cnt; 2613 u_int32_t dbtvers, fvers, zero; 2614 u_int8_t *next; 2615 int ret, t_ret; 2616 2617 allocated_dir = NULL; 2618 rup = NULL; 2619 dbt.data = NULL; 2620 2621 if ((ret = __db_appname( 2622 env, DB_APP_NONE, REP_INITNAME, 0, NULL, &init_name)) != 0) 2623 return (ret); 2624 2625 if ((ret = __os_open( 2626 env, init_name, 0, DB_OSO_RDONLY, DB_MODE_600, &fhp)) != 0) { 2627 if (ret == ENOENT) 2628 ret = 0; 2629 goto out; 2630 } 2631 2632 RPRINT(env, DB_VERB_REP_SYNC, 2633 (env, "Cleaning up interrupted internal init")); 2634 2635 /* There are a few possibilities: 2636 * 1. no init file, or less than 1 full file list 2637 * 2. exactly one full file list 2638 * 3. more than one, less then a second full file list 2639 * 4. second file list in full 2640 * 2641 * In cases 2 or 4, we need to remove all logs, and then remove files 2642 * according to the (most recent) file list. (In case 1 or 3, we don't 2643 * have to do anything.) 2644 * 2645 * The __rep_get_file_list function takes care of folding these cases 2646 * into two simple outcomes. 2647 * 2648 * As of 4.7, the first 4 bytes are 0. Read the first 4 bytes now. 2649 * If they are non-zero it means we have an old-style init file. 2650 * Otherwise, pass the file version in to rep_get_file_list. 2651 */ 2652 if ((ret = __os_read(env, fhp, &zero, sizeof(zero), &cnt)) != 0) 2653 goto out; 2654 /* 2655 * If we read successfully, but not enough, then unlink the file. 2656 */ 2657 if (cnt != sizeof(zero)) 2658 goto rm; 2659 if (zero != 0) { 2660 /* 2661 * Old style file. We have to set fvers to the 4.6 2662 * version of the file and also rewind the file so 2663 * that __rep_get_file_list can read out the length itself. 2664 */ 2665 if ((ret = __os_seek(env, fhp, 0, 0, 0)) != 0) 2666 goto out; 2667 fvers = REP_INITVERSION_46; 2668 } else if ((ret = __os_read(env, 2669 fhp, &fvers, sizeof(fvers), &cnt)) != 0) 2670 goto out; 2671 else if (cnt != sizeof(fvers)) 2672 goto rm; 2673 ret = __rep_get_file_list(env, fhp, fvers, &dbtvers, &dbt); 2674 if ((t_ret = __os_closehandle(env, fhp)) != 0 || ret != 0) { 2675 if (ret == 0) 2676 ret = t_ret; 2677 goto out; 2678 } 2679 if (dbt.data == NULL) { 2680 /* 2681 * The init file did not end with an intact file list. Since we 2682 * never start log/db removal without an intact file list 2683 * sync'ed to the init file, this must mean we don't have any 2684 * partial set of files to clean up. So all we need to do is 2685 * remove the init file. 2686 */ 2687 goto rm; 2688 } 2689 2690 /* Remove all log files. */ 2691 if (env->dbenv->db_log_dir == NULL) 2692 dir = env->db_home; 2693 else { 2694 if ((ret = __db_appname(env, DB_APP_NONE, 2695 env->dbenv->db_log_dir, 0, NULL, &dir)) != 0) 2696 goto out; 2697 allocated_dir = dir; 2698 } 2699 2700 if ((ret = __rep_remove_by_prefix(env, 2701 dir, LFPREFIX, sizeof(LFPREFIX)-1, DB_APP_LOG)) != 0) 2702 goto out; 2703 2704 /* 2705 * Remove databases according to the list, and queue extent files by 2706 * searching them out on a walk through the data_dir's. 2707 */ 2708 if ((ret = __rep_update_unmarshal(env, dbtvers, 2709 &rup, dbt.data, dbt.size, &next)) != 0) 2710 goto out; 2711 if ((ret = __rep_remove_by_list(env, dbtvers, 2712 next, dbt.size, rup->num_files)) != 0) 2713 goto out; 2714 2715 /* Here, we've established that the file exists. */ 2716rm: (void)__os_unlink(env, init_name, 0); 2717out: if (rup != NULL) 2718 __os_free(env, rup); 2719 if (allocated_dir != NULL) 2720 __os_free(env, allocated_dir); 2721 if (dbt.data != NULL) 2722 __os_free(env, dbt.data); 2723 2724 __os_free(env, init_name); 2725 return (ret); 2726} 2727 2728/* 2729 * Reads the last fully intact file list from the init file. If the file ends 2730 * with a partial list (or is empty), we're not interested in it. Lack of a 2731 * full file list is indicated by a NULL dbt->data. On success, the list is 2732 * returned in allocated space, which becomes the responsibility of the caller. 2733 * 2734 * The file format is a u_int32_t buffer length, in native format, followed by 2735 * the file list itself, in the same format as in an UPDATE message (though 2736 * many parts of it in this case are meaningless). 2737 */ 2738static int 2739__rep_get_file_list(env, fhp, fvers, dbtvers, dbt) 2740 ENV *env; 2741 DB_FH *fhp; 2742 u_int32_t fvers; 2743 u_int32_t *dbtvers; 2744 DBT *dbt; 2745{ 2746 u_int32_t length, mvers; 2747 size_t cnt; 2748 int i, ret; 2749 2750 /* At most 2 file lists: old and new. */ 2751 dbt->data = NULL; 2752 mvers = DB_REPVERSION_46; 2753 length = 0; 2754 for (i = 1; i <= 2; i++) { 2755 if (fvers >= REP_INITVERSION_47) { 2756 if ((ret = __os_read(env, fhp, &mvers, 2757 sizeof(mvers), &cnt)) != 0) 2758 goto err; 2759 if (cnt == 0 && dbt->data != NULL) 2760 break; 2761 if (cnt != sizeof(mvers)) 2762 goto err; 2763 } 2764 if ((ret = __os_read(env, 2765 fhp, &length, sizeof(length), &cnt)) != 0) 2766 goto err; 2767 2768 /* 2769 * Reaching the end here is fine, if we've been through at least 2770 * once already. 2771 */ 2772 if (cnt == 0 && dbt->data != NULL) 2773 break; 2774 if (cnt != sizeof(length)) 2775 goto err; 2776 2777 if ((ret = __os_realloc(env, 2778 (size_t)length, &dbt->data)) != 0) 2779 goto err; 2780 2781 if ((ret = __os_read( 2782 env, fhp, dbt->data, length, &cnt)) != 0 || 2783 cnt != (size_t)length) 2784 goto err; 2785 } 2786 2787 *dbtvers = mvers; 2788 dbt->size = length; 2789 return (0); 2790 2791err: 2792 /* 2793 * Note that it's OK to get here with a zero value in 'ret': it means we 2794 * read less than we expected, and dbt->data == NULL indicates to the 2795 * caller that we don't have an intact list. 2796 */ 2797 if (dbt->data != NULL) 2798 __os_free(env, dbt->data); 2799 dbt->data = NULL; 2800 return (ret); 2801} 2802 2803/* 2804 * Removes every file in a given directory that matches a given prefix. Notice 2805 * how similar this is to __rep_walk_dir. 2806 */ 2807static int 2808__rep_remove_by_prefix(env, dir, prefix, pref_len, appname) 2809 ENV *env; 2810 const char *dir; 2811 const char *prefix; 2812 size_t pref_len; 2813 APPNAME appname; /* What kind of name. */ 2814{ 2815 char *namep, **names; 2816 int cnt, i, ret; 2817 2818 if ((ret = __os_dirlist(env, dir, 0, &names, &cnt)) != 0) 2819 return (ret); 2820 for (i = 0; i < cnt; i++) { 2821 if (strncmp(names[i], prefix, pref_len) == 0) { 2822 if ((ret = __db_appname(env, 2823 appname, names[i], 0, NULL, &namep)) != 0) 2824 goto out; 2825 (void)__os_unlink(env, namep, 0); 2826 __os_free(env, namep); 2827 } 2828 } 2829out: __os_dirfree(env, names, cnt); 2830 return (ret); 2831} 2832 2833/* 2834 * Removes database files according to the contents of a list. 2835 * 2836 * This function must support removal either during environment creation, or 2837 * when an internal init is reset in the middle. This means it must work 2838 * regardless of whether underlying subsystems are initialized. However, it may 2839 * assume that databases are not open. That means there is no REP! 2840 */ 2841static int 2842__rep_remove_by_list(env, version, filelist, filesz, count) 2843 ENV *env; 2844 u_int32_t version; 2845 u_int8_t *filelist; 2846 u_int32_t filesz; 2847 u_int32_t count; 2848{ 2849 DB_ENV *dbenv; 2850 __rep_fileinfo_args *rfp; 2851 char **ddir, *dir, *namep; 2852 u_int8_t *new_fp; 2853 int ret; 2854 2855 dbenv = env->dbenv; 2856 ret = 0; 2857 rfp = NULL; 2858 while (count-- > 0) { 2859 if ((ret = __rep_fileinfo_unmarshal(env, version, 2860 &rfp, filelist, filesz, &new_fp)) != 0) 2861 goto out; 2862 filesz -= (u_int32_t)(new_fp - filelist); 2863 filelist = new_fp; 2864 if ((ret = __db_appname(env, 2865 DB_APP_DATA, rfp->info.data, 0, NULL, &namep)) != 0) 2866 goto out; 2867 (void)__os_unlink(env, namep, 0); 2868 __os_free(env, namep); 2869 __os_free(env, rfp); 2870 rfp = NULL; 2871 } 2872 2873 /* Notice how similar this code is to __rep_find_dbs. */ 2874 if (dbenv->db_data_dir == NULL) 2875 ret = __rep_remove_by_prefix(env, env->db_home, 2876 QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX) - 1, 2877 DB_APP_DATA); 2878 else { 2879 for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir) { 2880 if ((ret = __db_appname(env, DB_APP_NONE, 2881 *ddir, 0, NULL, &dir)) != 0) 2882 break; 2883 ret = __rep_remove_by_prefix(env, dir, 2884 QUEUE_EXTENT_PREFIX, sizeof(QUEUE_EXTENT_PREFIX)-1, 2885 DB_APP_DATA); 2886 __os_free(env, dir); 2887 if (ret != 0) 2888 break; 2889 } 2890 } 2891 2892out: 2893 if (rfp != NULL) 2894 __os_free(env, rfp); 2895 return (ret); 2896} 2897