/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_net.c,v 1.70 2008/03/13 17:31:28 mbrey Exp $
 */

#include "db_config.h"

#define	__INCLUDE_NETWORKING	1
#include "db_int.h"
#include "dbinc/mp.h"

/*
 * The functions in this module implement a simple wire protocol for
 * transmitting messages, both replication messages and our own internal control
 * messages.  The protocol is as follows:
 *
 *      1 byte          - message type  (defined in repmgr.h)
 *      4 bytes         - size of control
 *      4 bytes         - size of rec
 *      ? bytes         - control
 *      ? bytes         - rec
 *
 * where both sizes are 32-bit binary integers in network byte order.
 * Either control or rec can have zero length, but even in this case the
 * 4-byte length will be present.
 *     Putting both lengths right up at the front allows us to read in fewer
 * phases, and allows us to allocate buffer space for both parts (plus a wrapper
 * struct) at once.
 */
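
/*
 * For illustration only (this sketch is not part of the module): a receiver
 * could decode the fixed 9-byte header described above as follows, where
 * "read_fully" is a hypothetical helper that loops until the requested
 * number of bytes has arrived:
 *
 *	u_int8_t hdr[9], type;
 *	u_int32_t control_size, rec_size;
 *
 *	read_fully(fd, hdr, sizeof(hdr));
 *	type = hdr[0];
 *	memcpy(&control_size, &hdr[1], sizeof(control_size));
 *	memcpy(&rec_size, &hdr[5], sizeof(rec_size));
 *	control_size = ntohl(control_size);
 *	rec_size = ntohl(rec_size);
 *
 * after which control_size bytes of control and rec_size bytes of rec remain
 * to be read from the connection.
 */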

/*
 * In sending a message, we first try to send it in-line, in the sending thread,
 * and without first copying the message, by using scatter/gather I/O, using
 * iovecs to point to the various pieces of the message.  If that all works
 * without blocking, that's optimal.
 *     If we find that, for a particular connection, we can't send without
 * blocking, then we must copy the message for sending later in the select()
 * thread.  In the course of doing that, we might as well "flatten" the message,
 * forming one single buffer, to simplify life.  Not only that, once we've gone
 * to the trouble of doing that, other sites to which we also want to send the
 * message (in the case of a broadcast), may as well take advantage of the
 * simplified structure also.
 *     This structure holds it all.  Note that this structure, and the
 * "flat_msg" structure, are allocated separately, because (1) the flat_msg
 * version is usually not needed; and (2) when it is needed, it will need to
 * live longer than the wrapping sending_msg structure.
 *     Note that, for the broadcast case, where we're going to use this
 * repeatedly, the iovecs is a template that must be copied, since in normal use
 * the iovecs pointers and lengths get adjusted after every partial write.
 */
struct sending_msg {
	REPMGR_IOVECS iovecs;
	u_int8_t type;
	u_int32_t control_size_buf, rec_size_buf;
	REPMGR_FLAT *fmsg;
};

static int __repmgr_close_connection __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_destroy_connection __P((ENV *, REPMGR_CONNECTION *));
static void setup_sending_msg
    __P((struct sending_msg *, u_int, const DBT *, const DBT *));
static int __repmgr_send_internal
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
static int enqueue_msg
    __P((ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
static int flatten __P((ENV *, struct sending_msg *));
static REPMGR_SITE *__repmgr_available_site __P((ENV *, int));

/*
 * __repmgr_send --
 *	The send function for DB_ENV->rep_set_transport.
 *
 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
 * PUBLIC:     const DB_LSN *, int, u_int32_t));
 */
int
__repmgr_send(dbenv, control, rec, lsnp, eid, flags)
	DB_ENV *dbenv;
	const DBT *control, *rec;
	const DB_LSN *lsnp;
	int eid;
	u_int32_t flags;
{
	DB_REP *db_rep;
	ENV *env;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int available, nclients, needed, npeers_sent, nsites_sent;
	int ret, t_ret;

	env = dbenv->env;
	db_rep = env->rep_handle;

	LOCK_MUTEX(db_rep->mutex);
	if (eid == DB_EID_BROADCAST) {
		if ((ret = __repmgr_send_broadcast(env, REPMGR_REP_MESSAGE,
		    control, rec, &nsites_sent, &npeers_sent)) != 0)
			goto out;
	} else {
		/*
		 * If this is a request that can be sent anywhere, then see if
		 * we can send it to our peer (to save load on the master), but
		 * not if it's a rerequest, because that likely means we
		 * already tried this and failed.
		 */
		if ((flags & (DB_REP_ANYWHERE | DB_REP_REREQUEST)) ==
		    DB_REP_ANYWHERE &&
		    IS_VALID_EID(db_rep->peer) &&
		    (site = __repmgr_available_site(env, db_rep->peer)) !=
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "sending request to peer"));
		} else if ((site = __repmgr_available_site(env, eid)) ==
		    NULL) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "ignoring message sent to unavailable site"));
			ret = DB_REP_UNAVAIL;
			goto out;
		}

		conn = site->ref.conn;
		/* Pass the "blockable" argument as TRUE. */
		if ((ret = __repmgr_send_one(env, conn, REPMGR_REP_MESSAGE,
		    control, rec, TRUE)) == DB_REP_UNAVAIL &&
		    (t_ret = __repmgr_bust_connection(env, conn)) != 0)
			ret = t_ret;
		if (ret != 0)
			goto out;

		nsites_sent = 1;
		npeers_sent = site->priority > 0 ? 1 : 0;
	}
	/*
	 * At this point, nsites_sent and npeers_sent represent the (maximum)
	 * number of sites we've attempted to begin sending the message to.
	 * Of course we haven't actually received any acks yet.  But since
	 * we've only sent to that many other sites, that's the maximum number
	 * of acks we could possibly expect.  If even that number fails to
	 * satisfy our PERM policy, there's no point waiting for something
	 * that can never happen.
	 */
	if (LF_ISSET(DB_REP_PERMANENT)) {
		/* Number of sites in the group besides myself. */
		nclients = __repmgr_get_nsites(db_rep) - 1;

		switch (db_rep->perm_policy) {
		case DB_REPMGR_ACKS_NONE:
			needed = 0;
			COMPQUIET(available, 0);
			break;

		case DB_REPMGR_ACKS_ONE:
			needed = 1;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ALL:
			/* Number of sites in the group besides myself. */
			needed = nclients;
			available = nsites_sent;
			break;

		case DB_REPMGR_ACKS_ONE_PEER:
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_ALL_PEERS:
			/*
			 * Too hard to figure out "needed", since we're not
			 * keeping track of how many peers we have; so just skip
			 * the optimization in this case.
			 */
			needed = 1;
			available = npeers_sent;
			break;

		case DB_REPMGR_ACKS_QUORUM:
			/*
			 * The minimum number of acks necessary to ensure that
			 * the transaction is durable if an election is held.
			 * (See note below at __repmgr_is_permanent, regarding
			 * the surprising inter-relationship between
			 * 2SITE_STRICT and QUORUM.)
			 */
			if (nclients > 1 ||
			    FLD_ISSET(db_rep->region->config,
			    REP_C_2SITE_STRICT))
				needed = nclients / 2;
			else
				needed = 1;
			available = npeers_sent;
			break;

		default:
			COMPQUIET(available, 0);
			COMPQUIET(needed, 0);
			(void)__db_unknown_path(env, "__repmgr_send");
			break;
		}
		if (needed == 0)
			goto out;
		if (available < needed) {
			ret = DB_REP_UNAVAIL;
			goto out;
		}
		/* In ALL_PEERS case, display of "needed" might be confusing. */
		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
		    "will await acknowledgement: need %u", needed));
		ret = __repmgr_await_ack(env, lsnp);
	}

out:	UNLOCK_MUTEX(db_rep->mutex);
	if (ret != 0 && LF_ISSET(DB_REP_PERMANENT)) {
		STAT(db_rep->region->mstat.st_perm_failed++);
		DB_EVENT(env, DB_EVENT_REP_PERM_FAILED, NULL);
	}
	return (ret);
}
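
/*
 * A worked example of the "needed" calculation above, for illustration only:
 * in a 5-site group (nclients = 4), DB_REPMGR_ACKS_QUORUM needs
 * nclients / 2 = 2 acks, DB_REPMGR_ACKS_ALL needs all 4, and
 * DB_REPMGR_ACKS_ONE needs just 1.  So if a broadcast only managed to reach
 * one peer (npeers_sent = 1), a QUORUM send fails fast with DB_REP_UNAVAIL
 * instead of waiting out the ACK timeout for acks that cannot arrive.
 */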

static REPMGR_SITE *
__repmgr_available_site(env, eid)
	ENV *env;
	int eid;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);
	if (site->state != SITE_CONNECTED)
		return (NULL);

	if (site->ref.conn->state == CONN_READY)
		return (site);
	return (NULL);
}

/*
 * Sends a message to all sites with which we currently have an active
 * connection.  Sets the result parameters according to how many sites we
 * attempted to begin sending to, even if we did nothing more than queue the
 * message for later delivery.
 *
 * !!!
 * Caller must hold env->mutex.
 * PUBLIC: int __repmgr_send_broadcast __P((ENV *, u_int,
 * PUBLIC:    const DBT *, const DBT *, u_int *, u_int *));
 */
int
__repmgr_send_broadcast(env, type, control, rec, nsitesp, npeersp)
	ENV *env;
	u_int type;
	const DBT *control, *rec;
	u_int *nsitesp, *npeersp;
{
	DB_REP *db_rep;
	struct sending_msg msg;
	REPMGR_CONNECTION *conn;
	REPMGR_SITE *site;
	u_int nsites, npeers;
	int ret;

	static const u_int version_max_msg_type[] = {
		0, REPMGR_MAX_V1_MSG_TYPE, REPMGR_MAX_V2_MSG_TYPE
	};

	db_rep = env->rep_handle;

	/*
	 * Sending a broadcast is quick, because we allow no blocking.  So it
	 * shouldn't much matter.  But just in case, take the timestamp before
	 * sending, so that if anything we err on the side of keeping clients
	 * placated (i.e., possibly sending a heartbeat slightly more frequently
	 * than necessary).
	 */
	__os_gettime(env, &db_rep->last_bcast, 1);

	setup_sending_msg(&msg, type, control, rec);
	nsites = npeers = 0;

	/*
	 * Traverse the connections list.  Here, even in bust_connection, we
	 * don't unlink the current list entry, so we can use the TAILQ_FOREACH
	 * macro.
	 */
	TAILQ_FOREACH(conn, &db_rep->connections, entries) {
		if (conn->state != CONN_READY)
			continue;
		DB_ASSERT(env, IS_VALID_EID(conn->eid) &&
		    conn->version > 0 &&
		    conn->version <= DB_REPMGR_VERSION);

		/*
		 * Skip if the type of message we're sending is beyond the range
		 * of known message types for this connection's version.
		 *
		 * !!!
		 * Don't be misled by the apparent generality of this simple
		 * test.  It works currently, because the only kinds of messages
		 * that we broadcast are REP_MESSAGE and HEARTBEAT.  But in the
		 * future other kinds of messages might require more intricate
		 * per-connection-version customization (for example,
		 * per-version message format conversion, addition of new
		 * fields, etc.).
		 */
		if (type > version_max_msg_type[conn->version])
			continue;

		/*
		 * Broadcast messages come either from application threads
		 * committing transactions, or are replication status messages
		 * that we can afford to lose.  So don't allow blocking for
		 * them (pass the "blockable" argument as FALSE).
		 */
		if ((ret = __repmgr_send_internal(env,
		    conn, &msg, FALSE)) == 0) {
			site = SITE_FROM_EID(conn->eid);
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else if (ret == DB_REP_UNAVAIL) {
			if ((ret = __repmgr_bust_connection(env, conn)) != 0)
				return (ret);
		} else
			return (ret);
	}

	*nsitesp = nsites;
	*npeersp = npeers;
	return (0);
}

/*
 * __repmgr_send_one --
 *	Send a message to a site, or if you can't just yet, make a copy of it
 * and arrange to have it sent later.  'rec' may be NULL, in which case we send
 * a zero length and no data.
 *
 * If we get an error, we take care of cleaning up the connection (calling
 * __repmgr_bust_connection()), so that the caller needn't do so.
 *
 * !!!
 * Note that the mutex should be held through this call.
 * It doubles as a synchronizer to make sure that two threads don't
 * intersperse writes that are part of two single messages.
 *
 * PUBLIC: int __repmgr_send_one __P((ENV *, REPMGR_CONNECTION *,
 * PUBLIC:    u_int, const DBT *, const DBT *, int));
 */
int
__repmgr_send_one(env, conn, msg_type, control, rec, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	u_int msg_type;
	const DBT *control, *rec;
	int blockable;
{
	struct sending_msg msg;

	setup_sending_msg(&msg, msg_type, control, rec);
	return (__repmgr_send_internal(env, conn, &msg, blockable));
}

/*
 * Attempts a "best effort" send of a message to the given site.  If there is
 * an excessive backlog of messages already queued on the connection, what
 * shall we do?  If the caller doesn't mind blocking, we'll wait (a limited
 * amount of time) for the queue to drain.  Otherwise we'll simply drop the
 * message.  This is always allowed by the replication protocol.  But in the
 * case of a multi-message response to a request like PAGE_REQ, LOG_REQ or
 * ALL_REQ we almost always get a flood of messages that instantly fills our
 * queue, so blocking improves performance (by avoiding the need for the
 * client to re-request).
 *
 * How long shall we wait?  We could of course create a new timeout
 * configuration type, so that the application could set it directly.  But that
 * would start to overwhelm the user with too many choices to think about.  We
 * already have an ACK timeout, which is the user's estimate of how long it
 * should take to send a message to the client, have it be processed, and
 * return a message back to us.  We multiply that by the queue size, because
 * that's how many messages have to be swallowed up by the client before we're
 * able to start sending again (at least to a rough approximation).
 */
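
/*
 * For illustration only, assuming an ACK timeout of one second and an
 * OUT_QUEUE_LIMIT of 10 messages (the real values come from the user's
 * configuration and from repmgr.h), a blocked sending thread would wait
 * up to
 *
 *	drain_to * OUT_QUEUE_LIMIT = 1 second * 10 = 10 seconds
 *
 * for the outbound queue to drain before giving up.
 */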
static int
__repmgr_send_internal(env, conn, msg, blockable)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	int blockable;
{
	DB_REP *db_rep;
	REPMGR_IOVECS iovecs;
	SITE_STRING_BUFFER buffer;
	db_timeout_t drain_to;
	int ret;
	size_t nw;
	size_t total_written;

	db_rep = env->rep_handle;

	DB_ASSERT(env,
	    conn->state != CONN_CONNECTING && conn->state != CONN_DEFUNCT);
	if (!STAILQ_EMPTY(&conn->outbound_queue)) {
		/*
		 * Output to this site is currently owned by the select()
		 * thread, so we can't try sending in-line here.  We can only
		 * queue the msg for later.
		 */
		RPRINT(env, DB_VERB_REPMGR_MISC,
		    (env, "msg to %s to be queued",
		    __repmgr_format_eid_loc(env->rep_handle,
		    conn->eid, buffer)));
		if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
		    blockable && conn->state != CONN_CONGESTED) {
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "block msg thread, await queue space"));

			if ((drain_to = db_rep->ack_timeout) == 0)
				drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "will await drain"));
			conn->blockers++;
			ret = __repmgr_await_drain(env,
			    conn, drain_to * OUT_QUEUE_LIMIT);
			conn->blockers--;
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "drain returned %d (%d,%d)", ret,
			    db_rep->finished, conn->out_queue_length));
			if (db_rep->finished)
				return (DB_TIMEOUT);
			if (ret != 0)
				return (ret);
			if (STAILQ_EMPTY(&conn->outbound_queue))
				goto empty;
		}
		if (conn->out_queue_length < OUT_QUEUE_LIMIT)
			return (enqueue_msg(env, conn, msg, 0));
		else {
			RPRINT(env, DB_VERB_REPMGR_MISC,
			    (env, "queue limit exceeded"));
			STAT(env->rep_handle->
			    region->mstat.st_msgs_dropped++);
			return (blockable ? DB_TIMEOUT : 0);
		}
	}
empty:

	/*
	 * Send as much data to the site as we can, without blocking.  Keep
	 * writing as long as we're making some progress.  Make a scratch copy
	 * of iovecs for our use, since we destroy it in the process of
	 * adjusting pointers after each partial I/O.
	 */
	memcpy(&iovecs, &msg->iovecs, sizeof(iovecs));
	total_written = 0;
	while ((ret = __repmgr_writev(conn->fd, &iovecs.vectors[iovecs.offset],
	    iovecs.count-iovecs.offset, &nw)) == 0) {
		total_written += nw;
		if (__repmgr_update_consumed(&iovecs, nw)) /* all written */
			return (0);
	}

	if (ret != WOULDBLOCK) {
		__db_err(env, ret, "socket writing failure");
		return (DB_REP_UNAVAIL);
	}

	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "wrote only %lu bytes to %s",
	    (u_long)total_written,
	    __repmgr_format_eid_loc(env->rep_handle, conn->eid, buffer)));
	/*
	 * We can't send any more without blocking: queue (a pointer to) a
	 * "flattened" copy of the message, so that the select() thread will
	 * finish sending it later.
	 */
	if ((ret = enqueue_msg(env, conn, msg, total_written)) != 0)
		return (ret);

	STAT(env->rep_handle->region->mstat.st_msgs_queued++);

	/*
	 * Wake the main select thread so that it can discover that it has
	 * received ownership of this connection.  Note that we didn't have to
	 * do this in the previous case (above), because the non-empty queue
	 * implies that the select() thread is already managing ownership of
	 * this connection.
	 */
#ifdef DB_WIN32
	if (WSAEventSelect(conn->fd, conn->event_object,
	    FD_READ|FD_WRITE|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't add FD_WRITE event bit");
		return (ret);
	}
#endif
	return (__repmgr_wake_main_thread(env));
}

/*
 * PUBLIC: int __repmgr_is_permanent __P((ENV *, const DB_LSN *));
 *
 * Count up how many sites have ack'ed the given LSN.  Returns TRUE if enough
 * sites have ack'ed; FALSE otherwise.
 *
 * !!!
 * Caller must hold the mutex.
 */
int
__repmgr_is_permanent(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int eid, nsites, npeers;
	int is_perm, has_missing_peer;

	db_rep = env->rep_handle;

	if (db_rep->perm_policy == DB_REPMGR_ACKS_NONE)
		return (TRUE);

	nsites = npeers = 0;
	has_missing_peer = FALSE;
	for (eid = 0; eid < db_rep->site_cnt; eid++) {
		site = SITE_FROM_EID(eid);
		if (!F_ISSET(site, SITE_HAS_PRIO)) {
			/*
			 * Never connected to this site: since we can't know
			 * whether it's a peer, assume the worst.
			 */
			has_missing_peer = TRUE;
			continue;
		}

		if (log_compare(&site->max_ack, lsnp) >= 0) {
			nsites++;
			if (site->priority > 0)
				npeers++;
		} else {
			/* This site hasn't ack'ed the message. */
			if (site->priority > 0)
				has_missing_peer = TRUE;
		}
	}

	switch (db_rep->perm_policy) {
	case DB_REPMGR_ACKS_ONE:
		is_perm = (nsites >= 1);
		break;
	case DB_REPMGR_ACKS_ONE_PEER:
		is_perm = (npeers >= 1);
		break;
	case DB_REPMGR_ACKS_QUORUM:
		/*
		 * The minimum number of acks necessary to ensure that the
		 * transaction is durable if an election is held (given that we
		 * always conduct elections according to the standard,
		 * recommended practice of requiring votes from a majority of
		 * sites).
		 */
		if (__repmgr_get_nsites(db_rep) == 2 &&
		    !FLD_ISSET(db_rep->region->config, REP_C_2SITE_STRICT)) {
			/*
			 * Unless instructed otherwise, our special handling for
			 * 2-site groups means that a client that loses contact
			 * with the master elects itself master (even though
			 * that doesn't constitute a majority).  In order to
			 * provide the expected guarantee implied by the
			 * definition of "quorum" we have to fudge the ack
			 * calculation in this case: specifically, we need to
			 * make sure that the client has received it in order
			 * for us to consider it "perm".
			 *
			 * Note that turning the usual strict behavior back on
			 * in a 2-site group results in "0" as the number of
			 * clients needed to ack a txn in order for it to have
			 * arrived at a quorum.  This is the correct result,
			 * strange as it may seem!  This may well mean that in a
			 * 2-site group the QUORUM policy is rarely the right
			 * choice.
			 */
			is_perm = (npeers >= 1);
		} else
			is_perm = (npeers >= (__repmgr_get_nsites(db_rep)-1)/2);
		break;
	case DB_REPMGR_ACKS_ALL:
		/* Adjust by 1, since get_nsites includes local site. */
		is_perm = (nsites >= __repmgr_get_nsites(db_rep) - 1);
		break;
	case DB_REPMGR_ACKS_ALL_PEERS:
		if (db_rep->site_cnt < __repmgr_get_nsites(db_rep) - 1) {
			/* Assume missing site might be a peer. */
			has_missing_peer = TRUE;
		}
		is_perm = !has_missing_peer;
		break;
	default:
		is_perm = FALSE;
		(void)__db_unknown_path(env, "__repmgr_is_permanent");
	}
	return (is_perm);
}
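
/*
 * To make the QUORUM arithmetic above concrete (illustration only): with
 * __repmgr_get_nsites() == 2 and 2SITE_STRICT in effect, the general formula
 * yields (2 - 1) / 2 == 0 acks needed, so every txn is trivially "permanent";
 * with strictness off, we instead insist on npeers >= 1, so the lone client
 * must actually have acknowledged the txn.
 */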

/*
 * Abandons a connection, to recover from an error.  Takes necessary recovery
 * action.  Note that we don't actually close and clean up the connection here;
 * that happens later, in the select() thread main loop.  See the definition of
 * DISABLE_CONNECTION (repmgr.h) for more discussion.
 *
 * PUBLIC: int __repmgr_bust_connection __P((ENV *,
 * PUBLIC:     REPMGR_CONNECTION *));
 *
 * !!!
 * Caller holds mutex.
 */
int
__repmgr_bust_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int connecting, ret, eid;

	db_rep = env->rep_handle;
	ret = 0;

	eid = conn->eid;
	connecting = (conn->state == CONN_CONNECTING);

	DISABLE_CONNECTION(conn);

	/*
	 * When we first accepted the incoming connection, we set conn->eid to
	 * -1 to indicate that we didn't yet know what site it might be from.
	 * If we then get here because we later decide it was a redundant
	 * connection, the following scary stuff will correctly not happen.
	 */
	if (IS_VALID_EID(eid)) {
		/* schedule_connection_attempt wakes the main thread. */
		if ((ret = __repmgr_schedule_connection_attempt(
		    env, (u_int)eid, FALSE)) != 0)
			return (ret);

		/*
		 * If this connection had gotten no further than the CONNECTING
		 * state, this can't count as a loss of connection to the
		 * master.
		 */
		if (!connecting && eid == db_rep->master_eid) {
			(void)__memp_set_config(
			    env->dbenv, DB_MEMP_SYNC_INTERRUPT, 1);
			if ((ret = __repmgr_init_election(
			    env, ELECT_FAILURE_ELECTION)) != 0)
				return (ret);
		}
	} else {
		/*
		 * One way or another, make sure the main thread is poked, so
		 * that we do the deferred clean-up.
		 */
		ret = __repmgr_wake_main_thread(env);
	}
	return (ret);
}

/*
 * PUBLIC: int __repmgr_cleanup_connection
 * PUBLIC:    __P((ENV *, REPMGR_CONNECTION *));
 *
 * !!!
 * Idempotent.  This can be called repeatedly as blocking message threads (of
 * which there could be multiples) wake up in case of error on the connection.
 */
int
__repmgr_cleanup_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	DB_REP *db_rep;
	int ret;

	db_rep = env->rep_handle;

	if ((ret = __repmgr_close_connection(env, conn)) != 0)
		goto out;

	/*
	 * If there's a blocked message thread waiting, we mustn't yank the
	 * connection struct out from under it.  Instead, just wake it up.
	 * We'll get another chance to come back through here soon.
	 */
	if (conn->blockers > 0) {
		ret = __repmgr_signal(&conn->drained);
		goto out;
	}

	TAILQ_REMOVE(&db_rep->connections, conn, entries);

	ret = __repmgr_destroy_connection(env, conn);

out:
	return (ret);
}

static int
__repmgr_close_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret;

	DB_ASSERT(env,
	    conn->state == CONN_DEFUNCT || env->rep_handle->finished);

	ret = 0;
	if (conn->fd != INVALID_SOCKET) {
		ret = closesocket(conn->fd);
		conn->fd = INVALID_SOCKET;
		if (ret == SOCKET_ERROR) {
			ret = net_errno;
			__db_err(env, ret, "closing socket");
		}
#ifdef DB_WIN32
		if (!WSACloseEvent(conn->event_object) && ret == 0)
			ret = net_errno;
#endif
	}
	return (ret);
}

static int
__repmgr_destroy_connection(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	QUEUED_OUTPUT *out;
	REPMGR_FLAT *msg;
	DBT *dbt;
	int ret;

	/*
	 * Deallocate any input and output buffers we may have.
	 */
	if (conn->reading_phase == DATA_PHASE) {
		if (conn->msg_type == REPMGR_REP_MESSAGE)
			__os_free(env, conn->input.rep_message);
		else {
			dbt = &conn->input.repmgr_msg.cntrl;
			if (dbt->size > 0)
				__os_free(env, dbt->data);
			dbt = &conn->input.repmgr_msg.rec;
			if (dbt->size > 0)
				__os_free(env, dbt->data);
		}
	}
	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
		out = STAILQ_FIRST(&conn->outbound_queue);
		STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
		msg = out->msg;
		if (--msg->ref_count <= 0)
			__os_free(env, msg);
		__os_free(env, out);
	}

	ret = __repmgr_free_cond(&conn->drained);
	__os_free(env, conn);
	return (ret);
}

static int
enqueue_msg(env, conn, msg, offset)
	ENV *env;
	REPMGR_CONNECTION *conn;
	struct sending_msg *msg;
	size_t offset;
{
	QUEUED_OUTPUT *q_element;
	int ret;

	if (msg->fmsg == NULL && ((ret = flatten(env, msg)) != 0))
		return (ret);
	if ((ret = __os_malloc(env, sizeof(QUEUED_OUTPUT), &q_element)) != 0)
		return (ret);
	q_element->msg = msg->fmsg;
	msg->fmsg->ref_count++;	/* encapsulation would be sweeter */
	q_element->offset = offset;

	/* Put it on the connection's outbound queue. */
	STAILQ_INSERT_TAIL(&conn->outbound_queue, q_element, entries);
	conn->out_queue_length++;
	return (0);
}

/*
 * Either "control" or "rec" (or both) may be NULL, in which case we treat it
 * like a zero-length DBT.
 */
static void
setup_sending_msg(msg, type, control, rec)
	struct sending_msg *msg;
	u_int type;
	const DBT *control, *rec;
{
	u_int32_t control_size, rec_size;

	/*
	 * The wire protocol is documented in a comment at the top of this
	 * module.
	 */
	__repmgr_iovec_init(&msg->iovecs);
	msg->type = type;
	__repmgr_add_buffer(&msg->iovecs, &msg->type, sizeof(msg->type));

	control_size = control == NULL ? 0 : control->size;
	msg->control_size_buf = htonl(control_size);
	__repmgr_add_buffer(&msg->iovecs,
	    &msg->control_size_buf, sizeof(msg->control_size_buf));

	rec_size = rec == NULL ? 0 : rec->size;
	msg->rec_size_buf = htonl(rec_size);
	__repmgr_add_buffer(
	    &msg->iovecs, &msg->rec_size_buf, sizeof(msg->rec_size_buf));

	/* Use control_size, not control->size: "control" may be NULL. */
	if (control_size > 0)
		__repmgr_add_dbt(&msg->iovecs, control);

	if (rec_size > 0)
		__repmgr_add_dbt(&msg->iovecs, rec);

	msg->fmsg = NULL;
}

/*
 * Convert a message stored as iovec pointers to various pieces, into flattened
 * form, by copying all the pieces, and then make the iovec just point to the
 * new simplified form.
 */
static int
flatten(env, msg)
	ENV *env;
	struct sending_msg *msg;
{
	u_int8_t *p;
	size_t msg_size;
	int i, ret;

	DB_ASSERT(env, msg->fmsg == NULL);

	msg_size = msg->iovecs.total_bytes;
	if ((ret = __os_malloc(env, sizeof(*msg->fmsg) + msg_size,
	    &msg->fmsg)) != 0)
		return (ret);
	msg->fmsg->length = msg_size;
	msg->fmsg->ref_count = 0;
	p = &msg->fmsg->data[0];

	for (i = 0; i < msg->iovecs.count; i++) {
		memcpy(p, msg->iovecs.vectors[i].iov_base,
		    msg->iovecs.vectors[i].iov_len);
		p = &p[msg->iovecs.vectors[i].iov_len];
	}
	__repmgr_iovec_init(&msg->iovecs);
	__repmgr_add_buffer(&msg->iovecs, &msg->fmsg->data[0], msg_size);
	return (0);
}
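
/*
 * For example (illustration only), flattening a message with a 10-byte
 * control and a 20-byte rec yields a single 39-byte buffer laid out as:
 *
 *	offset 0	1-byte message type
 *	offset 1	4-byte control length (10, in network byte order)
 *	offset 5	4-byte rec length (20, in network byte order)
 *	offset 9	10 bytes of control
 *	offset 19	20 bytes of rec
 *
 * after which msg->iovecs contains just one iovec pointing at the whole
 * buffer.
 */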

/*
 * PUBLIC: int __repmgr_find_site __P((ENV *, const char *, u_int));
 */
int
__repmgr_find_site(env, host, port)
	ENV *env;
	const char *host;
	u_int port;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int i;

	db_rep = env->rep_handle;
	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];

		if (strcmp(site->net_addr.host, host) == 0 &&
		    site->net_addr.port == port)
			return ((int)i);
	}

	return (-1);
}

/*
 * Stash a copy of the given host name and port number into a convenient data
 * structure so that we can save it permanently.  This is kind of like a
 * constructor for a netaddr object, except that the caller supplies the memory
 * for the base struct (though not the subordinate attachments).
 *
 * All inputs are assumed to have been already validated.
 *
 * PUBLIC: int __repmgr_pack_netaddr __P((ENV *, const char *,
 * PUBLIC:     u_int, ADDRINFO *, repmgr_netaddr_t *));
 */
int
__repmgr_pack_netaddr(env, host, port, list, addr)
	ENV *env;
	const char *host;
	u_int port;
	ADDRINFO *list;
	repmgr_netaddr_t *addr;
{
	int ret;

	DB_ASSERT(env, host != NULL);

	if ((ret = __os_strdup(env, host, &addr->host)) != 0)
		return (ret);
	addr->port = (u_int16_t)port;
	addr->address_list = list;
	addr->current = NULL;
	return (0);
}

/*
 * PUBLIC: int __repmgr_getaddr __P((ENV *,
 * PUBLIC:     const char *, u_int, int, ADDRINFO **));
 */
int
__repmgr_getaddr(env, host, port, flags, result)
	ENV *env;
	const char *host;
	u_int port;
	int flags;    /* Matches struct addrinfo declaration. */
	ADDRINFO **result;
{
	ADDRINFO *answer, hints;
	char buffer[10];		/* 2**16 fits in 5 digits. */
#ifdef DB_WIN32
	int ret;
#endif

	/*
	 * Ports are really 16-bit unsigned values, but it's too painful to
	 * push that type through the API.
	 */
	if (port > UINT16_MAX) {
		__db_errx(env, "port %u larger than max port %u",
		    port, UINT16_MAX);
		return (EINVAL);
	}

#ifdef DB_WIN32
	if (!env->rep_handle->wsa_inited &&
	    (ret = __repmgr_wsa_init(env)) != 0)
		return (ret);
#endif

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = flags;
	(void)snprintf(buffer, sizeof(buffer), "%u", port);

	/*
	 * Although it's generally bad to discard error information, the return
	 * code from __os_getaddrinfo is undependable.  Our callers at least
	 * would like to be able to distinguish errors in getaddrinfo (which we
	 * want to consider to be re-tryable), from other failure (e.g., EINVAL,
	 * above).
	 */
	if (__os_getaddrinfo(env, host, port, buffer, &hints, &answer) != 0)
		return (DB_REP_UNAVAIL);
	*result = answer;

	return (0);
}
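
/*
 * Illustrative use, not taken from this module (the host name and port are
 * made up): a caller resolving an address for later connection attempts
 * might do
 *
 *	ADDRINFO *ai;
 *
 *	if ((ret = __repmgr_getaddr(env, "site1", 6000, 0, &ai)) != 0)
 *		return (ret);
 *	...
 *	__os_freeaddrinfo(env, ai);
 *
 * where a DB_REP_UNAVAIL return means "name resolution failed, retry later",
 * as __repmgr_add_site below treats it.
 */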

/*
 * Adds a new site to our array of known sites (unless it already exists),
 * and schedules it for an immediate connection attempt.  Whether or not it
 * already existed, we set *newsitep to point to the site, either the already
 * existing one or the newly created one (unless newsitep is passed in as
 * NULL, which is allowed).
 *
 * PUBLIC: int __repmgr_add_site
 * PUBLIC:     __P((ENV *, const char *, u_int, REPMGR_SITE **));
 *
 * !!!
 * Caller is expected to hold the mutex.
 */
int
__repmgr_add_site(env, host, port, newsitep)
	ENV *env;
	const char *host;
	u_int port;
	REPMGR_SITE **newsitep;
{
	ADDRINFO *address_list;
	DB_REP *db_rep;
	repmgr_netaddr_t addr;
	REPMGR_SITE *site;
	int ret, eid;

	ret = 0;
	db_rep = env->rep_handle;

	if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) {
		site = SITE_FROM_EID(eid);
		ret = EEXIST;
		goto out;
	}

	if ((ret = __repmgr_getaddr(
	    env, host, port, 0, &address_list)) == DB_REP_UNAVAIL) {
		/* Allow re-tryable errors.  We'll try again later. */
		address_list = NULL;
	} else if (ret != 0)
		return (ret);

	if ((ret = __repmgr_pack_netaddr(
	    env, host, port, address_list, &addr)) != 0) {
		__os_freeaddrinfo(env, address_list);
		return (ret);
	}

	if ((ret = __repmgr_new_site(env, &site, &addr, SITE_IDLE)) != 0) {
		__repmgr_cleanup_netaddr(env, &addr);
		return (ret);
	}

	if (db_rep->selector != NULL &&
	    (ret = __repmgr_schedule_connection_attempt(
	    env, (u_int)EID_FROM_SITE(site), TRUE)) != 0)
		return (ret);

	/* Note that we should only come here for success and EEXIST. */
out:
	if (newsitep != NULL)
		*newsitep = site;
	return (ret);
}

/*
 * Initializes net-related memory in the db_rep handle.
 *
 * PUBLIC: int __repmgr_net_create __P((DB_REP *));
 */
int
__repmgr_net_create(db_rep)
	DB_REP *db_rep;
{
	db_rep->listen_fd = INVALID_SOCKET;
	db_rep->master_eid = DB_EID_INVALID;

	TAILQ_INIT(&db_rep->connections);
	TAILQ_INIT(&db_rep->retries);

	return (0);
}

/*
 * __repmgr_listen --
 *	Initialize a socket for listening.  Sets
 *	a file descriptor for the socket, ready for an accept() call
 *	in a thread that we're happy to let block.
 *
 * PUBLIC:  int __repmgr_listen __P((ENV *));
 */
int
__repmgr_listen(env)
	ENV *env;
{
	ADDRINFO *ai;
	DB_REP *db_rep;
	char *why;
	int sockopt, ret;
	socket_t s;

	db_rep = env->rep_handle;

	/* Use OOB value as sentinel to show no socket open. */
	s = INVALID_SOCKET;
	ai = ADDR_LIST_FIRST(&db_rep->my_addr);

	/*
	 * Given the assert is correct, we execute the loop at least once, which
	 * means 'why' will have been set by the time it's needed.  But I guess
	 * lint doesn't know about DB_ASSERT.
	 */
	COMPQUIET(why, "");
	DB_ASSERT(env, ai != NULL);
	for (; ai != NULL; ai = ADDR_LIST_NEXT(&db_rep->my_addr)) {

		if ((s = socket(ai->ai_family,
		    ai->ai_socktype, ai->ai_protocol)) == INVALID_SOCKET) {
			why = "can't create listen socket";
			continue;
		}

		/*
		 * When testing, it's common to kill and restart regularly.  On
		 * some systems, this causes bind to fail with "address in use"
		 * errors unless this option is set.
		 */
		sockopt = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (sockopt_t)&sockopt,
		    sizeof(sockopt)) != 0) {
			why = "can't set REUSEADDR socket option";
			break;
		}

		if (bind(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0) {
			why = "can't bind socket to listening address";
			(void)closesocket(s);
			s = INVALID_SOCKET;
			continue;
		}

		if (listen(s, 5) != 0) {
			why = "listen()";
			break;
		}

		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
			__db_err(env, ret, "can't unblock listen socket");
			goto clean;
		}

		db_rep->listen_fd = s;
		return (0);
	}

	ret = net_errno;
	__db_err(env, ret, why);
clean:	if (s != INVALID_SOCKET)
		(void)closesocket(s);
	return (ret);
}

/*
 * PUBLIC: int __repmgr_net_close __P((ENV *));
 */
int
__repmgr_net_close(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
#ifndef DB_WIN32
	struct sigaction sigact;
#endif
	int ret, t_ret;

	db_rep = env->rep_handle;
	if (db_rep->listen_fd == INVALID_SOCKET)
		return (0);

	ret = 0;
	while (!TAILQ_EMPTY(&db_rep->connections)) {
		conn = TAILQ_FIRST(&db_rep->connections);
		if ((t_ret = __repmgr_close_connection(env, conn)) != 0 &&
		    ret == 0)
			ret = t_ret;
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		if ((t_ret = __repmgr_destroy_connection(env, conn)) != 0 &&
		    ret == 0)
			ret = t_ret;
	}

	if (closesocket(db_rep->listen_fd) == SOCKET_ERROR && ret == 0)
		ret = net_errno;

#ifdef DB_WIN32
	/* Shut down the Windows sockets DLL. */
	if (WSACleanup() == SOCKET_ERROR && ret == 0)
		ret = net_errno;
	db_rep->wsa_inited = FALSE;
#else
	/* Restore original SIGPIPE handling configuration. */
	if (db_rep->chg_sig_handler) {
		memset(&sigact, 0, sizeof(sigact));
		sigact.sa_handler = SIG_DFL;
		if (sigaction(SIGPIPE, &sigact, NULL) == -1 && ret == 0)
			ret = errno;
	}
#endif
	db_rep->listen_fd = INVALID_SOCKET;
	return (ret);
}

/*
 * PUBLIC: void __repmgr_net_destroy __P((ENV *, DB_REP *));
 */
void
__repmgr_net_destroy(env, db_rep)
	ENV *env;
	DB_REP *db_rep;
{
	REPMGR_CONNECTION *conn;
	REPMGR_RETRY *retry;
	REPMGR_SITE *site;
	u_int i;

	__repmgr_cleanup_netaddr(env, &db_rep->my_addr);

	if (db_rep->sites == NULL)
		return;

	while (!TAILQ_EMPTY(&db_rep->retries)) {
		retry = TAILQ_FIRST(&db_rep->retries);
		TAILQ_REMOVE(&db_rep->retries, retry, entries);
		__os_free(env, retry);
	}

	while (!TAILQ_EMPTY(&db_rep->connections)) {
		conn = TAILQ_FIRST(&db_rep->connections);
		/* Unlink first: destroy_connection frees but doesn't unlink. */
		TAILQ_REMOVE(&db_rep->connections, conn, entries);
		(void)__repmgr_destroy_connection(env, conn);
	}

	for (i = 0; i < db_rep->site_cnt; i++) {
		site = &db_rep->sites[i];
		__repmgr_cleanup_netaddr(env, &site->net_addr);
	}
	__os_free(env, db_rep->sites);
	db_rep->sites = NULL;
}