/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1997-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

package db.repquote;

import java.io.FileNotFoundException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.lang.Thread;
import java.lang.InterruptedException;

import com.sleepycat.db.*;
import db.repquote.RepConfig;

/**
 * RepQuoteExample is a simple but complete demonstration of a replicated
 * application. The application is a mock stock ticker. The master accepts a
 * stock symbol and a numerical value as input, and stores this information
 * into a replicated database; either master or clients can display the
 * contents of the database.
 * <p>
 * The options to start a given replication node are:
 * <pre>
 *   -h home (required; h stands for home directory)
 *   -l host:port (required; l stands for local)
 *   -C or -M (optional; start up as client or master)
 *   -r host:port (optional; r stands for remote; any number of these may
 *      be specified)
 *   -R host:port (optional; R stands for remote peer; only one of these may
 *      be specified)
 *   -a all|quorum (optional; a stands for ack policy)
 *   -b (optional; b stands for bulk)
 *   -n nsites (optional; number of sites in replication group; defaults to 0
 *      to try to dynamically compute nsites)
 *   -p priority (optional; defaults to 100)
 *   -v (optional; v stands for verbose)
 * </pre>
 * <p>
 * A typical session begins with a command such as the following to start a
 * master:
 *
 * <pre>
 *   java db.repquote.RepQuoteExample -M -h dir1 -l localhost:6000
 * </pre>
 *
 * and several clients:
 *
 * <pre>
 *   java db.repquote.RepQuoteExample -C -h dir2
 *               -l localhost:6001 -r localhost:6000
 *   java db.repquote.RepQuoteExample -C -h dir3
 *               -l localhost:6002 -r localhost:6000
 *   java db.repquote.RepQuoteExample -C -h dir4
 *               -l localhost:6003 -r localhost:6000
 * </pre>
 *
 * <p>
 * Each process is a member of a DB replication group. The sample application
 * expects the following commands to stdin:
 * <ul>
 * <li>NEWLINE -- print all the stocks held in the database</li>
 * <li>quit -- shutdown this node</li>
 * <li>exit -- shutdown this node</li>
 * <li>stock_symbol number -- enter this stock and number into the
 * database</li>
 * </ul>
 * End of input on stdin is treated like <code>quit</code>.
 */

public class RepQuoteExample
{
    private RepConfig appConfig;
    private RepQuoteEnvironment dbenv;
    private CheckpointThread ckpThr;
    private LogArchiveThread lgaThr;

    /**
     * Prints the command line synopsis to stderr and exits the JVM with
     * status 1.  This method does not return.
     */
    public static void usage()
    {
        System.err.println("usage: " + RepConfig.progname +
            " -h home -l host:port [-CM][-r host:port][-R host:port]\n" +
            " [-a all|quorum][-b][-n nsites][-p priority][-v]");

        System.err.println(
            "\t -h home (required; h stands for home directory)\n" +
            "\t -l host:port (required; l stands for local)\n" +
            "\t -C or -M (optional; start up as client or master)\n" +
            "\t -r host:port (optional; r stands for remote; any number " +
            "of these\n" +
            "\t may be specified)\n" +
            "\t -R host:port (optional; R stands for remote peer; only " +
            "one of\n" +
            "\t these may be specified)\n" +
            "\t -a all|quorum (optional; a stands for ack policy)\n" +
            "\t -b (optional; b stands for bulk)\n" +
            "\t -n nsites (optional; number of sites in replication " +
            "group; defaults\n" +
            "\t to 0 to try to dynamically compute nsites)\n" +
            "\t -p priority (optional; defaults to 100)\n" +
            "\t -v (optional; v stands for verbose)\n");

        System.exit(1);
    }

    /**
     * Returns the value following the option at <code>argv[index]</code>,
     * or prints usage and exits if the option is the last argument.
     */
    private static String requireValue(String[] argv, int index)
    {
        if (index == argv.length - 1)
            usage();
        return argv[index + 1];
    }

