/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"
#include "dbinc/mp.h"

/*
 * The functions in this module implement a simple wire protocol for
 * transmitting messages, both replication messages and our own internal control
 * messages.  The protocol is as follows:
 *
 *      1 byte          - message type  (defined in repmgr.h)
 *      4 bytes         - size of control
 *      4 bytes         - size of rec
 *      ? bytes         - control
 *      ? bytes         - rec
 *
 * where both sizes are 32-bit binary integers in network byte order.
 * Either control or rec can have zero length, but even in this case the
 * 4-byte length will be present.
 *     Putting both lengths right up at the front allows us to read in fewer
 * phases, and allows us to allocate buffer space for both parts (plus a wrapper
 * struct) at once.
 */
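
/*
 * For example (purely illustrative): a message with a 12-byte control section
 * and an empty rec would appear on the wire as 1 type byte, the 4 bytes
 * 00 00 00 0c (htonl(12)), the 4 bytes 00 00 00 00 (htonl(0)), and then the
 * 12 control bytes; no further bytes follow for the empty rec.
 */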

/*
 * In sending a message, we first try to send it in-line, in the sending thread,
 * and without first copying the message, by using scatter/gather I/O, using
 * iovecs to point to the various pieces of the message.  If that all works
 * without blocking, that's optimal.
 *     If we find that, for a particular connection, we can't send without
 * blocking, then we must copy the message for sending later in the select()
 * thread.  In the course of doing that, we might as well "flatten" the message,
 * forming one single buffer, to simplify life.  Not only that, once we've gone
 * to the trouble of doing that, other sites to which we also want to send the
 * message (in the case of a broadcast), may as well take advantage of the
 * simplified structure also.
 *     The sending_msg structure below holds it all.  Note that this structure,
 * and the "flat_msg" structure, are allocated separately, because (1) the
 * flat_msg version is usually not needed; and (2) when a flat_msg is needed, it
 * will need to live longer than the wrapping sending_msg structure.
 *     Note that, for the broadcast case, where we're going to use this
 * repeatedly, the iovecs is a template that must be copied, since in normal use
 * the iovecs pointers and lengths get adjusted after every partial write.
 */
struct sending_msg {
	REPMGR_IOVECS iovecs;
	u_int8_t type;
	u_int32_t control_size_buf, rec_size_buf;
	REPMGR_FLAT *fmsg;
};
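
/*
 * Typical lifecycle, as implemented below: setup_sending_msg() builds the
 * iovec template; __repmgr_send_internal() then attempts a non-copying
 * scatter/gather write; if that would block, flatten() and enqueue_msg() hand
 * the unsent remainder of the message to the select() thread.
 */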

static int final_cleanup __P((ENV *, REPMGR_CONNECTION *, void *));
static int flatten __P((ENV *, struct sending_msg *));
static void remove_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_close_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_destroy_connection __P((ENV *, REPMGR_CONNECTION *));
static void setup_sending_msg
    __P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
static int enqueue_msg
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static REPMGR_SITE *__repmgr_available_site __P((ENV *, int));

/*
 * __repmgr_send --
 *	The send function for DB_ENV->rep_set_transport.
 *
 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
 * PUBLIC:     const DB_LSN *, int, u_int32_t));
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
	DB_ENV *dbenv;
	const DBT *control, *rec;
	const DB_LSN *lsnp;
	int eid;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int available, nclients, needed, npeers_sent, nsites_sent;
	int ret, t_ret;

	env = dbenv->env;
	db_rep = env->rep_handle;
	rep = db_rep->region;
	ret = 0;

	LOCK_MUTEX(db_rep->mutex);

	/*
	 * If we're already "finished", we can't send anything.  This covers the
	 * case where a bulk buffer is flushed at env close, or perhaps an
	 * unexpected __repmgr_thread_failure.
	 */
	if (db_rep->finished) {
		ret = DB_REP_UNAVAIL;
		goto out;
	}

	/*
	 * Check whether we need to refresh our site address information with
	 * more recent updates from shared memory.
	 */
	if (rep->siteaddr_seq > db_rep->siteaddr_seq &&
	    (ret = __repmgr_sync_siteaddr(env)) != 0)
		goto out;

	if (eid == DB_EID_BROADCAST) {
		if ((ret = __repmgr_send_broadcast(env, REPMGR_REP_MESSAGE,
		    control, rec, &nsites_sent, &npeers_sent)) != 0)
			goto out;
	} else {
		DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(eid));

		/*
		 * If this is a request that can be sent anywhere, then see if
		 * we can send it to our peer (to save load on the master), but
		 * not if it's a rerequest, 'cuz that likely means we tried this
		 * already and failed.
		 */
		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
		    DB_REP_ANYWHERE &&
		    IS_VALID_EID(db_rep->peer) &&
		    (site = __repmgr_available_site(env, db_rep->peer)) !=
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "sending request to peer"));
		} else if ((site = __repmgr_available_site(env, eid)) ==
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "ignoring message sent to unavailable site"));
			ret = DB_REP_UNAVAIL;
			goto out;
		}

		conn = site->ref.conn;
		/* Pass the "blockable" argument as TRUE. */
		if ((ret = __repmgr_send_one(env, conn, REPMGR_REP_MESSAGE,
		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
		    (t_ret = __repmgr_bust_connection(env, conn)) != 0)
			ret = t_ret;
		if (ret != 0)
			goto out;

		nsites_sent = 1;
		npeers_sent = site->priority > 0 ? 1 : 0;
	}
	/*
	 * Right now, nsites_sent and npeers_sent represent the (maximum)
	 * number of sites we've attempted to begin sending the message to.
	 * Of course we haven't really received any ack's yet.  But since
	 * we've only sent to that many other sites, that's the maximum number
	 * of ack's we could possibly expect.  If even that number fails to
	 * satisfy our PERM policy, there's no point waiting for something
	 * that will never happen.
	 */
	if (LF_ISSET(DB_REP_PERMANENT)) {
		/* Number of sites in the group besides myself. */
		nclients = __repmgr_get_nsites(db_rep) - 1;

		switch (db_rep->perm_policy) {
		case DB_REPMGR_ACKS_NONE:
			needed = 0;
			COMPQUIET(available, 0);
			break;

		case DB_REPMGR_ACKS_ONE:
			needed = 1;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ALL:
			/* Number of sites in the group besides myself. */
			needed = nclients;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ONE_PEER:
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_ALL_PEERS:
			/*
			 * Too hard to figure out "needed", since we're not
			 * keeping track of how many peers we have; so just skip
			 * the optimization in this case.
			 */
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_QUORUM:
			/*
			 * The minimum number of acks necessary to ensure that
			 * the transaction is durable if an election is held.
			 * (See note below at __repmgr_is_permanent, regarding
			 * the surprising inter-relationship between
			 * 2SITE_STRICT and QUORUM.)
			 */
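			/*
			 * For example (illustrative arithmetic): in a group of
			 * 5 sites, nclients is 4, so needed = 4 / 2 = 2 acks;
			 * those 2 clients plus the master itself constitute a
			 * majority of the group.
			 */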
			if (nclients > 1 ||
			    FLD_ISSET(db_rep->region->config,
			    REP_C_2SITE_STRICT))
				needed = nclients / 2;
			else
				needed = 1;
			available = npeers_sent;
			break;

		default:
			COMPQUIET(available, 0);
			COMPQUIET(needed, 0);
			(void)__db_unknown_path(env, "__repmgr_send");
			break;
		}
		if (needed == 0)
			goto out;
		if (available < needed) {
			ret = DB_REP_UNAVAIL;
			goto out;
		}
		/* In ALL_PEERS case, display of "needed" might be confusing. */
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "will await acknowledgement: need %u", needed));
		ret = __repmgr_await_ack(env, lsnp);
	}

out:	UNLOCK_MUTEX(db_rep->mutex);
	if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
		STAT(db_rep->region->mstat.st_perm_failed++);
		DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
	}
	return (ret);
}
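
/*
 * Usage note (illustrative, not part of this module): replication manager
 * applications get this transport installed for them automatically; an
 * application using the base replication API instead registers its own
 * callback of the same signature, e.g.
 * dbenv->rep_set_transport(dbenv, my_eid, my_send), where my_eid and my_send
 * are hypothetical application names.
 */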

static REPMGR_SITE *
__repmgr_available_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);
	if (site->state != SITE_CONNECTED)
		return (NULL);

	if (site->ref.conn->state == CONN_READY)
		return (site);
	return (NULL);
}

/*
 * Synchronize our list of sites with new information that has been added to the
 * list in the shared region.
 *
 * PUBLIC: int __repmgr_sync_siteaddr __P((ENV *));
 */
int
__repmgr_sync_siteaddr(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	char *host;
	u_int added;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	ret = 0;

	MUTEX_LOCK(env, rep->mtx_repmgr);

	if (db_rep->my_addr.host == NULL && rep->my_addr.host != INVALID_ROFF) {
		host = R_ADDR(env->reginfo, rep->my_addr.host);
		if ((ret = __repmgr_pack_netaddr(env,
		    host, rep->my_addr.port, NULL, &db_rep->my_addr)) != 0)
			goto out;
	}

	added = db_rep->site_cnt;
	if ((ret = __repmgr_copy_in_added_sites(env)) == 0)
		ret = __repmgr_init_new_sites(env, added, db_rep->site_cnt);

out:
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	return (ret);
}

/*
 * Sends message to all sites with which we currently have an active
 * connection.  Sets result parameters according to how many sites we attempted
 * to begin sending to, even if we did nothing more than queue it for later
 * delivery.
 *
 * !!!
 * Caller must hold env->mutex.
 * PUBLIC: int __repmgr_send_broadcast __P((ENV *, u_int,
 * PUBLIC:    const DBT *, const DBT *, u_int *, u_int *));
 */
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp)
	ENV *env;
	u_int type;
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;
{
	DB_REP *db_rep;
	struct sending_msg msg;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int eid, nsites, npeers;
	int ret;

	static const u_int version_max_msg_type[] = {
		0,
		REPMGR_MAX_V1_MSG_TYPE,
		REPMGR_MAX_V2_MSG_TYPE,
		REPMGR_MAX_V3_MSG_TYPE
	};

	db_rep = env->rep_handle;

	/*
	 * Sending a broadcast is quick, because we allow no blocking.  So it
	 * shouldn't much matter.  But just in case, take the timestamp before
	 * sending, so that if anything we err on the side of keeping clients
	 * placated (i.e., possibly sending a heartbeat slightly more frequently
	 * than necessary).
	 */
	__os_gettime(env, &db_rep->last_bcast, 1);

	setup_sending_msg(&msg, type, control, rec);
	nsites = npeers = 0;

	/* Send to (only the main connection with) every site. */
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		if ((site = __repmgr_available_site(env, (int)eid)) == NULL)
			continue;
		conn = site->ref.conn;

		DB_ASSERT(env, IS_VALID_EID(conn->eid) &&
		    conn->version > 0 &&
		    conn->version <= DB_REPMGR_VERSION);

		/*
		 * Skip if the type of message we're sending is beyond the range
		 * of known message types for this connection's version.
		 *
		 * !!!
		 * Don't be misled by the apparent generality of this simple
		 * test.  It works currently, because the only kinds of messages
		 * that we broadcast are REP_MESSAGE and HEARTBEAT.  But in the
		 * future other kinds of messages might require more intricate
		 * per-connection-version customization (for example,
		 * per-version message format conversion, addition of new
		 * fields, etc.).
		 */
		if (type > version_max_msg_type[conn->version])
			continue;

		/*
		 * Broadcast messages are sent either by application threads
		 * committing transactions, or as replication status messages
		 * that we can afford to lose.  So don't allow blocking for
		 * them (pass the "blockable" argument as FALSE).
		 */
		if ((ret = __repmgr_send_internal(env,
		    conn, &msg, FALSE)) == 0) {
			site = SITE_FROM_EID(conn->eid);
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else if (ret == DB_REP_UNAVAIL) {
			if ((ret = __repmgr_bust_connection(env, conn)) != 0)
				return (ret);
		} else
			return (ret);
	}

	*nsitesp = nsites;
	*npeersp = npeers;
	return (0);
}

/*
 * __repmgr_send_one --
 *	Send a message to a site, or if you can't just yet, make a copy of it
 * and arrange to have it sent later.  'rec' may be NULL, in which case we send
 * a zero length and no data.
 *
 * If the connection fails, we return DB_REP_UNAVAIL; cleaning up the
 * connection (by calling __repmgr_bust_connection()) is then the caller's
 * responsibility.
 *
 * !!!
 * Note that the mutex should be held through this call.
 * It doubles as a synchronizer to make sure that two threads don't
 * intersperse writes that are part of two single messages.
 *
 * PUBLIC: int __repmgr_send_one __P((ENV *, REPMGR_CONNECTION *,
 * PUBLIC:    u_int, const DBT *, const DBT *, int));
 */
int
__repmgr_send_one(env, conn, msg_type, control, rec, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
	int blockable;
{
	struct sending_msg msg;

	setup_sending_msg(&msg, msg_type, control, rec);
	return (__repmgr_send_internal(env, conn, &msg, blockable));
}

/*
 * Attempts a "best effort" to send a message to the given site.  If there is
 * an excessive backlog of messages already queued on the connection, what
 * shall we do?  If the caller doesn't mind blocking, we'll wait (a limited
 * amount of time) for the queue to drain.  Otherwise we'll simply drop the
 * message.  This is always allowed by the replication protocol.  But in the
 * case of a multi-message response to a request like PAGE_REQ, LOG_REQ or
 * ALL_REQ we almost always get a flood of messages that instantly fills our
 * queue, so blocking improves performance (by avoiding the need for the
 * client to re-request).
 *
 * How long shall we wait?  We could of course create a new timeout
 * configuration type, so that the application could set it directly.  But that
 * would start to overwhelm the user with too many choices to think about.  We
 * already have an ACK timeout, which is the user's estimate of how long it
 * should take to send a message to the client, have it be processed, and return
 * a message back to us.  We multiply that by the queue size, because that's how
 * many messages have to be swallowed up by the client before we're able to
 * start sending again (at least to a rough approximation).
 */
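/*
 * For example (illustrative numbers only): with a one-second ACK timeout and
 * a queue limit of 10 messages, a blockable sender would wait up to roughly
 * 10 seconds for the queue to drain before giving up.
 */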
static int
__repmgr_send_internal(env, conn, msg, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	int blockable;
{
	DB_REP *db_rep;
	REPMGR_IOVECS iovecs;
	SITE_STRING_BUFFER buffer;
	db_timeout_t drain_to;
	int ret;
	size_t nw;
	size_t total_written;

	db_rep = env->rep_handle;

	DB_ASSERT(env,
	    conn->state != CONN_CONNECTING && conn->state != CONN_DEFUNCT);
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		/*
		 * Output to this site is currently owned by the select()
		 * thread, so we can't try sending in-line here.  We can only
		 * queue the msg for later.
		 */
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "msg to %s to be queued",
		    __repmgr_format_eid_loc(env->rep_handle,
		    conn->eid, buffer)));
		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
		    blockable && conn->state != CONN_CONGESTED) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "block msg thread, await queue space"));

			if ((drain_to = db_rep->ack_timeout) == 0)
				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "will await drain"));
			conn->blockers++;
			ret = __repmgr_await_drain(env,
			    conn, drain_to * OUT_QUEUE_LIMIT);
			conn->blockers--;
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "drain returned %d (%d,%d)", ret,
			    db_rep->finished, conn->out_queue_length));
			if (db_rep->finished)
				return (DB_TIMEOUT);
			if (ret != 0)
				return (ret);
			if (STAILQ_EMPTY(&conn->outbound_queue))
				goto empty;
		}
		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
			return (enqueue_msg(env, conn, msg, 0));
		else {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "queue limit exceeded"));
			STAT(env->rep_handle->
			    region->mstat.st_msgs_dropped++);
			return (blockable ? DB_TIMEOUT : 0);
		}
	}
empty:

	/*
	 * Send as much data to the site as we can, without blocking.  Keep
	 * writing as long as we're making some progress.  Make a scratch copy
	 * of iovecs for our use, since we destroy it in the process of
	 * adjusting pointers after each partial I/O.
	 */
	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
	total_written = 0;
	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
	    iovecs.count-iovecs.offset, &nw)) == 0) {
		total_written += nw;
		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
			return (0);
	}

	if (ret != WOULDBLOCK) {
#ifdef EBADF
		DB_ASSERT(env, ret != EBADF);
#endif
		__db_err(env, ret, "socket writing failure");
		STAT(env->rep_handle->region->mstat.st_connection_drop++);
		return (DB_REP_UNAVAIL);
	}

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "wrote only %lu bytes to %s",
	    (u_long)total_written,
	    __repmgr_format_eid_loc(env->rep_handle, conn->eid, buffer)));
	/*
	 * We can't send any more without blocking: queue (a pointer to) a
	 * "flattened" copy of the message, so that the select() thread will
	 * finish sending it later.
	 */
	if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
		return (ret);

	STAT(env->rep_handle->region->mstat.st_msgs_queued++);

	/*
	 * Wake the main select thread so that it can discover that it has
	 * received ownership of this connection.  Note that we didn't have to
	 * do this in the previous case (above), because the non-empty queue
	 * implies that the select() thread is already managing ownership of
	 * this connection.
	 */
#ifdef DB_WIN32
	if (WSAEventSelect(conn->fd, conn->event_object,
	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't add FD_WRITE event bit");
		return (ret);
	}
#endif
	return (__repmgr_wake_main_thread(env));
}

/*
 * PUBLIC: int __repmgr_is_permanent __P((ENV *, const DB_LSN *));
 *
 * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
 * sites have ack'ed; FALSE otherwise.
 *
 * !!!
 * Caller must hold the mutex.
 */
int
__repmgr_is_permanent(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int eid, nsites, npeers;
	int is_perm, has_missing_peer;

	db_rep = env->rep_handle;

	if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
		return (TRUE);

	nsites = npeers = 0;
	has_missing_peer = FALSE;
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		site = SITE_FROM_EID(eid);
		if (!F_ISSET(site, SITE_HAS_PRIO)) {
			/*
			 * Never connected to this site: since we can't know
			 * whether it's a peer, assume the worst.
			 */
			has_missing_peer = TRUE;
			continue;
		}

		if (LOG_COMPARE(&site->max_ack, lsnp) >= 0) {
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else {
			/* This site hasn't ack'ed the message. */
			if (site->priority > 0)
				has_missing_peer = TRUE;
		}
	}

	switch (db_rep->perm_policy) {
	case DB_REPMGR_ACKS_ONE:
		is_perm = (nsites >= 1);
		break;
	case DB_REPMGR_ACKS_ONE_PEER:
		is_perm = (npeers >= 1);
		break;
	case DB_REPMGR_ACKS_QUORUM:
		/*
		 * The minimum number of acks necessary to ensure that the
		 * transaction is durable if an election is held (given that we
		 * always conduct elections according to the standard,
		 * recommended practice of requiring votes from a majority of
		 * sites).
		 */
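		/*
		 * For example (illustrative arithmetic): in a 5-site group,
		 * (5 - 1) / 2 = 2 acking peers, together with the master's
		 * own copy, place the transaction at a majority of the sites.
		 */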
		if (__repmgr_get_nsites(db_rep) == 2 &&
		    !FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT)) {
			/*
			 * Unless instructed otherwise, our special handling for
			 * 2-site groups means that a client that loses contact
			 * with the master elects itself master (even though
			 * that doesn't constitute a majority).  In order to
			 * provide the expected guarantee implied by the
			 * definition of "quorum" we have to fudge the ack
			 * calculation in this case: specifically, we need to
			 * make sure that the client has received it in order
			 * for us to consider it "perm".
			 *
			 * Note that turning the usual strict behavior back on
			 * in a 2-site group results in "0" as the number of
			 * clients needed to ack a txn in order for it to have
			 * arrived at a quorum.  This is the correct result,
			 * strange as it may seem!  This may well mean that in a
			 * 2-site group the QUORUM policy is rarely the right
			 * choice.
			 */
			is_perm = (npeers >= 1);
		} else
			is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
		break;
	case DB_REPMGR_ACKS_ALL:
		/* Adjust by 1, since get_nsites includes local site. */
		is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
		break;
	case DB_REPMGR_ACKS_ALL_PEERS:
		if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
			/* Assume missing site might be a peer. */
			has_missing_peer = TRUE;
		}
		is_perm = !has_missing_peer;
		break;
	default:
		is_perm = FALSE;
		(void)__db_unknown_path(env, "__repmgr_is_permanent");
	}
	return (is_perm);
}

/*
 * Abandons a connection, to recover from an error.  Takes necessary recovery
 * action.  Note that we don't actually close and clean up the connection here;
 * that happens later, in the select() thread main loop.  See further
 * explanation at function __repmgr_disable_connection().
 *
 * PUBLIC: int __repmgr_bust_connection __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Caller holds mutex.
 */
int
__repmgr_bust_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int connecting, ret, subordinate_conn, eid;

	db_rep = env->rep_handle;
	ret = 0;

	eid = conn->eid;
	connecting = (conn->state == CONN_CONNECTING);

	__repmgr_disable_connection(env, conn);

	/*
	 * Any sort of connection, in any active state, could produce an error.
	 * But when we're done DEFUNCT-ifying it here it should end up on the
	 * orphans list.  So, move it if it's not already there.
	 */
	if (IS_VALID_EID(eid)) {
		site = SITE_FROM_EID(eid);
		subordinate_conn = (conn != site->ref.conn);

		/* Note: schedule_connection_attempt wakes the main thread. */
		if (!subordinate_conn &&
		    (ret = __repmgr_schedule_connection_attempt(env,
		    (u_int)eid, FALSE)) != 0)
			return (ret);

		/*
		 * If the failed connection was the one between us and the
		 * master, assume that the master may have failed, and call for
		 * an election.  But only do this for the connection to the main
		 * master process, not a subordinate one.  And only do it if
		 * we're our site's main process, not a subordinate one.  And
		 * only do it if the connection had managed to progress beyond
		 * the "connecting" state, because otherwise it was just a
		 * reconnection attempt that may have found the site unreachable
		 * or the master process not running.
		 */
		if (!IS_SUBORDINATE(db_rep) && !subordinate_conn &&
		    !connecting && eid == db_rep->master_eid &&
		    (ret = __repmgr_init_election(
		    env, ELECT_FAILURE_ELECTION)) != 0)
			return (ret);
	} else {
		/*
		 * The connection was not marked with a valid EID, so we know it
		 * must have been an incoming connection in the very early
		 * stages.  Obviously it's correct for us to avoid the
		 * site-specific recovery steps above.  But even if we have just
		 * learned which site the connection came from, and are killing
		 * it because it's redundant, it means we already have a
		 * perfectly fine connection, and so -- again -- it makes sense
		 * for us to be skipping scheduling a reconnection, and checking
		 * for a master crash.
		 *
		 * One way or another, make sure the main thread is poked, so
		 * that we do the deferred clean-up.
		 */
		ret = __repmgr_wake_main_thread(env);
	}
	return (ret);
}

/*
 * Remove a connection from the possibility of any further activity, making sure
 * it ends up on the main connections list, so that it will be cleaned up at the
 * next opportunity in the select() thread.
 *
 * Various threads write onto TCP/IP sockets, and an I/O error could occur at
 * any time.  However, only the dedicated "select()" thread may close the socket
 * file descriptor, because under POSIX we have to drop our mutex and then call
 * select() as two distinct (non-atomic) operations.
 *
 * To simplify matters, there is a single place in the select thread where we
 * close and clean up after any defunct connection.  Even if the I/O error
 * happens in the select thread we follow this convention.
 *
 * When an error occurs, we disable the connection (mark it defunct so that no
 * one else will try to use it, and so that the select thread will find it and
 * clean it up), and then usually take some additional recovery action: schedule
 * a connection retry for later, and possibly call for an election if it was a
 * connection to the master.  (This happens in the function
 * __repmgr_bust_connection.)  But sometimes we don't want to do the recovery
 * part; just the disabling part.
 *
 * PUBLIC: void __repmgr_disable_connection __P((ENV *, REPMGR_CONNECTION *));
 */
void
__repmgr_disable_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	int eid;

	db_rep = env->rep_handle;
	eid = conn->eid;

	if (IS_VALID_EID(eid)) {
		site = SITE_FROM_EID(eid);
		if (conn != site->ref.conn)
			/* It's a subordinate connection. */
			TAILQ_REMOVE(&site->sub_conns, conn, entries);
		TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
	}

	conn->state = CONN_DEFUNCT;
	conn->eid = -1;
}

/*
 * PUBLIC: int __repmgr_cleanup_connection __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Idempotent.  This can be called repeatedly as blocking message threads (of
 * which there could be multiples) wake up in case of error on the connection.
 */
int
__repmgr_cleanup_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	if ((ret = __repmgr_close_connection(env, conn)) != 0)
		goto out;

	/*
	 * If there's a blocked message thread waiting, we mustn't yank the
	 * connection struct out from under it.  Instead, just wake it up.
	 * We'll get another chance to come back through here soon.
	 */
	if (conn->blockers > 0) {
		ret = __repmgr_signal(&conn->drained);
		goto out;
	}

	DB_ASSERT(env, !IS_VALID_EID(conn->eid) && conn->state == CONN_DEFUNCT);
	TAILQ_REMOVE(&db_rep->connections, conn, entries);
	ret = __repmgr_destroy_connection(env, conn);

out:
	return (ret);
}

static void
remove_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	if (IS_VALID_EID(conn->eid)) {
		site = SITE_FROM_EID(conn->eid);

		if (site->state == SITE_CONNECTED && conn == site->ref.conn) {
			/* Not on any list, so no need to do anything. */
		} else
			TAILQ_REMOVE(&site->sub_conns, conn, entries);
	} else
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
}

static int
__repmgr_close_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret;

	DB_ASSERT(env,
	    conn->state == CONN_DEFUNCT || env->rep_handle->finished);

	ret = 0;
	if (conn->fd != INVALID_SOCKET) {
		ret = closesocket(conn->fd);
		conn->fd = INVALID_SOCKET;
		if (ret == SOCKET_ERROR) {
			ret = net_errno;
			__db_err(env, ret, "closing socket");
		}
#ifdef DB_WIN32
		if (!WSACloseEvent(conn->event_object) && ret == 0)
			ret = net_errno;
#endif
	}
	return (ret);
}

static int
__repmgr_destroy_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	QUEUED_OUTPUT *out;
	REPMGR_FLAT *msg;
	DBT *dbt;
	int ret;

	/*
	 * Deallocate any input and output buffers we may have.
	 */
	if (conn->reading_phase == DATA_PHASE) {
		if (conn->msg_type == REPMGR_REP_MESSAGE)
			__os_free(env, conn->input.rep_message);
		else {
			dbt = &conn->input.repmgr_msg.cntrl;
			if (dbt->size > 0)
				__os_free(env, dbt->data);
			dbt = &conn->input.repmgr_msg.rec;
			if (dbt->size > 0)
				__os_free(env, dbt->data);
		}
	}
	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
		out = STAILQ_FIRST(&conn->outbound_queue);
		STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
		msg = out->msg;
		if (--msg->ref_count <= 0)
			__os_free(env, msg);
		__os_free(env, out);
	}

	ret = __repmgr_free_cond(&conn->drained);
	__os_free(env, conn);
	return (ret);
}

static int
enqueue_msg(env, conn, msg, offset)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	size_t offset;
{
	QUEUED_OUTPUT *q_element;
	int ret;

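	/*
	 * Flatten lazily: only the first enqueue pays the cost of copying the
	 * message into a single buffer; a broadcast then shares that one flat
	 * copy among all of its connections via ref_count.
	 */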
	if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0))
		return (ret);
	if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
		return (ret);
	q_element->msg = msg->fmsg;
	msg->fmsg->ref_count++;	/* encapsulation would be sweeter */
	q_element->offset = offset;

	/* Put it on the connection's outbound queue. */
	STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
	conn->out_queue_length++;
	return (0);
}

/*
 * Either "control" or "rec" (or both) may be NULL, in which case we treat it
 * like a zero-length DBT.
 */
static void
setup_sending_msg(msg, type, control, rec)
	struct sending_msg *msg;
	u_int type;
	const DBT *control, *rec;
{
	u_int32_t control_size, rec_size;

	/*
	 * The wire protocol is documented in a comment at the top of this
	 * module.
	 */
	__repmgr_iovec_init(&msg->iovecs);
	msg->type = type;
	__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));

	control_size = control == NULL ? 0 : control->size;
	msg->control_size_buf = htonl(control_size);
	__repmgr_add_buffer(&msg->iovecs,
	    &msg->control_size_buf, sizeof(msg->control_size_buf));

	rec_size = rec == NULL ? 0 : rec->size;
	msg->rec_size_buf = htonl(rec_size);
	__repmgr_add_buffer(
	    &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));

	if (control_size > 0)
		__repmgr_add_dbt(&msg->iovecs, control);

	if (rec_size > 0)
		__repmgr_add_dbt(&msg->iovecs, rec);

	msg->fmsg = NULL;
}

/*
 * Convert a message stored as iovec pointers to various pieces, into flattened
 * form, by copying all the pieces, and then make the iovec just point to the
 * new simplified form.
 */
static int
flatten(env, msg)
	ENV *env;
	struct sending_msg *msg;
{
	u_int8_t *p;
	size_t msg_size;
	int i, ret;

	DB_ASSERT(env, msg->fmsg == NULL);

	msg_size = msg->iovecs.total_bytes;
	if ((ret = __os_malloc(env, sizeof(*msg->fmsg) + msg_size,
	    &msg->fmsg)) != 0)
		return (ret);
	msg->fmsg->length = msg_size;
	msg->fmsg->ref_count = 0;
	p = &msg->fmsg->data[0];

	for (i = 0; i < msg->iovecs.count; i++) {
		memcpy(p, msg->iovecs.vectors[i].iov_base,
		    msg->iovecs.vectors[i].iov_len);
		p = &p[msg->iovecs.vectors[i].iov_len];
	}
	__repmgr_iovec_init(&msg->iovecs);
	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
	return (0);
}

/*
 * PUBLIC: REPMGR_SITE *__repmgr_find_site __P((ENV *, const char *, u_int));
 */
REPMGR_SITE *
__repmgr_find_site(env, host, port)
	ENV *env;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int i;

	db_rep = env->rep_handle;
	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];

		if (strcmp(site->net_addr.host, host) == 0 &&
		    site->net_addr.port == port)
			return (site);
	}

	return (NULL);
}

/*
 * Initialize the fields of a (given) netaddr structure, with the given values.
 * We copy the host name, but take ownership of the ADDRINFO buffer.
 *
 * All inputs are assumed to have been already validated.
 *
 * PUBLIC: int __repmgr_pack_netaddr __P((ENV *, const char *,
 * PUBLIC:     u_int, ADDRINFO *, repmgr_netaddr_t *));
 */
int
__repmgr_pack_netaddr(env, host, port, list, addr)
	ENV *env;
	const char *host;
	u_int port;
	ADDRINFO *list;
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(env, host != NULL);

	if ((ret = __os_strdup(env, host, &addr->host)) != 0)
		return (ret);
	addr->port = (u_int16_t)port;
	addr->address_list = list;
	addr->current = NULL;
	return (0);
}

/*
 * PUBLIC: int __repmgr_getaddr __P((ENV *,
 * PUBLIC:     const char *, u_int, int, ADDRINFO **));
 */
int
__repmgr_getaddr(env, host, port, flags, result)
	ENV *env;
	const char *host;
	u_int port;
	int flags;    /* Matches struct addrinfo declaration. */
	ADDRINFO **result;
{
	ADDRINFO *answer, hints;
	char buffer[10];		/* 2**16 fits in 5 digits. */

	/*
	 * Ports are really 16-bit unsigned values, but it's too painful to
	 * push that type through the API.
	 */
	if (port > UINT16_MAX) {
		__db_errx(env, "port %u larger than max port %u",
		    port, UINT16_MAX);
		return (EINVAL);
	}

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = flags;
	(void)snprintf(buffer, sizeof(buffer), "%u", port);

	/*
	 * Although it's generally bad to discard error information, the return
	 * code from __os_getaddrinfo is undependable.  Our callers at least
	 * would like to be able to distinguish errors in getaddrinfo (which we
	 * want to consider to be re-tryable), from other failure (e.g., EINVAL,
	 * above).
	 */
	if (__os_getaddrinfo(env, host, port, buffer, &hints, &answer) != 0)
		return (DB_REP_UNAVAIL);
	*result = answer;

	return (0);
}

/*
 * Adds a new site to our array of known sites (unless it already exists),
 * and schedules it for immediate connection attempt.  Whether it exists or
 * not, we set *sitep, either to the already existing site, or to the newly
 * created site, unless sitep is passed in as NULL, which is allowed.
 *
 * PUBLIC: int __repmgr_add_site
 * PUBLIC:     __P((ENV *, const char *, u_int, REPMGR_SITE **, u_int32_t));
 *
 * !!!
 * Caller is expected to hold db_rep->mutex on entry.
 */
int
__repmgr_add_site(env, host, port, sitep, flags)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **sitep;
	u_int32_t flags;
{
	int peer, state;

	state = SITE_IDLE;
	peer = LF_ISSET(DB_REPMGR_PEER);
	return (__repmgr_add_site_int(env, host, port, sitep, peer, state));
}

/*
 * PUBLIC: int __repmgr_add_site_int
 * PUBLIC:     __P((ENV *, const char *, u_int, REPMGR_SITE **, int, int));
 */
int
__repmgr_add_site_int(env, host, port, sitep, peer, state)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **sitep;
	int peer, state;
{
	DB_REP *db_rep;
	REP *rep;
	DB_THREAD_INFO *ip;
	REPMGR_SITE *site;
	u_int base;
	int eid, locked, pre_exist, ret, t_ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;
	COMPQUIET(site, NULL);
	COMPQUIET(pre_exist, 0);
	eid = DB_EID_INVALID;

	/* Make sure we're up to date before adding to our local list. */
	ENV_ENTER(env, ip);
	MUTEX_LOCK(env, rep->mtx_repmgr);
	locked = TRUE;
	base = db_rep->site_cnt;
	if ((ret = __repmgr_copy_in_added_sites(env)) != 0)
		goto out;

	/* Once we're this far, we're committed to doing init_new_sites. */

	/* If it's still not found, now it's safe to add it. */
	if ((site = __repmgr_find_site(env, host, port)) == NULL) {
		pre_exist = FALSE;

		/*
		 * Store both locally and in shared region.
		 */
		if ((ret = __repmgr_new_site(env,
		    &site, host, port, state)) != 0)
			goto init;
		eid = EID_FROM_SITE(site);
		DB_ASSERT(env, (u_int)eid == db_rep->site_cnt - 1);

		if ((ret = __repmgr_share_netaddrs(env,
		    rep, (u_int)eid, db_rep->site_cnt)) != 0) {
			/*
			 * Rescind the added local slot.
			 */
			db_rep->site_cnt--;
			__repmgr_cleanup_netaddr(env, &site->net_addr);
		}
	} else {
		pre_exist = TRUE;
		eid = EID_FROM_SITE(site);
	}

	/*
	 * Bump sequence count explicitly, to cover the pre-exist case.  In the
	 * other case it's wasted, but that's not worth worrying about.
	 */
	if (peer) {
		db_rep->peer = rep->peer = EID_FROM_SITE(site);
		db_rep->siteaddr_seq = ++rep->siteaddr_seq;
	}

init:
	MUTEX_UNLOCK(env, rep->mtx_repmgr);
	locked = FALSE;

	/*
	 * Initialize all new sites (including the ones we snarfed via
	 * copy_in_added_sites), even if it doesn't include a pre_existing one.
	 * But if the new one is already connected, it doesn't need this
	 * initialization, so skip over that one (which we accomplish by making
	 * two calls with sub-ranges).
	 */
	if (state != SITE_CONNECTED || eid == DB_EID_INVALID)
		t_ret = __repmgr_init_new_sites(env, base, db_rep->site_cnt);
	else
		if ((t_ret = __repmgr_init_new_sites(env,
		    base, (u_int)eid)) == 0)
			t_ret = __repmgr_init_new_sites(env,
			    (u_int)(eid+1), db_rep->site_cnt);
	if (t_ret != 0 && ret == 0)
		ret = t_ret;

out:
	if (locked)
		MUTEX_UNLOCK(env, rep->mtx_repmgr);
	ENV_LEAVE(env, ip);
	if (ret == 0) {
		if (sitep != NULL)
			*sitep = site;
		if (pre_exist)
			ret = EEXIST;
	}
	return (ret);
}

/*
 * Initialize a socket for listening.  Sets a file descriptor for the socket,
 * ready for an accept() call in a thread that we're happy to let block.
 *
 * PUBLIC:  int __repmgr_listen __P((ENV *));
 */
int
__repmgr_listen(env)
	ENV *env;
{
	ADDRINFO *ai;
	DB_REP *db_rep;
	char *why;
	int sockopt, ret;
	socket_t s;

	db_rep = env->rep_handle;

	/* Use OOB value as sentinel to show no socket open. */
	s = INVALID_SOCKET;

	if ((ai = ADDR_LIST_FIRST(&db_rep->my_addr)) == NULL) {
		if ((ret = __repmgr_getaddr(env, db_rep->my_addr.host,
		    db_rep->my_addr.port, AI_PASSIVE, &ai)) == 0)
			ADDR_LIST_INIT(&db_rep->my_addr, ai);
		else
			return (ret);
	}

	/*
	 * Given the assert is correct, we execute the loop at least once, which
	 * means 'why' will have been set by the time it's needed.  But of
	 * course lint doesn't know about DB_ASSERT.
	 */
	COMPQUIET(why, "");
	DB_ASSERT(env, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
			why = "can't create listen socket";
			continue;
		}

		/*
		 * When testing, it's common to kill and restart regularly.  On
		 * some systems, this causes bind to fail with "address in use"
		 * errors unless this option is set.
		 */
		sockopt = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
		    sizeof(sockopt)) != 0) {
			why = "can't set REUSEADDR socket option";
			break;
		}

		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
			why = "can't bind socket to listening address";
			(void)closesocket(s);
			s = INVALID_SOCKET;
			continue;
		}

		if (listen(s, 5) != 0) {
			why = "listen()";
			break;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(env, ret, "can't unblock listen socket");
			goto clean;
		}

		db_rep->listen_fd = s;
		return (0);
	}

	ret = net_errno;
	__db_err(env, ret, why);
clean:	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	return (ret);
}

/*
 * PUBLIC: int __repmgr_net_close __P((ENV *));
 */
int
__repmgr_net_close(env)
	ENV *env;
{
	DB_REP *db_rep;
	REP *rep;
	int ret;

	db_rep = env->rep_handle;
	rep = db_rep->region;

	ret = __repmgr_each_connection(env, final_cleanup, NULL, FALSE);

	if (db_rep->listen_fd != INVALID_SOCKET) {
		if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
			ret = net_errno;
		db_rep->listen_fd = INVALID_SOCKET;
		rep->listener = 0;
	}
	return (ret);
}

static int
final_cleanup(env, conn, unused)
	ENV *env;
	REPMGR_CONNECTION *conn;
	void *unused;
{
	int ret, t_ret;

	COMPQUIET(unused, NULL);

	remove_connection(env, conn);
	ret = __repmgr_close_connection(env, conn);
	if ((t_ret = __repmgr_destroy_connection(env, conn)) != 0 && ret == 0)
		ret = t_ret;
	return (ret);
}

/*
 * PUBLIC: void __repmgr_net_destroy __P((ENV *, DB_REP *));
 */
void
__repmgr_net_destroy(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	u_int i;

	__repmgr_cleanup_netaddr(env, &db_rep->my_addr);

	if (db_rep->sites == NULL)
		return;

	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		TAILQ_REMOVE(&db_rep->retries, retry, entries);
		__os_free(env, retry);
	}

	DB_ASSERT(env, TAILQ_EMPTY(&db_rep->connections));

	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];
		DB_ASSERT(env, TAILQ_EMPTY(&site->sub_conns));
		__repmgr_cleanup_netaddr(env, &site->net_addr);
	}
	__os_free(env, db_rep->sites);
	db_rep->sites = NULL;
}
1415