• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt-6.x.4708/router/db-4.8.30/examples_stl/repquote/
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9/*
10 * In this application, we specify all communication via the command line.  In
11 * a real application, we would expect that information about the other sites
12 * in the system would be maintained in some sort of configuration file.  The
13 * critical part of this interface is that we assume at startup that we can
14 * find out
15 * 	1) what our Berkeley DB home environment is,
16 * 	2) what host/port we wish to listen on for connections; and
17 * 	3) an optional list of other sites we should attempt to connect to.
18 *
19 * These pieces of information are expressed by the following flags.
20 * -h home (required; h stands for home directory)
21 * -l host:port (required; l stands for local)
22 * -C or -M (optional; start up as client or master)
23 * -r host:port (optional; r stands for remote; any number of these may be
24 *	specified)
25 * -R host:port (optional; R stands for remote peer; only one of these may
26 *      be specified)
27 * -a all|quorum (optional; a stands for ack policy)
28 * -b (optional; b stands for bulk)
29 * -n nsites (optional; number of sites in replication group; defaults to 0
30 *	to try to dynamically compute nsites)
31 * -p priority (optional; defaults to 100)
32 * -v (optional; v stands for verbose)
33 */
34
35#include <iostream>
36#include <string>
37#include <sstream>
38
39#include <db_cxx.h>
40#include "StlRepConfigInfo.h"
41#include "dbstl_map.h"
42
43using std::cout;
44using std::cin;
45using std::cerr;
46using std::endl;
47using std::flush;
48using std::istream;
49using std::istringstream;
50using std::string;
51using std::getline;
52using namespace dbstl;
53#define	CACHESIZE	(10 * 1024 * 1024)
54#define	DATABASE	"quote.db"
55
56const char *progname = "exstl_repquote";
57
58#include <errno.h>
59#ifdef _WIN32
60#define WIN32_LEAN_AND_MEAN
61#include <windows.h>
62#define	snprintf		_snprintf
63#define	sleep(s)		Sleep(1000 * (s))
64
65extern "C" {
66extern int getopt(int, char * const *, const char *);
67extern char *optarg;
68extern int optind;
69}
70
71typedef HANDLE thread_t;
72typedef DWORD thread_exit_status_t;
73#define	thread_create(thrp, attr, func, arg)				   \
74    (((*(thrp) = CreateThread(NULL, 0,					   \
75	(LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
76#define	thread_join(thr, statusp)					   \
77    ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&		   \
78    GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
79#else /* !_WIN32 */
80#include <pthread.h>
81
82typedef pthread_t thread_t;
83typedef void* thread_exit_status_t;
84#define	thread_create(thrp, attr, func, arg)				   \
85    pthread_create((thrp), (attr), (func), (arg))
86#define	thread_join(thr, statusp) pthread_join((thr), (statusp))
87#endif
88
89// Struct used to store information in Db app_private field.
90typedef struct {
91	bool app_finished;
92	bool in_client_sync;
93	bool is_master;
94} APP_DATA;
95
96static void log(const char *);
97void *checkpoint_thread (void *);
98void *log_archive_thread (void *);
99
100class RepQuoteExample
101{
102public:
103	typedef db_map<char *, char *, ElementHolder<char *> > str_map_t;
104	RepQuoteExample();
105	void init(RepConfigInfo* config);
106	void doloop();
107	int terminate();
108
109	static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
110
111private:
112	// disable copy constructor.
113	RepQuoteExample(const RepQuoteExample &);
114	void operator = (const RepQuoteExample &);
115
116	// internal data members.
117	APP_DATA		app_data;
118	RepConfigInfo   *app_config;
119	DbEnv		   *cur_env;
120	Db *dbp;
121	str_map_t *strmap;
122	thread_t ckp_thr;
123	thread_t lga_thr;
124
125	// private methods.
126	void print_stocks();
127	void prompt();
128	bool open_db(bool creating);
129	void close_db(){
130		delete strmap;
131		strmap = NULL;
132		dbstl::close_db(dbp);
133		dbp = NULL;
134	}
135	static void close_db(Db *&);// Close an unregistered Db handle.
136};
137
138bool RepQuoteExample::open_db(bool creating)
139{
140	int ret;
141
142	if (dbp)
143		return true;
144
145	dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS);
146
147	u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD;
148	if (creating)
149		flags |= DB_CREATE;
150
151	ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
152	switch (ret) {
153	case 0:
154		register_db(dbp);
155		if (strmap)
156			delete strmap;
157		strmap = new str_map_t(dbp, cur_env);
158		return (true);
159	case DB_LOCK_DEADLOCK: // Fall through
160	case DB_REP_HANDLE_DEAD:
161		log("\nFailed to open stock db.");
162		break;
163	default:
164		if (ret == DB_REP_LOCKOUT)
165			break; // Fall through
166		else if (ret == ENOENT && !creating)
167			log("\nStock DB does not yet exist\n");
168		else {
169			DbException ex(ret);
170			throw ex;
171		}
172	} // switch
173
174	// (All retryable errors fall through to here.)
175	//
176	log("\nPlease retry the operation");
177	close_db(dbp);
178	return (false);
179}
180
181void RepQuoteExample::close_db(Db *&dbp)
182{
183	if (dbp) {
184		try {
185			dbp->close(0);
186			delete dbp;
187			dbp = 0;
188		} catch (...) {
189			delete dbp;
190			dbp = 0;
191			throw;
192		}
193	}
194
195}
196
197RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) {
198	app_data.app_finished = 0;
199	app_data.in_client_sync = 0;
200	app_data.is_master = 0; // assume I start out as client
201	cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS);
202	strmap = NULL;
203	dbp = NULL;
204}
205
206void RepQuoteExample::init(RepConfigInfo *config) {
207	app_config = config;
208
209	cur_env->set_app_private(&app_data);
210	cur_env->set_errfile(stderr);
211	cur_env->set_errpfx(progname);
212	cur_env->set_event_notify(event_callback);
213
214	// Configure bulk transfer to send groups of records to clients
215	// in a single network transfer.  This is useful for master sites
216	// and clients participating in client-to-client synchronization.
217	//
218	if (app_config->bulk)
219		cur_env->rep_set_config(DB_REP_CONF_BULK, 1);
220
221
222	// Set the total number of sites in the replication group.
223	// This is used by repmgr internal election processing.
224	//
225	if (app_config->totalsites > 0)
226		cur_env->rep_set_nsites(app_config->totalsites);
227
228	// Turn on debugging and informational output if requested.
229	if (app_config->verbose)
230		cur_env->set_verbose(DB_VERB_REPLICATION, 1);
231
232	// Set replication group election priority for this environment.
233	// An election first selects the site with the most recent log
234	// records as the new master.  If multiple sites have the most
235	// recent log records, the site with the highest priority value
236	// is selected as master.
237	//
238	cur_env->rep_set_priority(app_config->priority);
239
240	// Set the policy that determines how master and client sites
241	// handle acknowledgement of replication messages needed for
242	// permanent records.  The default policy of "quorum" requires only
243	// a quorum of electable peers sufficient to ensure a permanent
244	// record remains durable if an election is held.  The "all" option
245	// requires all clients to acknowledge a permanent replication
246	// message instead.
247	//
248	cur_env->repmgr_set_ack_policy(app_config->ack_policy);
249
250	// Set the threshold for the minimum and maximum time the client
251	// waits before requesting retransmission of a missing message.
252	// Base these values on the performance and load characteristics
253	// of the master and client host platforms as well as the round
254	// trip message time.
255	//
256	cur_env->rep_set_request(20000, 500000);
257
258	// Configure deadlock detection to ensure that any deadlocks
259	// are broken by having one of the conflicting lock requests
260	// rejected. DB_LOCK_DEFAULT uses the lock policy specified
261	// at environment creation time or DB_LOCK_RANDOM if none was
262	// specified.
263	//
264	cur_env->set_lk_detect(DB_LOCK_DEFAULT);
265
266	// The following base replication features may also be useful to your
267	// application. See Berkeley DB documentation for more details.
268	//   - Master leases: Provide stricter consistency for data reads
269	//     on a master site.
270	//   - Timeouts: Customize the amount of time Berkeley DB waits
271	//     for such things as an election to be concluded or a master
272	//     lease to be granted.
273	//   - Delayed client synchronization: Manage the master site's
274	//     resources by spreading out resource-intensive client
275	//     synchronizations.
276	//   - Blocked client operations: Return immediately with an error
277	//     instead of waiting indefinitely if a client operation is
278	//     blocked by an ongoing client synchronization.
279
280	cur_env->repmgr_set_local_site(app_config->this_host.host,
281	    app_config->this_host.port, 0);
282
283	for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL;
284		cur = cur->next) {
285		cur_env->repmgr_add_remote_site(cur->host, cur->port,
286		    NULL, cur->peer ? DB_REPMGR_PEER : 0);
287	}
288
289	// Configure heartbeat timeouts so that repmgr monitors the
290	// health of the TCP connection.  Master sites broadcast a heartbeat
291	// at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
292	// Client sites wait for message activity the length of the
293	// DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
294	// connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
295	// timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
296	//
297	cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
298	cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
299
300	// The following repmgr features may also be useful to your
301	// application.  See Berkeley DB documentation for more details.
302	//  - Two-site strict majority rule - In a two-site replication
303	//    group, require both sites to be available to elect a new
304	//    master.
305	//  - Timeouts - Customize the amount of time repmgr waits
306	//    for such things as waiting for acknowledgements or attempting
307	//    to reconnect to other sites.
308	//  - Site list - return a list of sites currently known to repmgr.
309
310	// We can now open our environment, although we're not ready to
311	// begin replicating.  However, we want to have a dbenv around
312	// so that we can send it into any of our message handlers.
313	cur_env->set_cachesize(0, CACHESIZE, 0);
314	cur_env->set_flags(DB_TXN_NOSYNC, 1);
315
316	cur_env->open(app_config->home, DB_CREATE | DB_RECOVER |
317	    DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
318	    DB_INIT_MPOOL | DB_INIT_TXN, 0);
319
320	// Start checkpoint and log archive support threads.
321	(void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env);
322	(void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env);
323
324	dbstl::register_db_env(cur_env);
325	cur_env->repmgr_start(3, app_config->start_policy);
326}
327
328int RepQuoteExample::terminate() {
329	try {
330		// Wait for checkpoint and log archive threads to finish.
331		// Windows does not allow NULL pointer for exit code variable.
332		thread_exit_status_t exstat;
333
334		(void)thread_join(lga_thr, &exstat);
335		(void)thread_join(ckp_thr, &exstat);
336
337		// We have used the DB_TXN_NOSYNC environment flag for
338		// improved performance without the usual sacrifice of
339		// transactional durability, as discussed in the
340		// "Transactional guarantees" page of the Reference
341		// Guide: if one replication site crashes, we can
342		// expect the data to exist at another site.  However,
343		// in case we shut down all sites gracefully, we push
344		// out the end of the log here so that the most
345		// recent transactions don't mysteriously disappear.
346		cur_env->log_flush(NULL);
347	} catch (DbException dbe) {
348		cout << "\nerror closing environment: " << dbe.what() << endl;
349	}
350	return 0;
351}
352
353void RepQuoteExample::prompt() {
354	cout << "QUOTESERVER";
355	if (!app_data.is_master)
356		cout << "(read-only)";
357	cout << "> " << flush;
358}
359
360void log(const char *msg) {
361	cerr << msg << endl;
362}
363
364// Simple command-line user interface:
365//  - enter "<stock symbol> <price>" to insert or update a record in the
366//	database;
367//  - just press Return (i.e., blank input line) to print out the contents of
368//	the database;
369//  - enter "quit" or "exit" to quit.
370//
371void RepQuoteExample::doloop() {
372	string input;
373
374	while (prompt(), getline(cin, input)) {
375		istringstream is(input);
376		string token1, token2;
377
378		// Read 0, 1 or 2 tokens from the input.
379		//
380		int count = 0;
381		if (is >> token1) {
382			count++;
383			if (is >> token2)
384			count++;
385		}
386
387		if (count == 1) {
388			if (token1 == "exit" || token1 == "quit") {
389				app_data.app_finished = 1;
390				break;
391			} else {
392				log("\nFormat: <stock> <price>\n");
393				continue;
394			}
395		}
396
397		// Here we know count is either 0 or 2, so we're about to try a
398		// DB operation.
399		//
400		// Open database with DB_CREATE only if this is a master
401		// database.  A client database uses polling to attempt
402		// to open the database without DB_CREATE until it is
403		// successful.
404		//
405		// This DB_CREATE polling logic can be simplified under
406		// some circumstances.  For example, if the application can
407		// be sure a database is already there, it would never need
408		// to open it with DB_CREATE.
409		//
410		if (!open_db(app_data.is_master))
411			continue;
412
413		try {
414			if (count == 0)
415				if (app_data.in_client_sync)
416					log(
417    "Cannot read data during client initialization - please try again.");
418				else
419					print_stocks();
420			else if (!app_data.is_master)
421				log("\nCan't update at client\n");
422			else {
423				char *symbol = new char[token1.length() + 1];
424				strcpy(symbol, token1.c_str());
425				char *price = new char[token2.length() + 1];
426				strcpy(price, token2.c_str());
427				begin_txn(0, cur_env);
428				strmap->insert(make_pair(symbol, price));
429				commit_txn(cur_env);
430				delete symbol;
431				delete price;
432			}
433		} catch (DbDeadlockException e) {
434			log("\nplease retry the operation\n");
435			close_db();
436		} catch (DbRepHandleDeadException e) {
437			log("\nplease retry the operation\n");
438			close_db();
439		} catch (DbException e) {
440			if (e.get_errno() == DB_REP_LOCKOUT) {
441			log("\nplease retry the operation\n");
442			close_db();
443			} else
444			throw;
445		}
446
447	}
448
449	close_db();
450}
451
452void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
453{
454	APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
455
456	info = NULL;		/* Currently unused. */
457
458	switch (which) {
459	case DB_EVENT_REP_MASTER:
460		app->in_client_sync = 0;
461		app->is_master = 1;
462		break;
463
464	case DB_EVENT_REP_CLIENT:
465		app->is_master = 0;
466		app->in_client_sync = 1;
467		break;
468
469	case DB_EVENT_REP_STARTUPDONE:
470		app->in_client_sync = 0;
471		break;
472	case DB_EVENT_REP_NEWMASTER:
473		app->in_client_sync = 1;
474		break;
475
476	case DB_EVENT_REP_PERM_FAILED:
477		// Did not get enough acks to guarantee transaction
478		// durability based on the configured ack policy.  This
479		// transaction will be flushed to the master site's
480		// local disk storage for durability.
481		//
482		log(
483    "Insufficient acknowledgements to guarantee transaction durability.");
484		break;
485
486	default:
487		dbenv->errx("\nignoring event %d", which);
488	}
489}
490
491void RepQuoteExample::print_stocks() {
492#define	MAXKEYSIZE	10
493#define	MAXDATASIZE	20
494
495	cout << "\tSymbol\tPrice" << endl
496		<< "\t======\t=====" << endl;
497	str_map_t::iterator itr;
498	if (strmap == NULL)
499		strmap = new str_map_t(dbp, cur_env);
500	begin_txn(0, cur_env);
501	for (itr = strmap->begin(); itr != strmap->end(); ++itr)
502		cout<<"\t"<<itr->first<<"\t"<<itr->second<<endl;
503	commit_txn(cur_env);
504	cout << endl << flush;
505}
506
507static void usage() {
508	cerr << "usage: " << progname << endl
509	    << "[-h home][-o host:port][-m host:port][-f host:port]"
510		<< "[-n nsites][-p priority]" << endl;
511
512	cerr << "\t -h home (required; h stands for home directory)" << endl
513	    << "\t -l host:port (required; l stands for local)" << endl
514	    << "\t -C or -M (optional; start up as client or master)" << endl
515	    << "\t -r host:port (optional; r stands for remote; any "
516	    << "number of these" << endl
517	    << "\t               may be specified)" << endl
518	    << "\t -R host:port (optional; R stands for remote peer; only "
519	    << "one of" << endl
520	    << "\t               these may be specified)" << endl
521	    << "\t -a all|quorum (optional; a stands for ack policy)" << endl
522	    << "\t -b (optional; b stands for bulk)" << endl
523	    << "\t -n nsites (optional; number of sites in replication "
524	    << "group; defaults " << endl
525	    << "\t	    to 0 to try to dynamically compute nsites)" << endl
526	    << "\t -p priority (optional; defaults to 100)" << endl
527	    << "\t -v (optional; v stands for verbose)" << endl;
528
529	exit(EXIT_FAILURE);
530}
531
532int main(int argc, char **argv) {
533	RepConfigInfo config;
534	char ch, *portstr, *tmphost;
535	int tmpport;
536	bool tmppeer;
537
538	// Extract the command line parameters
539	while ((ch = getopt(argc, argv, "a:bCh:l:Mn:p:R:r:v")) != EOF) {
540		tmppeer = false;
541		switch (ch) {
542		case 'a':
543			if (strncmp(optarg, "all", 3) == 0)
544				config.ack_policy = DB_REPMGR_ACKS_ALL;
545			else if (strncmp(optarg, "quorum", 6) != 0)
546				usage();
547			break;
548		case 'b':
549			config.bulk = true;
550			break;
551		case 'C':
552			config.start_policy = DB_REP_CLIENT;
553			break;
554		case 'h':
555			config.home = optarg;
556			break;
557		case 'l':
558			config.this_host.host = strtok(optarg, ":");
559			if ((portstr = strtok(NULL, ":")) == NULL) {
560				cerr << "\nBad host specification." << endl;
561				usage();
562			}
563			config.this_host.port = (unsigned short)atoi(portstr);
564			config.got_listen_address = true;
565			break;
566		case 'M':
567			config.start_policy = DB_REP_MASTER;
568			break;
569		case 'n':
570			config.totalsites = atoi(optarg);
571			break;
572		case 'p':
573			config.priority = atoi(optarg);
574			break;
575		case 'R':
576			tmppeer = true; // FALLTHROUGH
577		case 'r':
578			tmphost = strtok(optarg, ":");
579			if ((portstr = strtok(NULL, ":")) == NULL) {
580				cerr << "Bad host specification." << endl;
581				usage();
582			}
583			tmpport = (unsigned short)atoi(portstr);
584
585			config.addOtherHost(tmphost, tmpport, tmppeer);
586
587			break;
588		case 'v':
589			config.verbose = true;
590			break;
591		case '?':
592		default:
593			usage();
594		}
595	}
596
597	// Error check command line.
598	if ((!config.got_listen_address) || config.home == NULL)
599		usage();
600
601	RepQuoteExample runner;
602	try {
603		runner.init(&config);
604		runner.doloop();
605	} catch (DbException dbe) {
606		cerr << "\nCaught an exception during initialization or"
607			<< " processing: " << dbe.what() << endl;
608	}
609	runner.terminate();
610	return 0;
611}
612
613// This is a very simple thread that performs checkpoints at a fixed
614// time interval.  For a master site, the time interval is one minute
615// plus the duration of the checkpoint_delay timeout (30 seconds by
616// default.)  For a client site, the time interval is one minute.
617//
618void *checkpoint_thread(void *args)
619{
620	DbEnv *env;
621	APP_DATA *app;
622	int i, ret;
623
624	env = (DbEnv *)args;
625	app = (APP_DATA *)env->get_app_private();
626
627	for (;;) {
628		// Wait for one minute, polling once per second to see if
629		// application has finished.  When application has finished,
630		// terminate this thread.
631		//
632		for (i = 0; i < 60; i++) {
633			sleep(1);
634			if (app->app_finished == 1)
635				return ((void *)EXIT_SUCCESS);
636		}
637
638		// Perform a checkpoint.
639		if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
640			env->err(ret, "Could not perform checkpoint.\n");
641			return ((void *)EXIT_FAILURE);
642		}
643	}
644}
645
646// This is a simple log archive thread.  Once per minute, it removes all but
647// the most recent 3 logs that are safe to remove according to a call to
648// DBENV->log_archive().
649//
650// Log cleanup is needed to conserve disk space, but aggressive log cleanup
651// can cause more frequent client initializations if a client lags too far
652// behind the current master.  This can happen in the event of a slow client,
653// a network partition, or a new master that has not kept as many logs as the
654// previous master.
655//
656// The approach in this routine balances the need to mitigate against a
657// lagging client by keeping a few more of the most recent unneeded logs
658// with the need to conserve disk space by regularly cleaning up log files.
659// Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
660// flag) is not recommended for replication due to the risk of frequent
661// client initializations.
662//
663void *log_archive_thread(void *args)
664{
665	DbEnv *env;
666	APP_DATA *app;
667	char **begin, **list;
668	int i, listlen, logs_to_keep, minlog, ret;
669
670	env = (DbEnv *)args;
671	app = (APP_DATA *)env->get_app_private();
672	logs_to_keep = 3;
673
674	for (;;) {
675		// Wait for one minute, polling once per second to see if
676		// application has finished.  When application has finished,
677		// terminate this thread.
678		//
679		for (i = 0; i < 60; i++) {
680			sleep(1);
681			if (app->app_finished == 1)
682				return ((void *)EXIT_SUCCESS);
683		}
684
685		// Get the list of unneeded log files.
686		if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
687			env->err(ret, "Could not get log archive list.");
688			return ((void *)EXIT_FAILURE);
689		}
690		if (list != NULL) {
691			listlen = 0;
692			// Get the number of logs in the list.
693			for (begin = list; *begin != NULL; begin++, listlen++);
694			// Remove all but the logs_to_keep most recent
695			// unneeded log files.
696			//
697			minlog = listlen - logs_to_keep;
698			for (begin = list, i= 0; i < minlog; list++, i++) {
699				if ((ret = unlink(*list)) != 0) {
700					env->err(ret,
701					    "logclean: remove %s", *list);
702					env->errx(
703					    "logclean: Error remove %s", *list);
704					free(begin);
705					return ((void *)EXIT_FAILURE);
706				}
707			}
708			free(begin);
709		}
710	}
711}
712
713