primary.c revision 247866
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
30239462Sdim
31239462Sdim#include <sys/cdefs.h>
32234353Sdim__FBSDID("$FreeBSD: stable/9/sbin/hastd/primary.c 247866 2013-03-06 06:57:18Z trociny $");
33234353Sdim
34221345Sdim#include <sys/types.h>
35239462Sdim#include <sys/time.h>
36239462Sdim#include <sys/bio.h>
37239462Sdim#include <sys/disk.h>
38218887Sdim#include <sys/refcount.h>
39226633Sdim#include <sys/stat.h>
40226633Sdim
41239462Sdim#include <geom/gate/g_gate.h>
42218887Sdim
43243830Sdim#include <err.h>
44226633Sdim#include <errno.h>
45234353Sdim#include <fcntl.h>
46234353Sdim#include <libgeom.h>
47234353Sdim#include <pthread.h>
48218887Sdim#include <signal.h>
49218887Sdim#include <stdint.h>
50218887Sdim#include <stdio.h>
51243830Sdim#include <string.h>
52234353Sdim#include <sysexits.h>
53234353Sdim#include <unistd.h>
54234353Sdim
55234353Sdim#include <activemap.h>
56234353Sdim#include <nv.h>
57234353Sdim#include <rangelock.h>
58218887Sdim
59218887Sdim#include "control.h"
60218887Sdim#include "event.h"
61234353Sdim#include "hast.h"
62218887Sdim#include "hast_proto.h"
63218887Sdim#include "hastd.h"
64218887Sdim#include "hooks.h"
65218887Sdim#include "metadata.h"
66218887Sdim#include "proto.h"
67218887Sdim#include "pjdlog.h"
68218887Sdim#include "subr.h"
69234353Sdim#include "synch.h"
70218887Sdim
/*
 * There is only one remote component for now: component number 1.
 * Component 0 is the local disk.
 */
#define	ISREMOTE(no)	((no) == 1)
73218887Sdim
/*
 * A single I/O request as it travels between the GEOM Gate kernel device
 * and the per-component worker threads.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool			 hio_done;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int			 hio_replication;
	/*
	 * Per-component list linkage; allocated as an array with one
	 * entry per component (see init_environment()).
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component), so they simply
 * reuse the first linkage slot.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]
105239462Sdim
106239462Sdim/*
107239462Sdim * Free list holds unused structures. When free list is empty, we have to wait
108239462Sdim * until some in-progress requests are freed.
109239462Sdim */
110239462Sdimstatic TAILQ_HEAD(, hio) hio_free_list;
111239462Sdimstatic pthread_mutex_t hio_free_list_lock;
112239462Sdimstatic pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
118239462Sdimstatic TAILQ_HEAD(, hio) *hio_send_list;
119239462Sdimstatic pthread_mutex_t *hio_send_list_lock;
120239462Sdimstatic pthread_cond_t *hio_send_list_cond;
121239462Sdim/*
122239462Sdim * There is one recv list for every component, although local components don't
123239462Sdim * use recv lists as local requests are done synchronously.
124239462Sdim */
125239462Sdimstatic TAILQ_HEAD(, hio) *hio_recv_list;
126239462Sdimstatic pthread_mutex_t *hio_recv_list_lock;
127239462Sdimstatic pthread_cond_t *hio_recv_list_cond;
128243830Sdim/*
129243830Sdim * Request is placed on done list by the slowest component (the one that
130239462Sdim * decreased hio_countdown from 1 to 0).
131239462Sdim */
132239462Sdimstatic TAILQ_HEAD(, hio) hio_done_list;
133239462Sdimstatic pthread_mutex_t hio_done_list_lock;
134239462Sdimstatic pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with the sync thread.
 */
138239462Sdimstatic bool sync_inprogress;
139239462Sdimstatic pthread_mutex_t sync_lock;
140239462Sdimstatic pthread_cond_t sync_cond;
/*
 * The lock below is used to synchronize access to remote connections.
 */
144239462Sdimstatic pthread_rwlock_t *hio_remote_lock;
145243830Sdim
146243830Sdim/*
147239462Sdim * Lock to synchronize metadata updates. Also synchronize access to
148239462Sdim * hr_primary_localcnt and hr_primary_remotecnt fields.
149239462Sdim */
150239462Sdimstatic pthread_mutex_t metadata_lock;
151239462Sdim
152243830Sdim/*
153243830Sdim * Maximum number of outstanding I/O requests.
154243830Sdim */
155243830Sdim#define	HAST_HIO_MAX	256
156239462Sdim/*
157239462Sdim * Number of components. At this point there are only two components: local
158239462Sdim * and remote, but in the future it might be possible to use multiple local
159239462Sdim * and remote components.
160239462Sdim */
161239462Sdim#define	HAST_NCOMPONENTS	2
162243830Sdim
/*
 * True when both remote connections are established.  The 'no' argument is
 * currently unused - there is only one remote component.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component 'name' queue and wake a waiter
 * if the queue was empty.  All macro arguments are parenthesized at every
 * use to stay expansion-safe.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* Like QUEUE_INSERT1, but for the global (non-per-component) queues. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Take the first request from the per-component 'name' queue.
 * With a non-zero timeout, wait at most one timed interval; (hio) may then
 * be NULL on expiry.  With a zero timeout, wait until a request arrives.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if ((hio) != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Take the first request from a global queue, blocking until one exists. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are distinguished from kernel-originated
 * requests by a magic gctl_unit value: -1 = in progress, -2 = completed.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
221239462Sdim
222239462Sdimstatic struct hast_resource *gres;
223239462Sdim
224218887Sdimstatic pthread_mutex_t range_lock;
225239462Sdimstatic struct rangelocks *range_regular;
226239462Sdimstatic bool range_regular_wait;
227239462Sdimstatic pthread_cond_t range_regular_cond;
228239462Sdimstatic struct rangelocks *range_sync;
229239462Sdimstatic bool range_sync_wait;
230218887Sdimstatic pthread_cond_t range_sync_cond;
231239462Sdimstatic bool fullystarted;
232239462Sdim
233239462Sdimstatic void *ggate_recv_thread(void *arg);
234239462Sdimstatic void *local_send_thread(void *arg);
235218887Sdimstatic void *remote_send_thread(void *arg);
236239462Sdimstatic void *remote_recv_thread(void *arg);
237239462Sdimstatic void *ggate_send_thread(void *arg);
238218887Sdimstatic void *sync_thread(void *arg);
239239462Sdimstatic void *guard_thread(void *arg);
240218887Sdim
241239462Sdimstatic void
242249423Sdimcleanup(struct hast_resource *res)
243249423Sdim{
244234353Sdim	int rerrno;
245239462Sdim
246239462Sdim	/* Remember errno. */
247239462Sdim	rerrno = errno;
248218887Sdim
249218887Sdim	/* Destroy ggate provider if we created one. */
250239462Sdim	if (res->hr_ggateunit >= 0) {
251239462Sdim		struct g_gate_ctl_destroy ggiod;
252239462Sdim
253239462Sdim		bzero(&ggiod, sizeof(ggiod));
254239462Sdim		ggiod.gctl_version = G_GATE_VERSION;
255239462Sdim		ggiod.gctl_unit = res->hr_ggateunit;
256239462Sdim		ggiod.gctl_force = 1;
257218887Sdim		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
258218887Sdim			pjdlog_errno(LOG_WARNING,
259218887Sdim			    "Unable to destroy hast/%s device",
260239462Sdim			    res->hr_provname);
261239462Sdim		}
262239462Sdim		res->hr_ggateunit = -1;
263239462Sdim	}
264239462Sdim
265239462Sdim	/* Restore errno. */
266239462Sdim	errno = rerrno;
267239462Sdim}
268239462Sdim
/*
 * Log a printf-style message together with the current errno, clean up the
 * ggate provider and terminate the worker.  Never returns.  exitcode must
 * not be EX_OK - use primary_exitx() for non-errno exits.
 */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}
