1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include <errno.h> 10#include <stdlib.h> 11#include <string.h> 12#include <time.h> 13 14#include <db.h> 15 16#include "rep_common.h" 17 18#define CACHESIZE (10 * 1024 * 1024) 19#define DATABASE "quote.db" 20#define SLEEPTIME 3 21 22static int print_stocks __P((DB *)); 23 24/* 25 * Perform command line parsing and common replication setup for the repmgr 26 * and base replication example programs. 27 */ 28int 29common_rep_setup(dbenv, argc, argv, setup_info) 30 DB_ENV *dbenv; 31 int argc; 32 char *argv[]; 33 SETUP_DATA *setup_info; 34{ 35 repsite_t site; 36 extern char *optarg; 37 char ch, *portstr; 38 int ack_policy, got_self, is_repmgr, maxsites, priority, ret; 39 40 got_self = is_repmgr = maxsites = ret = site.peer = 0; 41 42 priority = 100; 43 ack_policy = DB_REPMGR_ACKS_QUORUM; 44 setup_info->role = UNKNOWN; 45 if (strncmp(setup_info->progname, "ex_rep_mgr", 10) == 0) 46 is_repmgr = 1; 47 48 /* 49 * Replication setup calls that are only needed if a command 50 * line option is specified are made within this while/switch 51 * statement. Replication setup calls that should be made 52 * whether or not a command line option is specified are after 53 * this while/switch statement. 54 */ 55 while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) { 56 switch (ch) { 57 case 'a': 58 if (!is_repmgr) 59 usage(is_repmgr, setup_info->progname); 60 if (strncmp(optarg, "all", 3) == 0) 61 ack_policy = DB_REPMGR_ACKS_ALL; 62 else if (strncmp(optarg, "quorum", 6) != 0) 63 usage(is_repmgr, setup_info->progname); 64 break; 65 case 'b': 66 /* 67 * Configure bulk transfer to send groups of records 68 * to clients in a single network transfer. This is 69 * useful for master sites and clients participating 70 * in client-to-client synchronization. 71 */ 72 if ((ret = dbenv->rep_set_config(dbenv, 73 DB_REP_CONF_BULK, 1)) != 0) { 74 dbenv->err(dbenv, ret, 75 "Could not configure bulk transfer.\n"); 76 goto err; 77 } 78 break; 79 case 'C': 80 setup_info->role = CLIENT; 81 break; 82 case 'h': 83 setup_info->home = optarg; 84 break; 85 case 'l': 86 setup_info->self.host = strtok(optarg, ":"); 87 if ((portstr = strtok(NULL, ":")) == NULL) { 88 fprintf(stderr, "Bad host specification.\n"); 89 goto err; 90 } 91 setup_info->self.port = (unsigned short)atoi(portstr); 92 setup_info->self.peer = 0; 93 got_self = 1; 94 break; 95 case 'M': 96 setup_info->role = MASTER; 97 break; 98 case 'n': 99 setup_info->nsites = atoi(optarg); 100 /* 101 * For repmgr, set the total number of sites in the 102 * replication group. This is used by repmgr internal 103 * election processing. For base replication, nsites 104 * is simply passed back to main for use in its 105 * communications and election processing. 106 */ 107 if (is_repmgr && setup_info->nsites > 0 && 108 (ret = dbenv->rep_set_nsites(dbenv, 109 setup_info->nsites)) != 0) { 110 dbenv->err(dbenv, ret, 111 "Could not set nsites.\n"); 112 goto err; 113 } 114 break; 115 case 'p': 116 priority = atoi(optarg); 117 break; 118 case 'R': 119 if (!is_repmgr) 120 usage(is_repmgr, setup_info->progname); 121 site.peer = 1; /* FALLTHROUGH */ 122 case 'r': 123 site.host = optarg; 124 site.host = strtok(site.host, ":"); 125 if ((portstr = strtok(NULL, ":")) == NULL) { 126 fprintf(stderr, "Bad host specification.\n"); 127 goto err; 128 } 129 site.port = (unsigned short)atoi(portstr); 130 if (setup_info->site_list == NULL || 131 setup_info->remotesites >= maxsites) { 132 maxsites = maxsites == 0 ? 10 : 2 * maxsites; 133 if ((setup_info->site_list = 134 realloc(setup_info->site_list, 135 maxsites * sizeof(repsite_t))) == NULL) { 136 fprintf(stderr, "System error %s\n", 137 strerror(errno)); 138 goto err; 139 } 140 } 141 (setup_info->site_list)[(setup_info->remotesites)++] = 142 site; 143 site.peer = 0; 144 break; 145 case 'v': 146 if ((ret = dbenv->set_verbose(dbenv, 147 DB_VERB_REPLICATION, 1)) != 0) 148 goto err; 149 break; 150 case '?': 151 default: 152 usage(is_repmgr, setup_info->progname); 153 } 154 } 155 156 /* Error check command line. */ 157 if (!got_self || setup_info->home == NULL) 158 usage(is_repmgr, setup_info->progname); 159 if (!is_repmgr && setup_info->role == UNKNOWN) { 160 fprintf(stderr, "Must specify -M or -C.\n"); 161 goto err; 162 } 163 164 /* 165 * Set replication group election priority for this environment. 166 * An election first selects the site with the most recent log 167 * records as the new master. If multiple sites have the most 168 * recent log records, the site with the highest priority value 169 * is selected as master. 170 */ 171 if ((ret = dbenv->rep_set_priority(dbenv, priority)) != 0) { 172 dbenv->err(dbenv, ret, "Could not set priority.\n"); 173 goto err; 174 } 175 176 /* 177 * For repmgr, set the policy that determines how master and client 178 * sites handle acknowledgement of replication messages needed for 179 * permanent records. The default policy of "quorum" requires only 180 * a quorum of electable peers sufficient to ensure a permanent 181 * record remains durable if an election is held. The "all" option 182 * requires all clients to acknowledge a permanent replication 183 * message instead. 184 */ 185 if (is_repmgr && 186 (ret = dbenv->repmgr_set_ack_policy(dbenv, ack_policy)) != 0) { 187 dbenv->err(dbenv, ret, "Could not set ack policy.\n"); 188 goto err; 189 } 190 191 /* 192 * Set the threshold for the minimum and maximum time the client 193 * waits before requesting retransmission of a missing message. 194 * Base these values on the performance and load characteristics 195 * of the master and client host platforms as well as the round 196 * trip message time. 197 */ 198 if ((ret = dbenv->rep_set_request(dbenv, 20000, 500000)) != 0) { 199 dbenv->err(dbenv, ret, 200 "Could not set client_retransmission defaults.\n"); 201 goto err; 202 } 203 204 /* 205 * Configure deadlock detection to ensure that any deadlocks 206 * are broken by having one of the conflicting lock requests 207 * rejected. DB_LOCK_DEFAULT uses the lock policy specified 208 * at environment creation time or DB_LOCK_RANDOM if none was 209 * specified. 210 */ 211 if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT)) != 0) { 212 dbenv->err(dbenv, ret, 213 "Could not configure deadlock detection.\n"); 214 goto err; 215 } 216 217 /* The following base replication features may also be useful to your 218 * application. See Berkeley DB documentation for more details. 219 * - Master leases: Provide stricter consistency for data reads 220 * on a master site. 221 * - Timeouts: Customize the amount of time Berkeley DB waits 222 * for such things as an election to be concluded or a master 223 * lease to be granted. 224 * - Delayed client synchronization: Manage the master site's 225 * resources by spreading out resource-intensive client 226 * synchronizations. 227 * - Blocked client operations: Return immediately with an error 228 * instead of waiting indefinitely if a client operation is 229 * blocked by an ongoing client synchronization. 230 */ 231 232err: 233 return (ret); 234} 235 236static int 237print_stocks(dbp) 238 DB *dbp; 239{ 240 DBC *dbc; 241 DBT key, data; 242#define MAXKEYSIZE 10 243#define MAXDATASIZE 20 244 char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1]; 245 int ret, t_ret; 246 u_int32_t keysize, datasize; 247 248 if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0) { 249 dbp->err(dbp, ret, "can't open cursor"); 250 return (ret); 251 } 252 253 memset(&key, 0, sizeof(key)); 254 memset(&data, 0, sizeof(data)); 255 256 printf("\tSymbol\tPrice\n"); 257 printf("\t======\t=====\n"); 258 259 for (ret = dbc->get(dbc, &key, &data, DB_FIRST); 260 ret == 0; 261 ret = dbc->get(dbc, &key, &data, DB_NEXT)) { 262 keysize = key.size > MAXKEYSIZE ? MAXKEYSIZE : key.size; 263 memcpy(keybuf, key.data, keysize); 264 keybuf[keysize] = '\0'; 265 266 datasize = data.size >= MAXDATASIZE ? MAXDATASIZE : data.size; 267 memcpy(databuf, data.data, datasize); 268 databuf[datasize] = '\0'; 269 270 printf("\t%s\t%s\n", keybuf, databuf); 271 } 272 printf("\n"); 273 fflush(stdout); 274 275 if ((t_ret = dbc->close(dbc)) != 0 && ret == 0) 276 ret = t_ret; 277 278 switch (ret) { 279 case 0: 280 case DB_NOTFOUND: 281 case DB_LOCK_DEADLOCK: 282 return (0); 283 default: 284 return (ret); 285 } 286} 287 288/* Start checkpoint and log archive support threads. */ 289int 290start_support_threads(dbenv, sup_args, ckp_thr, lga_thr) 291 DB_ENV *dbenv; 292 supthr_args *sup_args; 293 thread_t *ckp_thr; 294 thread_t *lga_thr; 295{ 296 int ret; 297 298 ret = 0; 299 if ((ret = thread_create(ckp_thr, NULL, checkpoint_thread, 300 sup_args)) != 0) { 301 dbenv->errx(dbenv, "can't create checkpoint thread"); 302 goto err; 303 } 304 if ((ret = thread_create(lga_thr, NULL, log_archive_thread, 305 sup_args)) != 0) 306 dbenv->errx(dbenv, "can't create log archive thread"); 307err: 308 return (ret); 309 310} 311 312/* Wait for checkpoint and log archive support threads to finish. */ 313int 314finish_support_threads(ckp_thr, lga_thr) 315 thread_t *ckp_thr; 316 thread_t *lga_thr; 317{ 318 void *ctstatus, *ltstatus; 319 int ret; 320 321 ret = 0; 322 if (thread_join(*lga_thr, <status) || 323 thread_join(*ckp_thr, &ctstatus)) { 324 ret = -1; 325 goto err; 326 } 327 if ((uintptr_t)ltstatus != EXIT_SUCCESS || 328 (uintptr_t)ctstatus != EXIT_SUCCESS) 329 ret = -1; 330err: 331 return (ret); 332} 333 334#define BUFSIZE 1024 335 336int 337doloop(dbenv, shared_data) 338 DB_ENV *dbenv; 339 SHARED_DATA *shared_data; 340{ 341 DB *dbp; 342 DBT key, data; 343 char buf[BUFSIZE], *first, *price; 344 u_int32_t flags; 345 int ret; 346 347 dbp = NULL; 348 ret = 0; 349 memset(&key, 0, sizeof(key)); 350 memset(&data, 0, sizeof(data)); 351 352 for (;;) { 353 printf("QUOTESERVER%s> ", 354 shared_data->is_master ? "" : " (read-only)"); 355 fflush(stdout); 356 357 if (fgets(buf, sizeof(buf), stdin) == NULL) 358 break; 359 360#define DELIM " \t\n" 361 if ((first = strtok(&buf[0], DELIM)) == NULL) { 362 /* Blank input line. */ 363 price = NULL; 364 } else if ((price = strtok(NULL, DELIM)) == NULL) { 365 /* Just one input token. */ 366 if (strncmp(buf, "exit", 4) == 0 || 367 strncmp(buf, "quit", 4) == 0) { 368 /* 369 * This makes the checkpoint and log 370 * archive threads stop. 371 */ 372 shared_data->app_finished = 1; 373 break; 374 } 375 dbenv->errx(dbenv, "Format: TICKER VALUE"); 376 continue; 377 } else { 378 /* Normal two-token input line. */ 379 if (first != NULL && !shared_data->is_master) { 380 dbenv->errx(dbenv, "Can't update at client"); 381 continue; 382 } 383 } 384 385 if (dbp == NULL) { 386 if ((ret = db_create(&dbp, dbenv, 0)) != 0) 387 return (ret); 388 389 flags = DB_AUTO_COMMIT; 390 /* 391 * Open database with DB_CREATE only if this is 392 * a master database. A client database uses 393 * polling to attempt to open the database without 394 * DB_CREATE until it is successful. 395 * 396 * This DB_CREATE polling logic can be simplified 397 * under some circumstances. For example, if the 398 * application can be sure a database is already 399 * there, it would never need to open it with 400 * DB_CREATE. 401 */ 402 if (shared_data->is_master) 403 flags |= DB_CREATE; 404 if ((ret = dbp->open(dbp, 405 NULL, DATABASE, NULL, DB_BTREE, flags, 0)) != 0) { 406 if (ret == ENOENT) { 407 printf( 408 "No stock database yet available.\n"); 409 if ((ret = dbp->close(dbp, 0)) != 0) { 410 dbenv->err(dbenv, ret, 411 "DB->close"); 412 goto err; 413 } 414 dbp = NULL; 415 continue; 416 } 417 if (ret == DB_REP_HANDLE_DEAD || 418 ret == DB_LOCK_DEADLOCK) { 419 dbenv->err(dbenv, ret, 420 "please retry the operation"); 421 dbp->close(dbp, DB_NOSYNC); 422 dbp = NULL; 423 continue; 424 } 425 dbenv->err(dbenv, ret, "DB->open"); 426 goto err; 427 } 428 } 429 430 if (first == NULL) { 431 /* 432 * If this is a client in the middle of 433 * synchronizing with the master, the client data 434 * is possibly stale and won't be displayed until 435 * client synchronization is finished. It is also 436 * possible to display the stale data if this is 437 * acceptable to the application. 438 */ 439 if (shared_data->in_client_sync) 440 printf( 441"Cannot read data during client synchronization - please try again.\n"); 442 else 443 switch ((ret = print_stocks(dbp))) { 444 case 0: 445 break; 446 case DB_REP_HANDLE_DEAD: 447 (void)dbp->close(dbp, DB_NOSYNC); 448 dbp = NULL; 449 break; 450 default: 451 dbp->err(dbp, ret, 452 "Error traversing data"); 453 goto err; 454 } 455 } else { 456 key.data = first; 457 key.size = (u_int32_t)strlen(first); 458 459 data.data = price; 460 data.size = (u_int32_t)strlen(price); 461 462 if ((ret = dbp->put(dbp, 463 NULL, &key, &data, DB_AUTO_COMMIT)) != 0) { 464 dbp->err(dbp, ret, "DB->put"); 465 goto err; 466 } 467 } 468 } 469 470err: if (dbp != NULL) 471 (void)dbp->close(dbp, DB_NOSYNC); 472 return (ret); 473} 474 475int 476create_env(progname, dbenvp) 477 const char *progname; 478 DB_ENV **dbenvp; 479{ 480 DB_ENV *dbenv; 481 int ret; 482 483 if ((ret = db_env_create(&dbenv, 0)) != 0) { 484 fprintf(stderr, "can't create env handle: %s\n", 485 db_strerror(ret)); 486 return (ret); 487 } 488 489 dbenv->set_errfile(dbenv, stderr); 490 dbenv->set_errpfx(dbenv, progname); 491 492 *dbenvp = dbenv; 493 return (0); 494} 495 496/* Open and configure an environment. */ 497int 498env_init(dbenv, home) 499 DB_ENV *dbenv; 500 const char *home; 501{ 502 u_int32_t flags; 503 int ret; 504 505 (void)dbenv->set_cachesize(dbenv, 0, CACHESIZE, 0); 506 (void)dbenv->set_flags(dbenv, DB_TXN_NOSYNC, 1); 507 508 flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | 509 DB_INIT_REP | DB_INIT_TXN | DB_RECOVER | DB_THREAD; 510 if ((ret = dbenv->open(dbenv, home, flags, 0)) != 0) 511 dbenv->err(dbenv, ret, "can't open environment"); 512 return (ret); 513} 514 515/* 516 * In this application, we specify all communication via the command line. In 517 * a real application, we would expect that information about the other sites 518 * in the system would be maintained in some sort of configuration file. The 519 * critical part of this interface is that we assume at startup that we can 520 * find out 521 * 1) what host/port we wish to listen on for connections, 522 * 2) a (possibly empty) list of other sites we should attempt to connect 523 * to; and 524 * 3) what our Berkeley DB home environment is. 525 * 526 * These pieces of information are expressed by the following flags. 527 * -a all|quorum (optional; repmgr only, a stands for ack policy) 528 * -b (optional, b stands for bulk) 529 * -C or -M start up as client or master (optional for repmgr, required 530 * for base example) 531 * -h home directory (required) 532 * -l host:port (required; l stands for local) 533 * -n nsites (optional; number of sites in replication group; defaults to 0 534 * in which case we try to dynamically compute the number of sites in 535 * the replication group) 536 * -p priority (optional: defaults to 100) 537 * -r host:port (optional; r stands for remote; any number of these may be 538 * specified) 539 * -R host:port (optional; repmgr only, remote peer) 540 * -v (optional; v stands for verbose) 541 */ 542void 543usage(is_repmgr, progname) 544 const int is_repmgr; 545 const char *progname; 546{ 547 fprintf(stderr, "usage: %s ", progname); 548 if (is_repmgr) 549 fprintf(stderr, "[-CM]-h home -l host:port[-r host:port]%s%s", 550 "[-R host:port][-a all|quorum][-b][-n nsites]", 551 "[-p priority][-v]\n"); 552 else 553 fprintf(stderr, "-CM -h home -l host:port[-r host:port]%s", 554 "[-b][-n nsites][-p priority][-v]\n"); 555 exit(EXIT_FAILURE); 556} 557 558/* 559 * This is a very simple thread that performs checkpoints at a fixed 560 * time interval. For a master site, the time interval is one minute 561 * plus the duration of the checkpoint_delay timeout (30 seconds by 562 * default.) For a client site, the time interval is one minute. 563 */ 564void * 565checkpoint_thread(args) 566 void *args; 567{ 568 DB_ENV *dbenv; 569 SHARED_DATA *shared; 570 supthr_args *ca; 571 int i, ret; 572 573 ca = (supthr_args *)args; 574 dbenv = ca->dbenv; 575 shared = ca->shared; 576 577 for (;;) { 578 /* 579 * Wait for one minute, polling once per second to see if 580 * application has finished. When application has finished, 581 * terminate this thread. 582 */ 583 for (i = 0; i < 60; i++) { 584 sleep(1); 585 if (shared->app_finished == 1) 586 return ((void *)EXIT_SUCCESS); 587 } 588 589 /* Perform a checkpoint. */ 590 if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) { 591 dbenv->err(dbenv, ret, 592 "Could not perform checkpoint.\n"); 593 return ((void *)EXIT_FAILURE); 594 } 595 } 596} 597 598/* 599 * This is a simple log archive thread. Once per minute, it removes all but 600 * the most recent 3 logs that are safe to remove according to a call to 601 * DB_ENV->log_archive(). 602 * 603 * Log cleanup is needed to conserve disk space, but aggressive log cleanup 604 * can cause more frequent client initializations if a client lags too far 605 * behind the current master. This can happen in the event of a slow client, 606 * a network partition, or a new master that has not kept as many logs as the 607 * previous master. 608 * 609 * The approach in this routine balances the need to mitigate against a 610 * lagging client by keeping a few more of the most recent unneeded logs 611 * with the need to conserve disk space by regularly cleaning up log files. 612 * Use of automatic log removal (DB_ENV->log_set_config() DB_LOG_AUTO_REMOVE 613 * flag) is not recommended for replication due to the risk of frequent 614 * client initializations. 615 */ 616void * 617log_archive_thread(args) 618 void *args; 619{ 620 DB_ENV *dbenv; 621 SHARED_DATA *shared; 622 char **begin, **list; 623 supthr_args *la; 624 int i, listlen, logs_to_keep, minlog, ret; 625 626 la = (supthr_args *)args; 627 dbenv = la->dbenv; 628 shared = la->shared; 629 logs_to_keep = 3; 630 631 for (;;) { 632 /* 633 * Wait for one minute, polling once per second to see if 634 * application has finished. When application has finished, 635 * terminate this thread. 636 */ 637 for (i = 0; i < 60; i++) { 638 sleep(1); 639 if (shared->app_finished == 1) 640 return ((void *)EXIT_SUCCESS); 641 } 642 643 /* Get the list of unneeded log files. */ 644 if ((ret = dbenv->log_archive(dbenv, &list, DB_ARCH_ABS)) 645 != 0) { 646 dbenv->err(dbenv, ret, 647 "Could not get log archive list."); 648 return ((void *)EXIT_FAILURE); 649 } 650 if (list != NULL) { 651 listlen = 0; 652 /* Get the number of logs in the list. */ 653 for (begin = list; *begin != NULL; begin++, listlen++); 654 /* 655 * Remove all but the logs_to_keep most recent 656 * unneeded log files. 657 */ 658 minlog = listlen - logs_to_keep; 659 for (begin = list, i= 0; i < minlog; list++, i++) { 660 if ((ret = unlink(*list)) != 0) { 661 dbenv->err(dbenv, ret, 662 "logclean: remove %s", *list); 663 dbenv->errx(dbenv, 664 "logclean: Error remove %s", *list); 665 free(begin); 666 return ((void *)EXIT_FAILURE); 667 } 668 } 669 free(begin); 670 } 671 } 672} 673