/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"
#include "dbinc/mp.h"

/*
 * The functions in this module implement a simple wire protocol for
 * transmitting messages, both replication messages and our own internal control
 * messages.  The protocol is as follows:
 *
 *      1 byte          - message type  (defined in repmgr.h)
 *      4 bytes         - size of control
 *      4 bytes         - size of rec
 *      ? bytes         - control
 *      ? bytes         - rec
 *
 * where both sizes are 32-bit binary integers in network byte order.
 * Either control or rec can have zero length, but even in this case the
 * 4-byte length will be present.
 *     Putting both lengths right up at the front allows us to read in fewer
 * phases, and allows us to allocate buffer space for both parts (plus a wrapper
 * struct) at once.
 */

/*
 * In sending a message, we first try to send it in-line, in the sending thread,
 * and without first copying the message, by using scatter/gather I/O, using
 * iovecs to point to the various pieces of the message.  If that all works
 * without blocking, that's optimal.
 *     If we find that, for a particular connection, we can't send without
 * blocking, then we must copy the message for sending later in the select()
 * thread.  In the course of doing that, we might as well "flatten" the message,
 * forming one single buffer, to simplify life.  Not only that, once we've gone
 * to the trouble of doing that, other sites to which we also want to send the
 * message (in the case of a broadcast), may as well take advantage of the
 * simplified structure also.
 *     The sending_msg structure below holds it all.  Note that this structure,
 * and the "flat_msg" structure, are allocated separately, because (1) the
 * flat_msg version is usually not needed; and (2) when a flat_msg is needed, it
 * will need to live longer than the wrapping sending_msg structure.
 *     Note that, for the broadcast case, where we're going to use this
 * repeatedly, the iovecs is a template that must be copied, since in normal use
 * the iovecs pointers and lengths get adjusted after every partial write.
 */
struct sending_msg {
	/* Scatter/gather template pointing at the pieces defined below. */
	REPMGR_IOVECS iovecs;
	/* Message type: the first byte on the wire (see repmgr.h). */
	u_int8_t type;
	/* Sizes of control/rec, pre-converted to network byte order. */
	u_int32_t control_size_buf, rec_size_buf;
	/* Flattened single-buffer copy; created lazily (NULL until needed). */
	REPMGR_FLAT *fmsg;
};

/* Functions private to this module. */
static int final_cleanup __P((ENV *, REPMGR_CONNECTION *, void *));
static int flatten __P((ENV *, struct sending_msg *));
static void remove_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_close_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_destroy_connection __P((ENV *, REPMGR_CONNECTION *));
static void setup_sending_msg
    __P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
static int enqueue_msg
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static REPMGR_SITE *__repmgr_available_site __P((ENV *, int));

/*
 * __repmgr_send --
 *	The send function for DB_ENV->rep_set_transport.
 *
 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
 * PUBLIC:     const DB_LSN *, int, u_int32_t));
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
	DB_ENV *dbenv;
	const DBT *control, *rec;	/* Message payload pieces (from rep). */
	const DB_LSN *lsnp;		/* LSN to await acks for, if PERM. */
	int eid;			/* Destination, or DB_EID_BROADCAST. */
	u_int32_t flags;		/* DB_REP_PERMANENT/ANYWHERE/etc. */
{
	DB_REP *db_rep;
	REP *rep;
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int available, nclients, needed, npeers_sent, nsites_sent;
	int ret, t_ret;

	env = dbenv->env;
	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	LOCK_MUTEX(db_rep->mutex);

	/*
	 * If we're already "finished", we can't send anything.  This covers the
	 * case where a bulk buffer is flushed at env close, or perhaps an
	 * unexpected __repmgr_thread_failure.
	 */
	if (db_rep->finished) {
		ret = DB_REP_UNAVAIL;
		goto out;
	}

	/*
	 * Check whether we need to refresh our site address information with
	 * more recent updates from shared memory.
	 */
	if (rep->siteaddr_seq > db_rep->siteaddr_seq &&
	    (ret = __repmgr_sync_siteaddr(env)) != 0)
		goto out;

	if (eid == DB_EID_BROADCAST) {
		/* Broadcast sets nsites_sent/npeers_sent for us. */
		if ((ret = __repmgr_send_broadcast(env, REPMGR_REP_MESSAGE,
		    control, rec, &nsites_sent, &npeers_sent)) != 0)
			goto out;
	} else {
		DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(eid));

		/*
		 * If this is a request that can be sent anywhere, then see if
		 * we can send it to our peer (to save load on the master), but
		 * not if it's a rerequest, 'cuz that likely means we tried this
		 * already and failed.
		 */
		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
		    DB_REP_ANYWHERE &&
		    IS_VALID_EID(db_rep->peer) &&
		    (site = __repmgr_available_site(env, db_rep->peer)) !=
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "sending request to peer"));
		} else if ((site = __repmgr_available_site(env, eid)) ==
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "ignoring message sent to unavailable site"));
			ret = DB_REP_UNAVAIL;
			goto out;
		}

		conn = site->ref.conn;
		/* Pass the "blockable" argument as TRUE. */
		if ((ret = __repmgr_send_one(env, conn, REPMGR_REP_MESSAGE,
		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
		    (t_ret = __repmgr_bust_connection(env, conn)) != 0)
			ret = t_ret;
		if (ret != 0)
			goto out;

		/* A site with priority > 0 counts as an electable peer. */
		nsites_sent = 1;
		npeers_sent = site->priority > 0 ? 1 : 0;
	}
	/*
	 * Right now, nsites and npeers represent the (maximum) number of sites
	 * we've attempted to begin sending the message to.  Of course we
	 * haven't really received any ack's yet.  But since we've only sent to
	 * nsites/npeers other sites, that's the maximum number of ack's we
	 * could possibly expect.  If even that number fails to satisfy our PERM
	 * policy, there's no point waiting for something that will never
	 * happen.
	 */
	if (LF_ISSET(DB_REP_PERMANENT)) {
		/* Number of sites in the group besides myself. */
		nclients = __repmgr_get_nsites(db_rep) - 1;

		switch (db_rep->perm_policy) {
		case DB_REPMGR_ACKS_NONE:
			needed = 0;
			COMPQUIET(available, 0);
			break;

		case DB_REPMGR_ACKS_ONE:
			needed = 1;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ALL:
			/* Number of sites in the group besides myself. */
			needed = nclients;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ONE_PEER:
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_ALL_PEERS:
			/*
			 * Too hard to figure out "needed", since we're not
			 * keeping track of how many peers we have; so just skip
			 * the optimization in this case.
			 */
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_QUORUM:
			/*
			 * The minimum number of acks necessary to ensure that
			 * the transaction is durable if an election is held.
			 * (See note below at __repmgr_is_permanent, regarding
			 * the surprising inter-relationship between
			 * 2SITE_STRICT and QUORUM.)
			 */
			if (nclients > 1 ||
			    FLD_ISSET(db_rep->region->config,
			    REP_C_2SITE_STRICT))
				needed = nclients / 2;
			else
				needed = 1;
			available = npeers_sent;
			break;

		default:
			COMPQUIET(available, 0);
			COMPQUIET(needed, 0);
			(void)__db_unknown_path(env, "__repmgr_send");
			break;
		}
		if (needed == 0)
			goto out;
		if (available < needed) {
			ret = DB_REP_UNAVAIL;
			goto out;
		}
		/* In ALL_PEERS case, display of "needed" might be confusing. */
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "will await acknowledgement: need %u", needed));
		ret = __repmgr_await_ack(env, lsnp);
	}

