1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2005-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9#include "db_config.h"
10
11#define	__INCLUDE_NETWORKING	1
12#include "db_int.h"
13
14/* Convert time-out from microseconds to milliseconds, rounding up. */
15#define	DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
16
/*
 * One send()-ing thread waiting for acknowledgement of a permanent message.
 * A slot is "in use" iff lsnp is non-NULL; when free, next_free threads it
 * onto the containing table's free list.
 */
typedef struct __ack_waiter {
	HANDLE event;		/* event signaled when the LSN is acked */
	const DB_LSN *lsnp;	/* LSN being awaited; NULL marks slot free */
	int next_free;		/* next free slot index (valid when free) */
} ACK_WAITER;

#define	WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)
24
/*
 * Array slots [0:next_avail-1] are initialized, and either in use or on the
 * free list.  Slots beyond that are virgin territory, whose memory contents
 * could be garbage.  In particular, note that slots [0:next_avail-1] have a
 * Win32 Event Object created for them, which have to be freed when cleaning up
 * this data structure.
 *
 * "first_free" points to a list of not-in-use slots threaded through the first
 * section of the array.
 */
struct __ack_waiters_table {
	struct __ack_waiter *array;
	int size;		/* allocated capacity of "array" */
	int next_avail;		/* first never-initialized slot */
	int first_free;		/* head of free list, or -1 if empty */
};
41
/*
 * Aggregated control info needed for preparing for WSAWaitForMultipleEvents()
 * call.  "connections" and "events" are parallel arrays: entry i of each
 * refers to the same connection, so a signaled event index maps directly to
 * its connection.
 */
struct io_info {
	REPMGR_CONNECTION **connections;
	WSAEVENT *events;
	DWORD nevents;		/* count of valid entries in both arrays */
};
51
52static int allocate_wait_slot __P((ENV *, int *));
53static void free_wait_slot __P((ENV *, int));
54static int handle_completion __P((ENV *, REPMGR_CONNECTION *));
55static int finish_connecting __P((ENV *, REPMGR_CONNECTION *,
56				     LPWSANETWORKEVENTS));
57static int prepare_io __P((ENV *, REPMGR_CONNECTION *, void *));
58
59int
60__repmgr_thread_start(env, runnable)
61	ENV *env;
62	REPMGR_RUNNABLE *runnable;
63{
64	HANDLE thread_id;
65
66	runnable->finished = FALSE;
67
68	thread_id = CreateThread(NULL, 0,
69	    (LPTHREAD_START_ROUTINE)runnable->run, env, 0, NULL);
70	if (thread_id == NULL)
71		return (GetLastError());
72	runnable->thread_id = thread_id;
73	return (0);
74}
75
76int
77__repmgr_thread_join(thread)
78	REPMGR_RUNNABLE *thread;
79{
80	if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0)
81		return (0);
82	return (GetLastError());
83}
84
85int
86__repmgr_set_nonblocking(s)
87	SOCKET s;
88{
89	int ret;
90	u_long arg;
91
92	arg = 1;		/* any non-zero value */
93	if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR)
94		return (WSAGetLastError());
95	return (0);
96}
97
98/*
99 * Wake any send()-ing threads waiting for an acknowledgement.
100 *
101 * !!!
102 * Caller must hold the repmgr->mutex, if this thread synchronization is to work
103 * properly.
104 */
105int
106__repmgr_wake_waiting_senders(env)
107	ENV *env;
108{
109	ACK_WAITER *slot;
110	DB_REP *db_rep;
111	int i, ret;
112
113	ret = 0;
114	db_rep = env->rep_handle;
115	for (i = 0; i < db_rep->waiters->next_avail; i++) {
116		 slot = &db_rep->waiters->array[i];
117		 if (!WAITER_SLOT_IN_USE(slot))
118			 continue;
119		 if (__repmgr_is_permanent(env, slot->lsnp))
120			 if (!SetEvent(slot->event) && ret == 0)
121				 ret = GetLastError();
122	}
123	return (ret);
124}
125
/*
 * Blocks the calling thread until the message at "lsnp" is acknowledged as
 * permanent, or the configured ack time-out expires.  Returns 0 on success,
 * DB_REP_UNAVAIL on time-out, or a Win32 error code.
 *
 * !!!
 * Caller must hold mutex; it is still held on return (except on the
 * allocate_wait_slot failure path, where it was never released).
 */
