/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2009 Oracle.  All rights reserved.
 *
 */
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;

using BerkeleyDB;

/**
 * RepQuoteExample is a simple but complete demonstration of a replicated
 * application. The application is a mock stock ticker. The master accepts a
 * stock symbol and a numerical value as input, and stores this information
 * into a replicated database; either master or clients can display the
 * contents of the database.
 *
 * The options to start a given replication node are:
 *
 *   -h home (required; h stands for home directory)
 *   -l host:port (required; l stands for local)
 *   -C or -M (optional; start up as client or master)
 *   -r host:port (optional; r stands for remote; any number of these may
 *      be specified)
 *   -R host:port (optional; R stands for remote peer; only one of these may
 *      be specified)
 *   -a all|quorum (optional; a stands for ack policy)
 *   -b (optional; b stands for bulk)
 *   -n nsites (optional; number of sites in replication group; defaults to 0
 *      to try to dynamically compute nsites)
 *   -p priority (optional; defaults to 100)
 *   -v (optional; v stands for verbose)
 *
 * A typical session begins with a command such as the following to start a
 * master:
 *
 *   ex_repquote.exe -M -h dir1 -l localhost:6000
 *
 * and several clients:
 *
 *   ex_repquote.exe -C -h dir2
 *               -l localhost:6001 -r localhost:6000
 *   ex_repquote.exe -C -h dir3
 *               -l localhost:6002 -r localhost:6000
 *   ex_repquote.exe -C -h dir4
 *               -l localhost:6003 -r localhost:6000
 *
 * Each process is a member of a DB replication group. The sample application
 * expects the following commands to stdin:
 *
 *   NEWLINE -- print all the stocks held in the database
 *   quit -- shutdown this node
 *   exit -- shutdown this node
 *   stock_symbol number -- enter this stock and number into the database
 */

namespace ex_repquote
{
    public class RepQuoteExample
    {
        /* Number of one-second polls the support threads wait between runs. */
        private const int SupportThreadPolls = 60;

        private RepQuoteEnvironment dbenv;
        private Thread checkpointThread;
        private Thread logArchiveThread;

        /*
         * Print the command line synopsis and option descriptions, then
         * exit the process with status 1. This method does not return.
         */
        public static void usage()
        {
            Console.WriteLine(
                "usage: " + RepConfig.progname +
                " -h home -l host:port [-CM][-r host:port][-R host:port]\n" +
                " [-a all|quorum][-b][-n nsites][-p priority][-v]");

            Console.WriteLine(
                "\t -h home (required; h stands for home directory)\n" +
                "\t -l host:port (required; l stands for local)\n" +
                "\t -C or -M (optional; start up as client or master)\n" +
                "\t -r host:port (optional; r stands for remote; any number " +
                "of these\n" +
                "\t may be specified)\n" +
                "\t -R host:port (optional; R stands for remote peer; only " +
                "one of\n" +
                "\t these may be specified)\n" +
                "\t -a all|quorum (optional; a stands for ack policy)\n" +
                "\t -b (optional; b stands for bulk)\n" +
                "\t -n nsites (optional; number of sites in replication " +
                "group; defaults\n" +
                "\t to 0 to try to dynamically compute nsites)\n" +
                "\t -p priority (optional; defaults to 100)\n" +
                "\t -v (optional; v stands for verbose)\n");

            Environment.Exit(1);
        }

