/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_net.c,v 1.70 2008/03/13 17:31:28 mbrey Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"
#include "dbinc/mp.h"

/*
 * The functions in this module implement a simple wire protocol for
 * transmitting messages, both replication messages and our own internal control
 * messages.  The protocol is as follows:
 *
 *	1 byte		- message type  (defined in repmgr.h)
 *	4 bytes		- size of control
 *	4 bytes		- size of rec
 *	? bytes		- control
 *	? bytes		- rec
 *
 * where both sizes are 32-bit binary integers in network byte order.
 * Either control or rec can have zero length, but even in this case the
 * 4-byte length will be present.
 *     Putting both lengths right up at the front allows us to read in fewer
 * phases, and allows us to allocate buffer space for both parts (plus a wrapper
 * struct) at once.
 */

/*
 * In sending a message, we first try to send it in-line, in the sending thread,
 * and without first copying the message, by using scatter/gather I/O, using
 * iovecs to point to the various pieces of the message.  If that all works
 * without blocking, that's optimal.
 *     If we find that, for a particular connection, we can't send without
 * blocking, then we must copy the message for sending later in the select()
 * thread.  In the course of doing that, we might as well "flatten" the message,
 * forming one single buffer, to simplify life.  Not only that, once we've gone
 * to the trouble of doing that, other sites to which we also want to send the
 * message (in the case of a broadcast), may as well take advantage of the
 * simplified structure also.
 *     This structure holds it all.  Note that this structure, and the
 * "flat_msg" structure, are allocated separately, because (1) the flat_msg
 * version is usually not needed; and (2) when it is needed, it will need to
 * live longer than the wrapping sending_msg structure.
 *     Note that, for the broadcast case, where we're going to use this
 * repeatedly, the iovecs is a template that must be copied, since in normal use
 * the iovecs pointers and lengths get adjusted after every partial write.
 */
struct sending_msg {
	REPMGR_IOVECS iovecs;	/* Template pointing at the pieces below. */
	u_int8_t type;		/* Wire-format message type byte. */
				/* Sizes in network byte order (htonl). */
	u_int32_t control_size_buf, rec_size_buf;
	REPMGR_FLAT *fmsg;	/* Flattened copy; NULL until first needed. */
};

/* Forward declarations for file-local helpers. */
static int __repmgr_close_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_destroy_connection __P((ENV *, REPMGR_CONNECTION *));
static void setup_sending_msg
    __P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
static int enqueue_msg
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static int flatten __P((ENV *, struct sending_msg *));
static REPMGR_SITE *__repmgr_available_site __P((ENV *, int));

/*
 * __repmgr_send --
 *	The send function for DB_ENV->rep_set_transport.
 *
 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
 * PUBLIC:     const DB_LSN *, int, u_int32_t));
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
	DB_ENV *dbenv;
	const DBT *control, *rec;
	const DB_LSN *lsnp;
	int eid;
	u_int32_t flags;
{
	DB_REP *db_rep;
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int available, nclients, needed, npeers_sent, nsites_sent;
	int ret, t_ret;

	env = dbenv->env;
	db_rep = env->rep_handle;

	LOCK_MUTEX(db_rep->mutex);
	if (eid == DB_EID_BROADCAST) {
		if ((ret = __repmgr_send_broadcast(env, REPMGR_REP_MESSAGE,
		    control, rec, &nsites_sent, &npeers_sent)) != 0)
			goto out;
	} else {
		/*
		 * If this is a request that can be sent anywhere, then see if
		 * we can send it to our peer (to save load on the master), but
		 * not if it's a rerequest, 'cuz that likely means we tried this
		 * already and failed.
		 */
		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
		    DB_REP_ANYWHERE &&
		    IS_VALID_EID(db_rep->peer) &&
		    (site = __repmgr_available_site(env, db_rep->peer)) !=
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "sending request to peer"));
		} else if ((site = __repmgr_available_site(env, eid)) ==
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "ignoring message sent to unavailable site"));
			ret = DB_REP_UNAVAIL;
			goto out;
		}

		conn = site->ref.conn;
		/* Pass the "blockable" argument as TRUE. */
		if ((ret = __repmgr_send_one(env, conn, REPMGR_REP_MESSAGE,
		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
		    (t_ret = __repmgr_bust_connection(env, conn)) != 0)
			ret = t_ret;
		if (ret != 0)
			goto out;

		nsites_sent = 1;
		/* A "peer" is any site with positive priority. */
		npeers_sent = site->priority > 0 ? 1 : 0;
	}
	/*
	 * Right now, nsites and npeers represent the (maximum) number of sites
	 * we've attempted to begin sending the message to.  Of course we
	 * haven't really received any ack's yet.  But since we've only sent to
	 * nsites/npeers other sites, that's the maximum number of ack's we
	 * could possibly expect.  If even that number fails to satisfy our PERM
	 * policy, there's no point waiting for something that will never
	 * happen.
	 */
	if (LF_ISSET(DB_REP_PERMANENT)) {
		/* Number of sites in the group besides myself. */
		nclients = __repmgr_get_nsites(db_rep) - 1;

		switch (db_rep->perm_policy) {
		case DB_REPMGR_ACKS_NONE:
			needed = 0;
			COMPQUIET(available, 0);
			break;

		case DB_REPMGR_ACKS_ONE:
			needed = 1;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ALL:
			/* Number of sites in the group besides myself. */
			needed = nclients;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ONE_PEER:
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_ALL_PEERS:
			/*
			 * Too hard to figure out "needed", since we're not
			 * keeping track of how many peers we have; so just skip
			 * the optimization in this case.
			 */
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_QUORUM:
			/*
			 * The minimum number of acks necessary to ensure that
			 * the transaction is durable if an election is held.
			 * (See note below at __repmgr_is_permanent, regarding
			 * the surprising inter-relationship between
			 * 2SITE_STRICT and QUORUM.)
			 */
			if (nclients > 1 ||
			    FLD_ISSET(db_rep->region->config,
			    REP_C_2SITE_STRICT))
				needed = nclients / 2;
			else
				needed = 1;
			available = npeers_sent;
			break;

		default:
			COMPQUIET(available, 0);
			COMPQUIET(needed, 0);
			(void)__db_unknown_path(env, "__repmgr_send");
			break;
		}
		if (needed == 0)
			goto out;
		if (available < needed) {
			ret = DB_REP_UNAVAIL;
			goto out;
		}
		/* In ALL_PEERS case, display of "needed" might be confusing. */
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "will await acknowledgement: need %u", needed));
		ret = __repmgr_await_ack(env, lsnp);
	}

