1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2006,2008 Oracle.  All rights reserved.
5 *
6 * $Id: repmgr_sel.c,v 1.51 2008/04/30 02:33:34 alexg Exp $
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14typedef int (*HEARTBEAT_ACTION) __P((ENV *));
15
16static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
17static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
18static int __repmgr_call_election __P((ENV *));
19static int __repmgr_connect __P((ENV*, socket_t *, REPMGR_SITE *));
20static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
21static int find_version_info __P((ENV *, REPMGR_CONNECTION *, DBT *));
22static int __repmgr_next_timeout __P((ENV *,
23    db_timespec *, HEARTBEAT_ACTION *));
24static int dispatch_phase_completion __P((ENV *, REPMGR_CONNECTION *));
25static REPMGR_CONNECTION *__repmgr_master_connection __P((ENV *));
26static int process_parameters __P((ENV *,
27    REPMGR_CONNECTION *, char *, u_int, u_int32_t));
28static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
29static int record_ack __P((ENV *, REPMGR_CONNECTION *));
30static int __repmgr_retry_connections __P((ENV *));
31static int send_handshake __P((ENV *, REPMGR_CONNECTION *, void *, size_t));
32static int __repmgr_send_heartbeat __P((ENV *));
33static int send_v1_handshake __P((ENV *,
34    REPMGR_CONNECTION *, void *, size_t));
35static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
36static int __repmgr_try_one __P((ENV *, u_int));
37
/*
 * Guard used while a connection is still being set up: unless the message
 * just received is a REPMGR_HANDSHAKE, log the offending type and the
 * connection state, and make the *enclosing function* return
 * DB_REP_UNAVAIL.
 *
 * The parameters are parenthesized so that any pointer-valued expression may
 * be passed for "conn", not just a plain identifier.
 */