out:	UNLOCK_MUTEX(db_rep->mutex);
	if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
		STAT(db_rep->region->mstat.st_perm_failed++);
		DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
	}
	return (ret);
}

/*
 * Returns the site if it is connected and its main connection is ready for
 * traffic; otherwise NULL.  (Caller holds db_rep->mutex.)
 */
static REPMGR_SITE *
__repmgr_available_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);
	if (site->state != SITE_CONNECTED)
		return (NULL);

	if (site->ref.conn->state == CONN_READY)
		return (site);
	return (NULL);
}

/*
 * Synchronize our list of sites with new information that has been added to the
 * list in the shared region.
 *
 * PUBLIC: int __repmgr_sync_siteaddr __P((ENV *));
 */
int
__repmgr_sync_siteaddr(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	char *host;
	u_int added;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	ret = 0;

	MUTEX_LOCK(env, rep->mtx_repmgr);

	/* Pick up our own address from the region if we don't have it yet. */
	if (db_rep->my_addr.host == NULL && rep->my_addr.host != INVALID_ROFF) {
		host = R_ADDR(env->reginfo, rep->my_addr.host);
		if ((ret = __repmgr_pack_netaddr(env,
		    host, rep->my_addr.port, NULL, &db_rep->my_addr)) != 0)
			goto out;
	}

	/* Copy in newly added sites, then initialize just the new ones. */
	added = db_rep->site_cnt;
	if ((ret = __repmgr_copy_in_added_sites(env)) == 0)
		ret = __repmgr_init_new_sites(env, added, db_rep->site_cnt);

out:
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (ret);
}

/*
 * Sends message to all sites with which we currently have an active
 * connection.  Sets result parameters according to how many sites we attempted
 * to begin sending to, even if we did nothing more than queue it for later
 * delivery.
 *
 * !!!
 * Caller must hold env->mutex.
 * PUBLIC: int __repmgr_send_broadcast __P((ENV *, u_int,
 * PUBLIC:    const DBT *, const DBT *, u_int *, u_int *));
 */
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp)
	ENV *env;
	u_int type;			/* Wire message type (repmgr.h). */
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;	/* Out: sites/peers sent to. */
{
	DB_REP *db_rep;
	struct sending_msg msg;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int eid, nsites, npeers;
	int ret;

	/*
	 * Highest message type number understood by each protocol version;
	 * indexed by connection version (entry 0 is unused).
	 */
	static const u_int version_max_msg_type[] = {
		0,
		REPMGR_MAX_V1_MSG_TYPE,
		REPMGR_MAX_V2_MSG_TYPE,
		REPMGR_MAX_V3_MSG_TYPE
	};

	db_rep = env->rep_handle;

	/*
	 * Sending a broadcast is quick, because we allow no blocking.  So it
	 * shouldn't much matter.  But just in case, take the timestamp before
	 * sending, so that if anything we err on the side of keeping clients
	 * placated (i.e., possibly sending a heartbeat slightly more frequently
	 * than necessary).
	 */
	__os_gettime(env, &db_rep->last_bcast, 1);

	setup_sending_msg(&msg, type, control, rec);
	nsites = npeers = 0;

	/* Send to (only the main connection with) every site. */
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		if ((site = __repmgr_available_site(env, (int)eid)) == NULL)
			continue;
		conn = site->ref.conn;

		DB_ASSERT(env, IS_VALID_EID(conn->eid) &&
		    conn->version > 0 &&
		    conn->version <= DB_REPMGR_VERSION);

		/*
		 * Skip if the type of message we're sending is beyond the range
		 * of known message types for this connection's version.
		 *
		 * !!!
		 * Don't be misled by the apparent generality of this simple
		 * test.  It works currently, because the only kinds of messages
		 * that we broadcast are REP_MESSAGE and HEARTBEAT.  But in the
		 * future other kinds of messages might require more intricate
		 * per-connection-version customization (for example,
		 * per-version message format conversion, addition of new
		 * fields, etc.).
		 */
		if (type > version_max_msg_type[conn->version])
			continue;

		/*
		 * Broadcast messages are either application threads committing
		 * transactions, or replication status message that we can
		 * afford to lose.  So don't allow blocking for them (pass
		 * "blockable" argument as FALSE).
		 */
		if ((ret = __repmgr_send_internal(env,
		    conn, &msg, FALSE)) == 0) {
			site = SITE_FROM_EID(conn->eid);
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else if (ret == DB_REP_UNAVAIL) {
			/* Connection is dead; recover and keep broadcasting. */
			if ((ret = __repmgr_bust_connection(env, conn)) != 0)
				return (ret);
		} else
			return (ret);
	}

	*nsitesp = nsites;
	*npeersp = npeers;
	return (0);
}

