1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
/*
 * Signature of the heartbeat-related actions selected by
 * __repmgr_next_timeout(): __repmgr_send_heartbeat() (while we are master)
 * or __repmgr_call_election() (while monitoring a remote master).
 */
typedef int (*HEARTBEAT_ACTION) __P((ENV *));

/* Prototypes for this file's static (file-local) functions. */
static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
static int __repmgr_call_election __P((ENV *));
static int __repmgr_connect __P((ENV*, socket_t *, REPMGR_SITE *));
static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
static int find_version_info __P((ENV *, REPMGR_CONNECTION *, DBT *));
static int introduce_site __P((ENV *, char *, u_int, REPMGR_SITE**, u_int32_t));
static int __repmgr_next_timeout __P((ENV *,
    db_timespec *, HEARTBEAT_ACTION *));
static int dispatch_phase_completion __P((ENV *, REPMGR_CONNECTION *));
static REPMGR_CONNECTION *__repmgr_master_connection __P((ENV *));
static int process_parameters __P((ENV *,
    REPMGR_CONNECTION *, char *, u_int, u_int32_t, u_int32_t));
static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
static int record_ack __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_retry_connections __P((ENV *));
static int send_handshake __P((ENV *, REPMGR_CONNECTION *, void *, size_t));
static int __repmgr_send_heartbeat __P((ENV *));
static int send_v1_handshake __P((ENV *,
    REPMGR_CONNECTION *, void *, size_t));
static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
static int __repmgr_try_one __P((ENV *, u_int));
38
/*
 * Rejects any message other than a handshake on a connection that is still
 * negotiating (used by dispatch_msgin() in the CONN_CONNECTED,
 * CONN_NEGOTIATE and CONN_PARAMETERS states).
 *
 * NOTE: on failure this macro logs an error and executes
 * "return (DB_REP_UNAVAIL)" from the *enclosing* function.  Macro arguments
 * are parenthesized so that any pointer expression may be passed.
 */
#define	ONLY_HANDSHAKE(env, conn) do {				     \
	if ((conn)->msg_type != REPMGR_HANDSHAKE) {		     \
		__db_errx((env), "unexpected msg type %d in state %d", \
		    (int)(conn)->msg_type, (conn)->state);	     \
		return (DB_REP_UNAVAIL);			     \
	}							     \
} while (0)
46
47/*
48 * PUBLIC: void *__repmgr_select_thread __P((void *));
49 */
50void *
51__repmgr_select_thread(args)
52	void *args;
53{
54	ENV *env = args;
55	int ret;
56
57	if ((ret = __repmgr_select_loop(env)) != 0) {
58		__db_err(env, ret, "select loop failed");
59		__repmgr_thread_failure(env, ret);
60	}
61	return (NULL);
62}
63
/*
 * Accepts a pending connection on the listening socket (db_rep->listen_fd).
 * The new socket is made non-blocking; on Windows a WSA event object is also
 * created and associated with it for FD_READ|FD_CLOSE.  The socket is then
 * wrapped in a new REPMGR_CONNECTION in the CONN_NEGOTIATE state, flagged
 * CONN_INCOMING, and appended to the db_rep->connections list.
 *
 * Returns 0 on success, and also when accept() fails with one of the errors
 * listed in the switch below (those are only logged).  Any other failure is
 * returned, after closing whatever socket / event object had been obtained.
 *
 * PUBLIC: int __repmgr_accept __P((ENV *));
 */
int
__repmgr_accept(env)
	ENV *env;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *conn;
	/*
	 * The peer address is not used below.  NOTE(review): sockaddr_in only
	 * has room for an IPv4 address; accept() truncates larger ones.
	 */
	struct sockaddr_in siaddr;
	socklen_t addrlen;
	socket_t s;
	int ret;
#ifdef DB_WIN32
	WSAEVENT event_obj;
#endif

	db_rep = env->rep_handle;
	addrlen = sizeof(siaddr);
	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
	    &addrlen)) == -1) {
		/*
		 * Some errors are innocuous and so should be ignored.  MSDN
		 * Library documents the Windows ones; the Unix ones are
		 * advocated in Stevens' UNPv1, section 16.6; and Linux
		 * Application Development, p. 416.
		 */
		switch (ret = net_errno) {
#ifdef DB_WIN32
		case WSAECONNRESET:
		case WSAEWOULDBLOCK:
#else
		case EINTR:
		case EWOULDBLOCK:
		case ECONNABORTED:
		case ENETDOWN:
#ifdef EPROTO
		case EPROTO:
#endif
		case ENOPROTOOPT:
		case EHOSTDOWN:
#ifdef ENONET
		case ENONET:
#endif
		case EHOSTUNREACH:
		case EOPNOTSUPP:
		case ENETUNREACH:
#endif
			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
			    "accept error %d considered innocuous", ret));
			return (0);
		default:
			__db_err(env, ret, "accept error");
			return (ret);
		}
	}
	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "accepted a new connection"));

	if ((ret = __repmgr_set_nonblocking(s)) != 0) {
		__db_err(env, ret, "can't set nonblock after accept");
		(void)closesocket(s);
		return (ret);
	}