int
__repmgr_await_ack(env, lsnp)
	ENV *env;
	const DB_LSN *lsnp;
{
	ACK_WAITER *waiter;
	DB_REP *db_rep;
	DWORD ret, timeout;
	int i;

	db_rep = env->rep_handle;

	if ((ret = allocate_wait_slot(env, &i)) != 0)
		goto err;
	waiter = &db_rep->waiters->array[i];

	/* 0 means "no time-out configured", i.e., wait forever. */
	timeout = db_rep->ack_timeout > 0 ?
	    DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
	waiter->lsnp = lsnp;
	/*
	 * Atomically release the mutex and wait on our slot's event, so that
	 * a sender can't signal the event in the gap between the two
	 * operations and have the wake-up lost.
	 */
	if ((ret = SignalObjectAndWait(*db_rep->mutex, waiter->event, timeout,
	    FALSE)) == WAIT_FAILED) {
		ret = GetLastError();
	} else if (ret == WAIT_TIMEOUT)
		ret = DB_REP_UNAVAIL;
	else
		DB_ASSERT(env, ret == WAIT_OBJECT_0);

	/* Reacquire the mutex (dropped above) before touching the table. */
	LOCK_MUTEX(db_rep->mutex);
	free_wait_slot(env, i);

err:
	return (ret);
}
163
164/*
165 * !!!
166 * Caller must hold the mutex.
167 */
168static int
169allocate_wait_slot(env, resultp)
170	ENV *env;
171	int *resultp;
172{
173	ACK_WAITER *w;
174	ACK_WAITERS_TABLE *table;
175	DB_REP *db_rep;
176	int i, ret;
177
178	db_rep = env->rep_handle;
179	table = db_rep->waiters;
180	if (table->first_free == -1) {
181		if (table->next_avail >= table->size) {
182			/*
183			 * Grow the array.
184			 */
185			table->size *= 2;
186			w = table->array;
187			if ((ret = __os_realloc(env, table->size * sizeof(*w),
188			     &w)) != 0)
189				return (ret);
190			table->array = w;
191		}
192		/*
193		 * Here if, one way or another, we're good to go for using the
194		 * next slot (for the first time).
195		 */
196		i = table->next_avail++;
197		w = &table->array[i];
198		if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
199		    NULL) {
200			/*
201			 * Maintain the sanctity of our rule that
202			 * [0:next_avail-1] contain valid Event Objects.
203			 */
204			--table->next_avail;
205			return (GetLastError());
206		}
207	} else {
208		i = table->first_free;
209		w = &table->array[i];
210		table->first_free = w->next_free;
211	}
212	*resultp = i;
213	return (0);
214}
215
216static void
217free_wait_slot(env, slot_index)
218	ENV *env;
219	int slot_index;
220{
221	DB_REP *db_rep;
222	ACK_WAITER *slot;
223
224	db_rep = env->rep_handle;
225	slot = &db_rep->waiters->array[slot_index];
226
227	slot->lsnp = NULL;	/* show it's not in use */
228	slot->next_free = db_rep->waiters->first_free;
229	db_rep->waiters->first_free = slot_index;
230}
231
/*
 * Waits until the connection's output queue drains below OUT_QUEUE_LIMIT,
 * or "timeout" elapses.  On time-out the connection is marked
 * CONN_CONGESTED and 0 is returned -- so a 0 return does NOT mean the queue
 * actually drained; callers must examine conn->state.  Returns
 * DB_REP_UNAVAIL if the connection dies while we wait.
 *
 * (See requirements described in repmgr_posix.c.)
 *
 * Caller must hold the mutex; it is still held on return.
 */
