1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_base.c,v 12.21 2008/01/08 20:58:24 bostic Exp $ 7 */ 8 9#include <sys/types.h> 10#include <errno.h> 11#include <signal.h> 12#include <stdlib.h> 13#include <string.h> 14 15#include <db.h> 16 17#include "rep_base.h" 18 19/* 20 * Process globals (we could put these in the machtab I suppose). 21 */ 22int master_eid; 23char *myaddr; 24unsigned short myport; 25 26static void event_callback __P((DB_ENV *, u_int32_t, void *)); 27 28int 29main(argc, argv) 30 int argc; 31 char *argv[]; 32{ 33 extern char *optarg; 34 DB_ENV *dbenv; 35 DBT local; 36 enum { MASTER, CLIENT, UNKNOWN } whoami; 37 all_args aa; 38 connect_args ca; 39 machtab_t *machtab; 40 thread_t all_thr, conn_thr; 41 void *astatus, *cstatus; 42#ifdef _WIN32 43 WSADATA wsaData; 44#else 45 struct sigaction sigact; 46#endif 47 repsite_t site, *sitep, self, *selfp; 48 int maxsites, nsites, ret, priority, totalsites; 49 char *c, ch; 50 const char *home, *progname; 51 APP_DATA my_app_data; 52 53 master_eid = DB_EID_INVALID; 54 55 my_app_data.elected = 0; 56 my_app_data.shared_data.is_master = 0; /* assume start out as client */ 57 dbenv = NULL; 58 whoami = UNKNOWN; 59 machtab = NULL; 60 selfp = sitep = NULL; 61 maxsites = nsites = ret = totalsites = 0; 62 priority = 100; 63 home = "TESTDIR"; 64 progname = "ex_rep_base"; 65 66 if ((ret = create_env(progname, &dbenv)) != 0) 67 goto err; 68 dbenv->app_private = &my_app_data; 69 (void)dbenv->set_event_notify(dbenv, event_callback); 70 71 while ((ch = getopt(argc, argv, "Ch:Mm:n:o:p:v")) != EOF) 72 switch (ch) { 73 case 'M': 74 whoami = MASTER; 75 master_eid = SELF_EID; 76 break; 77 case 'C': 78 whoami = CLIENT; 79 break; 80 case 'h': 81 home = optarg; 82 break; 83 case 'm': 84 if ((myaddr = strdup(optarg)) == NULL) { 85 fprintf(stderr, 86 "System error %s\n", strerror(errno)); 87 goto err; 88 } 89 self.host = optarg; 90 self.host = strtok(self.host, ":"); 91 if ((c = strtok(NULL, ":")) == NULL) { 92 fprintf(stderr, "Bad host specification.\n"); 93 goto err; 94 } 95 myport = self.port = (unsigned short)atoi(c); 96 selfp = &self; 97 break; 98 case 'n': 99 totalsites = atoi(optarg); 100 break; 101 case 'o': 102 site.host = optarg; 103 site.host = strtok(site.host, ":"); 104 if ((c = strtok(NULL, ":")) == NULL) { 105 fprintf(stderr, "Bad host specification.\n"); 106 goto err; 107 } 108 site.port = atoi(c); 109 if (sitep == NULL || nsites >= maxsites) { 110 maxsites = maxsites == 0 ? 10 : 2 * maxsites; 111 if ((sitep = realloc(sitep, 112 maxsites * sizeof(repsite_t))) == NULL) { 113 fprintf(stderr, "System error %s\n", 114 strerror(errno)); 115 goto err; 116 } 117 } 118 sitep[nsites++] = site; 119 break; 120 case 'p': 121 priority = atoi(optarg); 122 break; 123 case 'v': 124 if ((ret = dbenv->set_verbose(dbenv, 125 DB_VERB_REPLICATION, 1)) != 0) 126 goto err; 127 break; 128 case '?': 129 default: 130 usage(progname); 131 } 132 133 /* Error check command line. */ 134 if (whoami == UNKNOWN) { 135 fprintf(stderr, "Must specify -M or -C.\n"); 136 goto err; 137 } 138 139 if (selfp == NULL) 140 usage(progname); 141 142 if (home == NULL) 143 usage(progname); 144 145 dbenv->rep_set_priority(dbenv, priority); 146 147#ifdef _WIN32 148 /* Initialize the Windows sockets DLL. */ 149 if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { 150 fprintf(stderr, 151 "Unable to initialize Windows sockets: %d\n", ret); 152 goto err; 153 } 154#else 155 /* 156 * Turn off SIGPIPE so that we don't kill processes when they 157 * happen to lose a connection at the wrong time. 158 */ 159 memset(&sigact, 0, sizeof(sigact)); 160 sigact.sa_handler = SIG_IGN; 161 if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) { 162 fprintf(stderr, 163 "Unable to turn off SIGPIPE: %s\n", strerror(ret)); 164 goto err; 165 } 166#endif 167 168 /* 169 * We are hardcoding priorities here that all clients have the 170 * same priority except for a designated master who gets a higher 171 * priority. 172 */ 173 if ((ret = 174 machtab_init(&machtab, totalsites)) != 0) 175 goto err; 176 my_app_data.comm_infrastructure = machtab; 177 178 if ((ret = env_init(dbenv, home)) != 0) 179 goto err; 180 181 /* 182 * Now sets up comm infrastructure. There are two phases. First, 183 * we open our port for listening for incoming connections. Then 184 * we attempt to connect to every host we know about. 185 */ 186 187 (void)dbenv->rep_set_transport(dbenv, SELF_EID, quote_send); 188 189 ca.dbenv = dbenv; 190 ca.home = home; 191 ca.progname = progname; 192 ca.machtab = machtab; 193 ca.port = selfp->port; 194 if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) { 195 dbenv->errx(dbenv, "can't create connect thread"); 196 goto err; 197 } 198 199 aa.dbenv = dbenv; 200 aa.progname = progname; 201 aa.home = home; 202 aa.machtab = machtab; 203 aa.sites = sitep; 204 aa.nsites = nsites; 205 if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) { 206 dbenv->errx(dbenv, "can't create connect-all thread"); 207 goto err; 208 } 209 210 /* 211 * We have now got the entire communication infrastructure set up. 212 * It's time to declare ourselves to be a client or master. 213 */ 214 if (whoami == MASTER) { 215 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) { 216 dbenv->err(dbenv, ret, "dbenv->rep_start failed"); 217 goto err; 218 } 219 } else { 220 memset(&local, 0, sizeof(local)); 221 local.data = myaddr; 222 local.size = (u_int32_t)strlen(myaddr) + 1; 223 if ((ret = 224 dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) { 225 dbenv->err(dbenv, ret, "dbenv->rep_start failed"); 226 goto err; 227 } 228 /* Sleep to give ourselves time to find a master. */ 229 sleep(5); 230 } 231 if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) { 232 dbenv->err(dbenv, ret, "Main loop failed"); 233 goto err; 234 } 235 236 /* Wait on the connection threads. */ 237 if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus)) { 238 ret = -1; 239 goto err; 240 } 241 if ((uintptr_t)astatus != EXIT_SUCCESS || 242 (uintptr_t)cstatus != EXIT_SUCCESS) { 243 ret = -1; 244 goto err; 245 } 246 247 /* 248 * We have used the DB_TXN_NOSYNC environment flag for improved 249 * performance without the usual sacrifice of transactional durability, 250 * as discussed in the "Transactional guarantees" page of the Reference 251 * Guide: if one replication site crashes, we can expect the data to 252 * exist at another site. However, in case we shut down all sites 253 * gracefully, we push out the end of the log here so that the most 254 * recent transactions don't mysteriously disappear. 255 */ 256 if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) 257 dbenv->err(dbenv, ret, "log_flush"); 258 259err: if (machtab != NULL) 260 free(machtab); 261 if (dbenv != NULL) 262 (void)dbenv->close(dbenv, 0); 263#ifdef _WIN32 264 /* Shut down the Windows sockets DLL. */ 265 (void)WSACleanup(); 266#endif 267 return (ret); 268} 269 270static void 271event_callback(dbenv, which, info) 272 DB_ENV *dbenv; 273 u_int32_t which; 274 void *info; 275{ 276 APP_DATA *app = dbenv->app_private; 277 SHARED_DATA *shared = &app->shared_data; 278 279 switch (which) { 280 case DB_EVENT_REP_CLIENT: 281 shared->is_master = 0; 282 break; 283 284 case DB_EVENT_REP_ELECTED: 285 app->elected = 1; 286 master_eid = SELF_EID; 287 break; 288 289 case DB_EVENT_REP_MASTER: 290 shared->is_master = 1; 291 break; 292 293 case DB_EVENT_REP_NEWMASTER: 294 master_eid = *(int*)info; 295 break; 296 297 case DB_EVENT_REP_STARTUPDONE: 298 /* I don't care about this, for now. */ 299 break; 300 301 default: 302 dbenv->errx(dbenv, "ignoring event %d", which); 303 } 304} 305