primary.c revision 223655
1149871Sscottl/*-
2149871Sscottl * Copyright (c) 2009 The FreeBSD Foundation
3136849Sscottl * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
4136849Sscottl * All rights reserved.
5136849Sscottl *
6136849Sscottl * This software was developed by Pawel Jakub Dawidek under sponsorship from
7136849Sscottl * the FreeBSD Foundation.
8136849Sscottl *
9136849Sscottl * Redistribution and use in source and binary forms, with or without
10136849Sscottl * modification, are permitted provided that the following conditions
11136849Sscottl * are met:
12136849Sscottl * 1. Redistributions of source code must retain the above copyright
13136849Sscottl *    notice, this list of conditions and the following disclaimer.
14136849Sscottl * 2. Redistributions in binary form must reproduce the above copyright
15136849Sscottl *    notice, this list of conditions and the following disclaimer in the
16136849Sscottl *    documentation and/or other materials provided with the distribution.
17136849Sscottl *
18136849Sscottl * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19136849Sscottl * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20136849Sscottl * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21136849Sscottl * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22136849Sscottl * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23136849Sscottl * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24136849Sscottl * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25136849Sscottl * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26136849Sscottl * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27136849Sscottl * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28136849Sscottl * SUCH DAMAGE.
29136849Sscottl */
30136849Sscottl
31136849Sscottl#include <sys/cdefs.h>
32136849Sscottl__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 223655 2011-06-28 21:01:32Z trociny $");
33136849Sscottl
34136849Sscottl#include <sys/types.h>
35136849Sscottl#include <sys/time.h>
36136849Sscottl#include <sys/bio.h>
37136849Sscottl#include <sys/disk.h>
38136849Sscottl#include <sys/refcount.h>
39136849Sscottl#include <sys/stat.h>
40136849Sscottl
41136849Sscottl#include <geom/gate/g_gate.h>
42136849Sscottl
43136849Sscottl#include <err.h>
44136849Sscottl#include <errno.h>
45136849Sscottl#include <fcntl.h>
46136849Sscottl#include <libgeom.h>
47136849Sscottl#include <pthread.h>
48136849Sscottl#include <signal.h>
49136849Sscottl#include <stdint.h>
50136849Sscottl#include <stdio.h>
51136849Sscottl#include <string.h>
52136849Sscottl#include <sysexits.h>
53136849Sscottl#include <unistd.h>
54136849Sscottl
55136849Sscottl#include <activemap.h>
56136849Sscottl#include <nv.h>
57136849Sscottl#include <rangelock.h>
58136849Sscottl
59136849Sscottl#include "control.h"
60136849Sscottl#include "event.h"
61136849Sscottl#include "hast.h"
62136849Sscottl#include "hast_proto.h"
63136849Sscottl#include "hastd.h"
64136849Sscottl#include "hooks.h"
65136849Sscottl#include "metadata.h"
66136849Sscottl#include "proto.h"
67136849Sscottl#include "pjdlog.h"
68136849Sscottl#include "subr.h"
69136849Sscottl#include "synch.h"
70136849Sscottl
/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)
73136849Sscottl
/*
 * A single I/O request as it travels through the daemon. One hio is
 * shared by all components (local disk and remote node).
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* Per-component queue linkage; allocated as an array of ncomps entries. */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component), so a request on
 * them is on no per-component list and can reuse the first hio_next slot.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]
96136849Sscottl
/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
153136849Sscottl
/*
 * True when both directions of the remote connection are established.
 * The 'no' argument is currently unused - there is only one remote
 * component.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component queue 'name' of component
 * 'ncomp'; signal a waiter only if the queue was previously empty
 * (otherwise a consumer is already awake or will find the entry).
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* As QUEUE_INSERT1, but for the global (non-per-component) queues. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Take the first request off queue 'name' of component 'ncomp'.
 * With a non-zero 'timeout' at most one timed wait is performed and
 * (hio) may come out NULL; with timeout 0 this blocks until an entry
 * appears.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if (hio != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Blocking take from a global queue; always returns an entry. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
204136849Sscottl
/*
 * Synchronization requests are distinguished from regular (kernel)
 * requests by a negative ggate unit number: -1 while in flight,
 * -2 once completed.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* The resource this worker serves; used by primary_exit()/primary_exitx(). */
static struct hast_resource *gres;