int
__repmgr_await_drain(env, conn, timeout)
	ENV *env;
	REPMGR_CONNECTION *conn;
	db_timeout_t timeout;
{
	DB_REP *db_rep;
	db_timespec deadline, delta, now;
	db_timeout_t t;
	DWORD duration, ret;
	int round_up;

	db_rep = env->rep_handle;

	/* Fix the absolute deadline once; each pass waits for the remainder. */
	__os_gettime(env, &deadline, 1);
	TIMESPEC_ADD_DB_TIMEOUT(&deadline, timeout);

	while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
		/* Manual-reset event: re-arm it before each wait. */
		if (!ResetEvent(conn->drained))
			return (GetLastError());

		/* How long until the deadline? */
		__os_gettime(env, &now, 1);
		if (timespeccmp(&now, &deadline, >=)) {
			conn->state = CONN_CONGESTED;
			return (0);
		}
		delta = deadline;
		timespecsub(&delta, &now);
		round_up = TRUE;
		DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
		duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);

		/*
		 * Atomically release the mutex and wait, so the writer
		 * thread can't signal "drained" in between and lose the
		 * wake-up; then reacquire the mutex before inspecting state.
		 */
		ret = SignalObjectAndWait(*db_rep->mutex,
		    conn->drained, duration, FALSE);
		LOCK_MUTEX(db_rep->mutex);
		if (ret == WAIT_FAILED)
			return (GetLastError());
		else if (ret == WAIT_TIMEOUT) {
			conn->state = CONN_CONGESTED;
			return (0);
		} else
			DB_ASSERT(env, ret == WAIT_OBJECT_0);

		if (db_rep->finished)
			return (0);
		if (conn->state == CONN_DEFUNCT)
			return (DB_REP_UNAVAIL);
	}
	return (0);
}
284
285/*
286 * Creates a manual reset event, which is usually our best choice when we may
287 * have multiple threads waiting on a single event.
288 */
289int
290__repmgr_alloc_cond(c)
291	cond_var_t *c;
292{
293	HANDLE event;
294
295	if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
296		return (GetLastError());
297	*c = event;
298	return (0);
299}
300
301int
302__repmgr_free_cond(c)
303	cond_var_t *c;
304{
305	if (CloseHandle(*c))
306		return (0);
307	return (GetLastError());
308}
309
/*
 * Platform-specific piece of DB_ENV creation: the ack-waiters table is
 * allocated later, in __repmgr_init; start with a clean NULL.
 */
void
__repmgr_env_create_pf(db_rep)
	DB_REP *db_rep;
{
	db_rep->waiters = NULL;
}
316
317int
318__repmgr_create_mutex_pf(mutex)
319	mgr_mutex_t *mutex;
320{
321	if ((*mutex = CreateMutex(NULL, FALSE, NULL)) == NULL)
322		return (GetLastError());
323	return (0);
324}
325
326int
327__repmgr_destroy_mutex_pf(mutex)
328	mgr_mutex_t  *mutex;
329{
330	return (CloseHandle(*mutex) ? 0 : GetLastError());
331}
332
/*
 * Windows-specific repmgr initialization: starts Winsock, creates the three
 * control Event Objects, and allocates the ack-waiters table.  On any
 * failure everything acquired so far is torn back down, the handle fields
 * are reset to NULL, and Winsock is shut down again.
 */
int
__repmgr_init(env)
     ENV *env;
{
#define	INITIAL_ALLOCATION 5		/* arbitrary size */
	DB_REP *db_rep;
	ACK_WAITERS_TABLE *table;
	WSADATA wsaData;
	int ret;

	db_rep = env->rep_handle;
	table = NULL;

	/* Request Winsock 2.2, needed for the WSA event functions we use. */
	if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
		__db_err(env, ret, "unable to initialize Windows networking");
		return (ret);
	}

	/* Auto-reset event other threads use to wake the select loop. */
	if ((db_rep->signaler = CreateEvent(NULL, /* security attr */
	    FALSE,	/* (not) of the manual reset variety  */
	    FALSE,		/* (not) initially signaled */
	    NULL)) == NULL)		/* name */
		goto geterr;

	/* Manual-reset: multiple threads may be waiting on the queue. */
	if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL))
	    == NULL)
		goto geterr;

	if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL))
	    == NULL)
		goto geterr;

	if ((ret = __os_calloc(env, 1, sizeof(ACK_WAITERS_TABLE), &table))
	    != 0)
		goto err;

	if ((ret = __os_calloc(env, INITIAL_ALLOCATION, sizeof(ACK_WAITER),
	    &table->array)) != 0)
		goto err;

	table->size = INITIAL_ALLOCATION;
	table->first_free = -1;		/* free list starts empty */
	table->next_avail = 0;

	/* There's a restaurant joke in there somewhere. */
	db_rep->waiters = table;
	return (0);

geterr:
	ret = GetLastError();