out:	UNLOCK_MUTEX(db_rep->mutex);
	if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
		STAT(db_rep->region->mstat.st_perm_failed++);
		DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
	}
	return (ret);
}

/*
 * __repmgr_available_site --
 *	Return the site for the given EID if it is connected and its
 * connection is ready for traffic; otherwise NULL.
 *
 * !!!
 * Caller must hold the mutex (the site array and connection states are
 * shared with the select() thread).
 */
static REPMGR_SITE *
__repmgr_available_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);
	if (site->state != SITE_CONNECTED)
		return (NULL);

	if (site->ref.conn->state == CONN_READY)
		return (site);
	return (NULL);
}

/*
 * Sends message to all sites with which we currently have an active
 * connection.  Sets result parameters according to how many sites we attempted
 * to begin sending to, even if we did nothing more than queue it for later
 * delivery.
 *
 * !!!
 * Caller must hold env->mutex.
 * PUBLIC: int __repmgr_send_broadcast __P((ENV *, u_int,
 * PUBLIC:    const DBT *, const DBT *, u_int *, u_int *));
 */
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp)
	ENV *env;
	u_int type;
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;
{
	DB_REP *db_rep;
	struct sending_msg msg;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int nsites, npeers;
	int ret;

	/* Highest message type each protocol version understands. */
	static const u_int version_max_msg_type[] = {
		0, REPMGR_MAX_V1_MSG_TYPE, REPMGR_MAX_V2_MSG_TYPE
	};

	db_rep = env->rep_handle;

	/*
	 * Sending a broadcast is quick, because we allow no blocking.  So it
	 * shouldn't much matter.  But just in case, take the timestamp before
	 * sending, so that if anything we err on the side of keeping clients
	 * placated (i.e., possibly sending a heartbeat slightly more frequently
	 * than necessary).
	 */
	__os_gettime(env, &db_rep->last_bcast, 1);

	setup_sending_msg(&msg, type, control, rec);
	nsites = npeers = 0;

	/*
	 * Traverse the connections list.  Here, even in bust_connection, we
	 * don't unlink the current list entry, so we can use the TAILQ_FOREACH
	 * macro.
	 */
	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
		if (conn->state != CONN_READY)
			continue;
		DB_ASSERT(env, IS_VALID_EID(conn->eid) &&
		    conn->version > 0 &&
		    conn->version <= DB_REPMGR_VERSION);

		/*
		 * Skip if the type of message we're sending is beyond the range
		 * of known message types for this connection's version.
		 *
		 * !!!
		 * Don't be misled by the apparent generality of this simple
		 * test.  It works currently, because the only kinds of messages
		 * that we broadcast are REP_MESSAGE and HEARTBEAT.  But in the
		 * future other kinds of messages might require more intricate
		 * per-connection-version customization (for example,
		 * per-version message format conversion, addition of new
		 * fields, etc.).
		 */
		if (type > version_max_msg_type[conn->version])
			continue;

		/*
		 * Broadcast messages are either application threads committing
		 * transactions, or replication status message that we can
		 * afford to lose.  So don't allow blocking for them (pass
		 * "blockable" argument as FALSE).
		 */
		if ((ret = __repmgr_send_internal(env,
		    conn, &msg, FALSE)) == 0) {
			site = SITE_FROM_EID(conn->eid);
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else if (ret == DB_REP_UNAVAIL) {
			if ((ret = __repmgr_bust_connection(env, conn)) != 0)
				return (ret);
		} else
			return (ret);
	}

	*nsitesp = nsites;
	*npeersp = npeers;
	return (0);
}