/*
 * Range locks keep regular and synchronization I/O from operating on
 * the same extents at the same time.
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
/* NOTE(review): appears to flag completed startup; confirm where it is read. */
static bool fullystarted;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);
231136849Sscottl
232136849Sscottlstatic void
233136849Sscottlcleanup(struct hast_resource *res)
234136849Sscottl{
235136849Sscottl	int rerrno;
236136849Sscottl
237136849Sscottl	/* Remember errno. */
238136849Sscottl	rerrno = errno;
239136849Sscottl
240136849Sscottl	/* Destroy ggate provider if we created one. */
241136849Sscottl	if (res->hr_ggateunit >= 0) {
242136849Sscottl		struct g_gate_ctl_destroy ggiod;
243136849Sscottl
244136849Sscottl		bzero(&ggiod, sizeof(ggiod));
245136849Sscottl		ggiod.gctl_version = G_GATE_VERSION;
246136849Sscottl		ggiod.gctl_unit = res->hr_ggateunit;
247136849Sscottl		ggiod.gctl_force = 1;
248136849Sscottl		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
249136849Sscottl			pjdlog_errno(LOG_WARNING,
250136849Sscottl			    "Unable to destroy hast/%s device",
251136849Sscottl			    res->hr_provname);
252136849Sscottl		}
253136849Sscottl		res->hr_ggateunit = -1;
254136849Sscottl	}
255136849Sscottl
256136849Sscottl	/* Restore errno. */
257136849Sscottl	errno = rerrno;
258136849Sscottl}
259136849Sscottl
260136849Sscottlstatic __dead2 void
261136849Sscottlprimary_exit(int exitcode, const char *fmt, ...)
262136849Sscottl{
263136849Sscottl	va_list ap;
264136849Sscottl
265136849Sscottl	PJDLOG_ASSERT(exitcode != EX_OK);
266136849Sscottl	va_start(ap, fmt);
267136849Sscottl	pjdlogv_errno(LOG_ERR, fmt, ap);
268136849Sscottl	va_end(ap);
269136849Sscottl	cleanup(gres);
270136849Sscottl	exit(exitcode);
271136849Sscottl}
272136849Sscottl
273136849Sscottlstatic __dead2 void
274136849Sscottlprimary_exitx(int exitcode, const char *fmt, ...)
275136849Sscottl{
276136849Sscottl	va_list ap;
277136849Sscottl
278136849Sscottl	va_start(ap, fmt);
279136849Sscottl	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
280136849Sscottl	va_end(ap);
281136849Sscottl	cleanup(gres);
282136849Sscottl	exit(exitcode);
283136849Sscottl}
284136849Sscottl
285136849Sscottlstatic int
286136849Sscottlhast_activemap_flush(struct hast_resource *res)
287136849Sscottl{
288136849Sscottl	const unsigned char *buf;
289136849Sscottl	size_t size;
290136849Sscottl
291136849Sscottl	buf = activemap_bitmap(res->hr_amp, &size);
292136849Sscottl	PJDLOG_ASSERT(buf != NULL);
293136849Sscottl	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
294136849Sscottl	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
295136849Sscottl	    (ssize_t)size) {
296136849Sscottl		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
297136849Sscottl		    "Unable to flush activemap to disk"));
298136849Sscottl		return (-1);
299136849Sscottl	}
300136849Sscottl	return (0);
301136849Sscottl}
302136849Sscottl
303136849Sscottlstatic bool
304136849Sscottlreal_remote(const struct hast_resource *res)
305136849Sscottl{
306136849Sscottl
307136849Sscottl	return (strcmp(res->hr_remoteaddr, "none") != 0);
308136849Sscottl}
309136849Sscottl
310136849Sscottlstatic void
311136849Sscottlinit_environment(struct hast_resource *res __unused)
312136849Sscottl{
313136849Sscottl	struct hio *hio;
314136849Sscottl	unsigned int ii, ncomps;
315136849Sscottl
316136849Sscottl	/*
317136849Sscottl	 * In the future it might be per-resource value.
318136849Sscottl	 */
319136849Sscottl	ncomps = HAST_NCOMPONENTS;
320136849Sscottl
321136849Sscottl	/*
322136849Sscottl	 * Allocate memory needed by lists.
323136849Sscottl	 */
324136849Sscottl	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
325136849Sscottl	if (hio_send_list == NULL) {
326136849Sscottl		primary_exitx(EX_TEMPFAIL,
327136849Sscottl		    "Unable to allocate %zu bytes of memory for send lists.",
328136849Sscottl		    sizeof(hio_send_list[0]) * ncomps);
329136849Sscottl	}
330136849Sscottl	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
331136849Sscottl	if (hio_send_list_lock == NULL) {
332136849Sscottl		primary_exitx(EX_TEMPFAIL,
333136849Sscottl		    "Unable to allocate %zu bytes of memory for send list locks.",
334136849Sscottl		    sizeof(hio_send_list_lock[0]) * ncomps);
335136849Sscottl	}
336136849Sscottl	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
337136849Sscottl	if (hio_send_list_cond == NULL) {
338136849Sscottl		primary_exitx(EX_TEMPFAIL,
339136849Sscottl		    "Unable to allocate %zu bytes of memory for send list condition variables.",
340136849Sscottl		    sizeof(hio_send_list_cond[0]) * ncomps);
341136849Sscottl	}
342136849Sscottl	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
343136849Sscottl	if (hio_recv_list == NULL) {
344136849Sscottl		primary_exitx(EX_TEMPFAIL,
345136849Sscottl		    "Unable to allocate %zu bytes of memory for recv lists.",
346136849Sscottl		    sizeof(hio_recv_list[0]) * ncomps);
347136849Sscottl	}
348136849Sscottl	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
349136849Sscottl	if (hio_recv_list_lock == NULL) {
350136849Sscottl		primary_exitx(EX_TEMPFAIL,
351136849Sscottl		    "Unable to allocate %zu bytes of memory for recv list locks.",
352136849Sscottl		    sizeof(hio_recv_list_lock[0]) * ncomps);
353136849Sscottl	}
354136849Sscottl	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
355136849Sscottl	if (hio_recv_list_cond == NULL) {
356136849Sscottl		primary_exitx(EX_TEMPFAIL,
357136849Sscottl		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
358136849Sscottl		    sizeof(hio_recv_list_cond[0]) * ncomps);
359136849Sscottl	}
360136849Sscottl	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
361136849Sscottl	if (hio_remote_lock == NULL) {
362136849Sscottl		primary_exitx(EX_TEMPFAIL,
363136849Sscottl		    "Unable to allocate %zu bytes of memory for remote connections locks.",
364136849Sscottl		    sizeof(hio_remote_lock[0]) * ncomps);
365136849Sscottl	}
366136849Sscottl
367136849Sscottl	/*
368136849Sscottl	 * Initialize lists, their locks and theirs condition variables.
369136849Sscottl	 */
370136849Sscottl	TAILQ_INIT(&hio_free_list);
371136849Sscottl	mtx_init(&hio_free_list_lock);
372136849Sscottl	cv_init(&hio_free_list_cond);
373136849Sscottl	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
374136849Sscottl		TAILQ_INIT(&hio_send_list[ii]);
375136849Sscottl		mtx_init(&hio_send_list_lock[ii]);
376190809Sdelphij		cv_init(&hio_send_list_cond[ii]);
377190809Sdelphij		TAILQ_INIT(&hio_recv_list[ii]);
378136849Sscottl		mtx_init(&hio_recv_list_lock[ii]);
379190809Sdelphij		cv_init(&hio_recv_list_cond[ii]);
380190809Sdelphij		rw_init(&hio_remote_lock[ii]);
381136849Sscottl	}
382190809Sdelphij	TAILQ_INIT(&hio_done_list);
383136849Sscottl	mtx_init(&hio_done_list_lock);
384136849Sscottl	cv_init(&hio_done_list_cond);
385136849Sscottl	mtx_init(&metadata_lock);
386136849Sscottl
387136849Sscottl	/*
388136849Sscottl	 * Allocate requests pool and initialize requests.
389136849Sscottl	 */
390136849Sscottl	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
391136849Sscottl		hio = malloc(sizeof(*hio));
392136849Sscottl		if (hio == NULL) {
393136849Sscottl			primary_exitx(EX_TEMPFAIL,
394136849Sscottl			    "Unable to allocate %zu bytes of memory for hio request.",
395136849Sscottl			    sizeof(*hio));
396136849Sscottl		}
397136849Sscottl		hio->hio_countdown = 0;
398136849Sscottl		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
399136849Sscottl		if (hio->hio_errors == NULL) {
400136849Sscottl			primary_exitx(EX_TEMPFAIL,
401136849Sscottl			    "Unable allocate %zu bytes of memory for hio errors.",
402136849Sscottl			    sizeof(hio->hio_errors[0]) * ncomps);
403136849Sscottl		}
404136849Sscottl		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
405136849Sscottl		if (hio->hio_next == NULL) {
406136849Sscottl			primary_exitx(EX_TEMPFAIL,
407136849Sscottl			    "Unable allocate %zu bytes of memory for hio_next field.",
408136849Sscottl			    sizeof(hio->hio_next[0]) * ncomps);
409136849Sscottl		}
410136849Sscottl		hio->hio_ggio.gctl_version = G_GATE_VERSION;
411136849Sscottl		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
412136849Sscottl		if (hio->hio_ggio.gctl_data == NULL) {
413136849Sscottl			primary_exitx(EX_TEMPFAIL,
414136849Sscottl			    "Unable to allocate %zu bytes of memory for gctl_data.",
415136849Sscottl			    MAXPHYS);
416136849Sscottl		}
417136849Sscottl		hio->hio_ggio.gctl_length = MAXPHYS;
418136849Sscottl		hio->hio_ggio.gctl_error = 0;
419136849Sscottl		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
420136849Sscottl	}
421136849Sscottl}
422136849Sscottl
423136849Sscottlstatic bool
424136849Sscottlinit_resuid(struct hast_resource *res)
425136849Sscottl{
426136849Sscottl
427136849Sscottl	mtx_lock(&metadata_lock);
428136849Sscottl	if (res->hr_resuid != 0) {
429136849Sscottl		mtx_unlock(&metadata_lock);
430136849Sscottl		return (false);
431136849Sscottl	} else {
432136849Sscottl		/* Initialize unique resource identifier. */
433136849Sscottl		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
434136849Sscottl		mtx_unlock(&metadata_lock);
435136849Sscottl		if (metadata_write(res) < 0)
436136849Sscottl			exit(EX_NOINPUT);
437136849Sscottl		return (true);
438136849Sscottl	}
439136849Sscottl}
440136849Sscottl
/*
 * One-time local startup: read the on-disk metadata, build the in-core
 * activemap and range locks, and load the persistent activemap.
 * Ordering matters: metadata must be read before the activemap can be
 * sized, and the range locks must exist before any I/O threads start.
 * Any failure is fatal.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	/* Load metadata (sizes, counters, resuid) from the local provider. */
	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	/* Read the persistent activemap, stored right after the metadata. */
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	/* resuid != 0 means the provider was used before; nothing more to do. */
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}
487136849Sscottl
488136849Sscottlstatic int
489136849Sscottlprimary_connect(struct hast_resource *res, struct proto_conn **connp)
490136849Sscottl{
491136849Sscottl	struct proto_conn *conn;
492136849Sscottl	int16_t val;
493136849Sscottl
494136849Sscottl	val = 1;
495136849Sscottl	if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) {
496136849Sscottl		primary_exit(EX_TEMPFAIL,
497136849Sscottl		    "Unable to send connection request to parent");
498136849Sscottl	}
499136849Sscottl	if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) {
500136849Sscottl		primary_exit(EX_TEMPFAIL,
501136849Sscottl		    "Unable to receive reply to connection request from parent");
502136849Sscottl	}
503136849Sscottl	if (val != 0) {
504136849Sscottl		errno = val;
505136849Sscottl		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
506136849Sscottl		    res->hr_remoteaddr);
507136849Sscottl		return (-1);
508136849Sscottl	}
509136849Sscottl	if (proto_connection_recv(res->hr_conn, true, &conn) < 0) {
510136849Sscottl		primary_exit(EX_TEMPFAIL,
511136849Sscottl		    "Unable to receive connection from parent");
512136849Sscottl	}
513136849Sscottl	if (proto_connect_wait(conn, res->hr_timeout) < 0) {
514136849Sscottl		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
515136849Sscottl		    res->hr_remoteaddr);
516136849Sscottl		proto_close(conn);
517136849Sscottl		return (-1);
518136849Sscottl	}
519136849Sscottl	/* Error in setting timeout is not critical, but why should it fail? */
520136849Sscottl	if (proto_timeout(conn, res->hr_timeout) < 0)
521136849Sscottl		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
522136849Sscottl
523136849Sscottl	*connp = conn;
524136849Sscottl
525136849Sscottl	return (0);
526136849Sscottl}
527136849Sscottl
528136849Sscottlstatic int
529136849Sscottlinit_remote(struct hast_resource *res, struct proto_conn **inp,
530136849Sscottl    struct proto_conn **outp)
531136849Sscottl{
532136849Sscottl	struct proto_conn *in, *out;
533136849Sscottl	struct nv *nvout, *nvin;
534136849Sscottl	const unsigned char *token;
535136849Sscottl	unsigned char *map;
536136849Sscottl	const char *errmsg;
537136849Sscottl	int32_t extentsize;
538136849Sscottl	int64_t datasize;
539136849Sscottl	uint32_t mapsize;
540136849Sscottl	size_t size;
541136849Sscottl	int error;
542136849Sscottl
543136849Sscottl	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
544136849Sscottl	PJDLOG_ASSERT(real_remote(res));
545136849Sscottl
546136849Sscottl	in = out = NULL;
547136849Sscottl	errmsg = NULL;
548136849Sscottl
549136849Sscottl	if (primary_connect(res, &out) == -1)
550136849Sscottl		return (ECONNREFUSED);
551136849Sscottl
552136849Sscottl	error = ECONNABORTED;
553136849Sscottl
554136849Sscottl	/*
555136849Sscottl	 * First handshake step.
556136849Sscottl	 * Setup outgoing connection with remote node.
557136849Sscottl	 */
558136849Sscottl	nvout = nv_alloc();
559136849Sscottl	nv_add_string(nvout, res->hr_name, "resource");
560136849Sscottl	if (nv_error(nvout) != 0) {
561136849Sscottl		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
562136849Sscottl		    "Unable to allocate header for connection with %s",
563136849Sscottl		    res->hr_remoteaddr);
564136849Sscottl		nv_free(nvout);
565136849Sscottl		goto close;
566136849Sscottl	}
567136849Sscottl	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
568136849Sscottl		pjdlog_errno(LOG_WARNING,
569136849Sscottl		    "Unable to send handshake header to %s",
570136849Sscottl		    res->hr_remoteaddr);
571136849Sscottl		nv_free(nvout);
572136849Sscottl		goto close;
573136849Sscottl	}
574136849Sscottl	nv_free(nvout);
575136849Sscottl	if (hast_proto_recv_hdr(out, &nvin) < 0) {
576136849Sscottl		pjdlog_errno(LOG_WARNING,
577136849Sscottl		    "Unable to receive handshake header from %s",
578136849Sscottl		    res->hr_remoteaddr);
579136849Sscottl		goto close;
580136849Sscottl	}
581136849Sscottl	errmsg = nv_get_string(nvin, "errmsg");
582136849Sscottl	if (errmsg != NULL) {
583136849Sscottl		pjdlog_warning("%s", errmsg);
584136849Sscottl		if (nv_exists(nvin, "wait"))
585136849Sscottl			error = EBUSY;
586136849Sscottl		nv_free(nvin);
587136849Sscottl		goto close;
588136849Sscottl	}
589136849Sscottl	token = nv_get_uint8_array(nvin, &size, "token");
590136849Sscottl	if (token == NULL) {
591136849Sscottl		pjdlog_warning("Handshake header from %s has no 'token' field.",
592136849Sscottl		    res->hr_remoteaddr);
593136849Sscottl		nv_free(nvin);
594136849Sscottl		goto close;
595136849Sscottl	}
596136849Sscottl	if (size != sizeof(res->hr_token)) {
597136849Sscottl		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
598136849Sscottl		    res->hr_remoteaddr, size, sizeof(res->hr_token));
599136849Sscottl		nv_free(nvin);
600136849Sscottl		goto close;
601136849Sscottl	}
602136849Sscottl	bcopy(token, res->hr_token, sizeof(res->hr_token));
603136849Sscottl	nv_free(nvin);
604136849Sscottl
605136849Sscottl	/*
606136849Sscottl	 * Second handshake step.
607136849Sscottl	 * Setup incoming connection with remote node.
608136849Sscottl	 */
609136849Sscottl	if (primary_connect(res, &in) == -1)
610136849Sscottl		goto close;
611136849Sscottl
612136849Sscottl	nvout = nv_alloc();
613136849Sscottl	nv_add_string(nvout, res->hr_name, "resource");
614136849Sscottl	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
615136849Sscottl	    "token");
616136849Sscottl	if (res->hr_resuid == 0) {
617136849Sscottl		/*
618136849Sscottl		 * The resuid field was not yet initialized.
619136849Sscottl		 * Because we do synchronization inside init_resuid(), it is
620136849Sscottl		 * possible that someone already initialized it, the function
621136849Sscottl		 * will return false then, but if we successfully initialized
622136849Sscottl		 * it, we will get true. True means that there were no writes
623136849Sscottl		 * to this resource yet and we want to inform secondary that
624136849Sscottl		 * synchronization is not needed by sending "virgin" argument.
625136849Sscottl		 */
626136849Sscottl		if (init_resuid(res))
627136849Sscottl			nv_add_int8(nvout, 1, "virgin");
628136849Sscottl	}
629136849Sscottl	nv_add_uint64(nvout, res->hr_resuid, "resuid");
630136849Sscottl	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
631136849Sscottl	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
632136849Sscottl	if (nv_error(nvout) != 0) {
633136849Sscottl		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
634136849Sscottl		    "Unable to allocate header for connection with %s",
635136849Sscottl		    res->hr_remoteaddr);
636136849Sscottl		nv_free(nvout);
637136849Sscottl		goto close;
638136849Sscottl	}
639136849Sscottl	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
640136849Sscottl		pjdlog_errno(LOG_WARNING,
641136849Sscottl		    "Unable to send handshake header to %s",
642136849Sscottl		    res->hr_remoteaddr);
643136849Sscottl		nv_free(nvout);
644136849Sscottl		goto close;
645136849Sscottl	}
646136849Sscottl	nv_free(nvout);
647136849Sscottl	if (hast_proto_recv_hdr(out, &nvin) < 0) {
648136849Sscottl		pjdlog_errno(LOG_WARNING,
649136849Sscottl		    "Unable to receive handshake header from %s",
650136849Sscottl		    res->hr_remoteaddr);
651136849Sscottl		goto close;
652136849Sscottl	}
653136849Sscottl	errmsg = nv_get_string(nvin, "errmsg");
654136849Sscottl	if (errmsg != NULL) {
655136849Sscottl		pjdlog_warning("%s", errmsg);
656136849Sscottl		nv_free(nvin);
657136849Sscottl		goto close;
658136849Sscottl	}
659136849Sscottl	datasize = nv_get_int64(nvin, "datasize");
660136849Sscottl	if (datasize != res->hr_datasize) {
661136849Sscottl		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
662136849Sscottl		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
663136849Sscottl		nv_free(nvin);
664136849Sscottl		goto close;
665136849Sscottl	}
666136849Sscottl	extentsize = nv_get_int32(nvin, "extentsize");
667136849Sscottl	if (extentsize != res->hr_extentsize) {
668136849Sscottl		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
669136849Sscottl		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
670136849Sscottl		nv_free(nvin);
671136849Sscottl		goto close;
672136849Sscottl	}
673136849Sscottl	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
674136849Sscottl	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
675136849Sscottl	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
676136849Sscottl	if (nv_exists(nvin, "virgin")) {
677136849Sscottl		/*
678136849Sscottl		 * Secondary was reinitialized, bump localcnt if it is 0 as
679136849Sscottl		 * only we have the data.
680136849Sscottl		 */
681136849Sscottl		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
682136849Sscottl		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
683136849Sscottl
684136849Sscottl		if (res->hr_primary_localcnt == 0) {
685136849Sscottl			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);
686136849Sscottl
687136849Sscottl			mtx_lock(&metadata_lock);
688136849Sscottl			res->hr_primary_localcnt++;
689136849Sscottl			pjdlog_debug(1, "Increasing localcnt to %ju.",
690136849Sscottl			    (uintmax_t)res->hr_primary_localcnt);
691136849Sscottl			(void)metadata_write(res);
692136849Sscottl			mtx_unlock(&metadata_lock);
693136849Sscottl		}
694136849Sscottl	}
695136849Sscottl	map = NULL;
696136849Sscottl	mapsize = nv_get_uint32(nvin, "mapsize");
697136849Sscottl	if (mapsize > 0) {
698136849Sscottl		map = malloc(mapsize);
699136849Sscottl		if (map == NULL) {
700190809Sdelphij			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
701190809Sdelphij			    (uintmax_t)mapsize);
702190809Sdelphij			nv_free(nvin);
703136849Sscottl			goto close;
704136849Sscottl		}
705190809Sdelphij		/*
706190809Sdelphij		 * Remote node have some dirty extents on its own, lets
707190809Sdelphij		 * download its activemap.
708190809Sdelphij		 */
709136849Sscottl		if (hast_proto_recv_data(res, out, nvin, map,
710136849Sscottl		    mapsize) < 0) {
711136849Sscottl			pjdlog_errno(LOG_ERR,
712136849Sscottl			    "Unable to receive remote activemap");
713136849Sscottl			nv_free(nvin);
714136849Sscottl			free(map);
715136849Sscottl			goto close;
716136849Sscottl		}
717136849Sscottl		/*
718136849Sscottl		 * Merge local and remote bitmaps.
719136849Sscottl		 */
720136849Sscottl		activemap_merge(res->hr_amp, map, mapsize);
721136849Sscottl		free(map);
722136849Sscottl		/*
723136849Sscottl		 * Now that we merged bitmaps from both nodes, flush it to the
724136849Sscottl		 * disk before we start to synchronize.
725136849Sscottl		 */
726136849Sscottl		(void)hast_activemap_flush(res);
727136849Sscottl	}
728136849Sscottl	nv_free(nvin);
729136849Sscottl#ifdef notyet
730136849Sscottl	/* Setup directions. */
731136849Sscottl	if (proto_send(out, NULL, 0) == -1)
732136849Sscottl		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
733136849Sscottl	if (proto_recv(in, NULL, 0) == -1)
734136849Sscottl		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
735136849Sscottl#endif
736136849Sscottl	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
737136849Sscottl	if (inp != NULL && outp != NULL) {
738136849Sscottl		*inp = in;
739136849Sscottl		*outp = out;
740136849Sscottl	} else {
741190809Sdelphij		res->hr_remotein = in;
742190809Sdelphij		res->hr_remoteout = out;
743136849Sscottl	}
744136849Sscottl	event_send(res, EVENT_CONNECT);
745136849Sscottl	return (0);
746136849Sscottlclose:
747136849Sscottl	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
748136849Sscottl		event_send(res, EVENT_SPLITBRAIN);
749136849Sscottl	proto_close(out);
750136849Sscottl	if (in != NULL)
751136849Sscottl		proto_close(in);
752136849Sscottl	return (error);
753136849Sscottl}
754136849Sscottl
755190809Sdelphijstatic void
756136849Sscottlsync_start(void)
757136849Sscottl{
758136849Sscottl
759136849Sscottl	mtx_lock(&sync_lock);
760136849Sscottl	sync_inprogress = true;
761136849Sscottl	mtx_unlock(&sync_lock);
762136849Sscottl	cv_signal(&sync_cond);
763136849Sscottl}
764136849Sscottl
765136849Sscottlstatic void
766136849Sscottlsync_stop(void)
767136849Sscottl{
768136849Sscottl
769136849Sscottl	mtx_lock(&sync_lock);
770136849Sscottl	if (sync_inprogress)
771136849Sscottl		sync_inprogress = false;
772136849Sscottl	mtx_unlock(&sync_lock);
773136849Sscottl}
774136849Sscottl
775136849Sscottlstatic void
776136849Sscottlinit_ggate(struct hast_resource *res)
777136849Sscottl{
778136849Sscottl	struct g_gate_ctl_create ggiocreate;
779136849Sscottl	struct g_gate_ctl_cancel ggiocancel;
780136849Sscottl
781136849Sscottl	/*
782136849Sscottl	 * We communicate with ggate via /dev/ggctl. Open it.
783136849Sscottl	 */
784136849Sscottl	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
785136849Sscottl	if (res->hr_ggatefd < 0)
786136849Sscottl		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
787136849Sscottl	/*
788136849Sscottl	 * Create provider before trying to connect, as connection failure
789136849Sscottl	 * is not critical, but may take some time.
790136849Sscottl	 */
791136849Sscottl	bzero(&ggiocreate, sizeof(ggiocreate));
792149871Sscottl	ggiocreate.gctl_version = G_GATE_VERSION;
793136849Sscottl	ggiocreate.gctl_mediasize = res->hr_datasize;
794136849Sscottl	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
795136849Sscottl	ggiocreate.gctl_flags = 0;
796136849Sscottl	ggiocreate.gctl_maxcount = 0;
797136849Sscottl	ggiocreate.gctl_timeout = 0;
798136849Sscottl	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
799136849Sscottl	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
800136849Sscottl	    res->hr_provname);
801136849Sscottl	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
802136849Sscottl		pjdlog_info("Device hast/%s created.", res->hr_provname);
803136849Sscottl		res->hr_ggateunit = ggiocreate.gctl_unit;
804136849Sscottl		return;
805136849Sscottl	}
806136849Sscottl	if (errno != EEXIST) {
807136849Sscottl		primary_exit(EX_OSERR, "Unable to create hast/%s device",
808136849Sscottl		    res->hr_provname);
809136849Sscottl	}
810136849Sscottl	pjdlog_debug(1,
811136849Sscottl	    "Device hast/%s already exists, we will try to take it over.",
812136849Sscottl	    res->hr_provname);
813136849Sscottl	/*
814136849Sscottl	 * If we received EEXIST, we assume that the process who created the
815136849Sscottl	 * provider died and didn't clean up. In that case we will start from
816136849Sscottl	 * where he left of.
817136849Sscottl	 */
818149871Sscottl	bzero(&ggiocancel, sizeof(ggiocancel));
819136849Sscottl	ggiocancel.gctl_version = G_GATE_VERSION;
820136849Sscottl	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
821136849Sscottl	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
822136849Sscottl	    res->hr_provname);
823136849Sscottl	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
824136849Sscottl		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
825136849Sscottl		res->hr_ggateunit = ggiocancel.gctl_unit;
826136849Sscottl		return;
827136849Sscottl	}
828136849Sscottl	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
829136849Sscottl	    res->hr_provname);
830136849Sscottl}
831136849Sscottl
832136849Sscottlvoid
833136849Sscottlhastd_primary(struct hast_resource *res)
834136849Sscottl{
835136849Sscottl	pthread_t td;
836136849Sscottl	pid_t pid;
837136849Sscottl	int error, mode, debuglevel;
838136849Sscottl
839136849Sscottl	/*
840136849Sscottl	 * Create communication channel for sending control commands from
841136849Sscottl	 * parent to child.
842136849Sscottl	 */
843136849Sscottl	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
844136849Sscottl		/* TODO: There's no need for this to be fatal error. */
845136849Sscottl		KEEP_ERRNO((void)pidfile_remove(pfh));
846136849Sscottl		pjdlog_exit(EX_OSERR,
847136849Sscottl		    "Unable to create control sockets between parent and child");
848136849Sscottl	}
849136849Sscottl	/*
850136849Sscottl	 * Create communication channel for sending events from child to parent.
851136849Sscottl	 */
852136849Sscottl	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
853136849Sscottl		/* TODO: There's no need for this to be fatal error. */
854136849Sscottl		KEEP_ERRNO((void)pidfile_remove(pfh));
855136849Sscottl		pjdlog_exit(EX_OSERR,
856136849Sscottl		    "Unable to create event sockets between child and parent");
857136849Sscottl	}
858136849Sscottl	/*
859136849Sscottl	 * Create communication channel for sending connection requests from
860136849Sscottl	 * child to parent.
861136849Sscottl	 */
862136849Sscottl	if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) {
863136849Sscottl		/* TODO: There's no need for this to be fatal error. */
864136849Sscottl		KEEP_ERRNO((void)pidfile_remove(pfh));
865136849Sscottl		pjdlog_exit(EX_OSERR,
866136849Sscottl		    "Unable to create connection sockets between child and parent");
867136849Sscottl	}
868136849Sscottl
869136849Sscottl	pid = fork();
870136849Sscottl	if (pid < 0) {
871136849Sscottl		/* TODO: There's no need for this to be fatal error. */
872136849Sscottl		KEEP_ERRNO((void)pidfile_remove(pfh));
873136849Sscottl		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
874136849Sscottl	}
875136849Sscottl
876136849Sscottl	if (pid > 0) {
877136849Sscottl		/* This is parent. */
878136849Sscottl		/* Declare that we are receiver. */
879136849Sscottl		proto_recv(res->hr_event, NULL, 0);
880136849Sscottl		proto_recv(res->hr_conn, NULL, 0);
881136849Sscottl		/* Declare that we are sender. */
882136849Sscottl		proto_send(res->hr_ctrl, NULL, 0);
883136849Sscottl		res->hr_workerpid = pid;
884136849Sscottl		return;
885136849Sscottl	}
886136849Sscottl
887136849Sscottl	gres = res;
888136849Sscottl	mode = pjdlog_mode_get();
889136849Sscottl	debuglevel = pjdlog_debug_get();
890136849Sscottl
891136849Sscottl	/* Declare that we are sender. */
892136849Sscottl	proto_send(res->hr_event, NULL, 0);
893136849Sscottl	proto_send(res->hr_conn, NULL, 0);
894136849Sscottl	/* Declare that we are receiver. */
895136849Sscottl	proto_recv(res->hr_ctrl, NULL, 0);
896136849Sscottl	descriptors_cleanup(res);
897136849Sscottl
898136849Sscottl	descriptors_assert(res, mode);
899136849Sscottl
900136849Sscottl	pjdlog_init(mode);
901136849Sscottl	pjdlog_debug_set(debuglevel);
902136849Sscottl	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
903136849Sscottl	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
904136849Sscottl
905136849Sscottl	init_local(res);
906136849Sscottl	init_ggate(res);
907136849Sscottl	init_environment(res);
908136849Sscottl
909136849Sscottl	if (drop_privs(res) != 0) {
910136849Sscottl		cleanup(res);
911136849Sscottl		exit(EX_CONFIG);
912136849Sscottl	}
913136849Sscottl	pjdlog_info("Privileges successfully dropped.");
914136849Sscottl
915136849Sscottl	/*
916136849Sscottl	 * Create the guard thread first, so we can handle signals from the
917136849Sscottl	 * very begining.
918136849Sscottl	 */
919136849Sscottl	error = pthread_create(&td, NULL, guard_thread, res);
920136849Sscottl	PJDLOG_ASSERT(error == 0);
921136849Sscottl	/*
922136849Sscottl	 * Create the control thread before sending any event to the parent,
923136849Sscottl	 * as we can deadlock when parent sends control request to worker,
924136849Sscottl	 * but worker has no control thread started yet, so parent waits.
925136849Sscottl	 * In the meantime worker sends an event to the parent, but parent
926136849Sscottl	 * is unable to handle the event, because it waits for control
927136849Sscottl	 * request response.
928136849Sscottl	 */
929136849Sscottl	error = pthread_create(&td, NULL, ctrl_thread, res);
930136849Sscottl	PJDLOG_ASSERT(error == 0);
931136849Sscottl	if (real_remote(res)) {
932136849Sscottl		error = init_remote(res, NULL, NULL);
933136849Sscottl		if (error == 0) {
934136849Sscottl			sync_start();
935136849Sscottl		} else if (error == EBUSY) {
936136849Sscottl			time_t start = time(NULL);
937136849Sscottl
938136849Sscottl			pjdlog_warning("Waiting for remote node to become %s for %ds.",
939136849Sscottl			    role2str(HAST_ROLE_SECONDARY),
940136849Sscottl			    res->hr_timeout);
941136849Sscottl			for (;;) {
942136849Sscottl				sleep(1);
943136849Sscottl				error = init_remote(res, NULL, NULL);
944136849Sscottl				if (error != EBUSY)
945136849Sscottl					break;
946136849Sscottl				if (time(NULL) > start + res->hr_timeout)
947136849Sscottl					break;
948136849Sscottl			}
949136849Sscottl			if (error == EBUSY) {
950136849Sscottl				pjdlog_warning("Remote node is still %s, starting anyway.",
951136849Sscottl				    role2str(HAST_ROLE_PRIMARY));
952136849Sscottl			}
953136849Sscottl		}
954136849Sscottl	}
955136849Sscottl	error = pthread_create(&td, NULL, ggate_recv_thread, res);
956136849Sscottl	PJDLOG_ASSERT(error == 0);
957136849Sscottl	error = pthread_create(&td, NULL, local_send_thread, res);
958136849Sscottl	PJDLOG_ASSERT(error == 0);
959136849Sscottl	error = pthread_create(&td, NULL, remote_send_thread, res);
960136849Sscottl	PJDLOG_ASSERT(error == 0);
961136849Sscottl	error = pthread_create(&td, NULL, remote_recv_thread, res);
962136849Sscottl	PJDLOG_ASSERT(error == 0);
963136849Sscottl	error = pthread_create(&td, NULL, ggate_send_thread, res);
964136849Sscottl	PJDLOG_ASSERT(error == 0);
965136849Sscottl	fullystarted = true;
966136849Sscottl	(void)sync_thread(res);
967136849Sscottl}
968136849Sscottl
969136849Sscottlstatic void
970136849Sscottlreqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
971136849Sscottl{
972136849Sscottl	char msg[1024];
973136849Sscottl	va_list ap;
974136849Sscottl	int len;
975136849Sscottl
976136849Sscottl	va_start(ap, fmt);
977136849Sscottl	len = vsnprintf(msg, sizeof(msg), fmt, ap);
978136849Sscottl	va_end(ap);
979136849Sscottl	if ((size_t)len < sizeof(msg)) {
980136849Sscottl		switch (ggio->gctl_cmd) {
981136849Sscottl		case BIO_READ:
982136849Sscottl			(void)snprintf(msg + len, sizeof(msg) - len,
983136849Sscottl			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
984136849Sscottl			    (uintmax_t)ggio->gctl_length);
985136849Sscottl			break;
986136849Sscottl		case BIO_DELETE:
987136849Sscottl			(void)snprintf(msg + len, sizeof(msg) - len,
988136849Sscottl			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
989136849Sscottl			    (uintmax_t)ggio->gctl_length);
990136849Sscottl			break;
991136849Sscottl		case BIO_FLUSH:
992136849Sscottl			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
993136849Sscottl			break;
994136849Sscottl		case BIO_WRITE:
995136849Sscottl			(void)snprintf(msg + len, sizeof(msg) - len,
996136849Sscottl			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
997136849Sscottl			    (uintmax_t)ggio->gctl_length);
998136849Sscottl			break;
999136849Sscottl		default:
1000136849Sscottl			(void)snprintf(msg + len, sizeof(msg) - len,
1001136849Sscottl			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
1002136849Sscottl			break;
1003136849Sscottl		}
1004136849Sscottl	}
1005136849Sscottl	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
1006136849Sscottl}
1007136849Sscottl
1008136849Sscottlstatic void
1009136849Sscottlremote_close(struct hast_resource *res, int ncomp)
1010136849Sscottl{
1011136849Sscottl
1012136849Sscottl	rw_wlock(&hio_remote_lock[ncomp]);
1013136849Sscottl	/*
1014136849Sscottl	 * A race is possible between dropping rlock and acquiring wlock -
1015136849Sscottl	 * another thread can close connection in-between.
1016136849Sscottl	 */
1017136849Sscottl	if (!ISCONNECTED(res, ncomp)) {
1018136849Sscottl		PJDLOG_ASSERT(res->hr_remotein == NULL);
1019136849Sscottl		PJDLOG_ASSERT(res->hr_remoteout == NULL);
1020136849Sscottl		rw_unlock(&hio_remote_lock[ncomp]);
1021136849Sscottl		return;
1022136849Sscottl	}
1023136849Sscottl
1024136849Sscottl	PJDLOG_ASSERT(res->hr_remotein != NULL);
1025136849Sscottl	PJDLOG_ASSERT(res->hr_remoteout != NULL);
1026136849Sscottl
1027136849Sscottl	pjdlog_debug(2, "Closing incoming connection to %s.",
1028136849Sscottl	    res->hr_remoteaddr);
1029136849Sscottl	proto_close(res->hr_remotein);
1030136849Sscottl	res->hr_remotein = NULL;
1031136849Sscottl	pjdlog_debug(2, "Closing outgoing connection to %s.",
1032136849Sscottl	    res->hr_remoteaddr);
1033136849Sscottl	proto_close(res->hr_remoteout);
1034136849Sscottl	res->hr_remoteout = NULL;
1035136849Sscottl
1036136849Sscottl	rw_unlock(&hio_remote_lock[ncomp]);
1037136849Sscottl
1038136849Sscottl	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
1039136849Sscottl
1040136849Sscottl	/*
1041136849Sscottl	 * Stop synchronization if in-progress.
1042136849Sscottl	 */
1043136849Sscottl	sync_stop();
1044136849Sscottl
1045136849Sscottl	event_send(res, EVENT_DISCONNECT);
1046136849Sscottl}
1047136849Sscottl
1048136849Sscottl/*
1049136849Sscottl * Thread receives ggate I/O requests from the kernel and passes them to
1050136849Sscottl * appropriate threads:
1051136849Sscottl * WRITE - always goes to both local_send and remote_send threads
1052136849Sscottl * READ (when the block is up-to-date on local component) -
1053136849Sscottl *	only local_send thread
1054136849Sscottl * READ (when the block isn't up-to-date on local component) -
1055136849Sscottl *	only remote_send thread
1056136849Sscottl * DELETE - always goes to both local_send and remote_send threads
1057136849Sscottl * FLUSH - always goes to both local_send and remote_send threads
1058136849Sscottl */
1059136849Sscottlstatic void *
1060149871Sscottlggate_recv_thread(void *arg)
1061136849Sscottl{
1062136849Sscottl	struct hast_resource *res = arg;
1063136849Sscottl	struct g_gate_ctl_io *ggio;
1064136849Sscottl	struct hio *hio;
1065136849Sscottl	unsigned int ii, ncomp, ncomps;
1066136849Sscottl	int error;
1067136849Sscottl
1068136849Sscottl	ncomps = HAST_NCOMPONENTS;
1069136849Sscottl
1070136849Sscottl	for (;;) {
1071136849Sscottl		pjdlog_debug(2, "ggate_recv: Taking free request.");
1072136849Sscottl		QUEUE_TAKE2(hio, free);
1073136849Sscottl		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
1074136849Sscottl		ggio = &hio->hio_ggio;
1075136849Sscottl		ggio->gctl_unit = res->hr_ggateunit;
1076136849Sscottl		ggio->gctl_length = MAXPHYS;
1077136849Sscottl		ggio->gctl_error = 0;
1078136849Sscottl		pjdlog_debug(2,
1079136849Sscottl		    "ggate_recv: (%p) Waiting for request from the kernel.",
1080136849Sscottl		    hio);
1081136849Sscottl		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
1082136849Sscottl			if (sigexit_received)
1083136849Sscottl				pthread_exit(NULL);
1084136849Sscottl			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
1085136849Sscottl		}
1086136849Sscottl		error = ggio->gctl_error;
1087136849Sscottl		switch (error) {
1088136849Sscottl		case 0:
1089136849Sscottl			break;
1090136849Sscottl		case ECANCELED:
1091136849Sscottl			/* Exit gracefully. */
1092136849Sscottl			if (!sigexit_received) {
1093136849Sscottl				pjdlog_debug(2,
1094136849Sscottl				    "ggate_recv: (%p) Received cancel from the kernel.",
1095136849Sscottl				    hio);
1096136849Sscottl				pjdlog_info("Received cancel from the kernel, exiting.");
1097136849Sscottl			}
1098136849Sscottl			pthread_exit(NULL);
1099136849Sscottl		case ENOMEM:
1100136849Sscottl			/*
1101136849Sscottl			 * Buffer too small? Impossible, we allocate MAXPHYS
1102136849Sscottl			 * bytes - request can't be bigger than that.
1103136849Sscottl			 */
1104136849Sscottl			/* FALLTHROUGH */
1105136849Sscottl		case ENXIO:
1106136849Sscottl		default:
1107136849Sscottl			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
1108136849Sscottl			    strerror(error));
1109136849Sscottl		}
1110136849Sscottl		for (ii = 0; ii < ncomps; ii++)
1111136849Sscottl			hio->hio_errors[ii] = EINVAL;
1112136849Sscottl		reqlog(LOG_DEBUG, 2, ggio,
1113136849Sscottl		    "ggate_recv: (%p) Request received from the kernel: ",
1114136849Sscottl		    hio);
1115136849Sscottl		/*
1116136849Sscottl		 * Inform all components about new write request.
1117136849Sscottl		 * For read request prefer local component unless the given
1118136849Sscottl		 * range is out-of-date, then use remote component.
1119136849Sscottl		 */
1120136849Sscottl		switch (ggio->gctl_cmd) {
1121136849Sscottl		case BIO_READ:
1122136849Sscottl			res->hr_stat_read++;
1123136849Sscottl			pjdlog_debug(2,
1124136849Sscottl			    "ggate_recv: (%p) Moving request to the send queue.",
1125136849Sscottl			    hio);
1126136849Sscottl			refcount_init(&hio->hio_countdown, 1);
1127136849Sscottl			mtx_lock(&metadata_lock);
1128136849Sscottl			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
1129136849Sscottl			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1130136849Sscottl				/*
1131136849Sscottl				 * This range is up-to-date on local component,
1132136849Sscottl				 * so handle request locally.
1133136849Sscottl				 */
1134136849Sscottl				 /* Local component is 0 for now. */
1135136849Sscottl				ncomp = 0;
1136136849Sscottl			} else /* if (res->hr_syncsrc ==
1137136849Sscottl			    HAST_SYNCSRC_SECONDARY) */ {
1138136849Sscottl				PJDLOG_ASSERT(res->hr_syncsrc ==
1139136849Sscottl				    HAST_SYNCSRC_SECONDARY);
1140136849Sscottl				/*
1141136849Sscottl				 * This range is out-of-date on local component,
1142136849Sscottl				 * so send request to the remote node.
1143136849Sscottl				 */
1144136849Sscottl				 /* Remote component is 1 for now. */
1145136849Sscottl				ncomp = 1;
1146136849Sscottl			}
1147136849Sscottl			mtx_unlock(&metadata_lock);
1148136849Sscottl			QUEUE_INSERT1(hio, send, ncomp);
1149136849Sscottl			break;
1150136849Sscottl		case BIO_WRITE:
1151136849Sscottl			res->hr_stat_write++;
1152136849Sscottl			if (res->hr_resuid == 0) {
1153136849Sscottl				/*
1154136849Sscottl				 * This is first write, initialize localcnt and
1155136849Sscottl				 * resuid.
1156136849Sscottl				 */
1157136849Sscottl				res->hr_primary_localcnt = 1;
1158136849Sscottl				(void)init_resuid(res);
1159136849Sscottl			}
1160136849Sscottl			for (;;) {
1161136849Sscottl				mtx_lock(&range_lock);
1162136849Sscottl				if (rangelock_islocked(range_sync,
1163136849Sscottl				    ggio->gctl_offset, ggio->gctl_length)) {
1164136849Sscottl					pjdlog_debug(2,
1165136849Sscottl					    "regular: Range offset=%jd length=%zu locked.",
1166136849Sscottl					    (intmax_t)ggio->gctl_offset,
1167136849Sscottl					    (size_t)ggio->gctl_length);
1168136849Sscottl					range_regular_wait = true;
1169136849Sscottl					cv_wait(&range_regular_cond, &range_lock);
1170136849Sscottl					range_regular_wait = false;
1171136849Sscottl					mtx_unlock(&range_lock);
1172136849Sscottl					continue;
1173136849Sscottl				}
1174136849Sscottl				if (rangelock_add(range_regular,
1175136849Sscottl				    ggio->gctl_offset, ggio->gctl_length) < 0) {
1176136849Sscottl					mtx_unlock(&range_lock);
1177136849Sscottl					pjdlog_debug(2,
1178136849Sscottl					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
1179136849Sscottl					    (intmax_t)ggio->gctl_offset,
1180136849Sscottl					    (size_t)ggio->gctl_length);
1181136849Sscottl					sleep(1);
1182136849Sscottl					continue;
1183136849Sscottl				}
1184136849Sscottl				mtx_unlock(&range_lock);
1185136849Sscottl				break;
1186136849Sscottl			}
1187136849Sscottl			mtx_lock(&res->hr_amp_lock);
1188136849Sscottl			if (activemap_write_start(res->hr_amp,
1189136849Sscottl			    ggio->gctl_offset, ggio->gctl_length)) {
1190136849Sscottl				res->hr_stat_activemap_update++;
1191136849Sscottl				(void)hast_activemap_flush(res);
1192136849Sscottl			}
1193136849Sscottl			mtx_unlock(&res->hr_amp_lock);
1194136849Sscottl			/* FALLTHROUGH */
1195136849Sscottl		case BIO_DELETE:
1196136849Sscottl		case BIO_FLUSH:
1197136849Sscottl			switch (ggio->gctl_cmd) {
1198136849Sscottl			case BIO_DELETE:
1199136849Sscottl				res->hr_stat_delete++;
1200136849Sscottl				break;
1201136849Sscottl			case BIO_FLUSH:
1202136849Sscottl				res->hr_stat_flush++;
1203136849Sscottl				break;
1204136849Sscottl			}
1205136849Sscottl			pjdlog_debug(2,
1206136849Sscottl			    "ggate_recv: (%p) Moving request to the send queues.",
1207136849Sscottl			    hio);
1208136849Sscottl			refcount_init(&hio->hio_countdown, ncomps);
1209136849Sscottl			for (ii = 0; ii < ncomps; ii++)
1210136849Sscottl				QUEUE_INSERT1(hio, send, ii);
1211136849Sscottl			break;
1212136849Sscottl		}
1213136849Sscottl	}
1214136849Sscottl	/* NOTREACHED */
1215136849Sscottl	return (NULL);
1216136849Sscottl}
1217136849Sscottl
1218136849Sscottl/*
1219190809Sdelphij * Thread reads from or writes to local component.
1220190809Sdelphij * If local read fails, it redirects it to remote_send thread.
1221190809Sdelphij */
1222190809Sdelphijstatic void *
1223190809Sdelphijlocal_send_thread(void *arg)
1224190809Sdelphij{
1225190809Sdelphij	struct hast_resource *res = arg;
1226190809Sdelphij	struct g_gate_ctl_io *ggio;
1227190809Sdelphij	struct hio *hio;
1228190809Sdelphij	unsigned int ncomp, rncomp;
1229190809Sdelphij	ssize_t ret;
1230190809Sdelphij
1231136849Sscottl	/* Local component is 0 for now. */
1232136849Sscottl	ncomp = 0;
1233136849Sscottl	/* Remote component is 1 for now. */
1234136849Sscottl	rncomp = 1;
1235
1236	for (;;) {
1237		pjdlog_debug(2, "local_send: Taking request.");
1238		QUEUE_TAKE1(hio, send, ncomp, 0);
1239		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1240		ggio = &hio->hio_ggio;
1241		switch (ggio->gctl_cmd) {
1242		case BIO_READ:
1243			ret = pread(res->hr_localfd, ggio->gctl_data,
1244			    ggio->gctl_length,
1245			    ggio->gctl_offset + res->hr_localoff);
1246			if (ret == ggio->gctl_length)
1247				hio->hio_errors[ncomp] = 0;
1248			else if (!ISSYNCREQ(hio)) {
1249				/*
1250				 * If READ failed, try to read from remote node.
1251				 */
1252				if (ret < 0) {
1253					reqlog(LOG_WARNING, 0, ggio,
1254					    "Local request failed (%s), trying remote node. ",
1255					    strerror(errno));
1256				} else if (ret != ggio->gctl_length) {
1257					reqlog(LOG_WARNING, 0, ggio,
1258					    "Local request failed (%zd != %jd), trying remote node. ",
1259					    ret, (intmax_t)ggio->gctl_length);
1260				}
1261				QUEUE_INSERT1(hio, send, rncomp);
1262				continue;
1263			}
1264			break;
1265		case BIO_WRITE:
1266			ret = pwrite(res->hr_localfd, ggio->gctl_data,
1267			    ggio->gctl_length,
1268			    ggio->gctl_offset + res->hr_localoff);
1269			if (ret < 0) {
1270				hio->hio_errors[ncomp] = errno;
1271				reqlog(LOG_WARNING, 0, ggio,
1272				    "Local request failed (%s): ",
1273				    strerror(errno));
1274			} else if (ret != ggio->gctl_length) {
1275				hio->hio_errors[ncomp] = EIO;
1276				reqlog(LOG_WARNING, 0, ggio,
1277				    "Local request failed (%zd != %jd): ",
1278				    ret, (intmax_t)ggio->gctl_length);
1279			} else {
1280				hio->hio_errors[ncomp] = 0;
1281			}
1282			break;
1283		case BIO_DELETE:
1284			ret = g_delete(res->hr_localfd,
1285			    ggio->gctl_offset + res->hr_localoff,
1286			    ggio->gctl_length);
1287			if (ret < 0) {
1288				hio->hio_errors[ncomp] = errno;
1289				reqlog(LOG_WARNING, 0, ggio,
1290				    "Local request failed (%s): ",
1291				    strerror(errno));
1292			} else {
1293				hio->hio_errors[ncomp] = 0;
1294			}
1295			break;
1296		case BIO_FLUSH:
1297			ret = g_flush(res->hr_localfd);
1298			if (ret < 0) {
1299				hio->hio_errors[ncomp] = errno;
1300				reqlog(LOG_WARNING, 0, ggio,
1301				    "Local request failed (%s): ",
1302				    strerror(errno));
1303			} else {
1304				hio->hio_errors[ncomp] = 0;
1305			}
1306			break;
1307		}
1308		if (refcount_release(&hio->hio_countdown)) {
1309			if (ISSYNCREQ(hio)) {
1310				mtx_lock(&sync_lock);
1311				SYNCREQDONE(hio);
1312				mtx_unlock(&sync_lock);
1313				cv_signal(&sync_cond);
1314			} else {
1315				pjdlog_debug(2,
1316				    "local_send: (%p) Moving request to the done queue.",
1317				    hio);
1318				QUEUE_INSERT2(hio, done);
1319			}
1320		}
1321	}
1322	/* NOTREACHED */
1323	return (NULL);
1324}
1325
1326static void
1327keepalive_send(struct hast_resource *res, unsigned int ncomp)
1328{
1329	struct nv *nv;
1330
1331	rw_rlock(&hio_remote_lock[ncomp]);
1332
1333	if (!ISCONNECTED(res, ncomp)) {
1334		rw_unlock(&hio_remote_lock[ncomp]);
1335		return;
1336	}
1337
1338	PJDLOG_ASSERT(res->hr_remotein != NULL);
1339	PJDLOG_ASSERT(res->hr_remoteout != NULL);
1340
1341	nv = nv_alloc();
1342	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
1343	if (nv_error(nv) != 0) {
1344		rw_unlock(&hio_remote_lock[ncomp]);
1345		nv_free(nv);
1346		pjdlog_debug(1,
1347		    "keepalive_send: Unable to prepare header to send.");
1348		return;
1349	}
1350	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
1351		rw_unlock(&hio_remote_lock[ncomp]);
1352		pjdlog_common(LOG_DEBUG, 1, errno,
1353		    "keepalive_send: Unable to send request");
1354		nv_free(nv);
1355		remote_close(res, ncomp);
1356		return;
1357	}
1358
1359	rw_unlock(&hio_remote_lock[ncomp]);
1360	nv_free(nv);
1361	pjdlog_debug(2, "keepalive_send: Request sent.");
1362}
1363
1364/*
1365 * Thread sends request to secondary node.
1366 */
1367static void *
1368remote_send_thread(void *arg)
1369{
1370	struct hast_resource *res = arg;
1371	struct g_gate_ctl_io *ggio;
1372	time_t lastcheck, now;
1373	struct hio *hio;
1374	struct nv *nv;
1375	unsigned int ncomp;
1376	bool wakeup;
1377	uint64_t offset, length;
1378	uint8_t cmd;
1379	void *data;
1380
1381	/* Remote component is 1 for now. */
1382	ncomp = 1;
1383	lastcheck = time(NULL);
1384
1385	for (;;) {
1386		pjdlog_debug(2, "remote_send: Taking request.");
1387		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
1388		if (hio == NULL) {
1389			now = time(NULL);
1390			if (lastcheck + HAST_KEEPALIVE <= now) {
1391				keepalive_send(res, ncomp);
1392				lastcheck = now;
1393			}
1394			continue;
1395		}
1396		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1397		ggio = &hio->hio_ggio;
1398		switch (ggio->gctl_cmd) {
1399		case BIO_READ:
1400			cmd = HIO_READ;
1401			data = NULL;
1402			offset = ggio->gctl_offset;
1403			length = ggio->gctl_length;
1404			break;
1405		case BIO_WRITE:
1406			cmd = HIO_WRITE;
1407			data = ggio->gctl_data;
1408			offset = ggio->gctl_offset;
1409			length = ggio->gctl_length;
1410			break;
1411		case BIO_DELETE:
1412			cmd = HIO_DELETE;
1413			data = NULL;
1414			offset = ggio->gctl_offset;
1415			length = ggio->gctl_length;
1416			break;
1417		case BIO_FLUSH:
1418			cmd = HIO_FLUSH;
1419			data = NULL;
1420			offset = 0;
1421			length = 0;
1422			break;
1423		default:
1424			PJDLOG_ASSERT(!"invalid condition");
1425			abort();
1426		}
1427		nv = nv_alloc();
1428		nv_add_uint8(nv, cmd, "cmd");
1429		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1430		nv_add_uint64(nv, offset, "offset");
1431		nv_add_uint64(nv, length, "length");
1432		if (nv_error(nv) != 0) {
1433			hio->hio_errors[ncomp] = nv_error(nv);
1434			pjdlog_debug(2,
1435			    "remote_send: (%p) Unable to prepare header to send.",
1436			    hio);
1437			reqlog(LOG_ERR, 0, ggio,
1438			    "Unable to prepare header to send (%s): ",
1439			    strerror(nv_error(nv)));
1440			/* Move failed request immediately to the done queue. */
1441			goto done_queue;
1442		}
1443		pjdlog_debug(2,
1444		    "remote_send: (%p) Moving request to the recv queue.",
1445		    hio);
1446		/*
1447		 * Protect connection from disappearing.
1448		 */
1449		rw_rlock(&hio_remote_lock[ncomp]);
1450		if (!ISCONNECTED(res, ncomp)) {
1451			rw_unlock(&hio_remote_lock[ncomp]);
1452			hio->hio_errors[ncomp] = ENOTCONN;
1453			goto done_queue;
1454		}
1455		/*
1456		 * Move the request to recv queue before sending it, because
1457		 * in different order we can get reply before we move request
1458		 * to recv queue.
1459		 */
1460		mtx_lock(&hio_recv_list_lock[ncomp]);
1461		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1462		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1463		mtx_unlock(&hio_recv_list_lock[ncomp]);
1464		if (hast_proto_send(res, res->hr_remoteout, nv, data,
1465		    data != NULL ? length : 0) < 0) {
1466			hio->hio_errors[ncomp] = errno;
1467			rw_unlock(&hio_remote_lock[ncomp]);
1468			pjdlog_debug(2,
1469			    "remote_send: (%p) Unable to send request.", hio);
1470			reqlog(LOG_ERR, 0, ggio,
1471			    "Unable to send request (%s): ",
1472			    strerror(hio->hio_errors[ncomp]));
1473			remote_close(res, ncomp);
1474			/*
1475			 * Take request back from the receive queue and move
1476			 * it immediately to the done queue.
1477			 */
1478			mtx_lock(&hio_recv_list_lock[ncomp]);
1479			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1480			mtx_unlock(&hio_recv_list_lock[ncomp]);
1481			goto done_queue;
1482		}
1483		rw_unlock(&hio_remote_lock[ncomp]);
1484		nv_free(nv);
1485		if (wakeup)
1486			cv_signal(&hio_recv_list_cond[ncomp]);
1487		continue;
1488done_queue:
1489		nv_free(nv);
1490		if (ISSYNCREQ(hio)) {
1491			if (!refcount_release(&hio->hio_countdown))
1492				continue;
1493			mtx_lock(&sync_lock);
1494			SYNCREQDONE(hio);
1495			mtx_unlock(&sync_lock);
1496			cv_signal(&sync_cond);
1497			continue;
1498		}
1499		if (ggio->gctl_cmd == BIO_WRITE) {
1500			mtx_lock(&res->hr_amp_lock);
1501			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1502			    ggio->gctl_length)) {
1503				(void)hast_activemap_flush(res);
1504			}
1505			mtx_unlock(&res->hr_amp_lock);
1506		}
1507		if (!refcount_release(&hio->hio_countdown))
1508			continue;
1509		pjdlog_debug(2,
1510		    "remote_send: (%p) Moving request to the done queue.",
1511		    hio);
1512		QUEUE_INSERT2(hio, done);
1513	}
1514	/* NOTREACHED */
1515	return (NULL);
1516}
1517
1518/*
1519 * Thread receives answer from secondary node and passes it to ggate_send
1520 * thread.
1521 */
1522static void *
1523remote_recv_thread(void *arg)
1524{
1525	struct hast_resource *res = arg;
1526	struct g_gate_ctl_io *ggio;
1527	struct hio *hio;
1528	struct nv *nv;
1529	unsigned int ncomp;
1530	uint64_t seq;
1531	int error;
1532
1533	/* Remote component is 1 for now. */
1534	ncomp = 1;
1535
1536	for (;;) {
1537		/* Wait until there is anything to receive. */
1538		mtx_lock(&hio_recv_list_lock[ncomp]);
1539		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1540			pjdlog_debug(2, "remote_recv: No requests, waiting.");
1541			cv_wait(&hio_recv_list_cond[ncomp],
1542			    &hio_recv_list_lock[ncomp]);
1543		}
1544		mtx_unlock(&hio_recv_list_lock[ncomp]);
1545		rw_rlock(&hio_remote_lock[ncomp]);
1546		if (!ISCONNECTED(res, ncomp)) {
1547			rw_unlock(&hio_remote_lock[ncomp]);
1548			/*
1549			 * Connection is dead, so move all pending requests to
1550			 * the done queue (one-by-one).
1551			 */
1552			mtx_lock(&hio_recv_list_lock[ncomp]);
1553			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1554			PJDLOG_ASSERT(hio != NULL);
1555			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1556			    hio_next[ncomp]);
1557			mtx_unlock(&hio_recv_list_lock[ncomp]);
1558			goto done_queue;
1559		}
1560		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
1561			pjdlog_errno(LOG_ERR,
1562			    "Unable to receive reply header");
1563			rw_unlock(&hio_remote_lock[ncomp]);
1564			remote_close(res, ncomp);
1565			continue;
1566		}
1567		rw_unlock(&hio_remote_lock[ncomp]);
1568		seq = nv_get_uint64(nv, "seq");
1569		if (seq == 0) {
1570			pjdlog_error("Header contains no 'seq' field.");
1571			nv_free(nv);
1572			continue;
1573		}
1574		mtx_lock(&hio_recv_list_lock[ncomp]);
1575		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1576			if (hio->hio_ggio.gctl_seq == seq) {
1577				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1578				    hio_next[ncomp]);
1579				break;
1580			}
1581		}
1582		mtx_unlock(&hio_recv_list_lock[ncomp]);
1583		if (hio == NULL) {
1584			pjdlog_error("Found no request matching received 'seq' field (%ju).",
1585			    (uintmax_t)seq);
1586			nv_free(nv);
1587			continue;
1588		}
1589		error = nv_get_int16(nv, "error");
1590		if (error != 0) {
1591			/* Request failed on remote side. */
1592			hio->hio_errors[ncomp] = error;
1593			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
1594			    "Remote request failed (%s): ", strerror(error));
1595			nv_free(nv);
1596			goto done_queue;
1597		}
1598		ggio = &hio->hio_ggio;
1599		switch (ggio->gctl_cmd) {
1600		case BIO_READ:
1601			rw_rlock(&hio_remote_lock[ncomp]);
1602			if (!ISCONNECTED(res, ncomp)) {
1603				rw_unlock(&hio_remote_lock[ncomp]);
1604				nv_free(nv);
1605				goto done_queue;
1606			}
1607			if (hast_proto_recv_data(res, res->hr_remotein, nv,
1608			    ggio->gctl_data, ggio->gctl_length) < 0) {
1609				hio->hio_errors[ncomp] = errno;
1610				pjdlog_errno(LOG_ERR,
1611				    "Unable to receive reply data");
1612				rw_unlock(&hio_remote_lock[ncomp]);
1613				nv_free(nv);
1614				remote_close(res, ncomp);
1615				goto done_queue;
1616			}
1617			rw_unlock(&hio_remote_lock[ncomp]);
1618			break;
1619		case BIO_WRITE:
1620		case BIO_DELETE:
1621		case BIO_FLUSH:
1622			break;
1623		default:
1624			PJDLOG_ASSERT(!"invalid condition");
1625			abort();
1626		}
1627		hio->hio_errors[ncomp] = 0;
1628		nv_free(nv);
1629done_queue:
1630		if (refcount_release(&hio->hio_countdown)) {
1631			if (ISSYNCREQ(hio)) {
1632				mtx_lock(&sync_lock);
1633				SYNCREQDONE(hio);
1634				mtx_unlock(&sync_lock);
1635				cv_signal(&sync_cond);
1636			} else {
1637				pjdlog_debug(2,
1638				    "remote_recv: (%p) Moving request to the done queue.",
1639				    hio);
1640				QUEUE_INSERT2(hio, done);
1641			}
1642		}
1643	}
1644	/* NOTREACHED */
1645	return (NULL);
1646}
1647
1648/*
1649 * Thread sends answer to the kernel.
1650 */
1651static void *
1652ggate_send_thread(void *arg)
1653{
1654	struct hast_resource *res = arg;
1655	struct g_gate_ctl_io *ggio;
1656	struct hio *hio;
1657	unsigned int ii, ncomp, ncomps;
1658
1659	ncomps = HAST_NCOMPONENTS;
1660
1661	for (;;) {
1662		pjdlog_debug(2, "ggate_send: Taking request.");
1663		QUEUE_TAKE2(hio, done);
1664		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1665		ggio = &hio->hio_ggio;
1666		for (ii = 0; ii < ncomps; ii++) {
1667			if (hio->hio_errors[ii] == 0) {
1668				/*
1669				 * One successful request is enough to declare
1670				 * success.
1671				 */
1672				ggio->gctl_error = 0;
1673				break;
1674			}
1675		}
1676		if (ii == ncomps) {
1677			/*
1678			 * None of the requests were successful.
1679			 * Use the error from local component except the
1680			 * case when we did only remote request.
1681			 */
1682			if (ggio->gctl_cmd == BIO_READ &&
1683			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
1684				ggio->gctl_error = hio->hio_errors[1];
1685			else
1686				ggio->gctl_error = hio->hio_errors[0];
1687		}
1688		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1689			mtx_lock(&res->hr_amp_lock);
1690			if (activemap_write_complete(res->hr_amp,
1691				ggio->gctl_offset, ggio->gctl_length)) {
1692				res->hr_stat_activemap_update++;
1693				(void)hast_activemap_flush(res);
1694			}
1695			mtx_unlock(&res->hr_amp_lock);
1696		}
1697		if (ggio->gctl_cmd == BIO_WRITE) {
1698			/*
1699			 * Unlock range we locked.
1700			 */
1701			mtx_lock(&range_lock);
1702			rangelock_del(range_regular, ggio->gctl_offset,
1703			    ggio->gctl_length);
1704			if (range_sync_wait)
1705				cv_signal(&range_sync_cond);
1706			mtx_unlock(&range_lock);
1707			/*
1708			 * Bump local count if this is first write after
1709			 * connection failure with remote node.
1710			 */
1711			ncomp = 1;
1712			rw_rlock(&hio_remote_lock[ncomp]);
1713			if (!ISCONNECTED(res, ncomp)) {
1714				mtx_lock(&metadata_lock);
1715				if (res->hr_primary_localcnt ==
1716				    res->hr_secondary_remotecnt) {
1717					res->hr_primary_localcnt++;
1718					pjdlog_debug(1,
1719					    "Increasing localcnt to %ju.",
1720					    (uintmax_t)res->hr_primary_localcnt);
1721					(void)metadata_write(res);
1722				}
1723				mtx_unlock(&metadata_lock);
1724			}
1725			rw_unlock(&hio_remote_lock[ncomp]);
1726		}
1727		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
1728			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1729		pjdlog_debug(2,
1730		    "ggate_send: (%p) Moving request to the free queue.", hio);
1731		QUEUE_INSERT2(hio, free);
1732	}
1733	/* NOTREACHED */
1734	return (NULL);
1735}
1736
1737/*
1738 * Thread synchronize local and remote components.
1739 */
1740static void *
1741sync_thread(void *arg __unused)
1742{
1743	struct hast_resource *res = arg;
1744	struct hio *hio;
1745	struct g_gate_ctl_io *ggio;
1746	struct timeval tstart, tend, tdiff;
1747	unsigned int ii, ncomp, ncomps;
1748	off_t offset, length, synced;
1749	bool dorewind;
1750	int syncext;
1751
1752	ncomps = HAST_NCOMPONENTS;
1753	dorewind = true;
1754	synced = 0;
1755	offset = -1;
1756
1757	for (;;) {
1758		mtx_lock(&sync_lock);
1759		if (offset >= 0 && !sync_inprogress) {
1760			gettimeofday(&tend, NULL);
1761			timersub(&tend, &tstart, &tdiff);
1762			pjdlog_info("Synchronization interrupted after %#.0T. "
1763			    "%NB synchronized so far.", &tdiff,
1764			    (intmax_t)synced);
1765			event_send(res, EVENT_SYNCINTR);
1766		}
1767		while (!sync_inprogress) {
1768			dorewind = true;
1769			synced = 0;
1770			cv_wait(&sync_cond, &sync_lock);
1771		}
1772		mtx_unlock(&sync_lock);
1773		/*
1774		 * Obtain offset at which we should synchronize.
1775		 * Rewind synchronization if needed.
1776		 */
1777		mtx_lock(&res->hr_amp_lock);
1778		if (dorewind)
1779			activemap_sync_rewind(res->hr_amp);
1780		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1781		if (syncext != -1) {
1782			/*
1783			 * We synchronized entire syncext extent, we can mark
1784			 * it as clean now.
1785			 */
1786			if (activemap_extent_complete(res->hr_amp, syncext))
1787				(void)hast_activemap_flush(res);
1788		}
1789		mtx_unlock(&res->hr_amp_lock);
1790		if (dorewind) {
1791			dorewind = false;
1792			if (offset < 0)
1793				pjdlog_info("Nodes are in sync.");
1794			else {
1795				pjdlog_info("Synchronization started. %NB to go.",
1796				    (intmax_t)(res->hr_extentsize *
1797				    activemap_ndirty(res->hr_amp)));
1798				event_send(res, EVENT_SYNCSTART);
1799				gettimeofday(&tstart, NULL);
1800			}
1801		}
1802		if (offset < 0) {
1803			sync_stop();
1804			pjdlog_debug(1, "Nothing to synchronize.");
1805			/*
1806			 * Synchronization complete, make both localcnt and
1807			 * remotecnt equal.
1808			 */
1809			ncomp = 1;
1810			rw_rlock(&hio_remote_lock[ncomp]);
1811			if (ISCONNECTED(res, ncomp)) {
1812				if (synced > 0) {
1813					int64_t bps;
1814
1815					gettimeofday(&tend, NULL);
1816					timersub(&tend, &tstart, &tdiff);
1817					bps = (int64_t)((double)synced /
1818					    ((double)tdiff.tv_sec +
1819					    (double)tdiff.tv_usec / 1000000));
1820					pjdlog_info("Synchronization complete. "
1821					    "%NB synchronized in %#.0lT (%NB/sec).",
1822					    (intmax_t)synced, &tdiff,
1823					    (intmax_t)bps);
1824					event_send(res, EVENT_SYNCDONE);
1825				}
1826				mtx_lock(&metadata_lock);
1827				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1828				res->hr_primary_localcnt =
1829				    res->hr_secondary_remotecnt;
1830				res->hr_primary_remotecnt =
1831				    res->hr_secondary_localcnt;
1832				pjdlog_debug(1,
1833				    "Setting localcnt to %ju and remotecnt to %ju.",
1834				    (uintmax_t)res->hr_primary_localcnt,
1835				    (uintmax_t)res->hr_primary_remotecnt);
1836				(void)metadata_write(res);
1837				mtx_unlock(&metadata_lock);
1838			}
1839			rw_unlock(&hio_remote_lock[ncomp]);
1840			continue;
1841		}
1842		pjdlog_debug(2, "sync: Taking free request.");
1843		QUEUE_TAKE2(hio, free);
1844		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1845		/*
1846		 * Lock the range we are going to synchronize. We don't want
1847		 * race where someone writes between our read and write.
1848		 */
1849		for (;;) {
1850			mtx_lock(&range_lock);
1851			if (rangelock_islocked(range_regular, offset, length)) {
1852				pjdlog_debug(2,
1853				    "sync: Range offset=%jd length=%jd locked.",
1854				    (intmax_t)offset, (intmax_t)length);
1855				range_sync_wait = true;
1856				cv_wait(&range_sync_cond, &range_lock);
1857				range_sync_wait = false;
1858				mtx_unlock(&range_lock);
1859				continue;
1860			}
1861			if (rangelock_add(range_sync, offset, length) < 0) {
1862				mtx_unlock(&range_lock);
1863				pjdlog_debug(2,
1864				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
1865				    (intmax_t)offset, (intmax_t)length);
1866				sleep(1);
1867				continue;
1868			}
1869			mtx_unlock(&range_lock);
1870			break;
1871		}
1872		/*
1873		 * First read the data from synchronization source.
1874		 */
1875		SYNCREQ(hio);
1876		ggio = &hio->hio_ggio;
1877		ggio->gctl_cmd = BIO_READ;
1878		ggio->gctl_offset = offset;
1879		ggio->gctl_length = length;
1880		ggio->gctl_error = 0;
1881		for (ii = 0; ii < ncomps; ii++)
1882			hio->hio_errors[ii] = EINVAL;
1883		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1884		    hio);
1885		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1886		    hio);
1887		mtx_lock(&metadata_lock);
1888		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1889			/*
1890			 * This range is up-to-date on local component,
1891			 * so handle request locally.
1892			 */
1893			 /* Local component is 0 for now. */
1894			ncomp = 0;
1895		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1896			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1897			/*
1898			 * This range is out-of-date on local component,
1899			 * so send request to the remote node.
1900			 */
1901			 /* Remote component is 1 for now. */
1902			ncomp = 1;
1903		}
1904		mtx_unlock(&metadata_lock);
1905		refcount_init(&hio->hio_countdown, 1);
1906		QUEUE_INSERT1(hio, send, ncomp);
1907
1908		/*
1909		 * Let's wait for READ to finish.
1910		 */
1911		mtx_lock(&sync_lock);
1912		while (!ISSYNCREQDONE(hio))
1913			cv_wait(&sync_cond, &sync_lock);
1914		mtx_unlock(&sync_lock);
1915
1916		if (hio->hio_errors[ncomp] != 0) {
1917			pjdlog_error("Unable to read synchronization data: %s.",
1918			    strerror(hio->hio_errors[ncomp]));
1919			goto free_queue;
1920		}
1921
1922		/*
1923		 * We read the data from synchronization source, now write it
1924		 * to synchronization target.
1925		 */
1926		SYNCREQ(hio);
1927		ggio->gctl_cmd = BIO_WRITE;
1928		for (ii = 0; ii < ncomps; ii++)
1929			hio->hio_errors[ii] = EINVAL;
1930		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1931		    hio);
1932		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1933		    hio);
1934		mtx_lock(&metadata_lock);
1935		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1936			/*
1937			 * This range is up-to-date on local component,
1938			 * so we update remote component.
1939			 */
1940			 /* Remote component is 1 for now. */
1941			ncomp = 1;
1942		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1943			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1944			/*
1945			 * This range is out-of-date on local component,
1946			 * so we update it.
1947			 */
1948			 /* Local component is 0 for now. */
1949			ncomp = 0;
1950		}
1951		mtx_unlock(&metadata_lock);
1952
1953		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
1954		    hio);
1955		refcount_init(&hio->hio_countdown, 1);
1956		QUEUE_INSERT1(hio, send, ncomp);
1957
1958		/*
1959		 * Let's wait for WRITE to finish.
1960		 */
1961		mtx_lock(&sync_lock);
1962		while (!ISSYNCREQDONE(hio))
1963			cv_wait(&sync_cond, &sync_lock);
1964		mtx_unlock(&sync_lock);
1965
1966		if (hio->hio_errors[ncomp] != 0) {
1967			pjdlog_error("Unable to write synchronization data: %s.",
1968			    strerror(hio->hio_errors[ncomp]));
1969			goto free_queue;
1970		}
1971
1972		synced += length;
1973free_queue:
1974		mtx_lock(&range_lock);
1975		rangelock_del(range_sync, offset, length);
1976		if (range_regular_wait)
1977			cv_signal(&range_regular_cond);
1978		mtx_unlock(&range_lock);
1979		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
1980		    hio);
1981		QUEUE_INSERT2(hio, free);
1982	}
1983	/* NOTREACHED */
1984	return (NULL);
1985}
1986
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	/*
	 * Apply a new configuration (delivered as an nv list) to the running
	 * primary.  Changes to remoteaddr/sourceaddr/replication force a
	 * reconnect; a timeout change alone is applied to the live sockets.
	 *
	 * res - resource being reloaded; must equal the global gres.
	 * nv  - must contain all of the fields asserted below.
	 */
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");

	ncomps = HAST_NCOMPONENTS;

