1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14typedef int (*HEARTBEAT_ACTION) __P((ENV *)); 15 16static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); 17static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); 18static int __repmgr_call_election __P((ENV *)); 19static int __repmgr_connect __P((ENV*, socket_t *, REPMGR_SITE *)); 20static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *)); 21static int find_version_info __P((ENV *, REPMGR_CONNECTION *, DBT *)); 22static int introduce_site __P((ENV *, char *, u_int, REPMGR_SITE**, u_int32_t)); 23static int __repmgr_next_timeout __P((ENV *, 24 db_timespec *, HEARTBEAT_ACTION *)); 25static int dispatch_phase_completion __P((ENV *, REPMGR_CONNECTION *)); 26static REPMGR_CONNECTION *__repmgr_master_connection __P((ENV *)); 27static int process_parameters __P((ENV *, 28 REPMGR_CONNECTION *, char *, u_int, u_int32_t, u_int32_t)); 29static int read_version_response __P((ENV *, REPMGR_CONNECTION *)); 30static int record_ack __P((ENV *, REPMGR_CONNECTION *)); 31static int __repmgr_retry_connections __P((ENV *)); 32static int send_handshake __P((ENV *, REPMGR_CONNECTION *, void *, size_t)); 33static int __repmgr_send_heartbeat __P((ENV *)); 34static int send_v1_handshake __P((ENV *, 35 REPMGR_CONNECTION *, void *, size_t)); 36static int send_version_response __P((ENV *, REPMGR_CONNECTION *)); 37static int __repmgr_try_one __P((ENV *, u_int)); 38 39#define ONLY_HANDSHAKE(env, conn) do { \ 40 if (conn->msg_type != REPMGR_HANDSHAKE) { \ 41 __db_errx(env, "unexpected msg type %d in state %d", \ 42 (int)conn->msg_type, conn->state); \ 43 return (DB_REP_UNAVAIL); \ 44 } \ 45} while (0) 46 47/* 48 * PUBLIC: void *__repmgr_select_thread __P((void *)); 49 */ 50void * 51__repmgr_select_thread(args) 52 void *args; 53{ 54 ENV *env = args; 55 int ret; 56 57 if ((ret = __repmgr_select_loop(env)) != 0) { 58 __db_err(env, ret, "select loop failed"); 59 __repmgr_thread_failure(env, ret); 60 } 61 return (NULL); 62} 63 64/* 65 * PUBLIC: int __repmgr_accept __P((ENV *)); 66 */ 67int 68__repmgr_accept(env) 69 ENV *env; 70{ 71 DB_REP *db_rep; 72 REPMGR_CONNECTION *conn; 73 struct sockaddr_in siaddr; 74 socklen_t addrlen; 75 socket_t s; 76 int ret; 77#ifdef DB_WIN32 78 WSAEVENT event_obj; 79#endif 80 81 db_rep = env->rep_handle; 82 addrlen = sizeof(siaddr); 83 if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr, 84 &addrlen)) == -1) { 85 /* 86 * Some errors are innocuous and so should be ignored. MSDN 87 * Library documents the Windows ones; the Unix ones are 88 * advocated in Stevens' UNPv1, section 16.6; and Linux 89 * Application Development, p. 416. 90 */ 91 switch (ret = net_errno) { 92#ifdef DB_WIN32 93 case WSAECONNRESET: 94 case WSAEWOULDBLOCK: 95#else 96 case EINTR: 97 case EWOULDBLOCK: 98 case ECONNABORTED: 99 case ENETDOWN: 100#ifdef EPROTO 101 case EPROTO: 102#endif 103 case ENOPROTOOPT: 104 case EHOSTDOWN: 105#ifdef ENONET 106 case ENONET: 107#endif 108 case EHOSTUNREACH: 109 case EOPNOTSUPP: 110 case ENETUNREACH: 111#endif 112 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 113 "accept error %d considered innocuous", ret)); 114 return (0); 115 default: 116 __db_err(env, ret, "accept error"); 117 return (ret); 118 } 119 } 120 RPRINT(env, DB_VERB_REPMGR_MISC, (env, "accepted a new connection")); 121 122 if ((ret = __repmgr_set_nonblocking(s)) != 0) { 123 __db_err(env, ret, "can't set nonblock after accept"); 124 (void)closesocket(s); 125 return (ret); 126 } 127 128#ifdef DB_WIN32 129 if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { 130 ret = net_errno; 131 __db_err(env, ret, "can't create WSA event"); 132 (void)closesocket(s); 133 return (ret); 134 } 135 if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) { 136 ret = net_errno; 137 __db_err(env, ret, "can't set desired event bits"); 138 (void)WSACloseEvent(event_obj); 139 (void)closesocket(s); 140 return (ret); 141 } 142#endif 143 if ((ret = 144 __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) { 145#ifdef DB_WIN32 146 (void)WSACloseEvent(event_obj); 147#endif 148 (void)closesocket(s); 149 return (ret); 150 } 151 F_SET(conn, CONN_INCOMING); 152 153 /* 154 * We don't yet know which site this connection is coming from. So for 155 * now, put it on the "orphans" list; we'll move it to the appropriate 156 * site struct later when we discover who we're talking with, and what 157 * type of connection it is. 158 */ 159 conn->eid = -1; 160 TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries); 161 162#ifdef DB_WIN32 163 conn->event_object = event_obj; 164#endif 165 return (0); 166} 167 168/* 169 * Computes how long we should wait for input, in other words how long until we 170 * have to wake up and do something. Returns TRUE if timeout is set; FALSE if 171 * there is nothing to wait for. 172 * 173 * Note that the resulting timeout could be zero; but it can't be negative. 174 * 175 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *)); 176 */ 177int 178__repmgr_compute_timeout(env, timeout) 179 ENV *env; 180 db_timespec *timeout; 181{ 182 DB_REP *db_rep; 183 REPMGR_RETRY *retry; 184 db_timespec now, t; 185 int have_timeout; 186 187 db_rep = env->rep_handle; 188 189 /* 190 * There are two factors to consider: are heartbeats in use? and, do we 191 * have any sites with broken connections that we ought to retry? 192 */ 193 have_timeout = __repmgr_next_timeout(env, &t, NULL); 194 195 /* List items are in order, so we only have to examine the first one. */ 196 if (!TAILQ_EMPTY(&db_rep->retries)) { 197 retry = TAILQ_FIRST(&db_rep->retries); 198 if (have_timeout) { 199 /* Choose earliest timeout deadline. */ 200 t = timespeccmp(&retry->time, &t, <) ? retry->time : t; 201 } else { 202 t = retry->time; 203 have_timeout = TRUE; 204 } 205 } 206 207 if (have_timeout) { 208 __os_gettime(env, &now, 1); 209 if (timespeccmp(&now, &t, >=)) 210 timespecclear(timeout); 211 else { 212 *timeout = t; 213 timespecsub(timeout, &now); 214 } 215 } 216 217 return (have_timeout); 218} 219 220/* 221 * Figures out the next heartbeat-related thing to be done, and when it should 222 * be done. The code is factored this way because this computation needs to be 223 * done both before each select() call, and after (when we're checking for timer 224 * expiration). 225 */ 226static int 227__repmgr_next_timeout(env, deadline, action) 228 ENV *env; 229 db_timespec *deadline; 230 HEARTBEAT_ACTION *action; 231{ 232 DB_REP *db_rep; 233 HEARTBEAT_ACTION my_action; 234 REPMGR_CONNECTION *conn; 235 REPMGR_SITE *site; 236 db_timespec t; 237 238 db_rep = env->rep_handle; 239 240 if (db_rep->master_eid == SELF_EID && db_rep->heartbeat_frequency > 0) { 241 t = db_rep->last_bcast; 242 TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_frequency); 243 my_action = __repmgr_send_heartbeat; 244 } else if ((conn = __repmgr_master_connection(env)) != NULL && 245 !IS_SUBORDINATE(db_rep) && 246 db_rep->heartbeat_monitor_timeout > 0 && 247 conn->version >= HEARTBEAT_MIN_VERSION) { 248 /* 249 * If we have a working connection to a heartbeat-aware master, 250 * let's monitor it. Otherwise there's really nothing we can 251 * do. 252 */ 253 site = SITE_FROM_EID(db_rep->master_eid); 254 t = site->last_rcvd_timestamp; 255 TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_monitor_timeout); 256 my_action = __repmgr_call_election; 257 } else 258 return (FALSE); 259 260 *deadline = t; 261 if (action != NULL) 262 *action = my_action; 263 return (TRUE); 264} 265 266static int 267__repmgr_send_heartbeat(env) 268 ENV *env; 269{ 270 DBT control, rec; 271 u_int unused1, unused2; 272 273 DB_INIT_DBT(control, NULL, 0); 274 DB_INIT_DBT(rec, NULL, 0); 275 return (__repmgr_send_broadcast(env, 276 REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2)); 277} 278 279static REPMGR_CONNECTION * 280__repmgr_master_connection(env) 281 ENV *env; 282{ 283 DB_REP *db_rep; 284 REPMGR_CONNECTION *conn; 285 REPMGR_SITE *master; 286 287 db_rep = env->rep_handle; 288 289 if (db_rep->master_eid == SELF_EID || 290 !IS_VALID_EID(db_rep->master_eid)) 291 return (NULL); 292 master = SITE_FROM_EID(db_rep->master_eid); 293 if (master->state != SITE_CONNECTED) 294 return (NULL); 295 conn = master->ref.conn; 296 if (IS_READY_STATE(conn->state)) 297 return (conn); 298 return (NULL); 299} 300 301static int 302__repmgr_call_election(env) 303 ENV *env; 304{ 305 REPMGR_CONNECTION *conn; 306 307 conn = __repmgr_master_connection(env); 308 DB_ASSERT(env, conn != NULL); 309 RPRINT(env, DB_VERB_REPMGR_MISC, 310 (env, "heartbeat monitor timeout expired")); 311 STAT(env->rep_handle->region->mstat.st_connection_drop++); 312 return (__repmgr_bust_connection(env, conn)); 313} 314 315/* 316 * PUBLIC: int __repmgr_check_timeouts __P((ENV *)); 317 * 318 * !!! 319 * Assumes caller holds the mutex. 320 */ 321int 322__repmgr_check_timeouts(env) 323 ENV *env; 324{ 325 db_timespec when, now; 326 HEARTBEAT_ACTION action; 327 int ret; 328 329 /* 330 * Figure out the next heartbeat-related thing to be done. Then, if 331 * it's time to do it, do so. 332 */ 333 if (__repmgr_next_timeout(env, &when, &action)) { 334 __os_gettime(env, &now, 1); 335 if (timespeccmp(&when, &now, <=) && 336 (ret = (*action)(env)) != 0) 337 return (ret); 338 } 339 340 return (__repmgr_retry_connections(env)); 341} 342 343/* 344 * Initiates connection attempts for any sites on the idle list whose retry 345 * times have expired. 346 */ 347static int 348__repmgr_retry_connections(env) 349 ENV *env; 350{ 351 DB_REP *db_rep; 352 REPMGR_RETRY *retry; 353 db_timespec now; 354 u_int eid; 355 int ret; 356 357 db_rep = env->rep_handle; 358 __os_gettime(env, &now, 1); 359 360 while (!TAILQ_EMPTY(&db_rep->retries)) { 361 retry = TAILQ_FIRST(&db_rep->retries); 362 if (timespeccmp(&retry->time, &now, >=)) 363 break; /* since items are in time order */ 364 365 TAILQ_REMOVE(&db_rep->retries, retry, entries); 366 367 eid = retry->eid; 368 __os_free(env, retry); 369 370 if ((ret = __repmgr_try_one(env, eid)) != 0) 371 return (ret); 372 } 373 return (0); 374} 375 376/* 377 * PUBLIC: int __repmgr_first_try_connections __P((ENV *)); 378 * 379 * !!! 380 * Assumes caller holds the mutex. 381 */ 382int 383__repmgr_first_try_connections(env) 384 ENV *env; 385{ 386 DB_REP *db_rep; 387 u_int eid; 388 int ret; 389 390 db_rep = env->rep_handle; 391 for (eid = 0; eid < db_rep->site_cnt; eid++) 392 if ((ret = __repmgr_try_one(env, eid)) != 0) 393 return (ret); 394 return (0); 395} 396 397/* 398 * Makes a best-effort attempt to connect to the indicated site. Returns a 399 * non-zero error indication only for disastrous failures. For re-tryable 400 * errors, we will have scheduled another attempt, and that can be considered 401 * success enough. 402 */ 403static int 404__repmgr_try_one(env, eid) 405 ENV *env; 406 u_int eid; 407{ 408 ADDRINFO *list; 409 DB_REP *db_rep; 410 repmgr_netaddr_t *addr; 411 int ret; 412 413 db_rep = env->rep_handle; 414 415 addr = &SITE_FROM_EID(eid)->net_addr; 416 if (ADDR_LIST_FIRST(addr) == NULL) { 417 if ((ret = __repmgr_getaddr(env, 418 addr->host, addr->port, 0, &list)) == 0) { 419 addr->address_list = list; 420 (void)ADDR_LIST_FIRST(addr); 421 } else if (ret == DB_REP_UNAVAIL) 422 return (__repmgr_schedule_connection_attempt( 423 env, eid, FALSE)); 424 else 425 return (ret); 426 } 427 428 /* Here, when we have a valid address. */ 429 return (__repmgr_connect_site(env, eid)); 430} 431 432/* 433 * Tries to establish a connection with the site indicated by the given eid, 434 * starting with the "current" element of its address list and trying as many 435 * addresses as necessary until the list is exhausted. 436 * 437 * PUBLIC: int __repmgr_connect_site __P((ENV *, u_int eid)); 438 */ 439int 440__repmgr_connect_site(env, eid) 441 ENV *env; 442 u_int eid; 443{ 444 DB_REP *db_rep; 445 REPMGR_CONNECTION *con; 446 REPMGR_SITE *site; 447 socket_t s; 448 int state; 449 int ret; 450#ifdef DB_WIN32 451 long desired_event; 452 WSAEVENT event_obj; 453#endif 454 455 db_rep = env->rep_handle; 456 site = SITE_FROM_EID(eid); 457 458 switch (ret = __repmgr_connect(env, &s, site)) { 459 case 0: 460 state = CONN_CONNECTED; 461#ifdef DB_WIN32 462 desired_event = FD_READ|FD_CLOSE; 463#endif 464 break; 465 case INPROGRESS: 466 state = CONN_CONNECTING; 467#ifdef DB_WIN32 468 desired_event = FD_CONNECT; 469#endif 470 break; 471 default: 472 STAT(db_rep->region->mstat.st_connect_fail++); 473 return ( 474 __repmgr_schedule_connection_attempt(env, eid, FALSE)); 475 } 476 477#ifdef DB_WIN32 478 if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { 479 ret = net_errno; 480 __db_err(env, ret, "can't create WSA event"); 481 (void)closesocket(s); 482 return (ret); 483 } 484 if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) { 485 ret = net_errno; 486 __db_err(env, ret, "can't set desired event bits"); 487 (void)WSACloseEvent(event_obj); 488 (void)closesocket(s); 489 return (ret); 490 } 491#endif 492 493 if ((ret = __repmgr_new_connection(env, &con, s, state)) != 0) { 494#ifdef DB_WIN32 495 (void)WSACloseEvent(event_obj); 496#endif 497 (void)closesocket(s); 498 return (ret); 499 } 500#ifdef DB_WIN32 501 con->event_object = event_obj; 502#endif 503 504 con->eid = (int)eid; 505 site->ref.conn = con; 506 site->state = SITE_CONNECTED; 507 508 if (state == CONN_CONNECTED) { 509 __os_gettime(env, &site->last_rcvd_timestamp, 1); 510 switch (ret = __repmgr_propose_version(env, con)) { 511 case 0: 512 break; 513 case DB_REP_UNAVAIL: 514 return (__repmgr_bust_connection(env, con)); 515 default: 516 return (ret); 517 } 518 } 519 520 return (0); 521} 522 523static int 524__repmgr_connect(env, socket_result, site) 525 ENV *env; 526 socket_t *socket_result; 527 REPMGR_SITE *site; 528{ 529 repmgr_netaddr_t *addr; 530 ADDRINFO *ai; 531 socket_t s; 532 char *why; 533 int ret; 534 SITE_STRING_BUFFER buffer; 535 536 /* 537 * Lint doesn't know about DB_ASSERT, so it can't tell that this 538 * loop will always get executed at least once, giving 'why' a value. 539 */ 540 COMPQUIET(why, ""); 541 addr = &site->net_addr; 542 ai = ADDR_LIST_CURRENT(addr); 543 DB_ASSERT(env, ai != NULL); 544 for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) { 545 546 if ((s = socket(ai->ai_family, 547 ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) { 548 why = "can't create socket to connect"; 549 continue; 550 } 551 552 if ((ret = __repmgr_set_nonblocking(s)) != 0) { 553 __db_err(env, 554 ret, "can't make nonblock socket to connect"); 555 (void)closesocket(s); 556 return (ret); 557 } 558 559 if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) 560 ret = net_errno; 561 562 if (ret == 0 || ret == INPROGRESS) { 563 *socket_result = s; 564 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 565 "init connection to %s with result %d", 566 __repmgr_format_site_loc(site, buffer), ret)); 567 return (ret); 568 } 569 570 why = "connection failed"; 571 (void)closesocket(s); 572 } 573 574 /* We've exhausted all possible addresses. */ 575 ret = net_errno; 576 __db_err(env, ret, "%s to %s", why, 577 __repmgr_format_site_loc(site, buffer)); 578 return (ret); 579} 580 581/* 582 * Sends a proposal for version negotiation. 583 * 584 * PUBLIC: int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *)); 585 */ 586int 587__repmgr_propose_version(env, conn) 588 ENV *env; 589 REPMGR_CONNECTION *conn; 590{ 591 DB_REP *db_rep; 592 __repmgr_version_proposal_args versions; 593 repmgr_netaddr_t *my_addr; 594 size_t hostname_len, rec_length; 595 u_int8_t *buf, *p; 596 int ret; 597 598 db_rep = env->rep_handle; 599 my_addr = &db_rep->my_addr; 600 601 /* 602 * In repmgr wire protocol version 1, a handshake message had a rec part 603 * that looked like this: 604 * 605 * +-----------------+----+ 606 * | host name ... | \0 | 607 * +-----------------+----+ 608 * 609 * To ensure its own sanity, the old repmgr would write a NUL into the 610 * last byte of a received message, and then use normal C library string 611 * operations (e.g., * strlen, strcpy). 612 * 613 * Now, a version proposal has a rec part that looks like this: 614 * 615 * +-----------------+----+------------------+------+ 616 * | host name ... | \0 | extra info ... | \0 | 617 * +-----------------+----+------------------+------+ 618 * 619 * The "extra info" contains the version parameters, in marshaled form. 620 */ 621 622 hostname_len = strlen(my_addr->host); 623 rec_length = hostname_len + 1 + 624 __REPMGR_VERSION_PROPOSAL_SIZE + 1; 625 if ((ret = __os_malloc(env, rec_length, &buf)) != 0) 626 goto out; 627 p = buf; 628 (void)strcpy((char*)p, my_addr->host); 629 630 p += hostname_len + 1; 631 versions.min = DB_REPMGR_MIN_VERSION; 632 versions.max = DB_REPMGR_VERSION; 633 __repmgr_version_proposal_marshal(env, &versions, p); 634 635 ret = send_v1_handshake(env, conn, buf, rec_length); 636 __os_free(env, buf); 637out: 638 return (ret); 639} 640 641static int 642send_v1_handshake(env, conn, buf, len) 643 ENV *env; 644 REPMGR_CONNECTION *conn; 645 void *buf; 646 size_t len; 647{ 648 DB_REP *db_rep; 649 REP *rep; 650 repmgr_netaddr_t *my_addr; 651 DB_REPMGR_V1_HANDSHAKE buffer; 652 DBT cntrl, rec; 653 654 db_rep = env->rep_handle; 655 rep = db_rep->region; 656 my_addr = &db_rep->my_addr; 657 658 buffer.version = 1; 659 buffer.priority = htonl(rep->priority); 660 buffer.port = my_addr->port; 661 cntrl.data = &buffer; 662 cntrl.size = sizeof(buffer); 663 664 rec.data = buf; 665 rec.size = (u_int32_t)len; 666 667 /* 668 * It would of course be disastrous to block the select() thread, so 669 * pass the "blockable" argument as FALSE. Fortunately blocking should 670 * never be necessary here, because the hand-shake is always the first 671 * thing we send. Which is a good thing, because it would be almost as 672 * disastrous if we allowed ourselves to drop a handshake. 673 */ 674 return (__repmgr_send_one(env, 675 conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); 676} 677 678/* 679 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *)); 680 * 681 * !!! 682 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here. 683 */ 684int 685__repmgr_read_from_site(env, conn) 686 ENV *env; 687 REPMGR_CONNECTION *conn; 688{ 689 DB_REP *db_rep; 690 REPMGR_SITE *site; 691 SITE_STRING_BUFFER buffer; 692 size_t nr; 693 int ret; 694 695 db_rep = env->rep_handle; 696 /* 697 * Keep reading pieces as long as we're making some progress, or until 698 * we complete the current read phase. 699 */ 700 for (;;) { 701 if ((ret = __repmgr_readv(conn->fd, 702 &conn->iovecs.vectors[conn->iovecs.offset], 703 conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) { 704 switch (ret) { 705#ifndef DB_WIN32 706 case EINTR: 707 continue; 708#endif 709 case WOULDBLOCK: 710 return (0); 711 default: 712#ifdef EBADF 713 DB_ASSERT(env, ret != EBADF); 714#endif 715 (void)__repmgr_format_eid_loc(env->rep_handle, 716 conn->eid, buffer); 717 __db_err(env, ret, 718 "can't read from %s", buffer); 719 STAT(env->rep_handle-> 720 region->mstat.st_connection_drop++); 721 return (DB_REP_UNAVAIL); 722 } 723 } 724 725 if (nr > 0) { 726 if (IS_VALID_EID(conn->eid)) { 727 site = SITE_FROM_EID(conn->eid); 728 __os_gettime( 729 env, &site->last_rcvd_timestamp, 1); 730 } 731 if (__repmgr_update_consumed(&conn->iovecs, nr)) 732 return (dispatch_phase_completion(env, 733 conn)); 734 } else { 735 (void)__repmgr_format_eid_loc(env->rep_handle, 736 conn->eid, buffer); 737 __db_errx(env, "EOF on connection from %s", buffer); 738 STAT(env->rep_handle-> 739 region->mstat.st_connection_drop++); 740 return (DB_REP_UNAVAIL); 741 } 742 } 743} 744 745/* 746 * Handles whatever needs to be done upon the completion of a reading phase on a 747 * given connection. 748 */ 749static int 750dispatch_phase_completion(env, conn) 751 ENV *env; 752 REPMGR_CONNECTION *conn; 753{ 754#define MEM_ALIGN sizeof(double) 755 DBT *dbt; 756 u_int32_t control_size, rec_size; 757 size_t memsize, control_offset, rec_offset; 758 void *membase; 759 int ret; 760 761 switch (conn->reading_phase) { 762 case SIZES_PHASE: 763 /* 764 * We've received the header: a message type and the lengths of 765 * the two pieces of the message. Set up buffers to read the 766 * two pieces. This set-up is a bit different for a 767 * REPMGR_REP_MESSAGE, because we plan to pass it off to the msg 768 * threads. 769 */ 770 __repmgr_iovec_init(&conn->iovecs); 771 control_size = ntohl(conn->control_size_buf); 772 rec_size = ntohl(conn->rec_size_buf); 773 774 if (conn->msg_type == REPMGR_REP_MESSAGE) { 775 if (control_size == 0) { 776 __db_errx( 777 env, "illegal size for rep msg"); 778 return (DB_REP_UNAVAIL); 779 } 780 /* 781 * Allocate a block of memory large enough to hold a 782 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT 783 * data areas that it points to. Start by calculating 784 * the total memory needed, rounding up for the start of 785 * each DBT, to ensure possible alignment requirements. 786 */ 787 memsize = (size_t) 788 DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN); 789 control_offset = memsize; 790 memsize += control_size; 791 if (rec_size > 0) { 792 memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN); 793 rec_offset = memsize; 794 memsize += rec_size; 795 } else 796 COMPQUIET(rec_offset, 0); 797 if ((ret = __os_malloc(env, memsize, &membase)) != 0) 798 return (ret); 799 conn->input.rep_message = membase; 800 801 conn->input.rep_message->originating_eid = conn->eid; 802 DB_INIT_DBT(conn->input.rep_message->control, 803 (u_int8_t*)membase + control_offset, control_size); 804 __repmgr_add_dbt(&conn->iovecs, 805 &conn->input.rep_message->control); 806 807 if (rec_size > 0) { 808 DB_INIT_DBT(conn->input.rep_message->rec, 809 (rec_size > 0 ? 810 (u_int8_t*)membase + rec_offset : NULL), 811 rec_size); 812 __repmgr_add_dbt(&conn->iovecs, 813 &conn->input.rep_message->rec); 814 } else 815 DB_INIT_DBT(conn->input.rep_message->rec, 816 NULL, 0); 817 } else { 818 conn->input.repmgr_msg.cntrl.size = control_size; 819 conn->input.repmgr_msg.rec.size = rec_size; 820 821 if (control_size > 0) { 822 dbt = &conn->input.repmgr_msg.cntrl; 823 if ((ret = __os_malloc(env, control_size, 824 &dbt->data)) != 0) 825 return (ret); 826 __repmgr_add_dbt(&conn->iovecs, dbt); 827 } 828 829 if (rec_size > 0) { 830 dbt = &conn->input.repmgr_msg.rec; 831 if ((ret = __os_malloc(env, rec_size, 832 &dbt->data)) != 0) { 833 if (control_size > 0) 834 __os_free(env, 835 conn->input.repmgr_msg. 836 cntrl.data); 837 return (ret); 838 } 839 __repmgr_add_dbt(&conn->iovecs, dbt); 840 } 841 } 842 843 conn->reading_phase = DATA_PHASE; 844 845 if (control_size > 0 || rec_size > 0) 846 break; 847 848 /* 849 * However, if they're both 0, we're ready to complete 850 * DATA_PHASE. 851 */ 852 /* FALLTHROUGH */ 853 854 case DATA_PHASE: 855 return (dispatch_msgin(env, conn)); 856 857 default: 858 DB_ASSERT(env, FALSE); 859 } 860 861 return (0); 862} 863 864/* 865 * Processes an incoming message, depending on our current state. 866 */ 867static int 868dispatch_msgin(env, conn) 869 ENV *env; 870 REPMGR_CONNECTION *conn; 871{ 872 DBT *dbt; 873 char *hostname; 874 int given, ret; 875 876 given = FALSE; 877 878 switch (conn->state) { 879 case CONN_CONNECTED: 880 /* 881 * In this state, we know we're working with an outgoing 882 * connection. We've sent a version proposal, and now expect 883 * the response (which could be a dumb old V1 handshake). 884 */ 885 ONLY_HANDSHAKE(env, conn); 886 if ((ret = read_version_response(env, conn)) != 0) 887 return (ret); 888 break; 889 890 case CONN_NEGOTIATE: 891 /* 892 * Since we're in this state, we know we're working with an 893 * incoming connection, and this is the first message we've 894 * received. So it must be a version negotiation proposal (or a 895 * legacy V1 handshake). (We'll verify this of course.) 896 */ 897 ONLY_HANDSHAKE(env, conn); 898 if ((ret = send_version_response(env, conn)) != 0) 899 return (ret); 900 break; 901 902 case CONN_PARAMETERS: 903 /* 904 * We've previously agreed on a (>1) version, and are now simply 905 * awaiting the other side's parameters handshake. 906 */ 907 ONLY_HANDSHAKE(env, conn); 908 dbt = &conn->input.repmgr_msg.rec; 909 hostname = dbt->data; 910 hostname[dbt->size-1] = '\0'; 911 if ((ret = accept_handshake(env, conn, hostname)) != 0) 912 return (ret); 913 conn->state = CONN_READY; 914 break; 915 916 case CONN_READY: /* FALLTHROUGH */ 917 case CONN_CONGESTED: 918 /* 919 * We have a complete message, so process it. Acks and 920 * handshakes get processed here, in line. Regular rep messages 921 * get posted to a queue, to be handled by a thread from the 922 * message thread pool. 923 */ 924 switch (conn->msg_type) { 925 case REPMGR_ACK: 926 if ((ret = record_ack(env, conn)) != 0) 927 return (ret); 928 break; 929 930 case REPMGR_HEARTBEAT: 931 /* 932 * The underlying byte-receiving mechanism will already 933 * have noted the fact that we got some traffic on this 934 * connection. And that's all we really have to do, so 935 * there's nothing more needed at this point. 936 */ 937 break; 938 939 case REPMGR_REP_MESSAGE: 940 if ((ret = __repmgr_queue_put(env, 941 conn->input.rep_message)) != 0) 942 return (ret); 943 /* 944 * The queue has taken over responsibility for the 945 * rep_message buffer, and will free it later. 946 */ 947 given = TRUE; 948 break; 949 950 default: 951 __db_errx(env, 952 "unexpected msg type rcvd in ready state: %d", 953 (int)conn->msg_type); 954 return (DB_REP_UNAVAIL); 955 } 956 break; 957 958 case CONN_DEFUNCT: 959 break; 960 961 default: 962 DB_ASSERT(env, FALSE); 963 } 964 965 if (!given) { 966 dbt = &conn->input.repmgr_msg.cntrl; 967 if (dbt->size > 0) 968 __os_free(env, dbt->data); 969 dbt = &conn->input.repmgr_msg.rec; 970 if (dbt->size > 0) 971 __os_free(env, dbt->data); 972 } 973 __repmgr_reset_for_reading(conn); 974 return (0); 975} 976 977/* 978 * Examine and verify the incoming version proposal message, and send an 979 * appropriate response. 980 */ 981static int 982send_version_response(env, conn) 983 ENV *env; 984 REPMGR_CONNECTION *conn; 985{ 986 DB_REP *db_rep; 987 __repmgr_version_proposal_args versions; 988 __repmgr_version_confirmation_args conf; 989 repmgr_netaddr_t *my_addr; 990 char *hostname; 991 u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1]; 992 DBT vi; 993 int ret; 994 995 db_rep = env->rep_handle; 996 my_addr = &db_rep->my_addr; 997 998 if ((ret = find_version_info(env, conn, &vi)) != 0) 999 return (ret); 1000 if (vi.size == 0) { 1001 /* No version info, so we must be talking to a v1 site. */ 1002 hostname = conn->input.repmgr_msg.rec.data; 1003 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0) 1004 return (ret); 1005 if ((ret = send_v1_handshake(env, conn, my_addr->host, 1006 strlen(my_addr->host) + 1)) != 0) 1007 return (ret); 1008 conn->state = CONN_READY; 1009 } else { 1010 if ((ret = __repmgr_version_proposal_unmarshal(env, 1011 &versions, vi.data, vi.size, NULL)) != 0) 1012 return (DB_REP_UNAVAIL); 1013 1014 if (DB_REPMGR_VERSION >= versions.min && 1015 DB_REPMGR_VERSION <= versions.max) 1016 conf.version = DB_REPMGR_VERSION; 1017 else if (versions.max >= DB_REPMGR_MIN_VERSION && 1018 versions.max <= DB_REPMGR_VERSION) 1019 conf.version = versions.max; 1020 else { 1021 /* 1022 * User must have wired up a combination of versions 1023 * exceeding what we said we'd support. 1024 */ 1025 __db_errx(env, 1026 "No available version between %lu and %lu", 1027 (u_long)versions.min, (u_long)versions.max); 1028 return (DB_REP_UNAVAIL); 1029 } 1030 conn->version = conf.version; 1031 1032 __repmgr_version_confirmation_marshal(env, &conf, buf); 1033 if ((ret = send_handshake(env, conn, buf, sizeof(buf))) != 0) 1034 return (ret); 1035 1036 conn->state = CONN_PARAMETERS; 1037 } 1038 return (ret); 1039} 1040 1041/* 1042 * Sends a version-aware handshake to the remote site, only after we've verified 1043 * that it is indeed version-aware. We can send either v2 or v3 handshake, 1044 * depending on the connection's version. 1045 */ 1046static int 1047send_handshake(env, conn, opt, optlen) 1048 ENV *env; 1049 REPMGR_CONNECTION *conn; 1050 void *opt; 1051 size_t optlen; 1052{ 1053 DB_REP *db_rep; 1054 REP *rep; 1055 DBT cntrl, rec; 1056 __repmgr_handshake_args hs; 1057 __repmgr_v2handshake_args v2hs; 1058 repmgr_netaddr_t *my_addr; 1059 size_t hostname_len, rec_len; 1060 void *buf; 1061 u_int8_t *p; 1062 u_int32_t cntrl_len; 1063 int ret; 1064 1065 db_rep = env->rep_handle; 1066 rep = db_rep->region; 1067 my_addr = &db_rep->my_addr; 1068 1069 /* 1070 * The cntrl part has port and priority. The rec part has the host 1071 * name, followed by whatever optional extra data was passed to us. 1072 * 1073 * Version awareness was introduced with protocol version 2. 1074 */ 1075 DB_ASSERT(env, conn->version >= 2); 1076 cntrl_len = conn->version == 2 ? 1077 __REPMGR_V2HANDSHAKE_SIZE : __REPMGR_HANDSHAKE_SIZE; 1078 hostname_len = strlen(my_addr->host); 1079 rec_len = hostname_len + 1 + 1080 (opt == NULL ? 0 : optlen); 1081 1082 if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0) 1083 return (ret); 1084 1085 cntrl.data = p = buf; 1086 if (conn->version == 2) { 1087 /* Not allowed to use multi-process feature in v2 group. */ 1088 DB_ASSERT(env, !IS_SUBORDINATE(db_rep)); 1089 v2hs.port = my_addr->port; 1090 v2hs.priority = rep->priority; 1091 __repmgr_v2handshake_marshal(env, &v2hs, p); 1092 } else { 1093 hs.port = my_addr->port; 1094 hs.priority = rep->priority; 1095 hs.flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0; 1096 __repmgr_handshake_marshal(env, &hs, p); 1097 } 1098 cntrl.size = cntrl_len; 1099 1100 p = rec.data = &p[cntrl_len]; 1101 (void)strcpy((char*)p, my_addr->host); 1102 p += hostname_len + 1; 1103 if (opt != NULL) { 1104 memcpy(p, opt, optlen); 1105 p += optlen; 1106 } 1107 rec.size = (u_int32_t)(p - (u_int8_t*)rec.data); 1108 1109 /* Never block on select thread: pass blockable as FALSE. */ 1110 ret = __repmgr_send_one(env, 1111 conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE); 1112 __os_free(env, buf); 1113 return (ret); 1114} 1115 1116static int 1117read_version_response(env, conn) 1118 ENV *env; 1119 REPMGR_CONNECTION *conn; 1120{ 1121 __repmgr_version_confirmation_args conf; 1122 DBT vi; 1123 char *hostname; 1124 int ret; 1125 1126 if ((ret = find_version_info(env, conn, &vi)) != 0) 1127 return (ret); 1128 hostname = conn->input.repmgr_msg.rec.data; 1129 if (vi.size == 0) { 1130 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0) 1131 return (ret); 1132 } else { 1133 if ((ret = __repmgr_version_confirmation_unmarshal(env, 1134 &conf, vi.data, vi.size, NULL)) != 0) 1135 return (DB_REP_UNAVAIL); 1136 if (conf.version >= DB_REPMGR_MIN_VERSION && 1137 conf.version <= DB_REPMGR_VERSION) 1138 conn->version = conf.version; 1139 else { 1140 /* 1141 * Remote site "confirmed" a version outside of the 1142 * range we proposed. It should never do that. 1143 */ 1144 __db_errx(env, 1145 "Can't support confirmed version %lu", 1146 (u_long)conf.version); 1147 return (DB_REP_UNAVAIL); 1148 } 1149 1150 if ((ret = accept_handshake(env, conn, hostname)) != 0) 1151 return (ret); 1152 if ((ret = send_handshake(env, conn, NULL, 0)) != 0) 1153 return (ret); 1154 } 1155 conn->state = CONN_READY; 1156 return (ret); 1157} 1158 1159/* 1160 * Examine the rec part of a handshake message to see if it has any version 1161 * information in it. This is the magic that lets us allows version-aware sites 1162 * to exchange information, and yet avoids tripping up v1 sites, which don't 1163 * know how to look for it. 1164 */ 1165static int 1166find_version_info(env, conn, vi) 1167 ENV *env; 1168 REPMGR_CONNECTION *conn; 1169 DBT *vi; 1170{ 1171 DBT *dbt; 1172 char *hostname; 1173 u_int32_t hostname_len; 1174 1175 dbt = &conn->input.repmgr_msg.rec; 1176 if (dbt->size == 0) { 1177 __db_errx(env, "handshake is missing rec part"); 1178 return (DB_REP_UNAVAIL); 1179 } 1180 hostname = dbt->data; 1181 hostname[dbt->size-1] = '\0'; 1182 hostname_len = (u_int32_t)strlen(hostname); 1183 if (hostname_len + 1 == dbt->size) { 1184 /* 1185 * The rec DBT held only the host name. This is a simple legacy 1186 * V1 handshake; it contains no version information. 1187 */ 1188 vi->size = 0; 1189 } else { 1190 /* 1191 * There's more data than just the host name. The remainder is 1192 * available to be treated as a normal byte buffer (and read in 1193 * by one of the unmarshal functions). Note that the remaining 1194 * length should not include the padding byte that we have 1195 * already clobbered. 1196 */ 1197 vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1]; 1198 vi->size = (dbt->size - (hostname_len+1)) - 1; 1199 } 1200 return (0); 1201} 1202 1203static int 1204accept_handshake(env, conn, hostname) 1205 ENV *env; 1206 REPMGR_CONNECTION *conn; 1207 char *hostname; 1208{ 1209 __repmgr_handshake_args hs; 1210 __repmgr_v2handshake_args hs2; 1211 u_int port; 1212 u_int32_t pri, flags; 1213 1214 /* 1215 * Current version is 3, and only other version that supports version 1216 * negotiation is 2. 1217 */ 1218 DB_ASSERT(env, conn->version == 2 || conn->version == 3); 1219 1220 /* Extract port and priority from cntrl. */ 1221 if (conn->version == 2) { 1222 if (__repmgr_v2handshake_unmarshal(env, &hs2, 1223 conn->input.repmgr_msg.cntrl.data, 1224 conn->input.repmgr_msg.cntrl.size, NULL) != 0) 1225 return (DB_REP_UNAVAIL); 1226 port = hs2.port; 1227 pri = hs2.priority; 1228 flags = 0; 1229 } else { 1230 if (__repmgr_handshake_unmarshal(env, &hs, 1231 conn->input.repmgr_msg.cntrl.data, 1232 conn->input.repmgr_msg.cntrl.size, NULL) != 0) 1233 return (DB_REP_UNAVAIL); 1234 port = hs.port; 1235 pri = hs.priority; 1236 flags = hs.flags; 1237 } 1238 1239 return (process_parameters(env, 1240 conn, hostname, port, pri, flags)); 1241} 1242 1243static int 1244accept_v1_handshake(env, conn, hostname) 1245 ENV *env; 1246 REPMGR_CONNECTION *conn; 1247 char *hostname; 1248{ 1249 DB_REPMGR_V1_HANDSHAKE *handshake; 1250 u_int32_t prio; 1251 1252 handshake = conn->input.repmgr_msg.cntrl.data; 1253 if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) || 1254 handshake->version != 1) { 1255 __db_errx(env, "malformed V1 handshake"); 1256 return (DB_REP_UNAVAIL); 1257 } 1258 1259 conn->version = 1; 1260 prio = ntohl(handshake->priority); 1261 return (process_parameters(env, 1262 conn, hostname, handshake->port, prio, 0)); 1263} 1264 1265static int 1266process_parameters(env, conn, host, port, priority, flags) 1267 ENV *env; 1268 REPMGR_CONNECTION *conn; 1269 char *host; 1270 u_int port; 1271 u_int32_t priority, flags; 1272{ 1273 DB_REP *db_rep; 1274 REPMGR_RETRY *retry; 1275 REPMGR_SITE *site; 1276 int eid, ret, sockopt; 1277 1278 db_rep = env->rep_handle; 1279 1280 if (F_ISSET(conn, CONN_INCOMING)) { 1281 /* 1282 * Incoming connection: we don't yet know what site it belongs 1283 * to, so it must be on the "orphans" list. 1284 */ 1285 DB_ASSERT(env, !IS_VALID_EID(conn->eid)); 1286 TAILQ_REMOVE(&db_rep->connections, conn, entries); 1287 1288 /* 1289 * Now that we've been given the host and port, use them to find 1290 * the site (or create a new one if necessary, etc.). 1291 */ 1292 if ((site = __repmgr_find_site(env, host, port)) != NULL) { 1293 eid = EID_FROM_SITE(site); 1294 if (LF_ISSET(REPMGR_SUBORDINATE)) { 1295 /* 1296 * Accept it, as a supplementary source of 1297 * input, but nothing else. 1298 */ 1299 TAILQ_INSERT_TAIL(&site->sub_conns, 1300 conn, entries); 1301 conn->eid = eid; 1302 1303#ifdef SO_KEEPALIVE 1304 sockopt = 1; 1305 if (setsockopt(conn->fd, SOL_SOCKET, 1306 SO_KEEPALIVE, (sockopt_t)&sockopt, 1307 sizeof(sockopt)) != 0) { 1308 ret = net_errno; 1309 __db_err(env, ret, 1310 "can't set KEEPALIVE socket option"); 1311 return (ret); 1312 } 1313#endif 1314 } else { 1315 if (site->state == SITE_IDLE) { 1316 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1317 "handshake from idle site %s:%u EID %u", 1318 host, port, eid)); 1319 retry = site->ref.retry; 1320 TAILQ_REMOVE(&db_rep->retries, 1321 retry, entries); 1322 __os_free(env, retry); 1323 } else { 1324 /* 1325 * We got an incoming connection for a 1326 * site we were already connected to; at 1327 * least we thought we were. 1328 */ 1329 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1330 "connection from %s:%u EID %u supersedes existing", 1331 host, port, eid)); 1332 1333 /* 1334 * No need to schedule a retry for 1335 * later, since we now have a 1336 * replacement connection. 1337 */ 1338 __repmgr_disable_connection(env, 1339 site->ref.conn); 1340 } 1341 conn->eid = eid; 1342 site->state = SITE_CONNECTED; 1343 site->ref.conn = conn; 1344 __os_gettime(env, 1345 &site->last_rcvd_timestamp, 1); 1346 } 1347 } else { 1348 if ((ret = introduce_site(env, 1349 host, port, &site, flags)) == 0) 1350 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1351 "handshake introduces unknown site %s:%u", host, port)); 1352 else if (ret != EEXIST) 1353 return (ret); 1354 eid = EID_FROM_SITE(site); 1355 1356 if (LF_ISSET(REPMGR_SUBORDINATE)) { 1357 TAILQ_INSERT_TAIL(&site->sub_conns, 1358 conn, entries); 1359#ifdef SO_KEEPALIVE 1360 sockopt = 1; 1361 if ((ret = setsockopt(conn->fd, SOL_SOCKET, 1362 SO_KEEPALIVE, (sockopt_t)&sockopt, 1363 sizeof(sockopt))) != 0) { 1364 __db_err(env, ret, 1365 "can't set KEEPALIVE socket option"); 1366 return (ret); 1367 } 1368#endif 1369 } else { 1370 site->state = SITE_CONNECTED; 1371 site->ref.conn = conn; 1372 __os_gettime(env, 1373 &site->last_rcvd_timestamp, 1); 1374 } 1375 conn->eid = eid; 1376 } 1377 } else { 1378 /* 1379 * Since we initiated this as an outgoing connection, we 1380 * obviously already know the host, port and site. We just need 1381 * the other site's priority. 1382 */ 1383 DB_ASSERT(env, IS_VALID_EID(conn->eid)); 1384 site = SITE_FROM_EID(conn->eid); 1385 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1386 "handshake from connection to %s:%lu EID %u", 1387 site->net_addr.host, 1388 (u_long)site->net_addr.port, conn->eid)); 1389 } 1390 1391 site->priority = priority; 1392 F_SET(site, SITE_HAS_PRIO); 1393 1394 /* 1395 * If we're moping around wishing we knew who the master was, then 1396 * getting in touch with another site might finally provide sufficient 1397 * connectivity to find out. But just do this once, because otherwise 1398 * we get messages while the subsequent rep_start operations are going 1399 * on, and rep tosses them in that case. 1400 */ 1401 if (!IS_SUBORDINATE(db_rep) && /* us */ 1402 db_rep->master_eid == DB_EID_INVALID && 1403 db_rep->init_policy != DB_REP_MASTER && 1404 !db_rep->done_one && 1405 !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */ 1406 db_rep->done_one = TRUE; 1407 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1408 "handshake with no known master to wake election thread")); 1409 if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0) 1410 return (ret); 1411 } 1412 1413 return (0); 1414} 1415 1416static int 1417introduce_site(env, host, port, sitep, flags) 1418 ENV *env; 1419 char *host; 1420 u_int port; 1421 REPMGR_SITE **sitep; 1422 u_int32_t flags; 1423{ 1424 int peer, state; 1425 1426 /* 1427 * SITE_CONNECTED means we have the main connection to the site. But 1428 * we're here when we first learn of a site by getting a subordinate 1429 * connection, so this doesn't suffice to put us in "connected" state. 1430 */ 1431 state = LF_ISSET(REPMGR_SUBORDINATE) ? SITE_IDLE : SITE_CONNECTED; 1432 peer = FALSE; 1433 1434 return (__repmgr_add_site_int(env, host, port, sitep, peer, state)); 1435} 1436 1437static int 1438record_ack(env, conn) 1439 ENV *env; 1440 REPMGR_CONNECTION *conn; 1441{ 1442 DB_REP *db_rep; 1443 REPMGR_SITE *site; 1444 __repmgr_ack_args *ackp, ack; 1445 SITE_STRING_BUFFER location; 1446 u_int32_t gen; 1447 int ret; 1448 1449 db_rep = env->rep_handle; 1450 1451 DB_ASSERT(env, conn->version > 0 && 1452 IS_READY_STATE(conn->state) && IS_VALID_EID(conn->eid)); 1453 site = SITE_FROM_EID(conn->eid); 1454 1455 /* 1456 * Extract the LSN. Save it only if it is an improvement over what the 1457 * site has already ack'ed. 1458 */ 1459 if (conn->version == 1) { 1460 ackp = conn->input.repmgr_msg.cntrl.data; 1461 if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) || 1462 conn->input.repmgr_msg.rec.size != 0) { 1463 __db_errx(env, "bad ack msg size"); 1464 return (DB_REP_UNAVAIL); 1465 } 1466 } else { 1467 ackp = &ack; 1468 if ((ret = __repmgr_ack_unmarshal(env, ackp, 1469 conn->input.repmgr_msg.cntrl.data, 1470 conn->input.repmgr_msg.cntrl.size, NULL)) != 0) 1471 return (DB_REP_UNAVAIL); 1472 } 1473 1474 /* Ignore stale acks. */ 1475 gen = db_rep->region->gen; 1476 if (ackp->generation < gen) { 1477 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1478 "ignoring stale ack (%lu<%lu), from %s", 1479 (u_long)ackp->generation, (u_long)gen, 1480 __repmgr_format_site_loc(site, location))); 1481 return (0); 1482 } 1483 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1484 "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file, 1485 (u_long)ackp->lsn.offset, (u_long)ackp->generation, 1486 __repmgr_format_site_loc(site, location))); 1487 1488 if (ackp->generation == gen && 1489 LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) { 1490 memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN)); 1491 if ((ret = __repmgr_wake_waiting_senders(env)) != 0) 1492 return (ret); 1493 } 1494 return (0); 1495} 1496 1497/* 1498 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *)); 1499 */ 1500int 1501__repmgr_write_some(env, conn) 1502 ENV *env; 1503 REPMGR_CONNECTION *conn; 1504{ 1505 QUEUED_OUTPUT *output; 1506 REPMGR_FLAT *msg; 1507 int bytes, ret; 1508 1509 while (!STAILQ_EMPTY(&conn->outbound_queue)) { 1510 output = STAILQ_FIRST(&conn->outbound_queue); 1511 msg = output->msg; 1512 if ((bytes = send(conn->fd, &msg->data[output->offset], 1513 (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) { 1514 if ((ret = net_errno) == WOULDBLOCK) 1515 return (0); 1516 else { 1517 __db_err(env, ret, "writing data"); 1518 STAT(env->rep_handle-> 1519 region->mstat.st_connection_drop++); 1520 return (DB_REP_UNAVAIL); 1521 } 1522 } 1523 1524 if ((output->offset += (size_t)bytes) >= msg->length) { 1525 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); 1526 __os_free(env, output); 1527 conn->out_queue_length--; 1528 if (--msg->ref_count <= 0) 1529 __os_free(env, msg); 1530 1531 /* 1532 * We've achieved enough movement to free up at least 1533 * one space in the outgoing queue. Wake any message 1534 * threads that may be waiting for space. Leave 1535 * CONGESTED state so that when the queue reaches the 1536 * high-water mark again, the filling thread will be 1537 * allowed to try waiting again. 1538 */ 1539 conn->state = CONN_READY; 1540 if (conn->blockers > 0 && 1541 (ret = __repmgr_signal(&conn->drained)) != 0) 1542 return (ret); 1543 } 1544 } 1545 1546#ifdef DB_WIN32 1547 /* 1548 * With the queue now empty, it's time to relinquish ownership of this 1549 * connection again, so that the next call to send() can write the 1550 * message in line, instead of posting it to the queue for us. 1551 */ 1552 if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE) 1553 == SOCKET_ERROR) { 1554 ret = net_errno; 1555 __db_err(env, ret, "can't remove FD_WRITE event bit"); 1556 return (ret); 1557 } 1558#endif 1559 1560 return (0); 1561} 1562