#ifdef DB_WIN32
	/*
	 * Associate an event object with the socket, signalled when data
	 * arrives or the connection is closed.
	 */
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(env, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif
	/* Incoming connections start out negotiating the protocol version. */
	if ((ret =
	    __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
	F_SET(conn, CONN_INCOMING);

	/*
	 * We don't yet know which site this connection is coming from.  So for
	 * now, put it on the "orphans" list; we'll move it to the appropriate
	 * site struct later when we discover who we're talking with, and what
	 * type of connection it is.
	 */
	conn->eid = -1;
	TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);

#ifdef DB_WIN32
	conn->event_object = event_obj;
#endif
	return (0);
}
167
168/*
169 * Computes how long we should wait for input, in other words how long until we
170 * have to wake up and do something.  Returns TRUE if timeout is set; FALSE if
171 * there is nothing to wait for.
172 *
173 * Note that the resulting timeout could be zero; but it can't be negative.
174 *
175 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
176 */
177int
178__repmgr_compute_timeout(env, timeout)
179	ENV *env;
180	db_timespec *timeout;
181{
182	DB_REP *db_rep;
183	REPMGR_RETRY *retry;
184	db_timespec now, t;
185	int have_timeout;
186
187	db_rep = env->rep_handle;
188
189	/*
190	 * There are two factors to consider: are heartbeats in use?  and, do we
191	 * have any sites with broken connections that we ought to retry?
192	 */
193	have_timeout = __repmgr_next_timeout(env, &t, NULL);
194
195	/* List items are in order, so we only have to examine the first one. */
196	if (!TAILQ_EMPTY(&db_rep->retries)) {
197		retry = TAILQ_FIRST(&db_rep->retries);
198		if (have_timeout) {
199			/* Choose earliest timeout deadline. */
200			t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
201		} else {
202			t = retry->time;
203			have_timeout = TRUE;
204		}
205	}
206
207	if (have_timeout) {
208		__os_gettime(env, &now, 1);
209		if (timespeccmp(&now, &t, >=))
210			timespecclear(timeout);
211		else {
212			*timeout = t;
213			timespecsub(timeout, &now);
214		}
215	}
216
217	return (have_timeout);
218}
219
220/*
221 * Figures out the next heartbeat-related thing to be done, and when it should
222 * be done.  The code is factored this way because this computation needs to be
223 * done both before each select() call, and after (when we're checking for timer
224 * expiration).
225 */
226static int
227__repmgr_next_timeout(env, deadline, action)
228	ENV *env;
229	db_timespec *deadline;
230	HEARTBEAT_ACTION *action;
231{
232	DB_REP *db_rep;
233	HEARTBEAT_ACTION my_action;
234	REPMGR_CONNECTION *conn;
235	REPMGR_SITE *site;
236	db_timespec t;
237
238	db_rep = env->rep_handle;
239
240	if (db_rep->master_eid == SELF_EID && db_rep->heartbeat_frequency > 0) {
241		t = db_rep->last_bcast;
242		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_frequency);
243		my_action = __repmgr_send_heartbeat;
244	} else if ((conn = __repmgr_master_connection(env)) != NULL &&
245	    !IS_SUBORDINATE(db_rep) &&
246	    db_rep->heartbeat_monitor_timeout > 0 &&
247	    conn->version >= HEARTBEAT_MIN_VERSION) {
248		/*
249		 * If we have a working connection to a heartbeat-aware master,
250		 * let's monitor it.  Otherwise there's really nothing we can
251		 * do.
252		 */
253		site = SITE_FROM_EID(db_rep->master_eid);
254		t = site->last_rcvd_timestamp;
255		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_monitor_timeout);
256		my_action = __repmgr_call_election;
257	} else
258		return (FALSE);
259
260	*deadline = t;
261	if (action != NULL)
262		*action = my_action;
263	return (TRUE);
264}
265
266static int
267__repmgr_send_heartbeat(env)
268	ENV *env;
269{
270	DBT control, rec;
271	u_int unused1, unused2;
272
273	DB_INIT_DBT(control, NULL, 0);
274	DB_INIT_DBT(rec, NULL, 0);
275	return (__repmgr_send_broadcast(env,
276	    REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2));
277}
278
279static REPMGR_CONNECTION *
280__repmgr_master_connection(env)
281	ENV *env;
282{
283	DB_REP *db_rep;
284	REPMGR_CONNECTION *conn;
285	REPMGR_SITE *master;
286
287	db_rep = env->rep_handle;
288
289	if (db_rep->master_eid == SELF_EID ||
290	    !IS_VALID_EID(db_rep->master_eid))
291		return (NULL);
292	master = SITE_FROM_EID(db_rep->master_eid);
293	if (master->state != SITE_CONNECTED)
294		return (NULL);
295	conn = master->ref.conn;
296	if (IS_READY_STATE(conn->state))
297		return (conn);
298	return (NULL);
299}
300
301static int
302__repmgr_call_election(env)
303	ENV *env;
304{
305	REPMGR_CONNECTION *conn;
306
307	conn = __repmgr_master_connection(env);
308	DB_ASSERT(env, conn != NULL);
309	RPRINT(env, DB_VERB_REPMGR_MISC,
310	    (env, "heartbeat monitor timeout expired"));
311	STAT(env->rep_handle->region->mstat.st_connection_drop++);
312	return (__repmgr_bust_connection(env, conn));
313}
314
315/*
316 * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
317 *
318 * !!!
319 * Assumes caller holds the mutex.
320 */
321int
322__repmgr_check_timeouts(env)
323	ENV *env;
324{
325	db_timespec when, now;
326	HEARTBEAT_ACTION action;
327	int ret;
328
329	/*
330	 * Figure out the next heartbeat-related thing to be done.  Then, if
331	 * it's time to do it, do so.
332	 */
333	if (__repmgr_next_timeout(env, &when, &action)) {
334		__os_gettime(env, &now, 1);
335		if (timespeccmp(&when, &now, <=) &&
336		    (ret = (*action)(env)) != 0)
337			return (ret);
338	}
339
340	return (__repmgr_retry_connections(env));
341}
342
343/*
344 * Initiates connection attempts for any sites on the idle list whose retry
345 * times have expired.
346 */
347static int
348__repmgr_retry_connections(env)
349	ENV *env;
350{
351	DB_REP *db_rep;
352	REPMGR_RETRY *retry;
353	db_timespec now;
354	u_int eid;
355	int ret;
356
357	db_rep = env->rep_handle;
358	__os_gettime(env, &now, 1);
359
360	while (!TAILQ_EMPTY(&db_rep->retries)) {
361		retry = TAILQ_FIRST(&db_rep->retries);
362		if (timespeccmp(&retry->time, &now, >=))
363			break;	/* since items are in time order */
364
365		TAILQ_REMOVE(&db_rep->retries, retry, entries);
366
367		eid = retry->eid;
368		__os_free(env, retry);
369
370		if ((ret = __repmgr_try_one(env, eid)) != 0)
371			return (ret);
372	}
373	return (0);
374}
375
376/*
377 * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
378 *
379 * !!!
380 * Assumes caller holds the mutex.
381 */
382int
383__repmgr_first_try_connections(env)
384	ENV *env;
385{
386	DB_REP *db_rep;
387	u_int eid;
388	int ret;
389
390	db_rep = env->rep_handle;
391	for (eid = 0; eid < db_rep->site_cnt; eid++)
392		if ((ret = __repmgr_try_one(env, eid)) != 0)
393			return (ret);
394	return (0);
395}
396
397/*
398 * Makes a best-effort attempt to connect to the indicated site.  Returns a
399 * non-zero error indication only for disastrous failures.  For re-tryable
400 * errors, we will have scheduled another attempt, and that can be considered
401 * success enough.
402 */
403static int
404__repmgr_try_one(env, eid)
405	ENV *env;
406	u_int eid;
407{
408	ADDRINFO *list;
409	DB_REP *db_rep;
410	repmgr_netaddr_t *addr;
411	int ret;
412
413	db_rep = env->rep_handle;
414
415	addr = &SITE_FROM_EID(eid)->net_addr;
416	if (ADDR_LIST_FIRST(addr) == NULL) {
417		if ((ret = __repmgr_getaddr(env,
418		    addr->host, addr->port, 0, &list)) == 0) {
419			addr->address_list = list;
420			(void)ADDR_LIST_FIRST(addr);
421		} else if (ret == DB_REP_UNAVAIL)
422			return (__repmgr_schedule_connection_attempt(
423			    env, eid, FALSE));
424		else
425			return (ret);
426	}
427
428	/* Here, when we have a valid address. */
429	return (__repmgr_connect_site(env, eid));
430}
431
/*
 * Tries to establish a connection with the site indicated by the given eid,
 * starting with the "current" element of its address list and trying as many
 * addresses as necessary until the list is exhausted.
 *
 * If the attempt fails outright, the failure is counted in st_connect_fail
 * and another attempt is scheduled (that function's result is returned).
 * Otherwise the socket is wrapped in a new REPMGR_CONNECTION.  The state is
 * CONN_CONNECTED if connect() completed at once, or CONN_CONNECTING if it is
 * still in progress.  The connection becomes the site's connection, and the
 * site is marked SITE_CONNECTED.  For an already-completed connection we
 * immediately send our version proposal.  If that fails with DB_REP_UNAVAIL,
 * the connection is busted instead of the error being returned.
 *
 * PUBLIC: int __repmgr_connect_site __P((ENV *, u_int eid));
 */
