1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1997,2008 Oracle. All rights reserved. 5 * 6 * $Id: RepQuoteExample.java,v 1.20 2008/02/15 18:47:11 alanb Exp $ 7 */ 8 9package db.repquote; 10 11import java.io.FileNotFoundException; 12import java.io.BufferedReader; 13import java.io.InputStreamReader; 14import java.io.IOException; 15import java.lang.Thread; 16import java.lang.InterruptedException; 17 18import com.sleepycat.db.*; 19import db.repquote.RepConfig; 20 21/** 22 * RepQuoteExample is a simple but complete demonstration of a replicated 23 * application. The application is a mock stock ticker. The master accepts a 24 * stock symbol and an numerical value as input, and stores this information 25 * into a replicated database; either master or clients can display the 26 * contents of the database. 27 * <p> 28 * The options to start a given replication node are: 29 * <pre> 30 * -M (configure this process to start as a master) 31 * -C (configure this process to start as a client) 32 * -h environment home directory 33 * -m host:port (required; m stands for me) 34 * -o host:port (optional; o stands for other; any number of these may 35 * be specified) 36 * -f host:port (optional; f stands for friend, and indicates a peer 37 * relationship to the specified site) 38 * -n nsites (optional; number of sites in replication group. 39 * Defaults to 0 in which case we dynamically compute the number of 40 * sites in the replication group) 41 * -p priority (optional: defaults to 100) 42 * -v Enable verbose logging 43 * </pre> 44 * <p> 45 * A typical session begins with a command such as the following to start a 46 * master: 47 * 48 * <pre> 49 * java db.repquote.RepQuoteExample -M -h dir1 -m localhost:6000 50 * </pre> 51 * 52 * and several clients: 53 * 54 * <pre> 55 * java db.repquote.RepQuoteExample -C -h dir2 56 * -m localhost:6001 -o localhost:6000 57 * java db.repquote.RepQuoteExample -C -h dir3 58 * -m localhost:6002 -o localhost:6000 59 * java db.repquote.RepQuoteExample -C -h dir4 60 * -m localhost:6003 -o localhost:6000 61 * </pre> 62 * 63 * <p> 64 * Each process is a member of a DB replication group. The sample application 65 * expects the following commands to stdin: 66 * <ul> 67 * <li>NEWLINE -- print all the stocks held in the database</li> 68 * <li>quit -- shutdown this node</li> 69 * <li>exit -- shutdown this node</li> 70 * <li>stock_symbol number -- enter this stock and number into the 71 * database</li> 72 * </ul> 73 */ 74 75public class RepQuoteExample 76{ 77 private RepConfig appConfig; 78 private RepQuoteEnvironment dbenv; 79 80 public static void usage() 81 { 82 System.err.println("usage: " + RepConfig.progname); 83 System.err.println("[-C][-M][-F][-h home][-o host:port]" + 84 "[-m host:port][-f host:port][-n nsites][-p priority][-v]"); 85 86 System.err.println( 87 "\t -C start the site as client of the replication group\n" + 88 "\t -M start the site as master of the replication group\n" + 89 "\t -f host:port (optional; f stands for friend and \n" + 90 "\t indicates a peer relationship to the specified site)\n" + 91 "\t -h home directory\n" + 92 "\t -m host:port (required; m stands for me)\n" + 93 "\t -n nsites (optional; number of sites in replication \n" + 94 "\t group; defaults to 0\n" + 95 "\t In which case the number of sites are computed \n" + 96 "\t dynamically\n" + 97 "\t -o host:port (optional; o stands for other; any number\n" + 98 "\t of these may be specified)\n" + 99 "\t -p priority (optional: defaults to 100)\n" + 100 "\t -v Enable verbose logging\n"); 101 102 System.exit(1); 103 } 104 105 public static void main(String[] argv) 106 throws Exception 107 { 108 RepConfig config = new RepConfig(); 109 boolean isPeer; 110 String tmpHost; 111 int tmpPort = 0; 112 // Extract the command line parameters 113 for (int i = 0; i < argv.length; i++) 114 { 115 isPeer = false; 116 if (argv[i].compareTo("-C") == 0) { 117 config.startPolicy = ReplicationManagerStartPolicy.REP_CLIENT; 118 } else if (argv[i].compareTo("-h") == 0) { 119 // home - a string arg. 120 i++; 121 config.home = argv[i]; 122 } else if (argv[i].compareTo("-M") == 0) { 123 config.startPolicy = ReplicationManagerStartPolicy.REP_MASTER; 124 } else if (argv[i].compareTo("-m") == 0) { 125 // "me" should be host:port 126 i++; 127 String[] words = argv[i].split(":"); 128 if (words.length != 2) { 129 System.err.println( 130 "Invalid host specification host:port needed."); 131 usage(); 132 } 133 try { 134 tmpPort = Integer.parseInt(words[1]); 135 } catch (NumberFormatException nfe) { 136 System.err.println("Invalid host specification, " + 137 "could not parse port number."); 138 usage(); 139 } 140 config.setThisHost(words[0], tmpPort); 141 } else if (argv[i].compareTo("-n") == 0) { 142 i++; 143 config.totalSites = Integer.parseInt(argv[i]); 144 } else if (argv[i].compareTo("-f") == 0 || 145 argv[i].compareTo("-o") == 0) { 146 if (argv[i].equals("-f")) 147 isPeer = true; 148 i++; 149 String[] words = argv[i].split(":"); 150 if (words.length != 2) { 151 System.err.println( 152 "Invalid host specification host:port needed."); 153 usage(); 154 } 155 try { 156 tmpPort = Integer.parseInt(words[1]); 157 } catch (NumberFormatException nfe) { 158 System.err.println("Invalid host specification, " + 159 "could not parse port number."); 160 usage(); 161 } 162 config.addOtherHost(words[0], tmpPort, isPeer); 163 } else if (argv[i].compareTo("-p") == 0) { 164 i++; 165 config.priority = Integer.parseInt(argv[i]); 166 } else if (argv[i].compareTo("-v") == 0) { 167 config.verbose = true; 168 } else { 169 System.err.println("Unrecognized option: " + argv[i]); 170 usage(); 171 } 172 173 } 174 175 // Error check command line. 176 if ((!config.gotListenAddress()) || config.home.length() == 0) 177 usage(); 178 179 RepQuoteExample runner = null; 180 try { 181 runner = new RepQuoteExample(); 182 runner.init(config); 183 184 // Sleep to give ourselves time to find a master. 185 //try { 186 // Thread.sleep(5000); 187 //} catch (InterruptedException e) {} 188 189 runner.doloop(); 190 runner.terminate(); 191 } catch (DatabaseException dbe) { 192 System.err.println("Caught an exception during " + 193 "initialization or processing: " + dbe); 194 if (runner != null) 195 runner.terminate(); 196 } 197 } // end main 198 199 public RepQuoteExample() 200 throws DatabaseException 201 { 202 appConfig = null; 203 dbenv = null; 204 } 205 206 public int init(RepConfig config) 207 throws DatabaseException 208 { 209 int ret = 0; 210 appConfig = config; 211 EnvironmentConfig envConfig = new EnvironmentConfig(); 212 envConfig.setErrorStream(System.err); 213 envConfig.setErrorPrefix(RepConfig.progname); 214 215 envConfig.setReplicationManagerLocalSite(appConfig.getThisHost()); 216 for (RepRemoteHost host = appConfig.getFirstOtherHost(); 217 host != null; host = appConfig.getNextOtherHost()){ 218 envConfig.replicationManagerAddRemoteSite( 219 host.getAddress(), host.isPeer()); 220 } 221 if (appConfig.totalSites > 0) 222 envConfig.setReplicationNumSites(appConfig.totalSites); 223 envConfig.setReplicationPriority(appConfig.priority); 224 225 envConfig.setCacheSize(RepConfig.CACHESIZE); 226 envConfig.setTxnNoSync(true); 227 228 envConfig.setEventHandler(new RepQuoteEventHandler()); 229 envConfig.setReplicationManagerAckPolicy(ReplicationManagerAckPolicy.ALL); 230 231 envConfig.setAllowCreate(true); 232 envConfig.setRunRecovery(true); 233 envConfig.setThreaded(true); 234 envConfig.setInitializeReplication(true); 235 envConfig.setInitializeLocking(true); 236 envConfig.setInitializeLogging(true); 237 envConfig.setInitializeCache(true); 238 envConfig.setTransactional(true); 239 envConfig.setVerboseReplication(appConfig.verbose); 240 try { 241 dbenv = new RepQuoteEnvironment(appConfig.getHome(), envConfig); 242 } catch(FileNotFoundException e) { 243 System.err.println("FileNotFound exception: " + e); 244 System.err.println( 245 "Ensure that the environment directory is pre-created."); 246 ret = 1; 247 } 248 249 // start replication manager 250 dbenv.replicationManagerStart(3, appConfig.startPolicy); 251 252 return ret; 253 } 254 255 public int doloop() 256 throws DatabaseException 257 { 258 Database db = null; 259 260 for (;;) 261 { 262 if (db == null) { 263 DatabaseConfig dbconf = new DatabaseConfig(); 264 // Set page size small so page allocation is cheap. 265 dbconf.setPageSize(512); 266 dbconf.setType(DatabaseType.BTREE); 267 if (dbenv.getIsMaster()) { 268 dbconf.setAllowCreate(true); 269 } 270 dbconf.setTransactional(true); 271 272 try { 273 db = dbenv.openDatabase 274 (null, RepConfig.progname, null, dbconf); 275 } catch (java.io.FileNotFoundException e) { 276 System.err.println("no stock database available yet."); 277 if (db != null) { 278 db.close(true); 279 db = null; 280 } 281 try { 282 Thread.sleep(RepConfig.SLEEPTIME); 283 } catch (InterruptedException ie) {} 284 continue; 285 } 286 } 287 288 BufferedReader stdin = 289 new BufferedReader(new InputStreamReader(System.in)); 290 291 // listen for input, and add it to the database. 292 System.out.print("QUOTESERVER"); 293 if (!dbenv.getIsMaster()) 294 System.out.print("(read-only)"); 295 System.out.print("> "); 296 System.out.flush(); 297 String nextline = null; 298 try { 299 nextline = stdin.readLine(); 300 } catch (IOException ioe) { 301 System.err.println("Unable to get data from stdin"); 302 break; 303 } 304 String[] words = nextline.split("\\s"); 305 306 // A blank line causes the DB to be dumped to stdout. 307 if (words.length == 0 || 308 (words.length == 1 && words[0].length() == 0)) { 309 try { 310 printStocks(db); 311 } catch (DeadlockException de) { 312 continue; 313 } catch (DatabaseException e) { 314 // this could be DB_REP_HANDLE_DEAD 315 // should close the database and continue 316 System.err.println("Got db exception reading replication" + 317 "DB: " + e); 318 System.err.println("Expected if it was due to a dead " + 319 "replication handle, otherwise an unexpected error."); 320 db.close(true); // close no sync. 321 db = null; 322 continue; 323 } 324 continue; 325 } 326 327 if (words.length == 1 && 328 (words[0].compareToIgnoreCase("quit") == 0 || 329 words[0].compareToIgnoreCase("exit") == 0)) { 330 break; 331 } else if (words.length != 2) { 332 System.err.println("Format: TICKER VALUE"); 333 continue; 334 } 335 336 if (!dbenv.getIsMaster()) { 337 System.err.println("Can't update client."); 338 continue; 339 } 340 341 DatabaseEntry key = new DatabaseEntry(words[0].getBytes()); 342 DatabaseEntry data = new DatabaseEntry(words[1].getBytes()); 343 344 db.put(null, key, data); 345 } 346 if (db != null) 347 db.close(true); 348 return 0; 349 } 350 351 public void terminate() 352 throws DatabaseException 353 { 354 /* 355 * We have used the DB_TXN_NOSYNC environment flag for improved 356 * performance without the usual sacrifice of transactional durability, 357 * as discussed in the "Transactional guarantees" page of the Reference 358 * Guide: if one replication site crashes, we can expect the data to 359 * exist at another site. However, in case we shut down all sites 360 * gracefully, we push out the end of the log here so that the most 361 * recent transactions don't mysteriously disappear. 362 */ 363 dbenv.logFlush(null); 364 365 dbenv.close(); 366 } 367 368 /* 369 * void return type since error conditions are propogated 370 * via exceptions. 371 */ 372 private void printStocks(Database db) 373 throws DeadlockException, DatabaseException 374 { 375 Cursor dbc = db.openCursor(null, null); 376 377 System.out.println("\tSymbol\tPrice"); 378 System.out.println("\t======\t====="); 379 380 DatabaseEntry key = new DatabaseEntry(); 381 DatabaseEntry data = new DatabaseEntry(); 382 OperationStatus ret; 383 for (ret = dbc.getFirst(key, data, LockMode.DEFAULT); 384 ret == OperationStatus.SUCCESS; 385 ret = dbc.getNext(key, data, LockMode.DEFAULT)) { 386 String keystr = new String 387 (key.getData(), key.getOffset(), key.getSize()); 388 String datastr = new String 389 (data.getData(), data.getOffset(), data.getSize()); 390 System.out.println("\t"+keystr+"\t"+datastr); 391 392 } 393 dbc.close(); 394 } 395 396 /* 397 * Implemention of EventHandler interface to handle the Berkeley DB events 398 * we are interested in receiving. 399 */ 400 private /* internal */ 401 class RepQuoteEventHandler extends EventHandlerAdapter { 402 public void handleRepClientEvent() 403 { 404 dbenv.setIsMaster(false); 405 } 406 public void handleRepMasterEvent() 407 { 408 dbenv.setIsMaster(true); 409 } 410 } 411} // end class 412 413