1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2001,2008 Oracle. All rights reserved. 5 * 6 * $Id: rep_msg.c,v 12.16 2008/01/08 20:58:24 bostic Exp $ 7 */ 8 9#include <sys/types.h> 10#include <errno.h> 11#include <stdio.h> 12#include <stdlib.h> 13#include <string.h> 14 15#include <db.h> 16 17#include "rep_base.h" 18 19static int connect_site __P((DB_ENV *, machtab_t *, 20 const char *, repsite_t *, int *, thread_t *)); 21static void *elect_thread __P((void *)); 22static void *hm_loop __P((void *)); 23 24typedef struct { 25 DB_ENV *dbenv; 26 machtab_t *machtab; 27} elect_args; 28 29typedef struct { 30 DB_ENV *dbenv; 31 const char *progname; 32 const char *home; 33 socket_t fd; 34 u_int32_t eid; 35 machtab_t *tab; 36} hm_loop_args; 37 38/* 39 * This is a generic message handling loop that is used both by the 40 * master to accept messages from a client as well as by clients 41 * to communicate with other clients. 42 */ 43static void * 44hm_loop(args) 45 void *args; 46{ 47 DB_ENV *dbenv; 48 DB_LSN permlsn; 49 DBT rec, control; 50 APP_DATA *app; 51 const char *c, *home, *progname; 52 elect_args *ea; 53 hm_loop_args *ha; 54 machtab_t *tab; 55 thread_t elect_thr, *site_thrs, *tmp, tid; 56 repsite_t self; 57 u_int32_t timeout; 58 int eid, n, nsites, nsites_allocd; 59 int already_open, r, ret, t_ret; 60 socket_t fd; 61 void *status; 62 63 ea = NULL; 64 site_thrs = NULL; 65 nsites_allocd = 0; 66 nsites = 0; 67 68 ha = (hm_loop_args *)args; 69 dbenv = ha->dbenv; 70 fd = ha->fd; 71 home = ha->home; 72 eid = ha->eid; 73 progname = ha->progname; 74 tab = ha->tab; 75 free(ha); 76 app = dbenv->app_private; 77 78 memset(&rec, 0, sizeof(DBT)); 79 memset(&control, 0, sizeof(DBT)); 80 81 for (ret = 0; ret == 0;) { 82 if ((ret = get_next_message(fd, &rec, &control)) != 0) { 83 /* 84 * Close this connection; if it's the master call 85 * for an election. 86 */ 87 closesocket(fd); 88 if ((ret = machtab_rem(tab, eid, 1)) != 0) 89 break; 90 91 /* 92 * If I'm the master, I just lost a client and this 93 * thread is done. 94 */ 95 if (master_eid == SELF_EID) 96 break; 97 98 /* 99 * If I was talking with the master and the master 100 * went away, I need to call an election; else I'm 101 * done. 102 */ 103 if (master_eid != eid) 104 break; 105 106 master_eid = DB_EID_INVALID; 107 machtab_parm(tab, &n, &timeout); 108 (void)dbenv->rep_set_timeout(dbenv, 109 DB_REP_ELECTION_TIMEOUT, timeout); 110 if ((ret = dbenv->rep_elect(dbenv, 111 n, (n/2+1), 0)) != 0) 112 continue; 113 114 /* 115 * Regardless of the results, the site I was talking 116 * to is gone, so I have nothing to do but exit. 117 */ 118 if (app->elected) { 119 app->elected = 0; 120 ret = dbenv->rep_start(dbenv, 121 NULL, DB_REP_MASTER); 122 } 123 break; 124 } 125 126 switch (r = dbenv->rep_process_message(dbenv, 127 &control, &rec, eid, &permlsn)) { 128 case DB_REP_NEWSITE: 129 /* 130 * Check if we got sent connect information and if we 131 * did, if this is me or if we already have a 132 * connection to this new site. If we don't, 133 * establish a new one. 134 */ 135 136 /* No connect info. */ 137 if (rec.size == 0) 138 break; 139 140 /* It's me, do nothing. */ 141 if (strncmp(myaddr, rec.data, rec.size) == 0) 142 break; 143 144 self.host = (char *)rec.data; 145 self.host = strtok(self.host, ":"); 146 if ((c = strtok(NULL, ":")) == NULL) { 147 dbenv->errx(dbenv, "Bad host specification"); 148 goto out; 149 } 150 self.port = atoi(c); 151 152 /* 153 * We try to connect to the new site. If we can't, 154 * we treat it as an error since we know that the site 155 * should be up if we got a message from it (even 156 * indirectly). 157 */ 158 if (nsites == nsites_allocd) { 159 /* Need to allocate more space. */ 160 if ((tmp = realloc( 161 site_thrs, (10 + nsites) * 162 sizeof(thread_t))) == NULL) { 163 ret = errno; 164 goto out; 165 } 166 site_thrs = tmp; 167 nsites_allocd += 10; 168 } 169 if ((ret = connect_site(dbenv, tab, progname, 170 &self, &already_open, &tid)) != 0) 171 goto out; 172 if (!already_open) 173 memcpy(&site_thrs 174 [nsites++], &tid, sizeof(thread_t)); 175 break; 176 case DB_REP_HOLDELECTION: 177 if (master_eid == SELF_EID) 178 break; 179 /* Make sure that previous election has finished. */ 180 if (ea != NULL) { 181 if (thread_join(elect_thr, &status) != 0) { 182 dbenv->errx(dbenv, 183 "thread join failure"); 184 goto out; 185 } 186 ea = NULL; 187 } 188 if ((ea = calloc(sizeof(elect_args), 1)) == NULL) { 189 dbenv->errx(dbenv, "can't allocate memory"); 190 ret = errno; 191 goto out; 192 } 193 ea->dbenv = dbenv; 194 ea->machtab = tab; 195 if ((ret = thread_create(&elect_thr, 196 NULL, elect_thread, (void *)ea)) != 0) { 197 dbenv->errx(dbenv, 198 "can't create election thread"); 199 } 200 break; 201 case DB_REP_ISPERM: 202 break; 203 case 0: 204 if (app->elected) { 205 app->elected = 0; 206 if ((ret = dbenv->rep_start(dbenv, 207 NULL, DB_REP_MASTER)) != 0) { 208 dbenv->err(dbenv, ret, 209 "can't start as master"); 210 goto out; 211 } 212 } 213 break; 214 default: 215 dbenv->err(dbenv, r, "DB_ENV->rep_process_message"); 216 break; 217 } 218 } 219 220out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0) 221 ret = t_ret; 222 223 /* Don't close the environment before any children exit. */ 224 if (ea != NULL && thread_join(elect_thr, &status) != 0) 225 dbenv->errx(dbenv, "can't join election thread"); 226 227 if (site_thrs != NULL) 228 while (--nsites >= 0) 229 if (thread_join(site_thrs[nsites], &status) != 0) 230 dbenv->errx(dbenv, "can't join site thread"); 231 232 return ((void *)(uintptr_t)ret); 233} 234 235/* 236 * This is a generic thread that spawns a thread to listen for connections 237 * on a socket and then spawns off child threads to handle each new 238 * connection. 239 */ 240void * 241connect_thread(args) 242 void *args; 243{ 244 DB_ENV *dbenv; 245 const char *home, *progname; 246 hm_loop_args *ha; 247 connect_args *cargs; 248 machtab_t *machtab; 249 thread_t hm_thrs[MAX_THREADS]; 250 void *status; 251 int i, eid, port, ret; 252 socket_t fd, ns; 253 254 ha = NULL; 255 cargs = (connect_args *)args; 256 dbenv = cargs->dbenv; 257 home = cargs->home; 258 progname = cargs->progname; 259 machtab = cargs->machtab; 260 port = cargs->port; 261 262 /* 263 * Loop forever, accepting connections from new machines, 264 * and forking off a thread to handle each. 265 */ 266 if ((fd = listen_socket_init(progname, port)) < 0) { 267 ret = errno; 268 goto err; 269 } 270 271 for (i = 0; i < MAX_THREADS; i++) { 272 if ((ns = listen_socket_accept(machtab, 273 progname, fd, &eid)) == SOCKET_CREATION_FAILURE) { 274 ret = errno; 275 goto err; 276 } 277 if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { 278 dbenv->errx(dbenv, "can't allocate memory"); 279 ret = errno; 280 goto err; 281 } 282 ha->progname = progname; 283 ha->home = home; 284 ha->fd = ns; 285 ha->eid = eid; 286 ha->tab = machtab; 287 ha->dbenv = dbenv; 288 if ((ret = thread_create(&hm_thrs[i++], NULL, 289 hm_loop, (void *)ha)) != 0) { 290 dbenv->errx(dbenv, "can't create thread for site"); 291 goto err; 292 } 293 ha = NULL; 294 } 295 296 /* If we fell out, we ended up with too many threads. */ 297 dbenv->errx(dbenv, "Too many threads"); 298 ret = ENOMEM; 299 300 /* Do not return until all threads have exited. */ 301 while (--i >= 0) 302 if (thread_join(hm_thrs[i], &status) != 0) 303 dbenv->errx(dbenv, "can't join site thread"); 304 305err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE); 306} 307 308/* 309 * Open a connection to everyone that we've been told about. If we 310 * cannot open some connections, keep trying. 311 */ 312void * 313connect_all(args) 314 void *args; 315{ 316 DB_ENV *dbenv; 317 all_args *aa; 318 const char *home, *progname; 319 hm_loop_args *ha; 320 int failed, i, nsites, open, ret, *success; 321 machtab_t *machtab; 322 thread_t *hm_thr; 323 repsite_t *sites; 324 325 ha = NULL; 326 aa = (all_args *)args; 327 dbenv = aa->dbenv; 328 progname = aa->progname; 329 home = aa->home; 330 machtab = aa->machtab; 331 nsites = aa->nsites; 332 sites = aa->sites; 333 334 ret = 0; 335 hm_thr = NULL; 336 success = NULL; 337 338 /* Some implementations of calloc are sad about allocating 0 things. */ 339 if ((success = calloc(nsites > 0 ? nsites : 1, sizeof(int))) == NULL) { 340 dbenv->err(dbenv, errno, "connect_all"); 341 ret = 1; 342 goto err; 343 } 344 345 if (nsites > 0 && (hm_thr = calloc(nsites, sizeof(int))) == NULL) { 346 dbenv->err(dbenv, errno, "connect_all"); 347 ret = 1; 348 goto err; 349 } 350 351 for (failed = nsites; failed > 0;) { 352 for (i = 0; i < nsites; i++) { 353 if (success[i]) 354 continue; 355 356 ret = connect_site(dbenv, machtab, 357 progname, &sites[i], &open, &hm_thr[i]); 358 359 /* 360 * If we couldn't make the connection, this isn't 361 * fatal to the loop, but we have nothing further 362 * to do on this machine at the moment. 363 */ 364 if (ret == DB_REP_UNAVAIL) 365 continue; 366 367 if (ret != 0) 368 goto err; 369 370 failed--; 371 success[i] = 1; 372 373 /* If the connection is already open, we're done. */ 374 if (ret == 0 && open == 1) 375 continue; 376 377 } 378 sleep(1); 379 } 380 381err: if (success != NULL) 382 free(success); 383 if (hm_thr != NULL) 384 free(hm_thr); 385 return (ret ? (void *)EXIT_FAILURE : (void *)EXIT_SUCCESS); 386} 387 388static int 389connect_site(dbenv, machtab, progname, site, is_open, hm_thrp) 390 DB_ENV *dbenv; 391 machtab_t *machtab; 392 const char *progname; 393 repsite_t *site; 394 int *is_open; 395 thread_t *hm_thrp; 396{ 397 int eid, ret; 398 socket_t s; 399 hm_loop_args *ha; 400 401 if ((s = get_connected_socket(machtab, progname, 402 site->host, site->port, is_open, &eid)) < 0) 403 return (DB_REP_UNAVAIL); 404 405 if (*is_open) 406 return (0); 407 408 if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { 409 dbenv->errx(dbenv, "can't allocate memory"); 410 ret = errno; 411 goto err; 412 } 413 414 ha->progname = progname; 415 ha->fd = s; 416 ha->eid = eid; 417 ha->tab = machtab; 418 ha->dbenv = dbenv; 419 420 if ((ret = thread_create(hm_thrp, NULL, 421 hm_loop, (void *)ha)) != 0) { 422 dbenv->errx(dbenv, "can't create thread for connected site"); 423 goto err1; 424 } 425 426 return (0); 427 428err1: free(ha); 429err: 430 return (ret); 431} 432 433/* 434 * We need to spawn off a new thread in which to hold an election in 435 * case we are the only thread listening on for messages. 436 */ 437static void * 438elect_thread(args) 439 void *args; 440{ 441 DB_ENV *dbenv; 442 elect_args *eargs; 443 machtab_t *machtab; 444 u_int32_t timeout; 445 int n, ret; 446 APP_DATA *app; 447 448 eargs = (elect_args *)args; 449 dbenv = eargs->dbenv; 450 machtab = eargs->machtab; 451 free(eargs); 452 app = dbenv->app_private; 453 454 machtab_parm(machtab, &n, &timeout); 455 (void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout); 456 while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) 457 sleep(2); 458 459 if (app->elected) { 460 app->elected = 0; 461 if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) 462 dbenv->err(dbenv, ret, 463 "can't start as master in election thread"); 464 } 465 466 return (NULL); 467} 468