        /*
         * Program entry point: extend PATH so the native libraries can be
         * found, parse the command line into a RepConfig, then run the
         * quote server until the user quits or an error occurs.
         */
        public static void Main(string[] args)
        {
            RepConfig config = new RepConfig();
            string host;
            uint port;
            uint number;

            /*
             * RepQuoteExample is meant to be run from build_windows\AnyCPU, in
             * either the Debug or Release directory. The required core
             * libraries, however, are in either build_windows\Win32 or
             * build_windows\x64, depending upon the platform.  That location
             * needs to be added to the PATH environment variable for the
             * P/Invoke calls to work.
             */
            try
            {
                String pwd = Environment.CurrentDirectory;
                pwd = Path.Combine(pwd, "..");
                pwd = Path.Combine(pwd, "..");
                if (IntPtr.Size == 4)
                    pwd = Path.Combine(pwd, "Win32");
                else
                    pwd = Path.Combine(pwd, "x64");
#if DEBUG
                pwd = Path.Combine(pwd, "Debug");
#else
                pwd = Path.Combine(pwd, "Release");
#endif
                pwd += ";" + Environment.GetEnvironmentVariable("PATH");
                Environment.SetEnvironmentVariable("PATH", pwd);
            }
            catch (Exception e)
            {
                Console.WriteLine(
                    "Unable to set the PATH environment variable.");
                Console.WriteLine(e.Message);
                return;
            }

            /* Extract the command line parameters. */
            for (int i = 0; i < args.Length; i++)
            {
                string s = args[i];

                /* Anything that does not look like an option is skipped. */
                if (s.Length == 0 || s[0] != '-')
                    continue;

                /* A lone "-" carries no option letter. */
                if (s.Length < 2)
                {
                    Console.Error.WriteLine(
                        "Unrecognized option: " + s);
                    usage();
                }

                switch (s[1])
                {
                    case 'a':
                        if (i == args.Length - 1)
                            usage();
                        i++;
                        if (args[i].Equals("all"))
                            config.ackPolicy = AckPolicy.ALL;
                        else if (!args[i].Equals("quorum"))
                            usage();
                        break;
                    case 'b':
                        config.bulk = true;
                        break;
                    case 'C':
                        config.startPolicy = StartPolicy.CLIENT;
                        break;
                    case 'h':
                        if (i == args.Length - 1)
                            usage();
                        i++;
                        config.home = args[i];
                        break;
                    case 'l':
                        if (i == args.Length - 1)
                            usage();
                        i++;
                        if (!TryParseHostPort(args[i], out host, out port))
                            usage();
                        config.host.Host = host;
                        config.host.Port = port;
                        break;
                    case 'M':
                        config.startPolicy = StartPolicy.MASTER;
                        break;
                    case 'n':
                        if (i == args.Length - 1)
                            usage();
                        i++;
                        /*
                         * TryParse is used because uint.Parse reports bad
                         * input with FormatException/OverflowException.
                         */
                        if (!uint.TryParse(args[i], out number))
                        {
                            Console.Error.WriteLine(
                                "Unable to parse number of total sites.");
                            usage();
                        }
                        config.totalSites = number;
                        break;
                    case 'p':
                        if (i == args.Length - 1)
                            usage();
                        i++;
                        if (!uint.TryParse(args[i], out number))
                        {
                            Console.Error.WriteLine("Unable to parse priority.");
                            usage();
                        }
                        config.priority = number;
                        break;
                    case 'r':
                    case 'R':
                        if (i == args.Length - 1)
                            usage();
                        /* The option letter, not the whole "-R", marks a peer. */
                        bool isPeer = (s[1] == 'R');
                        i++;
                        if (!TryParseHostPort(args[i], out host, out port))
                            usage();
                        config.remote.Add(
                            new RemoteSite(host, port, isPeer));
                        break;
                    case 'v':
                        config.verbose = true;
                        break;
                    default:
                        Console.Error.WriteLine(
                            "Unrecognized option: " + args[i]);
                        usage();
                        break;
                }
            }

            /* Error check command line. */
            if (config.host.Host == null || string.IsNullOrEmpty(config.home))
                usage();

            RepQuoteExample runner = null;
            try
            {
                runner = new RepQuoteExample();
                /* Only enter the command loop if the environment opened. */
                if (runner.init(config) == 0)
                    runner.doloop();
                runner.terminate();
                runner = null;
            }
            catch (DatabaseException dbErr)
            {
                Console.Error.WriteLine("Caught an exception during " +
                    "initialization or processing: " + dbErr);
                if (runner != null)
                {
                    try
                    {
                        runner.terminate();
                    }
                    catch (DatabaseException closeErr)
                    {
                        Console.Error.WriteLine(
                            "Error while shutting down: " + closeErr);
                    }
                }
            }
        } /* End main. */