/*
 * __repmgr_send_one --
 *	Send a message to a site, or if you can't just yet, make a copy of it
 * and arrange to have it sent later.  'rec' may be NULL, in which case we send
 * a zero length and no data.
 *
 * If we get an error, we take care of cleaning up the connection (calling
 * __repmgr_bust_connection()), so that the caller needn't do so.
 *
 * !!!
 * Note that the mutex should be held through this call.
 * It doubles as a synchronizer to make sure that two threads don't
 * intersperse writes that are part of two single messages.
 *
 * PUBLIC: int __repmgr_send_one __P((ENV *, REPMGR_CONNECTION *,
 * PUBLIC:    u_int, const DBT *, const DBT *, int));
 */
int
__repmgr_send_one(env, conn, msg_type, control, rec, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
	int blockable;		/* TRUE if we may wait for queue space. */
{
	struct sending_msg msg;

	setup_sending_msg(&msg, msg_type, control, rec);
	return (__repmgr_send_internal(env, conn, &msg, blockable));
}

/*
 * Attempts a "best effort" to send a message on the given site.  If there is an
 * excessive backlog of message already queued on the connection, what shall we
 * do?
 * If the caller doesn't mind blocking, we'll wait (a limited amount of
 * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
 * is always allowed by the replication protocol.  But in the case of a
 * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
 * almost always get a flood of messages that instantly fills our queue, so
 * blocking improves performance (by avoiding the need for the client to
 * re-request).
 *
 * How long shall we wait?  We could of course create a new timeout
 * configuration type, so that the application could set it directly.  But that
 * would start to overwhelm the user with too many choices to think about.  We
 * already have an ACK timeout, which is the user's estimate of how long it
 * should take to send a message to the client, have it be processed, and return
 * a message back to us.  We multiply that by the queue size, because that's how
 * many messages have to be swallowed up by the client before we're able to
 * start sending again (at least to a rough approximation).
 */
static int
__repmgr_send_internal(env, conn, msg, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	int blockable;
{
	DB_REP *db_rep;
	REPMGR_IOVECS iovecs;
	SITE_STRING_BUFFER buffer;
	db_timeout_t drain_to;
	int ret;
	size_t nw;
	size_t total_written;

	db_rep = env->rep_handle;

	DB_ASSERT(env,
	    conn->state != CONN_CONNECTING && conn->state != CONN_DEFUNCT);
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		/*
		 * Output to this site is currently owned by the select()
		 * thread, so we can't try sending in-line here.  We can only
		 * queue the msg for later.
		 */
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "msg to %s to be queued",
		    __repmgr_format_eid_loc(env->rep_handle,
		    conn->eid, buffer)));
		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
		    blockable && conn->state != CONN_CONGESTED) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "block msg thread, await queue space"));

			/* Wait up to ack_timeout per queued message. */
			if ((drain_to = db_rep->ack_timeout) == 0)
				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "will await drain"));
			/*
			 * "blockers" keeps the connection struct alive while
			 * we're asleep (see __repmgr_cleanup_connection).
			 */
			conn->blockers++;
			ret = __repmgr_await_drain(env,
			    conn, drain_to * OUT_QUEUE_LIMIT);
			conn->blockers--;
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "drain returned %d (%d,%d)", ret,
			    db_rep->finished, conn->out_queue_length));
			if (db_rep->finished)
				return (DB_TIMEOUT);
			if (ret != 0)
				return (ret);
			/* Queue fully drained: we may send in-line below. */
			if (STAILQ_EMPTY(&conn->outbound_queue))
				goto empty;
		}
		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
			return (enqueue_msg(env, conn, msg, 0));
		else {
			/* Queue still full: drop the message (allowed). */
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "queue limit exceeded"));
			STAT(env->rep_handle->
			    region->mstat.st_msgs_dropped++);
			return (blockable ? DB_TIMEOUT : 0);
		}
	}
empty:

	/*
	 * Send as much data to the site as we can, without blocking.  Keep
	 * writing as long as we're making some progress.  Make a scratch copy
	 * of iovecs for our use, since we destroy it in the process of
	 * adjusting pointers after each partial I/O.
	 */
	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
	total_written = 0;
	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
	    iovecs.count-iovecs.offset, &nw)) == 0) {
		total_written += nw;
		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
			return (0);
	}

	if (ret != WOULDBLOCK) {
#ifdef EBADF
		DB_ASSERT(env, ret != EBADF);
#endif
		__db_err(env, ret, "socket writing failure");
		STAT(env->rep_handle->region->mstat.st_connection_drop++);
		return (DB_REP_UNAVAIL);
	}

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "wrote only %lu bytes to %s",
	    (u_long)total_written,
	    __repmgr_format_eid_loc(env->rep_handle, conn->eid, buffer)));
	/*
	 * We can't send any more without blocking: queue (a pointer to) a
	 * "flattened" copy of the message, so that the select() thread will
	 * finish sending it later.
	 */
	if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
		return (ret);

	STAT(env->rep_handle->region->mstat.st_msgs_queued++);

	/*
	 * Wake the main select thread so that it can discover that it has
	 * received ownership of this connection.  Note that we didn't have to
	 * do this in the previous case (above), because the non-empty queue
	 * implies that the select() thread is already managing ownership of
	 * this connection.
	 */
