1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_posix.c,v 1.37 2008/03/13 17:31:28 mbrey Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#define __INCLUDE_SELECT_H 1 13#include "db_int.h" 14 15/* 16 * A very rough guess at the maximum stack space one of our threads could ever 17 * need, which we hope is plenty conservative. This can be patched in the field 18 * if necessary. 19 */ 20#ifdef _POSIX_THREAD_ATTR_STACKSIZE 21size_t __repmgr_guesstimated_max = (128 * 1024); 22#endif 23 24static int __repmgr_conn_work __P((ENV *, 25 REPMGR_CONNECTION *, fd_set *, fd_set *, int)); 26static int finish_connecting __P((ENV *, REPMGR_CONNECTION *)); 27 28/* 29 * Starts the thread described in the argument, and stores the resulting thread 30 * ID therein. 31 * 32 * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *)); 33 */ 34int 35__repmgr_thread_start(env, runnable) 36 ENV *env; 37 REPMGR_RUNNABLE *runnable; 38{ 39 pthread_attr_t *attrp; 40#ifdef _POSIX_THREAD_ATTR_STACKSIZE 41 pthread_attr_t attributes; 42 size_t size; 43 int ret; 44#endif 45 46 runnable->finished = FALSE; 47 48#ifdef _POSIX_THREAD_ATTR_STACKSIZE 49 attrp = &attributes; 50 if ((ret = pthread_attr_init(&attributes)) != 0) { 51 __db_err(env, 52 ret, "pthread_attr_init in repmgr_thread_start"); 53 return (ret); 54 } 55 56 /* 57 * On a 64-bit machine it seems reasonable that we could need twice as 58 * much stack space as we did on a 32-bit machine. 59 */ 60 size = __repmgr_guesstimated_max; 61 if (sizeof(size_t) > 4) 62 size *= 2; 63#ifdef PTHREAD_STACK_MIN 64 if (size < PTHREAD_STACK_MIN) 65 size = PTHREAD_STACK_MIN; 66#endif 67 if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) { 68 __db_err(env, 69 ret, "pthread_attr_setstacksize in repmgr_thread_start"); 70 return (ret); 71 } 72#else 73 attrp = NULL; 74#endif 75 76 return (pthread_create(&runnable->thread_id, attrp, 77 runnable->run, env)); 78} 79 80/* 81 * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *)); 82 */ 83int 84__repmgr_thread_join(thread) 85 REPMGR_RUNNABLE *thread; 86{ 87 return (pthread_join(thread->thread_id, NULL)); 88} 89 90/* 91 * PUBLIC: int __repmgr_set_nonblocking __P((socket_t)); 92 */ 93int 94__repmgr_set_nonblocking(fd) 95 socket_t fd; 96{ 97 int flags; 98 99 if ((flags = fcntl(fd, F_GETFL, 0)) < 0) 100 return (errno); 101 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) 102 return (errno); 103 return (0); 104} 105 106/* 107 * PUBLIC: int __repmgr_wake_waiting_senders __P((ENV *)); 108 * 109 * Wake any send()-ing threads waiting for an acknowledgement. 110 * 111 * !!! 112 * Caller must hold the db_rep->mutex, if this thread synchronization is to work 113 * properly. 114 */ 115int 116__repmgr_wake_waiting_senders(env) 117 ENV *env; 118{ 119 return (pthread_cond_broadcast(&env->rep_handle->ack_condition)); 120} 121 122/* 123 * PUBLIC: int __repmgr_await_ack __P((ENV *, const DB_LSN *)); 124 * 125 * Waits (a limited time) for configured number of remote sites to ack the given 126 * LSN. 127 * 128 * !!! 129 * Caller must hold repmgr->mutex. 130 */ 131int 132__repmgr_await_ack(env, lsnp) 133 ENV *env; 134 const DB_LSN *lsnp; 135{ 136 DB_REP *db_rep; 137 struct timespec deadline; 138 int ret, timed; 139 140 db_rep = env->rep_handle; 141 142 if ((timed = (db_rep->ack_timeout > 0))) 143 __repmgr_compute_wait_deadline(env, &deadline, 144 db_rep->ack_timeout); 145 else 146 COMPQUIET(deadline.tv_sec, 0); 147 148 while (!__repmgr_is_permanent(env, lsnp)) { 149 if (timed) 150 ret = pthread_cond_timedwait(&db_rep->ack_condition, 151 &db_rep->mutex, &deadline); 152 else 153 ret = pthread_cond_wait(&db_rep->ack_condition, 154 &db_rep->mutex); 155 if (db_rep->finished) 156 return (DB_REP_UNAVAIL); 157 if (ret != 0) 158 return (ret); 159 } 160 return (0); 161} 162 163/* 164 * __repmgr_compute_wait_deadline -- 165 * Computes a deadline time a certain distance into the future. 166 * 167 * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*, 168 * PUBLIC: struct timespec *, db_timeout_t)); 169 */ 170void 171__repmgr_compute_wait_deadline(env, result, wait) 172 ENV *env; 173 struct timespec *result; 174 db_timeout_t wait; 175{ 176 /* 177 * The result is suitable for the pthread_cond_timewait call. (That 178 * call uses nano-second resolution; elsewhere we use microseconds.) 179 * 180 * Start with "now"; then add the "wait" offset. 181 * 182 * A db_timespec is the same as a "struct timespec" so we can pass 183 * result directly to the underlying Berkeley DB OS routine. 184 * 185 * !!! 186 * We use the system clock for the pthread_cond_timedwait call, but 187 * that's not optimal on systems with monotonic timers. Instead, 188 * we should call pthread_condattr_setclock on systems where it and 189 * monotonic timers are available, and then configure both this call 190 * and the subsequent pthread_cond_timewait call to use a monotonic 191 * timer. 192 */ 193 __os_gettime(env, (db_timespec *)result, 0); 194 TIMESPEC_ADD_DB_TIMEOUT(result, wait); 195} 196 197/* 198 * PUBLIC: int __repmgr_await_drain __P((ENV *, 199 * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); 200 * 201 * Waits for space to become available on the connection's output queue. 202 * Various ways we can exit: 203 * 204 * 1. queue becomes non-full 205 * 2. exceed time limit 206 * 3. connection becomes defunct (due to error in another thread) 207 * 4. repmgr is shutting down 208 * 5. any unexpected system resource failure 209 * 210 * In cases #3 and #5 we return an error code. Caller is responsible for 211 * distinguishing the remaining cases if desired. 212 * 213 * !!! 214 * Caller must hold repmgr->mutex. 215 */ 216int 217__repmgr_await_drain(env, conn, timeout) 218 ENV *env; 219 REPMGR_CONNECTION *conn; 220 db_timeout_t timeout; 221{ 222 DB_REP *db_rep; 223 struct timespec deadline; 224 int ret; 225 226 db_rep = env->rep_handle; 227 228 __repmgr_compute_wait_deadline(env, &deadline, timeout); 229 230 ret = 0; 231 while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { 232 ret = pthread_cond_timedwait(&conn->drained, 233 &db_rep->mutex, &deadline); 234 switch (ret) { 235 case 0: 236 if (db_rep->finished) 237 goto out; /* #4. */ 238 /* 239 * Another thread could have stumbled into an error on 240 * the socket while we were waiting. 241 */ 242 if (conn->state == CONN_DEFUNCT) { 243 ret = DB_REP_UNAVAIL; /* #3. */ 244 goto out; 245 } 246 break; 247 case ETIMEDOUT: 248 conn->state = CONN_CONGESTED; 249 ret = 0; 250 goto out; /* #2. */ 251 default: 252 goto out; /* #5. */ 253 } 254 } 255 /* #1. */ 256 257out: 258 return (ret); 259} 260 261/* 262 * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); 263 * 264 * Initialize a condition variable (in allocated space). 265 */ 266int 267__repmgr_alloc_cond(c) 268 cond_var_t *c; 269{ 270 return (pthread_cond_init(c, NULL)); 271} 272 273/* 274 * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); 275 * 276 * Clean up a previously initialized condition variable. 277 */ 278int 279__repmgr_free_cond(c) 280 cond_var_t *c; 281{ 282 return (pthread_cond_destroy(c)); 283} 284 285/* 286 * PUBLIC: int __repmgr_init_sync __P((ENV *, DB_REP *)); 287 * 288 * Allocate/initialize all data necessary for thread synchronization. This 289 * should be an all-or-nothing affair. Other than here and in _close_sync there 290 * should never be a time when these resources aren't either all allocated or 291 * all freed. If that's true, then we can safely use the values of the file 292 * descriptor(s) to keep track of which it is. 293 */ 294int 295__repmgr_init_sync(env, db_rep) 296 ENV *env; 297 DB_REP *db_rep; 298{ 299 int ret, mutex_inited, ack_inited, elect_inited, queue_inited, 300 file_desc[2]; 301 302 COMPQUIET(env, NULL); 303 304 mutex_inited = ack_inited = elect_inited = queue_inited = FALSE; 305 306 if ((ret = pthread_mutex_init(&db_rep->mutex, NULL)) != 0) 307 goto err; 308 mutex_inited = TRUE; 309 310 if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0) 311 goto err; 312 ack_inited = TRUE; 313 314 if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0) 315 goto err; 316 elect_inited = TRUE; 317 318 if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0) 319 goto err; 320 queue_inited = TRUE; 321 322 if ((ret = pipe(file_desc)) == -1) { 323 ret = errno; 324 goto err; 325 } 326 327 db_rep->read_pipe = file_desc[0]; 328 db_rep->write_pipe = file_desc[1]; 329 return (0); 330err: 331 if (queue_inited) 332 (void)pthread_cond_destroy(&db_rep->queue_nonempty); 333 if (elect_inited) 334 (void)pthread_cond_destroy(&db_rep->check_election); 335 if (ack_inited) 336 (void)pthread_cond_destroy(&db_rep->ack_condition); 337 if (mutex_inited) 338 (void)pthread_mutex_destroy(&db_rep->mutex); 339 db_rep->read_pipe = db_rep->write_pipe = -1; 340 341 return (ret); 342} 343 344/* 345 * PUBLIC: int __repmgr_close_sync __P((ENV *)); 346 * 347 * Frees the thread synchronization data within a repmgr struct, in a 348 * platform-specific way. 349 */ 350int 351__repmgr_close_sync(env) 352 ENV *env; 353{ 354 DB_REP *db_rep; 355 int ret, t_ret; 356 357 db_rep = env->rep_handle; 358 359 if (!(REPMGR_SYNC_INITED(db_rep))) 360 return (0); 361 362 ret = pthread_cond_destroy(&db_rep->queue_nonempty); 363 364 if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 && 365 ret == 0) 366 ret = t_ret; 367 368 if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 && 369 ret == 0) 370 ret = t_ret; 371 372 if ((t_ret = pthread_mutex_destroy(&db_rep->mutex)) != 0 && 373 ret == 0) 374 ret = t_ret; 375 376 if (close(db_rep->read_pipe) == -1 && ret == 0) 377 ret = errno; 378 if (close(db_rep->write_pipe) == -1 && ret == 0) 379 ret = errno; 380 381 db_rep->read_pipe = db_rep->write_pipe = -1; 382 return (ret); 383} 384 385/* 386 * Performs net-related resource initialization other than memory initialization 387 * and allocation. A valid db_rep->listen_fd acts as the "all-or-nothing" 388 * sentinel signifying that these resources are allocated. 389 * 390 * PUBLIC: int __repmgr_net_init __P((ENV *, DB_REP *)); 391 */ 392int 393__repmgr_net_init(env, db_rep) 394 ENV *env; 395 DB_REP *db_rep; 396{ 397 int ret; 398 struct sigaction sigact; 399 400 if ((ret = __repmgr_listen(env)) != 0) 401 return (ret); 402 403 /* 404 * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed 405 * just for trying to write onto a socket that had been reset. 406 */ 407 if (sigaction(SIGPIPE, NULL, &sigact) == -1) { 408 ret = errno; 409 __db_err(env, ret, "can't access signal handler"); 410 goto err; 411 } 412 /* 413 * If we need to change the sig handler, do so, and also set a flag so 414 * that we remember we did. 415 */ 416 if ((db_rep->chg_sig_handler = (sigact.sa_handler == SIG_DFL))) { 417 sigact.sa_handler = SIG_IGN; 418 sigact.sa_flags = 0; 419 if (sigaction(SIGPIPE, &sigact, NULL) == -1) { 420 ret = errno; 421 __db_err(env, ret, "can't access signal handler"); 422 goto err; 423 } 424 } 425 return (0); 426 427err: 428 (void)closesocket(db_rep->listen_fd); 429 db_rep->listen_fd = INVALID_SOCKET; 430 return (ret); 431} 432 433/* 434 * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *)); 435 */ 436int 437__repmgr_lock_mutex(mutex) 438 mgr_mutex_t *mutex; 439{ 440 return (pthread_mutex_lock(mutex)); 441} 442 443/* 444 * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *)); 445 */ 446int 447__repmgr_unlock_mutex(mutex) 448 mgr_mutex_t *mutex; 449{ 450 return (pthread_mutex_unlock(mutex)); 451} 452 453/* 454 * Signals a condition variable. 455 * 456 * !!! 457 * Caller must hold mutex. 458 * 459 * PUBLIC: int __repmgr_signal __P((cond_var_t *)); 460 */ 461int 462__repmgr_signal(v) 463 cond_var_t *v; 464{ 465 return (pthread_cond_broadcast(v)); 466} 467 468/* 469 * PUBLIC: int __repmgr_wake_main_thread __P((ENV*)); 470 */ 471int 472__repmgr_wake_main_thread(env) 473 ENV *env; 474{ 475 DB_REP *db_rep; 476 u_int8_t any_value; 477 478 COMPQUIET(any_value, 0); 479 db_rep = env->rep_handle; 480 481 /* 482 * It doesn't matter what byte value we write. Just the appearance of a 483 * byte in the stream is enough to wake up the select() thread reading 484 * the pipe. 485 */ 486 if (write(db_rep->write_pipe, &any_value, 1) == -1) 487 return (errno); 488 return (0); 489} 490 491/* 492 * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *)); 493 */ 494int 495__repmgr_writev(fd, iovec, buf_count, byte_count_p) 496 socket_t fd; 497 db_iovec_t *iovec; 498 int buf_count; 499 size_t *byte_count_p; 500{ 501 int nw; 502 503 if ((nw = writev(fd, iovec, buf_count)) == -1) 504 return (errno); 505 *byte_count_p = (size_t)nw; 506 return (0); 507} 508 509/* 510 * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *)); 511 */ 512int 513__repmgr_readv(fd, iovec, buf_count, byte_count_p) 514 socket_t fd; 515 db_iovec_t *iovec; 516 int buf_count; 517 size_t *byte_count_p; 518{ 519 ssize_t nw; 520 521 if ((nw = readv(fd, iovec, buf_count)) == -1) 522 return (errno); 523 *byte_count_p = (size_t)nw; 524 return (0); 525} 526 527/* 528 * PUBLIC: int __repmgr_select_loop __P((ENV *)); 529 */ 530int 531__repmgr_select_loop(env) 532 ENV *env; 533{ 534 struct timeval select_timeout, *select_timeout_p; 535 DB_REP *db_rep; 536 REPMGR_CONNECTION *conn, *next; 537 db_timespec timeout; 538 fd_set reads, writes; 539 int ret, flow_control, maxfd; 540 u_int8_t buf[10]; /* arbitrary size */ 541 542 flow_control = FALSE; 543 544 db_rep = env->rep_handle; 545 /* 546 * Almost this entire thread operates while holding the mutex. But note 547 * that it never blocks, except in the call to select() (which is the 548 * one place we relinquish the mutex). 549 */ 550 LOCK_MUTEX(db_rep->mutex); 551 if ((ret = __repmgr_first_try_connections(env)) != 0) 552 goto out; 553 for (;;) { 554 FD_ZERO(&reads); 555 FD_ZERO(&writes); 556 557 /* 558 * Always ask for input on listening socket and signalling 559 * pipe. 560 */ 561 FD_SET((u_int)db_rep->listen_fd, &reads); 562 maxfd = db_rep->listen_fd; 563 564 FD_SET((u_int)db_rep->read_pipe, &reads); 565 if (db_rep->read_pipe > maxfd) 566 maxfd = db_rep->read_pipe; 567 568 /* 569 * Examine all connections to see what sort of I/O to ask for on 570 * each one. Clean up defunct connections; note that this is 571 * the only place where elements get deleted from this list. 572 * 573 * The TAILQ_FOREACH macro would be suitable here, except that 574 * it doesn't allow unlinking the current element., which is 575 * needed for cleanup_connection. 576 */ 577 for (conn = TAILQ_FIRST(&db_rep->connections); 578 conn != NULL; 579 conn = next) { 580 next = TAILQ_NEXT(conn, entries); 581 582 if (conn->state == CONN_DEFUNCT) { 583 if ((ret = __repmgr_cleanup_connection(env, 584 conn)) != 0) 585 goto out; 586 continue; 587 } 588 589 if (conn->state == CONN_CONNECTING) { 590 FD_SET((u_int)conn->fd, &reads); 591 FD_SET((u_int)conn->fd, &writes); 592 if (conn->fd > maxfd) 593 maxfd = conn->fd; 594 continue; 595 } 596 597 if (!STAILQ_EMPTY(&conn->outbound_queue)) { 598 FD_SET((u_int)conn->fd, &writes); 599 if (conn->fd > maxfd) 600 maxfd = conn->fd; 601 } 602 /* 603 * If we haven't yet gotten site's handshake, then read 604 * from it even if we're flow-controlling. 605 */ 606 if (!flow_control || !IS_VALID_EID(conn->eid)) { 607 FD_SET((u_int)conn->fd, &reads); 608 if (conn->fd > maxfd) 609 maxfd = conn->fd; 610 } 611 } 612 613 if (__repmgr_compute_timeout(env, &timeout)) { 614 /* Convert the timespec to a timeval. */ 615 select_timeout.tv_sec = timeout.tv_sec; 616 select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US; 617 select_timeout_p = &select_timeout; 618 } else { 619 /* No time-based events, so wait only for I/O. */ 620 select_timeout_p = NULL; 621 } 622 623 UNLOCK_MUTEX(db_rep->mutex); 624 625 if ((ret = select(maxfd + 1, 626 &reads, &writes, NULL, select_timeout_p)) == -1) { 627 switch (ret = errno) { 628 case EINTR: 629 case EWOULDBLOCK: 630 LOCK_MUTEX(db_rep->mutex); 631 continue; /* simply retry */ 632 default: 633 __db_err(env, ret, "select"); 634 return (ret); 635 } 636 } 637 LOCK_MUTEX(db_rep->mutex); 638 639 /* 640 * Timer expiration events include retrying of lost connections. 641 * Obviously elements can be added to the connection list there. 642 */ 643 if ((ret = __repmgr_check_timeouts(env)) != 0) 644 goto out; 645 646 /* 647 * Examine each connection, to see what work needs to be done. 648 * Except for one obscure case in finish_connecting, no 649 * structural change to the connections list happens here. 650 */ 651 TAILQ_FOREACH(conn, &db_rep->connections, entries) { 652 if (conn->state == CONN_DEFUNCT) 653 continue; 654 655 if ((ret = __repmgr_conn_work(env, 656 conn, &reads, &writes, flow_control)) != 0) 657 goto out; 658 } 659 660 /* 661 * Read any bytes in the signalling pipe. Note that we don't 662 * actually need to do anything with them; they're just there to 663 * wake us up when necessary. 664 */ 665 if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) { 666 if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) { 667 ret = errno; 668 goto out; 669 } else if (db_rep->finished) { 670 ret = 0; 671 goto out; 672 } 673 } 674 /* 675 * Obviously elements can be added to the connection list here. 676 */ 677 if (FD_ISSET((u_int)db_rep->listen_fd, &reads) && 678 (ret = __repmgr_accept(env)) != 0) 679 goto out; 680 } 681out: 682 UNLOCK_MUTEX(db_rep->mutex); 683 return (ret); 684} 685 686static int 687__repmgr_conn_work(env, conn, reads, writes, flow_control) 688 ENV *env; 689 REPMGR_CONNECTION *conn; 690 fd_set *reads, *writes; 691 int flow_control; 692{ 693 int ret; 694 u_int fd; 695 696 ret = 0; 697 fd = (u_int)conn->fd; 698 699 if (conn->state == CONN_CONNECTING) { 700 if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes)) 701 ret = finish_connecting(env, conn); 702 } else { 703 /* 704 * Here, the site is connected, and the FD_SET's are valid. 705 */ 706 if (FD_ISSET(fd, writes)) 707 ret = __repmgr_write_some(env, conn); 708 709 if (ret == 0 && !flow_control && FD_ISSET(fd, reads)) 710 ret = __repmgr_read_from_site(env, conn); 711 } 712 713 if (ret == DB_REP_UNAVAIL) 714 ret = __repmgr_bust_connection(env, conn); 715 return (ret); 716} 717 718static int 719finish_connecting(env, conn) 720 ENV *env; 721 REPMGR_CONNECTION *conn; 722{ 723 DB_REP *db_rep; 724 REPMGR_SITE *site; 725 socklen_t len; 726 SITE_STRING_BUFFER buffer; 727 u_int eid; 728 int error, ret; 729 730 len = sizeof(error); 731 if (getsockopt( 732 conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0) 733 goto err_rpt; 734 if (error) { 735 errno = error; 736 goto err_rpt; 737 } 738 739 conn->state = CONN_CONNECTED; 740 return (__repmgr_propose_version(env, conn)); 741 742err_rpt: 743 db_rep = env->rep_handle; 744 745 DB_ASSERT(env, IS_VALID_EID(conn->eid)); 746 eid = (u_int)conn->eid; 747 748 site = SITE_FROM_EID(eid); 749 __db_err(env, errno, 750 "connecting to %s", __repmgr_format_site_loc(site, buffer)); 751 752 /* If we've exhausted the list of possible addresses, give up. */ 753 if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { 754 STAT(db_rep->region->mstat.st_connect_fail++); 755 return (DB_REP_UNAVAIL); 756 } 757 758 /* 759 * Since we're immediately trying the next address in the list, simply 760 * disable the failed connection, without the usual recovery. 761 */ 762 DISABLE_CONNECTION(conn); 763 764 ret = __repmgr_connect_site(env, eid); 765 DB_ASSERT(env, ret != DB_REP_UNAVAIL); 766 return (ret); 767} 768