        /*
         * Split a "host:port" specification into its two parts.
         * Returns true on success; on failure an explanatory message is
         * written to stderr, host is null and port is 0.
         */
        private static bool TryParseHostPort(
            string spec, out string host, out uint port)
        {
            host = null;
            port = 0;

            string[] words = spec.Split(':');
            if (words.Length != 2)
            {
                Console.Error.WriteLine("Invalid host " +
                    "specification host:port needed.");
                return false;
            }

            if (!uint.TryParse(words[1], out port))
            {
                Console.Error.WriteLine("Invalid host " +
                    "specification, could not parse port number.");
                return false;
            }

            host = words[0];
            return true;
        }

        public RepQuoteExample()
        {
            dbenv = null;
        }

        /*
         * Build the environment configuration from the parsed command line,
         * open the replicated environment, start the checkpoint and log
         * archive threads, and start the replication manager.
         *
         * Returns 0 on success, or 1 if the environment could not be opened.
         * DatabaseExceptions raised while starting the replication manager
         * propagate to the caller.
         */
        public int init(RepConfig config)
        {
            int ret = 0;

            DatabaseEnvironmentConfig envConfig = new DatabaseEnvironmentConfig();
            envConfig.ErrorPrefix = RepConfig.progname;
            envConfig.RepSystemCfg = new ReplicationConfig();
            envConfig.RepSystemCfg.RepMgrLocalSite = config.host;
            for (int i = 0; i < config.remote.Count; i++)
                envConfig.RepSystemCfg.AddRemoteSite(config.remote[i].Host,
                    config.remote[i].IsPeer);

            if (config.totalSites > 0)
                envConfig.RepSystemCfg.NSites = config.totalSites;

            envConfig.RepSystemCfg.BulkTransfer = config.bulk;

            /*
             * Configure heartbeat timeouts so that repmgr monitors the
             * health of the TCP connection.  Master sites broadcast a heartbeat
             * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
             * Client sites wait for message activity the length of the
             * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
             * connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
             * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
             */
            envConfig.RepSystemCfg.HeartbeatMonitor = 10000000;
            envConfig.RepSystemCfg.HeartbeatSend = 5000000;

            /*
             * Set replication group election priority for this environment.
             * An election first selects the site with the most recent log
             * records as the new master.  If multiple sites have the most
             * recent log records, the site with the highest priority value
             * is selected as master.
             */
            envConfig.RepSystemCfg.Priority = config.priority;
            envConfig.MPoolSystemCfg = new MPoolConfig();
            envConfig.MPoolSystemCfg.CacheSize = RepConfig.CACHESIZE;
            envConfig.TxnNoSync = true;

            envConfig.EventNotify = new EventNotifyDelegate(RepQuoteEventHandler);

            /*
             * Set the policy that determines how master and client sites
             * handle acknowledgement of replication messages needed for
             * permanent records.  The default policy of "quorum" requires only
             * a quorum of electable peers sufficient to ensure a permanent
             * record remains durable if an election is held.  The "all" option
             * requires all clients to acknowledge a permanent replication
             * message instead.
             */
            envConfig.RepSystemCfg.RepMgrAckPolicy = config.ackPolicy;

            /*
             * Set the threshold for the minimum and maximum time the client
             * waits before requesting retransmission of a missing message.
             * Base these values on the performance and load characteristics
             * of the master and client host platforms as well as the round
             * trip message time.
             */
            envConfig.RepSystemCfg.RetransmissionRequest(20000, 500000);

            /*
             * Configure deadlock detection to ensure that any deadlocks
             * are broken by having one of the conflicting lock requests
             * rejected. DB_LOCK_DEFAULT uses the lock policy specified
             * at environment creation time or DB_LOCK_RANDOM if none was
             * specified.
             */
            envConfig.LockSystemCfg = new LockingConfig();
            envConfig.LockSystemCfg.DeadlockResolution = DeadlockPolicy.DEFAULT;

            envConfig.Create = true;
            envConfig.RunRecovery = true;
            envConfig.FreeThreaded = true;
            envConfig.UseReplication = true;
            envConfig.UseLocking = true;
            envConfig.UseLogging = true;
            envConfig.UseMPool = true;
            envConfig.UseTxns = true;
            envConfig.Verbosity = new VerboseMessages();
            envConfig.Verbosity.Replication = config.verbose;

            try
            {
                dbenv = RepQuoteEnvironment.Open(config.home, envConfig);
            }
            catch (DatabaseException e)
            {
                Console.WriteLine("Fail to open environment: " + e.Message);
                return 1;
            }

            /* The following base replication features may also be useful to your
             * application. See Berkeley DB documentation for more details.
             *   - Master leases: Provide stricter consistency for data reads
             *     on a master site.
             *   - Timeouts: Customize the amount of time Berkeley DB waits
             *     for such things as an election to be concluded or a master
             *     lease to be granted.
             *   - Delayed client synchronization: Manage the master site's
             *     resources by spreading out resource-intensive client
             *     synchronizations.
             *   - Blocked client operations: Return immediately with an error
             *     instead of waiting indefinitely if a client operation is
             *     blocked by an ongoing client synchronization.
             *
             * The following repmgr features may also be useful to your
             * application.  See Berkeley DB documentation for more details.
             *   - Two-site strict majority rule - In a two-site replication
             *     group, require both sites to be available to elect a new
             *     master.
             *   - Timeouts - Customize the amount of time repmgr waits
             *     for such things as waiting for acknowledgements or attempting
             *     to reconnect to other sites.
             *   - Site list - return a list of sites currently known to repmgr.
             */

            /* Start checkpoint and log archive support threads. */
            checkpointThread = new Thread(new ThreadStart(CheckPoint));
            checkpointThread.Start();
            logArchiveThread = new Thread(new ThreadStart(LogArchive));
            logArchiveThread.Start();

            /* Start replication manager. */
            if (config.startPolicy == StartPolicy.CLIENT)
                dbenv.env.RepMgrStartClient(3);
            else if (config.startPolicy == StartPolicy.ELECTION)
                dbenv.env.RepMgrStartClient(3, true);
            else if (config.startPolicy == StartPolicy.MASTER)
                dbenv.env.RepMgrStartMaster(3);

            return ret;
        }

