• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /asuswrt-rt-n18u-9.0.0.4.380.2695/release/src-rt/router/db-4.8.30/examples_java/src/db/repquote/
1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1997-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9package db.repquote;
10
11import java.io.FileNotFoundException;
12import java.io.BufferedReader;
13import java.io.InputStreamReader;
14import java.io.IOException;
15import java.lang.Thread;
16import java.lang.InterruptedException;
17
18import com.sleepycat.db.*;
19import db.repquote.RepConfig;
20
21/**
22 * RepQuoteExample is a simple but complete demonstration of a replicated
23 * application. The application is a mock stock ticker. The master accepts a
24 * stock symbol and an numerical value as input, and stores this information
25 * into a replicated database; either master or clients can display the
26 * contents of the database.
27 * <p>
28 * The options to start a given replication node are:
29 * <pre>
30 *   -h home (required; h stands for home directory)
31 *   -l host:port (required; l stands for local)
32 *   -C or M (optional; start up as client or master)
33 *   -r host:port (optional; r stands for remote; any number of these may
34 *      be specified)
35 *   -R host:port (optional; R stands for remote peer; only one of these may
36 *      be specified)
37 *   -a all|quorum (optional; a stands for ack policy)
38 *   -b (optional; b stands for bulk)
39 *   -n nsites (optional; number of sites in replication group; defaults to 0
40 *      to try to dynamically compute nsites)
41 *   -p priority (optional; defaults to 100)
42 *   -v (optional; v stands for verbose)
43 * </pre>
44 * <p>
45 * A typical session begins with a command such as the following to start a
46 * master:
47 *
48 * <pre>
49 *   java db.repquote.RepQuoteExample -M -h dir1 -l localhost:6000
50 * </pre>
51 *
52 * and several clients:
53 *
54 * <pre>
55 *   java db.repquote.RepQuoteExample -C -h dir2
56 *               -l localhost:6001 -r localhost:6000
57 *   java db.repquote.RepQuoteExample -C -h dir3
58 *               -l localhost:6002 -r localhost:6000
59 *   java db.repquote.RepQuoteExample -C -h dir4
60 *               -l localhost:6003 -r localhost:6000
61 * </pre>
62 *
63 * <p>
64 * Each process is a member of a DB replication group. The sample application
65 * expects the following commands to stdin:
66 * <ul>
67 * <li>NEWLINE -- print all the stocks held in the database</li>
68 * <li>quit -- shutdown this node</li>
69 * <li>exit -- shutdown this node</li>
70 * <li>stock_symbol number -- enter this stock and number into the
71 * database</li>
72 * </ul>
73 */
74
75public class RepQuoteExample
76{
77    private RepConfig appConfig;
78    private RepQuoteEnvironment dbenv;
79    private CheckpointThread ckpThr;
80    private LogArchiveThread lgaThr;
81
82    public static void usage()
83    {
84        System.err.println("usage: " + RepConfig.progname +
85            " -h home -l host:port [-CM][-r host:port][-R host:port]\n" +
86            "  [-a all|quorum][-b][-n nsites][-p priority][-v]");
87
88        System.err.println(
89             "\t -h home (required; h stands for home directory)\n" +
90             "\t -l host:port (required; l stands for local)\n" +
91             "\t -C or -M (optional; start up as client or master)\n" +
92             "\t -r host:port (optional; r stands for remote; any number " +
93             "of these\n" +
94             "\t    may be specified)\n" +
95             "\t -R host:port (optional; R stands for remote peer; only " +
96             "one of\n" +
97             "\t    these may be specified)\n" +
98             "\t -a all|quorum (optional; a stands for ack policy)\n" +
99             "\t -b (optional; b stands for bulk)\n" +
100             "\t -n nsites (optional; number of sites in replication " +
101             "group; defaults\n" +
102             "\t    to 0 to try to dynamically compute nsites)\n" +
103             "\t -p priority (optional; defaults to 100)\n" +
104             "\t -v (optional; v stands for verbose)\n");
105
106        System.exit(1);
107    }
108
109    public static void main(String[] argv)
110        throws Exception
111    {
112        RepConfig config = new RepConfig();
113        boolean isPeer;
114        String tmpHost;
115        int tmpPort = 0;
116
117        /*  Extract the command line parameters. */
118        for (int i = 0; i < argv.length; i++)
119        {
120            isPeer = false;
121            if (argv[i].compareTo("-a") == 0) {
122                if (i == argv.length - 1)
123                    usage();
124                i++;
125                if (argv[i].equals("all"))
126                    config.ackPolicy = ReplicationManagerAckPolicy.ALL;
127                else if (!argv[i].equals("quorum"))
128                    usage();
129            } else if (argv[i].compareTo("-b") == 0)
130                config.bulk = true;
131            else if (argv[i].compareTo("-C") == 0) {
132                config.startPolicy = ReplicationManagerStartPolicy.REP_CLIENT;
133            } else if (argv[i].compareTo("-h") == 0) {
134                if (i == argv.length - 1)
135                    usage();
136                /* home - a string arg. */
137                i++;
138                config.home = argv[i];
139            } else if (argv[i].compareTo("-l") == 0) {
140                if (i == argv.length - 1)
141                    usage();
142                /* "local" should be host:port. */
143                i++;
144                String[] words = argv[i].split(":");
145                if (words.length != 2) {
146                    System.err.println(
147                        "Invalid host specification host:port needed.");
148                    usage();
149                }
150                try {
151                    tmpPort = Integer.parseInt(words[1]);
152                } catch (NumberFormatException nfe) {
153                    System.err.println("Invalid host specification, " +
154                        "could not parse port number.");
155                    usage();
156                }
157                config.setThisHost(words[0], tmpPort);
158            } else if (argv[i].compareTo("-M") == 0) {
159                config.startPolicy = ReplicationManagerStartPolicy.REP_MASTER;
160            } else if (argv[i].compareTo("-n") == 0) {
161                if (i == argv.length - 1)
162                    usage();
163                i++;
164                config.totalSites = Integer.parseInt(argv[i]);
165            } else if (argv[i].compareTo("-p") == 0) {
166                if (i == argv.length - 1)
167                    usage();
168                i++;
169                config.priority = Integer.parseInt(argv[i]);
170            } else if (argv[i].compareTo("-R") == 0 ||
171                argv[i].compareTo("-r") == 0) {
172                if (i == argv.length - 1)
173                    usage();
174                if (argv[i].equals("-R"))
175                    isPeer = true;
176                i++;
177                String[] words = argv[i].split(":");
178                if (words.length != 2) {
179                    System.err.println(
180                        "Invalid host specification host:port needed.");
181                    usage();
182                }
183                try {
184                    tmpPort = Integer.parseInt(words[1]);
185                } catch (NumberFormatException nfe) {
186                    System.err.println("Invalid host specification, " +
187                        "could not parse port number.");
188                    usage();
189                }
190                config.addOtherHost(words[0], tmpPort, isPeer);
191            } else if (argv[i].compareTo("-v") == 0) {
192                config.verbose = true;
193            } else {
194                System.err.println("Unrecognized option: " + argv[i]);
195                usage();
196            }
197
198        }
199
200        /* Error check command line. */
201        if ((!config.gotListenAddress()) || config.home.length() == 0)
202            usage();
203
204        RepQuoteExample runner = null;
205        try {
206            runner = new RepQuoteExample();
207            runner.init(config);
208
209            /* Sleep to give ourselves time to find a master. */
210            //try {
211            //    Thread.sleep(5000);
212            //} catch (InterruptedException e) {}
213
214            runner.doloop();
215            runner.terminate();
216        } catch (DatabaseException dbe) {
217            System.err.println("Caught an exception during " +
218                "initialization or processing: " + dbe);
219            if (runner != null)
220                runner.terminate();
221        }
222    } /* End main. */
223
224    public RepQuoteExample()
225        throws DatabaseException
226    {
227        appConfig = null;
228        dbenv = null;
229    }
230
231    public int init(RepConfig config)
232        throws DatabaseException
233    {
234        int ret = 0;
235        appConfig = config;
236        EnvironmentConfig envConfig = new EnvironmentConfig();
237        envConfig.setErrorStream(System.err);
238        envConfig.setErrorPrefix(RepConfig.progname);
239
240        envConfig.setReplicationManagerLocalSite(appConfig.getThisHost());
241        for (RepRemoteHost host = appConfig.getFirstOtherHost();
242            host != null; host = appConfig.getNextOtherHost()){
243            envConfig.replicationManagerAddRemoteSite(
244                host.getAddress(), host.isPeer());
245        }
246        if (appConfig.totalSites > 0)
247            envConfig.setReplicationNumSites(appConfig.totalSites);
248
249        /*
250         * Set replication group election priority for this environment.
251         * An election first selects the site with the most recent log
252         * records as the new master.  If multiple sites have the most
253         * recent log records, the site with the highest priority value
254         * is selected as master.
255         */
256        envConfig.setReplicationPriority(appConfig.priority);
257
258        envConfig.setCacheSize(RepConfig.CACHESIZE);
259        envConfig.setTxnNoSync(true);
260
261        envConfig.setEventHandler(new RepQuoteEventHandler());
262
263        /*
264         * Set the policy that determines how master and client sites
265         * handle acknowledgement of replication messages needed for
266         * permanent records.  The default policy of "quorum" requires only
267         * a quorum of electable peers sufficient to ensure a permanent
268         * record remains durable if an election is held.  The "all" option
269         * requires all clients to acknowledge a permanent replication
270         * message instead.
271         */
272        envConfig.setReplicationManagerAckPolicy(appConfig.ackPolicy);
273
274        /*
275         * Set the threshold for the minimum and maximum time the client
276         * waits before requesting retransmission of a missing message.
277         * Base these values on the performance and load characteristics
278         * of the master and client host platforms as well as the round
279         * trip message time.
280         */
281        envConfig.setReplicationRequestMin(20000);
282        envConfig.setReplicationRequestMax(500000);
283
284        /*
285         * Configure deadlock detection to ensure that any deadlocks
286         * are broken by having one of the conflicting lock requests
287         * rejected. DB_LOCK_DEFAULT uses the lock policy specified
288         * at environment creation time or DB_LOCK_RANDOM if none was
289         * specified.
290         */
291        envConfig.setLockDetectMode(LockDetectMode.DEFAULT);
292
293        envConfig.setAllowCreate(true);
294        envConfig.setRunRecovery(true);
295        envConfig.setThreaded(true);
296        envConfig.setInitializeReplication(true);
297        envConfig.setInitializeLocking(true);
298        envConfig.setInitializeLogging(true);
299        envConfig.setInitializeCache(true);
300        envConfig.setTransactional(true);
301        envConfig.setVerboseReplication(appConfig.verbose);
302        try {
303            dbenv = new RepQuoteEnvironment(appConfig.getHome(), envConfig);
304        } catch(FileNotFoundException e) {
305            System.err.println("FileNotFound exception: " + e);
306            System.err.println(
307                "Ensure that the environment directory is pre-created.");
308            ret = 1;
309        }
310
311        if (appConfig.bulk)
312            dbenv.setReplicationConfig(ReplicationConfig.BULK, true);
313
314        /*
315         * Configure heartbeat timeouts so that repmgr monitors the
316         * health of the TCP connection.  Master sites broadcast a heartbeat
317         * at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
318         * Client sites wait for message activity the length of the
319         * DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
320         * connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR
321         * timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
322         */
323        dbenv.setReplicationTimeout(ReplicationTimeoutType.HEARTBEAT_SEND,
324            5000000);
325        dbenv.setReplicationTimeout(ReplicationTimeoutType.HEARTBEAT_MONITOR,
326            10000000);
327
328        /* The following base replication features may also be useful to your
329         * application. See Berkeley DB documentation for more details.
330         *   - Master leases: Provide stricter consistency for data reads
331         *     on a master site.
332         *   - Timeouts: Customize the amount of time Berkeley DB waits
333         *     for such things as an election to be concluded or a master
334         *     lease to be granted.
335         *   - Delayed client synchronization: Manage the master site's
336         *     resources by spreading out resource-intensive client
337         *     synchronizations.
338         *   - Blocked client operations: Return immediately with an error
339         *     instead of waiting indefinitely if a client operation is
340         *     blocked by an ongoing client synchronization.
341         *
342         * The following repmgr features may also be useful to your
343         * application.  See Berkeley DB documentation for more details.
344         *  - Two-site strict majority rule - In a two-site replication
345         *    group, require both sites to be available to elect a new
346         *    master.
347         *  - Timeouts - Customize the amount of time repmgr waits
348         *    for such things as waiting for acknowledgements or attempting
349         *    to reconnect to other sites.
350         *  - Site list - return a list of sites currently known to repmgr.
351         */
352
353        /* Start checkpoint and log archive support threads. */
354        ckpThr = new CheckpointThread(dbenv);
355        ckpThr.start();
356        lgaThr = new LogArchiveThread(dbenv, envConfig);
357        lgaThr.start();
358
359        /* Start replication manager. */
360        dbenv.replicationManagerStart(3, appConfig.startPolicy);
361
362        return ret;
363    }
364
365    public int doloop()
366        throws DatabaseException
367    {
368        Database db = null;
369
370        for (;;)
371        {
372            if (db == null) {
373                DatabaseConfig dbconf = new DatabaseConfig();
374                dbconf.setType(DatabaseType.BTREE);
375                if (dbenv.getIsMaster()) {
376                    /*
377                     * Open database allowing create only if this is a master
378                     * database.  A client database uses polling to attempt
379                     * to open the database without allowing create until
380                     * it is successful.
381                     *
382                     * This polling logic for allowing create can be
383                     * simplified under some circumstances.  For example, if
384                     * the application can be sure a database is already
385                     * there, it would never need to open it allowing create.
386                     */
387                    dbconf.setAllowCreate(true);
388                }
389                dbconf.setTransactional(true);
390
391                try {
392                    db = dbenv.openDatabase
393                        (null, RepConfig.progname, null, dbconf);
394                } catch (java.io.FileNotFoundException e) {
395                    System.err.println("no stock database available yet.");
396                    if (db != null) {
397                        db.close(true);
398                        db = null;
399                    }
400                    try {
401                        Thread.sleep(RepConfig.SLEEPTIME);
402                    } catch (InterruptedException ie) {}
403                    continue;
404                }
405            }
406
407            BufferedReader stdin =
408                new BufferedReader(new InputStreamReader(System.in));
409
410            /* Listen for input, and add it to the database. */
411            System.out.print("QUOTESERVER");
412            if (!dbenv.getIsMaster())
413                System.out.print("(read-only)");
414            System.out.print("> ");
415            System.out.flush();
416            String nextline = null;
417            try {
418                nextline = stdin.readLine();
419            } catch (IOException ioe) {
420                System.err.println("Unable to get data from stdin");
421                break;
422            }
423            String[] words = nextline.split("\\s");
424
425            /* A blank line causes the DB to be dumped to stdout. */
426            if (words.length == 0 ||
427                (words.length == 1 && words[0].length() == 0)) {
428                try {
429                    if (dbenv.getInClientSync())
430                        System.err.println(
431    "Cannot read data during client initialization - please try again.");
432                    else
433                        printStocks(db);
434                } catch (DeadlockException de) {
435                    continue;
436                } catch (DatabaseException e) {
437                    /*
438                     * This could be DB_REP_HANDLE_DEAD, which
439                     * should close the database and continue.
440                     */
441                    System.err.println("Got db exception reading replication" +
442                        "DB: " + e);
443                    System.err.println("Expected if it was due to a dead " +
444                        "replication handle, otherwise an unexpected error.");
445                    db.close(true); /* Close no sync. */
446                    db = null;
447                    continue;
448                }
449                continue;
450            }
451
452            if (words.length == 1 &&
453                (words[0].compareToIgnoreCase("quit") == 0 ||
454                words[0].compareToIgnoreCase("exit") == 0)) {
455                    dbenv.setAppFinished(true);
456                    break;
457            } else if (words.length != 2) {
458                System.err.println("Format: TICKER VALUE");
459                continue;
460            }
461
462            if (!dbenv.getIsMaster()) {
463                System.err.println("Can't update client.");
464                continue;
465            }
466
467            DatabaseEntry key = new DatabaseEntry(words[0].getBytes());
468            DatabaseEntry data = new DatabaseEntry(words[1].getBytes());
469
470            db.put(null, key, data);
471        }
472        if (db != null)
473            db.close(true);
474        return 0;
475    }
476
477    public void terminate()
478        throws DatabaseException
479    {
480        /* Wait for checkpoint and log archive threads to finish. */
481        try {
482            lgaThr.join();
483            ckpThr.join();
484        } catch (Exception e1) {
485            System.err.println("Support thread join failed.");
486        }
487
488        /*
489         * We have used the DB_TXN_NOSYNC environment flag for improved
490         * performance without the usual sacrifice of transactional durability,
491         * as discussed in the "Transactional guarantees" page of the Reference
492         * Guide: if one replication site crashes, we can expect the data to
493         * exist at another site.  However, in case we shut down all sites
494         * gracefully, we push out the end of the log here so that the most
495         * recent transactions don't mysteriously disappear.
496         */
497        dbenv.logFlush(null);
498
499        dbenv.close();
500    }
501
502    /*
503     * void return type since error conditions are propogated
504     * via exceptions.
505     */
506    private void printStocks(Database db)
507        throws DeadlockException, DatabaseException
508    {
509        Cursor dbc = db.openCursor(null, null);
510
511        System.out.println("\tSymbol\tPrice");
512        System.out.println("\t======\t=====");
513
514        DatabaseEntry key = new DatabaseEntry();
515        DatabaseEntry data = new DatabaseEntry();
516        OperationStatus ret;
517        for (ret = dbc.getFirst(key, data, LockMode.DEFAULT);
518            ret == OperationStatus.SUCCESS;
519            ret = dbc.getNext(key, data, LockMode.DEFAULT)) {
520            String keystr = new String
521                (key.getData(), key.getOffset(), key.getSize());
522            String datastr = new String
523                (data.getData(), data.getOffset(), data.getSize());
524            System.out.println("\t"+keystr+"\t"+datastr);
525
526        }
527        dbc.close();
528    }
529
530    /*
531     * Implemention of EventHandler interface to handle the Berkeley DB events
532     * we are interested in receiving.
533     */
534    private /* internal */
535    class RepQuoteEventHandler extends EventHandlerAdapter {
536        public void handleRepClientEvent()
537        {
538            dbenv.setIsMaster(false);
539            dbenv.setInClientSync(true);
540        }
541        public void handleRepMasterEvent()
542        {
543            dbenv.setIsMaster(true);
544            dbenv.setInClientSync(false);
545        }
546        public void handleRepNewMasterEvent()
547        {
548            dbenv.setInClientSync(true);
549        }
550        public void handleRepPermFailedEvent()
551        {
552            /*
553             * Did not get enough acks to guarantee transaction
554             * durability based on the configured ack policy.  This
555             * transaction will be flushed to the master site's
556             * local disk storage for durability.
557             */
558            System.err.println(
559    "Insufficient acknowledgements to guarantee transaction durability.");
560        }
561        public void handleRepStartupDoneEvent()
562        {
563            dbenv.setInClientSync(false);
564        }
565    }
566} /* End RepQuoteEventHandler class. */
567
568/*
569 * This is a very simple thread that performs checkpoints at a fixed
570 * time interval.  For a master site, the time interval is one minute
571 * plus the duration of the checkpoint_delay timeout (30 seconds by
572 * default.)  For a client site, the time interval is one minute.
573 */
574class CheckpointThread extends Thread {
575    private RepQuoteEnvironment myEnv = null;
576
577    public CheckpointThread(RepQuoteEnvironment env) {
578        myEnv = env;
579    }
580
581    public void run() {
582        for (;;) {
583            /*
584             * Wait for one minute, polling once per second to see if
585             * application has finished.  When application has finished,
586             * terminate this thread.
587             */
588            for (int i = 0; i < 60; i++) {
589                try {
590                    Thread.sleep(1000);
591                } catch (InterruptedException ie) {}
592                if (myEnv.getAppFinished())
593                    return;
594            }
595
596            /* Perform a checkpoint. */
597            try {
598                myEnv.checkpoint(null);
599            } catch (DatabaseException de) {
600                System.err.println("Could not perform checkpoint.");
601            }
602        }
603    }
604}
605
606/*
607 * This is a simple log archive thread.  Once per minute, it removes all but
608 * the most recent 3 logs that are safe to remove according to a call to
609 * DBENV->log_archive().
610 *
611 * Log cleanup is needed to conserve disk space, but aggressive log cleanup
612 * can cause more frequent client initializations if a client lags too far
613 * behind the current master.  This can happen in the event of a slow client,
614 * a network partition, or a new master that has not kept as many logs as the
615 * previous master.
616 *
617 * The approach in this routine balances the need to mitigate against a
618 * lagging client by keeping a few more of the most recent unneeded logs
619 * with the need to conserve disk space by regularly cleaning up log files.
620 * Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
621 * flag) is not recommended for replication due to the risk of frequent
622 * client initializations.
623 */
624class LogArchiveThread extends Thread {
625    private RepQuoteEnvironment myEnv = null;
626    private EnvironmentConfig myEnvConfig = null;
627
628    public LogArchiveThread(RepQuoteEnvironment env,
629        EnvironmentConfig envConfig) {
630        myEnv = env;
631        myEnvConfig = envConfig;
632    }
633
634    public void run() {
635        java.io.File[] logFileList;
636        int logs_to_keep = 3;
637        int minlog;
638
639        for (;;) {
640            /*
641             * Wait for one minute, polling once per second to see if
642             * application has finished.  When application has finished,
643             * terminate this thread.
644             */
645            for (int i = 0; i < 60; i++) {
646                try {
647                    Thread.sleep(1000);
648                } catch (InterruptedException ie) {}
649                if (myEnv.getAppFinished())
650                    return;
651            }
652
653            try {
654                /* Get the list of unneeded log files. */
655                logFileList = myEnv.getArchiveLogFiles(false);
656                /*
657                 * Remove all but the logs_to_keep most recent unneeded
658                 * log files.
659                 */
660                minlog = logFileList.length - logs_to_keep;
661                for (int i = 0; i < minlog; i++) {
662                    logFileList[i].delete();
663                }
664            } catch (DatabaseException de) {
665                System.err.println("Problem deleting log archive files.");
666            }
667        }
668    }
669}
670