    /**
     * Splits a <code>host:port</code> specification into its two parts,
     * or prints usage and exits if it does not have exactly two parts.
     */
    private static String[] splitHostPort(String spec)
    {
        String[] words = spec.split(":");
        if (words.length != 2) {
            System.err.println(
                "Invalid host specification host:port needed.");
            usage();
        }
        return words;
    }

    /**
     * Parses the port part of a host specification, or prints usage and
     * exits if it is not an integer.
     */
    private static int parsePort(String portText)
    {
        try {
            return Integer.parseInt(portText);
        } catch (NumberFormatException nfe) {
            System.err.println("Invalid host specification, " +
                "could not parse port number.");
            usage();
            return 0; /* Not reached: usage() exits. */
        }
    }

    /**
     * Parses the integer value of a numeric option, or prints usage and
     * exits if the value is not an integer.
     */
    private static int parseIntOption(String option, String value)
    {
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException nfe) {
            System.err.println(
                "Invalid numeric value for " + option + ": " + value);
            usage();
            return 0; /* Not reached: usage() exits. */
        }
    }

    /**
     * Program entry point: parses the command line into a RepConfig,
     * initializes the replicated environment, runs the interactive loop
     * and shuts the environment down.
     *
     * @param argv command line options as described in the class comment
     */
    public static void main(String[] argv)
        throws Exception
    {
        RepConfig config = new RepConfig();

        /* Extract the command line parameters. */
        for (int i = 0; i < argv.length; i++) {
            String opt = argv[i];

            if (opt.equals("-a")) {
                String policy = requireValue(argv, i);
                i++;
                if (policy.equals("all"))
                    config.ackPolicy = ReplicationManagerAckPolicy.ALL;
                else if (policy.equals("quorum"))
                    /* Set explicitly so that a later -a overrides an earlier one. */
                    config.ackPolicy = ReplicationManagerAckPolicy.QUORUM;
                else
                    usage();
            } else if (opt.equals("-b")) {
                config.bulk = true;
            } else if (opt.equals("-C")) {
                config.startPolicy = ReplicationManagerStartPolicy.REP_CLIENT;
            } else if (opt.equals("-h")) {
                /* home - a string arg. */
                config.home = requireValue(argv, i);
                i++;
            } else if (opt.equals("-l")) {
                /* "local" should be host:port. */
                String[] words = splitHostPort(requireValue(argv, i));
                i++;
                config.setThisHost(words[0], parsePort(words[1]));
            } else if (opt.equals("-M")) {
                config.startPolicy = ReplicationManagerStartPolicy.REP_MASTER;
            } else if (opt.equals("-n")) {
                config.totalSites =
                    parseIntOption(opt, requireValue(argv, i));
                i++;
            } else if (opt.equals("-p")) {
                config.priority =
                    parseIntOption(opt, requireValue(argv, i));
                i++;
            } else if (opt.equals("-R") || opt.equals("-r")) {
                /* -R marks the remote site as a peer, -r does not. */
                boolean isPeer = opt.equals("-R");
                String[] words = splitHostPort(requireValue(argv, i));
                i++;
                config.addOtherHost(words[0], parsePort(words[1]), isPeer);
            } else if (opt.equals("-v")) {
                config.verbose = true;
            } else {
                System.err.println("Unrecognized option: " + opt);
                usage();
            }
        }

        /* Error check command line. */
        if ((!config.gotListenAddress()) || config.home.length() == 0)
            usage();

        RepQuoteExample runner = null;
        try {
            runner = new RepQuoteExample();
            if (runner.init(config) != 0) {
                /* init() has already reported the problem. */
                runner.terminate();
                System.exit(1);
            }

            runner.doloop();
            runner.terminate();
        } catch (DatabaseException dbe) {
            System.err.println("Caught an exception during " +
                "initialization or processing: " + dbe);
            if (runner != null)
                runner.terminate();
        }
    } /* End main. */

    /**
     * Creates an uninitialized example; call {@link #init(RepConfig)}
     * before using it.
     */
    public RepQuoteExample()
        throws DatabaseException
    {
        appConfig = null;
        dbenv = null;
    }

    /**
     * Configures and opens the replicated environment, starts the
     * checkpoint and log archive support threads and starts the
     * replication manager.
     *
     * @param config the parsed command line configuration
     * @return 0 on success, or 1 if the environment home directory could
     *         not be found (in which case nothing has been started)
     * @throws DatabaseException if Berkeley DB reports an error; support
     *         threads may already be running, so the caller should still
     *         call {@link #terminate()}
     */
    public int init(RepConfig config)
        throws DatabaseException
    {
        appConfig = config;
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setErrorStream(System.err);
        envConfig.setErrorPrefix(RepConfig.progname);

        envConfig.setReplicationManagerLocalSite(appConfig.getThisHost());
        for (RepRemoteHost host = appConfig.getFirstOtherHost();
            host != null; host = appConfig.getNextOtherHost()) {
            envConfig.replicationManagerAddRemoteSite(
                host.getAddress(), host.isPeer());
        }
        if (appConfig.totalSites > 0)
            envConfig.setReplicationNumSites(appConfig.totalSites);

        /*
         * Set replication group election priority for this environment.
         * An election first selects the site with the most recent log
         * records as the new master.  If multiple sites have the most
         * recent log records, the site with the highest priority value
         * is selected as master.
         */
        envConfig.setReplicationPriority(appConfig.priority);

        envConfig.setCacheSize(RepConfig.CACHESIZE);
        envConfig.setTxnNoSync(true);

        envConfig.setEventHandler(new RepQuoteEventHandler());

        /*
         * Set the policy that determines how master and client sites
         * handle acknowledgement of replication messages needed for
         * permanent records.  The default policy of "quorum" requires only
         * a quorum of electable peers sufficient to ensure a permanent
         * record remains durable if an election is held.  The "all" option
         * requires all clients to acknowledge a permanent replication
         * message instead.
         */
        envConfig.setReplicationManagerAckPolicy(appConfig.ackPolicy);

        /*
         * Set the threshold for the minimum and maximum time the client
         * waits before requesting retransmission of a missing message.
         * Base these values on the performance and load characteristics
         * of the master and client host platforms as well as the round
         * trip message time.
         */
        envConfig.setReplicationRequestMin(20000);
        envConfig.setReplicationRequestMax(500000);

        /*
         * Configure deadlock detection to ensure that any deadlocks
         * are broken by having one of the conflicting lock requests
         * rejected. DB_LOCK_DEFAULT uses the lock policy specified
         * at environment creation time or DB_LOCK_RANDOM if none was
         * specified.
         */
        envConfig.setLockDetectMode(LockDetectMode.DEFAULT);

        envConfig.setAllowCreate(true);
        envConfig.setRunRecovery(true);
        envConfig.setThreaded(true);
        envConfig.setInitializeReplication(true);
        envConfig.setInitializeLocking(true);
        envConfig.setInitializeLogging(true);
        envConfig.setInitializeCache(true);
        envConfig.setTransactional(true);
        envConfig.setVerboseReplication(appConfig.verbose);
        try {
            dbenv = new RepQuoteEnvironment(appConfig.getHome(), envConfig);
        } catch (FileNotFoundException e) {
            System.err.println("FileNotFound exception: " + e);
            System.err.println(
                "Ensure that the environment directory is pre-created.");
            /*
             * There is no environment to configure, so stop here rather
             * than dereferencing a null dbenv below.
             */
            return 1;
        }

        if (appConfig.bulk)
            dbenv.setReplicationConfig(ReplicationConfig.BULK, true);

        /*
         * Configure heartbeat timeouts so that repmgr monitors the
         * health of the TCP connection.  Master sites broadcast a heartbeat
         * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
         * Client sites wait for message activity the length of the
         * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
         * connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
         * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
         */
        dbenv.setReplicationTimeout(ReplicationTimeoutType.HEARTBEAT_SEND,
            5000000);
        dbenv.setReplicationTimeout(ReplicationTimeoutType.HEARTBEAT_MONITOR,
            10000000);

        /* The following base replication features may also be useful to your
         * application. See Berkeley DB documentation for more details.
         *   - Master leases: Provide stricter consistency for data reads
         *     on a master site.
         *   - Timeouts: Customize the amount of time Berkeley DB waits
         *     for such things as an election to be concluded or a master
         *     lease to be granted.
         *   - Delayed client synchronization: Manage the master site's
         *     resources by spreading out resource-intensive client
         *     synchronizations.
         *   - Blocked client operations: Return immediately with an error
         *     instead of waiting indefinitely if a client operation is
         *     blocked by an ongoing client synchronization.
         *
         * The following repmgr features may also be useful to your
         * application.  See Berkeley DB documentation for more details.
         *  - Two-site strict majority rule - In a two-site replication
         *    group, require both sites to be available to elect a new
         *    master.
         *  - Timeouts - Customize the amount of time repmgr waits
         *    for such things as waiting for acknowledgements or attempting
         *    to reconnect to other sites.
         *  - Site list - return a list of sites currently known to repmgr.
         */

        /* Start checkpoint and log archive support threads. */
        ckpThr = new CheckpointThread(dbenv);
        ckpThr.start();
        lgaThr = new LogArchiveThread(dbenv, envConfig);
        lgaThr.start();

        /* Start replication manager. */
        dbenv.replicationManagerStart(3, appConfig.startPolicy);

        return 0;
    }

    /**
     * Opens the stock database, allowing creation only on the master.
     *
     * Open database allowing create only if this is a master
     * database.  A client database uses polling to attempt
     * to open the database without allowing create until
     * it is successful.
     *
     * This polling logic for allowing create can be
     * simplified under some circumstances.  For example, if
     * the application can be sure a database is already
     * there, it would never need to open it allowing create.
     *
     * @return the open handle, or null if the database does not exist yet
     */
    private Database openStockDatabase()
        throws DatabaseException
    {
        DatabaseConfig dbconf = new DatabaseConfig();
        dbconf.setType(DatabaseType.BTREE);
        if (dbenv.getIsMaster())
            dbconf.setAllowCreate(true);
        dbconf.setTransactional(true);

        try {
            return dbenv.openDatabase(null, RepConfig.progname, null, dbconf);
        } catch (FileNotFoundException e) {
            System.err.println("no stock database available yet.");
            return null;
        }
    }

    /**
     * Runs the interactive command loop: a blank line prints the stocks,
     * "quit" or "exit" (or end of input) ends the loop, and
     * "TICKER VALUE" stores a quote when this site is the master.
     *
     * @return 0 when the loop ends
     * @throws DatabaseException if a database operation fails in a way the
     *         loop does not handle; the database handle is closed first
     */
    public int doloop()
        throws DatabaseException
    {
        Database db = null;
        /*
         * Create the reader once: a BufferedReader may read ahead, so a
         * fresh reader per prompt could drop input that is already buffered.
         */
        BufferedReader stdin =
            new BufferedReader(new InputStreamReader(System.in));

        try {
            for (;;) {
                if (db == null) {
                    db = openStockDatabase();
                    if (db == null) {
                        try {
                            Thread.sleep(RepConfig.SLEEPTIME);
                        } catch (InterruptedException ignored) {
                            /* Just retry the open sooner than planned. */
                        }
                        continue;
                    }
                }

                /* Listen for input, and add it to the database. */
                System.out.print("QUOTESERVER");
                if (!dbenv.getIsMaster())
                    System.out.print("(read-only)");
                System.out.print("> ");
                System.out.flush();

                String nextline;
                try {
                    nextline = stdin.readLine();
                } catch (IOException ioe) {
                    System.err.println("Unable to get data from stdin");
                    dbenv.setAppFinished(true);
                    break;
                }
                if (nextline == null) {
                    /* End of input: shut down as if "quit" had been typed. */
                    dbenv.setAppFinished(true);
                    break;
                }

                /* Tolerate leading, trailing and repeated whitespace. */
                String[] words = nextline.trim().split("\\s+");

                /* A blank line causes the DB to be dumped to stdout. */
                if (words.length == 0 ||
                    (words.length == 1 && words[0].length() == 0)) {
                    try {
                        if (dbenv.getInClientSync())
                            System.err.println(
    "Cannot read data during client initialization - please try again.");
                        else
                            printStocks(db);
                    } catch (DeadlockException de) {
                        continue;
                    } catch (DatabaseException e) {
                        /*
                         * This could be DB_REP_HANDLE_DEAD, which
                         * should close the database and continue.
                         */
                        System.err.println(
                            "Got db exception reading replication " +
                            "DB: " + e);
                        System.err.println(
                            "Expected if it was due to a dead " +
                            "replication handle, otherwise an unexpected error.");
                        /*
                         * Forget the handle before closing it so that a
                         * failing close cannot lead to a second close.
                         */
                        Database deadHandle = db;
                        db = null;
                        deadHandle.close(true); /* Close no sync. */
                        continue;
                    }
                    continue;
                }

                if (words.length == 1 &&
                    (words[0].compareToIgnoreCase("quit") == 0 ||
                    words[0].compareToIgnoreCase("exit") == 0)) {
                    dbenv.setAppFinished(true);
                    break;
                } else if (words.length != 2) {
                    System.err.println("Format: TICKER VALUE");
                    continue;
                }

                if (!dbenv.getIsMaster()) {
                    System.err.println("Can't update client.");
                    continue;
                }

                DatabaseEntry key = new DatabaseEntry(words[0].getBytes());
                DatabaseEntry data = new DatabaseEntry(words[1].getBytes());

                db.put(null, key, data);
            }
        } finally {
            /* Release the handle on every exit path, including exceptions. */
            if (db != null)
                db.close(true);
        }
        return 0;
    }

    /**
     * Stops the support threads, flushes the log and closes the
     * environment.  Safe to call when init() failed or did not complete,
     * and safe to call more than once.
     */
    public void terminate()
        throws DatabaseException
    {
        if (dbenv == null)
            return;

        /*
         * Tell the support threads to stop.  Without this, any exit path
         * other than the "quit" command would block forever in join().
         */
        dbenv.setAppFinished(true);

        /* Wait for checkpoint and log archive threads to finish. */
        try {
            if (lgaThr != null)
                lgaThr.join();
            if (ckpThr != null)
                ckpThr.join();
        } catch (InterruptedException e1) {
            System.err.println("Support thread join failed.");
            Thread.currentThread().interrupt();
        }

        try {
            /*
             * We have used the DB_TXN_NOSYNC environment flag for improved
             * performance without the usual sacrifice of transactional
             * durability, as discussed in the "Transactional guarantees"
             * page of the Reference Guide: if one replication site crashes,
             * we can expect the data to exist at another site.  However, in
             * case we shut down all sites gracefully, we push out the end of
             * the log here so that the most recent transactions don't
             * mysteriously disappear.
             */
            dbenv.logFlush(null);
        } finally {
            try {
                dbenv.close();
            } finally {
                /* Make a repeated terminate() a no-op. */
                dbenv = null;
            }
        }
    }

    /*
     * Prints every key/data pair in the database to stdout.
     *
     * void return type since error conditions are propagated
     * via exceptions.
     */
    private void printStocks(Database db)
        throws DeadlockException, DatabaseException
    {
        Cursor dbc = db.openCursor(null, null);
        try {
            System.out.println("\tSymbol\tPrice");
            System.out.println("\t======\t=====");

            DatabaseEntry key = new DatabaseEntry();
            DatabaseEntry data = new DatabaseEntry();
            OperationStatus ret;
            for (ret = dbc.getFirst(key, data, LockMode.DEFAULT);
                ret == OperationStatus.SUCCESS;
                ret = dbc.getNext(key, data, LockMode.DEFAULT)) {
                String keystr = new String
                    (key.getData(), key.getOffset(), key.getSize());
                String datastr = new String
                    (data.getData(), data.getOffset(), data.getSize());
                System.out.println("\t" + keystr + "\t" + datastr);
            }
        } finally {
            /* Close the cursor even when iteration throws. */
            dbc.close();
        }
    }

    /*
     * Implementation of EventHandler interface to handle the Berkeley DB
     * events we are interested in receiving.
     */
    private /* internal */
    class RepQuoteEventHandler extends EventHandlerAdapter {
        public void handleRepClientEvent()
        {
            dbenv.setIsMaster(false);
            dbenv.setInClientSync(true);
        }
        public void handleRepMasterEvent()
        {
            dbenv.setIsMaster(true);
            dbenv.setInClientSync(false);
        }
        public void handleRepNewMasterEvent()
        {
            dbenv.setInClientSync(true);
        }
        public void handleRepPermFailedEvent()
        {
            /*
             * Did not get enough acks to guarantee transaction
             * durability based on the configured ack policy.  This
             * transaction will be flushed to the master site's
             * local disk storage for durability.
             */
            System.err.println(
    "Insufficient acknowledgements to guarantee transaction durability.");
        }
        public void handleRepStartupDoneEvent()
        {
            dbenv.setInClientSync(false);
        }
    } /* End RepQuoteEventHandler class. */
} /* End RepQuoteExample class. */

