/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

/*
 * In this application, we specify all communication via the command line.  In
 * a real application, we would expect that information about the other sites
 * in the system would be maintained in some sort of configuration file.  The
 * critical part of this interface is that we assume at startup that we can
 * find out
 *	1) what our Berkeley DB home environment is,
 *	2) what host/port we wish to listen on for connections; and
 *	3) an optional list of other sites we should attempt to connect to.
 *
 * These pieces of information are expressed by the following flags.
 * -h home (required; h stands for home directory)
 * -l host:port (required; l stands for local)
 * -C or -M (optional; start up as client or master)
 * -r host:port (optional; r stands for remote; any number of these may be
 *	specified)
 * -R host:port (optional; R stands for remote peer; only one of these may
 *	be specified)
 * -a all|quorum (optional; a stands for ack policy)
 * -b (optional; b stands for bulk)
 * -n nsites (optional; number of sites in replication group; defaults to 0
 *	to try to dynamically compute nsites)
 * -p priority (optional; defaults to 100)
 * -v (optional; v stands for verbose)
 */

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <db_cxx.h>
#include "StlRepConfigInfo.h"
#include "dbstl_map.h"

using std::cout;
using std::cin;
using std::cerr;
using std::endl;
using std::flush;
using std::istream;
using std::istringstream;
using std::string;
using std::getline;
using namespace dbstl;
#define	CACHESIZE	(10 * 1024 * 1024)
#define	DATABASE	"quote.db"

const char *progname = "exstl_repquote";