        /*
         * Interactive command loop. Opens the stock database (polling until
         * it is available), then reads commands from stdin:
         *   blank line        -- print the database contents
         *   quit / exit       -- leave the loop
         *   TICKER VALUE      -- store the pair (master only)
         * End of input is treated like "quit".
         *
         * Returns 0. DatabaseExceptions from the update path propagate to
         * the caller; the database handle is closed in all cases.
         */
        public int doloop()
        {
            BTreeDatabase db = null;

            try
            {
                for (;;)
                {
                    if (db == null)
                    {
                        BTreeDatabaseConfig dbConfig = new BTreeDatabaseConfig();
                        dbConfig.Env = dbenv.env;

                        if (dbenv.IsMaster)
                        {
                            /*
                             * Open database allowing create only if this is a master
                             * database.  A client database uses polling to attempt
                             * to open the database, without creating it, until the
                             * open succeeds.
                             *
                             * This polling logic for allowing create can be
                             * simplified under some circumstances.  For example, if
                             * the application can be sure a database is already
                             * there, it would never need to open it allowing create.
                             */
                            dbConfig.Creation = CreatePolicy.IF_NEEDED;
                        }

                        dbConfig.AutoCommit = true;

                        try
                        {
                            db = BTreeDatabase.Open(RepConfig.progname, dbConfig);
                        }
                        catch (DatabaseException)
                        {
                            /* db is still null here: the assignment never ran. */
                            Console.WriteLine("no stock database available yet.");
                            Thread.Sleep(RepConfig.SLEEPTIME);
                            continue;
                        }
                    }

                    /* Listen for input, and add it to the database. */
                    Console.Write("QUOTESERVER");
                    if (dbenv.IsMaster == false)
                        Console.Write("(read-only)");
                    Console.Write("> ");
                    string nextLine = null;
                    try
                    {
                        nextLine = Console.ReadLine();
                    }
                    catch (System.IO.IOException)
                    {
                        Console.WriteLine("Unable to get data");
                        break;
                    }

                    /* ReadLine returns null at end of input: shut down. */
                    if (nextLine == null)
                    {
                        dbenv.AppFinished = true;
                        break;
                    }

                    /* A blank line causes the DB to be dumped. */
                    string[] words = nextLine.Split(' ');
                    if (words.Length == 0 ||
                        words.Length == 1 && words[0].Length == 0)
                    {
                        try
                        {
                            if (dbenv.InClientSync)
                                Console.WriteLine("Cannot read data during " +
                                    "client initialization - please try again.");
                            else
                                printStocks(db);
                        }
                        catch (DeadlockException)
                        {
                            continue;
                        }
                        catch (DatabaseException e)
                        {
                            /*
                             * This could be DB_REP_HANDLE_DEAD, which
                             * should close the database and continue.
                             */
                            Console.WriteLine("Got db exception reading replication "
                                + "DB: " + e);
                            Console.WriteLine("Expected if it was due to a dead " +
                                "replication handle, otherwise an unexpected error.");
                            /*
                             * Clear the field first so the finally block does
                             * not close the same handle again if Close throws.
                             */
                            BTreeDatabase dead = db;
                            db = null;
                            dead.Close(false); /* Close no sync. */
                            continue;
                        }
                        continue;
                    }

                    if (words.Length == 1 &&
                        (string.Equals(words[0], "quit",
                            StringComparison.OrdinalIgnoreCase) ||
                        string.Equals(words[0], "exit",
                            StringComparison.OrdinalIgnoreCase)))
                    {
                        dbenv.AppFinished = true;
                        break;
                    }
                    else if (words.Length != 2)
                    {
                        Console.WriteLine("Format: TICKER VALUE");
                        continue;
                    }

                    if (!dbenv.IsMaster)
                    {
                        Console.WriteLine("Can't update client");
                        continue;
                    }

                    DatabaseEntry key = new DatabaseEntry(
                        ASCIIEncoding.ASCII.GetBytes(words[0]));
                    DatabaseEntry data = new DatabaseEntry(
                        ASCIIEncoding.ASCII.GetBytes(words[1]));

                    db.Put(key, data);
                }
            }
            finally
            {
                /* Release the handle even when an update throws. */
                if (db != null)
                    db.Close(true);
            }

            return 0;
        }