/*
 * This is a very simple thread that performs checkpoints at a fixed
 * time interval.  For a master site, the time interval is one minute
 * plus the duration of the checkpoint_delay timeout (30 seconds by
 * default.)  For a client site, the time interval is one minute.
 */
class CheckpointThread extends Thread {
    private final RepQuoteEnvironment myEnv;

    public CheckpointThread(RepQuoteEnvironment env) {
        myEnv = env;
    }

    public void run() {
        for (;;) {
            /*
             * Wait for one minute, polling once per second to see if
             * application has finished.  When application has finished,
             * terminate this thread.
             */
            for (int i = 0; i < 60; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                    /*
                     * The app-finished flag checked below is the shutdown
                     * signal; an interrupt only shortens this wait.
                     */
                }
                if (myEnv.getAppFinished())
                    return;
            }

            /* Perform a checkpoint. */
            try {
                myEnv.checkpoint(null);
            } catch (DatabaseException de) {
                System.err.println("Could not perform checkpoint." +
                    " " + de);
            }
        }
    }
}

/*
 * This is a simple log archive thread.  Once per minute, it removes all but
 * the most recent 3 logs that are safe to remove according to a call to
 * DBENV->log_archive().
 *
 * Log cleanup is needed to conserve disk space, but aggressive log cleanup
 * can cause more frequent client initializations if a client lags too far
 * behind the current master.  This can happen in the event of a slow client,
 * a network partition, or a new master that has not kept as many logs as the
 * previous master.
 *
 * The approach in this routine balances the need to mitigate against a
 * lagging client by keeping a few more of the most recent unneeded logs
 * with the need to conserve disk space by regularly cleaning up log files.
 * Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
 * flag) is not recommended for replication due to the risk of frequent
 * client initializations.
 */
