1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2006,2008 Oracle. All rights reserved. 5 * 6 * $Id: repmgr_sel.c,v 1.51 2008/04/30 02:33:34 alexg Exp $ 7 */ 8 9#include "db_config.h" 10 11#define __INCLUDE_NETWORKING 1 12#include "db_int.h" 13 14typedef int (*HEARTBEAT_ACTION) __P((ENV *)); 15 16static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); 17static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *)); 18static int __repmgr_call_election __P((ENV *)); 19static int __repmgr_connect __P((ENV*, socket_t *, REPMGR_SITE *)); 20static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *)); 21static int find_version_info __P((ENV *, REPMGR_CONNECTION *, DBT *)); 22static int __repmgr_next_timeout __P((ENV *, 23 db_timespec *, HEARTBEAT_ACTION *)); 24static int dispatch_phase_completion __P((ENV *, REPMGR_CONNECTION *)); 25static REPMGR_CONNECTION *__repmgr_master_connection __P((ENV *)); 26static int process_parameters __P((ENV *, 27 REPMGR_CONNECTION *, char *, u_int, u_int32_t)); 28static int read_version_response __P((ENV *, REPMGR_CONNECTION *)); 29static int record_ack __P((ENV *, REPMGR_CONNECTION *)); 30static int __repmgr_retry_connections __P((ENV *)); 31static int send_handshake __P((ENV *, REPMGR_CONNECTION *, void *, size_t)); 32static int __repmgr_send_heartbeat __P((ENV *)); 33static int send_v1_handshake __P((ENV *, 34 REPMGR_CONNECTION *, void *, size_t)); 35static int send_version_response __P((ENV *, REPMGR_CONNECTION *)); 36static int __repmgr_try_one __P((ENV *, u_int)); 37 38#define ONLY_HANDSHAKE(env, conn) do { \ 39 if (conn->msg_type != REPMGR_HANDSHAKE) { \ 40 __db_errx(env, "unexpected msg type %d in state %d", \ 41 (int)conn->msg_type, conn->state); \ 42 return (DB_REP_UNAVAIL); \ 43 } \ 44} while (0) 45 46/* 47 * PUBLIC: void *__repmgr_select_thread __P((void *)); 48 */ 49void * 50__repmgr_select_thread(args) 51 void *args; 52{ 53 ENV *env = args; 54 int ret; 55 56 if ((ret = __repmgr_select_loop(env)) != 0) { 57 __db_err(env, ret, "select loop failed"); 58 __repmgr_thread_failure(env, ret); 59 } 60 return (NULL); 61} 62 63/* 64 * PUBLIC: int __repmgr_accept __P((ENV *)); 65 */ 66int 67__repmgr_accept(env) 68 ENV *env; 69{ 70 DB_REP *db_rep; 71 REPMGR_CONNECTION *conn; 72 struct sockaddr_in siaddr; 73 socklen_t addrlen; 74 socket_t s; 75 int ret; 76#ifdef DB_WIN32 77 WSAEVENT event_obj; 78#endif 79 80 db_rep = env->rep_handle; 81 addrlen = sizeof(siaddr); 82 if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr, 83 &addrlen)) == -1) { 84 /* 85 * Some errors are innocuous and so should be ignored. MSDN 86 * Library documents the Windows ones; the Unix ones are 87 * advocated in Stevens' UNPv1, section 16.6; and Linux 88 * Application Development, p. 416. 89 */ 90 switch (ret = net_errno) { 91#ifdef DB_WIN32 92 case WSAECONNRESET: 93 case WSAEWOULDBLOCK: 94#else 95 case EINTR: 96 case EWOULDBLOCK: 97 case ECONNABORTED: 98 case ENETDOWN: 99#ifdef EPROTO 100 case EPROTO: 101#endif 102 case ENOPROTOOPT: 103 case EHOSTDOWN: 104#ifdef ENONET 105 case ENONET: 106#endif 107 case EHOSTUNREACH: 108 case EOPNOTSUPP: 109 case ENETUNREACH: 110#endif 111 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 112 "accept error %d considered innocuous", ret)); 113 return (0); 114 default: 115 __db_err(env, ret, "accept error"); 116 return (ret); 117 } 118 } 119 RPRINT(env, DB_VERB_REPMGR_MISC, (env, "accepted a new connection")); 120 121 if ((ret = __repmgr_set_nonblocking(s)) != 0) { 122 __db_err(env, ret, "can't set nonblock after accept"); 123 (void)closesocket(s); 124 return (ret); 125 } 126 127#ifdef DB_WIN32 128 if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { 129 ret = net_errno; 130 __db_err(env, ret, "can't create WSA event"); 131 (void)closesocket(s); 132 return (ret); 133 } 134 if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) { 135 ret = net_errno; 136 __db_err(env, ret, "can't set desired event bits"); 137 (void)WSACloseEvent(event_obj); 138 (void)closesocket(s); 139 return (ret); 140 } 141#endif 142 if ((ret = 143 __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) { 144#ifdef DB_WIN32 145 (void)WSACloseEvent(event_obj); 146#endif 147 (void)closesocket(s); 148 return (ret); 149 } 150 F_SET(conn, CONN_INCOMING); 151 conn->eid = -1; 152#ifdef DB_WIN32 153 conn->event_object = event_obj; 154#endif 155 return (0); 156} 157 158/* 159 * Computes how long we should wait for input, in other words how long until we 160 * have to wake up and do something. Returns TRUE if timeout is set; FALSE if 161 * there is nothing to wait for. 162 * 163 * Note that the resulting timeout could be zero; but it can't be negative. 164 * 165 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *)); 166 */ 167int 168__repmgr_compute_timeout(env, timeout) 169 ENV *env; 170 db_timespec *timeout; 171{ 172 DB_REP *db_rep; 173 REPMGR_RETRY *retry; 174 db_timespec now, t; 175 int have_timeout; 176 177 db_rep = env->rep_handle; 178 179 /* 180 * There are two factors to consider: are heartbeats in use? and, do we 181 * have any sites with broken connections that we ought to retry? 182 */ 183 have_timeout = __repmgr_next_timeout(env, &t, NULL); 184 185 /* List items are in order, so we only have to examine the first one. */ 186 if (!TAILQ_EMPTY(&db_rep->retries)) { 187 retry = TAILQ_FIRST(&db_rep->retries); 188 if (have_timeout) { 189 /* Choose earliest timeout deadline. */ 190 t = timespeccmp(&retry->time, &t, <) ? retry->time : t; 191 } else { 192 t = retry->time; 193 have_timeout = TRUE; 194 } 195 } 196 197 if (have_timeout) { 198 __os_gettime(env, &now, 1); 199 if (timespeccmp(&now, &t, >=)) 200 timespecclear(timeout); 201 else { 202 *timeout = t; 203 timespecsub(timeout, &now); 204 } 205 } 206 207 return (have_timeout); 208} 209 210/* 211 * Figures out the next heartbeat-related thing to be done, and when it should 212 * be done. The code is factored this way because this computation needs to be 213 * done both before each select() call, and after (when we're checking for timer 214 * expiration). 215 */ 216static int 217__repmgr_next_timeout(env, deadline, action) 218 ENV *env; 219 db_timespec *deadline; 220 HEARTBEAT_ACTION *action; 221{ 222 DB_REP *db_rep; 223 HEARTBEAT_ACTION my_action; 224 REPMGR_CONNECTION *conn; 225 REPMGR_SITE *site; 226 db_timespec t; 227 228 db_rep = env->rep_handle; 229 230 if (db_rep->master_eid == SELF_EID && db_rep->heartbeat_frequency > 0) { 231 t = db_rep->last_bcast; 232 TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_frequency); 233 my_action = __repmgr_send_heartbeat; 234 } else if ((conn = __repmgr_master_connection(env)) != NULL && 235 db_rep->heartbeat_monitor_timeout > 0 && 236 conn->version >= HEARTBEAT_MIN_VERSION) { 237 /* 238 * If we have a working connection to a heartbeat-aware master, 239 * let's monitor it. Otherwise there's really nothing we can 240 * do. 241 */ 242 site = SITE_FROM_EID(db_rep->master_eid); 243 t = site->last_rcvd_timestamp; 244 TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_monitor_timeout); 245 my_action = __repmgr_call_election; 246 } else 247 return (FALSE); 248 249 *deadline = t; 250 if (action != NULL) 251 *action = my_action; 252 return (TRUE); 253} 254 255static int 256__repmgr_send_heartbeat(env) 257 ENV *env; 258{ 259 DBT control, rec; 260 u_int unused1, unused2; 261 262 DB_INIT_DBT(control, NULL, 0); 263 DB_INIT_DBT(rec, NULL, 0); 264 return (__repmgr_send_broadcast(env, 265 REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2)); 266} 267 268static REPMGR_CONNECTION * 269__repmgr_master_connection(env) 270 ENV *env; 271{ 272 DB_REP *db_rep; 273 REPMGR_CONNECTION *conn; 274 REPMGR_SITE *master; 275 276 db_rep = env->rep_handle; 277 278 if (db_rep->master_eid == SELF_EID || 279 !IS_VALID_EID(db_rep->master_eid)) 280 return (NULL); 281 master = SITE_FROM_EID(db_rep->master_eid); 282 if (master->state != SITE_CONNECTED) 283 return (NULL); 284 conn = master->ref.conn; 285 if (IS_READY_STATE(conn->state)) 286 return (conn); 287 return (NULL); 288} 289 290static int 291__repmgr_call_election(env) 292 ENV *env; 293{ 294 REPMGR_CONNECTION *conn; 295 296 conn = __repmgr_master_connection(env); 297 DB_ASSERT(env, conn != NULL); 298 RPRINT(env, DB_VERB_REPMGR_MISC, 299 (env, "heartbeat monitor timeout expired")); 300 return (__repmgr_bust_connection(env, conn)); 301} 302 303/* 304 * PUBLIC: int __repmgr_check_timeouts __P((ENV *)); 305 * 306 * !!! 307 * Assumes caller holds the mutex. 308 */ 309int 310__repmgr_check_timeouts(env) 311 ENV *env; 312{ 313 db_timespec when, now; 314 HEARTBEAT_ACTION action; 315 int ret; 316 317 /* 318 * Figure out the next heartbeat-related thing to be done. Then, if 319 * it's time to do it, do so. 320 */ 321 if (__repmgr_next_timeout(env, &when, &action)) { 322 __os_gettime(env, &now, 1); 323 if (timespeccmp(&when, &now, <=) && 324 (ret = (*action)(env)) != 0) 325 return (ret); 326 } 327 328 return (__repmgr_retry_connections(env)); 329} 330 331/* 332 * Initiates connection attempts for any sites on the idle list whose retry 333 * times have expired. 334 */ 335static int 336__repmgr_retry_connections(env) 337 ENV *env; 338{ 339 DB_REP *db_rep; 340 REPMGR_RETRY *retry; 341 db_timespec now; 342 u_int eid; 343 int ret; 344 345 db_rep = env->rep_handle; 346 __os_gettime(env, &now, 1); 347 348 while (!TAILQ_EMPTY(&db_rep->retries)) { 349 retry = TAILQ_FIRST(&db_rep->retries); 350 if (timespeccmp(&retry->time, &now, >=)) 351 break; /* since items are in time order */ 352 353 TAILQ_REMOVE(&db_rep->retries, retry, entries); 354 355 eid = retry->eid; 356 __os_free(env, retry); 357 358 if ((ret = __repmgr_try_one(env, eid)) != 0) 359 return (ret); 360 } 361 return (0); 362} 363 364/* 365 * PUBLIC: int __repmgr_first_try_connections __P((ENV *)); 366 * 367 * !!! 368 * Assumes caller holds the mutex. 369 */ 370int 371__repmgr_first_try_connections(env) 372 ENV *env; 373{ 374 DB_REP *db_rep; 375 u_int eid; 376 int ret; 377 378 db_rep = env->rep_handle; 379 for (eid=0; eid<db_rep->site_cnt; eid++) 380 if ((ret = __repmgr_try_one(env, eid)) != 0) 381 return (ret); 382 return (0); 383} 384 385/* 386 * Makes a best-effort attempt to connect to the indicated site. Returns a 387 * non-zero error indication only for disastrous failures. For re-tryable 388 * errors, we will have scheduled another attempt, and that can be considered 389 * success enough. 390 */ 391static int 392__repmgr_try_one(env, eid) 393 ENV *env; 394 u_int eid; 395{ 396 ADDRINFO *list; 397 DB_REP *db_rep; 398 repmgr_netaddr_t *addr; 399 int ret; 400 401 db_rep = env->rep_handle; 402 403 /* 404 * If have never yet successfully resolved this site's host name, try to 405 * do so now. 406 * 407 * Throughout all the rest of repmgr, we almost never do any sort of 408 * blocking operation in the select thread. This is the sole exception 409 * to that rule. Fortunately, it should rarely happen: 410 * 411 * - for a site that we only learned about because it connected to us: 412 * not only were we not configured to know about it, but we also never 413 * got a NEWSITE message about it. And even then only if the 414 * connection fails and we want to retry it from this end; 415 * 416 * - if the name look-up system (e.g., DNS) is not working (let's hope 417 * it's temporary), or the host name is not found. 418 */ 419 addr = &SITE_FROM_EID(eid)->net_addr; 420 if (ADDR_LIST_FIRST(addr) == NULL) { 421 if ((ret = __repmgr_getaddr( 422 env, addr->host, addr->port, 0, &list)) == 0) { 423 addr->address_list = list; 424 (void)ADDR_LIST_FIRST(addr); 425 } else if (ret == DB_REP_UNAVAIL) 426 return (__repmgr_schedule_connection_attempt( 427 env, eid, FALSE)); 428 else 429 return (ret); 430 } 431 432 /* Here, when we have a valid address. */ 433 return (__repmgr_connect_site(env, eid)); 434} 435 436/* 437 * Tries to establish a connection with the site indicated by the given eid, 438 * starting with the "current" element of its address list and trying as many 439 * addresses as necessary until the list is exhausted. 440 * 441 * PUBLIC: int __repmgr_connect_site __P((ENV *, u_int eid)); 442 */ 443int 444__repmgr_connect_site(env, eid) 445 ENV *env; 446 u_int eid; 447{ 448 DB_REP *db_rep; 449 REPMGR_CONNECTION *con; 450 REPMGR_SITE *site; 451 socket_t s; 452 int state; 453 int ret; 454#ifdef DB_WIN32 455 long desired_event; 456 WSAEVENT event_obj; 457#endif 458 459 db_rep = env->rep_handle; 460 site = SITE_FROM_EID(eid); 461 462 switch (ret = __repmgr_connect(env, &s, site)) { 463 case 0: 464 state = CONN_CONNECTED; 465#ifdef DB_WIN32 466 desired_event = FD_READ|FD_CLOSE; 467#endif 468 break; 469 case INPROGRESS: 470 state = CONN_CONNECTING; 471#ifdef DB_WIN32 472 desired_event = FD_CONNECT; 473#endif 474 break; 475 default: 476 STAT(db_rep->region->mstat.st_connect_fail++); 477 return ( 478 __repmgr_schedule_connection_attempt(env, eid, FALSE)); 479 } 480 481#ifdef DB_WIN32 482 if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) { 483 ret = net_errno; 484 __db_err(env, ret, "can't create WSA event"); 485 (void)closesocket(s); 486 return (ret); 487 } 488 if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) { 489 ret = net_errno; 490 __db_err(env, ret, "can't set desired event bits"); 491 (void)WSACloseEvent(event_obj); 492 (void)closesocket(s); 493 return (ret); 494 } 495#endif 496 497 if ((ret = __repmgr_new_connection(env, &con, s, state)) 498 != 0) { 499#ifdef DB_WIN32 500 (void)WSACloseEvent(event_obj); 501#endif 502 (void)closesocket(s); 503 return (ret); 504 } 505#ifdef DB_WIN32 506 con->event_object = event_obj; 507#endif 508 509 con->eid = (int)eid; 510 site->ref.conn = con; 511 site->state = SITE_CONNECTED; 512 513 if (state == CONN_CONNECTED) { 514 switch (ret = __repmgr_propose_version(env, con)) { 515 case 0: 516 break; 517 case DB_REP_UNAVAIL: 518 return (__repmgr_bust_connection(env, con)); 519 default: 520 return (ret); 521 } 522 } 523 524 return (0); 525} 526 527static int 528__repmgr_connect(env, socket_result, site) 529 ENV *env; 530 socket_t *socket_result; 531 REPMGR_SITE *site; 532{ 533 repmgr_netaddr_t *addr; 534 ADDRINFO *ai; 535 socket_t s; 536 char *why; 537 int ret; 538 SITE_STRING_BUFFER buffer; 539 540 /* 541 * Lint doesn't know about DB_ASSERT, so it can't tell that this 542 * loop will always get executed at least once, giving 'why' a value. 543 */ 544 COMPQUIET(why, ""); 545 addr = &site->net_addr; 546 ai = ADDR_LIST_CURRENT(addr); 547 DB_ASSERT(env, ai != NULL); 548 for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) { 549 550 if ((s = socket(ai->ai_family, 551 ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) { 552 why = "can't create socket to connect"; 553 continue; 554 } 555 556 if ((ret = __repmgr_set_nonblocking(s)) != 0) { 557 __db_err(env, 558 ret, "can't make nonblock socket to connect"); 559 (void)closesocket(s); 560 return (ret); 561 } 562 563 if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) 564 ret = net_errno; 565 566 if (ret == 0 || ret == INPROGRESS) { 567 *socket_result = s; 568 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 569 "init connection to %s with result %d", 570 __repmgr_format_site_loc(site, buffer), ret)); 571 return (ret); 572 } 573 574 why = "connection failed"; 575 (void)closesocket(s); 576 } 577 578 /* We've exhausted all possible addresses. */ 579 ret = net_errno; 580 __db_err(env, ret, "%s to %s", why, 581 __repmgr_format_site_loc(site, buffer)); 582 return (ret); 583} 584 585/* 586 * Sends a proposal for version negotiation. 587 * 588 * PUBLIC: int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *)); 589 */ 590int 591__repmgr_propose_version(env, conn) 592 ENV *env; 593 REPMGR_CONNECTION *conn; 594{ 595 DB_REP *db_rep; 596 __repmgr_version_proposal_args versions; 597 repmgr_netaddr_t *my_addr; 598 size_t hostname_len, rec_length; 599 u_int8_t *buf, *p; 600 int ret; 601 602 db_rep = env->rep_handle; 603 my_addr = &db_rep->my_addr; 604 605 /* 606 * In repmgr wire protocol version 1, a handshake message had a rec part 607 * that looked like this: 608 * 609 * +-----------------+----+ 610 * | host name ... | \0 | 611 * +-----------------+----+ 612 * 613 * To ensure its own sanity, the old repmgr would write a NUL into the 614 * last byte of a received message, and then use normal C library string 615 * operations (e.g., * strlen, strcpy). 616 * 617 * Now, a version proposal has a rec part that looks like this: 618 * 619 * +-----------------+----+------------------+------+ 620 * | host name ... | \0 | extra info ... | \0 | 621 * +-----------------+----+------------------+------+ 622 * 623 * The "extra info" contains the version parameters, in marshaled form. 624 */ 625 626 hostname_len = strlen(my_addr->host); 627 rec_length = hostname_len + 1 + 628 __REPMGR_VERSION_PROPOSAL_SIZE + 1; 629 if ((ret = __os_malloc(env, rec_length, &buf)) != 0) 630 goto out; 631 p = buf; 632 (void)strcpy((char*)p, my_addr->host); 633 634 p += hostname_len + 1; 635 versions.min = DB_REPMGR_MIN_VERSION; 636 versions.max = DB_REPMGR_VERSION; 637 __repmgr_version_proposal_marshal(env, &versions, p); 638 639 ret = send_v1_handshake(env, conn, buf, rec_length); 640 __os_free(env, buf); 641out: 642 return (ret); 643} 644 645static int 646send_v1_handshake(env, conn, buf, len) 647 ENV *env; 648 REPMGR_CONNECTION *conn; 649 void *buf; 650 size_t len; 651{ 652 DB_REP *db_rep; 653 REP *rep; 654 repmgr_netaddr_t *my_addr; 655 DB_REPMGR_V1_HANDSHAKE buffer; 656 DBT cntrl, rec; 657 658 db_rep = env->rep_handle; 659 rep = db_rep->region; 660 my_addr = &db_rep->my_addr; 661 662 buffer.version = 1; 663 buffer.priority = htonl(rep->priority); 664 buffer.port = my_addr->port; 665 cntrl.data = &buffer; 666 cntrl.size = sizeof(buffer); 667 668 rec.data = buf; 669 rec.size = (u_int32_t)len; 670 671 /* 672 * It would of course be disastrous to block the select() thread, so 673 * pass the "blockable" argument as FALSE. Fortunately blocking should 674 * never be necessary here, because the hand-shake is always the first 675 * thing we send. Which is a good thing, because it would be almost as 676 * disastrous if we allowed ourselves to drop a handshake. 677 */ 678 return (__repmgr_send_one(env, 679 conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); 680} 681 682/* 683 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *)); 684 * 685 * !!! 686 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here. 687 */ 688int 689__repmgr_read_from_site(env, conn) 690 ENV *env; 691 REPMGR_CONNECTION *conn; 692{ 693 DB_REP *db_rep; 694 REPMGR_SITE *site; 695 SITE_STRING_BUFFER buffer; 696 size_t nr; 697 int ret; 698 699 db_rep = env->rep_handle; 700 /* 701 * Keep reading pieces as long as we're making some progress, or until 702 * we complete the current read phase. 703 */ 704 for (;;) { 705 if ((ret = __repmgr_readv(conn->fd, 706 &conn->iovecs.vectors[conn->iovecs.offset], 707 conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) { 708 switch (ret) { 709#ifndef DB_WIN32 710 case EINTR: 711 continue; 712#endif 713 case WOULDBLOCK: 714 return (0); 715 default: 716 (void)__repmgr_format_eid_loc(env->rep_handle, 717 conn->eid, buffer); 718 __db_err(env, ret, 719 "can't read from %s", buffer); 720 STAT(env->rep_handle-> 721 region->mstat.st_connection_drop++); 722 return (DB_REP_UNAVAIL); 723 } 724 } 725 726 if (nr > 0) { 727 if (IS_VALID_EID(conn->eid)) { 728 site = SITE_FROM_EID(conn->eid); 729 __os_gettime( 730 env, &site->last_rcvd_timestamp, 1); 731 } 732 if (__repmgr_update_consumed(&conn->iovecs, nr)) 733 return (dispatch_phase_completion(env, 734 conn)); 735 } else { 736 (void)__repmgr_format_eid_loc(env->rep_handle, 737 conn->eid, buffer); 738 __db_errx(env, "EOF on connection from %s", buffer); 739 STAT(env->rep_handle-> 740 region->mstat.st_connection_drop++); 741 return (DB_REP_UNAVAIL); 742 } 743 } 744} 745 746/* 747 * Handles whatever needs to be done upon the completion of a reading phase on a 748 * given connection. 749 */ 750static int 751dispatch_phase_completion(env, conn) 752 ENV *env; 753 REPMGR_CONNECTION *conn; 754{ 755#define MEM_ALIGN sizeof(double) 756 DBT *dbt; 757 u_int32_t control_size, rec_size; 758 size_t memsize, control_offset, rec_offset; 759 void *membase; 760 int ret; 761 762 switch (conn->reading_phase) { 763 case SIZES_PHASE: 764 /* 765 * We've received the header: a message type and the lengths of 766 * the two pieces of the message. Set up buffers to read the 767 * two pieces. This set-up is a bit different for a 768 * REPMGR_REP_MESSAGE, because we plan to pass it off to the msg 769 * threads. 770 */ 771 __repmgr_iovec_init(&conn->iovecs); 772 control_size = ntohl(conn->control_size_buf); 773 rec_size = ntohl(conn->rec_size_buf); 774 775 if (conn->msg_type == REPMGR_REP_MESSAGE) { 776 if (control_size == 0) { 777 __db_errx( 778 env, "illegal size for rep msg"); 779 return (DB_REP_UNAVAIL); 780 } 781 /* 782 * Allocate a block of memory large enough to hold a 783 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT 784 * data areas that it points to. Start by calculating 785 * the total memory needed, rounding up for the start of 786 * each DBT, to ensure possible alignment requirements. 787 */ 788 memsize = (size_t) 789 DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN); 790 control_offset = memsize; 791 memsize += control_size; 792 if (rec_size > 0) { 793 memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN); 794 rec_offset = memsize; 795 memsize += rec_size; 796 } else 797 COMPQUIET(rec_offset, 0); 798 if ((ret = __os_malloc(env, memsize, &membase)) != 0) 799 return (ret); 800 conn->input.rep_message = membase; 801 802 conn->input.rep_message->originating_eid = conn->eid; 803 DB_INIT_DBT(conn->input.rep_message->control, 804 (u_int8_t*)membase + control_offset, control_size); 805 __repmgr_add_dbt(&conn->iovecs, 806 &conn->input.rep_message->control); 807 808 if (rec_size > 0) { 809 DB_INIT_DBT(conn->input.rep_message->rec, 810 (rec_size > 0 ? 811 (u_int8_t*)membase + rec_offset : NULL), 812 rec_size); 813 __repmgr_add_dbt(&conn->iovecs, 814 &conn->input.rep_message->rec); 815 } else 816 DB_INIT_DBT(conn->input.rep_message->rec, 817 NULL, 0); 818 } else { 819 conn->input.repmgr_msg.cntrl.size = control_size; 820 conn->input.repmgr_msg.rec.size = rec_size; 821 822 if (control_size > 0) { 823 dbt = &conn->input.repmgr_msg.cntrl; 824 if ((ret = __os_malloc(env, control_size, 825 &dbt->data)) != 0) 826 return (ret); 827 __repmgr_add_dbt(&conn->iovecs, dbt); 828 } 829 830 if (rec_size > 0) { 831 dbt = &conn->input.repmgr_msg.rec; 832 if ((ret = __os_malloc(env, rec_size, 833 &dbt->data)) != 0) { 834 if (control_size > 0) 835 __os_free(env, 836 conn->input.repmgr_msg. 837 cntrl.data); 838 return (ret); 839 } 840 __repmgr_add_dbt(&conn->iovecs, dbt); 841 } 842 } 843 844 conn->reading_phase = DATA_PHASE; 845 846 if (control_size > 0 || rec_size > 0) 847 break; 848 849 /* 850 * However, if they're both 0, we're ready to complete 851 * DATA_PHASE. 852 */ 853 /* FALLTHROUGH */ 854 855 case DATA_PHASE: 856 return (dispatch_msgin(env, conn)); 857 858 default: 859 DB_ASSERT(env, FALSE); 860 } 861 862 return (0); 863} 864 865/* 866 * Processes an incoming message, depending on our current state. 867 */ 868static int 869dispatch_msgin(env, conn) 870 ENV *env; 871 REPMGR_CONNECTION *conn; 872{ 873 DBT *dbt; 874 char *hostname; 875 int given, ret; 876 877 given = FALSE; 878 879 switch (conn->state) { 880 case CONN_CONNECTED: 881 /* 882 * In this state, we know we're working with an outgoing 883 * connection. We've sent a version proposal, and now expect 884 * the response (which could be a dumb old V1 handshake). 885 */ 886 ONLY_HANDSHAKE(env, conn); 887 if ((ret = read_version_response(env, conn)) != 0) 888 return (ret); 889 break; 890 891 case CONN_NEGOTIATE: 892 /* 893 * Since we're in this state, we know we're working with an 894 * incoming connection, and this is the first message we've 895 * received. So it must be a version negotiation proposal (or a 896 * legacy V1 handshake). (We'll verify this of course.) 897 */ 898 ONLY_HANDSHAKE(env, conn); 899 if ((ret = send_version_response(env, conn)) != 0) 900 return (ret); 901 break; 902 903 case CONN_PARAMETERS: 904 /* 905 * We've previously agreed on a (>1) version, and are now simply 906 * awaiting the other side's parameters handshake. 907 */ 908 ONLY_HANDSHAKE(env, conn); 909 dbt = &conn->input.repmgr_msg.rec; 910 hostname = dbt->data; 911 hostname[dbt->size-1] = '\0'; 912 if ((ret = accept_handshake(env, conn, hostname)) != 0) 913 return (ret); 914 conn->state = CONN_READY; 915 break; 916 917 case CONN_READY: /* FALLTHROUGH */ 918 case CONN_CONGESTED: 919 /* 920 * We have a complete message, so process it. Acks and 921 * handshakes get processed here, in line. Regular rep messages 922 * get posted to a queue, to be handled by a thread from the 923 * message thread pool. 924 */ 925 switch (conn->msg_type) { 926 case REPMGR_ACK: 927 if ((ret = record_ack(env, conn)) != 0) 928 return (ret); 929 break; 930 931 case REPMGR_HEARTBEAT: 932 /* 933 * The underlying byte-receiving mechanism will already 934 * have noted the fact that we got some traffic on this 935 * connection. And that's all we really have to do, so 936 * there's nothing more needed at this point. 937 */ 938 break; 939 940 case REPMGR_REP_MESSAGE: 941 if ((ret = __repmgr_queue_put(env, 942 conn->input.rep_message)) != 0) 943 return (ret); 944 /* 945 * The queue has taken over responsibility for the 946 * rep_message buffer, and will free it later. 947 */ 948 given = TRUE; 949 break; 950 951 default: 952 __db_errx(env, 953 "unexpected msg type rcvd in ready state: %d", 954 (int)conn->msg_type); 955 return (DB_REP_UNAVAIL); 956 } 957 break; 958 959 case CONN_DEFUNCT: 960 break; 961 962 default: 963 DB_ASSERT(env, FALSE); 964 } 965 966 if (!given) { 967 dbt = &conn->input.repmgr_msg.cntrl; 968 if (dbt->size > 0) 969 __os_free(env, dbt->data); 970 dbt = &conn->input.repmgr_msg.rec; 971 if (dbt->size > 0) 972 __os_free(env, dbt->data); 973 } 974 __repmgr_reset_for_reading(conn); 975 return (0); 976} 977 978/* 979 * Examine and verify the incoming version proposal message, and send an 980 * appropriate response. 981 */ 982static int 983send_version_response(env, conn) 984 ENV *env; 985 REPMGR_CONNECTION *conn; 986{ 987 DB_REP *db_rep; 988 __repmgr_version_proposal_args versions; 989 __repmgr_version_confirmation_args conf; 990 repmgr_netaddr_t *my_addr; 991 char *hostname; 992 u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1]; 993 DBT vi; 994 int ret; 995 996 db_rep = env->rep_handle; 997 my_addr = &db_rep->my_addr; 998 999 if ((ret = find_version_info(env, conn, &vi)) != 0) 1000 return (ret); 1001 if (vi.size == 0) { 1002 /* No version info, so we must be talking to a v1 site. */ 1003 hostname = conn->input.repmgr_msg.rec.data; 1004 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0) 1005 return (ret); 1006 if ((ret = send_v1_handshake(env, 1007 conn, my_addr->host, strlen(my_addr->host) + 1)) != 0) 1008 return (ret); 1009 conn->state = CONN_READY; 1010 } else { 1011 if ((ret = __repmgr_version_proposal_unmarshal(env, 1012 &versions, vi.data, vi.size, NULL)) != 0) 1013 return (DB_REP_UNAVAIL); 1014 1015 /* For now version 2 is the only thing we know here. */ 1016 DB_ASSERT(env, DB_REPMGR_VERSION == 2); 1017 1018 if (DB_REPMGR_VERSION >= versions.min && 1019 DB_REPMGR_VERSION <= versions.max) 1020 conf.version = DB_REPMGR_VERSION; 1021 else if (versions.max >= DB_REPMGR_MIN_VERSION && 1022 versions.max <= DB_REPMGR_VERSION) 1023 conf.version = versions.max; 1024 else { 1025 /* 1026 * User must have wired up a combination of versions 1027 * exceeding what we said we'd support. 1028 */ 1029 __db_errx(env, 1030 "No available version between %lu and %lu", 1031 (u_long)versions.min, (u_long)versions.max); 1032 return (DB_REP_UNAVAIL); 1033 } 1034 conn->version = conf.version; 1035 1036 __repmgr_version_confirmation_marshal(env, &conf, buf); 1037 if ((ret = send_handshake(env, conn, buf, sizeof(buf))) != 0) 1038 return (ret); 1039 1040 conn->state = CONN_PARAMETERS; 1041 } 1042 return (ret); 1043} 1044 1045static int 1046send_handshake(env, conn, opt, optlen) 1047 ENV *env; 1048 REPMGR_CONNECTION *conn; 1049 void *opt; 1050 size_t optlen; 1051{ 1052 DB_REP *db_rep; 1053 REP *rep; 1054 DBT cntrl, rec; 1055 __repmgr_handshake_args hs; 1056 repmgr_netaddr_t *my_addr; 1057 size_t hostname_len, rec_len; 1058 void *buf; 1059 u_int8_t *p; 1060 u_int32_t cntrl_len; 1061 int ret; 1062 1063 db_rep = env->rep_handle; 1064 rep = db_rep->region; 1065 my_addr = &db_rep->my_addr; 1066 1067 /* 1068 * The cntrl part has port and priority. The rec part has the host 1069 * name, followed by whatever optional extra data was passed to us. 1070 */ 1071 cntrl_len = __REPMGR_HANDSHAKE_SIZE; 1072 hostname_len = strlen(my_addr->host); 1073 rec_len = hostname_len + 1 + 1074 (opt == NULL ? 0 : optlen); 1075 1076 if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0) 1077 return (ret); 1078 1079 cntrl.data = p = buf; 1080 hs.port = my_addr->port; 1081 hs.priority = rep->priority; 1082 __repmgr_handshake_marshal(env, &hs, p); 1083 cntrl.size = cntrl_len; 1084 1085 p = rec.data = &p[cntrl_len]; 1086 (void)strcpy((char*)p, my_addr->host); 1087 p += hostname_len + 1; 1088 if (opt != NULL) { 1089 memcpy(p, opt, optlen); 1090 p += optlen; 1091 } 1092 rec.size = (u_int32_t)(p - (u_int8_t*)rec.data); 1093 1094 /* Never block on select thread: pass blockable as FALSE. */ 1095 ret = __repmgr_send_one(env, 1096 conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE); 1097 __os_free(env, buf); 1098 return (ret); 1099} 1100 1101static int 1102read_version_response(env, conn) 1103 ENV *env; 1104 REPMGR_CONNECTION *conn; 1105{ 1106 __repmgr_version_confirmation_args conf; 1107 DBT vi; 1108 char *hostname; 1109 int ret; 1110 1111 if ((ret = find_version_info(env, conn, &vi)) != 0) 1112 return (ret); 1113 hostname = conn->input.repmgr_msg.rec.data; 1114 if (vi.size == 0) { 1115 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0) 1116 return (ret); 1117 } else { 1118 if ((ret = __repmgr_version_confirmation_unmarshal(env, 1119 &conf, vi.data, vi.size, NULL)) != 0) 1120 return (DB_REP_UNAVAIL); 1121 if (conf.version >= DB_REPMGR_MIN_VERSION && 1122 conf.version <= DB_REPMGR_VERSION) 1123 conn->version = conf.version; 1124 else { 1125 /* 1126 * Remote site "confirmed" a version outside of the 1127 * range we proposed. It should never do that. 1128 */ 1129 __db_errx(env, 1130 "Can't support confirmed version %lu", 1131 (u_long)conf.version); 1132 return (DB_REP_UNAVAIL); 1133 } 1134 1135 if ((ret = accept_handshake(env, conn, hostname)) != 0) 1136 return (ret); 1137 if ((ret = send_handshake(env, conn, NULL, 0)) != 0) 1138 return (ret); 1139 } 1140 conn->state = CONN_READY; 1141 return (ret); 1142} 1143 1144/* 1145 * Examine the rec part of a handshake message to see if it has any version 1146 * information in it. This is the magic that lets us allows version-aware sites 1147 * to exchange information, and yet avoids tripping up v1 sites, which don't 1148 * know how to look for it. 1149 */ 1150static int 1151find_version_info(env, conn, vi) 1152 ENV *env; 1153 REPMGR_CONNECTION *conn; 1154 DBT *vi; 1155{ 1156 DBT *dbt; 1157 char *hostname; 1158 size_t hostname_len; 1159 1160 dbt = &conn->input.repmgr_msg.rec; 1161 if (dbt->size == 0) { 1162 __db_errx(env, "handshake is missing rec part"); 1163 return (DB_REP_UNAVAIL); 1164 } 1165 hostname = dbt->data; 1166 hostname[dbt->size-1] = '\0'; 1167 hostname_len = strlen(hostname); 1168 if (hostname_len + 1 == dbt->size) { 1169 /* 1170 * The rec DBT held only the host name. This is a simple legacy 1171 * V1 handshake; it contains no version information. 1172 */ 1173 vi->size = 0; 1174 } else { 1175 /* 1176 * There's more data than just the host name. The remainder is 1177 * available to be treated as a normal byte buffer (and read in 1178 * by one of the unmarshal functions). Note that the remaining 1179 * length should not include the padding byte that we have 1180 * already clobbered. 1181 */ 1182 vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1]; 1183 vi->size = (dbt->size - (hostname_len+1)) - 1; 1184 } 1185 return (0); 1186} 1187 1188static int 1189accept_handshake(env, conn, hostname) 1190 ENV *env; 1191 REPMGR_CONNECTION *conn; 1192 char *hostname; 1193{ 1194 __repmgr_handshake_args hs; 1195 1196 /* Extract port and priority from cntrl. */ 1197 if (__repmgr_handshake_unmarshal(env, &hs, 1198 conn->input.repmgr_msg.cntrl.data, 1199 conn->input.repmgr_msg.cntrl.size, NULL) != 0) 1200 return (DB_REP_UNAVAIL); 1201 1202 return (process_parameters(env, conn, hostname, hs.port, hs.priority)); 1203} 1204 1205static int 1206accept_v1_handshake(env, conn, hostname) 1207 ENV *env; 1208 REPMGR_CONNECTION *conn; 1209 char *hostname; 1210{ 1211 DB_REPMGR_V1_HANDSHAKE *handshake; 1212 u_int32_t prio; 1213 1214 handshake = conn->input.repmgr_msg.cntrl.data; 1215 if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) || 1216 handshake->version != 1) { 1217 __db_errx(env, "malformed V1 handshake"); 1218 return (DB_REP_UNAVAIL); 1219 } 1220 1221 conn->version = 1; 1222 prio = ntohl(handshake->priority); 1223 return (process_parameters(env, conn, hostname, handshake->port, prio)); 1224} 1225 1226static int 1227process_parameters(env, conn, host, port, priority) 1228 ENV *env; 1229 REPMGR_CONNECTION *conn; 1230 char *host; 1231 u_int port; 1232 u_int32_t priority; 1233{ 1234 DB_REP *db_rep; 1235 REPMGR_RETRY *retry; 1236 REPMGR_SITE *site; 1237 repmgr_netaddr_t addr; 1238 int ret, eid; 1239 1240 db_rep = env->rep_handle; 1241 1242 if (F_ISSET(conn, CONN_INCOMING)) { 1243 /* 1244 * Now that we've been given the host and port, use them to find 1245 * the site (or create a new one if necessary, etc.). 1246 */ 1247 if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) { 1248 site = SITE_FROM_EID(eid); 1249 if (site->state == SITE_IDLE) { 1250 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1251 "handshake from idle site %s:%u", 1252 host, port)); 1253 retry = site->ref.retry; 1254 TAILQ_REMOVE(&db_rep->retries, retry, entries); 1255 __os_free(env, retry); 1256 } else { 1257 /* 1258 * We got an incoming connection for a site we 1259 * were already connected to; at least we 1260 * thought we were. 1261 */ 1262 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1263 "connection from %s:%u supersedes existing", 1264 host, port)); 1265 1266 /* 1267 * No need to schedule a retry for later, since 1268 * we now have a replacement connection. 1269 */ 1270 DISABLE_CONNECTION(site->ref.conn); 1271 } 1272 conn->eid = eid; 1273 site->state = SITE_CONNECTED; 1274 site->ref.conn = conn; 1275 } else { 1276 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1277 "handshake introduces unknown site %s:%u", 1278 host, port)); 1279 if ((ret = __repmgr_pack_netaddr(env, 1280 host, port, NULL, &addr)) != 0) 1281 return (ret); 1282 if ((ret = __repmgr_new_site(env, 1283 &site, &addr, SITE_CONNECTED)) != 0) { 1284 __repmgr_cleanup_netaddr(env, &addr); 1285 return (ret); 1286 } 1287 conn->eid = EID_FROM_SITE(site); 1288 site->ref.conn = conn; 1289 } 1290 } else { 1291 /* 1292 * Since we initiated this as an outgoing connection, we 1293 * obviously already know the host, port and site. We just need 1294 * the other site's priority. 1295 */ 1296 DB_ASSERT(env, IS_VALID_EID(conn->eid)); 1297 site = SITE_FROM_EID(conn->eid); 1298 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1299 "handshake from connection to %s:%lu", 1300 site->net_addr.host, (u_long)site->net_addr.port)); 1301 } 1302 1303 site->priority = priority; 1304 F_SET(site, SITE_HAS_PRIO); 1305 1306 /* 1307 * If we're moping around wishing we knew who the master was, then 1308 * getting in touch with another site might finally provide sufficient 1309 * connectivity to find out. But just do this once, because otherwise 1310 * we get messages while the subsequent rep_start operations are going 1311 * on, and rep tosses them in that case. 1312 */ 1313 if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) { 1314 db_rep->done_one = TRUE; 1315 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1316 "handshake with no known master to wake election thread")); 1317 if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0) 1318 return (ret); 1319 } 1320 1321 return (0); 1322} 1323 1324static int 1325record_ack(env, conn) 1326 ENV *env; 1327 REPMGR_CONNECTION *conn; 1328{ 1329 DB_REP *db_rep; 1330 REPMGR_SITE *site; 1331 __repmgr_ack_args *ackp, ack; 1332 SITE_STRING_BUFFER location; 1333 int ret; 1334 1335 db_rep = env->rep_handle; 1336 1337 DB_ASSERT(env, conn->version > 0 && 1338 IS_READY_STATE(conn->state) && IS_VALID_EID(conn->eid)); 1339 site = SITE_FROM_EID(conn->eid); 1340 1341 /* 1342 * Extract the LSN. Save it only if it is an improvement over what the 1343 * site has already ack'ed. 1344 */ 1345 if (conn->version == 1) { 1346 ackp = conn->input.repmgr_msg.cntrl.data; 1347 if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) || 1348 conn->input.repmgr_msg.rec.size != 0) { 1349 __db_errx(env, "bad ack msg size"); 1350 return (DB_REP_UNAVAIL); 1351 } 1352 } else { 1353 ackp = &ack; 1354 if ((ret = __repmgr_ack_unmarshal(env, ackp, 1355 conn->input.repmgr_msg.cntrl.data, 1356 conn->input.repmgr_msg.cntrl.size, NULL)) != 0) 1357 return (DB_REP_UNAVAIL); 1358 } 1359 1360 /* Ignore stale acks. */ 1361 if (ackp->generation < db_rep->generation) { 1362 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1363 "ignoring stale ack (%lu<%lu), from %s", 1364 (u_long)ackp->generation, (u_long)db_rep->generation, 1365 __repmgr_format_site_loc(site, location))); 1366 return (0); 1367 } 1368 RPRINT(env, DB_VERB_REPMGR_MISC, (env, 1369 "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file, 1370 (u_long)ackp->lsn.offset, (u_long)ackp->generation, 1371 __repmgr_format_site_loc(site, location))); 1372 1373 if (ackp->generation == db_rep->generation && 1374 log_compare(&ackp->lsn, &site->max_ack) == 1) { 1375 memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN)); 1376 if ((ret = __repmgr_wake_waiting_senders(env)) != 0) 1377 return (ret); 1378 } 1379 return (0); 1380} 1381 1382/* 1383 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *)); 1384 */ 1385int 1386__repmgr_write_some(env, conn) 1387 ENV *env; 1388 REPMGR_CONNECTION *conn; 1389{ 1390 QUEUED_OUTPUT *output; 1391 REPMGR_FLAT *msg; 1392 int bytes, ret; 1393 1394 while (!STAILQ_EMPTY(&conn->outbound_queue)) { 1395 output = STAILQ_FIRST(&conn->outbound_queue); 1396 msg = output->msg; 1397 if ((bytes = send(conn->fd, &msg->data[output->offset], 1398 (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) { 1399 if ((ret = net_errno) == WOULDBLOCK) 1400 return (0); 1401 else { 1402 __db_err(env, ret, "writing data"); 1403 STAT(env->rep_handle-> 1404 region->mstat.st_connection_drop++); 1405 return (DB_REP_UNAVAIL); 1406 } 1407 } 1408 1409 if ((output->offset += (size_t)bytes) >= msg->length) { 1410 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); 1411 __os_free(env, output); 1412 conn->out_queue_length--; 1413 if (--msg->ref_count <= 0) 1414 __os_free(env, msg); 1415 1416 /* 1417 * We've achieved enough movement to free up at least 1418 * one space in the outgoing queue. Wake any message 1419 * threads that may be waiting for space. Leave 1420 * CONGESTED state so that when the queue reaches the 1421 * high-water mark again, the filling thread will be 1422 * allowed to try waiting again. 1423 */ 1424 conn->state = CONN_READY; 1425 if (conn->blockers > 0 && 1426 (ret = __repmgr_signal(&conn->drained)) != 0) 1427 return (ret); 1428 } 1429 } 1430 1431#ifdef DB_WIN32 1432 /* 1433 * With the queue now empty, it's time to relinquish ownership of this 1434 * connection again, so that the next call to send() can write the 1435 * message in line, instead of posting it to the queue for us. 1436 */ 1437 if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE) 1438 == SOCKET_ERROR) { 1439 ret = net_errno; 1440 __db_err(env, ret, "can't remove FD_WRITE event bit"); 1441 return (ret); 1442 } 1443#endif 1444 1445 return (0); 1446} 1447