1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include <sys/types.h> 10#include <errno.h> 11#include <signal.h> 12#include <stdlib.h> 13#include <string.h> 14 15#include <db.h> 16 17#include "rep_base.h" 18 19/* 20 * Process globals (we could put these in the machtab I suppose). 21 */ 22int master_eid; 23char *myaddr; 24unsigned short myport; 25 26const char *progname = "ex_rep_base"; 27 28static void event_callback __P((DB_ENV *, u_int32_t, void *)); 29 30int 31main(argc, argv) 32 int argc; 33 char *argv[]; 34{ 35 DB_ENV *dbenv; 36 SETUP_DATA setup_info; 37 DBT local; 38 all_args aa; 39 connect_args ca; 40 supthr_args supa; 41 machtab_t *machtab; 42 thread_t all_thr, ckp_thr, conn_thr, lga_thr; 43 void *astatus, *cstatus; 44#ifdef _WIN32 45 WSADATA wsaData; 46#else 47 struct sigaction sigact; 48#endif 49 APP_DATA my_app_data; 50 int ret; 51 52 memset(&setup_info, 0, sizeof(SETUP_DATA)); 53 setup_info.progname = progname; 54 master_eid = DB_EID_INVALID; 55 memset(&my_app_data, 0, sizeof(APP_DATA)); 56 dbenv = NULL; 57 machtab = NULL; 58 ret = 0; 59 60 if ((ret = create_env(progname, &dbenv)) != 0) 61 goto err; 62 dbenv->app_private = &my_app_data; 63 (void)dbenv->set_event_notify(dbenv, event_callback); 64 65 /* Parse command line and perform common replication setup. */ 66 if ((ret = common_rep_setup(dbenv, argc, argv, &setup_info)) != 0) 67 goto err; 68 69 if (setup_info.role == MASTER) 70 master_eid = SELF_EID; 71 72 myaddr = strdup(setup_info.self.host); 73 myport = setup_info.self.port; 74 75#ifdef _WIN32 76 /* Initialize the Windows sockets DLL. */ 77 if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) { 78 fprintf(stderr, 79 "Unable to initialize Windows sockets: %d\n", ret); 80 goto err; 81 } 82#else 83 /* 84 * Turn off SIGPIPE so that we don't kill processes when they 85 * happen to lose a connection at the wrong time. 86 */ 87 memset(&sigact, 0, sizeof(sigact)); 88 sigact.sa_handler = SIG_IGN; 89 if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) { 90 fprintf(stderr, 91 "Unable to turn off SIGPIPE: %s\n", strerror(ret)); 92 goto err; 93 } 94#endif 95 96 /* 97 * We are hardcoding priorities here that all clients have the 98 * same priority except for a designated master who gets a higher 99 * priority. 100 */ 101 if ((ret = 102 machtab_init(&machtab, setup_info.nsites)) != 0) 103 goto err; 104 my_app_data.comm_infrastructure = machtab; 105 106 if ((ret = env_init(dbenv, setup_info.home)) != 0) 107 goto err; 108 109 /* 110 * Now sets up comm infrastructure. There are two phases. First, 111 * we open our port for listening for incoming connections. Then 112 * we attempt to connect to every host we know about. 113 */ 114 115 (void)dbenv->rep_set_transport(dbenv, SELF_EID, quote_send); 116 117 ca.dbenv = dbenv; 118 ca.home = setup_info.home; 119 ca.progname = progname; 120 ca.machtab = machtab; 121 ca.port = setup_info.self.port; 122 if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) { 123 dbenv->errx(dbenv, "can't create connect thread"); 124 goto err; 125 } 126 127 aa.dbenv = dbenv; 128 aa.progname = progname; 129 aa.home = setup_info.home; 130 aa.machtab = machtab; 131 aa.sites = setup_info.site_list; 132 aa.nsites = setup_info.remotesites; 133 if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) { 134 dbenv->errx(dbenv, "can't create connect-all thread"); 135 goto err; 136 } 137 138 /* Start checkpoint and log archive threads. */ 139 supa.dbenv = dbenv; 140 supa.shared = &my_app_data.shared_data; 141 if ((ret = start_support_threads(dbenv, &supa, &ckp_thr, &lga_thr)) 142 != 0) 143 goto err; 144 145 /* 146 * We have now got the entire communication infrastructure set up. 147 * It's time to declare ourselves to be a client or master. 148 */ 149 if (setup_info.role == MASTER) { 150 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) { 151 dbenv->err(dbenv, ret, "dbenv->rep_start failed"); 152 goto err; 153 } 154 } else { 155 memset(&local, 0, sizeof(local)); 156 local.data = myaddr; 157 local.size = (u_int32_t)strlen(myaddr) + 1; 158 if ((ret = 159 dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) { 160 dbenv->err(dbenv, ret, "dbenv->rep_start failed"); 161 goto err; 162 } 163 /* Sleep to give ourselves time to find a master. */ 164 sleep(5); 165 } 166 167 if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) { 168 dbenv->err(dbenv, ret, "Main loop failed"); 169 goto err; 170 } 171 172 /* Finish checkpoint and log archive threads. */ 173 if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0) 174 goto err; 175 176 /* Wait on the connection threads. */ 177 if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus)) { 178 ret = -1; 179 goto err; 180 } 181 if ((uintptr_t)astatus != EXIT_SUCCESS || 182 (uintptr_t)cstatus != EXIT_SUCCESS) { 183 ret = -1; 184 goto err; 185 } 186 187 /* 188 * We have used the DB_TXN_NOSYNC environment flag for improved 189 * performance without the usual sacrifice of transactional durability, 190 * as discussed in the "Transactional guarantees" page of the Reference 191 * Guide: if one replication site crashes, we can expect the data to 192 * exist at another site. However, in case we shut down all sites 193 * gracefully, we push out the end of the log here so that the most 194 * recent transactions don't mysteriously disappear. 195 */ 196 if ((ret = dbenv->log_flush(dbenv, NULL)) != 0) 197 dbenv->err(dbenv, ret, "log_flush"); 198 199err: if (machtab != NULL) 200 free(machtab); 201 if (dbenv != NULL) 202 (void)dbenv->close(dbenv, 0); 203#ifdef _WIN32 204 /* Shut down the Windows sockets DLL. */ 205 (void)WSACleanup(); 206#endif 207 return (ret); 208} 209 210static void 211event_callback(dbenv, which, info) 212 DB_ENV *dbenv; 213 u_int32_t which; 214 void *info; 215{ 216 APP_DATA *app = dbenv->app_private; 217 SHARED_DATA *shared = &app->shared_data; 218 219 switch (which) { 220 case DB_EVENT_REP_CLIENT: 221 shared->is_master = 0; 222 shared->in_client_sync = 1; 223 break; 224 225 case DB_EVENT_REP_ELECTED: 226 app->elected = 1; 227 master_eid = SELF_EID; 228 break; 229 230 case DB_EVENT_REP_MASTER: 231 shared->is_master = 1; 232 shared->in_client_sync = 0; 233 break; 234 235 case DB_EVENT_REP_NEWMASTER: 236 master_eid = *(int*)info; 237 shared->in_client_sync = 1; 238 break; 239 240 case DB_EVENT_REP_STARTUPDONE: 241 shared->in_client_sync = 0; 242 break; 243 244 default: 245 dbenv->errx(dbenv, "ignoring event %d", which); 246 } 247} 248