281221345Sdim
282static __dead2 void
283primary_exitx(int exitcode, const char *fmt, ...)
284{
285	va_list ap;
286
287	va_start(ap, fmt);
288	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
289	va_end(ap);
290	cleanup(gres);
291	exit(exitcode);
292}
293
/*
 * Write the in-memory activemap bitmap to the local provider (right after
 * the on-disk metadata) and, when configured, flush the disk write cache.
 * Returns 0 on success, -1 on failure (with the corresponding statistics
 * counter bumped).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	PJDLOG_ASSERT(buf != NULL);
	/* The bitmap must be written in whole sectors. */
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
		res->hr_stat_activemap_write_error++;
		return (-1);
	}
	if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) {
		if (errno == EOPNOTSUPP) {
			/* Provider cannot flush; stop asking it to. */
			pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
			    res->hr_localpath);
			res->hr_metaflush = 0;
		} else {
			pjdlog_errno(LOG_ERR,
			    "Unable to flush disk cache on activemap update");
			res->hr_stat_activemap_flush_error++;
			return (-1);
		}
	}
	return (0);
}
323
324static bool
325real_remote(const struct hast_resource *res)
326{
327
328	return (strcmp(res->hr_remoteaddr, "none") != 0);
329}
330
331static void
332init_environment(struct hast_resource *res __unused)
333{
334	struct hio *hio;
335	unsigned int ii, ncomps;
336
337	/*
338	 * In the future it might be per-resource value.
339	 */
340	ncomps = HAST_NCOMPONENTS;
341
342	/*
343	 * Allocate memory needed by lists.
344	 */
345	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
346	if (hio_send_list == NULL) {
347		primary_exitx(EX_TEMPFAIL,
348		    "Unable to allocate %zu bytes of memory for send lists.",
349		    sizeof(hio_send_list[0]) * ncomps);
350	}
351	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
352	if (hio_send_list_lock == NULL) {
353		primary_exitx(EX_TEMPFAIL,
354		    "Unable to allocate %zu bytes of memory for send list locks.",
355		    sizeof(hio_send_list_lock[0]) * ncomps);
356	}
357	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
358	if (hio_send_list_cond == NULL) {
359		primary_exitx(EX_TEMPFAIL,
360		    "Unable to allocate %zu bytes of memory for send list condition variables.",
361		    sizeof(hio_send_list_cond[0]) * ncomps);
362	}
363	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
364	if (hio_recv_list == NULL) {
365		primary_exitx(EX_TEMPFAIL,
366		    "Unable to allocate %zu bytes of memory for recv lists.",
367		    sizeof(hio_recv_list[0]) * ncomps);
368	}
369	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
370	if (hio_recv_list_lock == NULL) {
371		primary_exitx(EX_TEMPFAIL,
372		    "Unable to allocate %zu bytes of memory for recv list locks.",
373		    sizeof(hio_recv_list_lock[0]) * ncomps);
374	}
375	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
376	if (hio_recv_list_cond == NULL) {
377		primary_exitx(EX_TEMPFAIL,
378		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
379		    sizeof(hio_recv_list_cond[0]) * ncomps);
380	}
381	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
382	if (hio_remote_lock == NULL) {
383		primary_exitx(EX_TEMPFAIL,
384		    "Unable to allocate %zu bytes of memory for remote connections locks.",
385		    sizeof(hio_remote_lock[0]) * ncomps);
386	}
387
388	/*
389	 * Initialize lists, their locks and theirs condition variables.
390	 */
391	TAILQ_INIT(&hio_free_list);
392	mtx_init(&hio_free_list_lock);
393	cv_init(&hio_free_list_cond);
394	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
395		TAILQ_INIT(&hio_send_list[ii]);
396		mtx_init(&hio_send_list_lock[ii]);
397		cv_init(&hio_send_list_cond[ii]);
398		TAILQ_INIT(&hio_recv_list[ii]);
399		mtx_init(&hio_recv_list_lock[ii]);
400		cv_init(&hio_recv_list_cond[ii]);
401		rw_init(&hio_remote_lock[ii]);
402	}
403	TAILQ_INIT(&hio_done_list);
404	mtx_init(&hio_done_list_lock);
405	cv_init(&hio_done_list_cond);
406	mtx_init(&metadata_lock);
407
408	/*
409	 * Allocate requests pool and initialize requests.
410	 */
411	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
412		hio = malloc(sizeof(*hio));
413		if (hio == NULL) {
414			primary_exitx(EX_TEMPFAIL,
415			    "Unable to allocate %zu bytes of memory for hio request.",
416			    sizeof(*hio));
417		}
418		hio->hio_countdown = 0;
419		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
420		if (hio->hio_errors == NULL) {
421			primary_exitx(EX_TEMPFAIL,
422			    "Unable allocate %zu bytes of memory for hio errors.",
423			    sizeof(hio->hio_errors[0]) * ncomps);
424		}
425		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
426		if (hio->hio_next == NULL) {
427			primary_exitx(EX_TEMPFAIL,
428			    "Unable allocate %zu bytes of memory for hio_next field.",
429			    sizeof(hio->hio_next[0]) * ncomps);
430		}
431		hio->hio_ggio.gctl_version = G_GATE_VERSION;
432		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
433		if (hio->hio_ggio.gctl_data == NULL) {
434			primary_exitx(EX_TEMPFAIL,
435			    "Unable to allocate %zu bytes of memory for gctl_data.",
436			    MAXPHYS);
437		}
438		hio->hio_ggio.gctl_length = MAXPHYS;
439		hio->hio_ggio.gctl_error = 0;
440		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
441	}
442}
443
444static bool
445init_resuid(struct hast_resource *res)
446{
447
448	mtx_lock(&metadata_lock);
449	if (res->hr_resuid != 0) {
450		mtx_unlock(&metadata_lock);
451		return (false);
452	} else {
453		/* Initialize unique resource identifier. */
454		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
455		mtx_unlock(&metadata_lock);
456		if (metadata_write(res) == -1)
457			exit(EX_NOINPUT);
458		return (true);
459	}
460}
461
/*
 * Prepare the local component: read metadata, create the activemap and
 * range locks, and load the on-disk activemap bitmap into memory.
 * On first use of the provider, zero the local/remote counters and write
 * them out (resuid itself is initialized later - see init_resuid()).
 * All failures are fatal.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) == -1)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	/* Read the on-disk activemap, stored right after the metadata. */
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) == -1)
		exit(EX_NOINPUT);
}
508
/*
 * Ask the parent process (over the hr_conn socketpair) to establish a
 * connection to the remote node and hand the resulting descriptor back to
 * us.  Returns 0 with *connp set on success, -1 on connection failure;
 * protocol failures talking to the parent are fatal.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	/* Any non-zero value works as a connection request. */
	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	/* A non-zero reply carries the parent's errno for the failure. */
	if (val != 0) {
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, res->hr_timeout) == -1) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}
548
549/*
550 * Function instructs GEOM_GATE to handle reads directly from within the kernel.
551 */
552static void
553enable_direct_reads(struct hast_resource *res)
554{
555	struct g_gate_ctl_modify ggiomodify;
556
557	bzero(&ggiomodify, sizeof(ggiomodify));
558	ggiomodify.gctl_version = G_GATE_VERSION;
559	ggiomodify.gctl_unit = res->hr_ggateunit;
560	ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
561	strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
562	    sizeof(ggiomodify.gctl_readprov));
563	ggiomodify.gctl_readoffset = res->hr_localoff;
564	if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
565		pjdlog_debug(1, "Direct reads enabled.");
566	else
567		pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
568}
569
570static int
571init_remote(struct hast_resource *res, struct proto_conn **inp,
572    struct proto_conn **outp)
573{
574	struct proto_conn *in, *out;
575	struct nv *nvout, *nvin;
576	const unsigned char *token;
577	unsigned char *map;
578	const char *errmsg;
579	int32_t extentsize;
580	int64_t datasize;
581	uint32_t mapsize;
582	size_t size;
583	int error;
584
585	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
586	PJDLOG_ASSERT(real_remote(res));
587
588	in = out = NULL;
589	errmsg = NULL;
590
591	if (primary_connect(res, &out) == -1)
592		return (ECONNREFUSED);
593
594	error = ECONNABORTED;
595
596	/*
597	 * First handshake step.
598	 * Setup outgoing connection with remote node.
599	 */
600	nvout = nv_alloc();
601	nv_add_string(nvout, res->hr_name, "resource");
602	if (nv_error(nvout) != 0) {
603		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
604		    "Unable to allocate header for connection with %s",
605		    res->hr_remoteaddr);
606		nv_free(nvout);
607		goto close;
608	}
609	if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
610		pjdlog_errno(LOG_WARNING,
611		    "Unable to send handshake header to %s",
612		    res->hr_remoteaddr);
613		nv_free(nvout);
614		goto close;
615	}
616	nv_free(nvout);
617	if (hast_proto_recv_hdr(out, &nvin) == -1) {
618		pjdlog_errno(LOG_WARNING,
619		    "Unable to receive handshake header from %s",
620		    res->hr_remoteaddr);
621		goto close;
622	}
623	errmsg = nv_get_string(nvin, "errmsg");
624	if (errmsg != NULL) {
625		pjdlog_warning("%s", errmsg);
626		if (nv_exists(nvin, "wait"))
627			error = EBUSY;
628		nv_free(nvin);
629		goto close;
630	}
631	token = nv_get_uint8_array(nvin, &size, "token");
632	if (token == NULL) {
633		pjdlog_warning("Handshake header from %s has no 'token' field.",
634		    res->hr_remoteaddr);
635		nv_free(nvin);
636		goto close;
637	}
638	if (size != sizeof(res->hr_token)) {
639		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
640		    res->hr_remoteaddr, size, sizeof(res->hr_token));
641		nv_free(nvin);
642		goto close;
643	}
644	bcopy(token, res->hr_token, sizeof(res->hr_token));
645	nv_free(nvin);
646
647	/*
648	 * Second handshake step.
649	 * Setup incoming connection with remote node.
650	 */
651	if (primary_connect(res, &in) == -1)
652		goto close;
653
654	nvout = nv_alloc();
655	nv_add_string(nvout, res->hr_name, "resource");
656	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
657	    "token");
658	if (res->hr_resuid == 0) {
659		/*
660		 * The resuid field was not yet initialized.
661		 * Because we do synchronization inside init_resuid(), it is
662		 * possible that someone already initialized it, the function
663		 * will return false then, but if we successfully initialized
664		 * it, we will get true. True means that there were no writes
665		 * to this resource yet and we want to inform secondary that
666		 * synchronization is not needed by sending "virgin" argument.
667		 */
668		if (init_resuid(res))
669			nv_add_int8(nvout, 1, "virgin");
670	}
671	nv_add_uint64(nvout, res->hr_resuid, "resuid");
672	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
673	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
674	if (nv_error(nvout) != 0) {
675		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
676		    "Unable to allocate header for connection with %s",
677		    res->hr_remoteaddr);
678		nv_free(nvout);
679		goto close;
680	}
681	if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
682		pjdlog_errno(LOG_WARNING,
683		    "Unable to send handshake header to %s",
684		    res->hr_remoteaddr);
685		nv_free(nvout);
686		goto close;
687	}
688	nv_free(nvout);
689	if (hast_proto_recv_hdr(out, &nvin) == -1) {
690		pjdlog_errno(LOG_WARNING,
691		    "Unable to receive handshake header from %s",
692		    res->hr_remoteaddr);
693		goto close;
694	}
695	errmsg = nv_get_string(nvin, "errmsg");
696	if (errmsg != NULL) {
697		pjdlog_warning("%s", errmsg);
698		nv_free(nvin);
699		goto close;
700	}
701	datasize = nv_get_int64(nvin, "datasize");
702	if (datasize != res->hr_datasize) {
703		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
704		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
705		nv_free(nvin);
706		goto close;
707	}
708	extentsize = nv_get_int32(nvin, "extentsize");
709	if (extentsize != res->hr_extentsize) {
710		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
711		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
712		nv_free(nvin);
713		goto close;
714	}
715	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
716	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
717	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
718	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
719		enable_direct_reads(res);
720	if (nv_exists(nvin, "virgin")) {
721		/*
722		 * Secondary was reinitialized, bump localcnt if it is 0 as
723		 * only we have the data.
724		 */
725		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
726		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
727
728		if (res->hr_primary_localcnt == 0) {
729			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);
730
731			mtx_lock(&metadata_lock);
732			res->hr_primary_localcnt++;
733			pjdlog_debug(1, "Increasing localcnt to %ju.",
734			    (uintmax_t)res->hr_primary_localcnt);
735			(void)metadata_write(res);
736			mtx_unlock(&metadata_lock);
737		}
738	}
739	map = NULL;
740	mapsize = nv_get_uint32(nvin, "mapsize");
741	if (mapsize > 0) {
742		map = malloc(mapsize);
743		if (map == NULL) {
744			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
745			    (uintmax_t)mapsize);
746			nv_free(nvin);
747			goto close;
748		}
749		/*
750		 * Remote node have some dirty extents on its own, lets
751		 * download its activemap.
752		 */
753		if (hast_proto_recv_data(res, out, nvin, map,
754		    mapsize) == -1) {
755			pjdlog_errno(LOG_ERR,
756			    "Unable to receive remote activemap");
757			nv_free(nvin);
758			free(map);
759			goto close;
760		}
761		/*
762		 * Merge local and remote bitmaps.
763		 */
764		activemap_merge(res->hr_amp, map, mapsize);
765		free(map);
766		/*
767		 * Now that we merged bitmaps from both nodes, flush it to the
768		 * disk before we start to synchronize.
769		 */
770		(void)hast_activemap_flush(res);
771	}
772	nv_free(nvin);
773#ifdef notyet
774	/* Setup directions. */
775	if (proto_send(out, NULL, 0) == -1)
776		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
777	if (proto_recv(in, NULL, 0) == -1)
778		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
779#endif
780	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
781	if (inp != NULL && outp != NULL) {
782		*inp = in;
783		*outp = out;
784	} else {
785		res->hr_remotein = in;
786		res->hr_remoteout = out;
787	}
788	event_send(res, EVENT_CONNECT);
789	return (0);
790close:
791	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
792		event_send(res, EVENT_SPLITBRAIN);
793	proto_close(out);
794	if (in != NULL)
795		proto_close(in);
796	return (error);
797}
798
799static void
800sync_start(void)
801{
802
803	mtx_lock(&sync_lock);
804	sync_inprogress = true;
805	mtx_unlock(&sync_lock);
806	cv_signal(&sync_cond);
807}
808
/*
 * Request the sync thread to stop by clearing the in-progress flag
 * (a no-op when no synchronization is running).
 */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}