err:
	/* Unwind: close only the handles that were actually created. */
	if (db_rep->check_election != NULL)
		CloseHandle(db_rep->check_election);
	if (db_rep->queue_nonempty != NULL)
		CloseHandle(db_rep->queue_nonempty);
	if (db_rep->signaler != NULL)
		CloseHandle(db_rep->signaler);
	if (table != NULL)
		__os_free(env, table);
	db_rep->signaler =
	    db_rep->queue_nonempty = db_rep->check_election = NULL;
	db_rep->waiters = NULL;
	(void)WSACleanup();
	return (ret);
}
398
/*
 * Tears down everything __repmgr_init created: Winsock, the per-slot waiter
 * Event Objects, the waiters table, and the three control events.  Cleanup
 * always runs to completion; the first error encountered is the one
 * returned.
 */
int
__repmgr_deinit(env)
     ENV *env;
{
	DB_REP *db_rep;
	int i, ret;

	db_rep = env->rep_handle;
	/* Nothing to do if __repmgr_init never completed. */
	if (!(REPMGR_INITED(db_rep)))
		return (0);

	ret = 0;
	if (WSACleanup() == SOCKET_ERROR)
		ret = WSAGetLastError();

	/* Only slots [0:next_avail-1] have Event Objects to close. */
	for (i = 0; i < db_rep->waiters->next_avail; i++) {
		if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
			ret = GetLastError();
	}
	__os_free(env, db_rep->waiters->array);
	__os_free(env, db_rep->waiters);

	if (!CloseHandle(db_rep->check_election) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
		ret = GetLastError();

	if (!CloseHandle(db_rep->signaler) && ret == 0)
		ret = GetLastError();

	db_rep->waiters = NULL;
	return (ret);
}
433
434int
435__repmgr_lock_mutex(mutex)
436	mgr_mutex_t  *mutex;
437{
438	if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0)
439		return (0);
440	return (GetLastError());
441}
442
443int
444__repmgr_unlock_mutex(mutex)
445	mgr_mutex_t  *mutex;
446{
447	if (ReleaseMutex(*mutex))
448		return (0);
449	return (GetLastError());
450}
451
452int
453__repmgr_signal(v)
454	cond_var_t *v;
455{
456	return (SetEvent(*v) ? 0 : GetLastError());
457}
458
459int
460__repmgr_wake_main_thread(env)
461	ENV *env;
462{
463	if (!SetEvent(env->rep_handle->signaler))
464		return (GetLastError());
465	return (0);
466}
467
468int
469__repmgr_writev(fd, iovec, buf_count, byte_count_p)
470	socket_t fd;
471	db_iovec_t *iovec;
472	int buf_count;
473	size_t *byte_count_p;
474{
475	DWORD bytes;
476
477	if (WSASend(fd, iovec,
478	    (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR)
479		return (net_errno);
480
481	*byte_count_p = (size_t)bytes;
482	return (0);
483}
484
485int
486__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
487	socket_t fd;
488	db_iovec_t *iovec;
489	int buf_count;
490	size_t *xfr_count_p;
491{
492	DWORD bytes, flags;
493
494	flags = 0;
495	if (WSARecv(fd, iovec,
496	    (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR)
497		return (net_errno);
498
499	*xfr_count_p = (size_t)bytes;
500	return (0);
501}
502
/*
 * Main I/O loop of the replication manager's select thread: multiplexes the
 * signaler event, the listener socket (unless we're a subordinate process),
 * and every active connection via WSAWaitForMultipleEvents, dispatching
 * each wake-up until db_rep->finished is observed.  Runs with the mutex
 * held, dropping it only around the blocking wait.
 */
int
__repmgr_select_loop(env)
	ENV *env;
{
	DB_REP *db_rep;
	DWORD ret;
	DWORD select_timeout;
	REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
	db_timespec timeout;
	WSAEVENT listen_event;
	WSANETWORKEVENTS net_events;
	struct io_info io_info;
	int i;

	db_rep = env->rep_handle;
	io_info.connections = connections;
	io_info.events = events;

	if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
		__db_err(
		    env, net_errno, "can't create event for listen socket");
		return (net_errno);
	}
	/* Only the main process accepts connections on the listen socket. */
	if (!IS_SUBORDINATE(db_rep) &&
	    WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
	    SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "can't enable event for listener");
		goto out;
	}

	LOCK_MUTEX(db_rep->mutex);
	if ((ret = __repmgr_first_try_connections(env)) != 0)
		goto unlock;
	for (;;) {
		/* Start with the two events that we always wait for. */
#define	SIGNALER_INDEX	0
#define	LISTENER_INDEX	1
		events[SIGNALER_INDEX] = db_rep->signaler;
		if (IS_SUBORDINATE(db_rep))
			io_info.nevents = 1;
		else {
			events[LISTENER_INDEX] = listen_event;
			io_info.nevents = 2;
		}

		/* Append one event per live connection (prepare_io). */
		if ((ret = __repmgr_each_connection(env,
		    prepare_io, &io_info, TRUE)) != 0)
			goto unlock;

		if (__repmgr_compute_timeout(env, &timeout))
			select_timeout =
			    (DWORD)(timeout.tv_sec * MS_PER_SEC +
			    timeout.tv_nsec / NS_PER_MS);
		else {
			/* No time-based events to wake us up. */
			select_timeout = WSA_INFINITE;
		}

		/* Drop the mutex only for the duration of the wait. */
		UNLOCK_MUTEX(db_rep->mutex);
		ret = WSAWaitForMultipleEvents(
		    io_info.nevents, events, FALSE, select_timeout, FALSE);
		/*
		 * NOTE(review): "finished" is read here without the mutex;
		 * presumably it is a set-once shutdown flag -- confirm.
		 */
		if (db_rep->finished) {
			ret = 0;
			goto out;
		}
		LOCK_MUTEX(db_rep->mutex);

		/*
		 * !!!
		 * Note that `ret' remains set as the return code from
		 * WSAWaitForMultipleEvents, above.
		 */
		if (ret >= WSA_WAIT_EVENT_0 &&
		    ret < WSA_WAIT_EVENT_0 + io_info.nevents) {
			if ((i = ret - WSA_WAIT_EVENT_0) == SIGNALER_INDEX) {
				/* Another thread woke us. */
			} else if (!IS_SUBORDINATE(db_rep) &&
			    i == LISTENER_INDEX) {
				/* Incoming connection on the listen socket. */
				if ((ret = WSAEnumNetworkEvents(
				    db_rep->listen_fd, listen_event,
				    &net_events)) == SOCKET_ERROR) {
					ret = net_errno;
					goto unlock;
				}
				DB_ASSERT(env,
				    net_events.lNetworkEvents & FD_ACCEPT);
				if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
				    != 0)
					goto unlock;
				if ((ret = __repmgr_accept(env)) != 0)
					goto unlock;
			} else {
				/* I/O ready on connection i (parallel array). */
				if (connections[i]->state != CONN_DEFUNCT &&
				    (ret = handle_completion(env,
				    connections[i])) != 0)
					goto unlock;
			}
		} else if (ret == WSA_WAIT_TIMEOUT) {
			if ((ret = __repmgr_check_timeouts(env)) != 0)
				goto unlock;
		} else if (ret == WSA_WAIT_FAILED) {
			ret = net_errno;
			goto unlock;
		}
	}

unlock:
	UNLOCK_MUTEX(db_rep->mutex);
out:
	if (!CloseHandle(listen_event) && ret == 0)
		ret = GetLastError();
	return (ret);
}
618
619static int
620prepare_io(env, conn, info_)
621	ENV *env;
622	REPMGR_CONNECTION *conn;
623	void *info_;
624{
625	struct io_info *info;
626
627	if (conn->state == CONN_DEFUNCT)
628		return (__repmgr_cleanup_connection(env, conn));
629
630	/*
631	 * Note that even if we're suffering flow control, we
632	 * nevertheless still read if we haven't even yet gotten
633	 * a handshake.  Why?  (1) Handshakes are important; and
634	 * (2) they don't hurt anything flow-control-wise.
635	 */
636	info = info_;
637
638	/*
639	 * If we ever implemented flow control, we would have some conditions to
640	 * examine here.  But as it is, we always are willing to accept I/O on
641	 * every connection.
642	 *
643	 * We can only handle as many connections as the number of events the
644	 * WSAWaitForMultipleEvents function allows (minus 2, for our overhead:
645	 * the listener and the signaler).
646	 */
647	DB_ASSERT(env, info->nevents < WSA_MAXIMUM_WAIT_EVENTS);
648	info->events[info->nevents] = conn->event_object;
649	info->connections[info->nevents++] = conn;
650
651	return (0);
652}
653
/*
 * Services the network events pending on one connection: finishes an
 * in-progress connect, or handles close/write/read notifications in that
 * order.  Any failure classified as DB_REP_UNAVAIL busts the connection
 * before returning.
 */
static int
handle_completion(env, conn)
	ENV *env;
	REPMGR_CONNECTION *conn;
{
	int ret;
	WSANETWORKEVENTS events;

	/*
	 * Fetch (and clear) the pending events.  On success this leaves
	 * ret == 0, which is the value we fall through with if no event bit
	 * below applies.
	 */
	if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events))
	    == SOCKET_ERROR) {
		__db_err(env, net_errno, "EnumNetworkEvents");
		STAT(env->rep_handle->region->mstat.st_connection_drop++);
		ret = DB_REP_UNAVAIL;
		goto err;
	}

	if (conn->state == CONN_CONNECTING) {
		if ((ret = finish_connecting(env, conn, &events)) != 0)
			goto err;
	} else {		/* Check both writing and reading. */
		if (events.lNetworkEvents & FD_CLOSE) {
			__db_err(env,
			    events.iErrorCode[FD_CLOSE_BIT],
			    "connection closed");
			STAT(env->rep_handle->
			    region->mstat.st_connection_drop++);
			ret = DB_REP_UNAVAIL;
			goto err;
		}

		if (events.lNetworkEvents & FD_WRITE) {
			if (events.iErrorCode[FD_WRITE_BIT] != 0) {
				__db_err(env,
				    events.iErrorCode[FD_WRITE_BIT],
				    "error writing");
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				ret = DB_REP_UNAVAIL;
				goto err;
			} else if ((ret =
			    __repmgr_write_some(env, conn)) != 0)
				goto err;
		}

		if (events.lNetworkEvents & FD_READ) {
			if (events.iErrorCode[FD_READ_BIT] != 0) {
				__db_err(env,
				    events.iErrorCode[FD_READ_BIT],
				    "error reading");
				STAT(env->rep_handle->
				    region->mstat.st_connection_drop++);
				ret = DB_REP_UNAVAIL;
				goto err;
			} else if ((ret =
			    __repmgr_read_from_site(env, conn)) != 0)
				goto err;
		}
	}