        /*
         * Shut the node down: signal the support threads to stop, wait for
         * them, flush the log and close the environment. Safe to call when
         * init() failed or was never called.
         */
        public void terminate()
        {
            /*
             * The support threads only exit once AppFinished is set.  Set it
             * here so that error paths (which never saw "quit") do not block
             * forever in Join below.
             */
            if (dbenv != null)
                dbenv.AppFinished = true;

            /* Wait for checkpoint and log archive threads to finish. */
            try
            {
                if (logArchiveThread != null)
                    logArchiveThread.Join();
                if (checkpointThread != null)
                    checkpointThread.Join();
            }
            catch (Exception)
            {
                Console.WriteLine("Support thread join failed.");
            }

            /* Nothing to flush or close if the environment never opened. */
            if (dbenv == null)
                return;

            /*
             * We have used the DB_TXN_NOSYNC environment flag for improved
             * performance without the usual sacrifice of transactional durability,
             * as discussed in the "Transactional guarantees" page of the Reference
             * Guide: if one replication site crashes, we can expect the data to
             * exist at another site.  However, in case we shut down all sites
             * gracefully, we push out the end of the log here so that the most
             * recent transactions don't mysteriously disappear.
             */
            dbenv.env.LogFlush();
            dbenv.env.Close();
            dbenv = null;
        }

        /*
         * Print every key/data pair in the database as a two-column table.
         * void return type since error conditions are propagated
         * via exceptions.  The cursor is closed even if iteration throws.
         */
        private void printStocks(Database db)
        {
            Cursor dbc = db.Cursor();
            try
            {
                Console.WriteLine("\tSymbol\t\tPrice");
                Console.WriteLine("\t======\t\t=====");

                foreach (KeyValuePair<DatabaseEntry, DatabaseEntry> pair in dbc)
                {
                    string keyStr = ASCIIEncoding.ASCII.GetString(pair.Key.Data);
                    string dataStr = ASCIIEncoding.ASCII.GetString(pair.Value.Data);
                    Console.WriteLine("\t" + keyStr + "\t\t" + dataStr);
                }
            }
            finally
            {
                dbc.Close();
            }
        }