818
/*
 * Create (or take over) the hast/<name> GEOM Gate provider that exposes the
 * resource as a block device.  All failures are fatal.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd == -1)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}
875
/*
 * Set up the primary role for the given resource.
 *
 * Creates three socketpair-based channels between the parent (this
 * process) and the worker it forks: control (parent -> child), events
 * (child -> parent) and connection requests (child -> parent).  After
 * fork() the parent records the worker PID and returns; the child
 * reinitializes logging, opens the local component and the ggate
 * device, drops privileges, starts the worker threads and finally
 * turns the calling thread into sync_thread() - it never returns.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	/* This is the worker (child) process from here on. */
	gres = res;
	/* Capture the parent's logging configuration before reinit. */
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	/* Close descriptors inherited from the parent that we don't need. */
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	/* Reinitialize logging in the child with the captured settings. */
	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	/* Drop privileges before doing any further work. */
	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/*
			 * Remote node is still primary; poll once a second
			 * until it becomes secondary or hr_timeout expires.
			 */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	/* Start the I/O worker threads. */
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	/* The current thread becomes the synchronization thread. */
	(void)sync_thread(res);
}
1012
1013static void
1014reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
1015{
1016	char msg[1024];
1017	va_list ap;
1018
1019	va_start(ap, fmt);
1020	(void)vsnprintf(msg, sizeof(msg), fmt, ap);
1021	va_end(ap);
1022	switch (ggio->gctl_cmd) {
1023	case BIO_READ:
1024		(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
1025		    (uintmax_t)ggio->gctl_offset,
1026		    (uintmax_t)ggio->gctl_length);
1027		break;
1028	case BIO_DELETE:
1029		(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
1030		    (uintmax_t)ggio->gctl_offset,
1031		    (uintmax_t)ggio->gctl_length);
1032		break;
1033	case BIO_FLUSH:
1034		(void)snprlcat(msg, sizeof(msg), "FLUSH.");
1035		break;
1036	case BIO_WRITE:
1037		(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
1038		    (uintmax_t)ggio->gctl_offset,
1039		    (uintmax_t)ggio->gctl_length);
1040		break;
1041	default:
1042		(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
1043		    (unsigned int)ggio->gctl_cmd);
1044		break;
1045	}
1046	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
1047}
1048
/*
 * Tear down both connections (incoming and outgoing) of the given
 * remote component, stop any in-progress synchronization and notify
 * the parent with an EVENT_DISCONNECT event.  Safe to call from
 * multiple threads - the write lock serializes the teardown.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * Check for a race between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}
1088
1089/*
1090 * Acknowledge write completion to the kernel, but don't update activemap yet.
1091 */
static void
write_complete(struct hast_resource *res, struct hio *hio)
{
	struct g_gate_ctl_io *ggio;
	unsigned int ncomp;

	/* Completion must be reported to the kernel exactly once. */
	PJDLOG_ASSERT(!hio->hio_done);

	ggio = &hio->hio_ggio;
	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);

	/*
	 * Bump local count if this is first write after
	 * connection failure with remote node.
	 */
	ncomp = 1;
	rw_rlock(&hio_remote_lock[ncomp]);
	if (!ISCONNECTED(res, ncomp)) {
		mtx_lock(&metadata_lock);
		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
		}
		mtx_unlock(&metadata_lock);
	}
	rw_unlock(&hio_remote_lock[ncomp]);
	/* Report completion of the write request to the kernel. */
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
	hio->hio_done = true;
}
1124
1125/*
1126 * Thread receives ggate I/O requests from the kernel and passes them to
1127 * appropriate threads:
1128 * WRITE - always goes to both local_send and remote_send threads
1129 * READ (when the block is up-to-date on local component) -
1130 *	only local_send thread
1131 * READ (when the block isn't up-to-date on local component) -
1132 *	only remote_send thread
1133 * DELETE - always goes to both local_send and remote_send threads
1134 * FLUSH - always goes to both local_send and remote_send threads
1135 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		/* Buffers are MAXPHYS; the kernel never asks for more. */
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		hio->hio_replication = res->hr_replication;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		/* Blocks until the kernel has an I/O request for us. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}

		ncomp = 0;
		ncomps = HAST_NCOMPONENTS;

		/* Start with EINVAL until a component reports its result. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);

		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			ncomps = 1;
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				 /* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				 /* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0 &&
			    res->hr_primary_localcnt == 0) {
				/* This is first write. */
				res->hr_primary_localcnt = 1;
			}
			/*
			 * Lock the range for regular I/O: wait for the
			 * synchronization thread to release it, then
			 * register it so sync won't touch it while we write.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) == -1) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the affected extents dirty before writing. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			break;
		case BIO_DELETE:
			res->hr_stat_delete++;
			break;
		case BIO_FLUSH:
			res->hr_stat_flush++;
			break;
		}
		pjdlog_debug(2,
		    "ggate_recv: (%p) Moving request to the send queues.", hio);
		/* One countdown reference per component handling the request. */
		refcount_init(&hio->hio_countdown, ncomps);
		for (ii = ncomp; ii < ncomp + ncomps; ii++)
			QUEUE_INSERT1(hio, send, ii);
	}
	/* NOTREACHED */
	return (NULL);
}
1286
1287/*
1288 * Thread reads from or writes to local component.
1289 * If local read fails, it redirects it to remote_send thread.
1290 */
1291static void *
1292local_send_thread(void *arg)
1293{
1294	struct hast_resource *res = arg;
1295	struct g_gate_ctl_io *ggio;
1296	struct hio *hio;
1297	unsigned int ncomp, rncomp;
1298	ssize_t ret;
1299
1300	/* Local component is 0 for now. */
1301	ncomp = 0;
1302	/* Remote component is 1 for now. */
1303	rncomp = 1;
1304
1305	for (;;) {
1306		pjdlog_debug(2, "local_send: Taking request.");
1307		QUEUE_TAKE1(hio, send, ncomp, 0);
1308		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1309		ggio = &hio->hio_ggio;
1310		switch (ggio->gctl_cmd) {
1311		case BIO_READ:
1312			ret = pread(res->hr_localfd, ggio->gctl_data,
1313			    ggio->gctl_length,
1314			    ggio->gctl_offset + res->hr_localoff);
1315			if (ret == ggio->gctl_length)
1316				hio->hio_errors[ncomp] = 0;
1317			else if (!ISSYNCREQ(hio)) {
1318				/*
1319				 * If READ failed, try to read from remote node.
1320				 */
1321				if (ret == -1) {
1322					reqlog(LOG_WARNING, 0, ggio,
1323					    "Local request failed (%s), trying remote node. ",
1324					    strerror(errno));
1325				} else if (ret != ggio->gctl_length) {
1326					reqlog(LOG_WARNING, 0, ggio,
1327					    "Local request failed (%zd != %jd), trying remote node. ",
1328					    ret, (intmax_t)ggio->gctl_length);
1329				}
1330				QUEUE_INSERT1(hio, send, rncomp);
1331				continue;
1332			}
1333			break;
1334		case BIO_WRITE:
1335			ret = pwrite(res->hr_localfd, ggio->gctl_data,
1336			    ggio->gctl_length,
1337			    ggio->gctl_offset + res->hr_localoff);
1338			if (ret == -1) {
1339				hio->hio_errors[ncomp] = errno;
1340				reqlog(LOG_WARNING, 0, ggio,
1341				    "Local request failed (%s): ",
1342				    strerror(errno));
1343			} else if (ret != ggio->gctl_length) {
1344				hio->hio_errors[ncomp] = EIO;
1345				reqlog(LOG_WARNING, 0, ggio,
1346				    "Local request failed (%zd != %jd): ",
1347				    ret, (intmax_t)ggio->gctl_length);
1348			} else {
1349				hio->hio_errors[ncomp] = 0;
1350				if (hio->hio_replication ==
1351				    HAST_REPLICATION_ASYNC &&
1352				    !ISSYNCREQ(hio)) {
1353					ggio->gctl_error = 0;
1354					write_complete(res, hio);
1355				}
1356			}
1357			break;
1358		case BIO_DELETE:
1359			ret = g_delete(res->hr_localfd,
1360			    ggio->gctl_offset + res->hr_localoff,
1361			    ggio->gctl_length);
1362			if (ret == -1) {
1363				hio->hio_errors[ncomp] = errno;
1364				reqlog(LOG_WARNING, 0, ggio,
1365				    "Local request failed (%s): ",
1366				    strerror(errno));
1367			} else {
1368				hio->hio_errors[ncomp] = 0;
1369			}
1370			break;
1371		case BIO_FLUSH:
1372			if (!res->hr_localflush) {
1373				ret = -1;
1374				errno = EOPNOTSUPP;
1375				break;
1376			}
1377			ret = g_flush(res->hr_localfd);
1378			if (ret == -1) {
1379				if (errno == EOPNOTSUPP)
1380					res->hr_localflush = false;
1381				hio->hio_errors[ncomp] = errno;
1382				reqlog(LOG_WARNING, 0, ggio,
1383				    "Local request failed (%s): ",
1384				    strerror(errno));
1385			} else {
1386				hio->hio_errors[ncomp] = 0;
1387			}
1388			break;
1389		}
1390		if (!refcount_release(&hio->hio_countdown))
1391			continue;
1392		if (ISSYNCREQ(hio)) {
1393			mtx_lock(&sync_lock);
1394			SYNCREQDONE(hio);
1395			mtx_unlock(&sync_lock);
1396			cv_signal(&sync_cond);
1397		} else {
1398			pjdlog_debug(2,
1399			    "local_send: (%p) Moving request to the done queue.",
1400			    hio);
1401			QUEUE_INSERT2(hio, done);
1402		}
1403	}
1404	/* NOTREACHED */
1405	return (NULL);
1406}
1407
/*
 * Send a HIO_KEEPALIVE packet to the remote node for the given
 * component, if it is connected.  On send failure the connection is
 * torn down via remote_close().  Note the lock is always dropped
 * before calling remote_close(), which takes the write lock itself.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}
1445
1446/*
1447 * Thread sends request to secondary node.
1448 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/*
			 * Queue-take timed out; use the idle period to send
			 * a keepalive if one is due.
			 */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate command into a HAST protocol request. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) == -1) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		/* Wake remote_recv_thread only if the queue was empty. */
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			/* Wake up the synchronization thread. */
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write never reached the remote node, so the
			 * extent may need to stay dirty for a future sync.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}