err:
	/* Connection-level failures trigger the usual recovery path. */
	if (ret == DB_REP_UNAVAIL)
		ret = __repmgr_bust_connection(env, conn);
	return (ret);
}
718
/*
 * Completes an asynchronous connect attempt once FD_CONNECT fires on the
 * socket.  On success, narrows the socket's event mask to read/close
 * notifications and sends our protocol version proposal.  On failure,
 * tries the site's next known address if there is one; only when all
 * addresses are exhausted does it report DB_REP_UNAVAIL.
 */
static int
finish_connecting(env, conn, events)
	ENV *env;
	REPMGR_CONNECTION *conn;
	LPWSANETWORKEVENTS events;
{
	DB_REP *db_rep;
	REPMGR_SITE *site;
	u_int eid;
	int ret;

	/* Only FD_CONNECT concerns us here; ignore anything else. */
	if (!(events->lNetworkEvents & FD_CONNECT))
		return (0);

	db_rep = env->rep_handle;

	DB_ASSERT(env, IS_VALID_EID(conn->eid));
	eid = (u_int)conn->eid;
	site = SITE_FROM_EID(eid);

	/* The connect's outcome is reported in the per-bit error array. */
	if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) {
		__db_err(env, ret, "connecting");
		goto err;
	}

	conn->state = CONN_CONNECTED;
	__os_gettime(env, &site->last_rcvd_timestamp, 1);

	/* From now on we only care about incoming data and disconnects. */
	if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) ==
	    SOCKET_ERROR) {
		ret = net_errno;
		__db_err(env, ret, "setting event bits for reading");
		return (ret);
	}

	return (__repmgr_propose_version(env, conn));

err:
	/* No more addresses to try for this site: count it and give up. */
	if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
		STAT(db_rep->region->mstat.st_connect_fail++);
		return (DB_REP_UNAVAIL);
	}

	/*
	 * Since we're immediately trying the next address in the list, simply
	 * disable the failed connection, without the usual recovery.
	 */
	__repmgr_disable_connection(env, conn);

	ret = __repmgr_connect_site(env, eid);
	DB_ASSERT(env, ret != DB_REP_UNAVAIL);
	return (ret);
}
783