#define	ONLY_HANDSHAKE(env, conn) do {				     \
	if ((conn)->msg_type != REPMGR_HANDSHAKE) {		     \
		__db_errx((env), "unexpected msg type %d in state %d", \
		    (int)(conn)->msg_type, (conn)->state);	     \
		return (DB_REP_UNAVAIL);			     \
	}							     \
} while (0)
45
46/*
47 * PUBLIC: void *__repmgr_select_thread __P((void *));
48 */
49void *
50__repmgr_select_thread(args)
51	void *args;
52{
53	ENV *env = args;
54	int ret;
55
56	if ((ret = __repmgr_select_loop(env)) != 0) {
57		__db_err(env, ret, "select loop failed");
58		__repmgr_thread_failure(env, ret);
59	}
60	return (NULL);
61}
62
63/*
64 * PUBLIC: int __repmgr_accept __P((ENV *));
65 */
66int
67__repmgr_accept(env)
68	ENV *env;
69{
70	DB_REP *db_rep;
71	REPMGR_CONNECTION *conn;
72	struct sockaddr_in siaddr;
73	socklen_t addrlen;
74	socket_t s;
75	int ret;
76#ifdef DB_WIN32
77	WSAEVENT event_obj;
78#endif
79
80	db_rep = env->rep_handle;
81	addrlen = sizeof(siaddr);
82	if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
83	    &addrlen)) == -1) {
84		/*
85		 * Some errors are innocuous and so should be ignored.  MSDN
86		 * Library documents the Windows ones; the Unix ones are
87		 * advocated in Stevens' UNPv1, section 16.6; and Linux
88		 * Application Development, p. 416.
89		 */
90		switch (ret = net_errno) {
91#ifdef DB_WIN32
92		case WSAECONNRESET:
93		case WSAEWOULDBLOCK:
94#else
95		case EINTR:
96		case EWOULDBLOCK:
97		case ECONNABORTED:
98		case ENETDOWN:
99#ifdef EPROTO
100		case EPROTO:
101#endif
102		case ENOPROTOOPT:
103		case EHOSTDOWN:
104#ifdef ENONET
105		case ENONET:
106#endif
107		case EHOSTUNREACH:
108		case EOPNOTSUPP:
109		case ENETUNREACH:
110#endif
111			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
112			    "accept error %d considered innocuous", ret));
113			return (0);
114		default:
115			__db_err(env, ret, "accept error");
116			return (ret);
117		}
118	}
119	RPRINT(env, DB_VERB_REPMGR_MISC, (env, "accepted a new connection"));
120
121	if ((ret = __repmgr_set_nonblocking(s)) != 0) {
122		__db_err(env, ret, "can't set nonblock after accept");
123		(void)closesocket(s);
124		return (ret);
125	}
126
127#ifdef DB_WIN32
128	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
129		ret = net_errno;
130		__db_err(env, ret, "can't create WSA event");
131		(void)closesocket(s);
132		return (ret);
133	}
134	if (WSAEventSelect(s, event_obj, FD_READ|FD_CLOSE) == SOCKET_ERROR) {
135		ret = net_errno;
136		__db_err(env, ret, "can't set desired event bits");
137		(void)WSACloseEvent(event_obj);
138		(void)closesocket(s);
139		return (ret);
140	}
141#endif
142	if ((ret =
143	    __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
144#ifdef DB_WIN32
145		(void)WSACloseEvent(event_obj);
146#endif
147		(void)closesocket(s);
148		return (ret);
149	}
150	F_SET(conn, CONN_INCOMING);
151	conn->eid = -1;
152#ifdef DB_WIN32
153	conn->event_object = event_obj;
154#endif
155	return (0);
156}
157
158/*
159 * Computes how long we should wait for input, in other words how long until we
160 * have to wake up and do something.  Returns TRUE if timeout is set; FALSE if
161 * there is nothing to wait for.
162 *
163 * Note that the resulting timeout could be zero; but it can't be negative.
164 *
165 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
166 */
167int
168__repmgr_compute_timeout(env, timeout)
169	ENV *env;
170	db_timespec *timeout;
171{
172	DB_REP *db_rep;
173	REPMGR_RETRY *retry;
174	db_timespec now, t;
175	int have_timeout;
176
177	db_rep = env->rep_handle;
178
179	/*
180	 * There are two factors to consider: are heartbeats in use?  and, do we
181	 * have any sites with broken connections that we ought to retry?
182	 */
183	have_timeout = __repmgr_next_timeout(env, &t, NULL);
184
185	/* List items are in order, so we only have to examine the first one. */
186	if (!TAILQ_EMPTY(&db_rep->retries)) {
187		retry = TAILQ_FIRST(&db_rep->retries);
188		if (have_timeout) {
189			/* Choose earliest timeout deadline. */
190			t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
191		} else {
192			t = retry->time;
193			have_timeout = TRUE;
194		}
195	}
196
197	if (have_timeout) {
198		__os_gettime(env, &now, 1);
199		if (timespeccmp(&now, &t, >=))
200			timespecclear(timeout);
201		else {
202			*timeout = t;
203			timespecsub(timeout, &now);
204		}
205	}
206
207	return (have_timeout);
208}
209
210/*
211 * Figures out the next heartbeat-related thing to be done, and when it should
212 * be done.  The code is factored this way because this computation needs to be
213 * done both before each select() call, and after (when we're checking for timer
214 * expiration).
215 */
216static int
217__repmgr_next_timeout(env, deadline, action)
218	ENV *env;
219	db_timespec *deadline;
220	HEARTBEAT_ACTION *action;
221{
222	DB_REP *db_rep;
223	HEARTBEAT_ACTION my_action;
224	REPMGR_CONNECTION *conn;
225	REPMGR_SITE *site;
226	db_timespec t;
227
228	db_rep = env->rep_handle;
229
230	if (db_rep->master_eid == SELF_EID && db_rep->heartbeat_frequency > 0) {
231		t = db_rep->last_bcast;
232		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_frequency);
233		my_action = __repmgr_send_heartbeat;
234	} else if ((conn = __repmgr_master_connection(env)) != NULL &&
235	    db_rep->heartbeat_monitor_timeout > 0 &&
236	    conn->version >= HEARTBEAT_MIN_VERSION) {
237		/*
238		 * If we have a working connection to a heartbeat-aware master,
239		 * let's monitor it.  Otherwise there's really nothing we can
240		 * do.
241		 */
242		site = SITE_FROM_EID(db_rep->master_eid);
243		t = site->last_rcvd_timestamp;
244		TIMESPEC_ADD_DB_TIMEOUT(&t, db_rep->heartbeat_monitor_timeout);
245		my_action = __repmgr_call_election;
246	} else
247		return (FALSE);
248
249	*deadline = t;
250	if (action != NULL)
251		*action = my_action;
252	return (TRUE);
253}
254
255static int
256__repmgr_send_heartbeat(env)
257	ENV *env;
258{
259	DBT control, rec;
260	u_int unused1, unused2;
261
262	DB_INIT_DBT(control, NULL, 0);
263	DB_INIT_DBT(rec, NULL, 0);
264	return (__repmgr_send_broadcast(env,
265	    REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2));
266}
267
268static REPMGR_CONNECTION *
269__repmgr_master_connection(env)
270	ENV *env;
271{
272	DB_REP *db_rep;
273	REPMGR_CONNECTION *conn;
274	REPMGR_SITE *master;
275
276	db_rep = env->rep_handle;
277
278	if (db_rep->master_eid == SELF_EID ||
279	    !IS_VALID_EID(db_rep->master_eid))
280		return (NULL);
281	master = SITE_FROM_EID(db_rep->master_eid);
282	if (master->state != SITE_CONNECTED)
283		return (NULL);
284	conn = master->ref.conn;
285	if (IS_READY_STATE(conn->state))
286		return (conn);
287	return (NULL);
288}
289
290static int
291__repmgr_call_election(env)
292	ENV *env;
293{
294	REPMGR_CONNECTION *conn;
295
296	conn = __repmgr_master_connection(env);
297	DB_ASSERT(env, conn != NULL);
298	RPRINT(env, DB_VERB_REPMGR_MISC,
299	    (env, "heartbeat monitor timeout expired"));
300	return (__repmgr_bust_connection(env, conn));
301}
302
303/*
304 * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
305 *
306 * !!!
307 * Assumes caller holds the mutex.
308 */
309int
310__repmgr_check_timeouts(env)
311	ENV *env;
312{
313	db_timespec when, now;
314	HEARTBEAT_ACTION action;
315	int ret;
316
317	/*
318	 * Figure out the next heartbeat-related thing to be done.  Then, if
319	 * it's time to do it, do so.
320	 */
321	if (__repmgr_next_timeout(env, &when, &action)) {
322		__os_gettime(env, &now, 1);
323		if (timespeccmp(&when, &now, <=) &&
324		    (ret = (*action)(env)) != 0)
325			return (ret);
326	}
327
328	return (__repmgr_retry_connections(env));
329}
330
331/*
332 * Initiates connection attempts for any sites on the idle list whose retry
333 * times have expired.
334 */
335static int
336__repmgr_retry_connections(env)
337	ENV *env;
338{
339	DB_REP *db_rep;
340	REPMGR_RETRY *retry;
341	db_timespec now;
342	u_int eid;
343	int ret;
344
345	db_rep = env->rep_handle;
346	__os_gettime(env, &now, 1);
347
348	while (!TAILQ_EMPTY(&db_rep->retries)) {
349		retry = TAILQ_FIRST(&db_rep->retries);
350		if (timespeccmp(&retry->time, &now, >=))
351			break;	/* since items are in time order */
352
353		TAILQ_REMOVE(&db_rep->retries, retry, entries);
354
355		eid = retry->eid;
356		__os_free(env, retry);
357
358		if ((ret = __repmgr_try_one(env, eid)) != 0)
359			return (ret);
360	}
361	return (0);
362}
363
364/*
365 * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
366 *
367 * !!!
368 * Assumes caller holds the mutex.
369 */
370int
371__repmgr_first_try_connections(env)
372	ENV *env;
373{
374	DB_REP *db_rep;
375	u_int eid;
376	int ret;
377
378	db_rep = env->rep_handle;
379	for (eid=0; eid<db_rep->site_cnt; eid++)
380		if ((ret = __repmgr_try_one(env, eid)) != 0)
381			return (ret);
382	return (0);
383}
384
385/*
386 * Makes a best-effort attempt to connect to the indicated site.  Returns a
387 * non-zero error indication only for disastrous failures.  For re-tryable
388 * errors, we will have scheduled another attempt, and that can be considered
389 * success enough.
390 */
391static int
392__repmgr_try_one(env, eid)
393	ENV *env;
394	u_int eid;
395{
396	ADDRINFO *list;
397	DB_REP *db_rep;
398	repmgr_netaddr_t *addr;
399	int ret;
400
401	db_rep = env->rep_handle;
402
403	/*
404	 * If have never yet successfully resolved this site's host name, try to
405	 * do so now.
406	 *
407	 * Throughout all the rest of repmgr, we almost never do any sort of
408	 * blocking operation in the select thread.  This is the sole exception
409	 * to that rule.  Fortunately, it should rarely happen:
410	 *
411	 * - for a site that we only learned about because it connected to us:
412	 *   not only were we not configured to know about it, but we also never
413	 *   got a NEWSITE message about it.  And even then only if the
414	 *   connection fails and we want to retry it from this end;
415	 *
416	 * - if the name look-up system (e.g., DNS) is not working (let's hope
417	 *   it's temporary), or the host name is not found.
418	 */
419	addr = &SITE_FROM_EID(eid)->net_addr;
420	if (ADDR_LIST_FIRST(addr) == NULL) {
421		if ((ret = __repmgr_getaddr(
422		    env, addr->host, addr->port, 0, &list)) == 0) {
423			addr->address_list = list;
424			(void)ADDR_LIST_FIRST(addr);
425		} else if (ret == DB_REP_UNAVAIL)
426			return (__repmgr_schedule_connection_attempt(
427			    env, eid, FALSE));
428		else
429			return (ret);
430	}
431
432	/* Here, when we have a valid address. */
433	return (__repmgr_connect_site(env, eid));
434}
435
436/*
437 * Tries to establish a connection with the site indicated by the given eid,
438 * starting with the "current" element of its address list and trying as many
439 * addresses as necessary until the list is exhausted.
440 *
441 * PUBLIC: int __repmgr_connect_site __P((ENV *, u_int eid));
442 */
443int
444__repmgr_connect_site(env, eid)
445	ENV *env;
446	u_int eid;
447{
448	DB_REP *db_rep;
449	REPMGR_CONNECTION *con;
450	REPMGR_SITE *site;
451	socket_t s;
452	int state;
453	int ret;
454#ifdef DB_WIN32
455	long desired_event;
456	WSAEVENT event_obj;
457#endif
458
459	db_rep = env->rep_handle;
460	site = SITE_FROM_EID(eid);
461
462	switch (ret = __repmgr_connect(env, &s, site)) {
463	case 0:
464		state = CONN_CONNECTED;
465#ifdef DB_WIN32
466		desired_event = FD_READ|FD_CLOSE;
467#endif
468		break;
469	case INPROGRESS:
470		state = CONN_CONNECTING;
471#ifdef DB_WIN32
472		desired_event = FD_CONNECT;
473#endif
474		break;
475	default:
476		STAT(db_rep->region->mstat.st_connect_fail++);
477		return (
478		    __repmgr_schedule_connection_attempt(env, eid, FALSE));
479	}
480
481#ifdef DB_WIN32
482	if ((event_obj = WSACreateEvent()) == WSA_INVALID_EVENT) {
483		ret = net_errno;
484		__db_err(env, ret, "can't create WSA event");
485		(void)closesocket(s);
486		return (ret);
487	}
488	if (WSAEventSelect(s, event_obj, desired_event) == SOCKET_ERROR) {
489		ret = net_errno;
490		__db_err(env, ret, "can't set desired event bits");
491		(void)WSACloseEvent(event_obj);
492		(void)closesocket(s);
493		return (ret);
494	}
495#endif
496
497	if ((ret = __repmgr_new_connection(env, &con, s, state))
498	    != 0) {
499#ifdef DB_WIN32
500		(void)WSACloseEvent(event_obj);
501#endif
502		(void)closesocket(s);
503		return (ret);
504	}
505#ifdef DB_WIN32
506	con->event_object = event_obj;
507#endif
508
509	con->eid = (int)eid;
510	site->ref.conn = con;
511	site->state = SITE_CONNECTED;
512
513	if (state == CONN_CONNECTED) {
514		switch (ret = __repmgr_propose_version(env, con)) {
515		case 0:
516			break;
517		case DB_REP_UNAVAIL:
518			return (__repmgr_bust_connection(env, con));
519		default:
520			return (ret);
521		}
522	}
523
524	return (0);
525}
526
527static int
528__repmgr_connect(env, socket_result, site)
529	ENV *env;
530	socket_t *socket_result;
531	REPMGR_SITE *site;
532{
533	repmgr_netaddr_t *addr;
534	ADDRINFO *ai;
535	socket_t s;
536	char *why;
537	int ret;
538	SITE_STRING_BUFFER buffer;
539
540	/*
541	 * Lint doesn't know about DB_ASSERT, so it can't tell that this
542	 * loop will always get executed at least once, giving 'why' a value.
543	 */
544	COMPQUIET(why, "");
545	addr = &site->net_addr;
546	ai = ADDR_LIST_CURRENT(addr);
547	DB_ASSERT(env, ai != NULL);
548	for (; ai != NULL; ai = ADDR_LIST_NEXT(addr)) {
549
550		if ((s = socket(ai->ai_family,
551		    ai->ai_socktype, ai->ai_protocol)) == SOCKET_ERROR) {
552			why = "can't create socket to connect";
553			continue;
554		}
555
556		if ((ret = __repmgr_set_nonblocking(s)) != 0) {
557			__db_err(env,
558			    ret, "can't make nonblock socket to connect");
559			(void)closesocket(s);
560			return (ret);
561		}
562
563		if (connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) != 0)
564			ret = net_errno;
565
566		if (ret == 0 || ret == INPROGRESS) {
567			*socket_result = s;
568			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
569			    "init connection to %s with result %d",
570			    __repmgr_format_site_loc(site, buffer), ret));
571			return (ret);
572		}
573
574		why = "connection failed";
575		(void)closesocket(s);
576	}
577
578	/* We've exhausted all possible addresses. */
579	ret = net_errno;
580	__db_err(env, ret, "%s to %s", why,
581	    __repmgr_format_site_loc(site, buffer));
582	return (ret);
583}
584
585/*
586 * Sends a proposal for version negotiation.
587 *
588 * PUBLIC: int __repmgr_propose_version __P((ENV *, REPMGR_CONNECTION *));
589 */
590int
591__repmgr_propose_version(env, conn)
592	ENV *env;
593	REPMGR_CONNECTION *conn;
594{
595	DB_REP *db_rep;
596	__repmgr_version_proposal_args versions;
597	repmgr_netaddr_t *my_addr;
598	size_t hostname_len, rec_length;
599	u_int8_t *buf, *p;
600	int ret;
601
602	db_rep = env->rep_handle;
603	my_addr = &db_rep->my_addr;
604
605	/*
606	 * In repmgr wire protocol version 1, a handshake message had a rec part
607	 * that looked like this:
608	 *
609	 *  +-----------------+----+
610	 *  |  host name ...  | \0 |
611	 *  +-----------------+----+
612	 *
613	 * To ensure its own sanity, the old repmgr would write a NUL into the
614	 * last byte of a received message, and then use normal C library string
615	 * operations (e.g., * strlen, strcpy).
616	 *
617	 * Now, a version proposal has a rec part that looks like this:
618	 *
619	 *  +-----------------+----+------------------+------+
620	 *  |  host name ...  | \0 |  extra info ...  |  \0  |
621	 *  +-----------------+----+------------------+------+
622	 *
623	 * The "extra info" contains the version parameters, in marshaled form.
624	 */
625
626	hostname_len = strlen(my_addr->host);
627	rec_length = hostname_len + 1 +
628	    __REPMGR_VERSION_PROPOSAL_SIZE + 1;
629	if ((ret = __os_malloc(env, rec_length, &buf)) != 0)
630		goto out;
631	p = buf;
632	(void)strcpy((char*)p, my_addr->host);
633
634	p += hostname_len + 1;
635	versions.min = DB_REPMGR_MIN_VERSION;
636	versions.max = DB_REPMGR_VERSION;
637	__repmgr_version_proposal_marshal(env, &versions, p);
638
639	ret = send_v1_handshake(env, conn, buf, rec_length);
640	__os_free(env, buf);
641out:
642	return (ret);
643}
644
645static int
646send_v1_handshake(env, conn, buf, len)
647	ENV *env;
648	REPMGR_CONNECTION *conn;
649	void *buf;
650	size_t len;
651{
652	DB_REP *db_rep;
653	REP *rep;
654	repmgr_netaddr_t *my_addr;
655	DB_REPMGR_V1_HANDSHAKE buffer;
656	DBT cntrl, rec;
657
658	db_rep = env->rep_handle;
659	rep = db_rep->region;
660	my_addr = &db_rep->my_addr;
661
662	buffer.version = 1;
663	buffer.priority = htonl(rep->priority);
664	buffer.port = my_addr->port;
665	cntrl.data = &buffer;
666	cntrl.size = sizeof(buffer);
667
668	rec.data = buf;
669	rec.size = (u_int32_t)len;
670
671	/*
672	 * It would of course be disastrous to block the select() thread, so
673	 * pass the "blockable" argument as FALSE.  Fortunately blocking should
674	 * never be necessary here, because the hand-shake is always the first
675	 * thing we send.  Which is a good thing, because it would be almost as
676	 * disastrous if we allowed ourselves to drop a handshake.
677	 */
678	return (__repmgr_send_one(env,
679	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
680}
681
682/*
683 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
684 *
685 * !!!
686 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
687 */
688int
689__repmgr_read_from_site(env, conn)
690	ENV *env;
691	REPMGR_CONNECTION *conn;
692{
693	DB_REP *db_rep;
694	REPMGR_SITE *site;
695	SITE_STRING_BUFFER buffer;
696	size_t nr;
697	int ret;
698
699	db_rep = env->rep_handle;
700	/*
701	 * Keep reading pieces as long as we're making some progress, or until
702	 * we complete the current read phase.
703	 */
704	for (;;) {
705		if ((ret = __repmgr_readv(conn->fd,
706		    &conn->iovecs.vectors[conn->iovecs.offset],
707		    conn->iovecs.count - conn->iovecs.offset, &nr)) != 0) {
708			switch (ret) {
709#ifndef DB_WIN32
710			case EINTR:
711				continue;
712#endif
713			case WOULDBLOCK:
714				return (0);
715			default:
716				(void)__repmgr_format_eid_loc(env->rep_handle,
717				    conn->eid, buffer);
718				__db_err(env, ret,
719				    "can't read from %s", buffer);
720				STAT(env->rep_handle->
721				    region->mstat.st_connection_drop++);
722				return (DB_REP_UNAVAIL);
723			}
724		}
725
726		if (nr > 0) {
727			if (IS_VALID_EID(conn->eid)) {
728				site = SITE_FROM_EID(conn->eid);
729				__os_gettime(
730				    env, &site->last_rcvd_timestamp, 1);
731			}
732			if (__repmgr_update_consumed(&conn->iovecs, nr))
733				return (dispatch_phase_completion(env,
734					    conn));
735		} else {
736			(void)__repmgr_format_eid_loc(env->rep_handle,
737			    conn->eid, buffer);
738			__db_errx(env, "EOF on connection from %s", buffer);
739			STAT(env->rep_handle->
740			    region->mstat.st_connection_drop++);
741			return (DB_REP_UNAVAIL);
742		}
743	}
744}
745
746/*
747 * Handles whatever needs to be done upon the completion of a reading phase on a
748 * given connection.
749 */
750static int
751dispatch_phase_completion(env, conn)
752	ENV *env;
753	REPMGR_CONNECTION *conn;
754{
755#define	MEM_ALIGN sizeof(double)
756	DBT *dbt;
757	u_int32_t control_size, rec_size;
758	size_t memsize, control_offset, rec_offset;
759	void *membase;
760	int ret;
761
762	switch (conn->reading_phase) {
763	case SIZES_PHASE:
764		/*
765		 * We've received the header: a message type and the lengths of
766		 * the two pieces of the message.  Set up buffers to read the
767		 * two pieces.  This set-up is a bit different for a
768		 * REPMGR_REP_MESSAGE, because we plan to pass it off to the msg
769		 * threads.
770		 */
771		__repmgr_iovec_init(&conn->iovecs);
772		control_size = ntohl(conn->control_size_buf);
773		rec_size = ntohl(conn->rec_size_buf);
774
775		if (conn->msg_type == REPMGR_REP_MESSAGE) {
776			if (control_size == 0) {
777				__db_errx(
778				    env, "illegal size for rep msg");
779				return (DB_REP_UNAVAIL);
780			}
781			/*
782			 * Allocate a block of memory large enough to hold a
783			 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
784			 * data areas that it points to.  Start by calculating
785			 * the total memory needed, rounding up for the start of
786			 * each DBT, to ensure possible alignment requirements.
787			 */
788			memsize = (size_t)
789			    DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
790			control_offset = memsize;
791			memsize += control_size;
792			if (rec_size > 0) {
793				memsize = (size_t)DB_ALIGN(memsize, MEM_ALIGN);
794				rec_offset = memsize;
795				memsize += rec_size;
796			} else
797				COMPQUIET(rec_offset, 0);
798			if ((ret = __os_malloc(env, memsize, &membase)) != 0)
799				return (ret);
800			conn->input.rep_message = membase;
801
802			conn->input.rep_message->originating_eid = conn->eid;
803			DB_INIT_DBT(conn->input.rep_message->control,
804			    (u_int8_t*)membase + control_offset, control_size);
805			__repmgr_add_dbt(&conn->iovecs,
806			    &conn->input.rep_message->control);
807
808			if (rec_size > 0) {
809				DB_INIT_DBT(conn->input.rep_message->rec,
810				    (rec_size > 0 ?
811					(u_int8_t*)membase + rec_offset : NULL),
812				    rec_size);
813				__repmgr_add_dbt(&conn->iovecs,
814				    &conn->input.rep_message->rec);
815			} else
816				DB_INIT_DBT(conn->input.rep_message->rec,
817				    NULL, 0);
818		} else {
819			conn->input.repmgr_msg.cntrl.size = control_size;
820			conn->input.repmgr_msg.rec.size = rec_size;
821
822			if (control_size > 0) {
823				dbt = &conn->input.repmgr_msg.cntrl;
824				if ((ret = __os_malloc(env, control_size,
825				    &dbt->data)) != 0)
826					return (ret);
827				__repmgr_add_dbt(&conn->iovecs, dbt);
828			}
829
830			if (rec_size > 0) {
831				dbt = &conn->input.repmgr_msg.rec;
832				if ((ret = __os_malloc(env, rec_size,
833				     &dbt->data)) != 0) {
834					if (control_size > 0)
835						__os_free(env,
836						    conn->input.repmgr_msg.
837						    cntrl.data);
838					return (ret);
839				}
840				__repmgr_add_dbt(&conn->iovecs, dbt);
841			}
842		}
843
844		conn->reading_phase = DATA_PHASE;
845
846		if (control_size > 0 || rec_size > 0)
847			break;
848
849		/*
850		 * However, if they're both 0, we're ready to complete
851		 * DATA_PHASE.
852		 */
853		/* FALLTHROUGH */
854
855	case DATA_PHASE:
856		return (dispatch_msgin(env, conn));
857
858	default:
859		DB_ASSERT(env, FALSE);
860	}
861
862	return (0);
863}
864
865/*
866 * Processes an incoming message, depending on our current state.
867 */
868static int
869dispatch_msgin(env, conn)
870	ENV *env;
871	REPMGR_CONNECTION *conn;
872{
873	DBT *dbt;
874	char *hostname;
875	int given, ret;
876
877	given = FALSE;
878
879	switch (conn->state) {
880	case CONN_CONNECTED:
881		/*
882		 * In this state, we know we're working with an outgoing
883		 * connection.  We've sent a version proposal, and now expect
884		 * the response (which could be a dumb old V1 handshake).
885		 */
886		ONLY_HANDSHAKE(env, conn);
887		if ((ret = read_version_response(env, conn)) != 0)
888			return (ret);
889		break;
890
891	case CONN_NEGOTIATE:
892		/*
893		 * Since we're in this state, we know we're working with an
894		 * incoming connection, and this is the first message we've
895		 * received.  So it must be a version negotiation proposal (or a
896		 * legacy V1 handshake).  (We'll verify this of course.)
897		 */
898		ONLY_HANDSHAKE(env, conn);
899		if ((ret = send_version_response(env, conn)) != 0)
900			return (ret);
901		break;
902
903	case CONN_PARAMETERS:
904		/*
905		 * We've previously agreed on a (>1) version, and are now simply
906		 * awaiting the other side's parameters handshake.
907		 */
908		ONLY_HANDSHAKE(env, conn);
909		dbt = &conn->input.repmgr_msg.rec;
910		hostname = dbt->data;
911		hostname[dbt->size-1] = '\0';
912		if ((ret = accept_handshake(env, conn, hostname)) != 0)
913			return (ret);
914		conn->state = CONN_READY;
915		break;
916
917	case CONN_READY:	/* FALLTHROUGH */
918	case CONN_CONGESTED:
919		/*
920		 * We have a complete message, so process it.  Acks and
921		 * handshakes get processed here, in line.  Regular rep messages
922		 * get posted to a queue, to be handled by a thread from the
923		 * message thread pool.
924		 */
925		switch (conn->msg_type) {
926		case REPMGR_ACK:
927			if ((ret = record_ack(env, conn)) != 0)
928				return (ret);
929			break;
930
931		case REPMGR_HEARTBEAT:
932			/*
933			 * The underlying byte-receiving mechanism will already
934			 * have noted the fact that we got some traffic on this
935			 * connection.  And that's all we really have to do, so
936			 * there's nothing more needed at this point.
937			 */
938			break;
939
940		case REPMGR_REP_MESSAGE:
941			if ((ret = __repmgr_queue_put(env,
942			    conn->input.rep_message)) != 0)
943				return (ret);
944			/*
945			 * The queue has taken over responsibility for the
946			 * rep_message buffer, and will free it later.
947			 */
948			given = TRUE;
949			break;
950
951		default:
952			__db_errx(env,
953			    "unexpected msg type rcvd in ready state: %d",
954			    (int)conn->msg_type);
955			return (DB_REP_UNAVAIL);
956		}
957		break;
958
959	case CONN_DEFUNCT:
960		break;
961
962	default:
963		DB_ASSERT(env, FALSE);
964	}
965
966	if (!given) {
967		dbt = &conn->input.repmgr_msg.cntrl;
968		if (dbt->size > 0)
969			__os_free(env, dbt->data);
970		dbt = &conn->input.repmgr_msg.rec;
971		if (dbt->size > 0)
972			__os_free(env, dbt->data);
973	}
974	__repmgr_reset_for_reading(conn);
975	return (0);
976}
977
978/*
979 * Examine and verify the incoming version proposal message, and send an
980 * appropriate response.
981 */
982static int
983send_version_response(env, conn)
984	ENV *env;
985	REPMGR_CONNECTION *conn;
986{
987	DB_REP *db_rep;
988	__repmgr_version_proposal_args versions;
989	__repmgr_version_confirmation_args conf;
990	repmgr_netaddr_t *my_addr;
991	char *hostname;
992	u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
993	DBT vi;
994	int ret;
995
996	db_rep = env->rep_handle;
997	my_addr = &db_rep->my_addr;
998
999	if ((ret = find_version_info(env, conn, &vi)) != 0)
1000		return (ret);
1001	if (vi.size == 0) {
1002		/* No version info, so we must be talking to a v1 site. */
1003		hostname = conn->input.repmgr_msg.rec.data;
1004		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1005			return (ret);
1006		if ((ret = send_v1_handshake(env,
1007		    conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
1008			return (ret);
1009		conn->state = CONN_READY;
1010	} else {
1011		if ((ret = __repmgr_version_proposal_unmarshal(env,
1012		    &versions, vi.data, vi.size, NULL)) != 0)
1013			return (DB_REP_UNAVAIL);
1014
1015		/* For now version 2 is the only thing we know here. */
1016		DB_ASSERT(env, DB_REPMGR_VERSION == 2);
1017
1018		if (DB_REPMGR_VERSION >= versions.min &&
1019		    DB_REPMGR_VERSION <= versions.max)
1020			conf.version = DB_REPMGR_VERSION;
1021		else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1022		    versions.max <= DB_REPMGR_VERSION)
1023			conf.version = versions.max;
1024		else {
1025			/*
1026			 * User must have wired up a combination of versions
1027			 * exceeding what we said we'd support.
1028			 */
1029			__db_errx(env,
1030			    "No available version between %lu and %lu",
1031			    (u_long)versions.min, (u_long)versions.max);
1032			return (DB_REP_UNAVAIL);
1033		}
1034		conn->version = conf.version;
1035
1036		__repmgr_version_confirmation_marshal(env, &conf, buf);
1037		if ((ret = send_handshake(env, conn, buf, sizeof(buf))) != 0)
1038			return (ret);
1039
1040		conn->state = CONN_PARAMETERS;
1041	}
1042	return (ret);
1043}
1044
1045static int
1046send_handshake(env, conn, opt, optlen)
1047	ENV *env;
1048	REPMGR_CONNECTION *conn;
1049	void *opt;
1050	size_t optlen;
1051{
1052	DB_REP *db_rep;
1053	REP *rep;
1054	DBT cntrl, rec;
1055	__repmgr_handshake_args hs;
1056	repmgr_netaddr_t *my_addr;
1057	size_t hostname_len, rec_len;
1058	void *buf;
1059	u_int8_t *p;
1060	u_int32_t cntrl_len;
1061	int ret;
1062
1063	db_rep = env->rep_handle;
1064	rep = db_rep->region;
1065	my_addr = &db_rep->my_addr;
1066
1067	/*
1068	 * The cntrl part has port and priority.  The rec part has the host
1069	 * name, followed by whatever optional extra data was passed to us.
1070	 */
1071	cntrl_len = __REPMGR_HANDSHAKE_SIZE;
1072	hostname_len = strlen(my_addr->host);
1073	rec_len = hostname_len + 1 +
1074	    (opt == NULL ? 0 : optlen);
1075
1076	if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1077		return (ret);
1078
1079	cntrl.data = p = buf;
1080	hs.port = my_addr->port;
1081	hs.priority = rep->priority;
1082	__repmgr_handshake_marshal(env, &hs, p);
1083	cntrl.size = cntrl_len;
1084
1085	p = rec.data = &p[cntrl_len];
1086	(void)strcpy((char*)p, my_addr->host);
1087	p += hostname_len + 1;
1088	if (opt != NULL) {
1089		memcpy(p, opt, optlen);
1090		p += optlen;
1091	}
1092	rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1093
1094	/* Never block on select thread: pass blockable as FALSE. */
1095	ret = __repmgr_send_one(env,
1096	    conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE);
1097	__os_free(env, buf);
1098	return (ret);
1099}
1100
1101static int
1102read_version_response(env, conn)
1103	ENV *env;
1104	REPMGR_CONNECTION *conn;
1105{
1106	__repmgr_version_confirmation_args conf;
1107	DBT vi;
1108	char *hostname;
1109	int ret;
1110
1111	if ((ret = find_version_info(env, conn, &vi)) != 0)
1112		return (ret);
1113	hostname = conn->input.repmgr_msg.rec.data;
1114	if (vi.size == 0) {
1115		if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1116			return (ret);
1117	} else {
1118		if ((ret = __repmgr_version_confirmation_unmarshal(env,
1119		    &conf, vi.data, vi.size, NULL)) != 0)
1120			return (DB_REP_UNAVAIL);
1121		if (conf.version >= DB_REPMGR_MIN_VERSION &&
1122		    conf.version <= DB_REPMGR_VERSION)
1123			conn->version = conf.version;
1124		else {
1125			/*
1126			 * Remote site "confirmed" a version outside of the
1127			 * range we proposed.  It should never do that.
1128			 */
1129			__db_errx(env,
1130			    "Can't support confirmed version %lu",
1131			    (u_long)conf.version);
1132			return (DB_REP_UNAVAIL);
1133		}
1134
1135		if ((ret = accept_handshake(env, conn, hostname)) != 0)
1136			return (ret);
1137		if ((ret = send_handshake(env, conn, NULL, 0)) != 0)
1138			return (ret);
1139	}
1140	conn->state = CONN_READY;
1141	return (ret);
1142}
1143
1144/*
1145 * Examine the rec part of a handshake message to see if it has any version
1146 * information in it.  This is the magic that lets us allows version-aware sites
1147 * to exchange information, and yet avoids tripping up v1 sites, which don't
1148 * know how to look for it.
1149 */
1150static int
1151find_version_info(env, conn, vi)
1152	ENV *env;
1153	REPMGR_CONNECTION *conn;
1154	DBT *vi;
1155{
1156	DBT *dbt;
1157	char *hostname;
1158	size_t hostname_len;
1159
1160	dbt = &conn->input.repmgr_msg.rec;
1161	if (dbt->size == 0) {
1162		__db_errx(env, "handshake is missing rec part");
1163		return (DB_REP_UNAVAIL);
1164	}
1165	hostname = dbt->data;
1166	hostname[dbt->size-1] = '\0';
1167	hostname_len = strlen(hostname);
1168	if (hostname_len + 1 == dbt->size) {
1169		/*
1170		 * The rec DBT held only the host name.  This is a simple legacy
1171		 * V1 handshake; it contains no version information.
1172		 */
1173		vi->size = 0;
1174	} else {
1175		/*
1176		 * There's more data than just the host name.  The remainder is
1177		 * available to be treated as a normal byte buffer (and read in
1178		 * by one of the unmarshal functions).  Note that the remaining
1179		 * length should not include the padding byte that we have
1180		 * already clobbered.
1181		 */
1182		vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1183		vi->size = (dbt->size - (hostname_len+1)) - 1;
1184	}
1185	return (0);
1186}
1187
1188static int
1189accept_handshake(env, conn, hostname)
1190	ENV *env;
1191	REPMGR_CONNECTION *conn;
1192	char *hostname;
1193{
1194	__repmgr_handshake_args hs;
1195
1196	/* Extract port and priority from cntrl. */
1197	if (__repmgr_handshake_unmarshal(env, &hs,
1198	    conn->input.repmgr_msg.cntrl.data,
1199	    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1200		return (DB_REP_UNAVAIL);
1201
1202	return (process_parameters(env, conn, hostname, hs.port, hs.priority));
1203}
1204
1205static int
1206accept_v1_handshake(env, conn, hostname)
1207	ENV *env;
1208	REPMGR_CONNECTION *conn;
1209	char *hostname;
1210{
1211	DB_REPMGR_V1_HANDSHAKE *handshake;
1212	u_int32_t prio;
1213
1214	handshake = conn->input.repmgr_msg.cntrl.data;
1215	if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1216	    handshake->version != 1) {
1217		__db_errx(env, "malformed V1 handshake");
1218		return (DB_REP_UNAVAIL);
1219	}
1220
1221	conn->version = 1;
1222	prio = ntohl(handshake->priority);
1223	return (process_parameters(env, conn, hostname, handshake->port, prio));
1224}
1225
1226static int
1227process_parameters(env, conn, host, port, priority)
1228	ENV *env;
1229	REPMGR_CONNECTION *conn;
1230	char *host;
1231	u_int port;
1232	u_int32_t priority;
1233{
1234	DB_REP *db_rep;
1235	REPMGR_RETRY *retry;
1236	REPMGR_SITE *site;
1237	repmgr_netaddr_t addr;
1238	int ret, eid;
1239
1240	db_rep = env->rep_handle;
1241
1242	if (F_ISSET(conn, CONN_INCOMING)) {
1243		/*
1244		 * Now that we've been given the host and port, use them to find
1245		 * the site (or create a new one if necessary, etc.).
1246		 */
1247		if (IS_VALID_EID(eid = __repmgr_find_site(env, host, port))) {
1248			site = SITE_FROM_EID(eid);
1249			if (site->state == SITE_IDLE) {
1250				RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1251				    "handshake from idle site %s:%u",
1252				     host, port));
1253				retry = site->ref.retry;
1254				TAILQ_REMOVE(&db_rep->retries, retry, entries);
1255				__os_free(env, retry);
1256			} else {
1257				/*
1258				 * We got an incoming connection for a site we
1259				 * were already connected to; at least we
1260				 * thought we were.
1261				 */
1262				RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1263				    "connection from %s:%u supersedes existing",
1264				     host, port));
1265
1266				/*
1267				 * No need to schedule a retry for later, since
1268				 * we now have a replacement connection.
1269				 */
1270				DISABLE_CONNECTION(site->ref.conn);
1271			}
1272			conn->eid = eid;
1273			site->state = SITE_CONNECTED;
1274			site->ref.conn = conn;
1275		} else {
1276			RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1277			    "handshake introduces unknown site %s:%u",
1278			    host, port));
1279			if ((ret = __repmgr_pack_netaddr(env,
1280			    host, port, NULL, &addr)) != 0)
1281				return (ret);
1282			if ((ret = __repmgr_new_site(env,
1283			    &site, &addr, SITE_CONNECTED)) != 0) {
1284				__repmgr_cleanup_netaddr(env, &addr);
1285				return (ret);
1286			}
1287			conn->eid = EID_FROM_SITE(site);
1288			site->ref.conn = conn;
1289		}
1290	} else {
1291		/*
1292		 * Since we initiated this as an outgoing connection, we
1293		 * obviously already know the host, port and site.  We just need
1294		 * the other site's priority.
1295		 */
1296		DB_ASSERT(env, IS_VALID_EID(conn->eid));
1297		site = SITE_FROM_EID(conn->eid);
1298		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1299		    "handshake from connection to %s:%lu",
1300		    site->net_addr.host, (u_long)site->net_addr.port));
1301	}
1302
1303	site->priority = priority;
1304	F_SET(site, SITE_HAS_PRIO);
1305
1306	/*
1307	 * If we're moping around wishing we knew who the master was, then
1308	 * getting in touch with another site might finally provide sufficient
1309	 * connectivity to find out.  But just do this once, because otherwise
1310	 * we get messages while the subsequent rep_start operations are going
1311	 * on, and rep tosses them in that case.
1312	 */
1313	if (db_rep->master_eid == DB_EID_INVALID && !db_rep->done_one) {
1314		db_rep->done_one = TRUE;
1315		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1316		    "handshake with no known master to wake election thread"));
1317		if ((ret = __repmgr_init_election(env, ELECT_REPSTART)) != 0)
1318			return (ret);
1319	}
1320
1321	return (0);
1322}
1323
1324static int
1325record_ack(env, conn)
1326	ENV *env;
1327	REPMGR_CONNECTION *conn;
1328{
1329	DB_REP *db_rep;
1330	REPMGR_SITE *site;
1331	__repmgr_ack_args *ackp, ack;
1332	SITE_STRING_BUFFER location;
1333	int ret;
1334
1335	db_rep = env->rep_handle;
1336
1337	DB_ASSERT(env, conn->version > 0 &&
1338	    IS_READY_STATE(conn->state) && IS_VALID_EID(conn->eid));
1339	site = SITE_FROM_EID(conn->eid);
1340
1341	/*
1342	 * Extract the LSN.  Save it only if it is an improvement over what the
1343	 * site has already ack'ed.
1344	 */
1345	if (conn->version == 1) {
1346		ackp = conn->input.repmgr_msg.cntrl.data;
1347		if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1348		    conn->input.repmgr_msg.rec.size != 0) {
1349			__db_errx(env, "bad ack msg size");
1350			return (DB_REP_UNAVAIL);
1351		}
1352	} else {
1353		ackp = &ack;
1354		if ((ret = __repmgr_ack_unmarshal(env, ackp,
1355			 conn->input.repmgr_msg.cntrl.data,
1356			 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1357			return (DB_REP_UNAVAIL);
1358	}
1359
1360	/* Ignore stale acks. */
1361	if (ackp->generation < db_rep->generation) {
1362		RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1363		    "ignoring stale ack (%lu<%lu), from %s",
1364		     (u_long)ackp->generation, (u_long)db_rep->generation,
1365		     __repmgr_format_site_loc(site, location)));
1366		return (0);
1367	}
1368	RPRINT(env, DB_VERB_REPMGR_MISC, (env,
1369	    "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1370	    (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1371	    __repmgr_format_site_loc(site, location)));
1372
1373	if (ackp->generation == db_rep->generation &&
1374	    log_compare(&ackp->lsn, &site->max_ack) == 1) {
1375		memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1376		if ((ret = __repmgr_wake_waiting_senders(env)) != 0)
1377			return (ret);
1378	}
1379	return (0);
1380}
1381
1382/*
1383 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
1384 */
1385int
1386__repmgr_write_some(env, conn)
1387	ENV *env;
1388	REPMGR_CONNECTION *conn;
1389{
1390	QUEUED_OUTPUT *output;
1391	REPMGR_FLAT *msg;
1392	int bytes, ret;
1393
1394	while (!STAILQ_EMPTY(&conn->outbound_queue)) {
1395		output = STAILQ_FIRST(&conn->outbound_queue);
1396		msg = output->msg;
1397		if ((bytes = send(conn->fd, &msg->data[output->offset],
1398		    (size_t)msg->length - output->offset, 0)) == SOCKET_ERROR) {
1399			if ((ret = net_errno) == WOULDBLOCK)
1400				return (0);
1401			else {
1402				__db_err(env, ret, "writing data");
1403				STAT(env->rep_handle->
1404				    region->mstat.st_connection_drop++);
1405				return (DB_REP_UNAVAIL);
1406			}
1407		}
1408
1409		if ((output->offset += (size_t)bytes) >= msg->length) {
1410			STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
1411			__os_free(env, output);
1412			conn->out_queue_length--;
1413			if (--msg->ref_count <= 0)
1414				__os_free(env, msg);
1415
1416			/*
1417			 * We've achieved enough movement to free up at least
1418			 * one space in the outgoing queue.  Wake any message
1419			 * threads that may be waiting for space.  Leave
1420			 * CONGESTED state so that when the queue reaches the
1421			 * high-water mark again, the filling thread will be
1422			 * allowed to try waiting again.
1423			 */
1424			conn->state = CONN_READY;
1425			if (conn->blockers > 0 &&
1426			    (ret = __repmgr_signal(&conn->drained)) != 0)
1427				return (ret);
1428		}
1429	}
1430
1431#ifdef DB_WIN32
1432	/*
1433	 * With the queue now empty, it's time to relinquish ownership of this
1434	 * connection again, so that the next call to send() can write the
1435	 * message in line, instead of posting it to the queue for us.
1436	 */
1437	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ|FD_CLOSE)
1438	    == SOCKET_ERROR) {
1439		ret = net_errno;
1440		__db_err(env, ret, "can't remove FD_WRITE event bit");
1441		return (ret);
1442	}
1443#endif
1444
1445	return (0);
1446}
1447