#ifdef DB_WIN32
	if (WSAEventSelect(conn->fd, conn->event_object,
	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't add FD_WRITE event bit");
		return (ret);
	}
#endif
	return (__repmgr_wake_main_thread(env));
}

/*
 * PUBLIC: int __repmgr_is_permanent __P((ENV *, const DB_LSN *));
 *
 * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
 * sites have ack'ed; FALSE otherwise.
 *
 * !!!
 * Caller must hold the mutex.
581 */ 582int 583__repmgr_is_permanent(env, lsnp) 584 ENV *env; 585 const DB_LSN *lsnp; 586{ 587 DB_REP *db_rep; 588 REPMGR_SITE *site; 589 u_int eid, nsites, npeers; 590 int is_perm, has_missing_peer; 591 592 db_rep = env->rep_handle; 593 594 if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE) 595 return (TRUE); 596 597 nsites = npeers = 0; 598 has_missing_peer = FALSE; 599 for (eid = 0; eid < db_rep->site_cnt; eid++) { 600 site = SITE_FROM_EID(eid); 601 if (!F_ISSET(site, SITE_HAS_PRIO)) { 602 /* 603 * Never connected to this site: since we can't know 604 * whether it's a peer, assume the worst. 605 */ 606 has_missing_peer = TRUE; 607 continue; 608 } 609 610 if (LOG_COMPARE(&site->max_ack, lsnp) >= 0) { 611 nsites++; 612 if (site->priority > 0) 613 npeers++; 614 } else { 615 /* This site hasn't ack'ed the message. */ 616 if (site->priority > 0) 617 has_missing_peer = TRUE; 618 } 619 } 620 621 switch (db_rep->perm_policy) { 622 case DB_REPMGR_ACKS_ONE: 623 is_perm = (nsites >= 1); 624 break; 625 case DB_REPMGR_ACKS_ONE_PEER: 626 is_perm = (npeers >= 1); 627 break; 628 case DB_REPMGR_ACKS_QUORUM: 629 /* 630 * The minimum number of acks necessary to ensure that the 631 * transaction is durable if an election is held (given that we 632 * always conduct elections according to the standard, 633 * recommended practice of requiring votes from a majority of 634 * sites). 635 */ 636 if (__repmgr_get_nsites(db_rep) == 2 && 637 !FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT)) { 638 /* 639 * Unless instructed otherwise, our special handling for 640 * 2-site groups means that a client that loses contact 641 * with the master elects itself master (even though 642 * that doesn't constitute a majority). In order to 643 * provide the expected guarantee implied by the 644 * definition of "quorum" we have to fudge the ack 645 * calculation in this case: specifically, we need to 646 * make sure that the client has received it in order 647 * for us to consider it "perm". 
648 * 649 * Note that turning the usual strict behavior back on 650 * in a 2-site group results in "0" as the number of 651 * clients needed to ack a txn in order for it to have 652 * arrived at a quorum. This is the correct result, 653 * strange as it may seem! This may well mean that in a 654 * 2-site group the QUORUM policy is rarely the right 655 * choice. 656 */ 657 is_perm = (npeers >= 1); 658 } else 659 is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2); 660 break; 661 case DB_REPMGR_ACKS_ALL: 662 /* Adjust by 1, since get_nsites includes local site. */ 663 is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1); 664 break; 665 case DB_REPMGR_ACKS_ALL_PEERS: 666 if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) { 667 /* Assume missing site might be a peer. */ 668 has_missing_peer = TRUE; 669 } 670 is_perm = !has_missing_peer; 671 break; 672 default: 673 is_perm = FALSE; 674 (void)__db_unknown_path(env, "__repmgr_is_permanent"); 675 } 676 return (is_perm); 677} 678 679/* 680 * Abandons a connection, to recover from an error. Takes necessary recovery 681 * action. Note that we don't actually close and clean up the connection here; 682 * that happens later, in the select() thread main loop. See further 683 * explanation at function __repmgr_disable_connection(). 684 * 685 * PUBLIC: int __repmgr_bust_connection __P((ENV *, REPMGR_CONNECTION *)); 686 * 687 * !!! 688 * Caller holds mutex. 689 */ 690int 691__repmgr_bust_connection(env, conn) 692 ENV *env; 693 REPMGR_CONNECTION *conn; 694{ 695 DB_REP *db_rep; 696 REPMGR_SITE *site; 697 int connecting, ret, subordinate_conn, eid; 698 699 db_rep = env->rep_handle; 700 ret = 0; 701 702 eid = conn->eid; 703 connecting = (conn->state == CONN_CONNECTING); 704 705 __repmgr_disable_connection(env, conn); 706 707 /* 708 * Any sort of connection, in any active state, could produce an error. 709 * But when we're done DEFUNCT-ifying it here it should end up on the 710 * orphans list. 
 * So, move it if it's not already there.
	 */
	if (IS_VALID_EID(eid)) {
		site = SITE_FROM_EID(eid);
		subordinate_conn = (conn != site->ref.conn);

		/* Note: schedule_connection_attempt wakes the main thread. */
		if (!subordinate_conn &&
		    (ret = __repmgr_schedule_connection_attempt(env,
		    (u_int)eid, FALSE)) != 0)
			return (ret);

		/*
		 * If the failed connection was the one between us and the
		 * master, assume that the master may have failed, and call for
		 * an election.  But only do this for the connection to the main
		 * master process, not a subordinate one.  And only do it if
		 * we're our site's main process, not a subordinate one.  And
		 * only do it if the connection had managed to progress beyond
		 * the "connecting" state, because otherwise it was just a
		 * reconnection attempt that may have found the site unreachable
		 * or the master process not running.
		 */
		if (!IS_SUBORDINATE(db_rep) && !subordinate_conn &&
		    !connecting && eid == db_rep->master_eid &&
		    (ret = __repmgr_init_election(
		    env, ELECT_FAILURE_ELECTION)) != 0)
			return (ret);
	} else {
		/*
		 * The connection was not marked with a valid EID, so we know it
		 * must have been an incoming connection in the very early
		 * stages.  Obviously it's correct for us to avoid the
		 * site-specific recovery steps above.  But even if we have just
		 * learned which site the connection came from, and are killing
		 * it because it's redundant, it means we already have a
		 * perfectly fine connection, and so -- again -- it makes sense
		 * for us to be skipping scheduling a reconnection, and checking
		 * for a master crash.
		 *
		 * One way or another, make sure the main thread is poked, so
		 * that we do the deferred clean-up.
		 */
		ret = __repmgr_wake_main_thread(env);
	}
	return (ret);
}

