1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9// NOTE: This example is a simplified version of the RepQuoteExample.cxx 10// example that can be found in the db/examples_cxx/excxx_repquote directory. 11// 12// This example is intended only as an aid in learning Replication Manager 13// concepts. It is not complete in that many features are not exercised 14// in it, nor are many error conditions properly handled. 15 16#include <iostream> 17 18#include <db_cxx.h> 19#include "RepConfigInfo.h" 20 21using std::cout; 22using std::cin; 23using std::cerr; 24using std::endl; 25using std::flush; 26 27#define CACHESIZE (10 * 1024 * 1024) 28#define DATABASE "quote.db" 29#define SLEEPTIME 3 30 31const char *progname = "excxx_repquote_gsg_repmgr"; 32 33#ifdef _WIN32 34#define WIN32_LEAN_AND_MEAN 35#include <windows.h> 36#include <direct.h> 37#define sleep(s) Sleep(1000 * (s)) 38 39extern "C" { 40 extern int getopt(int, char * const *, const char *); 41 extern char *optarg; 42} 43#else 44#include <errno.h> 45#endif 46 47// Struct used to store information in Db app_private field. 48typedef struct { 49 int is_master; 50} APP_DATA; 51 52class RepMgrGSG 53{ 54public: 55 RepMgrGSG(); 56 int init(RepConfigInfo* config); 57 int doloop(); 58 int terminate(); 59 60 static void event_callback(DbEnv * dbenv, u_int32_t which, void *info); 61 62private: 63 // Disable copy constructor. 64 RepMgrGSG(const RepMgrGSG &); 65 void operator = (const RepMgrGSG &); 66 67 // Internal data members. 68 APP_DATA app_data; 69 RepConfigInfo *app_config; 70 DbEnv dbenv; 71 72 // Private methods. 73 static int print_stocks(Db *dbp); 74}; 75 76static void usage() 77{ 78 cerr << "usage: " << progname << endl 79 << "-h home -l host:port [-r host:port]" 80 << "[-n nsites][-p priority]" << endl; 81 82 cerr 83 << "\t -h home directory (required)" << endl 84 << "\t -l host:port (required; l stands for local)" << endl 85 << "\t -r host:port (optional; r stands for remote; any " 86 << "number of these" << endl 87 << "\t may be specified)" << endl 88 << "\t -n nsites (optional; number of sites in replication " 89 << "group; defaults" << endl 90 << "\t to 0 to try to dynamically compute nsites)" << endl 91 << "\t -p priority (optional; defaults to 100)" << endl; 92 93 exit(EXIT_FAILURE); 94} 95 96int main(int argc, char **argv) 97{ 98 RepConfigInfo config; 99 char ch, *portstr, *tmphost; 100 int tmpport; 101 int ret; 102 103 // Extract the command line parameters. 104 while ((ch = getopt(argc, argv, "h:l:n:p:r:")) != EOF) { 105 switch (ch) { 106 case 'h': 107 config.home = optarg; 108 break; 109 case 'l': 110 config.this_host.host = strtok(optarg, ":"); 111 if ((portstr = strtok(NULL, ":")) == NULL) { 112 cerr << "Bad host specification." << endl; 113 usage(); 114 } 115 config.this_host.port = (unsigned short)atoi(portstr); 116 config.got_listen_address = true; 117 break; 118 case 'n': 119 config.totalsites = atoi(optarg); 120 break; 121 case 'p': 122 config.priority = atoi(optarg); 123 break; 124 case 'r': 125 tmphost = strtok(optarg, ":"); 126 if ((portstr = strtok(NULL, ":")) == NULL) { 127 cerr << "Bad host specification." << endl; 128 usage(); 129 } 130 tmpport = (unsigned short)atoi(portstr); 131 config.addOtherHost(tmphost, tmpport); 132 break; 133 case '?': 134 default: 135 usage(); 136 } 137 } 138 139 // Error check command line. 140 if ((!config.got_listen_address) || config.home == NULL) 141 usage(); 142 143 RepMgrGSG runner; 144 try { 145 if((ret = runner.init(&config)) != 0) 146 goto err; 147 if((ret = runner.doloop()) != 0) 148 goto err; 149 } catch (DbException dbe) { 150 cerr << "Caught an exception during initialization or" 151 << " processing: " << dbe.what() << endl; 152 } 153err: 154 runner.terminate(); 155 return 0; 156} 157 158RepMgrGSG::RepMgrGSG() : app_config(0), dbenv(0) 159{ 160 app_data.is_master = 0; // By default, assume this site is not a master. 161} 162 163int RepMgrGSG::init(RepConfigInfo *config) 164{ 165 int ret = 0; 166 167 app_config = config; 168 169 dbenv.set_errfile(stderr); 170 dbenv.set_errpfx(progname); 171 dbenv.set_app_private(&app_data); 172 dbenv.set_event_notify(event_callback); 173 dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL); 174 175 if ((ret = dbenv.repmgr_set_local_site(app_config->this_host.host, 176 app_config->this_host.port, 0)) != 0) { 177 // Should throw an exception anyway. 178 cerr << "Could not set listen address to host:port " 179 << app_config->this_host.host << ":" 180 << app_config->this_host.port 181 << "error: " << ret << endl; 182 cerr << "WARNING: This should have been an exception." << endl; 183 } 184 185 for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL; 186 cur = cur->next) { 187 if ((ret = dbenv.repmgr_add_remote_site(cur->host, cur->port, 188 0, 0)) != 0) { 189 // Should have resulted in an exception. 190 cerr << "could not add site." << endl 191 << "WARNING: This should have been an exception." << endl; 192 } 193 } 194 195 if (app_config->totalsites > 0) { 196 try { 197 if ((ret = dbenv.rep_set_nsites(app_config->totalsites)) != 0) 198 dbenv.err(ret, "set_nsites"); 199 } catch (DbException dbe) { 200 cerr << "rep_set_nsites call failed. Continuing." << endl; 201 // Non-fatal to the test app. 202 } 203 } 204 205 dbenv.rep_set_priority(app_config->priority); 206 207 // Permanent messages require at least one ack. 208 dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ONE); 209 // Give 500 microseconds to receive the ack. 210 dbenv.rep_set_timeout(DB_REP_ACK_TIMEOUT, 5); 211 212 // We can now open our environment, although we're not ready to 213 // begin replicating. However, we want to have a dbenv around 214 // so that we can send it into any of our message handlers. 215 dbenv.set_cachesize(0, CACHESIZE, 0); 216 dbenv.set_flags(DB_TXN_NOSYNC, 1); 217 218 try { 219 dbenv.open(app_config->home, DB_CREATE | DB_RECOVER | 220 DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | 221 DB_INIT_MPOOL | DB_INIT_TXN, 0); 222 } catch(DbException dbe) { 223 cerr << "Caught an exception during DB environment open." << endl 224 << "Ensure that the home directory is created prior to starting" 225 << " the application." << endl; 226 ret = ENOENT; 227 goto err; 228 } 229 230 if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0) 231 goto err; 232 233err: 234 return ret; 235} 236 237int RepMgrGSG::terminate() 238{ 239 try { 240 dbenv.close(0); 241 } catch (DbException dbe) { 242 cerr << "error closing environment: " << dbe.what() << endl; 243 } 244 return 0; 245} 246 247// Provides the main data processing function for our application. 248// This function provides a command line prompt to which the user 249// can provide a ticker string and a stock price. Once a value is 250// entered to the application, the application writes the value to 251// the database and then displays the entire database. 252#define BUFSIZE 1024 253int RepMgrGSG::doloop() 254{ 255 Dbt key, data; 256 Db *dbp; 257 char buf[BUFSIZE], *rbuf; 258 int ret; 259 260 dbp = 0; 261 memset(&key, 0, sizeof(key)); 262 memset(&data, 0, sizeof(data)); 263 ret = 0; 264 265 for (;;) { 266 if (dbp == 0) { 267 dbp = new Db(&dbenv, 0); 268 269 try { 270 dbp->open(NULL, DATABASE, NULL, DB_BTREE, 271 app_data.is_master ? DB_CREATE | DB_AUTO_COMMIT : 272 DB_AUTO_COMMIT, 0); 273 } catch(DbException dbe) { 274 // It is expected that this condition will be triggered 275 // when client sites start up. It can take a while for 276 // the master site to be found and synced, and no DB will 277 // be available until then. 278 if (dbe.get_errno() == ENOENT) { 279 cout << "No stock db available yet - retrying." << endl; 280 try { 281 dbp->close(0); 282 } catch (DbException dbe2) { 283 cout << "Unexpected error closing after failed" << 284 " open, message: " << dbe2.what() << endl; 285 dbp = NULL; 286 goto err; 287 } 288 dbp = NULL; 289 sleep(SLEEPTIME); 290 continue; 291 } else { 292 dbenv.err(ret, "DB->open"); 293 throw dbe; 294 } 295 } 296 } 297 298 cout << "QUOTESERVER" ; 299 if (!app_data.is_master) 300 cout << "(read-only)"; 301 cout << "> " << flush; 302 303 if (fgets(buf, sizeof(buf), stdin) == NULL) 304 break; 305 if (strtok(&buf[0], " \t\n") == NULL) { 306 switch ((ret = print_stocks(dbp))) { 307 case 0: 308 continue; 309 case DB_REP_HANDLE_DEAD: 310 (void)dbp->close(DB_NOSYNC); 311 cout << "closing db handle due to rep handle dead" << endl; 312 dbp = NULL; 313 continue; 314 default: 315 dbp->err(ret, "Error traversing data"); 316 goto err; 317 } 318 } 319 rbuf = strtok(NULL, " \t\n"); 320 if (rbuf == NULL || rbuf[0] == '\0') { 321 if (strncmp(buf, "exit", 4) == 0 || 322 strncmp(buf, "quit", 4) == 0) 323 break; 324 dbenv.errx("Format: TICKER VALUE"); 325 continue; 326 } 327 328 if (!app_data.is_master) { 329 dbenv.errx("Can't update at client"); 330 continue; 331 } 332 333 key.set_data(buf); 334 key.set_size((u_int32_t)strlen(buf)); 335 336 data.set_data(rbuf); 337 data.set_size((u_int32_t)strlen(rbuf)); 338 339 if ((ret = dbp->put(NULL, &key, &data, 0)) != 0) 340 { 341 dbp->err(ret, "DB->put"); 342 if (ret != DB_KEYEXIST) 343 goto err; 344 } 345 } 346 347err: if (dbp != 0) { 348 (void)dbp->close(DB_NOSYNC); 349 } 350 351 return (ret); 352} 353 354// Handle replication events of interest to this application. 355void RepMgrGSG::event_callback(DbEnv* dbenv, u_int32_t which, void *info) 356{ 357 APP_DATA *app = (APP_DATA*)dbenv->get_app_private(); 358 359 info = 0; // Currently unused. 360 361 switch (which) { 362 case DB_EVENT_REP_MASTER: 363 app->is_master = 1; 364 break; 365 366 case DB_EVENT_REP_CLIENT: 367 app->is_master = 0; 368 break; 369 370 case DB_EVENT_REP_STARTUPDONE: // FALLTHROUGH 371 case DB_EVENT_REP_NEWMASTER: 372 // Ignore. 373 break; 374 375 default: 376 dbenv->errx("ignoring event %d", which); 377 } 378} 379 380// Display all the stock quote information in the database. 381int RepMgrGSG::print_stocks(Db *dbp) 382{ 383 Dbc *dbc; 384 Dbt key, data; 385#define MAXKEYSIZE 10 386#define MAXDATASIZE 20 387 char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1]; 388 int ret, t_ret; 389 u_int32_t keysize, datasize; 390 391 if ((ret = dbp->cursor(NULL, &dbc, 0)) != 0) { 392 dbp->err(ret, "can't open cursor"); 393 return (ret); 394 } 395 396 memset(&key, 0, sizeof(key)); 397 memset(&data, 0, sizeof(data)); 398 399 cout << "\tSymbol\tPrice" << endl 400 << "\t======\t=====" << endl; 401 402 for (ret = dbc->get(&key, &data, DB_FIRST); 403 ret == 0; 404 ret = dbc->get(&key, &data, DB_NEXT)) { 405 keysize = key.get_size() > MAXKEYSIZE ? MAXKEYSIZE : key.get_size(); 406 memcpy(keybuf, key.get_data(), keysize); 407 keybuf[keysize] = '\0'; 408 409 datasize = data.get_size() >= 410 MAXDATASIZE ? MAXDATASIZE : data.get_size(); 411 memcpy(databuf, data.get_data(), datasize); 412 databuf[datasize] = '\0'; 413 414 cout << "\t" << keybuf << "\t" << databuf << endl; 415 } 416 cout << endl << flush; 417 418 if ((t_ret = dbc->close()) != 0 && ret == 0) { 419 cout << "closed cursor" << endl; 420 ret = t_ret; 421 } 422 423 switch (ret) { 424 case 0: 425 case DB_NOTFOUND: 426 case DB_LOCK_DEADLOCK: 427 return (0); 428 default: 429 return (ret); 430 } 431} 432 433