int
__repmgr_connect_site(env, eid)
	ENV *env;
	u_int eid;
{
	DB_REP *db_rep;
	REPMGR_CONNECTION *con;
	REPMGR_SITE *site;
	socket_t s;
	int state;
	int ret;
#ifdef DB_WIN32
	long desired_event;
	WSAEVENT event_obj;
#endif

	db_rep = env->rep_handle;
	site = SITE_FROM_EID(eid);

	/*
	 * 0: connect() completed immediately.  INPROGRESS: a non-blocking
	 * connect is under way.  Anything else: every address failed.
	 */
	switch (ret = __repmgr_connect(env, &s, site)) {
	case 0:
		state = CONN_CONNECTED;
#ifdef DB_WIN32
		desired_event = FD_READ|FD_CLOSE;
#endif
		break;
	case INPROGRESS:
		state = CONN_CONNECTING;
#ifdef DB_WIN32
		desired_event = FD_CONNECT;
#endif
		break;
	default:
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (
		    __repmgr_schedule_connection_attempt(env, eid, FALSE));
	}

#ifdef DB_WIN32
	/*
	 * Associate an event object with the socket, signalled for the events
	 * chosen above (connect completion, or readable data / close).
	 */
	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
		ret = net_errno;
		__db_err(env, ret, "can't create WSA event");
		(void)closesocket(s);
		return (ret);
	}
	if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't set desired event bits");
		(void)WSACloseEvent(event_obj);
		(void)closesocket(s);
		return (ret);
	}
#endif

	if ((ret = __repmgr_new_connection(env, &con, s, state)) != 0) {
#ifdef DB_WIN32
		(void)WSACloseEvent(event_obj);
#endif
		(void)closesocket(s);
		return (ret);
	}
#ifdef DB_WIN32
	con->event_object = event_obj;