/*
 * Remove a connection from the possibility of any further activity, making sure
 * it ends up on the main connections list, so that it will be cleaned up at the
 * next opportunity in the select() thread.
 *
 * Various threads write onto TCP/IP sockets, and an I/O error could occur at
 * any time.  However, only the dedicated "select()" thread may close the socket
 * file descriptor, because under POSIX we have to drop our mutex and then call
 * select() as two distinct (non-atomic) operations.
 *
 * To simplify matters, there is a single place in the select thread where we
 * close and clean up after any defunct connection.  Even if the I/O error
 * happens in the select thread we follow this convention.
 *
 * When an error occurs, we disable the connection (mark it defunct so that no
 * one else will try to use it, and so that the select thread will find it and
 * clean it up), and then usually take some additional recovery action: schedule
 * a connection retry for later, and possibly call for an election if it was a
 * connection to the master.  (This happens in the function
 * __repmgr_bust_connection.)  But sometimes we don't want to do the recovery
 * part; just the disabling part.
 *
 * PUBLIC: void __repmgr_disable_connection __P((ENV *, REPMGR_CONNECTION *));
 */
void
__repmgr_disable_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int eid;

	db_rep = env->rep_handle;
	eid = conn->eid;

	if (IS_VALID_EID(eid)) {
		site = SITE_FROM_EID(eid);
		if (conn != site->ref.conn)
			/* It's a subordinate connection. */
			TAILQ_REMOVE(&site->sub_conns, conn, entries);
		/* Either way, park it on the main (orphans) list. */
		TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
	}

	/* Mark defunct so no one uses it; select thread cleans it up later. */
	conn->state = CONN_DEFUNCT;
	conn->eid = -1;
}

/*
 * PUBLIC: int __repmgr_cleanup_connection __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Idempotent.  This can be called repeatedly as blocking message threads (of
 * which there could be multiples) wake up in case of error on the connection.
 */
int
__repmgr_cleanup_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	if ((ret = __repmgr_close_connection(env, conn)) != 0)
		goto out;

	/*
	 * If there's a blocked message thread waiting, we mustn't yank the
	 * connection struct out from under it.  Instead, just wake it up.
	 * We'll get another chance to come back through here soon.
	 */
	if (conn->blockers > 0) {
		ret = __repmgr_signal(&conn->drained);
		goto out;
	}

	DB_ASSERT(env, !IS_VALID_EID(conn->eid) && conn->state == CONN_DEFUNCT);
	TAILQ_REMOVE(&db_rep->connections, conn, entries);
	ret = __repmgr_destroy_connection(env, conn);

out:
	return (ret);
}

/*
 * Unlinks a connection from whichever list it currently lives on (site's
 * subordinate list, or the main connections list).
 */