class LogArchiveThread extends Thread {
    /* Number of the most recent unneeded log files that are retained. */
    private static final int LOGS_TO_KEEP = 3;

    private final RepQuoteEnvironment myEnv;
    /* Stored for parity with the constructor; not read by this class. */
    private final EnvironmentConfig myEnvConfig;

    public LogArchiveThread(RepQuoteEnvironment env,
        EnvironmentConfig envConfig) {
        myEnv = env;
        myEnvConfig = envConfig;
    }

    public void run() {
        for (;;) {
            /*
             * Wait for one minute, polling once per second to see if
             * application has finished.  When application has finished,
             * terminate this thread.
             */
            for (int i = 0; i < 60; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                    /*
                     * The app-finished flag checked below is the shutdown
                     * signal; an interrupt only shortens this wait.
                     */
                }
                if (myEnv.getAppFinished())
                    return;
            }

            try {
                /* Get the list of unneeded log files. */
                java.io.File[] logFileList = myEnv.getArchiveLogFiles(false);
                /* Defensive: treat a missing list as "nothing to remove". */
                if (logFileList == null)
                    continue;
                /*
                 * Remove all but the LOGS_TO_KEEP most recent unneeded
                 * log files.
                 */
                int minlog = logFileList.length - LOGS_TO_KEEP;
                for (int i = 0; i < minlog; i++) {
                    if (!logFileList[i].delete())
                        System.err.println("Could not delete log file " +
                            logFileList[i]);
                }
            } catch (DatabaseException de) {
                System.err.println("Problem deleting log archive files." +
                    " " + de);
            }
        }
    }
}