1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#define __INCLUDE_SELECT_H 1 13#include "db_int.h" 14 15/* 16 * A very rough guess at the maximum stack space one of our threads could ever 17 * need, which we hope is plenty conservative. This can be patched in the field 18 * if necessary. 19 */ 20#ifdef _POSIX_THREAD_ATTR_STACKSIZE 21size_t __repmgr_guesstimated_max = (128 * 1024); 22#endif 23 24/* 25 * Invalid open file descriptor value, that can be used as an out-of-band 26 * sentinel to mark our signalling pipe as unopened. 27 */ 28#define NO_SUCH_FILE_DESC (-1) 29 30/* Aggregated control info needed for preparing for select() call. */ 31struct io_info { 32 fd_set *reads, *writes; 33 int maxfd; 34}; 35 36static int __repmgr_conn_work __P((ENV *, REPMGR_CONNECTION *, void *)); 37static int finish_connecting __P((ENV *, REPMGR_CONNECTION *)); 38static int prepare_io __P((ENV *, REPMGR_CONNECTION *, void *)); 39 40/* 41 * Starts the thread described in the argument, and stores the resulting thread 42 * ID therein. 43 * 44 * PUBLIC: int __repmgr_thread_start __P((ENV *, REPMGR_RUNNABLE *)); 45 */ 46int 47__repmgr_thread_start(env, runnable) 48 ENV *env; 49 REPMGR_RUNNABLE *runnable; 50{ 51 pthread_attr_t *attrp; 52#ifdef _POSIX_THREAD_ATTR_STACKSIZE 53 pthread_attr_t attributes; 54 size_t size; 55 int ret; 56#endif 57 58 runnable->finished = FALSE; 59 60#ifdef _POSIX_THREAD_ATTR_STACKSIZE 61 attrp = &attributes; 62 if ((ret = pthread_attr_init(&attributes)) != 0) { 63 __db_err(env, 64 ret, "pthread_attr_init in repmgr_thread_start"); 65 return (ret); 66 } 67 68 /* 69 * On a 64-bit machine it seems reasonable that we could need twice as 70 * much stack space as we did on a 32-bit machine. 71 */ 72 size = __repmgr_guesstimated_max; 73 if (sizeof(size_t) > 4) 74 size *= 2; 75#ifdef PTHREAD_STACK_MIN 76 if (size < PTHREAD_STACK_MIN) 77 size = PTHREAD_STACK_MIN; 78#endif 79 if ((ret = pthread_attr_setstacksize(&attributes, size)) != 0) { 80 __db_err(env, 81 ret, "pthread_attr_setstacksize in repmgr_thread_start"); 82 return (ret); 83 } 84#else 85 attrp = NULL; 86#endif 87 88 return (pthread_create(&runnable->thread_id, attrp, 89 runnable->run, env)); 90} 91 92/* 93 * PUBLIC: int __repmgr_thread_join __P((REPMGR_RUNNABLE *)); 94 */ 95int 96__repmgr_thread_join(thread) 97 REPMGR_RUNNABLE *thread; 98{ 99 return (pthread_join(thread->thread_id, NULL)); 100} 101 102/* 103 * PUBLIC: int __repmgr_set_nonblocking __P((socket_t)); 104 */ 105int 106__repmgr_set_nonblocking(fd) 107 socket_t fd; 108{ 109 int flags; 110 111 if ((flags = fcntl(fd, F_GETFL, 0)) < 0) 112 return (errno); 113 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) 114 return (errno); 115 return (0); 116} 117 118/* 119 * PUBLIC: int __repmgr_wake_waiting_senders __P((ENV *)); 120 * 121 * Wake any send()-ing threads waiting for an acknowledgement. 122 * 123 * !!! 124 * Caller must hold the db_rep->mutex, if this thread synchronization is to work 125 * properly. 126 */ 127int 128__repmgr_wake_waiting_senders(env) 129 ENV *env; 130{ 131 return (pthread_cond_broadcast(&env->rep_handle->ack_condition)); 132} 133 134/* 135 * PUBLIC: int __repmgr_await_ack __P((ENV *, const DB_LSN *)); 136 * 137 * Waits (a limited time) for configured number of remote sites to ack the given 138 * LSN. 139 * 140 * !!! 141 * Caller must hold repmgr->mutex. 142 */ 143int 144__repmgr_await_ack(env, lsnp) 145 ENV *env; 146 const DB_LSN *lsnp; 147{ 148 DB_REP *db_rep; 149 struct timespec deadline; 150 int ret, timed; 151 152 db_rep = env->rep_handle; 153 154 if ((timed = (db_rep->ack_timeout > 0))) 155 __repmgr_compute_wait_deadline(env, &deadline, 156 db_rep->ack_timeout); 157 else 158 COMPQUIET(deadline.tv_sec, 0); 159 160 while (!__repmgr_is_permanent(env, lsnp)) { 161 if (timed) 162 ret = pthread_cond_timedwait(&db_rep->ack_condition, 163 db_rep->mutex, &deadline); 164 else 165 ret = pthread_cond_wait(&db_rep->ack_condition, 166 db_rep->mutex); 167 if (db_rep->finished) 168 return (DB_REP_UNAVAIL); 169 if (ret != 0) 170 return (ret); 171 } 172 return (0); 173} 174 175/* 176 * __repmgr_compute_wait_deadline -- 177 * Computes a deadline time a certain distance into the future. 178 * 179 * PUBLIC: void __repmgr_compute_wait_deadline __P((ENV*, 180 * PUBLIC: struct timespec *, db_timeout_t)); 181 */ 182void 183__repmgr_compute_wait_deadline(env, result, wait) 184 ENV *env; 185 struct timespec *result; 186 db_timeout_t wait; 187{ 188 /* 189 * The result is suitable for the pthread_cond_timewait call. (That 190 * call uses nano-second resolution; elsewhere we use microseconds.) 191 * 192 * Start with "now"; then add the "wait" offset. 193 * 194 * A db_timespec is the same as a "struct timespec" so we can pass 195 * result directly to the underlying Berkeley DB OS routine. 196 * 197 * !!! 198 * We use the system clock for the pthread_cond_timedwait call, but 199 * that's not optimal on systems with monotonic timers. Instead, 200 * we should call pthread_condattr_setclock on systems where it and 201 * monotonic timers are available, and then configure both this call 202 * and the subsequent pthread_cond_timewait call to use a monotonic 203 * timer. 204 */ 205 __os_gettime(env, (db_timespec *)result, 0); 206 TIMESPEC_ADD_DB_TIMEOUT(result, wait); 207} 208 209/* 210 * PUBLIC: int __repmgr_await_drain __P((ENV *, 211 * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); 212 * 213 * Waits for space to become available on the connection's output queue. 214 * Various ways we can exit: 215 * 216 * 1. queue becomes non-full 217 * 2. exceed time limit 218 * 3. connection becomes defunct (due to error in another thread) 219 * 4. repmgr is shutting down 220 * 5. any unexpected system resource failure 221 * 222 * In cases #3 and #5 we return an error code. Caller is responsible for 223 * distinguishing the remaining cases if desired. 224 * 225 * !!! 226 * Caller must hold repmgr->mutex. 227 */ 228int 229__repmgr_await_drain(env, conn, timeout) 230 ENV *env; 231 REPMGR_CONNECTION *conn; 232 db_timeout_t timeout; 233{ 234 DB_REP *db_rep; 235 struct timespec deadline; 236 int ret; 237 238 db_rep = env->rep_handle; 239 240 __repmgr_compute_wait_deadline(env, &deadline, timeout); 241 242 ret = 0; 243 while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { 244 ret = pthread_cond_timedwait(&conn->drained, 245 db_rep->mutex, &deadline); 246 switch (ret) { 247 case 0: 248 if (db_rep->finished) 249 goto out; /* #4. */ 250 /* 251 * Another thread could have stumbled into an error on 252 * the socket while we were waiting. 253 */ 254 if (conn->state == CONN_DEFUNCT) { 255 ret = DB_REP_UNAVAIL; /* #3. */ 256 goto out; 257 } 258 break; 259 case ETIMEDOUT: 260 conn->state = CONN_CONGESTED; 261 ret = 0; 262 goto out; /* #2. */ 263 default: 264 goto out; /* #5. */ 265 } 266 } 267 /* #1. */ 268 269out: 270 return (ret); 271} 272 273/* 274 * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); 275 * 276 * Initialize a condition variable (in allocated space). 277 */ 278int 279__repmgr_alloc_cond(c) 280 cond_var_t *c; 281{ 282 return (pthread_cond_init(c, NULL)); 283} 284 285/* 286 * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); 287 * 288 * Clean up a previously initialized condition variable. 289 */ 290int 291__repmgr_free_cond(c) 292 cond_var_t *c; 293{ 294 return (pthread_cond_destroy(c)); 295} 296 297/* 298 * PUBLIC: void __repmgr_env_create_pf __P((DB_REP *)); 299 */ 300void 301__repmgr_env_create_pf(db_rep) 302 DB_REP *db_rep; 303{ 304 db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC; 305} 306 307/* 308 * PUBLIC: int __repmgr_create_mutex_pf __P((mgr_mutex_t *)); 309 */ 310int 311__repmgr_create_mutex_pf(mutex) 312 mgr_mutex_t *mutex; 313{ 314 return (pthread_mutex_init(mutex, NULL)); 315} 316 317/* 318 * PUBLIC: int __repmgr_destroy_mutex_pf __P((mgr_mutex_t *)); 319 */ 320int 321__repmgr_destroy_mutex_pf(mutex) 322 mgr_mutex_t *mutex; 323{ 324 return (pthread_mutex_destroy(mutex)); 325} 326 327/* 328 * PUBLIC: int __repmgr_init __P((ENV *)); 329 */ 330int 331__repmgr_init(env) 332 ENV *env; 333{ 334 DB_REP *db_rep; 335 struct sigaction sigact; 336 int ack_inited, elect_inited, file_desc[2], queue_inited, ret; 337 338 db_rep = env->rep_handle; 339 340 /* 341 * Make sure we're not ignoring SIGPIPE, 'cuz otherwise we'd be killed 342 * just for trying to write onto a socket that had been reset. Note 343 * that we don't undo this in case of a later error, since we document 344 * that we leave the signal handling state like this, even after env 345 * close. 346 */ 347 if (sigaction(SIGPIPE, NULL, &sigact) == -1) { 348 ret = errno; 349 __db_err(env, ret, "can't access signal handler"); 350 return (ret); 351 } 352 if (sigact.sa_handler == SIG_DFL) { 353 sigact.sa_handler = SIG_IGN; 354 sigact.sa_flags = 0; 355 if (sigaction(SIGPIPE, &sigact, NULL) == -1) { 356 ret = errno; 357 __db_err(env, ret, "can't access signal handler"); 358 return (ret); 359 } 360 } 361 362 ack_inited = elect_inited = queue_inited = FALSE; 363 if ((ret = pthread_cond_init(&db_rep->ack_condition, NULL)) != 0) 364 goto err; 365 ack_inited = TRUE; 366 367 if ((ret = pthread_cond_init(&db_rep->check_election, NULL)) != 0) 368 goto err; 369 elect_inited = TRUE; 370 371 if ((ret = pthread_cond_init(&db_rep->queue_nonempty, NULL)) != 0) 372 goto err; 373 queue_inited = TRUE; 374 375 if ((ret = pipe(file_desc)) == -1) { 376 ret = errno; 377 goto err; 378 } 379 380 db_rep->read_pipe = file_desc[0]; 381 db_rep->write_pipe = file_desc[1]; 382 return (0); 383err: 384 if (queue_inited) 385 (void)pthread_cond_destroy(&db_rep->queue_nonempty); 386 if (elect_inited) 387 (void)pthread_cond_destroy(&db_rep->check_election); 388 if (ack_inited) 389 (void)pthread_cond_destroy(&db_rep->ack_condition); 390 db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC; 391 392 return (ret); 393} 394 395/* 396 * PUBLIC: int __repmgr_deinit __P((ENV *)); 397 */ 398int 399__repmgr_deinit(env) 400 ENV *env; 401{ 402 DB_REP *db_rep; 403 int ret, t_ret; 404 405 db_rep = env->rep_handle; 406 407 if (!(REPMGR_INITED(db_rep))) 408 return (0); 409 410 ret = pthread_cond_destroy(&db_rep->queue_nonempty); 411 412 if ((t_ret = pthread_cond_destroy(&db_rep->check_election)) != 0 && 413 ret == 0) 414 ret = t_ret; 415 416 if ((t_ret = pthread_cond_destroy(&db_rep->ack_condition)) != 0 && 417 ret == 0) 418 ret = t_ret; 419 420 if (close(db_rep->read_pipe) == -1 && ret == 0) 421 ret = errno; 422 if (close(db_rep->write_pipe) == -1 && ret == 0) 423 ret = errno; 424 425 db_rep->read_pipe = db_rep->write_pipe = NO_SUCH_FILE_DESC; 426 return (ret); 427} 428 429/* 430 * PUBLIC: int __repmgr_lock_mutex __P((mgr_mutex_t *)); 431 */ 432int 433__repmgr_lock_mutex(mutex) 434 mgr_mutex_t *mutex; 435{ 436 return (pthread_mutex_lock(mutex)); 437} 438 439/* 440 * PUBLIC: int __repmgr_unlock_mutex __P((mgr_mutex_t *)); 441 */ 442int 443__repmgr_unlock_mutex(mutex) 444 mgr_mutex_t *mutex; 445{ 446 return (pthread_mutex_unlock(mutex)); 447} 448 449/* 450 * Signals a condition variable. 451 * 452 * !!! 453 * Caller must hold mutex. 454 * 455 * PUBLIC: int __repmgr_signal __P((cond_var_t *)); 456 */ 457int 458__repmgr_signal(v) 459 cond_var_t *v; 460{ 461 return (pthread_cond_broadcast(v)); 462} 463 464/* 465 * PUBLIC: int __repmgr_wake_main_thread __P((ENV*)); 466 */ 467int 468__repmgr_wake_main_thread(env) 469 ENV *env; 470{ 471 DB_REP *db_rep; 472 u_int8_t any_value; 473 474 COMPQUIET(any_value, 0); 475 db_rep = env->rep_handle; 476 477 /* 478 * It doesn't matter what byte value we write. Just the appearance of a 479 * byte in the stream is enough to wake up the select() thread reading 480 * the pipe. 481 */ 482 if (write(db_rep->write_pipe, &any_value, 1) == -1) 483 return (errno); 484 return (0); 485} 486 487/* 488 * PUBLIC: int __repmgr_writev __P((socket_t, db_iovec_t *, int, size_t *)); 489 */ 490int 491__repmgr_writev(fd, iovec, buf_count, byte_count_p) 492 socket_t fd; 493 db_iovec_t *iovec; 494 int buf_count; 495 size_t *byte_count_p; 496{ 497 int nw; 498 499 if ((nw = writev(fd, iovec, buf_count)) == -1) 500 return (errno); 501 *byte_count_p = (size_t)nw; 502 return (0); 503} 504 505/* 506 * PUBLIC: int __repmgr_readv __P((socket_t, db_iovec_t *, int, size_t *)); 507 */ 508int 509__repmgr_readv(fd, iovec, buf_count, byte_count_p) 510 socket_t fd; 511 db_iovec_t *iovec; 512 int buf_count; 513 size_t *byte_count_p; 514{ 515 ssize_t nw; 516 517 if ((nw = readv(fd, iovec, buf_count)) == -1) 518 return (errno); 519 *byte_count_p = (size_t)nw; 520 return (0); 521} 522 523/* 524 * PUBLIC: int __repmgr_select_loop __P((ENV *)); 525 */ 526int 527__repmgr_select_loop(env) 528 ENV *env; 529{ 530 struct timeval select_timeout, *select_timeout_p; 531 DB_REP *db_rep; 532 db_timespec timeout; 533 fd_set reads, writes; 534 struct io_info io_info; 535 int ret; 536 u_int8_t buf[10]; /* arbitrary size */ 537 538 db_rep = env->rep_handle; 539 /* 540 * Almost this entire thread operates while holding the mutex. But note 541 * that it never blocks, except in the call to select() (which is the 542 * one place we relinquish the mutex). 543 */ 544 LOCK_MUTEX(db_rep->mutex); 545 if ((ret = __repmgr_first_try_connections(env)) != 0) 546 goto out; 547 for (;;) { 548 FD_ZERO(&reads); 549 FD_ZERO(&writes); 550 551 /* 552 * Figure out which sockets to ask for input and output. It's 553 * simple for the signalling pipe and listen socket; but depends 554 * on backlog states for the connections to other sites. 555 */ 556 FD_SET((u_int)db_rep->read_pipe, &reads); 557 io_info.maxfd = db_rep->read_pipe; 558 559 if (!IS_SUBORDINATE(db_rep)) { 560 FD_SET((u_int)db_rep->listen_fd, &reads); 561 if (db_rep->listen_fd > io_info.maxfd) 562 io_info.maxfd = db_rep->listen_fd; 563 } 564 565 io_info.reads = &reads; 566 io_info.writes = &writes; 567 if ((ret = __repmgr_each_connection(env, 568 prepare_io, &io_info, TRUE)) != 0) 569 goto out; 570 571 if (__repmgr_compute_timeout(env, &timeout)) { 572 /* Convert the timespec to a timeval. */ 573 select_timeout.tv_sec = timeout.tv_sec; 574 select_timeout.tv_usec = timeout.tv_nsec / NS_PER_US; 575 select_timeout_p = &select_timeout; 576 } else { 577 /* No time-based events, so wait only for I/O. */ 578 select_timeout_p = NULL; 579 } 580 581 UNLOCK_MUTEX(db_rep->mutex); 582 583 if ((ret = select(io_info.maxfd + 1, 584 &reads, &writes, NULL, select_timeout_p)) == -1) { 585 switch (ret = errno) { 586 case EINTR: 587 case EWOULDBLOCK: 588 LOCK_MUTEX(db_rep->mutex); 589 continue; /* simply retry */ 590 default: 591 __db_err(env, ret, "select"); 592 return (ret); 593 } 594 } 595 LOCK_MUTEX(db_rep->mutex); 596 597 /* 598 * Timer expiration events include retrying of lost connections. 599 * Obviously elements can be added to the connection list there. 600 */ 601 if ((ret = __repmgr_check_timeouts(env)) != 0) 602 goto out; 603 604 if ((ret = __repmgr_each_connection(env, 605 __repmgr_conn_work, &io_info, TRUE)) != 0) 606 goto out; 607 608 /* 609 * Read any bytes in the signalling pipe. Note that we don't 610 * actually need to do anything with them; they're just there to 611 * wake us up when necessary. 612 */ 613 if (FD_ISSET((u_int)db_rep->read_pipe, &reads)) { 614 if (read(db_rep->read_pipe, buf, sizeof(buf)) <= 0) { 615 ret = errno; 616 goto out; 617 } else if (db_rep->finished) { 618 ret = 0; 619 goto out; 620 } 621 } 622 /* 623 * Obviously elements can be added to the connection list here. 624 */ 625 if (!IS_SUBORDINATE(db_rep) && 626 FD_ISSET((u_int)db_rep->listen_fd, &reads) && 627 (ret = __repmgr_accept(env)) != 0) 628 goto out; 629 } 630out: 631 UNLOCK_MUTEX(db_rep->mutex); 632 return (ret); 633} 634 635/* 636 * Examines a connection to see what sort of I/O to ask for. Clean up defunct 637 * connections. 638 */ 639static int 640prepare_io(env, conn, info_) 641 ENV *env; 642 REPMGR_CONNECTION *conn; 643 void *info_; 644{ 645 struct io_info *info; 646 647 info = info_; 648 649 if (conn->state == CONN_DEFUNCT) 650 return (__repmgr_cleanup_connection(env, conn)); 651 652 if (conn->state == CONN_CONNECTING) { 653 FD_SET((u_int)conn->fd, info->reads); 654 FD_SET((u_int)conn->fd, info->writes); 655 if (conn->fd > info->maxfd) 656 info->maxfd = conn->fd; 657 return (0); 658 } 659 660 if (!STAILQ_EMPTY(&conn->outbound_queue)) { 661 FD_SET((u_int)conn->fd, info->writes); 662 if (conn->fd > info->maxfd) 663 info->maxfd = conn->fd; 664 } 665 /* 666 * For now we always accept incoming data. If we ever implement some 667 * kind of flow control, we should override it for fledgling connections 668 * (!IS_VALID_EID(conn->eid)) -- in other words, allow reading such a 669 * connection even during flow control duress. 670 */ 671 FD_SET((u_int)conn->fd, info->reads); 672 if (conn->fd > info->maxfd) 673 info->maxfd = conn->fd; 674 675 return (0); 676} 677 678/* 679 * Examine a connection, to see what work needs to be done. 680 */ 681static int 682__repmgr_conn_work(env, conn, info_) 683 ENV *env; 684 REPMGR_CONNECTION *conn; 685 void *info_; 686{ 687 struct io_info *info; 688 int ret; 689 u_int fd; 690 691 ret = 0; 692 fd = (u_int)conn->fd; 693 info = info_; 694 695 if (conn->state == CONN_DEFUNCT) 696 return (0); 697 698 if (conn->state == CONN_CONNECTING) { 699 if (FD_ISSET(fd, info->reads) || FD_ISSET(fd, info->writes)) 700 ret = finish_connecting(env, conn); 701 } else { 702 /* 703 * Here, the site is connected, and the FD_SET's are valid. 704 */ 705 if (FD_ISSET(fd, info->writes)) 706 ret = __repmgr_write_some(env, conn); 707 708 if (ret == 0 && FD_ISSET(fd, info->reads)) 709 ret = __repmgr_read_from_site(env, conn); 710 } 711 712 if (ret == DB_REP_UNAVAIL) 713 ret = __repmgr_bust_connection(env, conn); 714 return (ret); 715} 716 717static int 718finish_connecting(env, conn) 719 ENV *env; 720 REPMGR_CONNECTION *conn; 721{ 722 DB_REP *db_rep; 723 REPMGR_SITE *site; 724 socklen_t len; 725 SITE_STRING_BUFFER buffer; 726 u_int eid; 727 int error, ret; 728 729 db_rep = env->rep_handle; 730 731 DB_ASSERT(env, IS_VALID_EID(conn->eid)); 732 eid = (u_int)conn->eid; 733 site = SITE_FROM_EID(eid); 734 735 len = sizeof(error); 736 if (getsockopt( 737 conn->fd, SOL_SOCKET, SO_ERROR, (sockopt_t)&error, &len) < 0) 738 goto err_rpt; 739 if (error) { 740 errno = error; 741 goto err_rpt; 742 } 743 744 conn->state = CONN_CONNECTED; 745 __os_gettime(env, &site->last_rcvd_timestamp, 1); 746 return (__repmgr_propose_version(env, conn)); 747 748err_rpt: 749 __db_err(env, errno, 750 "connecting to %s", __repmgr_format_site_loc(site, buffer)); 751 752 /* If we've exhausted the list of possible addresses, give up. */ 753 if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { 754 STAT(db_rep->region->mstat.st_connect_fail++); 755 return (DB_REP_UNAVAIL); 756 } 757 758 /* 759 * Since we're immediately trying the next address in the list, simply 760 * disable the failed connection, without the usual recovery. 761 */ 762 __repmgr_disable_connection(env, conn); 763 764 ret = __repmgr_connect_site(env, eid); 765 DB_ASSERT(env, ret != DB_REP_UNAVAIL); 766 return (ret); 767} 768