static void
remove_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	if (IS_VALID_EID(conn->eid)) {
		site = SITE_FROM_EID(conn->eid);

		if (site->state == SITE_CONNECTED && conn == site->ref.conn) {
			/* Not on any list, so no need to do anything.
*/ 858 } else 859 TAILQ_REMOVE(&site->sub_conns, conn, entries); 860 } else 861 TAILQ_REMOVE(&db_rep->connections, conn, entries); 862} 863 864static int 865__repmgr_close_connection(env, conn) 866 ENV *env; 867 REPMGR_CONNECTION *conn; 868{ 869 int ret; 870 871 DB_ASSERT(env, 872 conn->state == CONN_DEFUNCT || env->rep_handle->finished); 873 874 ret = 0; 875 if (conn->fd != INVALID_SOCKET) { 876 ret = closesocket(conn->fd); 877 conn->fd = INVALID_SOCKET; 878 if (ret == SOCKET_ERROR) { 879 ret = net_errno; 880 __db_err(env, ret, "closing socket"); 881 } 882#ifdef DB_WIN32 883 if (!WSACloseEvent(conn->event_object) && ret == 0) 884 ret = net_errno; 885#endif 886 } 887 return (ret); 888} 889 890static int 891__repmgr_destroy_connection(env, conn) 892 ENV *env; 893 REPMGR_CONNECTION *conn; 894{ 895 QUEUED_OUTPUT *out; 896 REPMGR_FLAT *msg; 897 DBT *dbt; 898 int ret; 899 900 /* 901 * Deallocate any input and output buffers we may have. 902 */ 903 if (conn->reading_phase == DATA_PHASE) { 904 if (conn->msg_type == REPMGR_REP_MESSAGE) 905 __os_free(env, conn->input.rep_message); 906 else { 907 dbt = &conn->input.repmgr_msg.cntrl; 908 if (dbt->size > 0) 909 __os_free(env, dbt->data); 910 dbt = &conn->input.repmgr_msg.rec; 911 if (dbt->size > 0) 912 __os_free(env, dbt->data); 913 } 914 } 915 while (!STAILQ_EMPTY(&conn->outbound_queue)) { 916 out = STAILQ_FIRST(&conn->outbound_queue); 917 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries); 918 msg = out->msg; 919 if (--msg->ref_count <= 0) 920 __os_free(env, msg); 921 __os_free(env, out); 922 } 923 924 ret = __repmgr_free_cond(&conn->drained); 925 __os_free(env, conn); 926 return (ret); 927} 928 929static int 930enqueue_msg(env, conn, msg, offset) 931 ENV *env; 932 REPMGR_CONNECTION *conn; 933 struct sending_msg *msg; 934 size_t offset; 935{ 936 QUEUED_OUTPUT *q_element; 937 int ret; 938 939 if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0)) 940 return (ret); 941 if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), 
&q_element)) != 0) 942 return (ret); 943 q_element->msg = msg->fmsg; 944 msg->fmsg->ref_count++; /* encapsulation would be sweeter */ 945 q_element->offset = offset; 946 947 /* Put it on the connection's outbound queue. */ 948 STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries); 949 conn->out_queue_length++; 950 return (0); 951} 952 953/* 954 * Either "control" or "rec" (or both) may be NULL, in which case we treat it 955 * like a zero-length DBT. 956 */ 957static void 958setup_sending_msg(msg, type, control, rec) 959 struct sending_msg *msg; 960 u_int type; 961 const DBT *control, *rec; 962{ 963 u_int32_t control_size, rec_size; 964 965 /* 966 * The wire protocol is documented in a comment at the top of this 967 * module. 968 */ 969 __repmgr_iovec_init(&msg->iovecs); 970 msg->type = type; 971 __repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type)); 972 973 control_size = control == NULL ? 0 : control->size; 974 msg->control_size_buf = htonl(control_size); 975 __repmgr_add_buffer(&msg->iovecs, 976 &msg->control_size_buf, sizeof(msg->control_size_buf)); 977 978 rec_size = rec == NULL ? 0 : rec->size; 979 msg->rec_size_buf = htonl(rec_size); 980 __repmgr_add_buffer( 981 &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf)); 982 983 if (control->size > 0) 984 __repmgr_add_dbt(&msg->iovecs, control); 985 986 if (rec_size > 0) 987 __repmgr_add_dbt(&msg->iovecs, rec); 988 989 msg->fmsg = NULL; 990} 991 992/* 993 * Convert a message stored as iovec pointers to various pieces, into flattened 994 * form, by copying all the pieces, and then make the iovec just point to the 995 * new simplified form. 
 */
static int
flatten(env, msg)
	ENV *env;
	struct sending_msg *msg;
{
	u_int8_t *p;
	size_t msg_size;
	int i, ret;

	DB_ASSERT(env, msg->fmsg == NULL);

	/* One allocation holds the REPMGR_FLAT header plus all the bytes. */
	msg_size = msg->iovecs.total_bytes;
	if ((ret = __os_malloc(env, sizeof(*msg->fmsg) + msg_size,
	    &msg->fmsg)) != 0)
		return (ret);
	msg->fmsg->length = msg_size;
	msg->fmsg->ref_count = 0;	/* Callers (enqueue_msg) add refs. */
	p = &msg->fmsg->data[0];

	/* Copy each piece, in order, into the single buffer. */
	for (i = 0; i < msg->iovecs.count; i++) {
		memcpy(p, msg->iovecs.vectors[i].iov_base,
		    msg->iovecs.vectors[i].iov_len);
		p = &p[msg->iovecs.vectors[i].iov_len];
	}
	/* Re-point the iovecs at the new single buffer. */
	__repmgr_iovec_init(&msg->iovecs);
	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
	return (0);
}

/*
 * Linear search of the site array for a matching host/port pair.
 *
 * PUBLIC: REPMGR_SITE *__repmgr_find_site __P((ENV *, const char *, u_int));
 */
REPMGR_SITE *
__repmgr_find_site(env, host, port)
	ENV *env;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int i;

	db_rep = env->rep_handle;
	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];

		if (strcmp(site->net_addr.host, host) == 0 &&
		    site->net_addr.port == port)
			return (site);
	}

	return (NULL);
}

/*
 * Initialize the fields of a (given) netaddr structure, with the given values.
 * We copy the host name, but take ownership of the ADDRINFO buffer.
 *
 * All inputs are assumed to have been already validated.
 *
 * PUBLIC: int __repmgr_pack_netaddr __P((ENV *, const char *,
 * PUBLIC:     u_int, ADDRINFO *, repmgr_netaddr_t *));
 */
int
__repmgr_pack_netaddr(env, host, port, list, addr)
	ENV *env;
	const char *host;
	u_int port;
	ADDRINFO *list;		/* May be NULL; ownership transfers to addr. */
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(env, host != NULL);

	if ((ret = __os_strdup(env, host, &addr->host)) != 0)
		return (ret);
	addr->port = (u_int16_t)port;
	addr->address_list = list;
	addr->current = NULL;
	return (0);
}