/* Bit flags recording which settings differ from the running config. */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) != 0) {
		/* Connection parameters changed: drop remote components. */
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			/* Safe to install the new address now that
			 * remote_close() has logged the old one. */
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC

	pjdlog_info("Configuration reloaded successfully.");
}
2108
/*
 * Check one remote component and reconnect it if the connection is down.
 * No-op for local components or resources with no real remote peer.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	/* Establish connections outside the lock, then publish under wlock. */
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		/* Peer is back; kick off synchronization. */
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}
2163
2164/*
2165 * Thread guards remote connections and reconnects when needed, handles
2166 * signals, etc.
2167 */
2168static void *
2169guard_thread(void *arg)
2170{
2171	struct hast_resource *res = arg;
2172	unsigned int ii, ncomps;
2173	struct timespec timeout;
2174	time_t lastcheck, now;
2175	sigset_t mask;
2176	int signo;
2177
2178	ncomps = HAST_NCOMPONENTS;
2179	lastcheck = time(NULL);
2180
2181	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2182	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2183	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2184
2185	timeout.tv_sec = HAST_KEEPALIVE;
2186	timeout.tv_nsec = 0;
2187	signo = -1;
2188
2189	for (;;) {
2190		switch (signo) {
2191		case SIGINT:
2192		case SIGTERM:
2193			sigexit_received = true;
2194			primary_exitx(EX_OK,
2195			    "Termination signal received, exiting.");
2196			break;
2197		default:
2198			break;
2199		}
2200
2201		/*
2202		 * Don't check connections until we fully started,
2203		 * as we may still be looping, waiting for remote node
2204		 * to switch from primary to secondary.
2205		 */
2206		if (fullystarted) {
2207			pjdlog_debug(2, "remote_guard: Checking connections.");
2208			now = time(NULL);
2209			if (lastcheck + HAST_KEEPALIVE <= now) {
2210				for (ii = 0; ii < ncomps; ii++)
2211					guard_one(res, ii);
2212				lastcheck = now;
2213			}
2214		}
2215		signo = sigtimedwait(&mask, NULL, &timeout);
2216	}
2217	/* NOTREACHED */
2218	return (NULL);
2219}
2220