1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14static int kick_blockers __P((ENV *, REPMGR_CONNECTION *, void *)); 15static int mismatch_err __P((const ENV *)); 16static int __repmgr_await_threads __P((ENV *)); 17 18/* 19 * PUBLIC: int __repmgr_start __P((DB_ENV *, int, u_int32_t)); 20 */ 21int 22__repmgr_start(dbenv, nthreads, flags) 23 DB_ENV *dbenv; 24 int nthreads; 25 u_int32_t flags; 26{ 27 DBT my_addr; 28 DB_REP *db_rep; 29 REP *rep; 30 DB_THREAD_INFO *ip; 31 ENV *env; 32 REPMGR_RUNNABLE *messenger; 33 int i, is_listener, locked, need_masterseek, ret; 34 35 env = dbenv->env; 36 db_rep = env->rep_handle; 37 38 switch (flags) { 39 case DB_REP_CLIENT: 40 case DB_REP_ELECTION: 41 case DB_REP_MASTER: 42 break; 43 default: 44 __db_errx(env, 45 "repmgr_start: unrecognized flags parameter value"); 46 return (EINVAL); 47 } 48 49 ENV_REQUIRES_CONFIG_XX( 50 env, rep_handle, "DB_ENV->repmgr_start", DB_INIT_REP); 51 if (!F_ISSET(env, ENV_THREAD)) { 52 __db_errx(env, 53 "Replication Manager needs an environment with DB_THREAD"); 54 return (EINVAL); 55 } 56 57 if (APP_IS_BASEAPI(env)) { 58 __db_errx(env, 59"DB_ENV->repmgr_start: cannot call from base replication application"); 60 return (EINVAL); 61 } 62 63 /* Check that the required initialization has been done. */ 64 if (db_rep->my_addr.host == NULL) { 65 __db_errx(env, 66 "repmgr_set_local_site must be called before repmgr_start"); 67 return (EINVAL); 68 } 69 70 if (db_rep->selector != NULL || db_rep->finished) { 71 __db_errx(env, 72 "DB_ENV->repmgr_start may not be called more than once"); 73 return (EINVAL); 74 } 75 76 /* 77 * See if anyone else is already fulfilling the listener role. If not, 78 * we'll do so. 79 */ 80 rep = db_rep->region; 81 ENV_ENTER(env, ip); 82 MUTEX_LOCK(env, rep->mtx_repmgr); 83 if (rep->listener == 0) { 84 is_listener = TRUE; 85 __os_id(dbenv, &rep->listener, NULL); 86 } else { 87 is_listener = FALSE; 88 nthreads = 0; 89 } 90 MUTEX_UNLOCK(env, rep->mtx_repmgr); 91 ENV_LEAVE(env, ip); 92 93 /* 94 * The minimum legal number of threads is either 1 or 0, depending upon 95 * whether we're the main process or a subordinate. 96 */ 97 locked = FALSE; 98 if (nthreads < (is_listener ? 1 : 0)) { 99 __db_errx(env, 100 "repmgr_start: nthreads parameter must be >= 1"); 101 ret = EINVAL; 102 goto err; 103 } 104 105 if ((ret = __repmgr_init(env)) != 0) 106 goto err; 107 if (is_listener && (ret = __repmgr_listen(env)) != 0) 108 goto err; 109 110 /* 111 * Make some sort of call to rep_start before starting other threads, to 112 * ensure that incoming messages being processed always have a rep 113 * context properly configured. Note that in a way this is wasted, in 114 * the sense that any messages that rep_start sends won't really go 115 * anywhere, because we haven't started the select() thread yet, so we 116 * don't yet really have any connections to any remote sites. But 117 * trying to do it the other way ends up requiring complicated code; 118 * this way we know easily that by the time we receive a message, we've 119 * already called rep_start, so it'll be legal to call 120 * rep_process_message. 121 * Note that even if we're starting without recovery, we need a 122 * rep_start call in case we're using leases. Leases keep track of 123 * rep_start calls even within an env region lifetime. 124 */ 125 if ((ret = __rep_set_transport_int(env, SELF_EID, __repmgr_send)) != 0) 126 goto err; 127 need_masterseek = FALSE; 128 if (!is_listener) { 129 /* Another process currently already listening in this env. */ 130 db_rep->master_eid = rep->master_id; 131 } else if ((db_rep->init_policy = flags) == DB_REP_MASTER) 132 ret = __repmgr_become_master(env); 133 else { 134 if ((ret = __repmgr_prepare_my_addr(env, &my_addr)) != 0) 135 goto err; 136 ret = __rep_start_int(env, &my_addr, DB_REP_CLIENT); 137 __os_free(env, my_addr.data); 138 139 if (rep->master_id == DB_EID_INVALID || 140 rep->master_id == SELF_EID) { 141 need_masterseek = TRUE; 142 } else { 143 /* 144 * Restarted without recovery. Use existing known 145 * master. 146 */ 147 db_rep->master_eid = rep->master_id; 148 } 149 } 150 if (ret != 0) 151 goto err; 152 if ((ret = __repmgr_start_selector(env)) != 0) 153 goto err; 154 155 if (is_listener) { 156 /* 157 * Since these allocated memory blocks are used by other 158 * threads, we have to be a bit careful about freeing them in 159 * case of any errors. __repmgr_await_threads (which we call in 160 * the err: coda below) takes care of that. 161 */ 162 if ((ret = __os_calloc(env, (u_int)nthreads, 163 sizeof(REPMGR_RUNNABLE *), &db_rep->messengers)) != 0) 164 goto err; 165 db_rep->nthreads = nthreads; 166 167 for (i = 0; i < nthreads; i++) { 168 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), 169 &messenger)) != 0) 170 goto err; 171 172 messenger->env = env; 173 messenger->run = __repmgr_msg_thread; 174 if ((ret = __repmgr_thread_start(env, 175 messenger)) != 0) { 176 __os_free(env, messenger); 177 goto err; 178 } 179 db_rep->messengers[i] = messenger; 180 } 181 } 182 183 if (need_masterseek) { 184 LOCK_MUTEX(db_rep->mutex); 185 locked = TRUE; 186 if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0) 187 goto err; 188 UNLOCK_MUTEX(db_rep->mutex); 189 locked = FALSE; 190 } 191 192 return (is_listener ? 0 : DB_REP_IGNORE); 193 194err: 195 /* If we couldn't succeed at everything, undo the parts we did do. */ 196 if (locked) 197 UNLOCK_MUTEX(db_rep->mutex); 198 if (db_rep->selector != NULL) { 199 (void)__repmgr_stop_threads(env); 200 (void)__repmgr_await_threads(env); 201 } 202 LOCK_MUTEX(db_rep->mutex); 203 (void)__repmgr_net_close(env); 204 if (REPMGR_INITED(db_rep)) 205 (void)__repmgr_deinit(env); 206 UNLOCK_MUTEX(db_rep->mutex); 207 return (ret); 208} 209 210/* 211 * PUBLIC: int __repmgr_autostart __P((ENV *)); 212 * 213 * Preconditions: rep_start() has been called; we're within an ENV_ENTER. 214 * Because of this, we mustn't call __rep_set_transport(), but rather we 215 * poke in send() function address manually. 216 */ 217int 218__repmgr_autostart(env) 219 ENV *env; 220{ 221 DB_REP *db_rep; 222 int ret; 223 224 db_rep = env->rep_handle; 225 226 DB_ASSERT(env, REP_ON(env)); 227 LOCK_MUTEX(db_rep->mutex); 228 229 if (REPMGR_INITED(db_rep)) 230 ret = 0; 231 else 232 ret = __repmgr_init(env); 233 if (ret != 0) 234 goto out; 235 236 RPRINT(env, DB_VERB_REPMGR_MISC, 237 (env, "Automatically joining existing repmgr env")); 238 239 db_rep->send = __repmgr_send; 240 241 if (db_rep->selector == NULL && !db_rep->finished) 242 ret = __repmgr_start_selector(env); 243 244out: 245 UNLOCK_MUTEX(db_rep->mutex); 246 return (ret); 247} 248 249/* 250 * PUBLIC: int __repmgr_start_selector __P((ENV *)); 251 */ 252int 253__repmgr_start_selector(env) 254 ENV *env; 255{ 256 DB_REP *db_rep; 257 REPMGR_RUNNABLE *selector; 258 int ret; 259 260 db_rep = env->rep_handle; 261 if ((ret = __os_calloc(env, 1, sizeof(REPMGR_RUNNABLE), &selector)) 262 != 0) 263 return (ret); 264 selector->env = env; 265 selector->run = __repmgr_select_thread; 266 267 /* 268 * In case the select thread ever examines db_rep->selector, set it 269 * before starting the thread (since once we create it we could be 270 * racing with it). 271 */ 272 db_rep->selector = selector; 273 if ((ret = __repmgr_thread_start(env, selector)) != 0) { 274 __db_err(env, ret, "can't start selector thread"); 275 __os_free(env, selector); 276 db_rep->selector = NULL; 277 return (ret); 278 } 279 280 return (0); 281} 282 283/* 284 * PUBLIC: int __repmgr_close __P((ENV *)); 285 */ 286int 287__repmgr_close(env) 288 ENV *env; 289{ 290 DB_REP *db_rep; 291 int ret, t_ret; 292 293 ret = 0; 294 db_rep = env->rep_handle; 295 if (db_rep->selector != NULL) { 296 RPRINT(env, DB_VERB_REPMGR_MISC, 297 (env, "Stopping repmgr threads")); 298 ret = __repmgr_stop_threads(env); 299 if ((t_ret = __repmgr_await_threads(env)) != 0 && ret == 0) 300 ret = t_ret; 301 RPRINT(env, DB_VERB_REPMGR_MISC, 302 (env, "Repmgr threads are finished")); 303 } 304 305 if ((t_ret = __repmgr_net_close(env)) != 0 && ret == 0) 306 ret = t_ret; 307 308 if ((t_ret = __repmgr_deinit(env)) != 0 && ret == 0) 309 ret = t_ret; 310 311 return (ret); 312} 313 314/* 315 * PUBLIC: int __repmgr_set_ack_policy __P((DB_ENV *, int)); 316 */ 317int 318__repmgr_set_ack_policy(dbenv, policy) 319 DB_ENV *dbenv; 320 int policy; 321{ 322 DB_REP *db_rep; 323 ENV *env; 324 325 env = dbenv->env; 326 db_rep = env->rep_handle; 327 328 ENV_NOT_CONFIGURED( 329 env, db_rep->region, "DB_ENV->repmgr_set_ack_policy", DB_INIT_REP); 330 331 if (APP_IS_BASEAPI(env)) { 332 __db_errx(env, "%s %s", "DB_ENV->repmgr_set_ack_policy:", 333 "cannot call from base replication application"); 334 return (EINVAL); 335 } 336 337 switch (policy) { 338 case DB_REPMGR_ACKS_ALL: /* FALLTHROUGH */ 339 case DB_REPMGR_ACKS_ALL_PEERS: /* FALLTHROUGH */ 340 case DB_REPMGR_ACKS_NONE: /* FALLTHROUGH */ 341 case DB_REPMGR_ACKS_ONE: /* FALLTHROUGH */ 342 case DB_REPMGR_ACKS_ONE_PEER: /* FALLTHROUGH */ 343 case DB_REPMGR_ACKS_QUORUM: 344 env->rep_handle->perm_policy = policy; 345 /* 346 * Setting an ack policy makes this a replication manager 347 * application. 348 */ 349 APP_SET_REPMGR(env); 350 return (0); 351 default: 352 __db_errx(env, 353 "unknown ack_policy in DB_ENV->repmgr_set_ack_policy"); 354 return (EINVAL); 355 } 356} 357 358/* 359 * PUBLIC: int __repmgr_get_ack_policy __P((DB_ENV *, int *)); 360 */ 361int 362__repmgr_get_ack_policy(dbenv, policy) 363 DB_ENV *dbenv; 364 int *policy; 365{ 366 DB_REP *db_rep; 367 ENV *env; 368 369 env = dbenv->env; 370 db_rep = env->rep_handle; 371 372 ENV_NOT_CONFIGURED( 373 env, db_rep->region, "DB_ENV->repmgr_get_ack_policy", DB_INIT_REP); 374 375 *policy = env->rep_handle->perm_policy; 376 return (0); 377} 378 379/* 380 * PUBLIC: int __repmgr_env_create __P((ENV *, DB_REP *)); 381 */ 382int 383__repmgr_env_create(env, db_rep) 384 ENV *env; 385 DB_REP *db_rep; 386{ 387 COMPQUIET(env, NULL); 388 389 /* Set some default values. */ 390 db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; 391 db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; 392 db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; 393 db_rep->config_nsites = 0; 394 db_rep->peer = DB_EID_INVALID; 395 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; 396 397 db_rep->listen_fd = INVALID_SOCKET; 398 db_rep->master_eid = DB_EID_INVALID; 399 TAILQ_INIT(&db_rep->connections); 400 TAILQ_INIT(&db_rep->retries); 401 402 db_rep->input_queue.size = 0; 403 STAILQ_INIT(&db_rep->input_queue.header); 404 405 __repmgr_env_create_pf(db_rep); 406 407 return (0); 408} 409 410/* 411 * PUBLIC: void __repmgr_env_destroy __P((ENV *, DB_REP *)); 412 */ 413void 414__repmgr_env_destroy(env, db_rep) 415 ENV *env; 416 DB_REP *db_rep; 417{ 418 __repmgr_queue_destroy(env); 419 __repmgr_net_destroy(env, db_rep); 420 if (db_rep->messengers != NULL) { 421 __os_free(env, db_rep->messengers); 422 db_rep->messengers = NULL; 423 } 424} 425 426/* 427 * PUBLIC: int __repmgr_stop_threads __P((ENV *)); 428 */ 429int 430__repmgr_stop_threads(env) 431 ENV *env; 432{ 433 DB_REP *db_rep; 434 int ret; 435 436 db_rep = env->rep_handle; 437 438 /* 439 * Hold mutex for the purpose of waking up threads, but then get out of 440 * the way to let them clean up and exit. 441 */ 442 LOCK_MUTEX(db_rep->mutex); 443 db_rep->finished = TRUE; 444 if (db_rep->elect_thread != NULL && 445 (ret = __repmgr_signal(&db_rep->check_election)) != 0) 446 goto unlock; 447 448 if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) 449 goto unlock; 450 451 if ((ret = __repmgr_each_connection(env, 452 kick_blockers, NULL, TRUE)) != 0) 453 goto unlock; 454 UNLOCK_MUTEX(db_rep->mutex); 455 456 return (__repmgr_wake_main_thread(env)); 457 458unlock: 459 UNLOCK_MUTEX(db_rep->mutex); 460 return (ret); 461} 462 463static int 464kick_blockers(env, conn, unused) 465 ENV *env; 466 REPMGR_CONNECTION *conn; 467 void *unused; 468{ 469 COMPQUIET(env, NULL); 470 COMPQUIET(unused, NULL); 471 472 return (conn->blockers > 0 ? __repmgr_signal(&conn->drained) : 0); 473} 474 475static int 476__repmgr_await_threads(env) 477 ENV *env; 478{ 479 DB_REP *db_rep; 480 REPMGR_RUNNABLE *messenger; 481 int ret, t_ret, i; 482 483 db_rep = env->rep_handle; 484 ret = 0; 485 if (db_rep->elect_thread != NULL) { 486 ret = __repmgr_thread_join(db_rep->elect_thread); 487 __os_free(env, db_rep->elect_thread); 488 db_rep->elect_thread = NULL; 489 } 490 491 for (i = 0; 492 i < db_rep->nthreads && db_rep->messengers[i] != NULL; i++) { 493 messenger = db_rep->messengers[i]; 494 if ((t_ret = __repmgr_thread_join(messenger)) != 0 && ret == 0) 495 ret = t_ret; 496 __os_free(env, messenger); 497 } 498 __os_free(env, db_rep->messengers); 499 db_rep->messengers = NULL; 500 501 if (db_rep->selector != NULL) { 502 if ((t_ret = __repmgr_thread_join(db_rep->selector)) != 0 && 503 ret == 0) 504 ret = t_ret; 505 __os_free(env, db_rep->selector); 506 db_rep->selector = NULL; 507 } 508 509 return (ret); 510} 511 512/* 513 * PUBLIC: int __repmgr_set_local_site __P((DB_ENV *, const char *, u_int, 514 * PUBLIC: u_int32_t)); 515 */ 516int 517__repmgr_set_local_site(dbenv, host, port, flags) 518 DB_ENV *dbenv; 519 const char *host; 520 u_int port; 521 u_int32_t flags; 522{ 523 DB_REP *db_rep; 524 DB_THREAD_INFO *ip; 525 ENV *env; 526 REGENV *renv; 527 REGINFO *infop; 528 REP *rep; 529 repmgr_netaddr_t addr; 530 char *myhost; 531 int locked, ret; 532 533 env = dbenv->env; 534 db_rep = env->rep_handle; 535 536 ENV_NOT_CONFIGURED( 537 env, db_rep->region, "DB_ENV->repmgr_set_local_site", DB_INIT_REP); 538 539 if (APP_IS_BASEAPI(env)) { 540 __db_errx(env, "%s %s", "DB_ENV->repmgr_set_local_site:", 541 "cannot call from base replication application"); 542 return (EINVAL); 543 } 544 545 if (db_rep->selector != NULL) { 546 __db_errx(env, 547"DB_ENV->repmgr_set_local_site: must be called before DB_ENV->repmgr_start"); 548 return (EINVAL); 549 } 550 551 if (flags != 0) 552 return (__db_ferr(env, "DB_ENV->repmgr_set_local_site", 0)); 553 554 if (host == NULL || port == 0) { 555 __db_errx(env, 556 "repmgr_set_local_site: host name and port (>0) required"); 557 return (EINVAL); 558 } 559 560 /* 561 * If the local site address hasn't already been set, just set it from 562 * the given inputs. If it has, all we do is verify that it matches 563 * what had already been set previously. 564 * 565 * Do this in the shared region if we have one, or else just in the 566 * local handle. 567 * 568 * In either case, don't perturb global structures until we're sure 569 * everything will succeed. 570 */ 571 COMPQUIET(rep, NULL); 572 COMPQUIET(ip, NULL); 573 COMPQUIET(renv, NULL); 574 locked = FALSE; 575 ret = 0; 576 if (REP_ON(env)) { 577 rep = db_rep->region; 578 ENV_ENTER(env, ip); 579 MUTEX_LOCK(env, rep->mtx_repmgr); 580 581 infop = env->reginfo; 582 renv = infop->primary; 583 MUTEX_LOCK(env, renv->mtx_regenv); 584 locked = TRUE; 585 if (rep->my_addr.host == INVALID_ROFF) { 586 if ((ret = __repmgr_pack_netaddr(env, 587 host, port, NULL, &addr)) != 0) 588 goto unlock; 589 590 if ((ret = __env_alloc(infop, 591 strlen(host)+1, &myhost)) == 0) { 592 (void)strcpy(myhost, host); 593 rep->my_addr.host = R_OFFSET(infop, myhost); 594 rep->my_addr.port = port; 595 } else { 596 __repmgr_cleanup_netaddr(env, &addr); 597 goto unlock; 598 } 599 memcpy(&db_rep->my_addr, &addr, sizeof(addr)); 600 rep->siteaddr_seq++; 601 } else { 602 myhost = R_ADDR(infop, rep->my_addr.host); 603 if (strcmp(myhost, host) != 0 || 604 port != rep->my_addr.port) { 605 ret = mismatch_err(env); 606 goto unlock; 607 } 608 } 609 } else { 610 if (db_rep->my_addr.host == NULL) { 611 if ((ret = __repmgr_pack_netaddr(env, 612 host, port, NULL, &db_rep->my_addr)) != 0) 613 goto unlock; 614 } else if (strcmp(host, db_rep->my_addr.host) != 0 || 615 port != db_rep->my_addr.port) { 616 ret = mismatch_err(env); 617 goto unlock; 618 } 619 } 620 621unlock: 622 if (locked) { 623 MUTEX_UNLOCK(env, renv->mtx_regenv); 624 MUTEX_UNLOCK(env, rep->mtx_repmgr); 625 ENV_LEAVE(env, ip); 626 } 627 /* 628 * Setting a local site makes this a replication manager application. 629 */ 630 if (ret == 0) 631 APP_SET_REPMGR(env); 632 return (ret); 633} 634 635static int 636mismatch_err(env) 637 const ENV *env; 638{ 639 __db_errx(env, "A (different) local site address has already been set"); 640 return (EINVAL); 641} 642 643/* 644 * If the application only calls this method from a single thread (e.g., during 645 * its initialization), it will avoid the problems with the non-thread-safe host 646 * name lookup. In any case, if we relegate the blocking lookup to here it 647 * won't affect our select() loop. 648 * 649 * PUBLIC: int __repmgr_add_remote_site __P((DB_ENV *, const char *, u_int, 650 * PUBLIC: int *, u_int32_t)); 651 */ 652int 653__repmgr_add_remote_site(dbenv, host, port, eidp, flags) 654 DB_ENV *dbenv; 655 const char *host; 656 u_int port; 657 int *eidp; 658 u_int32_t flags; 659{ 660 DB_REP *db_rep; 661 ENV *env; 662 REPMGR_SITE *site; 663 int eid, locked, ret; 664 665 env = dbenv->env; 666 db_rep = env->rep_handle; 667 locked = FALSE; 668 ret = 0; 669 670 ENV_NOT_CONFIGURED( 671 env, db_rep->region, "DB_ENV->repmgr_add_remote_site", DB_INIT_REP); 672 673 if (APP_IS_BASEAPI(env)) { 674 __db_errx(env, "%s %s", "DB_ENV->repmgr_add_remote_site:", 675 "cannot call from base replication application"); 676 return (EINVAL); 677 } 678 679 if ((ret = __db_fchk(env, 680 "DB_ENV->repmgr_add_remote_site", flags, DB_REPMGR_PEER)) != 0) 681 return (ret); 682 683 if (host == NULL) { 684 __db_errx(env, 685 "repmgr_add_remote_site: host name is required"); 686 return (EINVAL); 687 } 688 689 if (REP_ON(env)) { 690 LOCK_MUTEX(db_rep->mutex); 691 locked = TRUE; 692 693 ret = __repmgr_add_site(env, host, port, &site, flags); 694 if (ret == EEXIST) { 695 /* 696 * With NEWSITE messages arriving at any time, it would 697 * be impractical for applications to avoid this. Also 698 * this provides a way they can still set peer. 699 */ 700 ret = 0; 701 } 702 if (ret != 0) 703 goto out; 704 eid = EID_FROM_SITE(site); 705 if (eidp != NULL) 706 *eidp = eid; 707 } else { 708 if ((site = __repmgr_find_site(env, host, port)) == NULL && 709 (ret = __repmgr_new_site(env, 710 &site, host, port, SITE_IDLE)) != 0) 711 goto out; 712 eid = EID_FROM_SITE(site); 713 714 /* 715 * Set provisional EID of peer; may be adjusted at env open/join 716 * time. 717 */ 718 if (LF_ISSET(DB_REPMGR_PEER)) 719 db_rep->peer = eid; 720 } 721 722out: 723 if (locked) 724 UNLOCK_MUTEX(db_rep->mutex); 725 /* 726 * Adding a remote site makes this a replication manager application. 727 */ 728 if (ret == 0) 729 APP_SET_REPMGR(env); 730 return (ret); 731} 732