/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
 *
 * $Id: RepQuoteExample.cpp,v 1.21 2008/04/28 02:59:56 alexg Exp $
 */

/*
 * In this application, we specify all communication via the command line.  In
 * a real application, we would expect that information about the other sites
 * in the system would be maintained in some sort of configuration file.  The
 * critical part of this interface is that we assume at startup that we can
 * find out
 *	1) what host/port we wish to listen on for connections,
 *	2) a (possibly empty) list of other sites we should attempt to connect
 *	   to; and
 *	3) what our Berkeley DB home environment is.
 *
 * These pieces of information are expressed by the following flags.
 * -m host:port (required; m stands for me)
 * -o host:port (optional; o stands for other; any number of these may be
 *	specified)
 * -h home directory
 * -n nsites (optional; number of sites in replication group; defaults to 0
 *	in which case we try to dynamically compute the number of sites in
 *	the replication group)
 * -p priority (optional: defaults to 100)
 */

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <sstream>

#include <db_cxx.h>
#include "RepConfigInfo.h"
#include "dbc_auto.h"

using std::cout;
using std::cin;
using std::cerr;
using std::endl;
using std::flush;
using std::istream;
using std::istringstream;
using std::string;
using std::getline;

#define CACHESIZE (10 * 1024 * 1024)
#define DATABASE "quote.db"

const char *progname = "excxx_repquote";

#include <errno.h>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}
#endif