/*
 * __repmgr_send_one --
 *	Send a message to a site, or if you can't just yet, make a copy of it
 * and arrange to have it sent later.
 * 'rec' may be NULL, in which case we send a zero length and no data.
 *
 * If we get an error, we take care of cleaning up the connection (calling
 * __repmgr_bust_connection()), so that the caller needn't do so.
 *
 * !!!
 * Note that the mutex should be held through this call.
 * It doubles as a synchronizer to make sure that two threads don't
 * intersperse writes that are part of two single messages.
 *
 * PUBLIC: int __repmgr_send_one __P((ENV *, REPMGR_CONNECTION *,
 * PUBLIC:    u_int, const DBT *, const DBT *, int));
 */
int
__repmgr_send_one(env, conn, msg_type, control, rec, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
	int blockable;
{
	struct sending_msg msg;

	/* Build the iovec template once, then hand off for actual I/O. */
	setup_sending_msg(&msg, msg_type, control, rec);
	return (__repmgr_send_internal(env, conn, &msg, blockable));
}

/*
 * Attempts a "best effort" to send a message on the given site.  If there is an
 * excessive backlog of message already queued on the connection, what shall we
 * do?  If the caller doesn't mind blocking, we'll wait (a limited amount of
 * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
 * is always allowed by the replication protocol.  But in the case of a
 * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
 * almost always get a flood of messages that instantly fills our queue, so
 * blocking improves performance (by avoiding the need for the client to
 * re-request).
 *
 * How long shall we wait?  We could of course create a new timeout
 * configuration type, so that the application could set it directly.  But that
 * would start to overwhelm the user with too many choices to think about.  We
 * already have an ACK timeout, which is the user's estimate of how long it
 * should take to send a message to the client, have it be processed, and return
 * a message back to us.  We multiply that by the queue size, because that's how
 * many messages have to be swallowed up by the client before we're able to
 * start sending again (at least to a rough approximation).
 */
static int
__repmgr_send_internal(env, conn, msg, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	int blockable;
{
	DB_REP *db_rep;
	REPMGR_IOVECS iovecs;
	SITE_STRING_BUFFER buffer;
	db_timeout_t drain_to;
	int ret;
	size_t nw;
	size_t total_written;

	db_rep = env->rep_handle;

	DB_ASSERT(env,
	    conn->state != CONN_CONNECTING && conn->state != CONN_DEFUNCT);
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		/*
		 * Output to this site is currently owned by the select()
		 * thread, so we can't try sending in-line here.  We can only
		 * queue the msg for later.
		 */
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "msg to %s to be queued",
		    __repmgr_format_eid_loc(env->rep_handle,
		    conn->eid, buffer)));
		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
		    blockable && conn->state != CONN_CONGESTED) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "block msg thread, await queue space"));

			/*
			 * Wait (a bounded time) for the select() thread to
			 * drain the queue; see the timeout rationale in the
			 * function header comment above.
			 */
			if ((drain_to = db_rep->ack_timeout) == 0)
				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "will await drain"));
			/*
			 * "blockers" tells cleanup_connection not to free the
			 * conn struct out from under us while we sleep.
			 */
			conn->blockers++;
			ret = __repmgr_await_drain(env,
			    conn, drain_to * OUT_QUEUE_LIMIT);
			conn->blockers--;
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "drain returned %d (%d,%d)", ret,
			    db_rep->finished, conn->out_queue_length));
			if (db_rep->finished)
				return (DB_TIMEOUT);
			if (ret != 0)
				return (ret);
			/*
			 * Queue fully drained while we waited: ownership has
			 * reverted to us, so we may try the in-line send.
			 */
			if (STAILQ_EMPTY(&conn->outbound_queue))
				goto empty;
		}
		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
			return (enqueue_msg(env, conn, msg, 0));
		else {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "queue limit exceeded"));
			STAT(env->rep_handle->
			    region->mstat.st_msgs_dropped++);
			/* Dropping is protocol-legal; only a blocked
			 * (blockable) caller needs to see an error. */
			return (blockable ? DB_TIMEOUT : 0);
		}
	}