/*
 * PUBLIC: int __repmgr_getaddr __P((ENV *,
 * PUBLIC:     const char *, u_int, int, ADDRINFO **));
 */
int
__repmgr_getaddr(env, host, port, flags, result)
	ENV *env;
	const char *host;
	u_int port;
	int flags;	/* Matches struct addrinfo declaration. */
	ADDRINFO **result;
{
	ADDRINFO *answer, hints;
	char buffer[10];	/* 2**16 fits in 5 digits. */

	/*
	 * Ports are really 16-bit unsigned values, but it's too painful to
	 * push that type through the API.
	 */
	if (port > UINT16_MAX) {
		__db_errx(env, "port %u larger than max port %u",
		    port, UINT16_MAX);
		return (EINVAL);
	}

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = flags;
	(void)snprintf(buffer, sizeof(buffer), "%u", port);

	/*
	 * Although it's generally bad to discard error information, the return
	 * code from __os_getaddrinfo is undependable.  Our callers at least
	 * would like to be able to distinguish errors in getaddrinfo (which we
	 * want to consider to be re-tryable), from other failure (e.g., EINVAL,
	 * above).
	 */
	if (__os_getaddrinfo(env, host, port, buffer, &hints, &answer) != 0)
		return (DB_REP_UNAVAIL);
	*result = answer;

	return (0);
}

/*
 * Adds a new site to our array of known sites (unless it already exists),
 * and schedules it for immediate connection attempt.  Whether it exists or not,
 * we set newsitep, either to the already existing site, or to the newly created
 * site.  Unless newsitep is passed in as NULL, which is allowed.
 *
 * PUBLIC: int __repmgr_add_site
 * PUBLIC:     __P((ENV *, const char *, u_int, REPMGR_SITE **, u_int32_t));
 *
 * !!!
 * Caller is expected to hold db_rep->mutex on entry.
 */
int
__repmgr_add_site(env, host, port, sitep, flags)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **sitep;
	u_int32_t flags;	/* DB_REPMGR_PEER marks the site as a peer. */
{
	int peer, state;

	/* Thin wrapper: translate the public flag into the internal form. */
	state = SITE_IDLE;
	peer = LF_ISSET(DB_REPMGR_PEER);
	return (__repmgr_add_site_int(env, host, port, sitep, peer, state));
}

/*
 * PUBLIC: int __repmgr_add_site_int
 * PUBLIC:     __P((ENV *, const char *, u_int, REPMGR_SITE **, int, int));
 */