#include <errno.h>
#ifdef _WIN32
#define	WIN32_LEAN_AND_MEAN
#include <windows.h>
#define	snprintf		_snprintf
63#define sleep(s) Sleep(1000 * (s)) 64 65extern "C" { 66extern int getopt(int, char * const *, const char *); 67extern char *optarg; 68extern int optind; 69} 70 71typedef HANDLE thread_t; 72typedef DWORD thread_exit_status_t; 73#define thread_create(thrp, attr, func, arg) \ 74 (((*(thrp) = CreateThread(NULL, 0, \ 75 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 76#define thread_join(thr, statusp) \ 77 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 78 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) 79#else /* !_WIN32 */ 80#include <pthread.h> 81 82typedef pthread_t thread_t; 83typedef void* thread_exit_status_t; 84#define thread_create(thrp, attr, func, arg) \ 85 pthread_create((thrp), (attr), (func), (arg)) 86#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 87#endif 88 89// Struct used to store information in Db app_private field. 90typedef struct { 91 bool app_finished; 92 bool in_client_sync; 93 bool is_master; 94} APP_DATA; 95 96static void log(const char *); 97void *checkpoint_thread (void *); 98void *log_archive_thread (void *); 99 100class RepQuoteExample 101{ 102public: 103 typedef db_map<char *, char *, ElementHolder<char *> > str_map_t; 104 RepQuoteExample(); 105 void init(RepConfigInfo* config); 106 void doloop(); 107 int terminate(); 108 109 static void event_callback(DbEnv * dbenv, u_int32_t which, void *info); 110 111private: 112 // disable copy constructor. 113 RepQuoteExample(const RepQuoteExample &); 114 void operator = (const RepQuoteExample &); 115 116 // internal data members. 117 APP_DATA app_data; 118 RepConfigInfo *app_config; 119 DbEnv *cur_env; 120 Db *dbp; 121 str_map_t *strmap; 122 thread_t ckp_thr; 123 thread_t lga_thr; 124 125 // private methods. 
126 void print_stocks(); 127 void prompt(); 128 bool open_db(bool creating); 129 void close_db(){ 130 delete strmap; 131 strmap = NULL; 132 dbstl::close_db(dbp); 133 dbp = NULL; 134 } 135 static void close_db(Db *&);// Close an unregistered Db handle. 136}; 137 138bool RepQuoteExample::open_db(bool creating) 139{ 140 int ret; 141 142 if (dbp) 143 return true; 144 145 dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS); 146 147 u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD; 148 if (creating) 149 flags |= DB_CREATE; 150 151 ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0); 152 switch (ret) { 153 case 0: 154 register_db(dbp); 155 if (strmap) 156 delete strmap; 157 strmap = new str_map_t(dbp, cur_env); 158 return (true); 159 case DB_LOCK_DEADLOCK: // Fall through 160 case DB_REP_HANDLE_DEAD: 161 log("\nFailed to open stock db."); 162 break; 163 default: 164 if (ret == DB_REP_LOCKOUT) 165 break; // Fall through 166 else if (ret == ENOENT && !creating) 167 log("\nStock DB does not yet exist\n"); 168 else { 169 DbException ex(ret); 170 throw ex; 171 } 172 } // switch 173 174 // (All retryable errors fall through to here.) 175 // 176 log("\nPlease retry the operation"); 177 close_db(dbp); 178 return (false); 179} 180 181void RepQuoteExample::close_db(Db *&dbp) 182{ 183 if (dbp) { 184 try { 185 dbp->close(0); 186 delete dbp; 187 dbp = 0; 188 } catch (...) 
{ 189 delete dbp; 190 dbp = 0; 191 throw; 192 } 193 } 194 195} 196 197RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) { 198 app_data.app_finished = 0; 199 app_data.in_client_sync = 0; 200 app_data.is_master = 0; // assume I start out as client 201 cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS); 202 strmap = NULL; 203 dbp = NULL; 204} 205 206void RepQuoteExample::init(RepConfigInfo *config) { 207 app_config = config; 208 209 cur_env->set_app_private(&app_data); 210 cur_env->set_errfile(stderr); 211 cur_env->set_errpfx(progname); 212 cur_env->set_event_notify(event_callback); 213 214 // Configure bulk transfer to send groups of records to clients 215 // in a single network transfer. This is useful for master sites 216 // and clients participating in client-to-client synchronization. 217 // 218 if (app_config->bulk) 219 cur_env->rep_set_config(DB_REP_CONF_BULK, 1); 220 221 222 // Set the total number of sites in the replication group. 223 // This is used by repmgr internal election processing. 224 // 225 if (app_config->totalsites > 0) 226 cur_env->rep_set_nsites(app_config->totalsites); 227 228 // Turn on debugging and informational output if requested. 229 if (app_config->verbose) 230 cur_env->set_verbose(DB_VERB_REPLICATION, 1); 231 232 // Set replication group election priority for this environment. 233 // An election first selects the site with the most recent log 234 // records as the new master. If multiple sites have the most 235 // recent log records, the site with the highest priority value 236 // is selected as master. 237 // 238 cur_env->rep_set_priority(app_config->priority); 239 240 // Set the policy that determines how master and client sites 241 // handle acknowledgement of replication messages needed for 242 // permanent records. The default policy of "quorum" requires only 243 // a quorum of electable peers sufficient to ensure a permanent 244 // record remains durable if an election is held. 
The "all" option 245 // requires all clients to acknowledge a permanent replication 246 // message instead. 247 // 248 cur_env->repmgr_set_ack_policy(app_config->ack_policy); 249 250 // Set the threshold for the minimum and maximum time the client 251 // waits before requesting retransmission of a missing message. 252 // Base these values on the performance and load characteristics 253 // of the master and client host platforms as well as the round 254 // trip message time. 255 // 256 cur_env->rep_set_request(20000, 500000); 257 258 // Configure deadlock detection to ensure that any deadlocks 259 // are broken by having one of the conflicting lock requests 260 // rejected. DB_LOCK_DEFAULT uses the lock policy specified 261 // at environment creation time or DB_LOCK_RANDOM if none was 262 // specified. 263 // 264 cur_env->set_lk_detect(DB_LOCK_DEFAULT); 265 266 // The following base replication features may also be useful to your 267 // application. See Berkeley DB documentation for more details. 268 // - Master leases: Provide stricter consistency for data reads 269 // on a master site. 270 // - Timeouts: Customize the amount of time Berkeley DB waits 271 // for such things as an election to be concluded or a master 272 // lease to be granted. 273 // - Delayed client synchronization: Manage the master site's 274 // resources by spreading out resource-intensive client 275 // synchronizations. 276 // - Blocked client operations: Return immediately with an error 277 // instead of waiting indefinitely if a client operation is 278 // blocked by an ongoing client synchronization. 279 280 cur_env->repmgr_set_local_site(app_config->this_host.host, 281 app_config->this_host.port, 0); 282 283 for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL; 284 cur = cur->next) { 285 cur_env->repmgr_add_remote_site(cur->host, cur->port, 286 NULL, cur->peer ? 
DB_REPMGR_PEER : 0); 287 } 288 289 // Configure heartbeat timeouts so that repmgr monitors the 290 // health of the TCP connection. Master sites broadcast a heartbeat 291 // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout. 292 // Client sites wait for message activity the length of the 293 // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the 294 // connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR 295 // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout. 296 // 297 cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000); 298 cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000); 299 300 // The following repmgr features may also be useful to your 301 // application. See Berkeley DB documentation for more details. 302 // - Two-site strict majority rule - In a two-site replication 303 // group, require both sites to be available to elect a new 304 // master. 305 // - Timeouts - Customize the amount of time repmgr waits 306 // for such things as waiting for acknowledgements or attempting 307 // to reconnect to other sites. 308 // - Site list - return a list of sites currently known to repmgr. 309 310 // We can now open our environment, although we're not ready to 311 // begin replicating. However, we want to have a dbenv around 312 // so that we can send it into any of our message handlers. 313 cur_env->set_cachesize(0, CACHESIZE, 0); 314 cur_env->set_flags(DB_TXN_NOSYNC, 1); 315 316 cur_env->open(app_config->home, DB_CREATE | DB_RECOVER | 317 DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | 318 DB_INIT_MPOOL | DB_INIT_TXN, 0); 319 320 // Start checkpoint and log archive support threads. 
321 (void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env); 322 (void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env); 323 324 dbstl::register_db_env(cur_env); 325 cur_env->repmgr_start(3, app_config->start_policy); 326} 327 328int RepQuoteExample::terminate() { 329 try { 330 // Wait for checkpoint and log archive threads to finish. 331 // Windows does not allow NULL pointer for exit code variable. 332 thread_exit_status_t exstat; 333 334 (void)thread_join(lga_thr, &exstat); 335 (void)thread_join(ckp_thr, &exstat); 336 337 // We have used the DB_TXN_NOSYNC environment flag for 338 // improved performance without the usual sacrifice of 339 // transactional durability, as discussed in the 340 // "Transactional guarantees" page of the Reference 341 // Guide: if one replication site crashes, we can 342 // expect the data to exist at another site. However, 343 // in case we shut down all sites gracefully, we push 344 // out the end of the log here so that the most 345 // recent transactions don't mysteriously disappear. 346 cur_env->log_flush(NULL); 347 } catch (DbException dbe) { 348 cout << "\nerror closing environment: " << dbe.what() << endl; 349 } 350 return 0; 351} 352 353void RepQuoteExample::prompt() { 354 cout << "QUOTESERVER"; 355 if (!app_data.is_master) 356 cout << "(read-only)"; 357 cout << "> " << flush; 358} 359 360void log(const char *msg) { 361 cerr << msg << endl; 362} 363 364// Simple command-line user interface: 365// - enter "<stock symbol> <price>" to insert or update a record in the 366// database; 367// - just press Return (i.e., blank input line) to print out the contents of 368// the database; 369// - enter "quit" or "exit" to quit. 370// 371void RepQuoteExample::doloop() { 372 string input; 373 374 while (prompt(), getline(cin, input)) { 375 istringstream is(input); 376 string token1, token2; 377 378 // Read 0, 1 or 2 tokens from the input. 
379 // 380 int count = 0; 381 if (is >> token1) { 382 count++; 383 if (is >> token2) 384 count++; 385 } 386 387 if (count == 1) { 388 if (token1 == "exit" || token1 == "quit") { 389 app_data.app_finished = 1; 390 break; 391 } else { 392 log("\nFormat: <stock> <price>\n"); 393 continue; 394 } 395 } 396 397 // Here we know count is either 0 or 2, so we're about to try a 398 // DB operation. 399 // 400 // Open database with DB_CREATE only if this is a master 401 // database. A client database uses polling to attempt 402 // to open the database without DB_CREATE until it is 403 // successful. 404 // 405 // This DB_CREATE polling logic can be simplified under 406 // some circumstances. For example, if the application can 407 // be sure a database is already there, it would never need 408 // to open it with DB_CREATE. 409 // 410 if (!open_db(app_data.is_master)) 411 continue; 412 413 try { 414 if (count == 0) 415 if (app_data.in_client_sync) 416 log( 417 "Cannot read data during client initialization - please try again."); 418 else 419 print_stocks(); 420 else if (!app_data.is_master) 421 log("\nCan't update at client\n"); 422 else { 423 char *symbol = new char[token1.length() + 1]; 424 strcpy(symbol, token1.c_str()); 425 char *price = new char[token2.length() + 1]; 426 strcpy(price, token2.c_str()); 427 begin_txn(0, cur_env); 428 strmap->insert(make_pair(symbol, price)); 429 commit_txn(cur_env); 430 delete symbol; 431 delete price; 432 } 433 } catch (DbDeadlockException e) { 434 log("\nplease retry the operation\n"); 435 close_db(); 436 } catch (DbRepHandleDeadException e) { 437 log("\nplease retry the operation\n"); 438 close_db(); 439 } catch (DbException e) { 440 if (e.get_errno() == DB_REP_LOCKOUT) { 441 log("\nplease retry the operation\n"); 442 close_db(); 443 } else 444 throw; 445 } 446 447 } 448 449 close_db(); 450} 451 452void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info) 453{ 454 APP_DATA *app = (APP_DATA*)dbenv->get_app_private(); 
455 456 info = NULL; /* Currently unused. */ 457 458 switch (which) { 459 case DB_EVENT_REP_MASTER: 460 app->in_client_sync = 0; 461 app->is_master = 1; 462 break; 463 464 case DB_EVENT_REP_CLIENT: 465 app->is_master = 0; 466 app->in_client_sync = 1; 467 break; 468 469 case DB_EVENT_REP_STARTUPDONE: 470 app->in_client_sync = 0; 471 break; 472 case DB_EVENT_REP_NEWMASTER: 473 app->in_client_sync = 1; 474 break; 475 476 case DB_EVENT_REP_PERM_FAILED: 477 // Did not get enough acks to guarantee transaction 478 // durability based on the configured ack policy. This 479 // transaction will be flushed to the master site's 480 // local disk storage for durability. 481 // 482 log( 483 "Insufficient acknowledgements to guarantee transaction durability."); 484 break; 485 486 default: 487 dbenv->errx("\nignoring event %d", which); 488 } 489} 490 491void RepQuoteExample::print_stocks() { 492#define MAXKEYSIZE 10 493#define MAXDATASIZE 20 494 495 cout << "\tSymbol\tPrice" << endl 496 << "\t======\t=====" << endl; 497 str_map_t::iterator itr; 498 if (strmap == NULL) 499 strmap = new str_map_t(dbp, cur_env); 500 begin_txn(0, cur_env); 501 for (itr = strmap->begin(); itr != strmap->end(); ++itr) 502 cout<<"\t"<<itr->first<<"\t"<<itr->second<<endl; 503 commit_txn(cur_env); 504 cout << endl << flush; 505} 506 507static void usage() { 508 cerr << "usage: " << progname << endl 509 << "[-h home][-o host:port][-m host:port][-f host:port]" 510 << "[-n nsites][-p priority]" << endl; 511 512 cerr << "\t -h home (required; h stands for home directory)" << endl 513 << "\t -l host:port (required; l stands for local)" << endl 514 << "\t -C or -M (optional; start up as client or master)" << endl 515 << "\t -r host:port (optional; r stands for remote; any " 516 << "number of these" << endl 517 << "\t may be specified)" << endl 518 << "\t -R host:port (optional; R stands for remote peer; only " 519 << "one of" << endl 520 << "\t these may be specified)" << endl 521 << "\t -a all|quorum (optional; 
a stands for ack policy)" << endl 522 << "\t -b (optional; b stands for bulk)" << endl 523 << "\t -n nsites (optional; number of sites in replication " 524 << "group; defaults " << endl 525 << "\t to 0 to try to dynamically compute nsites)" << endl 526 << "\t -p priority (optional; defaults to 100)" << endl 527 << "\t -v (optional; v stands for verbose)" << endl; 528 529 exit(EXIT_FAILURE); 530} 531 532int main(int argc, char **argv) { 533 RepConfigInfo config; 534 char ch, *portstr, *tmphost; 535 int tmpport; 536 bool tmppeer; 537 538 // Extract the command line parameters 539 while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) { 540 tmppeer = false; 541 switch (ch) { 542 case 'a': 543 if (strncmp(optarg, "all", 3) == 0) 544 config.ack_policy = DB_REPMGR_ACKS_ALL; 545 else if (strncmp(optarg, "quorum", 6) != 0) 546 usage(); 547 break; 548 case 'b': 549 config.bulk = true; 550 break; 551 case 'C': 552 config.start_policy = DB_REP_CLIENT; 553 break; 554 case 'h': 555 config.home = optarg; 556 break; 557 case 'l': 558 config.this_host.host = strtok(optarg, ":"); 559 if ((portstr = strtok(NULL, ":")) == NULL) { 560 cerr << "\nBad host specification." << endl; 561 usage(); 562 } 563 config.this_host.port = (unsigned short)atoi(portstr); 564 config.got_listen_address = true; 565 break; 566 case 'M': 567 config.start_policy = DB_REP_MASTER; 568 break; 569 case 'n': 570 config.totalsites = atoi(optarg); 571 break; 572 case 'p': 573 config.priority = atoi(optarg); 574 break; 575 case 'R': 576 tmppeer = true; // FALLTHROUGH 577 case 'r': 578 tmphost = strtok(optarg, ":"); 579 if ((portstr = strtok(NULL, ":")) == NULL) { 580 cerr << "Bad host specification." << endl; 581 usage(); 582 } 583 tmpport = (unsigned short)atoi(portstr); 584 585 config.addOtherHost(tmphost, tmpport, tmppeer); 586 587 break; 588 case 'v': 589 config.verbose = true; 590 break; 591 case '?': 592 default: 593 usage(); 594 } 595 } 596 597 // Error check command line. 
598 if ((!config.got_listen_address) || config.home == NULL) 599 usage(); 600 601 RepQuoteExample runner; 602 try { 603 runner.init(&config); 604 runner.doloop(); 605 } catch (DbException dbe) { 606 cerr << "\nCaught an exception during initialization or" 607 << " processing: " << dbe.what() << endl; 608 } 609 runner.terminate(); 610 return 0; 611} 612 613// This is a very simple thread that performs checkpoints at a fixed 614// time interval. For a master site, the time interval is one minute 615// plus the duration of the checkpoint_delay timeout (30 seconds by 616// default.) For a client site, the time interval is one minute. 617// 618void *checkpoint_thread(void *args) 619{ 620 DbEnv *env; 621 APP_DATA *app; 622 int i, ret; 623 624 env = (DbEnv *)args; 625 app = (APP_DATA *)env->get_app_private(); 626 627 for (;;) { 628 // Wait for one minute, polling once per second to see if 629 // application has finished. When application has finished, 630 // terminate this thread. 631 // 632 for (i = 0; i < 60; i++) { 633 sleep(1); 634 if (app->app_finished == 1) 635 return ((void *)EXIT_SUCCESS); 636 } 637 638 // Perform a checkpoint. 639 if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) { 640 env->err(ret, "Could not perform checkpoint.\n"); 641 return ((void *)EXIT_FAILURE); 642 } 643 } 644} 645 646// This is a simple log archive thread. Once per minute, it removes all but 647// the most recent 3 logs that are safe to remove according to a call to 648// DBENV->log_archive(). 649// 650// Log cleanup is needed to conserve disk space, but aggressive log cleanup 651// can cause more frequent client initializations if a client lags too far 652// behind the current master. This can happen in the event of a slow client, 653// a network partition, or a new master that has not kept as many logs as the 654// previous master. 
655// 656// The approach in this routine balances the need to mitigate against a 657// lagging client by keeping a few more of the most recent unneeded logs 658// with the need to conserve disk space by regularly cleaning up log files. 659// Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE 660// flag) is not recommended for replication due to the risk of frequent 661// client initializations. 662// 663void *log_archive_thread(void *args) 664{ 665 DbEnv *env; 666 APP_DATA *app; 667 char **begin, **list; 668 int i, listlen, logs_to_keep, minlog, ret; 669 670 env = (DbEnv *)args; 671 app = (APP_DATA *)env->get_app_private(); 672 logs_to_keep = 3; 673 674 for (;;) { 675 // Wait for one minute, polling once per second to see if 676 // application has finished. When application has finished, 677 // terminate this thread. 678 // 679 for (i = 0; i < 60; i++) { 680 sleep(1); 681 if (app->app_finished == 1) 682 return ((void *)EXIT_SUCCESS); 683 } 684 685 // Get the list of unneeded log files. 686 if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) { 687 env->err(ret, "Could not get log archive list."); 688 return ((void *)EXIT_FAILURE); 689 } 690 if (list != NULL) { 691 listlen = 0; 692 // Get the number of logs in the list. 693 for (begin = list; *begin != NULL; begin++, listlen++); 694 // Remove all but the logs_to_keep most recent 695 // unneeded log files. 696 // 697 minlog = listlen - logs_to_keep; 698 for (begin = list, i= 0; i < minlog; list++, i++) { 699 if ((ret = unlink(*list)) != 0) { 700 env->err(ret, 701 "logclean: remove %s", *list); 702 env->errx( 703 "logclean: Error remove %s", *list); 704 free(begin); 705 return ((void *)EXIT_FAILURE); 706 } 707 } 708 free(begin); 709 } 710 } 711} 712 713