empty:

	/*
	 * Send as much data to the site as we can, without blocking.  Keep
	 * writing as long as we're making some progress.  Make a scratch copy
	 * of iovecs for our use, since we destroy it in the process of
	 * adjusting pointers after each partial I/O.
	 */
	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
	total_written = 0;
	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
	    iovecs.count-iovecs.offset, &nw)) == 0) {
		total_written += nw;
		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
			return (0);
	}

	if (ret != WOULDBLOCK) {
		__db_err(env, ret, "socket writing failure");
		return (DB_REP_UNAVAIL);
	}

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "wrote only %lu bytes to %s",
	    (u_long)total_written,
	    __repmgr_format_eid_loc(env->rep_handle, conn->eid, buffer)));
	/*
	 * We can't send any more without blocking: queue (a pointer to) a
	 * "flattened" copy of the message, so that the select() thread will
	 * finish sending it later.
	 */
	if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
		return (ret);

	STAT(env->rep_handle->region->mstat.st_msgs_queued++);

	/*
	 * Wake the main select thread so that it can discover that it has
	 * received ownership of this connection.  Note that we didn't have to
	 * do this in the previous case (above), because the non-empty queue
	 * implies that the select() thread is already managing ownership of
	 * this connection.
	 */
#ifdef DB_WIN32
	if (WSAEventSelect(conn->fd, conn->event_object,
	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't add FD_WRITE event bit");
		return (ret);
	}
#endif
	return (__repmgr_wake_main_thread(env));
}

