• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src/router/db-4.8.30/examples_cxx/excxx_repquote_gsg/
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9// NOTE: This example is a simplified version of the RepQuoteExample.cxx
10// example that can be found in the db/examples_cxx/excxx_repquote directory.
11//
12// This example is intended only as an aid in learning Replication Manager
13// concepts. It is not complete in that many features are not exercised
14// in it, nor are many error conditions properly handled.
15
16#include <iostream>
17
18#include <db_cxx.h>
19#include "RepConfigInfo.h"
20
21using std::cout;
22using std::cin;
23using std::cerr;
24using std::endl;
25using std::flush;
26
27#define CACHESIZE (10 * 1024 * 1024)
28#define DATABASE "quote.db"
29#define SLEEPTIME 3
30
31const char *progname = "excxx_repquote_gsg_repmgr";
32
33#ifdef _WIN32
34#define WIN32_LEAN_AND_MEAN
35#include <windows.h>
36#include <direct.h>
37#define    sleep(s)        Sleep(1000 * (s))
38
39extern "C" {
40  extern int getopt(int, char * const *, const char *);
41  extern char *optarg;
42}
43#else
44#include <errno.h>
45#endif
46
47// Struct used to store information in Db app_private field.
48typedef struct {
49    int is_master;
50} APP_DATA;
51
52class RepMgrGSG
53{
54public:
55    RepMgrGSG();
56    int init(RepConfigInfo* config);
57    int doloop();
58    int terminate();
59
60    static void event_callback(DbEnv * dbenv, u_int32_t which, void *info);
61
62private:
63    // Disable copy constructor.
64    RepMgrGSG(const RepMgrGSG &);
65    void operator = (const RepMgrGSG &);
66
67    // Internal data members.
68    APP_DATA        app_data;
69    RepConfigInfo   *app_config;
70    DbEnv           dbenv;
71
72    // Private methods.
73    static int print_stocks(Db *dbp);
74};
75
76static void usage()
77{
78    cerr << "usage: " << progname << endl
79        << "-h home -l host:port [-r host:port]"
80        << "[-n nsites][-p priority]" << endl;
81
82    cerr
83        << "\t -h home directory (required)" << endl
84        << "\t -l host:port (required; l stands for local)" << endl
85        << "\t -r host:port (optional; r stands for remote; any "
86        << "number of these" << endl
87        << "\t    may be specified)" << endl
88        << "\t -n nsites (optional; number of sites in replication "
89        << "group; defaults" << endl
90        << "\t    to 0 to try to dynamically compute nsites)" << endl
91        << "\t -p priority (optional; defaults to 100)" << endl;
92
93    exit(EXIT_FAILURE);
94}
95
96int main(int argc, char **argv)
97{
98    RepConfigInfo config;
99    char ch, *portstr, *tmphost;
100    int tmpport;
101    int ret;
102
103    // Extract the command line parameters.
104    while ((ch = getopt(argc, argv, "h:l:n:p:r:")) != EOF) {
105        switch (ch) {
106        case 'h':
107            config.home = optarg;
108            break;
109        case 'l':
110            config.this_host.host = strtok(optarg, ":");
111            if ((portstr = strtok(NULL, ":")) == NULL) {
112                cerr << "Bad host specification." << endl;
113                usage();
114            }
115            config.this_host.port = (unsigned short)atoi(portstr);
116            config.got_listen_address = true;
117            break;
118        case 'n':
119            config.totalsites = atoi(optarg);
120            break;
121        case 'p':
122            config.priority = atoi(optarg);
123            break;
124        case 'r':
125            tmphost = strtok(optarg, ":");
126            if ((portstr = strtok(NULL, ":")) == NULL) {
127                cerr << "Bad host specification." << endl;
128                usage();
129            }
130            tmpport = (unsigned short)atoi(portstr);
131            config.addOtherHost(tmphost, tmpport);
132            break;
133        case '?':
134        default:
135            usage();
136        }
137    }
138
139    // Error check command line.
140    if ((!config.got_listen_address) || config.home == NULL)
141        usage();
142
143    RepMgrGSG runner;
144    try {
145        if((ret = runner.init(&config)) != 0)
146            goto err;
147        if((ret = runner.doloop()) != 0)
148            goto err;
149    } catch (DbException dbe) {
150        cerr << "Caught an exception during initialization or"
151            << " processing: " << dbe.what() << endl;
152    }
153err:
154    runner.terminate();
155    return 0;
156}
157
158RepMgrGSG::RepMgrGSG() : app_config(0), dbenv(0)
159{
160    app_data.is_master = 0; // By default, assume this site is not a master.
161}
162
163int RepMgrGSG::init(RepConfigInfo *config)
164{
165    int ret = 0;
166
167    app_config = config;
168
169    dbenv.set_errfile(stderr);
170    dbenv.set_errpfx(progname);
171    dbenv.set_app_private(&app_data);
172    dbenv.set_event_notify(event_callback);
173    dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ALL);
174
175    if ((ret = dbenv.repmgr_set_local_site(app_config->this_host.host,
176        app_config->this_host.port, 0)) != 0) {
177        // Should throw an exception anyway.
178        cerr << "Could not set listen address to host:port "
179            << app_config->this_host.host << ":"
180            << app_config->this_host.port
181            << "error: " << ret << endl;
182        cerr << "WARNING: This should have been an exception." << endl;
183    }
184
185    for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL;
186        cur = cur->next) {
187        if ((ret = dbenv.repmgr_add_remote_site(cur->host, cur->port,
188            0, 0)) != 0) {
189            // Should have resulted in an exception.
190            cerr << "could not add site." << endl
191                << "WARNING: This should have been an exception." << endl;
192        }
193    }
194
195    if (app_config->totalsites > 0) {
196        try {
197            if ((ret = dbenv.rep_set_nsites(app_config->totalsites)) != 0)
198                dbenv.err(ret, "set_nsites");
199        } catch (DbException dbe) {
200            cerr << "rep_set_nsites call failed. Continuing." << endl;
201            // Non-fatal to the test app.
202        }
203    }
204
205    dbenv.rep_set_priority(app_config->priority);
206
207    // Permanent messages require at least one ack.
208    dbenv.repmgr_set_ack_policy(DB_REPMGR_ACKS_ONE);
209    // Give 500 microseconds to receive the ack.
210    dbenv.rep_set_timeout(DB_REP_ACK_TIMEOUT, 5);
211
212    // We can now open our environment, although we're not ready to
213    // begin replicating.  However, we want to have a dbenv around
214    // so that we can send it into any of our message handlers.
215    dbenv.set_cachesize(0, CACHESIZE, 0);
216    dbenv.set_flags(DB_TXN_NOSYNC, 1);
217
218    try {
219        dbenv.open(app_config->home, DB_CREATE | DB_RECOVER |
220            DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
221            DB_INIT_MPOOL | DB_INIT_TXN, 0);
222    } catch(DbException dbe) {
223        cerr << "Caught an exception during DB environment open." << endl
224            << "Ensure that the home directory is created prior to starting"
225            << " the application." << endl;
226        ret = ENOENT;
227        goto err;
228    }
229
230    if ((ret = dbenv.repmgr_start(3, app_config->start_policy)) != 0)
231        goto err;
232
233err:
234    return ret;
235}
236
237int RepMgrGSG::terminate()
238{
239    try {
240        dbenv.close(0);
241    } catch (DbException dbe) {
242        cerr << "error closing environment: " << dbe.what() << endl;
243    }
244    return 0;
245}
246
247// Provides the main data processing function for our application.
248// This function provides a command line prompt to which the user
249// can provide a ticker string and a stock price.  Once a value is
250// entered to the application, the application writes the value to
251// the database and then displays the entire database.
252#define BUFSIZE 1024
253int RepMgrGSG::doloop()
254{
255    Dbt key, data;
256    Db *dbp;
257    char buf[BUFSIZE], *rbuf;
258    int ret;
259
260    dbp = 0;
261    memset(&key, 0, sizeof(key));
262    memset(&data, 0, sizeof(data));
263    ret = 0;
264
265    for (;;) {
266        if (dbp == 0) {
267            dbp = new Db(&dbenv, 0);
268
269            try {
270                dbp->open(NULL, DATABASE, NULL, DB_BTREE,
271                    app_data.is_master ? DB_CREATE | DB_AUTO_COMMIT :
272                    DB_AUTO_COMMIT, 0);
273            } catch(DbException dbe) {
274                // It is expected that this condition will be triggered
275                // when client sites start up.  It can take a while for
276                // the master site to be found and synced, and no DB will
277                // be available until then.
278                if (dbe.get_errno() == ENOENT) {
279                    cout << "No stock db available yet - retrying." << endl;
280                    try {
281                        dbp->close(0);
282                    } catch (DbException dbe2) {
283                        cout << "Unexpected error closing after failed" <<
284                            " open, message: " << dbe2.what() << endl;
285                        dbp = NULL;
286                        goto err;
287                    }
288                    dbp = NULL;
289                    sleep(SLEEPTIME);
290                    continue;
291                } else {
292                    dbenv.err(ret, "DB->open");
293                    throw dbe;
294                }
295            }
296        }
297
298        cout << "QUOTESERVER" ;
299        if (!app_data.is_master)
300            cout << "(read-only)";
301        cout << "> " << flush;
302
303        if (fgets(buf, sizeof(buf), stdin) == NULL)
304            break;
305        if (strtok(&buf[0], " \t\n") == NULL) {
306            switch ((ret = print_stocks(dbp))) {
307            case 0:
308                continue;
309            case DB_REP_HANDLE_DEAD:
310                (void)dbp->close(DB_NOSYNC);
311                cout << "closing db handle due to rep handle dead" << endl;
312                dbp = NULL;
313                continue;
314            default:
315                dbp->err(ret, "Error traversing data");
316                goto err;
317            }
318        }
319        rbuf = strtok(NULL, " \t\n");
320        if (rbuf == NULL || rbuf[0] == '\0') {
321            if (strncmp(buf, "exit", 4) == 0 ||
322                strncmp(buf, "quit", 4) == 0)
323                break;
324            dbenv.errx("Format: TICKER VALUE");
325            continue;
326        }
327
328        if (!app_data.is_master) {
329            dbenv.errx("Can't update at client");
330            continue;
331        }
332
333        key.set_data(buf);
334        key.set_size((u_int32_t)strlen(buf));
335
336        data.set_data(rbuf);
337        data.set_size((u_int32_t)strlen(rbuf));
338
339        if ((ret = dbp->put(NULL, &key, &data, 0)) != 0)
340        {
341            dbp->err(ret, "DB->put");
342            if (ret != DB_KEYEXIST)
343                goto err;
344        }
345    }
346
347err:    if (dbp != 0) {
348        (void)dbp->close(DB_NOSYNC);
349        }
350
351    return (ret);
352}
353
354// Handle replication events of interest to this application.
355void RepMgrGSG::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
356{
357    APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
358
359    info = 0;                // Currently unused.
360
361    switch (which) {
362    case DB_EVENT_REP_MASTER:
363        app->is_master = 1;
364        break;
365
366    case DB_EVENT_REP_CLIENT:
367        app->is_master = 0;
368        break;
369
370    case DB_EVENT_REP_STARTUPDONE: // FALLTHROUGH
371    case DB_EVENT_REP_NEWMASTER:
372        // Ignore.
373        break;
374
375    default:
376        dbenv->errx("ignoring event %d", which);
377    }
378}
379
380// Display all the stock quote information in the database.
381int RepMgrGSG::print_stocks(Db *dbp)
382{
383    Dbc *dbc;
384    Dbt key, data;
385#define    MAXKEYSIZE    10
386#define    MAXDATASIZE    20
387    char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
388    int ret, t_ret;
389    u_int32_t keysize, datasize;
390
391     if ((ret = dbp->cursor(NULL, &dbc, 0)) != 0) {
392        dbp->err(ret, "can't open cursor");
393        return (ret);
394    }
395
396    memset(&key, 0, sizeof(key));
397    memset(&data, 0, sizeof(data));
398
399    cout << "\tSymbol\tPrice" << endl
400        << "\t======\t=====" << endl;
401
402    for (ret = dbc->get(&key, &data, DB_FIRST);
403        ret == 0;
404        ret = dbc->get(&key, &data, DB_NEXT)) {
405        keysize = key.get_size() > MAXKEYSIZE ? MAXKEYSIZE : key.get_size();
406        memcpy(keybuf, key.get_data(), keysize);
407        keybuf[keysize] = '\0';
408
409        datasize = data.get_size() >=
410            MAXDATASIZE ? MAXDATASIZE : data.get_size();
411        memcpy(databuf, data.get_data(), datasize);
412        databuf[datasize] = '\0';
413
414        cout << "\t" << keybuf << "\t" << databuf << endl;
415    }
416    cout << endl << flush;
417
418    if ((t_ret = dbc->close()) != 0 && ret == 0) {
419        cout << "closed cursor" << endl;
420        ret = t_ret;
421    }
422
423    switch (ret) {
424    case 0:
425    case DB_NOTFOUND:
426    case DB_LOCK_DEADLOCK:
427        return (0);
428    default:
429        return (ret);
430    }
431}
432
433