// Struct used to store information in Db app_private field.
66typedef struct { 67 bool is_master; 68} APP_DATA; 69 70static void log(char *); 71 72class RepQuoteExample { 73public: 74 RepQuoteExample(); 75 void init(RepConfigInfo* config); 76 void doloop(); 77 int terminate(); 78 79 static void event_callback(DbEnv * dbenv, u_int32_t which, void *info); 80 81private: 82 // disable copy constructor. 83 RepQuoteExample(const RepQuoteExample &); 84 void operator = (const RepQuoteExample &); 85 86 // internal data members. 87 APP_DATA app_data; 88 RepConfigInfo *app_config; 89 DbEnv cur_env; 90 91 // private methods. 92 void print_stocks(Db *dbp); 93 void prompt(); 94}; 95 96class DbHolder { 97public: 98 DbHolder(DbEnv *env) : env(env) { 99 dbp = 0; 100 } 101 102 ~DbHolder() { 103 try { 104 close(); 105 } catch (...) { 106 // Ignore: this may mean another exception is pending 107 } 108 } 109 110 bool ensure_open(bool creating) { 111 if (dbp) 112 return (true); 113 dbp = new Db(env, 0); 114 dbp->set_pagesize(512); 115 116 u_int32_t flags = DB_AUTO_COMMIT; 117 if (creating) 118 flags |= DB_CREATE; 119 try { 120 dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0); 121 return (true); 122 } catch (DbDeadlockException e) { 123 } catch (DbRepHandleDeadException e) { 124 } catch (DbException e) { 125 if (e.get_errno() == DB_REP_LOCKOUT) { 126 // Just fall through. 127 } else if (e.get_errno() == ENOENT && !creating) { 128 // Provide a bit of extra explanation. 129 // 130 log("Stock DB does not yet exist"); 131 } else 132 throw; 133 } 134 135 // (All retryable errors fall through to here.) 136 // 137 log("please retry the operation"); 138 close(); 139 return (false); 140 } 141 142 void close() { 143 if (dbp) { 144 try { 145 dbp->close(0); 146 delete dbp; 147 dbp = 0; 148 } catch (...) 
{ 149 delete dbp; 150 dbp = 0; 151 throw; 152 } 153 } 154 } 155 156 operator Db *() { 157 return dbp; 158 } 159 160 Db *operator->() { 161 return dbp; 162 } 163 164private: 165 Db *dbp; 166 DbEnv *env; 167}; 168 169class StringDbt : public Dbt { 170public: 171#define GET_STRING_OK 0 172#define GET_STRING_INVALID_PARAM 1 173#define GET_STRING_SMALL_BUFFER 2 174#define GET_STRING_EMPTY_DATA 3 175 int get_string(char **buf, size_t buf_len) 176 { 177 size_t copy_len; 178 int ret = GET_STRING_OK; 179 if (buf == NULL) { 180 cerr << "Invalid input buffer to get_string" << endl; 181 return GET_STRING_INVALID_PARAM; 182 } 183 184 // make sure the string is null terminated. 185 memset(*buf, 0, buf_len); 186 187 // if there is no string, just return. 188 if (get_data() == NULL || get_size() == 0) 189 return GET_STRING_OK; 190 191 if (get_size() >= buf_len) { 192 ret = GET_STRING_SMALL_BUFFER; 193 copy_len = buf_len - 1; // save room for a terminator. 194 } else 195 copy_len = get_size(); 196 memcpy(*buf, get_data(), copy_len); 197 198 return ret; 199 } 200 size_t get_string_length() 201 { 202 if (get_size() == 0) 203 return 0; 204 return strlen((char *)get_data()); 205 } 206 void set_string(char *string) 207 { 208 set_data(string); 209 set_size((u_int32_t)strlen(string)); 210 } 211 212 StringDbt(char *string) : 213 Dbt(string, (u_int32_t)strlen(string)) {}; 214 StringDbt() : Dbt() {}; 215 ~StringDbt() {}; 216 217 // Don't add extra data to this sub-class since we want it to remain 218 // compatible with Dbt objects created internally by Berkeley DB. 
219}; 220 221RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(0) { 222 app_data.is_master = 0; // assume I start out as client 223} 224 225void RepQuoteExample::init(RepConfigInfo *config) { 226 app_config = config; 227 228 cur_env.set_app_private(&app_data); 229 cur_env.set_errfile(stderr); 230 cur_env.set_errpfx(progname); 231 cur_env.set_event_notify(event_callback); 232 cur_env.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL); 233 234 cur_env.repmgr_set_local_site(app_config->this_host.host, 235 app_config->this_host.port, 0); 236 237 for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL; 238 cur = cur->next) { 239 cur_env.repmgr_add_remote_site(cur->host, cur->port, 240 NULL, cur->peer ? DB_REPMGR_PEER : 0); 241 } 242 243 if (app_config->totalsites > 0) 244 cur_env.rep_set_nsites(app_config->totalsites); 245 246 cur_env.rep_set_priority(app_config->priority); 247 248 /* 249 * We can now open our environment, although we're not ready to 250 * begin replicating. However, we want to have a dbenv around 251 * so that we can send it into any of our message handlers. 252 */ 253 cur_env.set_cachesize(0, CACHESIZE, 0); 254 cur_env.set_flags(DB_TXN_NOSYNC, 1); 255 256 cur_env.open(app_config->home, DB_CREATE | DB_RECOVER | 257 DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | 258 DB_INIT_MPOOL | DB_INIT_TXN, 0); 259 260 if (app_config->verbose) 261 cur_env.set_verbose(DB_VERB_REPLICATION, 1); 262 263 cur_env.repmgr_start(3, app_config->start_policy); 264} 265 266int RepQuoteExample::terminate() { 267 try { 268 /* 269 * We have used the DB_TXN_NOSYNC environment flag for 270 * improved performance without the usual sacrifice of 271 * transactional durability, as discussed in the 272 * "Transactional guarantees" page of the Reference 273 * Guide: if one replication site crashes, we can 274 * expect the data to exist at another site. 
However, 275 * in case we shut down all sites gracefully, we push 276 * out the end of the log here so that the most 277 * recent transactions don't mysteriously disappear. 278 */ 279 cur_env.log_flush(NULL); 280 281 cur_env.close(0); 282 } catch (DbException dbe) { 283 cout << "error closing environment: " << dbe.what() << endl; 284 } 285 return 0; 286} 287 288void RepQuoteExample::prompt() { 289 cout << "QUOTESERVER"; 290 if (!app_data.is_master) 291 cout << "(read-only)"; 292 cout << "> " << flush; 293} 294 295void log(char *msg) { 296 cerr << msg << endl; 297} 298 299// Simple command-line user interface: 300// - enter "<stock symbol> <price>" to insert or update a record in the 301// database; 302// - just press Return (i.e., blank input line) to print out the contents of 303// the database; 304// - enter "quit" or "exit" to quit. 305// 306void RepQuoteExample::doloop() { 307 DbHolder dbh(&cur_env); 308 309 string input; 310 while (prompt(), getline(cin, input)) { 311 istringstream is(input); 312 string token1, token2; 313 314 // Read 0, 1 or 2 tokens from the input. 315 // 316 int count = 0; 317 if (is >> token1) { 318 count++; 319 if (is >> token2) 320 count++; 321 } 322 323 if (count == 1) { 324 if (token1 == "exit" || token1 == "quit") 325 break; 326 else { 327 log("Format: <stock> <price>"); 328 continue; 329 } 330 } 331 332 // Here we know count is either 0 or 2, so we're about to try a 333 // DB operation. 
334 // 335 if (!dbh.ensure_open(app_data.is_master)) 336 continue; 337 338 try { 339 if (count == 0) 340 print_stocks(dbh); 341 else if (!app_data.is_master) 342 log("Can't update at client"); 343 else { 344 const char *symbol = token1.c_str(); 345 StringDbt key(const_cast<char*>(symbol)); 346 347 const char *price = token2.c_str(); 348 StringDbt data(const_cast<char*>(price)); 349 350 dbh->put(NULL, &key, &data, 0); 351 } 352 } catch (DbDeadlockException e) { 353 log("please retry the operation"); 354 dbh.close(); 355 } catch (DbRepHandleDeadException e) { 356 log("please retry the operation"); 357 dbh.close(); 358 } catch (DbException e) { 359 if (e.get_errno() == DB_REP_LOCKOUT) { 360 log("please retry the operation"); 361 dbh.close(); 362 } else 363 throw; 364 } 365 } 366 367 dbh.close(); 368} 369 370void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info) 371{ 372 APP_DATA *app = (APP_DATA*)dbenv->get_app_private(); 373 374 info = NULL; /* Currently unused. */ 375 376 switch (which) { 377 case DB_EVENT_REP_MASTER: 378 app->is_master = 1; 379 break; 380 381 case DB_EVENT_REP_CLIENT: 382 app->is_master = 0; 383 break; 384 385 case DB_EVENT_REP_STARTUPDONE: /* FALLTHROUGH */ 386 case DB_EVENT_REP_NEWMASTER: 387 case DB_EVENT_REP_PERM_FAILED: 388 // I don't care about this one, for now. 
389 break; 390 391 default: 392 dbenv->errx("ignoring event %d", which); 393 } 394} 395 396void RepQuoteExample::print_stocks(Db *dbp) { 397 StringDbt key, data; 398#define MAXKEYSIZE 10 399#define MAXDATASIZE 20 400 char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1]; 401 char *kbuf, *dbuf; 402 403 memset(&key, 0, sizeof(key)); 404 memset(&data, 0, sizeof(data)); 405 kbuf = keybuf; 406 dbuf = databuf; 407 408 DbcAuto dbc(dbp, 0, 0); 409 cout << "\tSymbol\tPrice" << endl 410 << "\t======\t=====" << endl; 411 412 for (int ret = dbc->get(&key, &data, DB_FIRST); 413 ret == 0; 414 ret = dbc->get(&key, &data, DB_NEXT)) { 415 key.get_string(&kbuf, MAXKEYSIZE); 416 data.get_string(&dbuf, MAXDATASIZE); 417 418 cout << "\t" << keybuf << "\t" << databuf << endl; 419 } 420 cout << endl << flush; 421 dbc.close(); 422} 423 424static void usage() { 425 cerr << "usage: " << progname << endl 426 << "[-h home][-o host:port][-m host:port][-f host:port]" 427 << "[-n nsites][-p priority]" << endl; 428 429 cerr << "\t -m host:port (required; m stands for me)" << endl 430 << "\t -o host:port (optional; o stands for other; any " 431 << "number of these may be specified)" << endl 432 << "\t -h home directory" << endl 433 << "\t -n nsites (optional; number of sites in replication " 434 << "group; defaults to 0" << endl 435 << "\t in which case we try to dynamically compute the " 436 << "number of sites in" << endl 437 << "\t the replication group)" << endl 438 << "\t -p priority (optional: defaults to 100)" << endl; 439 440 exit(EXIT_FAILURE); 441} 442 443int main(int argc, char **argv) { 444 RepConfigInfo config; 445 char ch, *portstr, *tmphost; 446 int tmpport; 447 bool tmppeer; 448 449 // Extract the command line parameters 450 while ((ch = getopt(argc, argv, "Cf:h:Mm:n:o:p:v")) != EOF) { 451 tmppeer = false; 452 switch (ch) { 453 case 'C': 454 config.start_policy = DB_REP_CLIENT; 455 break; 456 case 'h': 457 config.home = optarg; 458 break; 459 case 'M': 460 config.start_policy = 
DB_REP_MASTER; 461 break; 462 case 'm': 463 config.this_host.host = strtok(optarg, ":"); 464 if ((portstr = strtok(NULL, ":")) == NULL) { 465 cerr << "Bad host specification." << endl; 466 usage(); 467 } 468 config.this_host.port = (unsigned short)atoi(portstr); 469 config.got_listen_address = true; 470 break; 471 case 'n': 472 config.totalsites = atoi(optarg); 473 break; 474 case 'f': 475 tmppeer = true; // FALLTHROUGH 476 case 'o': 477 tmphost = strtok(optarg, ":"); 478 if ((portstr = strtok(NULL, ":")) == NULL) { 479 cerr << "Bad host specification." << endl; 480 usage(); 481 } 482 tmpport = (unsigned short)atoi(portstr); 483 484 config.addOtherHost(tmphost, tmpport, tmppeer); 485 486 break; 487 case 'p': 488 config.priority = atoi(optarg); 489 break; 490 case 'v': 491 config.verbose = true; 492 break; 493 case '?': 494 default: 495 usage(); 496 } 497 } 498 499 // Error check command line. 500 if ((!config.got_listen_address) || config.home == NULL) 501 usage(); 502 503 RepQuoteExample runner; 504 try { 505 runner.init(&config); 506 runner.doloop(); 507 } catch (DbException dbe) { 508 cerr << "Caught an exception during initialization or" 509 << " processing: " << dbe.what() << endl; 510 } 511 runner.terminate(); 512 return 0; 513} 514