/*
 * PUBLIC: int __repmgr_is_permanent __P((ENV *, const DB_LSN *));
 *
 * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
 * sites have ack'ed; FALSE otherwise.
 *
 * !!!
 * Caller must hold the mutex.
511 */ 512int 513__repmgr_is_permanent(env, lsnp) 514 ENV *env; 515 const DB_LSN *lsnp; 516{ 517 DB_REP *db_rep; 518 REPMGR_SITE *site; 519 u_int eid, nsites, npeers; 520 int is_perm, has_missing_peer; 521 522 db_rep = env->rep_handle; 523 524 if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE) 525 return (TRUE); 526 527 nsites = npeers = 0; 528 has_missing_peer = FALSE; 529 for (eid = 0; eid < db_rep->site_cnt; eid++) { 530 site = SITE_FROM_EID(eid); 531 if (!F_ISSET(site, SITE_HAS_PRIO)) { 532 /* 533 * Never connected to this site: since we can't know 534 * whether it's a peer, assume the worst. 535 */ 536 has_missing_peer = TRUE; 537 continue; 538 } 539 540 if (log_compare(&site->max_ack, lsnp) >= 0) { 541 nsites++; 542 if (site->priority > 0) 543 npeers++; 544 } else { 545 /* This site hasn't ack'ed the message. */ 546 if (site->priority > 0) 547 has_missing_peer = TRUE; 548 } 549 } 550 551 switch (db_rep->perm_policy) { 552 case DB_REPMGR_ACKS_ONE: 553 is_perm = (nsites >= 1); 554 break; 555 case DB_REPMGR_ACKS_ONE_PEER: 556 is_perm = (npeers >= 1); 557 break; 558 case DB_REPMGR_ACKS_QUORUM: 559 /* 560 * The minimum number of acks necessary to ensure that the 561 * transaction is durable if an election is held (given that we 562 * always conduct elections according to the standard, 563 * recommended practice of requiring votes from a majority of 564 * sites). 565 */ 566 if (__repmgr_get_nsites(db_rep) == 2 && 567 !FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT)) { 568 /* 569 * Unless instructed otherwise, our special handling for 570 * 2-site groups means that a client that loses contact 571 * with the master elects itself master (even though 572 * that doesn't constitute a majority). In order to 573 * provide the expected guarantee implied by the 574 * definition of "quorum" we have to fudge the ack 575 * calculation in this case: specifically, we need to 576 * make sure that the client has received it in order 577 * for us to consider it "perm". 
578 * 579 * Note that turning the usual strict behavior back on 580 * in a 2-site group results in "0" as the number of 581 * clients needed to ack a txn in order for it to have 582 * arrived at a quorum. This is the correct result, 583 * strange as it may seem! This may well mean that in a 584 * 2-site group the QUORUM policy is rarely the right 585 * choice. 586 */ 587 is_perm = (npeers >= 1); 588 } else 589 is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2); 590 break; 591 case DB_REPMGR_ACKS_ALL: 592 /* Adjust by 1, since get_nsites includes local site. */ 593 is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1); 594 break; 595 case DB_REPMGR_ACKS_ALL_PEERS: 596 if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) { 597 /* Assume missing site might be a peer. */ 598 has_missing_peer = TRUE; 599 } 600 is_perm = !has_missing_peer; 601 break; 602 default: 603 is_perm = FALSE; 604 (void)__db_unknown_path(env, "__repmgr_is_permanent"); 605 } 606 return (is_perm); 607} 608 609/* 610 * Abandons a connection, to recover from an error. Takes necessary recovery 611 * action. Note that we don't actually close and clean up the connection here; 612 * that happens later, in the select() thread main loop. See the definition of 613 * DISABLE_CONNECTION (repmgr.h) for more discussion. 614 * 615 * PUBLIC: int __repmgr_bust_connection __P((ENV *, 616 * PUBLIC: REPMGR_CONNECTION *)); 617 * 618 * !!! 619 * Caller holds mutex. 620 */ 621int 622__repmgr_bust_connection(env, conn) 623 ENV *env; 624 REPMGR_CONNECTION *conn; 625{ 626 DB_REP *db_rep; 627 int connecting, ret, eid; 628 629 db_rep = env->rep_handle; 630 ret = 0; 631 632 eid = conn->eid; 633 connecting = (conn->state == CONN_CONNECTING); 634 635 DISABLE_CONNECTION(conn); 636 637 /* 638 * When we first accepted the incoming connection, we set conn->eid to 639 * -1 to indicate that we didn't yet know what site it might be from. 
640 * If we then get here because we later decide it was a redundant 641 * connection, the following scary stuff will correctly not happen. 642 */ 643 if (IS_VALID_EID(eid)) { 644 /* schedule_connection_attempt wakes the main thread. */ 645 if ((ret = __repmgr_schedule_connection_attempt( 646 env, (u_int)eid, FALSE)) != 0) 647 return (ret); 648 649 /* 650 * If this connection had gotten no further than the CONNECTING 651 * state, this can't count as a loss of connection to the 652 * master. 653 */ 654 if (!connecting && eid == db_rep->master_eid) { 655 (void)__memp_set_config( 656 env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1); 657 if ((ret = __repmgr_init_election( 658 env, ELECT_FAILURE_ELECTION)) != 0) 659 return (ret); 660 } 661 } else { 662 /* 663 * One way or another, make sure the main thread is poked, so 664 * that we do the deferred clean-up. 665 */ 666 ret = __repmgr_wake_main_thread(env); 667 } 668 return (ret); 669} 670 671/* 672 * PUBLIC: int __repmgr_cleanup_connection 673 * PUBLIC: __P((ENV *, REPMGR_CONNECTION *)); 674 * 675 * !!! 676 * Idempotent. This can be called repeatedly as blocking message threads (of 677 * which there could be multiples) wake up in case of error on the connection. 678 */ 679int 680__repmgr_cleanup_connection(env, conn) 681 ENV *env; 682 REPMGR_CONNECTION *conn; 683{ 684 DB_REP *db_rep; 685 int ret; 686 687 db_rep = env->rep_handle; 688 689 if ((ret = __repmgr_close_connection(env, conn)) != 0) 690 goto out; 691 692 /* 693 * If there's a blocked message thread waiting, we mustn't yank the 694 * connection struct out from under it. Instead, just wake it up. 695 * We'll get another chance to come back through here soon. 
696 */ 697 if (conn->blockers > 0) { 698 ret = __repmgr_signal(&conn->drained); 699 goto out; 700 } 701 702 TAILQ_REMOVE(&db_rep->connections, conn, entries); 703 704 ret = __repmgr_destroy_connection(env, conn); 705 706out: 707 return (ret); 708} 709 710static int 711__repmgr_close_connection(env, conn) 712 ENV *env; 713 REPMGR_CONNECTION *conn; 714{ 715 int ret; 716 717 DB_ASSERT(env, 718 conn->state == CONN_DEFUNCT || env->rep_handle->finished); 719 720 ret = 0; 721 if (conn->fd != INVALID_SOCKET) { 722 ret = closesocket(conn->fd); 723 conn->fd = INVALID_SOCKET; 724 if (ret == SOCKET_ERROR) { 725 ret = net_errno; 726 __db_err(env, ret, "closing socket"); 727 } 728#ifdef DB_WIN32 729 if (!WSACloseEvent(conn->event_object) && ret == 0) 730 ret = net_errno; 731#endif 732 } 733 return (ret); 734} 735 736static int 737__repmgr_destroy_connection(env, conn) 738 ENV *env; 739 REPMGR_CONNECTION *conn; 740{ 741 QUEUED_OUTPUT *out; 742 REPMGR_FLAT *msg; 743 DBT *dbt; 744 int ret; 745 746 /* 747 * Deallocate any input and output buffers we may have. 
748 */ 749 if (conn->reading_phase == DATA_PHASE) { 750 if (conn->msg_type == REPMGR_REP_MESSAGE) 751 __os_free(env, conn->input.rep_message); 752 else { 753 dbt = &conn->input.repmgr_msg.cntrl; 754 if (dbt->size > 0) 755 __os_free(env, dbt->data); 756 dbt = &conn->input.repmgr_msg.rec; 757 if (dbt->size > 0) 758 __os_free(env, dbt->data); 759 } 760 } 761 while (!STAILQ_EMPTY(&conn->outbound_queue)) { 762 out = STAILQ_FIRST(&conn->outbound_queue); 763 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); 764 msg = out->msg; 765 if (--msg->ref_count <= 0) 766 __os_free(env, msg); 767 __os_free(env, out); 768 } 769 770 ret = __repmgr_free_cond(&conn->drained); 771 __os_free(env, conn); 772 return (ret); 773} 774 775static int 776enqueue_msg(env, conn, msg, offset) 777 ENV *env; 778 REPMGR_CONNECTION *conn; 779 struct sending_msg *msg; 780 size_t offset; 781{ 782 QUEUED_OUTPUT *q_element; 783 int ret; 784 785 if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0)) 786 return (ret); 787 if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), &q_element)) != 0) 788 return (ret); 789 q_element->msg = msg->fmsg; 790 msg->fmsg->ref_count++; /* encapsulation would be sweeter */ 791 q_element->offset = offset; 792 793 /* Put it on the connection's outbound queue. */ 794 STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries); 795 conn->out_queue_length++; 796 return (0); 797} 798 799/* 800 * Either "control" or "rec" (or both) may be NULL, in which case we treat it 801 * like a zero-length DBT. 802 */ 803static void 804setup_sending_msg(msg, type, control, rec) 805 struct sending_msg *msg; 806 u_int type; 807 const DBT *control, *rec; 808{ 809 u_int32_t control_size, rec_size; 810 811 /* 812 * The wire protocol is documented in a comment at the top of this 813 * module. 814 */ 815 __repmgr_iovec_init(&msg->iovecs); 816 msg->type = type; 817 __repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type)); 818 819 control_size = control == NULL ? 
0 : control->size; 820 msg->control_size_buf = htonl(control_size); 821 __repmgr_add_buffer(&msg->iovecs, 822 &msg->control_size_buf, sizeof(msg->control_size_buf)); 823 824 rec_size = rec == NULL ? 0 : rec->size; 825 msg->rec_size_buf = htonl(rec_size); 826 __repmgr_add_buffer( 827 &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf)); 828 829 if (control->size > 0) 830 __repmgr_add_dbt(&msg->iovecs, control); 831 832 if (rec_size > 0) 833 __repmgr_add_dbt(&msg->iovecs, rec); 834 835 msg->fmsg = NULL; 836} 837 838/* 839 * Convert a message stored as iovec pointers to various pieces, into flattened 840 * form, by copying all the pieces, and then make the iovec just point to the 841 * new simplified form. 842 */ 843static int 844flatten(env, msg) 845 ENV *env; 846 struct sending_msg *msg; 847{ 848 u_int8_t *p; 849 size_t msg_size; 850 int i, ret; 851 852 DB_ASSERT(env, msg->fmsg == NULL); 853 854 msg_size = msg->iovecs.total_bytes; 855 if ((ret = __os_malloc(env, sizeof(*msg->fmsg) + msg_size, 856 &msg->fmsg)) != 0) 857 return (ret); 858 msg->fmsg->length = msg_size; 859 msg->fmsg->ref_count = 0; 860 p = &msg->fmsg->data[0]; 861 862 for (i = 0; i < msg->iovecs.count; i++) { 863 memcpy(p, msg->iovecs.vectors[i].iov_base, 864 msg->iovecs.vectors[i].iov_len); 865 p = &p[msg->iovecs.vectors[i].iov_len]; 866 } 867 __repmgr_iovec_init(&msg->iovecs); 868 __repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size); 869 return (0); 870} 871 872/* 873 * PUBLIC: int __repmgr_find_site __P((ENV *, const char *, u_int)); 874 */ 875int 876__repmgr_find_site(env, host, port) 877 ENV *env; 878 const char *host; 879 u_int port; 880{ 881 DB_REP *db_rep; 882 REPMGR_SITE *site; 883 u_int i; 884 885 db_rep = env->rep_handle; 886 for (i = 0; i < db_rep->site_cnt; i++) { 887 site = &db_rep->sites[i]; 888 889 if (strcmp(site->net_addr.host, host) == 0 && 890 site->net_addr.port == port) 891 return ((int)i); 892 } 893 894 return (-1); 895} 896 897/* 898 * Stash a copy of the 
given host name and port number into a convenient data 899 * structure so that we can save it permanently. This is kind of like a 900 * constructor for a netaddr object, except that the caller supplies the memory 901 * for the base struct (though not the subordinate attachments). 902 * 903 * All inputs are assumed to have been already validated. 904 * 905 * PUBLIC: int __repmgr_pack_netaddr __P((ENV *, const char *, 906 * PUBLIC: u_int, ADDRINFO *, repmgr_netaddr_t *)); 907 */ 908int 909__repmgr_pack_netaddr(env, host, port, list, addr) 910 ENV *env; 911 const char *host; 912 u_int port; 913 ADDRINFO *list; 914 repmgr_netaddr_t *addr; 915{ 916 int ret; 917 918 DB_ASSERT(env, host != NULL); 919 920 if ((ret = __os_strdup(env, host, &addr->host)) != 0) 921 return (ret); 922 addr->port = (u_int16_t)port; 923 addr->address_list = list; 924 addr->current = NULL; 925 return (0); 926} 927 928/* 929 * PUBLIC: int __repmgr_getaddr __P((ENV *, 930 * PUBLIC: const char *, u_int, int, ADDRINFO **)); 931 */ 932int 933__repmgr_getaddr(env, host, port, flags, result) 934 ENV *env; 935 const char *host; 936 u_int port; 937 int flags; /* Matches struct addrinfo declaration. */ 938 ADDRINFO **result; 939{ 940 ADDRINFO *answer, hints; 941 char buffer[10]; /* 2**16 fits in 5 digits. */ 942#ifdef DB_WIN32 943 int ret; 944#endif 945 946 /* 947 * Ports are really 16-bit unsigned values, but it's too painful to 948 * push that type through the API. 
949 */ 950 if (port > UINT16_MAX) { 951 __db_errx(env, "port %u larger than max port %u", 952 port, UINT16_MAX); 953 return (EINVAL); 954 } 955 956#ifdef DB_WIN32 957 if (!env->rep_handle->wsa_inited && 958 (ret = __repmgr_wsa_init(env)) != 0) 959 return (ret); 960#endif 961 962 memset(&hints, 0, sizeof(hints)); 963 hints.ai_family = AF_UNSPEC; 964 hints.ai_socktype = SOCK_STREAM; 965 hints.ai_flags = flags; 966 (void)snprintf(buffer, sizeof(buffer), "%u", port); 967 968 /* 969 * Although it's generally bad to discard error information, the return 970 * code from __os_getaddrinfo is undependable. Our callers at least 971 * would like to be able to distinguish errors in getaddrinfo (which we 972 * want to consider to be re-tryable), from other failure (e.g., EINVAL, 973 * above). 974 */ 975 if (__os_getaddrinfo(env, host, port, buffer, &hints, &answer) != 0) 976 return (DB_REP_UNAVAIL); 977 *result = answer; 978 979 return (0); 980} 981 982/* 983 * Adds a new site to our array of known sites (unless it already exists), 984 * and schedules it for immediate connection attempt. Whether it exists or not, 985 * we set newsitep, either to the already existing site, or to the newly created 986 * site. Unless newsitep is passed in as NULL, which is allowed. 987 * 988 * PUBLIC: int __repmgr_add_site 989 * PUBLIC: __P((ENV *, const char *, u_int, REPMGR_SITE **)); 990 * 991 * !!! 992 * Caller is expected to hold the mutex. 
 */
int
__repmgr_add_site(env, host, port, newsitep)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **newsitep;
{
	ADDRINFO *address_list;
	DB_REP *db_rep;
	repmgr_netaddr_t addr;
	REPMGR_SITE *site;
	int ret, eid;

	ret = 0;
	db_rep = env->rep_handle;

	if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) {
		site = SITE_FROM_EID(eid);
		ret = EEXIST;
		goto out;
	}

	if ((ret = __repmgr_getaddr(
	    env, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
		/* Allow re-tryable errors.  We'll try again later. */
		address_list = NULL;
	} else if (ret != 0)
		return (ret);

	if ((ret = __repmgr_pack_netaddr(
	    env, host, port, address_list, &addr)) != 0) {
		/*
		 * NOTE(review): address_list may be NULL here (re-tryable
		 * getaddr failure above) — confirm __os_freeaddrinfo
		 * tolerates a NULL list.
		 */
		__os_freeaddrinfo(env, address_list);
		return (ret);
	}

	if ((ret = __repmgr_new_site(env, &site, &addr, SITE_IDLE)) != 0) {
		__repmgr_cleanup_netaddr(env, &addr);
		return (ret);
	}

	/* If the select() thread is running, start connecting right away. */
	if (db_rep->selector != NULL &&
	    (ret = __repmgr_schedule_connection_attempt(
	    env, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
		return (ret);

	/* Note that we should only come here for success and EEXIST. */
out:
	if (newsitep != NULL)
		*newsitep = site;
	return (ret);
}

/*
 * Initializes net-related memory in the db_rep handle.
 *
 * PUBLIC: int __repmgr_net_create __P((DB_REP *));
 */
int
__repmgr_net_create(db_rep)
	DB_REP *db_rep;
{
	db_rep->listen_fd = INVALID_SOCKET;
	db_rep->master_eid = DB_EID_INVALID;

	TAILQ_INIT(&db_rep->connections);
	TAILQ_INIT(&db_rep->retries);

	return (0);
}

/*
 * listen_socket_init --
 *	Initialize a socket for listening.  Sets
 * a file descriptor for the socket, ready for an accept() call
 * in a thread that we're happy to let block.
 *
 * PUBLIC: int __repmgr_listen __P((ENV *));
 */
int
__repmgr_listen(env)
	ENV *env;
{
	ADDRINFO *ai;
	DB_REP *db_rep;
	char *why;
	int sockopt, ret;
	socket_t s;

	db_rep = env->rep_handle;

	/* Use OOB value as sentinel to show no socket open. */
	s = INVALID_SOCKET;
	ai = ADDR_LIST_FIRST(&db_rep->my_addr);

	/*
	 * Given the assert is correct, we execute the loop at least once, which
	 * means 'why' will have been set by the time it's needed.  But I guess
	 * lint doesn't know about DB_ASSERT.
	 */
	COMPQUIET(why, "");
	DB_ASSERT(env, ai != NULL);
	/* Try each candidate address until one binds and listens. */
	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
			why = "can't create listen socket";
			continue;
		}

		/*
		 * When testing, it's common to kill and restart regularly.  On
		 * some systems, this causes bind to fail with "address in use"
		 * errors unless this option is set.
		 */
		sockopt = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
		    sizeof(sockopt)) != 0) {
			why = "can't set REUSEADDR socket option";
			break;
		}

		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
			why = "can't bind socket to listening address";
			/* Close and keep trying the remaining addresses. */
			(void)closesocket(s);
			s = INVALID_SOCKET;
			continue;
		}

		if (listen(s, 5) != 0) {
			why = "listen()";
			break;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(env, ret, "can't unblock listen socket");
			goto clean;
		}

		db_rep->listen_fd = s;
		return (0);
	}

	ret = net_errno;
	__db_err(env, ret, why);
clean:	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	return (ret);
}