1599
1600/*
1601 * Thread receives answer from secondary node and passes it to ggate_send
1602 * thread.
1603 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);

		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Find the pending request this reply belongs to. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		ggio = &hio->hio_ggio;
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* READ replies carry data; receive it into the request. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) == -1) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Wait until every component has handled the request. */
		if (!refcount_release(&hio->hio_countdown))
			continue;
		if (ISSYNCREQ(hio)) {
			/* Wake up the synchronization thread. */
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "remote_recv: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}
1729
1730/*
1731 * Thread sends answer to the kernel.
1732 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Successful write; mark the extents clean again. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/* Async writes were already completed in local_send. */
			if (!hio->hio_done)
				write_complete(res, hio);
		} else {
			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
				primary_exit(EX_OSERR,
				    "G_GATE_CMD_DONE failed");
			}
		}
		/* Update per-command error statistics. */
		if (hio->hio_errors[0]) {
			switch (ggio->gctl_cmd) {
			case BIO_READ:
				res->hr_stat_read_error++;
				break;
			case BIO_WRITE:
				res->hr_stat_write_error++;
				break;
			case BIO_DELETE:
				res->hr_stat_delete_error++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush_error++;
				break;
			}
		}
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}
1820
1821/*
1822 * Thread synchronize local and remote components.
1823 */
1824static void *
1825sync_thread(void *arg __unused)
1826{
1827	struct hast_resource *res = arg;
1828	struct hio *hio;
1829	struct g_gate_ctl_io *ggio;
1830	struct timeval tstart, tend, tdiff;
1831	unsigned int ii, ncomp, ncomps;
1832	off_t offset, length, synced;
1833	bool dorewind, directreads;
1834	int syncext;
1835
1836	ncomps = HAST_NCOMPONENTS;
1837	dorewind = true;
1838	synced = 0;
1839	offset = -1;
1840	directreads = false;
1841
1842	for (;;) {
1843		mtx_lock(&sync_lock);
1844		if (offset >= 0 && !sync_inprogress) {
1845			gettimeofday(&tend, NULL);
1846			timersub(&tend, &tstart, &tdiff);
1847			pjdlog_info("Synchronization interrupted after %#.0T. "
1848			    "%NB synchronized so far.", &tdiff,
1849			    (intmax_t)synced);
1850			event_send(res, EVENT_SYNCINTR);
1851		}
1852		while (!sync_inprogress) {
1853			dorewind = true;
1854			synced = 0;
1855			cv_wait(&sync_cond, &sync_lock);
1856		}
1857		mtx_unlock(&sync_lock);
1858		/*
1859		 * Obtain offset at which we should synchronize.
1860		 * Rewind synchronization if needed.
1861		 */
1862		mtx_lock(&res->hr_amp_lock);
1863		if (dorewind)
1864			activemap_sync_rewind(res->hr_amp);
1865		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1866		if (syncext != -1) {
1867			/*
1868			 * We synchronized entire syncext extent, we can mark
1869			 * it as clean now.
1870			 */
1871			if (activemap_extent_complete(res->hr_amp, syncext))
1872				(void)hast_activemap_flush(res);
1873		}
1874		mtx_unlock(&res->hr_amp_lock);
1875		if (dorewind) {
1876			dorewind = false;
1877			if (offset == -1)
1878				pjdlog_info("Nodes are in sync.");
1879			else {
1880				pjdlog_info("Synchronization started. %NB to go.",
1881				    (intmax_t)(res->hr_extentsize *
1882				    activemap_ndirty(res->hr_amp)));
1883				event_send(res, EVENT_SYNCSTART);
1884				gettimeofday(&tstart, NULL);
1885			}
1886		}
1887		if (offset == -1) {
1888			sync_stop();
1889			pjdlog_debug(1, "Nothing to synchronize.");
1890			/*
1891			 * Synchronization complete, make both localcnt and
1892			 * remotecnt equal.
1893			 */
1894			ncomp = 1;
1895			rw_rlock(&hio_remote_lock[ncomp]);
1896			if (ISCONNECTED(res, ncomp)) {
1897				if (synced > 0) {
1898					int64_t bps;
1899
1900					gettimeofday(&tend, NULL);
1901					timersub(&tend, &tstart, &tdiff);
1902					bps = (int64_t)((double)synced /
1903					    ((double)tdiff.tv_sec +
1904					    (double)tdiff.tv_usec / 1000000));
1905					pjdlog_info("Synchronization complete. "
1906					    "%NB synchronized in %#.0lT (%NB/sec).",
1907					    (intmax_t)synced, &tdiff,
1908					    (intmax_t)bps);
1909					event_send(res, EVENT_SYNCDONE);
1910				}
1911				mtx_lock(&metadata_lock);
1912				if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
1913					directreads = true;
1914				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1915				res->hr_primary_localcnt =
1916				    res->hr_secondary_remotecnt;
1917				res->hr_primary_remotecnt =
1918				    res->hr_secondary_localcnt;
1919				pjdlog_debug(1,
1920				    "Setting localcnt to %ju and remotecnt to %ju.",
1921				    (uintmax_t)res->hr_primary_localcnt,
1922				    (uintmax_t)res->hr_primary_remotecnt);
1923				(void)metadata_write(res);
1924				mtx_unlock(&metadata_lock);
1925			}
1926			rw_unlock(&hio_remote_lock[ncomp]);
1927			if (directreads) {
1928				directreads = false;
1929				enable_direct_reads(res);
1930			}
1931			continue;
1932		}
1933		pjdlog_debug(2, "sync: Taking free request.");
1934		QUEUE_TAKE2(hio, free);
1935		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1936		/*
1937		 * Lock the range we are going to synchronize. We don't want
1938		 * race where someone writes between our read and write.
1939		 */
1940		for (;;) {
1941			mtx_lock(&range_lock);
1942			if (rangelock_islocked(range_regular, offset, length)) {
1943				pjdlog_debug(2,
1944				    "sync: Range offset=%jd length=%jd locked.",
1945				    (intmax_t)offset, (intmax_t)length);
1946				range_sync_wait = true;
1947				cv_wait(&range_sync_cond, &range_lock);
1948				range_sync_wait = false;
1949				mtx_unlock(&range_lock);
1950				continue;
1951			}
1952			if (rangelock_add(range_sync, offset, length) == -1) {
1953				mtx_unlock(&range_lock);
1954				pjdlog_debug(2,
1955				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
1956				    (intmax_t)offset, (intmax_t)length);
1957				sleep(1);
1958				continue;
1959			}
1960			mtx_unlock(&range_lock);
1961			break;
1962		}
1963		/*
1964		 * First read the data from synchronization source.
1965		 */
1966		SYNCREQ(hio);
1967		ggio = &hio->hio_ggio;
1968		ggio->gctl_cmd = BIO_READ;
1969		ggio->gctl_offset = offset;
1970		ggio->gctl_length = length;
1971		ggio->gctl_error = 0;
1972		hio->hio_done = false;
1973		hio->hio_replication = res->hr_replication;
1974		for (ii = 0; ii < ncomps; ii++)
1975			hio->hio_errors[ii] = EINVAL;
1976		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1977		    hio);
1978		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1979		    hio);
1980		mtx_lock(&metadata_lock);
1981		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1982			/*
1983			 * This range is up-to-date on local component,
1984			 * so handle request locally.
1985			 */
1986			 /* Local component is 0 for now. */
1987			ncomp = 0;
1988		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1989			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1990			/*
1991			 * This range is out-of-date on local component,
1992			 * so send request to the remote node.
1993			 */
1994			 /* Remote component is 1 for now. */
1995			ncomp = 1;
1996		}
1997		mtx_unlock(&metadata_lock);
1998		refcount_init(&hio->hio_countdown, 1);
1999		QUEUE_INSERT1(hio, send, ncomp);
2000
2001		/*
2002		 * Let's wait for READ to finish.
2003		 */
2004		mtx_lock(&sync_lock);
2005		while (!ISSYNCREQDONE(hio))
2006			cv_wait(&sync_cond, &sync_lock);
2007		mtx_unlock(&sync_lock);
2008
2009		if (hio->hio_errors[ncomp] != 0) {
2010			pjdlog_error("Unable to read synchronization data: %s.",
2011			    strerror(hio->hio_errors[ncomp]));
2012			goto free_queue;
2013		}
2014
2015		/*
2016		 * We read the data from synchronization source, now write it
2017		 * to synchronization target.
2018		 */
2019		SYNCREQ(hio);
2020		ggio->gctl_cmd = BIO_WRITE;
2021		for (ii = 0; ii < ncomps; ii++)
2022			hio->hio_errors[ii] = EINVAL;
2023		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2024		    hio);
2025		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2026		    hio);
2027		mtx_lock(&metadata_lock);
2028		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2029			/*
2030			 * This range is up-to-date on local component,
2031			 * so we update remote component.
2032			 */
2033			 /* Remote component is 1 for now. */
2034			ncomp = 1;
2035		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2036			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2037			/*
2038			 * This range is out-of-date on local component,
2039			 * so we update it.
2040			 */
2041			 /* Local component is 0 for now. */
2042			ncomp = 0;
2043		}
2044		mtx_unlock(&metadata_lock);
2045
2046		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2047		    hio);
2048		refcount_init(&hio->hio_countdown, 1);
2049		QUEUE_INSERT1(hio, send, ncomp);
2050
2051		/*
2052		 * Let's wait for WRITE to finish.
2053		 */
2054		mtx_lock(&sync_lock);
2055		while (!ISSYNCREQDONE(hio))
2056			cv_wait(&sync_cond, &sync_lock);
2057		mtx_unlock(&sync_lock);
2058
2059		if (hio->hio_errors[ncomp] != 0) {
2060			pjdlog_error("Unable to write synchronization data: %s.",
2061			    strerror(hio->hio_errors[ncomp]));
2062			goto free_queue;
2063		}
2064
2065		synced += length;
2066free_queue:
2067		mtx_lock(&range_lock);
2068		rangelock_del(range_sync, offset, length);
2069		if (range_regular_wait)
2070			cv_signal(&range_regular_cond);
2071		mtx_unlock(&range_lock);
2072		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
2073		    hio);
2074		QUEUE_INSERT2(hio, free);
2075	}
2076	/* NOTREACHED */
2077	return (NULL);
2078}
2079
/*
 * Apply a reloaded configuration, delivered in the nvlist 'nv', to the
 * running primary resource.  Works on the global resource (asserted via
 * gres == res).  Detects which settings actually changed, updates the
 * in-core state, adjusts socket timeouts in place when possible and,
 * when the remote or source address changed, closes the remote
 * connections so they get re-established with the new addresses
 * (presumably by the guard thread — confirm against remote_guard logic).
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	/* Only valid for the primary role, and only on the global resource. */
	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	/* The caller must supply every reloadable field in the nvlist. */
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

	/* Function-local bitmask flags recording which settings changed. */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	/*
	 * Compare each incoming value against the current one and record a
	 * MODIFIED_* flag for every real change.  Most fields are updated
	 * in place right here; remoteaddr is the deliberate exception (see
	 * the comment below).
	 */
	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			/*
			 * Lock is dropped before proto_timeout() calls; a
			 * concurrent disconnect between the check and the
			 * calls appears tolerated (failures only warn).
			 */
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	/*
	 * Address changes require dropping the remote connections.  Only
	 * after remote_close() has logged with the old address do we copy
	 * the new remoteaddr into place.
	 */
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC
#undef	MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}
2207
2208static void
2209guard_one(struct hast_resource *res, unsigned int ncomp)
2210{
2211	struct proto_conn *in, *out;
2212
2213	if (!ISREMOTE(ncomp))
2214		return;
2215
2216	rw_rlock(&hio_remote_lock[ncomp]);
2217
2218	if (!real_remote(res)) {
2219		rw_unlock(&hio_remote_lock[ncomp]);
2220		return;
2221	}
2222
2223	if (ISCONNECTED(res, ncomp)) {
2224		PJDLOG_ASSERT(res->hr_remotein != NULL);
2225		PJDLOG_ASSERT(res->hr_remoteout != NULL);
2226		rw_unlock(&hio_remote_lock[ncomp]);
2227		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
2228		    res->hr_remoteaddr);
2229		return;
2230	}
2231
2232	PJDLOG_ASSERT(res->hr_remotein == NULL);
2233	PJDLOG_ASSERT(res->hr_remoteout == NULL);
2234	/*
2235	 * Upgrade the lock. It doesn't have to be atomic as no other thread
2236	 * can change connection status from disconnected to connected.
2237	 */
2238	rw_unlock(&hio_remote_lock[ncomp]);
2239	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
2240	    res->hr_remoteaddr);
2241	in = out = NULL;
2242	if (init_remote(res, &in, &out) == 0) {
2243		rw_wlock(&hio_remote_lock[ncomp]);
2244		PJDLOG_ASSERT(res->hr_remotein == NULL);
2245		PJDLOG_ASSERT(res->hr_remoteout == NULL);
2246		PJDLOG_ASSERT(in != NULL && out != NULL);
2247		res->hr_remotein = in;
2248		res->hr_remoteout = out;
2249		rw_unlock(&hio_remote_lock[ncomp]);
2250		pjdlog_info("Successfully reconnected to %s.",
2251		    res->hr_remoteaddr);
2252		sync_start();
2253	} else {
2254		/* Both connections should be NULL. */
2255		PJDLOG_ASSERT(res->hr_remotein == NULL);
2256		PJDLOG_ASSERT(res->hr_remoteout == NULL);
2257		PJDLOG_ASSERT(in == NULL && out == NULL);
2258		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
2259		    res->hr_remoteaddr);
2260	}
2261}
2262
2263/*
2264 * Thread guards remote connections and reconnects when needed, handles
2265 * signals, etc.
2266 */
2267static void *
2268guard_thread(void *arg)
2269{
2270	struct hast_resource *res = arg;
2271	unsigned int ii, ncomps;
2272	struct timespec timeout;
2273	time_t lastcheck, now;
2274	sigset_t mask;
2275	int signo;
2276
2277	ncomps = HAST_NCOMPONENTS;
2278	lastcheck = time(NULL);
2279
2280	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2281	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2282	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2283
2284	timeout.tv_sec = HAST_KEEPALIVE;
2285	timeout.tv_nsec = 0;
2286	signo = -1;
2287
2288	for (;;) {
2289		switch (signo) {
2290		case SIGINT:
2291		case SIGTERM:
2292			sigexit_received = true;
2293			primary_exitx(EX_OK,
2294			    "Termination signal received, exiting.");
2295			break;
2296		default:
2297			break;
2298		}
2299
2300		/*
2301		 * Don't check connections until we fully started,
2302		 * as we may still be looping, waiting for remote node
2303		 * to switch from primary to secondary.
2304		 */
2305		if (fullystarted) {
2306			pjdlog_debug(2, "remote_guard: Checking connections.");
2307			now = time(NULL);
2308			if (lastcheck + HAST_KEEPALIVE <= now) {
2309				for (ii = 0; ii < ncomps; ii++)
2310					guard_one(res, ii);
2311				lastcheck = now;
2312			}
2313		}
2314		signo = sigtimedwait(&mask, NULL, &timeout);
2315	}
2316	/* NOTREACHED */
2317	return (NULL);
2318}
2319