• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt-6.x.4708/router/db-4.8.30/examples_csharp/ex_repquote/
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2009 Oracle.  All rights reserved.
5 *
6 */
7using System;
8using System.Collections.Generic;
9using System.IO;
10using System.Text;
11using System.Threading;
12
13using BerkeleyDB;
14
15/**
16 * RepQuoteExample is a simple but complete demonstration of a replicated
17 * application. The application is a mock stock ticker. The master accepts a
18 * stock symbol and a numerical value as input, and stores this information
19 * into a replicated database; either master or clients can display the
20 * contents of the database.
21 *
22 * The options to start a given replication node are:
23 *
24 *   -h home (required; h stands for home directory)
25 *   -l host:port (required; l stands for local)
26 *   -C or -M (optional; start up as client or master)
27 *   -r host:port (optional; r stands for remote; any number of these may
28 *      be specified)
29 *   -R host:port (optional; R stands for remote peer; only one of these may
30 *      be specified)
31 *   -a all|quorum (optional; a stands for ack policy)
32 *   -b (optional; b stands for bulk)
33 *   -n nsites (optional; number of sites in replication group; defaults to 0
34 *      to try to dynamically compute nsites)
35 *   -p priority (optional; defaults to 100)
36 *   -v (optional; v stands for verbose)
37 *
38 * A typical session begins with a command such as the following to start a
39 * master:
40 *
41 * ex_repquote.exe -M -h dir1 -l localhost:6000
42 *
43 * and several clients:
44 *
45 * ex_repquote.exe -C -h dir2
46 *               -l localhost:6001 -r localhost:6000
47 * ex_repquote.exe -C -h dir3
48 *               -l localhost:6002 -r localhost:6000
49 * ex_repquote.exe -C -h dir4
50 *               -l localhost:6003 -r localhost:6000
51 *
52 * Each process is a member of a DB replication group. The sample application
53 * expects the following commands to stdin:
54 *
55 * NEWLINE -- print all the stocks held in the database
56 * quit -- shutdown this node
57 * exit -- shutdown this node
58 * stock_symbol number -- enter this stock and number into the database
59 */
60
61namespace ex_repquote
62{
63	public class RepQuoteExample
64	{
65		private RepQuoteEnvironment dbenv;
66		private Thread checkpointThread;
67		private Thread logArchiveThread;
68
/*
 * Write the command line synopsis and a description of each option to
 * standard output, then end the process with exit status 1.  Control
 * never returns to the caller.
 */
public static void usage()
{
	StringBuilder synopsis = new StringBuilder();
	synopsis.Append("usage: ");
	synopsis.Append(RepConfig.progname);
	synopsis.Append(
	    " -h home -l host:port [-CM][-r host:port][-R host:port]\n");
	synopsis.Append("  [-a all|quorum][-b][-n nsites][-p priority][-v]");
	Console.WriteLine(synopsis.ToString());

	/* One entry per printed line; joined with '\n' below. */
	string[] optionHelp = new string[] {
	    "\t -h home (required; h stands for home directory)",
	    "\t -l host:port (required; l stands for local)",
	    "\t -C or -M (optional; start up as client or master)",
	    "\t -r host:port (optional; r stands for remote; any number of these",
	    "\t    may be specified)",
	    "\t -R host:port (optional; R stands for remote peer; only one of",
	    "\t    these may be specified)",
	    "\t -a all|quorum (optional; a stands for ack policy)",
	    "\t -b (optional; b stands for bulk)",
	    "\t -n nsites (optional; number of sites in replication group; defaults",
	    "\t    to 0 to try to dynamically compute nsites)",
	    "\t -p priority (optional; defaults to 100)",
	    "\t -v (optional; v stands for verbose)"
	};

	/* The text itself ends in '\n'; WriteLine adds the final newline. */
	Console.WriteLine(string.Join("\n", optionHelp) + "\n");

	Environment.Exit(1);
}
96
/*
 * Program entry point.
 *
 * Prepends the directory holding the native core libraries to PATH,
 * parses the command line into a RepConfig, then runs the example:
 * init(), doloop() and terminate().
 *
 * A missing or malformed option value ends the process through usage(),
 * which does not return.  The process exit code is set to 1 when the
 * environment cannot be initialized or a DatabaseException is caught.
 */
public static void Main(string[] args)
{
	RepConfig config = new RepConfig();
	string[] words;
	uint parsed;

	/*
	 * RepQuoteExample is meant to be run from build_windows\AnyCPU, in
	 * either the Debug or Release directory. The required core
	 * libraries, however, are in either build_windows\Win32 or
	 * build_windows\x64, depending upon the platform.  That location
	 * needs to be added to the PATH environment variable for the
	 * P/Invoke calls to work.
	 */
	try {
		String pwd = Environment.CurrentDirectory;
		pwd = Path.Combine(pwd, "..");
		pwd = Path.Combine(pwd, "..");
		if (IntPtr.Size == 4)
			pwd = Path.Combine(pwd, "Win32");
		else
			pwd = Path.Combine(pwd, "x64");
#if DEBUG
		pwd = Path.Combine(pwd, "Debug");
#else
		pwd = Path.Combine(pwd, "Release");
#endif
		pwd += ";" + Environment.GetEnvironmentVariable("PATH");
		Environment.SetEnvironmentVariable("PATH", pwd);
	} catch (Exception e) {
		Console.WriteLine(
		    "Unable to set the PATH environment variable.");
		Console.WriteLine(e.Message);
		return;
	}

	/* Extract the command line parameters. */
	for (int i = 0; i < args.Length; i++)
	{
		string s = args[i];

		/* Anything that is not an option (including "") is skipped. */
		if (s.Length == 0 || s[0] != '-')
			continue;

		/* A lone "-" has no option letter to switch on. */
		if (s.Length < 2)
		{
			Console.Error.WriteLine("Unrecognized option: " + s);
			usage();
		}

		switch (s[1])
		{
			case 'a':
				if (i == args.Length - 1)
					usage();
				i++;
				/* "quorum" is accepted but leaves the configured default. */
				if (args[i].Equals("all"))
					config.ackPolicy = AckPolicy.ALL;
				else if (!args[i].Equals("quorum"))
					usage();
				break;
			case 'b':
				config.bulk = true;
				break;
			case 'C':
				config.startPolicy = StartPolicy.CLIENT;
				break;
			case 'h':
				if (i == args.Length - 1)
					usage();
				i++;
				config.home = args[i];
				break;
			case 'l':
				if (i == args.Length - 1)
					usage();
				i++;
				words = args[i].Split(':');
				if (words.Length != 2)
				{
					Console.Error.WriteLine("Invalid host " +
					    "specification host:port needed.");
					usage();
				}
				/*
				 * TryParse reports bad input through its return value;
				 * uint.Parse would throw FormatException or
				 * OverflowException instead.
				 */
				if (!uint.TryParse(words[1], out parsed))
				{
					Console.Error.WriteLine("Invalid host " +
					    "specification, could not parse port number.");
					usage();
				}
				config.host.Host = words[0];
				config.host.Port = parsed;
				break;
			case 'M':
				config.startPolicy = StartPolicy.MASTER;
				break;
			case 'n':
				if (i == args.Length - 1)
					usage();
				i++;
				if (!uint.TryParse(args[i], out parsed))
				{
					Console.Error.WriteLine(
					    "Unable to parse number of total sites.");
					usage();
				}
				config.totalSites = parsed;
				break;
			case 'p':
				if (i == args.Length - 1)
					usage();
				i++;
				if (!uint.TryParse(args[i], out parsed))
				{
					Console.Error.WriteLine("Unable to parse priority.");
					usage();
				}
				config.priority = parsed;
				break;
			case 'r':
			case 'R':
				if (i == args.Length - 1)
					usage();
				/* -R marks the remote site as a peer; -r does not. */
				bool isPeer = (s[1] == 'R');
				i++;
				words = args[i].Split(':');
				if (words.Length != 2)
				{
					Console.Error.WriteLine("Invalid host " +
					    "specification host:port needed.");
					usage();
				}
				if (!uint.TryParse(words[1], out parsed))
				{
					Console.Error.WriteLine("Invalid host " +
					    "specification, could not parse port number.");
					usage();
				}
				config.remote.Add(
				    new RemoteSite(words[0], parsed, isPeer));
				break;
			case 'v':
				config.verbose = true;
				break;
			default:
				Console.Error.WriteLine(
				    "Unrecognized option: " + args[i]);
				usage();
				break;
		}
	}

	/* Error check command line: -l and -h are both required. */
	if (config.host.Host == null || string.IsNullOrEmpty(config.home))
		usage();

	RepQuoteExample runner = null;
	/* True while the catch block is still responsible for terminate(). */
	bool needTerminate = false;
	try
	{
		runner = new RepQuoteExample();
		needTerminate = true;
		if (runner.init(config) != 0)
		{
			/*
			 * init() has already reported the failure and no
			 * environment was opened, so there is nothing to run or
			 * to shut down.
			 */
			Environment.ExitCode = 1;
			return;
		}
		runner.doloop();

		/* From here on a failure must not trigger a second terminate(). */
		needTerminate = false;
		runner.terminate();
	} catch (DatabaseException dbErr)
	{
		Console.Error.WriteLine("Caught an exception during " +
		    "initialization or processing: " + dbErr);
		Environment.ExitCode = 1;
		if (needTerminate && runner != null)
			runner.terminate();
	}
} /* End main. */
273
/*
 * Create an example instance that has no environment attached yet;
 * init() assigns the environment once it has been opened.
 */
public RepQuoteExample()
{
	this.dbenv = null;
}
278
/*
 * Build the environment configuration from the parsed command line
 * settings, open the replicated environment, start the checkpoint and
 * log archive threads, and start the replication manager according to
 * the requested start policy.
 *
 * Returns 0 on success, or 1 when the environment cannot be opened (the
 * failure is reported on standard output).  Exceptions thrown while
 * starting the replication manager are not caught here.
 */
public int init(RepConfig config)
{
	/* Replication: local site, known remote sites, group size. */
	ReplicationConfig repCfg = new ReplicationConfig();
	repCfg.RepMgrLocalSite = config.host;
	for (int idx = 0; idx < config.remote.Count; idx++)
	{
		repCfg.AddRemoteSite(config.remote[idx].Host,
		    config.remote[idx].IsPeer);
	}

	/* A total of 0 leaves the group size unset. */
	if (config.totalSites > 0)
	{
		repCfg.NSites = config.totalSites;
	}

	repCfg.BulkTransfer = config.bulk;

	/*
	 * Heartbeats let repmgr monitor the health of the TCP connection.
	 * A master broadcasts a heartbeat at the DB_REP_HEARTBEAT_SEND
	 * interval; a client that sees no message activity for the
	 * DB_REP_HEARTBEAT_MONITOR interval concludes that its connection
	 * to the master is lost.  The monitor interval should be longer
	 * than the send interval.
	 */
	repCfg.HeartbeatMonitor = 10000000;
	repCfg.HeartbeatSend = 5000000;

	/*
	 * Election priority for this environment.  An election first picks
	 * the site with the most recent log records; among sites that tie
	 * on that, the one with the highest priority value becomes master.
	 */
	repCfg.Priority = config.priority;

	/*
	 * Acknowledgement policy for permanent records.  The default,
	 * "quorum", requires only enough electable peers to keep a
	 * permanent record durable across an election; "all" requires
	 * every client to acknowledge a permanent replication message.
	 */
	repCfg.RepMgrAckPolicy = config.ackPolicy;

	/*
	 * Minimum and maximum time a client waits before asking for a
	 * missing message to be retransmitted.  Base these values on the
	 * performance and load characteristics of the master and client
	 * host platforms as well as the round trip message time.
	 */
	repCfg.RetransmissionRequest(20000, 500000);

	/* Memory pool. */
	MPoolConfig poolCfg = new MPoolConfig();
	poolCfg.CacheSize = RepConfig.CACHESIZE;

	/*
	 * Deadlock detection, so that deadlocks are broken by rejecting
	 * one of the conflicting lock requests.  DB_LOCK_DEFAULT uses the
	 * lock policy specified at environment creation time or
	 * DB_LOCK_RANDOM if none was specified.
	 */
	LockingConfig lockCfg = new LockingConfig();
	lockCfg.DeadlockResolution = DeadlockPolicy.DEFAULT;

	/* Replication verbosity follows the -v command line switch. */
	VerboseMessages verbosity = new VerboseMessages();
	verbosity.Replication = config.verbose;

	/* Assemble the environment configuration. */
	DatabaseEnvironmentConfig envConfig = new DatabaseEnvironmentConfig();
	envConfig.ErrorPrefix = RepConfig.progname;
	envConfig.RepSystemCfg = repCfg;
	envConfig.MPoolSystemCfg = poolCfg;
	envConfig.TxnNoSync = true;
	envConfig.EventNotify = RepQuoteEventHandler;
	envConfig.LockSystemCfg = lockCfg;
	envConfig.Create = true;
	envConfig.RunRecovery = true;
	envConfig.FreeThreaded = true;
	envConfig.UseReplication = true;
	envConfig.UseLocking = true;
	envConfig.UseLogging = true;
	envConfig.UseMPool = true;
	envConfig.UseTxns = true;
	envConfig.Verbosity = verbosity;

	try
	{
		dbenv = RepQuoteEnvironment.Open(config.home, envConfig);
	}
	catch (DatabaseException openErr)
	{
		Console.WriteLine("Fail to open environment: " + openErr.Message);
		return 1;
	}

	/*
	 * The following base replication features may also be useful to
	 * your application.  See Berkeley DB documentation for details.
	 *   - Master leases: provide stricter consistency for data reads
	 *     on a master site.
	 *   - Timeouts: customize the amount of time Berkeley DB waits
	 *     for such things as an election to be concluded or a master
	 *     lease to be granted.
	 *   - Delayed client synchronization: manage the master site's
	 *     resources by spreading out resource-intensive client
	 *     synchronizations.
	 *   - Blocked client operations: return immediately with an error
	 *     instead of waiting indefinitely if a client operation is
	 *     blocked by an ongoing client synchronization.
	 *
	 * The following repmgr features may also be useful to your
	 * application.  See Berkeley DB documentation for details.
	 *   - Two-site strict majority rule: in a two-site replication
	 *     group, require both sites to be available to elect a new
	 *     master.
	 *   - Timeouts: customize the amount of time repmgr waits for
	 *     such things as acknowledgements or attempts to reconnect to
	 *     other sites.
	 *   - Site list: return a list of sites currently known to repmgr.
	 */

	/* Support threads: periodic checkpoints and log file cleanup. */
	checkpointThread = new Thread(CheckPoint);
	checkpointThread.Start();
	logArchiveThread = new Thread(LogArchive);
	logArchiveThread.Start();

	/* Start the replication manager in the requested role. */
	if (config.startPolicy == StartPolicy.CLIENT)
	{
		dbenv.env.RepMgrStartClient(3);
	}
	else if (config.startPolicy == StartPolicy.ELECTION)
	{
		dbenv.env.RepMgrStartClient(3, true);
	}
	else if (config.startPolicy == StartPolicy.MASTER)
	{
		dbenv.env.RepMgrStartMaster(3);
	}

	return 0;
}
414
/*
 * Interactive command loop.
 *
 * Opens the stock database (retrying until it becomes available), then
 * reads commands from standard input:
 *   - a blank line prints every stock held in the database;
 *   - "quit" or "exit" (any letter case) ends the loop;
 *   - "TICKER VALUE" stores the pair, on a master site only.
 * End of input and a console read error also end the loop.
 *
 * However the loop ends, including through an exception, the
 * environment is flagged as finished so that the support threads stop
 * and terminate() can join them.
 *
 * Returns 0.  A DatabaseException raised by the store operation is not
 * caught here.
 */
public int doloop()
{
	BTreeDatabase db = null;
	char[] separators = new char[] { ' ', '\t' };

	try
	{
		for (;;)
		{
			if (db == null)
			{
				BTreeDatabaseConfig dbConfig = new BTreeDatabaseConfig();
				dbConfig.Env = dbenv.env;

				if (dbenv.IsMaster)
				{
					/*
					 * Open database allowing create only if this is a
					 * master database.  A client database uses polling
					 * to attempt to open the database, without creating
					 * it, until the open succeeds.
					 *
					 * This polling logic for allowing create can be
					 * simplified under some circumstances.  For
					 * example, if the application can be sure a
					 * database is already there, it would never need to
					 * open it allowing create.
					 */
					dbConfig.Creation = CreatePolicy.IF_NEEDED;
				}

				dbConfig.AutoCommit = true;

				try
				{
					db = BTreeDatabase.Open(RepConfig.progname, dbConfig);
				} catch (DatabaseException)
				{
					/* db is still null here; wait and poll again. */
					Console.WriteLine("no stock database available yet.");
					Thread.Sleep(RepConfig.SLEEPTIME);
					continue;
				}
			}

			/* Listen for input, and add it to the database. */
			Console.Write("QUOTESERVER");
			if (dbenv.IsMaster == false)
				Console.Write("(read-only)");
			Console.Write("> ");

			string nextLine;
			try
			{
				nextLine = Console.ReadLine();
			} catch (System.IO.IOException)
			{
				Console.WriteLine("Unable to get data");
				break;
			}

			/* ReadLine returns null once standard input is exhausted. */
			if (nextLine == null)
				break;

			/* Repeated spaces and tabs between words are tolerated. */
			string[] words = nextLine.Split(separators,
			    StringSplitOptions.RemoveEmptyEntries);

			/* A blank line causes the DB to be dumped. */
			if (words.Length == 0)
			{
				try
				{
					if (dbenv.InClientSync)
						Console.WriteLine("Cannot read data during " +
						    "client initialization - please try again.");
					else
						printStocks(db);
				} catch (DeadlockException)
				{
					continue;
				} catch (DatabaseException e)
				{
					/*
					 * This could be DB_REP_HANDLE_DEAD, which
					 * should close the database and continue.
					 */
					Console.WriteLine("Got db exception reading " +
					    "replication DB: " + e);
					Console.WriteLine("Expected if it was due to a dead " +
					    "replication handle, otherwise an unexpected error.");
					db.Close(false);/* Close no sync. */
					db = null;
					continue;
				}
				continue;
			}

			if (words.Length == 1 &&
			    (string.Equals(words[0], "quit",
			    StringComparison.OrdinalIgnoreCase) ||
			    string.Equals(words[0], "exit",
			    StringComparison.OrdinalIgnoreCase)))
			{
				break;
			} else if (words.Length != 2)
			{
				Console.WriteLine("Format: TICKER VALUE");
				continue;
			}

			if (!dbenv.IsMaster)
			{
				Console.WriteLine("Can't update client");
				continue;
			}

			DatabaseEntry key = new DatabaseEntry(
			    ASCIIEncoding.ASCII.GetBytes(words[0]));
			DatabaseEntry data = new DatabaseEntry(
			    ASCIIEncoding.ASCII.GetBytes(words[1]));

			db.Put(key, data);
		}
	}
	finally
	{
		/*
		 * The support threads poll this flag; without it a later
		 * terminate() would wait on them forever.
		 */
		dbenv.AppFinished = true;
	}

	if (db != null)
		db.Close(true);

	return 0;
}
539
/*
 * Shut the example down: stop and join the support threads, flush the
 * log and close the environment.
 *
 * Does nothing when no environment was opened.  The environment is
 * closed even if the log flush throws; that exception still propagates
 * to the caller.
 */
public void terminate()
{
	/* init() never opened an environment, so nothing was started. */
	if (dbenv == null)
		return;

	/*
	 * The support threads only exit once they see this flag, so raise
	 * it before joining; otherwise a shutdown that did not come from
	 * the "quit" command would block here indefinitely.
	 */
	dbenv.AppFinished = true;

	/* Wait for checkpoint and log archive threads to finish. */
	try
	{
		if (logArchiveThread != null)
			logArchiveThread.Join();
		if (checkpointThread != null)
			checkpointThread.Join();
	} catch (Exception)
	{
		Console.WriteLine("Support thread join failed.");
	}

	/*
	 * We have used the DB_TXN_NOSYNC environment flag for improved
	 * performance without the usual sacrifice of transactional durability,
	 * as discussed in the "Transactional guarantees" page of the Reference
	 * Guide: if one replication site crashes, we can expect the data to
	 * exist at another site.  However, in case we shut down all sites
	 * gracefully, we push out the end of the log here so that the most
	 * recent transactions don't mysteriously disappear.
	 */
	try
	{
		dbenv.env.LogFlush();
	}
	finally
	{
		dbenv.env.Close();
	}
}
564
/*
 * Print a two-column table (symbol, price) of every record reachable
 * through a cursor on db.  Keys and values are decoded as ASCII text.
 *
 * void return type since error conditions are propagated via
 * exceptions.  The cursor is closed on every path, including when
 * iteration throws, so a failed read does not leave it open.
 */
private void printStocks(Database db)
{
	Cursor dbc = db.Cursor();
	try
	{
		Console.WriteLine("\tSymbol\t\tPrice");
		Console.WriteLine("\t======\t\t=====");

		foreach (KeyValuePair<DatabaseEntry, DatabaseEntry> pair in dbc)
		{
			string keyStr = ASCIIEncoding.ASCII.GetString(pair.Key.Data);
			string dataStr = ASCIIEncoding.ASCII.GetString(pair.Value.Data);
			Console.WriteLine("\t"+keyStr+"\t\t"+dataStr);
		}
	}
	finally
	{
		dbc.Close();
	}
}
585
/*
 * Body of the checkpoint thread: performs a checkpoint at a fixed time
 * interval until the application is flagged as finished.  For a master
 * site, the time interval is one minute plus the duration of the
 * checkpoint_delay timeout (30 seconds by default.)  For a client site,
 * the time interval is one minute.
 */
public void CheckPoint()
{
	while (true)
	{
		/*
		 * Sleep for a minute in one-second slices so that a finished
		 * application is noticed promptly; leave the thread as soon
		 * as that happens.
		 */
		int secondsWaited = 0;
		do
		{
			Thread.Sleep(1000);
			if (dbenv.AppFinished)
			{
				return;
			}
			secondsWaited++;
		} while (secondsWaited < 60);

		/* A failed checkpoint is reported and retried next round. */
		try
		{
			dbenv.env.Checkpoint();
		}
		catch (DatabaseException)
		{
			Console.WriteLine("Could not perform checkpoint.");
		}
	}
}
618
619
/*
 * Body of the log archive thread.  Once per minute, it removes all but
 * the most recent 3 logs that are safe to remove according to a call to
 * DBENV->log_archive().  The thread ends when the application is
 * flagged as finished.
 *
 * Log cleanup is needed to conserve disk space, but aggressive log cleanup
 * can cause more frequent client initializations if a client lags too far
 * behind the current master.  This can happen in the event of a slow client,
 * a network partition, or a new master that has not kept as many logs as the
 * previous master.
 *
 * The approach in this routine balances the need to mitigate against a
 * lagging client by keeping a few more of the most recent unneeded logs
 * with the need to conserve disk space by regularly cleaning up log files.
 * Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
 * flag) is not recommended for replication due to the risk of frequent
 * client initializations.
 *
 * A file that cannot be deleted is reported and skipped; it does not
 * stop the thread or the remaining deletions.
 */
public void LogArchive()
{
	/* Number of the most recent unneeded log files left in place. */
	const int logKeep = 3;

	for (; ; )
	{
		/*
		 * Wait for one minute, polling once per second to see if
		 * application has finished.  When application has finished,
		 * terminate this thread.
		 */
		for (int i = 0; i < 60; i++)
		{
			Thread.Sleep(1000);
			if (dbenv.AppFinished)
				return;
		}

		List<string> logFileList;
		try
		{
			/* Get the list of unneeded log files. */
			logFileList = dbenv.env.ArchivableLogFiles(true);
		} catch (DatabaseException)
		{
			Console.WriteLine("Problem deleting log archive files.");
			continue;
		}

		/*
		 * Remove all but the logKeep most recent unneeded log files.
		 * minLog is zero or negative when there is nothing to remove.
		 */
		int minLog = logFileList.Count - logKeep;
		for (int i = 0; i < minLog; i++)
		{
			try
			{
				FileInfo logFile = new FileInfo(logFileList[i]);
				logFile.Delete();
			} catch (IOException e)
			{
				/* For example, the file is still in use. */
				Console.WriteLine("Problem deleting log archive files.");
				Console.WriteLine(e.Message);
			} catch (UnauthorizedAccessException e)
			{
				Console.WriteLine("Problem deleting log archive files.");
				Console.WriteLine(e.Message);
			}
		}
	}
}
678
679
680
/*
 * Implementation of the event callback that handles the Berkeley DB
 * events we are interested in receiving.  Role and synchronization
 * changes are recorded on the environment wrapper; every other event is
 * reported on standard output.  event_info is not used.
 */
private void RepQuoteEventHandler(NotificationEvent eventCode, byte[] event_info)
{
	if (eventCode == NotificationEvent.REP_CLIENT)
	{
		dbenv.IsMaster = false;
		dbenv.InClientSync = true;
	}
	else if (eventCode == NotificationEvent.REP_MASTER)
	{
		dbenv.IsMaster = true;
		dbenv.InClientSync = false;
	}
	else if (eventCode == NotificationEvent.REP_NEWMASTER)
	{
		dbenv.InClientSync = true;
	}
	else if (eventCode == NotificationEvent.REP_STARTUPDONE)
	{
		dbenv.InClientSync = false;
	}
	else if (eventCode == NotificationEvent.REP_PERM_FAILED)
	{
		/*
		 * Did not get enough acks to guarantee transaction
		 * durability based on the configured ack policy.  This
		 * transaction will be flushed to the master site's
		 * local disk storage for durability.
		 */
		Console.WriteLine("Insufficient acknowledgements " +
		    "to guarantee transaction durability.");
	}
	else
	{
		Console.WriteLine("Ignoring event: {0}", eventCode);
	}
}
718	}
719}
720