1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: RepQuoteExample.cpp,v 1.21 2008/04/28 02:59:56 alexg Exp $
7 */
8
9/*
10 * In this application, we specify all communication via the command line.  In
11 * a real application, we would expect that information about the other sites
12 * in the system would be maintained in some sort of configuration file.  The
13 * critical part of this interface is that we assume at startup that we can
14 * find out
15 * 	1) what host/port we wish to listen on for connections,
16 * 	2) a (possibly empty) list of other sites we should attempt to connect
17 * 	to; and
18 * 	3) what our Berkeley DB home environment is.
19 *
20 * These pieces of information are expressed by the following flags.
21 * -m host:port (required; m stands for me)
22 * -o host:port (optional; o stands for other; any number of these may be
23 *	specified)
24 * -h home directory
25 * -n nsites (optional; number of sites in replication group; defaults to 0
26 *	in which case we try to dynamically compute the number of sites in
27 *	the replication group)
28 * -p priority (optional: defaults to 100)
29 */
30
31#include <iostream>
32#include <string>
33#include <sstream>
34
35#include <db_cxx.h>
36#include "RepConfigInfo.h"
37#include "dbc_auto.h"
38
39using std::cout;
40using std::cin;
41using std::cerr;
42using std::endl;
43using std::flush;
44using std::istream;
45using std::istringstream;
46using std::string;
47using std::getline;
48
49#define	CACHESIZE	(10 * 1024 * 1024)
50#define	DATABASE	"quote.db"
51
52const char *progname = "excxx_repquote";
53
54#include <errno.h>
55#ifdef _WIN32
56#define WIN32_LEAN_AND_MEAN
57#include <windows.h>
58
59extern "C" {
60  extern int getopt(int, char * const *, const char *);
61  extern char *optarg;
62}
63#endif
64
65// Struct used to store information in Db app_private field.
66typedef struct {
67	bool is_master;
68} APP_DATA;
69
70static void log(char *);
71
72class RepQuoteExample {
73public:
74	RepQuoteExample();
75	void init(RepConfigInfo* config);
76	void doloop();
77	int terminate();
78
79	static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
80
81private:
82	// disable copy constructor.
83	RepQuoteExample(const RepQuoteExample &);
84	void operator = (const RepQuoteExample &);
85
86	// internal data members.
87	APP_DATA		app_data;
88	RepConfigInfo   *app_config;
89	DbEnv		   cur_env;
90
91	// private methods.
92	void print_stocks(Db *dbp);
93	void prompt();
94};
95
96class DbHolder {
97public:
98	DbHolder(DbEnv *env) : env(env) {
99	dbp = 0;
100	}
101
102	~DbHolder() {
103	try {
104		close();
105	} catch (...) {
106		// Ignore: this may mean another exception is pending
107	}
108	}
109
110	bool ensure_open(bool creating) {
111	if (dbp)
112		return (true);
113	dbp = new Db(env, 0);
114	dbp->set_pagesize(512);
115
116	u_int32_t flags = DB_AUTO_COMMIT;
117	if (creating)
118		flags |= DB_CREATE;
119	try {
120		dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
121		return (true);
122	} catch (DbDeadlockException e) {
123	} catch (DbRepHandleDeadException e) {
124	} catch (DbException e) {
125		if (e.get_errno() == DB_REP_LOCKOUT) {
126		// Just fall through.
127		} else if (e.get_errno() == ENOENT && !creating) {
128		// Provide a bit of extra explanation.
129		//
130		log("Stock DB does not yet exist");
131		} else
132		throw;
133	}
134
135	// (All retryable errors fall through to here.)
136	//
137	log("please retry the operation");
138	close();
139	return (false);
140	}
141
142	void close() {
143	if (dbp) {
144		try {
145		dbp->close(0);
146		delete dbp;
147		dbp = 0;
148		} catch (...) {
149		delete dbp;
150		dbp = 0;
151		throw;
152		}
153	}
154	}
155
156	operator Db *() {
157	return dbp;
158	}
159
160	Db *operator->() {
161	return dbp;
162	}
163
164private:
165	Db *dbp;
166	DbEnv *env;
167};
168
169class StringDbt : public Dbt {
170public:
171#define GET_STRING_OK 0
172#define GET_STRING_INVALID_PARAM 1
173#define GET_STRING_SMALL_BUFFER 2
174#define GET_STRING_EMPTY_DATA 3
175	int get_string(char **buf, size_t buf_len)
176	{
177		size_t copy_len;
178		int ret = GET_STRING_OK;
179		if (buf == NULL) {
180			cerr << "Invalid input buffer to get_string" << endl;
181			return GET_STRING_INVALID_PARAM;
182		}
183
184		// make sure the string is null terminated.
185		memset(*buf, 0, buf_len);
186
187		// if there is no string, just return.
188		if (get_data() == NULL || get_size() == 0)
189			return GET_STRING_OK;
190
191		if (get_size() >= buf_len) {
192			ret = GET_STRING_SMALL_BUFFER;
193			copy_len = buf_len - 1; // save room for a terminator.
194		} else
195			copy_len = get_size();
196		memcpy(*buf, get_data(), copy_len);
197
198		return ret;
199	}
200	size_t get_string_length()
201	{
202		if (get_size() == 0)
203			return 0;
204		return strlen((char *)get_data());
205	}
206	void set_string(char *string)
207	{
208		set_data(string);
209		set_size((u_int32_t)strlen(string));
210	}
211
212	StringDbt(char *string) :
213	    Dbt(string, (u_int32_t)strlen(string)) {};
214	StringDbt() : Dbt() {};
215	~StringDbt() {};
216
217	// Don't add extra data to this sub-class since we want it to remain
218	// compatible with Dbt objects created internally by Berkeley DB.
219};
220
221RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(0) {
222	app_data.is_master = 0; // assume I start out as client
223}
224
225void RepQuoteExample::init(RepConfigInfo *config) {
226	app_config = config;
227
228	cur_env.set_app_private(&app_data);
229	cur_env.set_errfile(stderr);
230	cur_env.set_errpfx(progname);
231	cur_env.set_event_notify(event_callback);
232	cur_env.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL);
233
234	cur_env.repmgr_set_local_site(app_config->this_host.host,
235	    app_config->this_host.port, 0);
236
237	for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL;
238		cur = cur->next) {
239		cur_env.repmgr_add_remote_site(cur->host, cur->port,
240		    NULL, cur->peer ? DB_REPMGR_PEER : 0);
241	}
242
243	if (app_config->totalsites > 0)
244		cur_env.rep_set_nsites(app_config->totalsites);
245
246	cur_env.rep_set_priority(app_config->priority);
247
248	/*
249	 * We can now open our environment, although we're not ready to
250	 * begin replicating.  However, we want to have a dbenv around
251	 * so that we can send it into any of our message handlers.
252	 */
253	cur_env.set_cachesize(0, CACHESIZE, 0);
254	cur_env.set_flags(DB_TXN_NOSYNC, 1);
255
256	cur_env.open(app_config->home, DB_CREATE | DB_RECOVER |
257	    DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
258	    DB_INIT_MPOOL | DB_INIT_TXN, 0);
259
260	if (app_config->verbose)
261		cur_env.set_verbose(DB_VERB_REPLICATION, 1);
262
263	cur_env.repmgr_start(3, app_config->start_policy);
264}
265
266int RepQuoteExample::terminate() {
267	try {
268		/*
269		 * We have used the DB_TXN_NOSYNC environment flag for
270		 * improved performance without the usual sacrifice of
271		 * transactional durability, as discussed in the
272		 * "Transactional guarantees" page of the Reference
273		 * Guide: if one replication site crashes, we can
274		 * expect the data to exist at another site.  However,
275		 * in case we shut down all sites gracefully, we push
276		 * out the end of the log here so that the most
277		 * recent transactions don't mysteriously disappear.
278				 */
279		cur_env.log_flush(NULL);
280
281		cur_env.close(0);
282	} catch (DbException dbe) {
283		cout << "error closing environment: " << dbe.what() << endl;
284	}
285	return 0;
286}
287
288void RepQuoteExample::prompt() {
289	cout << "QUOTESERVER";
290	if (!app_data.is_master)
291		cout << "(read-only)";
292	cout << "> " << flush;
293}
294
295void log(char *msg) {
296	cerr << msg << endl;
297}
298
299// Simple command-line user interface:
300//  - enter "<stock symbol> <price>" to insert or update a record in the
301//	database;
302//  - just press Return (i.e., blank input line) to print out the contents of
303//	the database;
304//  - enter "quit" or "exit" to quit.
305//
306void RepQuoteExample::doloop() {
307	DbHolder dbh(&cur_env);
308
309	string input;
310	while (prompt(), getline(cin, input)) {
311		istringstream is(input);
312		string token1, token2;
313
314		// Read 0, 1 or 2 tokens from the input.
315		//
316		int count = 0;
317		if (is >> token1) {
318			count++;
319			if (is >> token2)
320			count++;
321		}
322
323		if (count == 1) {
324			if (token1 == "exit" || token1 == "quit")
325			break;
326			else {
327			log("Format: <stock> <price>");
328			continue;
329			}
330		}
331
332		// Here we know count is either 0 or 2, so we're about to try a
333		// DB operation.
334		//
335		if (!dbh.ensure_open(app_data.is_master))
336			continue;
337
338		try {
339			if (count == 0)
340				print_stocks(dbh);
341			else if (!app_data.is_master)
342				log("Can't update at client");
343			else {
344				const char *symbol = token1.c_str();
345				StringDbt key(const_cast<char*>(symbol));
346
347				const char *price = token2.c_str();
348				StringDbt data(const_cast<char*>(price));
349
350				dbh->put(NULL, &key, &data, 0);
351			}
352		} catch (DbDeadlockException e) {
353			log("please retry the operation");
354			dbh.close();
355		} catch (DbRepHandleDeadException e) {
356			log("please retry the operation");
357			dbh.close();
358		} catch (DbException e) {
359			if (e.get_errno() == DB_REP_LOCKOUT) {
360			log("please retry the operation");
361			dbh.close();
362			} else
363			throw;
364		}
365	}
366
367	dbh.close();
368}
369
370void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
371{
372	APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
373
374	info = NULL;		/* Currently unused. */
375
376	switch (which) {
377	case DB_EVENT_REP_MASTER:
378		app->is_master = 1;
379		break;
380
381	case DB_EVENT_REP_CLIENT:
382		app->is_master = 0;
383		break;
384
385	case DB_EVENT_REP_STARTUPDONE: /* FALLTHROUGH */
386	case DB_EVENT_REP_NEWMASTER:
387		case DB_EVENT_REP_PERM_FAILED:
388		// I don't care about this one, for now.
389		break;
390
391	default:
392		dbenv->errx("ignoring event %d", which);
393	}
394}
395
396void RepQuoteExample::print_stocks(Db *dbp) {
397	StringDbt key, data;
398#define	MAXKEYSIZE	10
399#define	MAXDATASIZE	20
400	char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
401	char *kbuf, *dbuf;
402
403	memset(&key, 0, sizeof(key));
404	memset(&data, 0, sizeof(data));
405	kbuf = keybuf;
406	dbuf = databuf;
407
408	DbcAuto dbc(dbp, 0, 0);
409	cout << "\tSymbol\tPrice" << endl
410		<< "\t======\t=====" << endl;
411
412	for (int ret = dbc->get(&key, &data, DB_FIRST);
413		ret == 0;
414		ret = dbc->get(&key, &data, DB_NEXT)) {
415		key.get_string(&kbuf, MAXKEYSIZE);
416		data.get_string(&dbuf, MAXDATASIZE);
417
418		cout << "\t" << keybuf << "\t" << databuf << endl;
419	}
420	cout << endl << flush;
421	dbc.close();
422}
423
424static void usage() {
425	cerr << "usage: " << progname << endl
426	    << "[-h home][-o host:port][-m host:port][-f host:port]"
427		<< "[-n nsites][-p priority]" << endl;
428
429	cerr << "\t -m host:port (required; m stands for me)" << endl
430	    << "\t -o host:port (optional; o stands for other; any "
431	    << "number of these may be specified)" << endl
432	    << "\t -h home directory" << endl
433	    << "\t -n nsites (optional; number of sites in replication "
434	    << "group; defaults to 0" << endl
435	    << "\t	in which case we try to dynamically compute the "
436	    << "number of sites in" << endl
437	    << "\t	the replication group)" << endl
438	    << "\t -p priority (optional: defaults to 100)" << endl;
439
440	exit(EXIT_FAILURE);
441}
442
443int main(int argc, char **argv) {
444	RepConfigInfo config;
445	char ch, *portstr, *tmphost;
446	int tmpport;
447	bool tmppeer;
448
449	// Extract the command line parameters
450	while ((ch = getopt(argc, argv, "Cf:h:Mm:n:o:p:v")) != EOF) {
451		tmppeer = false;
452		switch (ch) {
453		case 'C':
454			config.start_policy = DB_REP_CLIENT;
455			break;
456		case 'h':
457			config.home = optarg;
458			break;
459		case 'M':
460			config.start_policy = DB_REP_MASTER;
461			break;
462		case 'm':
463			config.this_host.host = strtok(optarg, ":");
464			if ((portstr = strtok(NULL, ":")) == NULL) {
465				cerr << "Bad host specification." << endl;
466				usage();
467			}
468			config.this_host.port = (unsigned short)atoi(portstr);
469			config.got_listen_address = true;
470			break;
471		case 'n':
472			config.totalsites = atoi(optarg);
473			break;
474		case 'f':
475			tmppeer = true; // FALLTHROUGH
476		case 'o':
477			tmphost = strtok(optarg, ":");
478			if ((portstr = strtok(NULL, ":")) == NULL) {
479				cerr << "Bad host specification." << endl;
480				usage();
481			}
482			tmpport = (unsigned short)atoi(portstr);
483
484			config.addOtherHost(tmphost, tmpport, tmppeer);
485
486			break;
487		case 'p':
488			config.priority = atoi(optarg);
489			break;
490		case 'v':
491			config.verbose = true;
492			break;
493		case '?':
494		default:
495			usage();
496		}
497	}
498
499	// Error check command line.
500	if ((!config.got_listen_address) || config.home == NULL)
501		usage();
502
503	RepQuoteExample runner;
504	try {
505		runner.init(&config);
506		runner.doloop();
507	} catch (DbException dbe) {
508		cerr << "Caught an exception during initialization or"
509			<< " processing: " << dbe.what() << endl;
510	}
511	runner.terminate();
512	return 0;
513}
514