        /*
         * Wait for one minute, polling once per second to see if the
         * application has finished.  Returns true if the full interval
         * elapsed, false if the application finished and the calling
         * support thread should exit.
         */
        private bool WaitForNextRun()
        {
            for (int i = 0; i < SupportThreadPolls; i++)
            {
                Thread.Sleep(1000);
                if (dbenv.AppFinished)
                    return false;
            }
            return true;
        }

        /*
         * This is a very simple thread that performs checkpoints at a fixed
         * time interval.  For a master site, the time interval is one minute
         * plus the duration of the checkpoint_delay timeout (30 seconds by
         * default.)  For a client site, the time interval is one minute.
         */
        public void CheckPoint()
        {
            while (WaitForNextRun())
            {
                /* Perform a checkpoint. */
                try
                {
                    dbenv.env.Checkpoint();
                }
                catch (DatabaseException)
                {
                    Console.WriteLine("Could not perform checkpoint.");
                }
            }
        }

        /*
         * This is a simple log archive thread.  Once per minute, it removes all but
         * the most recent 3 logs that are safe to remove according to a call to
         * DBENV->log_archive().
         *
         * Log cleanup is needed to conserve disk space, but aggressive log cleanup
         * can cause more frequent client initializations if a client lags too far
         * behind the current master.  This can happen in the event of a slow client,
         * a network partition, or a new master that has not kept as many logs as the
         * previous master.
         *
         * The approach in this routine balances the need to mitigate against a
         * lagging client by keeping a few more of the most recent unneeded logs
         * with the need to conserve disk space by regularly cleaning up log files.
         * Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
         * flag) is not recommended for replication due to the risk of frequent
         * client initializations.
         */
        public void LogArchive()
        {
            int logKeep = 3;
            int minLog;
            List<string> logFileList;

            while (WaitForNextRun())
            {
                try
                {
                    /* Get the list of unneeded log files. */
                    logFileList = dbenv.env.ArchivableLogFiles(true);
                    /*
                     * Remove all but the logs_to_keep most recent unneeded
                     * log files.
                     */
                    minLog = logFileList.Count - logKeep;
                    for (int i = 0; i < minLog; i++)
                    {
                        FileInfo logFile = new FileInfo(logFileList[i]);
                        logFile.Delete();
                    }
                }
                catch (DatabaseException)
                {
                    Console.WriteLine("Problem deleting log archive files.");
                }
                catch (IOException)
                {
                    /* File in use or similar: retry on the next pass. */
                    Console.WriteLine("Problem deleting log archive files.");
                }
                catch (UnauthorizedAccessException)
                {
                    Console.WriteLine("Problem deleting log archive files.");
                }
            }
        }

        /*
         * Implementation of EventHandler interface to handle the Berkeley DB events
         * we are interested in receiving.
         */
        private void RepQuoteEventHandler(NotificationEvent eventCode, byte[] event_info)
        {
            /*
             * The handler is registered before the environment field is
             * assigned, so an early event could arrive while it is null.
             */
            RepQuoteEnvironment current = dbenv;

            switch (eventCode)
            {
                case NotificationEvent.REP_CLIENT:
                    if (current != null)
                    {
                        current.IsMaster = false;
                        current.InClientSync = true;
                    }
                    break;
                case NotificationEvent.REP_MASTER:
                    if (current != null)
                    {
                        current.IsMaster = true;
                        current.InClientSync = false;
                    }
                    break;
                case NotificationEvent.REP_NEWMASTER:
                    if (current != null)
                        current.InClientSync = true;
                    break;
                case NotificationEvent.REP_STARTUPDONE:
                    if (current != null)
                        current.InClientSync = false;
                    break;
                /*
                 * Did not get enough acks to guarantee transaction
                 * durability based on the configured ack policy.  This
                 * transaction will be flushed to the master site's
                 * local disk storage for durability.
                 */
                case NotificationEvent.REP_PERM_FAILED:
                    Console.WriteLine("Insufficient acknowledgements " +
                        "to guarantee transaction durability.");
                    break;
                default:
                    Console.WriteLine("Ignoring event: {0}", eventCode);
                    break;
            }
        }
    }
}