int
__repmgr_add_site_int(env, host, port, sitep, peer, state)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **sitep;
	int peer, state;
{
	DB_REP *db_rep;
	REP *rep;
	DB_THREAD_INFO *ip;
	REPMGR_SITE *site;
	u_int base;
	int eid, locked, pre_exist, ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	COMPQUIET(site, NULL);
	COMPQUIET(pre_exist, 0);
	eid = DB_EID_INVALID;

	/* Make sure we're up to date before adding to our local list. */
	ENV_ENTER(env, ip);
	MUTEX_LOCK(env, rep->mtx_repmgr);
	locked = TRUE;
	base = db_rep->site_cnt;
	if ((ret = __repmgr_copy_in_added_sites(env)) != 0)
		goto out;

	/* Once we're this far, we're committed to doing init_new_sites.
*/ 1186 1187 /* If it's still not found, now it's safe to add it. */ 1188 if ((site = __repmgr_find_site(env, host, port)) == NULL) { 1189 pre_exist = FALSE; 1190 1191 /* 1192 * Store both locally and in shared region. 1193 */ 1194 if ((ret = __repmgr_new_site(env, 1195 &site, host, port, state)) != 0) 1196 goto init; 1197 eid = EID_FROM_SITE(site); 1198 DB_ASSERT(env, (u_int)eid == db_rep->site_cnt - 1); 1199 1200 if ((ret = __repmgr_share_netaddrs(env, 1201 rep, (u_int)eid, db_rep->site_cnt)) != 0) { 1202 /* 1203 * Rescind the added local slot. 1204 */ 1205 db_rep->site_cnt--; 1206 __repmgr_cleanup_netaddr(env, &site->net_addr); 1207 } 1208 } else { 1209 pre_exist = TRUE; 1210 eid = EID_FROM_SITE(site); 1211 } 1212 1213 /* 1214 * Bump sequence count explicitly, to cover the pre-exist case. In the 1215 * other case it's wasted, but that's not worth worrying about. 1216 */ 1217 if (peer) { 1218 db_rep->peer = rep->peer = EID_FROM_SITE(site); 1219 db_rep->siteaddr_seq = ++rep->siteaddr_seq; 1220 } 1221 1222init: 1223 MUTEX_UNLOCK(env, rep->mtx_repmgr); 1224 locked = FALSE; 1225 1226 /* 1227 * Initialize all new sites (including the ones we snarfed via 1228 * copy_in_added_sites), even if it doesn't include a pre_existing one. 1229 * But if the new one is already connected, it doesn't need this 1230 * initialization, so skip over that one (which we accomplish by making 1231 * two calls with sub-ranges). 
1232 */ 1233 if (state != SITE_CONNECTED || eid == DB_EID_INVALID) 1234 t_ret = __repmgr_init_new_sites(env, base, db_rep->site_cnt); 1235 else 1236 if ((t_ret = __repmgr_init_new_sites(env, 1237 base, (u_int)eid)) == 0) 1238 t_ret = __repmgr_init_new_sites(env, 1239 (u_int)(eid+1), db_rep->site_cnt); 1240 if (t_ret != 0 && ret == 0) 1241 ret = t_ret; 1242 1243out: 1244 if (locked) 1245 MUTEX_UNLOCK(env, rep->mtx_repmgr); 1246 ENV_LEAVE(env, ip); 1247 if (ret == 0) { 1248 if (sitep != NULL) 1249 *sitep = site; 1250 if (pre_exist) 1251 ret = EEXIST; 1252 } 1253 return (ret); 1254} 1255 1256/* 1257 * Initialize a socket for listening. Sets a file descriptor for the socket, 1258 * ready for an accept() call in a thread that we're happy to let block. 1259 * 1260 * PUBLIC: int __repmgr_listen __P((ENV *)); 1261 */ 1262int 1263__repmgr_listen(env) 1264 ENV *env; 1265{ 1266 ADDRINFO *ai; 1267 DB_REP *db_rep; 1268 char *why; 1269 int sockopt, ret; 1270 socket_t s; 1271 1272 db_rep = env->rep_handle; 1273 1274 /* Use OOB value as sentinel to show no socket open. */ 1275 s = INVALID_SOCKET; 1276 1277 if ((ai = ADDR_LIST_FIRST(&db_rep->my_addr)) == NULL) { 1278 if ((ret = __repmgr_getaddr(env, db_rep->my_addr.host, 1279 db_rep->my_addr.port, AI_PASSIVE, &ai)) == 0) 1280 ADDR_LIST_INIT(&db_rep->my_addr, ai); 1281 else 1282 return (ret); 1283 } 1284 1285 /* 1286 * Given the assert is correct, we execute the loop at least once, which 1287 * means 'why' will have been set by the time it's needed. But of 1288 * course lint doesn't know about DB_ASSERT. 1289 */ 1290 COMPQUIET(why, ""); 1291 DB_ASSERT(env, ai != NULL); 1292 for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) { 1293 1294 if ((s = socket(ai->ai_family, 1295 ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) { 1296 why = "can't create listen socket"; 1297 continue; 1298 } 1299 1300 /* 1301 * When testing, it's common to kill and restart regularly. 
On 1302 * some systems, this causes bind to fail with "address in use" 1303 * errors unless this option is set. 1304 */ 1305 sockopt = 1; 1306 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt, 1307 sizeof(sockopt)) != 0) { 1308 why = "can't set REUSEADDR socket option"; 1309 break; 1310 } 1311 1312 if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) { 1313 why = "can't bind socket to listening address"; 1314 (void)closesocket(s); 1315 s = INVALID_SOCKET; 1316 continue; 1317 } 1318 1319 if (listen(s, 5) != 0) { 1320 why = "listen()"; 1321 break; 1322 } 1323 1324 if ((ret = __repmgr_set_nonblocking(s)) != 0) { 1325 __db_err(env, ret, "can't unblock listen socket"); 1326 goto clean; 1327 } 1328 1329 db_rep->listen_fd = s; 1330 return (0); 1331 } 1332 1333 ret = net_errno; 1334 __db_err(env, ret, why); 1335clean: if (s != INVALID_SOCKET) 1336 (void)closesocket(s); 1337 return (ret); 1338} 1339 1340/* 1341 * PUBLIC: int __repmgr_net_close __P((ENV *)); 1342 */ 1343int 1344__repmgr_net_close(env) 1345 ENV *env; 1346{ 1347 DB_REP *db_rep; 1348 REP *rep; 1349 int ret; 1350 1351 db_rep = env->rep_handle; 1352 rep = db_rep->region; 1353 1354 ret = __repmgr_each_connection(env, final_cleanup, NULL, FALSE); 1355 1356 if (db_rep->listen_fd != INVALID_SOCKET) { 1357 if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0) 1358 ret = net_errno; 1359 db_rep->listen_fd = INVALID_SOCKET; 1360 rep->listener = 0; 1361 } 1362 return (ret); 1363} 1364 1365static int 1366final_cleanup(env, conn, unused) 1367 ENV *env; 1368 REPMGR_CONNECTION *conn; 1369 void *unused; 1370{ 1371 int ret, t_ret; 1372 1373 COMPQUIET(unused, NULL); 1374 1375 remove_connection(env, conn); 1376 ret = __repmgr_close_connection(env, conn); 1377 if ((t_ret = __repmgr_destroy_connection(env, conn)) != 0 && ret == 0) 1378 ret = t_ret; 1379 return (ret); 1380} 1381 1382/* 1383 * PUBLIC: void __repmgr_net_destroy __P((ENV *, DB_REP *)); 1384 */ 1385void 1386__repmgr_net_destroy(env, db_rep) 
1387 ENV *env; 1388 DB_REP *db_rep; 1389{ 1390 REPMGR_RETRY *retry; 1391 REPMGR_SITE *site; 1392 u_int i; 1393 1394 __repmgr_cleanup_netaddr(env, &db_rep->my_addr); 1395 1396 if (db_rep->sites == NULL) 1397 return; 1398 1399 while (!TAILQ_EMPTY(&db_rep->retries)) { 1400 retry = TAILQ_FIRST(&db_rep->retries); 1401 TAILQ_REMOVE(&db_rep->retries, retry, entries); 1402 __os_free(env, retry); 1403 } 1404 1405 DB_ASSERT(env, TAILQ_EMPTY(&db_rep->connections)); 1406 1407 for (i = 0; i < db_rep->site_cnt; i++) { 1408 site = &db_rep->sites[i]; 1409 DB_ASSERT(env, TAILQ_EMPTY(&site->sub_conns)); 1410 __repmgr_cleanup_netaddr(env, &site->net_addr); 1411 } 1412 __os_free(env, db_rep->sites); 1413 db_rep->sites = NULL; 1414} 1415