/*
 * PUBLIC: int __repmgr_net_close __P((ENV *));
 */
int
__repmgr_net_close(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
#ifndef DB_WIN32
	struct sigaction sigact;
#endif
	int ret, t_ret;

	db_rep = env->rep_handle;
	if (db_rep->listen_fd == INVALID_SOCKET)
		return (0);

	ret = 0;
	/* Close and destroy every remaining connection; keep first error. */
	while (!TAILQ_EMPTY(&db_rep->connections)) {
		conn = TAILQ_FIRST(&db_rep->connections);
		if ((t_ret = __repmgr_close_connection(env, conn)) != 0 &&
		    ret == 0)
			ret = t_ret;
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		if ((t_ret = __repmgr_destroy_connection(env, conn)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}

	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
		ret = net_errno;

#ifdef DB_WIN32
	/* Shut down the Windows sockets DLL. */
	if (WSACleanup() == SOCKET_ERROR && ret == 0)
		ret = net_errno;
	db_rep->wsa_inited = FALSE;
#else
	/* Restore original SIGPIPE handling configuration.
*/ 1183 if (db_rep->chg_sig_handler) { 1184 memset(&sigact, 0, sizeof(sigact)); 1185 sigact.sa_handler = SIG_DFL; 1186 if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0) 1187 ret = errno; 1188 } 1189#endif 1190 db_rep->listen_fd = INVALID_SOCKET; 1191 return (ret); 1192} 1193 1194/* 1195 * PUBLIC: void __repmgr_net_destroy __P((ENV *, DB_REP *)); 1196 */ 1197void 1198__repmgr_net_destroy(env, db_rep) 1199 ENV *env; 1200 DB_REP *db_rep; 1201{ 1202 REPMGR_CONNECTION *conn; 1203 REPMGR_RETRY *retry; 1204 REPMGR_SITE *site; 1205 u_int i; 1206 1207 __repmgr_cleanup_netaddr(env, &db_rep->my_addr); 1208 1209 if (db_rep->sites == NULL) 1210 return; 1211 1212 while (!TAILQ_EMPTY(&db_rep->retries)) { 1213 retry = TAILQ_FIRST(&db_rep->retries); 1214 TAILQ_REMOVE(&db_rep->retries, retry, entries); 1215 __os_free(env, retry); 1216 } 1217 1218 while (!TAILQ_EMPTY(&db_rep->connections)) { 1219 conn = TAILQ_FIRST(&db_rep->connections); 1220 (void)__repmgr_destroy_connection(env, conn); 1221 } 1222 1223 for (i = 0; i < db_rep->site_cnt; i++) { 1224 site = &db_rep->sites[i]; 1225 __repmgr_cleanup_netaddr(env, &site->net_addr); 1226 } 1227 __os_free(env, db_rep->sites); 1228 db_rep->sites = NULL; 1229} 1230