#endif

	/* The new connection now belongs to this site. */
	con->eid = (int)eid;
	site->ref.conn = con;
	site->state = SITE_CONNECTED;

	if (state == CONN_CONNECTED) {
		/*
		 * Reset last_rcvd_timestamp (the base __repmgr_next_timeout()
		 * uses for the master-silence deadline), then start version
		 * negotiation right away.
		 */
		__os_gettime(env, &site->last_rcvd_timestamp, 1);
		switch (ret = __repmgr_propose_version(env, con)) {
		case 0:
			break;
		case DB_REP_UNAVAIL:
			return (__repmgr_bust_connection(env, con));
		default:
			return (ret);
		}
	}

	return (0);
}
522
523static int
524__repmgr_connect(env, socket_result, site)
525	ENV *env;
526	socket_t *socket_result;
527	REPMGR_SITE *site;
528{
529	repmgr_netaddr_t *addr;
530	ADDRINFO *ai;
531	socket_t s;
532	char *why;
533	int ret;
534	SITE_STRING_BUFFER buffer;
535
536	/*
537	 * Lint doesn't know about DB_ASSERT, so it can't tell that this
538	 * loop will always get executed at least once, giving 'why' a value.
539	 */
540	COMPQUIET(why, "");
541	addr = &site->net_addr;
542	ai = ADDR_LIST_CURRENT(addr);
543	DB_ASSERT(env, ai != NULL);
544	for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) {
545
546		if ((s = socket(ai->ai_family,
547		    ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
548			why = "can't create socket to connect";
549			continue;
550		}
551
552		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
553			__db_err(env,
554			    ret, "can't make nonblock socket to connect");
555			(void)closesocket(s);
556			return (ret);
557		}
558
559		if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0)
560			ret = net_errno;
561
562		if (ret == 0 || ret == INPROGRESS) {
563			*socket_result = s;
564			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
565			    "init connection to %s with result %d",
566			    __repmgr_format_site_loc(site, buffer), ret));
567			return (ret);
568		}
569
570		why = "connection failed";
571		(void)closesocket(s);
572	}
573
574	/* We've exhausted all possible addresses. */
575	ret = net_errno;
576	__db_err(env, ret, "%s to %s", why,
577	    __repmgr_format_site_loc(site, buffer));
578	return (ret);
579}
580
581/*
582 * Sends a proposal for version negotiation.
583 *
584 * PUBLIC: int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *));
585 */
586int
587__repmgr_propose_version(env, conn)
588	ENV *env;
589	REPMGR_CONNECTION *conn;
590{
591	DB_REP *db_rep;
592	__repmgr_version_proposal_args versions;
593	repmgr_netaddr_t *my_addr;
594	size_t hostname_len, rec_length;
595	u_int8_t *buf, *p;
596	int ret;
597
598	db_rep = env->rep_handle;
599	my_addr = &db_rep->my_addr;
600
601	/*
602	 * In repmgr wire protocol version 1, a handshake message had a rec part
603	 * that looked like this:
604	 *
605	 *  +-----------------+----+
606	 *  |  host name ...  | \0 |
607	 *  +-----------------+----+
608	 *
609	 * To ensure its own sanity, the old repmgr would write a NUL into the
610	 * last byte of a received message, and then use normal C library string
611	 * operations (e.g., * strlen, strcpy).
612	 *
613	 * Now, a version proposal has a rec part that looks like this:
614	 *
615	 *  +-----------------+----+------------------+------+
616	 *  |  host name ...  | \0 |  extra info ...  |  \0  |
617	 *  +-----------------+----+------------------+------+
618	 *
619	 * The "extra info" contains the version parameters, in marshaled form.
620	 */
621
622	hostname_len = strlen(my_addr->host);
623	rec_length = hostname_len + 1 +
624	    __REPMGR_VERSION_PROPOSAL_SIZE + 1;
625	if ((ret = __os_malloc(env, rec_length, &buf)) != 0)
626		goto out;
627	p = buf;
628	(void)strcpy((char*)p, my_addr->host);
629
630	p += hostname_len + 1;
631	versions.min = DB_REPMGR_MIN_VERSION;
632	versions.max = DB_REPMGR_VERSION;
633	__repmgr_version_proposal_marshal(env, &versions, p);
634
635	ret = send_v1_handshake(env, conn, buf, rec_length);
636	__os_free(env, buf);
637out:
638	return (ret);
639}
640
641static int
642send_v1_handshake(env, conn, buf, len)
643	ENV *env;
644	REPMGR_CONNECTION *conn;
645	void *buf;
646	size_t len;
647{
648	DB_REP *db_rep;
649	REP *rep;
650	repmgr_netaddr_t *my_addr;
651	DB_REPMGR_V1_HANDSHAKE buffer;
652	DBT cntrl, rec;
653
654	db_rep = env->rep_handle;
655	rep = db_rep->region;
656	my_addr = &db_rep->my_addr;
657
658	buffer.version = 1;
659	buffer.priority = htonl(rep->priority);
660	buffer.port = my_addr->port;
661	cntrl.data = &buffer;
662	cntrl.size = sizeof(buffer);
663
664	rec.data = buf;
665	rec.size = (u_int32_t)len;
666
667	/*
668	 * It would of course be disastrous to block the select() thread, so
669	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
670	 * never be necessary here, because the hand-shake is always the first
671	 * thing we send.  Which is a good thing, because it would be almost as
672	 * disastrous if we allowed ourselves to drop a handshake.
673	 */
674	return (__repmgr_send_one(env,
675	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
676}
677
678/*
679 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
680 *
681 * !!!
682 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
683 */
684int
685__repmgr_read_from_site(env, conn)
686	ENV *env;
687	REPMGR_CONNECTION *conn;
688{
689	DB_REP *db_rep;
690	REPMGR_SITE *site;
691	SITE_STRING_BUFFER buffer;
692	size_t nr;
693	int ret;
694
695	db_rep = env->rep_handle;
696	/*
697	 * Keep reading pieces as long as we're making some progress, or until
698	 * we complete the current read phase.
699	 */
700	for (;;) {
701		if ((ret = __repmgr_readv(conn->fd,
702		    &conn->iovecs.vectors[conn->iovecs.offset],
703		    conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) {
704			switch (ret) {
705#ifndef DB_WIN32
706			case EINTR:
707				continue;
708#endif
709			case WOULDBLOCK:
710				return (0);
711			default:
712#ifdef EBADF
713				DB_ASSERT(env, ret != EBADF);
714#endif
715				(void)__repmgr_format_eid_loc(env->rep_handle,
716				    conn->eid, buffer);
717				__db_err(env, ret,
718				    "can't read from %s", buffer);
719				STAT(env->rep_handle->
720				    region->mstat.st_connection_drop++);
721				return (DB_REP_UNAVAIL);
722			}
723		}
724
725		if (nr > 0) {
726			if (IS_VALID_EID(conn->eid)) {
727				site = SITE_FROM_EID(conn->eid);
728				__os_gettime(
729				    env, &site->last_rcvd_timestamp, 1);
730			}
731			if (__repmgr_update_consumed(&conn->iovecs, nr))
732				return (dispatch_phase_completion(env,
733					    conn));
734		} else {
735			(void)__repmgr_format_eid_loc(env->rep_handle,
736			    conn->eid, buffer);
737			__db_errx(env, "EOF on connection from %s", buffer);
738			STAT(env->rep_handle->
739			    region->mstat.st_connection_drop++);
740			return (DB_REP_UNAVAIL);
741		}
742	}
743}
744
745/*
746 * Handles whatever needs to be done upon the completion of a reading phase on a
747 * given connection.
748 */
749static int
750dispatch_phase_completion(env, conn)
751	ENV *env;
752	REPMGR_CONNECTION *conn;
753{
754#define	MEM_ALIGN sizeof(double)
755	DBT *dbt;
756	u_int32_t control_size, rec_size;
757	size_t memsize, control_offset, rec_offset;
758	void *membase;
759	int ret;
760
761	switch (conn->reading_phase) {
762	case SIZES_PHASE:
763		/*
764		 * We've received the header: a message type and the lengths of
765		 * the two pieces of the message.  Set up buffers to read the
766		 * two pieces.  This set-up is a bit different for a
767		 * REPMGR_REP_MESSAGE, because we plan to pass it off to the msg
768		 * threads.
769		 */
770		__repmgr_iovec_init(&conn->iovecs);
771		control_size = ntohl(conn->control_size_buf);
772		rec_size = ntohl(conn->rec_size_buf);
773
774		if (conn->msg_type == REPMGR_REP_MESSAGE) {
775			if (control_size == 0) {
776				__db_errx(
777				    env, "illegal size for rep msg");
778				return (DB_REP_UNAVAIL);
779			}
780			/*
781			 * Allocate a block of memory large enough to hold a
782			 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
783			 * data areas that it points to.  Start by calculating
784			 * the total memory needed, rounding up for the start of
785			 * each DBT, to ensure possible alignment requirements.
786			 */
787			memsize = (size_t)
788			    DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
789			control_offset = memsize;
790			memsize += control_size;
791			if (rec_size > 0) {
792				memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN);
793				rec_offset = memsize;
794				memsize += rec_size;
795			} else
796				COMPQUIET(rec_offset, 0);
797			if ((ret = __os_malloc(env, memsize, &membase)) != 0)
798				return (ret);
799			conn->input.rep_message = membase;
800
801			conn->input.rep_message->originating_eid = conn->eid;
802			DB_INIT_DBT(conn->input.rep_message->control,
803			    (u_int8_t*)membase + control_offset, control_size);
804			__repmgr_add_dbt(&conn->iovecs,
805			    &conn->input.rep_message->control);
806
807			if (rec_size > 0) {
808				DB_INIT_DBT(conn->input.rep_message->rec,
809				    (rec_size > 0 ?
810					(u_int8_t*)membase + rec_offset : NULL),
811				    rec_size);
812				__repmgr_add_dbt(&conn->iovecs,
813				    &conn->input.rep_message->rec);
814			} else
815				DB_INIT_DBT(conn->input.rep_message->rec,
816				    NULL, 0);
817		} else {
818			conn->input.repmgr_msg.cntrl.size = control_size;
819			conn->input.repmgr_msg.rec.size = rec_size;
820
821			if (control_size > 0) {
822				dbt = &conn->input.repmgr_msg.cntrl;
823				if ((ret = __os_malloc(env, control_size,
824				    &dbt->data)) != 0)
825					return (ret);
826				__repmgr_add_dbt(&conn->iovecs, dbt);
827			}
828
829			if (rec_size > 0) {
830				dbt = &conn->input.repmgr_msg.rec;
831				if ((ret = __os_malloc(env, rec_size,
832				     &dbt->data)) != 0) {
833					if (control_size > 0)
834						__os_free(env,
835						    conn->input.repmgr_msg.
836						    cntrl.data);
837					return (ret);
838				}
839				__repmgr_add_dbt(&conn->iovecs, dbt);
840			}
841		}
842
843		conn->reading_phase = DATA_PHASE;
844
845		if (control_size > 0 || rec_size > 0)
846			break;
847
848		/*
849		 * However, if they're both 0, we're ready to complete
850		 * DATA_PHASE.
851		 */
852		/* FALLTHROUGH */
853
854	case DATA_PHASE:
855		return (dispatch_msgin(env, conn));
856
857	default:
858		DB_ASSERT(env, FALSE);
859	}
860
861	return (0);
862}
863
864/*
865 * Processes an incoming message, depending on our current state.
866 */
867static int
868dispatch_msgin(env, conn)
869	ENV *env;
870	REPMGR_CONNECTION *conn;
871{
872	DBT *dbt;
873	char *hostname;
874	int given, ret;
875
876	given = FALSE;
877
878	switch (conn->state) {
879	case CONN_CONNECTED:
880		/*
881		 * In this state, we know we're working with an outgoing
882		 * connection.  We've sent a version proposal, and now expect
883		 * the response (which could be a dumb old V1 handshake).
884		 */
885		ONLY_HANDSHAKE(env, conn);
886		if ((ret = read_version_response(env, conn)) != 0)
887			return (ret);
888		break;
889
890	case CONN_NEGOTIATE:
891		/*
892		 * Since we're in this state, we know we're working with an
893		 * incoming connection, and this is the first message we've
894		 * received.  So it must be a version negotiation proposal (or a
895		 * legacy V1 handshake).  (We'll verify this of course.)
896		 */
897		ONLY_HANDSHAKE(env, conn);
898		if ((ret = send_version_response(env, conn)) != 0)
899			return (ret);
900		break;
901
902	case CONN_PARAMETERS:
903		/*
904		 * We've previously agreed on a (>1) version, and are now simply
905		 * awaiting the other side's parameters handshake.
906		 */
907		ONLY_HANDSHAKE(env, conn);
908		dbt = &conn->input.repmgr_msg.rec;
909		hostname = dbt->data;
910		hostname[dbt->size-1] = '\0';
911		if ((ret = accept_handshake(env, conn, hostname)) != 0)
912			return (ret);
913		conn->state = CONN_READY;
914		break;
915
916	case CONN_READY:	/* FALLTHROUGH */
917	case CONN_CONGESTED:
918		/*
919		 * We have a complete message, so process it.  Acks and
920		 * handshakes get processed here, in line.  Regular rep messages
921		 * get posted to a queue, to be handled by a thread from the
922		 * message thread pool.
923		 */
924		switch (conn->msg_type) {
925		case REPMGR_ACK:
926			if ((ret = record_ack(env, conn)) != 0)
927				return (ret);
928			break;
929
930		case REPMGR_HEARTBEAT:
931			/*
932			 * The underlying byte-receiving mechanism will already
933			 * have noted the fact that we got some traffic on this
934			 * connection.  And that's all we really have to do, so
935			 * there's nothing more needed at this point.
936			 */
937			break;
938
939		case REPMGR_REP_MESSAGE:
940			if ((ret = __repmgr_queue_put(env,
941			    conn->input.rep_message)) != 0)
942				return (ret);
943			/*
944			 * The queue has taken over responsibility for the
945			 * rep_message buffer, and will free it later.
946			 */
947			given = TRUE;
948			break;
949
950		default:
951			__db_errx(env,
952			    "unexpected msg type rcvd in ready state: %d",
953			    (int)conn->msg_type);
954			return (DB_REP_UNAVAIL);
955		}
956		break;
957
958	case CONN_DEFUNCT:
959		break;
960
961	default:
962		DB_ASSERT(env, FALSE);
963	}
964
965	if (!given) {
966		dbt = &conn->input.repmgr_msg.cntrl;
967		if (dbt->size > 0)
968			__os_free(env, dbt->data);
969		dbt = &conn->input.repmgr_msg.rec;
970		if (dbt->size > 0)
971			__os_free(env, dbt->data);
972	}
973	__repmgr_reset_for_reading(conn);
974	return (0);
975}
976
977/*
978 * Examine and verify the incoming version proposal message, and send an
979 * appropriate response.
980 */
981static int
982send_version_response(env, conn)
983	ENV *env;
984	REPMGR_CONNECTION *conn;
985{
986	DB_REP *db_rep;
987	__repmgr_version_proposal_args versions;
988	__repmgr_version_confirmation_args conf;
989	repmgr_netaddr_t *my_addr;
990	char *hostname;
991	u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
992	DBT vi;
993	int ret;
994
995	db_rep = env->rep_handle;
996	my_addr = &db_rep->my_addr;
997
998	if ((ret = find_version_info(env, conn, &vi)) != 0)
999		return (ret);
1000	if (vi.size == 0) {
1001		/* No version info, so we must be talking to a v1 site. */
1002		hostname = conn->input.repmgr_msg.rec.data;
1003		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1004			return (ret);
1005		if ((ret = send_v1_handshake(env, conn, my_addr->host,
1006		     strlen(my_addr->host) + 1)) != 0)
1007			return (ret);
1008		conn->state = CONN_READY;
1009	} else {
1010		if ((ret = __repmgr_version_proposal_unmarshal(env,
1011		    &versions, vi.data, vi.size, NULL)) != 0)
1012			return (DB_REP_UNAVAIL);
1013
1014		if (DB_REPMGR_VERSION >= versions.min &&
1015		    DB_REPMGR_VERSION <= versions.max)
1016			conf.version = DB_REPMGR_VERSION;
1017		else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1018		    versions.max <= DB_REPMGR_VERSION)
1019			conf.version = versions.max;
1020		else {
1021			/*
1022			 * User must have wired up a combination of versions
1023			 * exceeding what we said we'd support.
1024			 */
1025			__db_errx(env,
1026			    "No available version between %lu and %lu",
1027			    (u_long)versions.min, (u_long)versions.max);
1028			return (DB_REP_UNAVAIL);
1029		}
1030		conn->version = conf.version;
1031
1032		__repmgr_version_confirmation_marshal(env, &conf, buf);
1033		if ((ret = send_handshake(env, conn, buf, sizeof(buf))) != 0)
1034			return (ret);
1035
1036		conn->state = CONN_PARAMETERS;
1037	}
1038	return (ret);
1039}
1040
1041/*
1042 * Sends a version-aware handshake to the remote site, only after we've verified
1043 * that it is indeed version-aware.  We can send either v2 or v3 handshake,
1044 * depending on the connection's version.
1045 */
1046static int
1047send_handshake(env, conn, opt, optlen)
1048	ENV *env;
1049	REPMGR_CONNECTION *conn;
1050	void *opt;
1051	size_t optlen;
1052{
1053	DB_REP *db_rep;
1054	REP *rep;
1055	DBT cntrl, rec;
1056	__repmgr_handshake_args hs;
1057	__repmgr_v2handshake_args v2hs;
1058	repmgr_netaddr_t *my_addr;
1059	size_t hostname_len, rec_len;
1060	void *buf;
1061	u_int8_t *p;
1062	u_int32_t cntrl_len;
1063	int ret;
1064
1065	db_rep = env->rep_handle;
1066	rep = db_rep->region;
1067	my_addr = &db_rep->my_addr;
1068
1069	/*
1070	 * The cntrl part has port and priority.  The rec part has the host
1071	 * name, followed by whatever optional extra data was passed to us.
1072	 *
1073	 * Version awareness was introduced with protocol version 2.
1074	 */
1075	DB_ASSERT(env, conn->version >= 2);
1076	cntrl_len = conn->version == 2 ?
1077	    __REPMGR_V2HANDSHAKE_SIZE : __REPMGR_HANDSHAKE_SIZE;
1078	hostname_len = strlen(my_addr->host);
1079	rec_len = hostname_len + 1 +
1080	    (opt == NULL ? 0 : optlen);
1081
1082	if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1083		return (ret);
1084
1085	cntrl.data = p = buf;
1086	if (conn->version == 2) {
1087		/* Not allowed to use multi-process feature in v2 group. */
1088		DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1089		v2hs.port = my_addr->port;
1090		v2hs.priority = rep->priority;
1091		__repmgr_v2handshake_marshal(env, &v2hs, p);
1092	} else {
1093		hs.port = my_addr->port;
1094		hs.priority = rep->priority;
1095		hs.flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
1096		__repmgr_handshake_marshal(env, &hs, p);
1097	}
1098	cntrl.size = cntrl_len;
1099
1100	p = rec.data = &p[cntrl_len];
1101	(void)strcpy((char*)p, my_addr->host);
1102	p += hostname_len + 1;
1103	if (opt != NULL) {
1104		memcpy(p, opt, optlen);
1105		p += optlen;
1106	}
1107	rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1108
1109	/* Never block on select thread: pass blockable as FALSE. */
1110	ret = __repmgr_send_one(env,
1111	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE);
1112	__os_free(env, buf);
1113	return (ret);
1114}
1115
1116static int
1117read_version_response(env, conn)
1118	ENV *env;
1119	REPMGR_CONNECTION *conn;
1120{
1121	__repmgr_version_confirmation_args conf;
1122	DBT vi;
1123	char *hostname;
1124	int ret;
1125
1126	if ((ret = find_version_info(env, conn, &vi)) != 0)
1127		return (ret);
1128	hostname = conn->input.repmgr_msg.rec.data;
1129	if (vi.size == 0) {
1130		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1131			return (ret);
1132	} else {
1133		if ((ret = __repmgr_version_confirmation_unmarshal(env,
1134		    &conf, vi.data, vi.size, NULL)) != 0)
1135			return (DB_REP_UNAVAIL);
1136		if (conf.version >= DB_REPMGR_MIN_VERSION &&
1137		    conf.version <= DB_REPMGR_VERSION)
1138			conn->version = conf.version;
1139		else {
1140			/*
1141			 * Remote site "confirmed" a version outside of the
1142			 * range we proposed.  It should never do that.
1143			 */
1144			__db_errx(env,
1145			    "Can't support confirmed version %lu",
1146			    (u_long)conf.version);
1147			return (DB_REP_UNAVAIL);
1148		}
1149
1150		if ((ret = accept_handshake(env, conn, hostname)) != 0)
1151			return (ret);
1152		if ((ret = send_handshake(env, conn, NULL, 0)) != 0)
1153			return (ret);
1154	}
1155	conn->state = CONN_READY;
1156	return (ret);
1157}
1158
1159/*
1160 * Examine the rec part of a handshake message to see if it has any version
1161 * information in it.  This is the magic that lets us allows version-aware sites
1162 * to exchange information, and yet avoids tripping up v1 sites, which don't
1163 * know how to look for it.
1164 */
1165static int
1166find_version_info(env, conn, vi)
1167	ENV *env;
1168	REPMGR_CONNECTION *conn;
1169	DBT *vi;
1170{
1171	DBT *dbt;
1172	char *hostname;
1173	u_int32_t hostname_len;
1174
1175	dbt = &conn->input.repmgr_msg.rec;
1176	if (dbt->size == 0) {
1177		__db_errx(env, "handshake is missing rec part");
1178		return (DB_REP_UNAVAIL);
1179	}
1180	hostname = dbt->data;
1181	hostname[dbt->size-1] = '\0';
1182	hostname_len = (u_int32_t)strlen(hostname);
1183	if (hostname_len + 1 == dbt->size) {
1184		/*
1185		 * The rec DBT held only the host name.  This is a simple legacy
1186		 * V1 handshake; it contains no version information.
1187		 */
1188		vi->size = 0;
1189	} else {
1190		/*
1191		 * There's more data than just the host name.  The remainder is
1192		 * available to be treated as a normal byte buffer (and read in
1193		 * by one of the unmarshal functions).  Note that the remaining
1194		 * length should not include the padding byte that we have
1195		 * already clobbered.
1196		 */
1197		vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1198		vi->size = (dbt->size - (hostname_len+1)) - 1;
1199	}
1200	return (0);
1201}
1202
1203static int
1204accept_handshake(env, conn, hostname)
1205	ENV *env;
1206	REPMGR_CONNECTION *conn;
1207	char *hostname;
1208{
1209	__repmgr_handshake_args hs;
1210	__repmgr_v2handshake_args hs2;
1211	u_int port;
1212	u_int32_t pri, flags;
1213
1214	/*
1215	 * Current version is 3, and only other version that supports version
1216	 * negotiation is 2.
1217	 */
1218	DB_ASSERT(env, conn->version == 2 || conn->version == 3);
1219
1220	/* Extract port and priority from cntrl. */
1221	if (conn->version == 2) {
1222		if (__repmgr_v2handshake_unmarshal(env, &hs2,
1223		    conn->input.repmgr_msg.cntrl.data,
1224		    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1225			return (DB_REP_UNAVAIL);
1226		port = hs2.port;
1227		pri = hs2.priority;
1228		flags = 0;
1229	} else {
1230		if (__repmgr_handshake_unmarshal(env, &hs,
1231		   conn->input.repmgr_msg.cntrl.data,
1232		   conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1233			return (DB_REP_UNAVAIL);
1234		port = hs.port;
1235		pri = hs.priority;
1236		flags = hs.flags;
1237	}
1238
1239	return (process_parameters(env,
1240		    conn, hostname, port, pri, flags));
1241}
1242
1243static int
1244accept_v1_handshake(env, conn, hostname)
1245	ENV *env;
1246	REPMGR_CONNECTION *conn;
1247	char *hostname;
1248{
1249	DB_REPMGR_V1_HANDSHAKE *handshake;
1250	u_int32_t prio;
1251
1252	handshake = conn->input.repmgr_msg.cntrl.data;
1253	if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1254	    handshake->version != 1) {
1255		__db_errx(env, "malformed V1 handshake");
1256		return (DB_REP_UNAVAIL);
1257	}
1258
1259	conn->version = 1;
1260	prio = ntohl(handshake->priority);
1261	return (process_parameters(env,
1262		    conn, hostname, handshake->port, prio, 0));
1263}
1264
1265static int
1266process_parameters(env, conn, host, port, priority, flags)
1267	ENV *env;
1268	REPMGR_CONNECTION *conn;
1269	char *host;
1270	u_int port;
1271	u_int32_t priority, flags;
1272{
1273	DB_REP *db_rep;
1274	REPMGR_RETRY *retry;
1275	REPMGR_SITE *site;
1276	int eid, ret, sockopt;
1277
1278	db_rep = env->rep_handle;
1279
1280	if (F_ISSET(conn, CONN_INCOMING)) {
1281		/*
1282		 * Incoming connection: we don't yet know what site it belongs
1283		 * to, so it must be on the "orphans" list.
1284		 */
1285		DB_ASSERT(env, !IS_VALID_EID(conn->eid));
1286		TAILQ_REMOVE(&db_rep->connections, conn, entries);
1287
1288		/*
1289		 * Now that we've been given the host and port, use them to find
1290		 * the site (or create a new one if necessary, etc.).
1291		 */
1292		if ((site = __repmgr_find_site(env, host, port)) != NULL) {
1293			eid = EID_FROM_SITE(site);
1294			if (LF_ISSET(REPMGR_SUBORDINATE)) {
1295				/*
1296				 * Accept it, as a supplementary source of
1297				 * input, but nothing else.
1298				 */
1299				TAILQ_INSERT_TAIL(&site->sub_conns,
1300				    conn, entries);
1301				conn->eid = eid;
1302
1303#ifdef SO_KEEPALIVE
1304				sockopt = 1;
1305				if (setsockopt(conn->fd, SOL_SOCKET,
1306				    SO_KEEPALIVE, (sockopt_t)&sockopt,
1307				     sizeof(sockopt)) != 0) {
1308					ret = net_errno;
1309					__db_err(env, ret,
1310					   "can't set KEEPALIVE socket option");
1311					return (ret);
1312				}
1313#endif
1314			} else {
1315				if (site->state == SITE_IDLE) {
1316					RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1317					"handshake from idle site %s:%u EID %u",
1318					    host, port, eid));
1319					retry = site->ref.retry;
1320					TAILQ_REMOVE(&db_rep->retries,
1321					    retry, entries);
1322					__os_free(env, retry);
1323				} else {
1324					/*
1325					 * We got an incoming connection for a
1326					 * site we were already connected to; at
1327					 * least we thought we were.
1328					 */
1329					RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1330			     "connection from %s:%u EID %u supersedes existing",
1331					    host, port, eid));
1332
1333					/*
1334					 * No need to schedule a retry for
1335					 * later, since we now have a
1336					 * replacement connection.
1337					 */
1338					__repmgr_disable_connection(env,
1339					     site->ref.conn);
1340				}
1341				conn->eid = eid;
1342				site->state = SITE_CONNECTED;
1343				site->ref.conn = conn;
1344				__os_gettime(env,
1345				    &site->last_rcvd_timestamp, 1);
1346			}
1347		} else {
1348			if ((ret = introduce_site(env,
1349			    host, port, &site, flags)) == 0)
1350				RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1351			"handshake introduces unknown site %s:%u", host, port));
1352			else if (ret != EEXIST)
1353				return (ret);
1354			eid = EID_FROM_SITE(site);
1355
1356			if (LF_ISSET(REPMGR_SUBORDINATE)) {
1357				TAILQ_INSERT_TAIL(&site->sub_conns,
1358				    conn, entries);
1359#ifdef SO_KEEPALIVE
1360				sockopt = 1;
1361				if ((ret = setsockopt(conn->fd, SOL_SOCKET,
1362				    SO_KEEPALIVE, (sockopt_t)&sockopt,
1363				     sizeof(sockopt))) != 0) {
1364					__db_err(env, ret,
1365					   "can't set KEEPALIVE socket option");
1366					return (ret);
1367				}
1368#endif
1369			} else {
1370				site->state = SITE_CONNECTED;
1371				site->ref.conn = conn;
1372				__os_gettime(env,
1373				    &site->last_rcvd_timestamp, 1);
1374			}
1375			conn->eid = eid;
1376		}
1377	} else {
1378		/*
1379		 * Since we initiated this as an outgoing connection, we
1380		 * obviously already know the host, port and site.  We just need
1381		 * the other site's priority.
1382		 */
1383		DB_ASSERT(env, IS_VALID_EID(conn->eid));
1384		site = SITE_FROM_EID(conn->eid);
1385		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1386		    "handshake from connection to %s:%lu EID %u",
1387		    site->net_addr.host,
1388		    (u_long)site->net_addr.port, conn->eid));
1389	}
1390
1391	site->priority = priority;
1392	F_SET(site, SITE_HAS_PRIO);
1393
1394	/*
1395	 * If we're moping around wishing we knew who the master was, then
1396	 * getting in touch with another site might finally provide sufficient
1397	 * connectivity to find out.  But just do this once, because otherwise
1398	 * we get messages while the subsequent rep_start operations are going
1399	 * on, and rep tosses them in that case.
1400	 */
1401	if (!IS_SUBORDINATE(db_rep) && /* us */
1402	    db_rep->master_eid == DB_EID_INVALID &&
1403	    db_rep->init_policy != DB_REP_MASTER &&
1404	    !db_rep->done_one &&
1405	    !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
1406		db_rep->done_one = TRUE;
1407		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1408		    "handshake with no known master to wake election thread"));
1409		if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0)
1410			return (ret);
1411	}
1412
1413	return (0);
1414}
1415
1416static int
1417introduce_site(env, host, port, sitep, flags)
1418	ENV *env;
1419	char *host;
1420	u_int port;
1421	REPMGR_SITE **sitep;
1422	u_int32_t flags;
1423{
1424	int peer, state;
1425
1426	/*
1427	 * SITE_CONNECTED means we have the main connection to the site.  But
1428	 * we're here when we first learn of a site by getting a subordinate
1429	 * connection, so this doesn't suffice to put us in "connected" state.
1430	 */
1431	state = LF_ISSET(REPMGR_SUBORDINATE) ? SITE_IDLE : SITE_CONNECTED;
1432	peer = FALSE;
1433
1434	return (__repmgr_add_site_int(env, host, port, sitep, peer, state));
1435}
1436
1437static int
1438record_ack(env, conn)
1439	ENV *env;
1440	REPMGR_CONNECTION *conn;
1441{
1442	DB_REP *db_rep;
1443	REPMGR_SITE *site;
1444	__repmgr_ack_args *ackp, ack;
1445	SITE_STRING_BUFFER location;
1446	u_int32_t gen;
1447	int ret;
1448
1449	db_rep = env->rep_handle;
1450
1451	DB_ASSERT(env, conn->version > 0 &&
1452	    IS_READY_STATE(conn->state) && IS_VALID_EID(conn->eid));
1453	site = SITE_FROM_EID(conn->eid);
1454
1455	/*
1456	 * Extract the LSN.  Save it only if it is an improvement over what the
1457	 * site has already ack'ed.
1458	 */
1459	if (conn->version == 1) {
1460		ackp = conn->input.repmgr_msg.cntrl.data;
1461		if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1462		    conn->input.repmgr_msg.rec.size != 0) {
1463			__db_errx(env, "bad ack msg size");
1464			return (DB_REP_UNAVAIL);
1465		}
1466	} else {
1467		ackp = &ack;
1468		if ((ret = __repmgr_ack_unmarshal(env, ackp,
1469			 conn->input.repmgr_msg.cntrl.data,
1470			 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1471			return (DB_REP_UNAVAIL);
1472	}
1473
1474	/* Ignore stale acks. */
1475	gen = db_rep->region->gen;
1476	if (ackp->generation < gen) {
1477		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1478		    "ignoring stale ack (%lu<%lu), from %s",
1479		     (u_long)ackp->generation, (u_long)gen,
1480		     __repmgr_format_site_loc(site, location)));
1481		return (0);
1482	}
1483	RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1484	    "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1485	    (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1486	    __repmgr_format_site_loc(site, location)));
1487
1488	if (ackp->generation == gen &&
1489	    LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) {
1490		memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1491		if ((ret = __repmgr_wake_waiting_senders(env)) != 0)
1492			return (ret);
1493	}
1494	return (0);
1495}
1496
1497/*
1498 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
1499 */
1500int
1501__repmgr_write_some(env, conn)
1502	ENV *env;
1503	REPMGR_CONNECTION *conn;
1504{
1505	QUEUED_OUTPUT *output;
1506	REPMGR_FLAT *msg;
1507	int bytes, ret;
1508
1509	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
1510		output = STAILQ_FIRST(&conn->outbound_queue);
1511		msg = output->msg;
1512		if ((bytes = send(conn->fd, &msg->data[output->offset],
1513		    (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) {
1514			if ((ret = net_errno) == WOULDBLOCK)
1515				return (0);
1516			else {
1517				__db_err(env, ret, "writing data");
1518				STAT(env->rep_handle->
1519				    region->mstat.st_connection_drop++);
1520				return (DB_REP_UNAVAIL);
1521			}
1522		}
1523
1524		if ((output->offset += (size_t)bytes) >= msg->length) {
1525			STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
1526			__os_free(env, output);
1527			conn->out_queue_length--;
1528			if (--msg->ref_count <= 0)
1529				__os_free(env, msg);
1530
1531			/*
1532			 * We've achieved enough movement to free up at least
1533			 * one space in the outgoing queue.  Wake any message
1534			 * threads that may be waiting for space.  Leave
1535			 * CONGESTED state so that when the queue reaches the
1536			 * high-water mark again, the filling thread will be
1537			 * allowed to try waiting again.
1538			 */
1539			conn->state = CONN_READY;
1540			if (conn->blockers > 0 &&
1541			    (ret = __repmgr_signal(&conn->drained)) != 0)
1542				return (ret);
1543		}
1544	}
1545
1546#ifdef DB_WIN32
1547	/*
1548	 * With the queue now empty, it's time to relinquish ownership of this
1549	 * connection again, so that the next call to send() can write the
1550	 * message in line, instead of posting it to the queue for us.
1551	 */
1552	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE)
1553	    == SOCKET_ERROR) {
1554		ret = net_errno;
1555		__db_err(env, ret, "can't remove FD_WRITE event bit");
1556		return (ret);
1557	}
1558#endif
1559
1560	return (0);
1561}
1562