1/*-
2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3 *
4 * Copyright (c) 2009 The FreeBSD Foundation
5 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
6 * All rights reserved.
7 *
8 * This software was developed by Pawel Jakub Dawidek under sponsorship from
9 * the FreeBSD Foundation.
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions
13 * are met:
14 * 1. Redistributions of source code must retain the above copyright
15 *    notice, this list of conditions and the following disclaimer.
16 * 2. Redistributions in binary form must reproduce the above copyright
17 *    notice, this list of conditions and the following disclaimer in the
18 *    documentation and/or other materials provided with the distribution.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
21 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
24 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
26 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
27 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
29 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
31 */
32
33#include <sys/cdefs.h>
34__FBSDID("$FreeBSD: stable/11/sbin/hastd/primary.c 330449 2018-03-05 07:26:05Z eadler $");
35
36#include <sys/types.h>
37#include <sys/time.h>
38#include <sys/bio.h>
39#include <sys/disk.h>
40#include <sys/stat.h>
41
42#include <geom/gate/g_gate.h>
43
44#include <err.h>
45#include <errno.h>
46#include <fcntl.h>
47#include <libgeom.h>
48#include <pthread.h>
49#include <signal.h>
50#include <stdint.h>
51#include <stdio.h>
52#include <string.h>
53#include <sysexits.h>
54#include <unistd.h>
55
56#include <activemap.h>
57#include <nv.h>
58#include <rangelock.h>
59
60#include "control.h"
61#include "event.h"
62#include "hast.h"
63#include "hast_proto.h"
64#include "hastd.h"
65#include "hooks.h"
66#include "metadata.h"
67#include "proto.h"
68#include "pjdlog.h"
69#include "refcnt.h"
70#include "subr.h"
71#include "synch.h"
72
/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)
75
/*
 * A single I/O request as tracked by the primary worker. The structures are
 * preallocated by init_environment() and start out on the free list.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	refcnt_t		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool			 hio_done;
	/*
	 * Number of components we are still waiting for before sending write
	 * completion ack to GEOM Gate. Used for memsync.
	 */
	refcnt_t		 hio_writecount;
	/*
	 * Memsync request was acknowledged by remote.
	 */
	bool			 hio_memsyncacked;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int			 hio_replication;
	/*
	 * Queue linkage, one TAILQ_ENTRY per component; the array is
	 * allocated in init_environment().
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
/* The free list and the done list both link requests through slot 0. */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]
116
/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static size_t *hio_send_list_size;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/* Index 0 is the local component, index 1 is the remote one. */
#define	hio_send_local_list_size	hio_send_list_size[0]
#define	hio_send_remote_list_size	hio_send_list_size[1]
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static size_t *hio_recv_list_size;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
#define	hio_recv_remote_list_size	hio_recv_list_size[1]
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static size_t hio_done_list_size;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows us to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronizes access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;
169
/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2

/*
 * True when both the incoming and the outgoing remote connections are set.
 * The component number 'no' is accepted but not used by the expansion.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
183
/*
 * Append hio to the tail of the per-component list hio_<name>_list[ncomp]
 * while holding that list's lock, and bump the list's size counter.
 * Waiters on the list's condition variable are woken only when the list was
 * empty before the insertion.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	if (TAILQ_EMPTY(&hio_##name##_list[(ncomp)]))			\
		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	hio_##name##_list_size[(ncomp)]++;				\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/*
 * Same as QUEUE_INSERT1, but for the single (not per-component) lists:
 * append hio to hio_<name>_list using the hio_<name>_next linkage.
 */
#define	QUEUE_INSERT2(hio, name)	do {				\
	mtx_lock(&hio_##name##_list_lock);				\
	if (TAILQ_EMPTY(&hio_##name##_list))				\
		cv_broadcast(&hio_##name##_list_cond);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	hio_##name##_list_size++;					\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
/*
 * Remove the first request from the per-component list
 * hio_<name>_list[ncomp] and store it in hio.
 *
 * With timeout == 0 the macro keeps waiting on the list's condition variable
 * until a request is available. With a non-zero timeout it waits at most
 * once; if the list is still empty afterwards, hio is left set to NULL, so
 * callers that pass a timeout must check for NULL.
 *
 * The hio argument is evaluated more than once and must be a plain lvalue
 * without side effects.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if ((hio) != NULL) {						\
		PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0);	\
		hio_##name##_list_size[(ncomp)]--;			\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/*
 * Remove the first request from the single list hio_<name>_list and store
 * it in hio. Blocks on the list's condition variable until the list is
 * non-empty, so hio is never NULL afterwards.
 */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	hio_##name##_list_size--;					\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
231
/* Tests on the replication mode recorded in the request. */
#define ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
#define ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
#define ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)

/*
 * Synchronization requests are tagged through gctl_unit:
 * -1 marks a synchronization request, -2 marks one that is done.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* A memsync-mode BIO_WRITE that is not a synchronization request. */
#define ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))
246
/*
 * Resource handled by this worker. primary_exit() and primary_exitx() pass
 * it to cleanup() before exiting.
 */
static struct hast_resource *gres;

/*
 * Range lock state. The mutex, the condition variables and both rangelocks
 * are initialized in init_local().
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
static bool fullystarted;

/* Forward declarations of the thread functions. */
static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);
265
266static void
267output_status_aux(struct nv *nvout)
268{
269
270	nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
271	    "idle_queue_size");
272	nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
273	    "local_queue_size");
274	nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
275	    "send_queue_size");
276	nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
277	    "recv_queue_size");
278	nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
279	    "done_queue_size");
280}
281
282static void
283cleanup(struct hast_resource *res)
284{
285	int rerrno;
286
287	/* Remember errno. */
288	rerrno = errno;
289
290	/* Destroy ggate provider if we created one. */
291	if (res->hr_ggateunit >= 0) {
292		struct g_gate_ctl_destroy ggiod;
293
294		bzero(&ggiod, sizeof(ggiod));
295		ggiod.gctl_version = G_GATE_VERSION;
296		ggiod.gctl_unit = res->hr_ggateunit;
297		ggiod.gctl_force = 1;
298		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
299			pjdlog_errno(LOG_WARNING,
300			    "Unable to destroy hast/%s device",
301			    res->hr_provname);
302		}
303		res->hr_ggateunit = -1;
304	}
305
306	/* Restore errno. */
307	errno = rerrno;
308}
309
310static __dead2 void
311primary_exit(int exitcode, const char *fmt, ...)
312{
313	va_list ap;
314
315	PJDLOG_ASSERT(exitcode != EX_OK);
316	va_start(ap, fmt);
317	pjdlogv_errno(LOG_ERR, fmt, ap);
318	va_end(ap);
319	cleanup(gres);
320	exit(exitcode);
321}
322
323static __dead2 void
324primary_exitx(int exitcode, const char *fmt, ...)
325{
326	va_list ap;
327
328	va_start(ap, fmt);
329	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
330	va_end(ap);
331	cleanup(gres);
332	exit(exitcode);
333}
334
335static int
336hast_activemap_flush(struct hast_resource *res) __unlocks(res->hr_amp_lock)
337{
338	const unsigned char *buf;
339	size_t size;
340	int ret;
341
342	mtx_lock(&res->hr_amp_diskmap_lock);
343	buf = activemap_bitmap(res->hr_amp, &size);
344	mtx_unlock(&res->hr_amp_lock);
345	PJDLOG_ASSERT(buf != NULL);
346	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
347	ret = 0;
348	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
349	    (ssize_t)size) {
350		pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
351		res->hr_stat_activemap_write_error++;
352		ret = -1;
353	}
354	if (ret == 0 && res->hr_metaflush == 1 &&
355	    g_flush(res->hr_localfd) == -1) {
356		if (errno == EOPNOTSUPP) {
357			pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
358			    res->hr_localpath);
359			res->hr_metaflush = 0;
360		} else {
361			pjdlog_errno(LOG_ERR,
362			    "Unable to flush disk cache on activemap update");
363			res->hr_stat_activemap_flush_error++;
364			ret = -1;
365		}
366	}
367	mtx_unlock(&res->hr_amp_diskmap_lock);
368	return (ret);
369}
370
371static bool
372real_remote(const struct hast_resource *res)
373{
374
375	return (strcmp(res->hr_remoteaddr, "none") != 0);
376}
377
378static void
379init_environment(struct hast_resource *res __unused)
380{
381	struct hio *hio;
382	unsigned int ii, ncomps;
383
384	/*
385	 * In the future it might be per-resource value.
386	 */
387	ncomps = HAST_NCOMPONENTS;
388
389	/*
390	 * Allocate memory needed by lists.
391	 */
392	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
393	if (hio_send_list == NULL) {
394		primary_exitx(EX_TEMPFAIL,
395		    "Unable to allocate %zu bytes of memory for send lists.",
396		    sizeof(hio_send_list[0]) * ncomps);
397	}
398	hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
399	if (hio_send_list_size == NULL) {
400		primary_exitx(EX_TEMPFAIL,
401		    "Unable to allocate %zu bytes of memory for send list counters.",
402		    sizeof(hio_send_list_size[0]) * ncomps);
403	}
404	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
405	if (hio_send_list_lock == NULL) {
406		primary_exitx(EX_TEMPFAIL,
407		    "Unable to allocate %zu bytes of memory for send list locks.",
408		    sizeof(hio_send_list_lock[0]) * ncomps);
409	}
410	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
411	if (hio_send_list_cond == NULL) {
412		primary_exitx(EX_TEMPFAIL,
413		    "Unable to allocate %zu bytes of memory for send list condition variables.",
414		    sizeof(hio_send_list_cond[0]) * ncomps);
415	}
416	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
417	if (hio_recv_list == NULL) {
418		primary_exitx(EX_TEMPFAIL,
419		    "Unable to allocate %zu bytes of memory for recv lists.",
420		    sizeof(hio_recv_list[0]) * ncomps);
421	}
422	hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
423	if (hio_recv_list_size == NULL) {
424		primary_exitx(EX_TEMPFAIL,
425		    "Unable to allocate %zu bytes of memory for recv list counters.",
426		    sizeof(hio_recv_list_size[0]) * ncomps);
427	}
428	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
429	if (hio_recv_list_lock == NULL) {
430		primary_exitx(EX_TEMPFAIL,
431		    "Unable to allocate %zu bytes of memory for recv list locks.",
432		    sizeof(hio_recv_list_lock[0]) * ncomps);
433	}
434	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
435	if (hio_recv_list_cond == NULL) {
436		primary_exitx(EX_TEMPFAIL,
437		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
438		    sizeof(hio_recv_list_cond[0]) * ncomps);
439	}
440	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
441	if (hio_remote_lock == NULL) {
442		primary_exitx(EX_TEMPFAIL,
443		    "Unable to allocate %zu bytes of memory for remote connections locks.",
444		    sizeof(hio_remote_lock[0]) * ncomps);
445	}
446
447	/*
448	 * Initialize lists, their counters, locks and condition variables.
449	 */
450	TAILQ_INIT(&hio_free_list);
451	mtx_init(&hio_free_list_lock);
452	cv_init(&hio_free_list_cond);
453	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
454		TAILQ_INIT(&hio_send_list[ii]);
455		hio_send_list_size[ii] = 0;
456		mtx_init(&hio_send_list_lock[ii]);
457		cv_init(&hio_send_list_cond[ii]);
458		TAILQ_INIT(&hio_recv_list[ii]);
459		hio_recv_list_size[ii] = 0;
460		mtx_init(&hio_recv_list_lock[ii]);
461		cv_init(&hio_recv_list_cond[ii]);
462		rw_init(&hio_remote_lock[ii]);
463	}
464	TAILQ_INIT(&hio_done_list);
465	mtx_init(&hio_done_list_lock);
466	cv_init(&hio_done_list_cond);
467	mtx_init(&metadata_lock);
468
469	/*
470	 * Allocate requests pool and initialize requests.
471	 */
472	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
473		hio = malloc(sizeof(*hio));
474		if (hio == NULL) {
475			primary_exitx(EX_TEMPFAIL,
476			    "Unable to allocate %zu bytes of memory for hio request.",
477			    sizeof(*hio));
478		}
479		refcnt_init(&hio->hio_countdown, 0);
480		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
481		if (hio->hio_errors == NULL) {
482			primary_exitx(EX_TEMPFAIL,
483			    "Unable allocate %zu bytes of memory for hio errors.",
484			    sizeof(hio->hio_errors[0]) * ncomps);
485		}
486		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
487		if (hio->hio_next == NULL) {
488			primary_exitx(EX_TEMPFAIL,
489			    "Unable allocate %zu bytes of memory for hio_next field.",
490			    sizeof(hio->hio_next[0]) * ncomps);
491		}
492		hio->hio_ggio.gctl_version = G_GATE_VERSION;
493		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
494		if (hio->hio_ggio.gctl_data == NULL) {
495			primary_exitx(EX_TEMPFAIL,
496			    "Unable to allocate %zu bytes of memory for gctl_data.",
497			    MAXPHYS);
498		}
499		hio->hio_ggio.gctl_length = MAXPHYS;
500		hio->hio_ggio.gctl_error = 0;
501		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
502		hio_free_list_size++;
503	}
504}
505
506static bool
507init_resuid(struct hast_resource *res)
508{
509
510	mtx_lock(&metadata_lock);
511	if (res->hr_resuid != 0) {
512		mtx_unlock(&metadata_lock);
513		return (false);
514	} else {
515		/* Initialize unique resource identifier. */
516		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
517		mtx_unlock(&metadata_lock);
518		if (metadata_write(res) == -1)
519			exit(EX_NOINPUT);
520		return (true);
521	}
522}
523
524static void
525init_local(struct hast_resource *res)
526{
527	unsigned char *buf;
528	size_t mapsize;
529
530	if (metadata_read(res, true) == -1)
531		exit(EX_NOINPUT);
532	mtx_init(&res->hr_amp_lock);
533	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
534	    res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
535		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
536	}
537	mtx_init(&range_lock);
538	cv_init(&range_regular_cond);
539	if (rangelock_init(&range_regular) == -1)
540		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
541	cv_init(&range_sync_cond);
542	if (rangelock_init(&range_sync) == -1)
543		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
544	mapsize = activemap_ondisk_size(res->hr_amp);
545	buf = calloc(1, mapsize);
546	if (buf == NULL) {
547		primary_exitx(EX_TEMPFAIL,
548		    "Unable to allocate buffer for activemap.");
549	}
550	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
551	    (ssize_t)mapsize) {
552		primary_exit(EX_NOINPUT, "Unable to read activemap");
553	}
554	activemap_copyin(res->hr_amp, buf, mapsize);
555	free(buf);
556	if (res->hr_resuid != 0)
557		return;
558	/*
559	 * We're using provider for the first time. Initialize local and remote
560	 * counters. We don't initialize resuid here, as we want to do it just
561	 * in time. The reason for this is that we want to inform secondary
562	 * that there were no writes yet, so there is no need to synchronize
563	 * anything.
564	 */
565	res->hr_primary_localcnt = 0;
566	res->hr_primary_remotecnt = 0;
567	if (metadata_write(res) == -1)
568		exit(EX_NOINPUT);
569}
570
571static int
572primary_connect(struct hast_resource *res, struct proto_conn **connp)
573{
574	struct proto_conn *conn;
575	int16_t val;
576
577	val = 1;
578	if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
579		primary_exit(EX_TEMPFAIL,
580		    "Unable to send connection request to parent");
581	}
582	if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
583		primary_exit(EX_TEMPFAIL,
584		    "Unable to receive reply to connection request from parent");
585	}
586	if (val != 0) {
587		errno = val;
588		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
589		    res->hr_remoteaddr);
590		return (-1);
591	}
592	if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
593		primary_exit(EX_TEMPFAIL,
594		    "Unable to receive connection from parent");
595	}
596	if (proto_connect_wait(conn, res->hr_timeout) == -1) {
597		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
598		    res->hr_remoteaddr);
599		proto_close(conn);
600		return (-1);
601	}
602	/* Error in setting timeout is not critical, but why should it fail? */
603	if (proto_timeout(conn, res->hr_timeout) == -1)
604		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
605
606	*connp = conn;
607
608	return (0);
609}
610
611/*
612 * Function instructs GEOM_GATE to handle reads directly from within the kernel.
613 */
614static void
615enable_direct_reads(struct hast_resource *res)
616{
617	struct g_gate_ctl_modify ggiomodify;
618
619	bzero(&ggiomodify, sizeof(ggiomodify));
620	ggiomodify.gctl_version = G_GATE_VERSION;
621	ggiomodify.gctl_unit = res->hr_ggateunit;
622	ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
623	strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
624	    sizeof(ggiomodify.gctl_readprov));
625	ggiomodify.gctl_readoffset = res->hr_localoff;
626	if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
627		pjdlog_debug(1, "Direct reads enabled.");
628	else
629		pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
630}
631
632static int
633init_remote(struct hast_resource *res, struct proto_conn **inp,
634    struct proto_conn **outp)
635{
636	struct proto_conn *in, *out;
637	struct nv *nvout, *nvin;
638	const unsigned char *token;
639	unsigned char *map;
640	const char *errmsg;
641	int32_t extentsize;
642	int64_t datasize;
643	uint32_t mapsize;
644	uint8_t version;
645	size_t size;
646	int error;
647
648	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
649	PJDLOG_ASSERT(real_remote(res));
650
651	in = out = NULL;
652	errmsg = NULL;
653
654	if (primary_connect(res, &out) == -1)
655		return (ECONNREFUSED);
656
657	error = ECONNABORTED;
658
659	/*
660	 * First handshake step.
661	 * Setup outgoing connection with remote node.
662	 */
663	nvout = nv_alloc();
664	nv_add_string(nvout, res->hr_name, "resource");
665	nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
666	if (nv_error(nvout) != 0) {
667		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
668		    "Unable to allocate header for connection with %s",
669		    res->hr_remoteaddr);
670		nv_free(nvout);
671		goto close;
672	}
673	if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
674		pjdlog_errno(LOG_WARNING,
675		    "Unable to send handshake header to %s",
676		    res->hr_remoteaddr);
677		nv_free(nvout);
678		goto close;
679	}
680	nv_free(nvout);
681	if (hast_proto_recv_hdr(out, &nvin) == -1) {
682		pjdlog_errno(LOG_WARNING,
683		    "Unable to receive handshake header from %s",
684		    res->hr_remoteaddr);
685		goto close;
686	}
687	errmsg = nv_get_string(nvin, "errmsg");
688	if (errmsg != NULL) {
689		pjdlog_warning("%s", errmsg);
690		if (nv_exists(nvin, "wait"))
691			error = EBUSY;
692		nv_free(nvin);
693		goto close;
694	}
695	version = nv_get_uint8(nvin, "version");
696	if (version == 0) {
697		/*
698		 * If no version is sent, it means this is protocol version 1.
699		 */
700		version = 1;
701	}
702	if (version > HAST_PROTO_VERSION) {
703		pjdlog_warning("Invalid version received (%hhu).", version);
704		nv_free(nvin);
705		goto close;
706	}
707	res->hr_version = version;
708	pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
709	token = nv_get_uint8_array(nvin, &size, "token");
710	if (token == NULL) {
711		pjdlog_warning("Handshake header from %s has no 'token' field.",
712		    res->hr_remoteaddr);
713		nv_free(nvin);
714		goto close;
715	}
716	if (size != sizeof(res->hr_token)) {
717		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
718		    res->hr_remoteaddr, size, sizeof(res->hr_token));
719		nv_free(nvin);
720		goto close;
721	}
722	bcopy(token, res->hr_token, sizeof(res->hr_token));
723	nv_free(nvin);
724
725	/*
726	 * Second handshake step.
727	 * Setup incoming connection with remote node.
728	 */
729	if (primary_connect(res, &in) == -1)
730		goto close;
731
732	nvout = nv_alloc();
733	nv_add_string(nvout, res->hr_name, "resource");
734	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
735	    "token");
736	if (res->hr_resuid == 0) {
737		/*
738		 * The resuid field was not yet initialized.
739		 * Because we do synchronization inside init_resuid(), it is
740		 * possible that someone already initialized it, the function
741		 * will return false then, but if we successfully initialized
742		 * it, we will get true. True means that there were no writes
743		 * to this resource yet and we want to inform secondary that
744		 * synchronization is not needed by sending "virgin" argument.
745		 */
746		if (init_resuid(res))
747			nv_add_int8(nvout, 1, "virgin");
748	}
749	nv_add_uint64(nvout, res->hr_resuid, "resuid");
750	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
751	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
752	if (nv_error(nvout) != 0) {
753		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
754		    "Unable to allocate header for connection with %s",
755		    res->hr_remoteaddr);
756		nv_free(nvout);
757		goto close;
758	}
759	if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
760		pjdlog_errno(LOG_WARNING,
761		    "Unable to send handshake header to %s",
762		    res->hr_remoteaddr);
763		nv_free(nvout);
764		goto close;
765	}
766	nv_free(nvout);
767	if (hast_proto_recv_hdr(out, &nvin) == -1) {
768		pjdlog_errno(LOG_WARNING,
769		    "Unable to receive handshake header from %s",
770		    res->hr_remoteaddr);
771		goto close;
772	}
773	errmsg = nv_get_string(nvin, "errmsg");
774	if (errmsg != NULL) {
775		pjdlog_warning("%s", errmsg);
776		nv_free(nvin);
777		goto close;
778	}
779	datasize = nv_get_int64(nvin, "datasize");
780	if (datasize != res->hr_datasize) {
781		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
782		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
783		nv_free(nvin);
784		goto close;
785	}
786	extentsize = nv_get_int32(nvin, "extentsize");
787	if (extentsize != res->hr_extentsize) {
788		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
789		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
790		nv_free(nvin);
791		goto close;
792	}
793	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
794	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
795	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
796	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
797		enable_direct_reads(res);
798	if (nv_exists(nvin, "virgin")) {
799		/*
800		 * Secondary was reinitialized, bump localcnt if it is 0 as
801		 * only we have the data.
802		 */
803		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
804		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
805
806		if (res->hr_primary_localcnt == 0) {
807			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);
808
809			mtx_lock(&metadata_lock);
810			res->hr_primary_localcnt++;
811			pjdlog_debug(1, "Increasing localcnt to %ju.",
812			    (uintmax_t)res->hr_primary_localcnt);
813			(void)metadata_write(res);
814			mtx_unlock(&metadata_lock);
815		}
816	}
817	map = NULL;
818	mapsize = nv_get_uint32(nvin, "mapsize");
819	if (mapsize > 0) {
820		map = malloc(mapsize);
821		if (map == NULL) {
822			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
823			    (uintmax_t)mapsize);
824			nv_free(nvin);
825			goto close;
826		}
827		/*
828		 * Remote node have some dirty extents on its own, lets
829		 * download its activemap.
830		 */
831		if (hast_proto_recv_data(res, out, nvin, map,
832		    mapsize) == -1) {
833			pjdlog_errno(LOG_ERR,
834			    "Unable to receive remote activemap");
835			nv_free(nvin);
836			free(map);
837			goto close;
838		}
839		mtx_lock(&res->hr_amp_lock);
840		/*
841		 * Merge local and remote bitmaps.
842		 */
843		activemap_merge(res->hr_amp, map, mapsize);
844		free(map);
845		/*
846		 * Now that we merged bitmaps from both nodes, flush it to the
847		 * disk before we start to synchronize.
848		 */
849		(void)hast_activemap_flush(res);
850	}
851	nv_free(nvin);
852#ifdef notyet
853	/* Setup directions. */
854	if (proto_send(out, NULL, 0) == -1)
855		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
856	if (proto_recv(in, NULL, 0) == -1)
857		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
858#endif
859	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
860	if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
861	    res->hr_version < 2) {
862		pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
863		res->hr_replication = HAST_REPLICATION_FULLSYNC;
864	} else if (res->hr_replication != res->hr_original_replication) {
865		/*
866		 * This is in case hastd disconnected and was upgraded.
867		 */
868		res->hr_replication = res->hr_original_replication;
869	}
870	if (inp != NULL && outp != NULL) {
871		*inp = in;
872		*outp = out;
873	} else {
874		res->hr_remotein = in;
875		res->hr_remoteout = out;
876	}
877	event_send(res, EVENT_CONNECT);
878	return (0);
879close:
880	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
881		event_send(res, EVENT_SPLITBRAIN);
882	proto_close(out);
883	if (in != NULL)
884		proto_close(in);
885	return (error);
886}
887
888static void
889sync_start(void)
890{
891
892	mtx_lock(&sync_lock);
893	sync_inprogress = true;
894	mtx_unlock(&sync_lock);
895	cv_signal(&sync_cond);
896}
897
898static void
899sync_stop(void)
900{
901
902	mtx_lock(&sync_lock);
903	if (sync_inprogress)
904		sync_inprogress = false;
905	mtx_unlock(&sync_lock);
906}
907
908static void
909init_ggate(struct hast_resource *res)
910{
911	struct g_gate_ctl_create ggiocreate;
912	struct g_gate_ctl_cancel ggiocancel;
913
914	/*
915	 * We communicate with ggate via /dev/ggctl. Open it.
916	 */
917	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
918	if (res->hr_ggatefd == -1)
919		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
920	/*
921	 * Create provider before trying to connect, as connection failure
922	 * is not critical, but may take some time.
923	 */
924	bzero(&ggiocreate, sizeof(ggiocreate));
925	ggiocreate.gctl_version = G_GATE_VERSION;
926	ggiocreate.gctl_mediasize = res->hr_datasize;
927	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
928	ggiocreate.gctl_flags = 0;
929	ggiocreate.gctl_maxcount = 0;
930	ggiocreate.gctl_timeout = 0;
931	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
932	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
933	    res->hr_provname);
934	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
935		pjdlog_info("Device hast/%s created.", res->hr_provname);
936		res->hr_ggateunit = ggiocreate.gctl_unit;
937		return;
938	}
939	if (errno != EEXIST) {
940		primary_exit(EX_OSERR, "Unable to create hast/%s device",
941		    res->hr_provname);
942	}
943	pjdlog_debug(1,
944	    "Device hast/%s already exists, we will try to take it over.",
945	    res->hr_provname);
946	/*
947	 * If we received EEXIST, we assume that the process who created the
948	 * provider died and didn't clean up. In that case we will start from
949	 * where he left of.
950	 */
951	bzero(&ggiocancel, sizeof(ggiocancel));
952	ggiocancel.gctl_version = G_GATE_VERSION;
953	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
954	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
955	    res->hr_provname);
956	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
957		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
958		res->hr_ggateunit = ggiocancel.gctl_unit;
959		return;
960	}
961	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
962	    res->hr_provname);
963}
964
965void
966hastd_primary(struct hast_resource *res)
967{
968	pthread_t td;
969	pid_t pid;
970	int error, mode, debuglevel;
971
972	/*
973	 * Create communication channel for sending control commands from
974	 * parent to child.
975	 */
976	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
977		/* TODO: There's no need for this to be fatal error. */
978		KEEP_ERRNO((void)pidfile_remove(pfh));
979		pjdlog_exit(EX_OSERR,
980		    "Unable to create control sockets between parent and child");
981	}
982	/*
983	 * Create communication channel for sending events from child to parent.
984	 */
985	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
986		/* TODO: There's no need for this to be fatal error. */
987		KEEP_ERRNO((void)pidfile_remove(pfh));
988		pjdlog_exit(EX_OSERR,
989		    "Unable to create event sockets between child and parent");
990	}
991	/*
992	 * Create communication channel for sending connection requests from
993	 * child to parent.
994	 */
995	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
996		/* TODO: There's no need for this to be fatal error. */
997		KEEP_ERRNO((void)pidfile_remove(pfh));
998		pjdlog_exit(EX_OSERR,
999		    "Unable to create connection sockets between child and parent");
1000	}
1001
1002	pid = fork();
1003	if (pid == -1) {
1004		/* TODO: There's no need for this to be fatal error. */
1005		KEEP_ERRNO((void)pidfile_remove(pfh));
1006		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
1007	}
1008
1009	if (pid > 0) {
1010		/* This is parent. */
1011		/* Declare that we are receiver. */
1012		proto_recv(res->hr_event, NULL, 0);
1013		proto_recv(res->hr_conn, NULL, 0);
1014		/* Declare that we are sender. */
1015		proto_send(res->hr_ctrl, NULL, 0);
1016		res->hr_workerpid = pid;
1017		return;
1018	}
1019
1020	gres = res;
1021	res->output_status_aux = output_status_aux;
1022	mode = pjdlog_mode_get();
1023	debuglevel = pjdlog_debug_get();
1024
1025	/* Declare that we are sender. */
1026	proto_send(res->hr_event, NULL, 0);
1027	proto_send(res->hr_conn, NULL, 0);
1028	/* Declare that we are receiver. */
1029	proto_recv(res->hr_ctrl, NULL, 0);
1030	descriptors_cleanup(res);
1031
1032	descriptors_assert(res, mode);
1033
1034	pjdlog_init(mode);
1035	pjdlog_debug_set(debuglevel);
1036	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
1037	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
1038
1039	init_local(res);
1040	init_ggate(res);
1041	init_environment(res);
1042
1043	if (drop_privs(res) != 0) {
1044		cleanup(res);
1045		exit(EX_CONFIG);
1046	}
1047	pjdlog_info("Privileges successfully dropped.");
1048
1049	/*
1050	 * Create the guard thread first, so we can handle signals from the
1051	 * very beginning.
1052	 */
1053	error = pthread_create(&td, NULL, guard_thread, res);
1054	PJDLOG_ASSERT(error == 0);
1055	/*
1056	 * Create the control thread before sending any event to the parent,
1057	 * as we can deadlock when parent sends control request to worker,
1058	 * but worker has no control thread started yet, so parent waits.
1059	 * In the meantime worker sends an event to the parent, but parent
1060	 * is unable to handle the event, because it waits for control
1061	 * request response.
1062	 */
1063	error = pthread_create(&td, NULL, ctrl_thread, res);
1064	PJDLOG_ASSERT(error == 0);
1065	if (real_remote(res)) {
1066		error = init_remote(res, NULL, NULL);
1067		if (error == 0) {
1068			sync_start();
1069		} else if (error == EBUSY) {
1070			time_t start = time(NULL);
1071
1072			pjdlog_warning("Waiting for remote node to become %s for %ds.",
1073			    role2str(HAST_ROLE_SECONDARY),
1074			    res->hr_timeout);
1075			for (;;) {
1076				sleep(1);
1077				error = init_remote(res, NULL, NULL);
1078				if (error != EBUSY)
1079					break;
1080				if (time(NULL) > start + res->hr_timeout)
1081					break;
1082			}
1083			if (error == EBUSY) {
1084				pjdlog_warning("Remote node is still %s, starting anyway.",
1085				    role2str(HAST_ROLE_PRIMARY));
1086			}
1087		}
1088	}
1089	error = pthread_create(&td, NULL, ggate_recv_thread, res);
1090	PJDLOG_ASSERT(error == 0);
1091	error = pthread_create(&td, NULL, local_send_thread, res);
1092	PJDLOG_ASSERT(error == 0);
1093	error = pthread_create(&td, NULL, remote_send_thread, res);
1094	PJDLOG_ASSERT(error == 0);
1095	error = pthread_create(&td, NULL, remote_recv_thread, res);
1096	PJDLOG_ASSERT(error == 0);
1097	error = pthread_create(&td, NULL, ggate_send_thread, res);
1098	PJDLOG_ASSERT(error == 0);
1099	fullystarted = true;
1100	(void)sync_thread(res);
1101}
1102
1103static void
1104reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
1105    const char *fmt, ...)
1106{
1107	char msg[1024];
1108	va_list ap;
1109
1110	va_start(ap, fmt);
1111	(void)vsnprintf(msg, sizeof(msg), fmt, ap);
1112	va_end(ap);
1113	switch (ggio->gctl_cmd) {
1114	case BIO_READ:
1115		(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
1116		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1117		break;
1118	case BIO_DELETE:
1119		(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
1120		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1121		break;
1122	case BIO_FLUSH:
1123		(void)snprlcat(msg, sizeof(msg), "FLUSH.");
1124		break;
1125	case BIO_WRITE:
1126		(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
1127		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
1128		break;
1129	default:
1130		(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
1131		    (unsigned int)ggio->gctl_cmd);
1132		break;
1133	}
1134	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
1135}
1136
1137static void
1138remote_close(struct hast_resource *res, int ncomp)
1139{
1140
1141	rw_wlock(&hio_remote_lock[ncomp]);
1142	/*
1143	 * Check for a race between dropping rlock and acquiring wlock -
1144	 * another thread can close connection in-between.
1145	 */
1146	if (!ISCONNECTED(res, ncomp)) {
1147		PJDLOG_ASSERT(res->hr_remotein == NULL);
1148		PJDLOG_ASSERT(res->hr_remoteout == NULL);
1149		rw_unlock(&hio_remote_lock[ncomp]);
1150		return;
1151	}
1152
1153	PJDLOG_ASSERT(res->hr_remotein != NULL);
1154	PJDLOG_ASSERT(res->hr_remoteout != NULL);
1155
1156	pjdlog_debug(2, "Closing incoming connection to %s.",
1157	    res->hr_remoteaddr);
1158	proto_close(res->hr_remotein);
1159	res->hr_remotein = NULL;
1160	pjdlog_debug(2, "Closing outgoing connection to %s.",
1161	    res->hr_remoteaddr);
1162	proto_close(res->hr_remoteout);
1163	res->hr_remoteout = NULL;
1164
1165	rw_unlock(&hio_remote_lock[ncomp]);
1166
1167	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);
1168
1169	/*
1170	 * Stop synchronization if in-progress.
1171	 */
1172	sync_stop();
1173
1174	event_send(res, EVENT_DISCONNECT);
1175}
1176
1177/*
1178 * Acknowledge write completion to the kernel, but don't update activemap yet.
1179 */
1180static void
1181write_complete(struct hast_resource *res, struct hio *hio)
1182{
1183	struct g_gate_ctl_io *ggio;
1184	unsigned int ncomp;
1185
1186	PJDLOG_ASSERT(!hio->hio_done);
1187
1188	ggio = &hio->hio_ggio;
1189	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
1190
1191	/*
1192	 * Bump local count if this is first write after
1193	 * connection failure with remote node.
1194	 */
1195	ncomp = 1;
1196	rw_rlock(&hio_remote_lock[ncomp]);
1197	if (!ISCONNECTED(res, ncomp)) {
1198		mtx_lock(&metadata_lock);
1199		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
1200			res->hr_primary_localcnt++;
1201			pjdlog_debug(1, "Increasing localcnt to %ju.",
1202			    (uintmax_t)res->hr_primary_localcnt);
1203			(void)metadata_write(res);
1204		}
1205		mtx_unlock(&metadata_lock);
1206	}
1207	rw_unlock(&hio_remote_lock[ncomp]);
1208	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
1209		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
1210	hio->hio_done = true;
1211}
1212
1213/*
1214 * Thread receives ggate I/O requests from the kernel and passes them to
1215 * appropriate threads:
1216 * WRITE - always goes to both local_send and remote_send threads
1217 * READ (when the block is up-to-date on local component) -
1218 *	only local_send thread
1219 * READ (when the block isn't up-to-date on local component) -
1220 *	only remote_send thread
1221 * DELETE - always goes to both local_send and remote_send threads
1222 * FLUSH - always goes to both local_send and remote_send threads
1223 */
1224static void *
1225ggate_recv_thread(void *arg)
1226{
1227	struct hast_resource *res = arg;
1228	struct g_gate_ctl_io *ggio;
1229	struct hio *hio;
1230	unsigned int ii, ncomp, ncomps;
1231	int error;
1232
1233	for (;;) {
1234		pjdlog_debug(2, "ggate_recv: Taking free request.");
1235		QUEUE_TAKE2(hio, free);
1236		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
1237		ggio = &hio->hio_ggio;
1238		ggio->gctl_unit = res->hr_ggateunit;
1239		ggio->gctl_length = MAXPHYS;
1240		ggio->gctl_error = 0;
1241		hio->hio_done = false;
1242		hio->hio_replication = res->hr_replication;
1243		pjdlog_debug(2,
1244		    "ggate_recv: (%p) Waiting for request from the kernel.",
1245		    hio);
1246		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
1247			if (sigexit_received)
1248				pthread_exit(NULL);
1249			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
1250		}
1251		error = ggio->gctl_error;
1252		switch (error) {
1253		case 0:
1254			break;
1255		case ECANCELED:
1256			/* Exit gracefully. */
1257			if (!sigexit_received) {
1258				pjdlog_debug(2,
1259				    "ggate_recv: (%p) Received cancel from the kernel.",
1260				    hio);
1261				pjdlog_info("Received cancel from the kernel, exiting.");
1262			}
1263			pthread_exit(NULL);
1264		case ENOMEM:
1265			/*
1266			 * Buffer too small? Impossible, we allocate MAXPHYS
1267			 * bytes - request can't be bigger than that.
1268			 */
1269			/* FALLTHROUGH */
1270		case ENXIO:
1271		default:
1272			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
1273			    strerror(error));
1274		}
1275
1276		ncomp = 0;
1277		ncomps = HAST_NCOMPONENTS;
1278
1279		for (ii = 0; ii < ncomps; ii++)
1280			hio->hio_errors[ii] = EINVAL;
1281		reqlog(LOG_DEBUG, 2, ggio,
1282		    "ggate_recv: (%p) Request received from the kernel: ",
1283		    hio);
1284
1285		/*
1286		 * Inform all components about new write request.
1287		 * For read request prefer local component unless the given
1288		 * range is out-of-date, then use remote component.
1289		 */
1290		switch (ggio->gctl_cmd) {
1291		case BIO_READ:
1292			res->hr_stat_read++;
1293			ncomps = 1;
1294			mtx_lock(&metadata_lock);
1295			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
1296			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1297				/*
1298				 * This range is up-to-date on local component,
1299				 * so handle request locally.
1300				 */
1301				 /* Local component is 0 for now. */
1302				ncomp = 0;
1303			} else /* if (res->hr_syncsrc ==
1304			    HAST_SYNCSRC_SECONDARY) */ {
1305				PJDLOG_ASSERT(res->hr_syncsrc ==
1306				    HAST_SYNCSRC_SECONDARY);
1307				/*
1308				 * This range is out-of-date on local component,
1309				 * so send request to the remote node.
1310				 */
1311				 /* Remote component is 1 for now. */
1312				ncomp = 1;
1313			}
1314			mtx_unlock(&metadata_lock);
1315			break;
1316		case BIO_WRITE:
1317			res->hr_stat_write++;
1318			if (res->hr_resuid == 0 &&
1319			    res->hr_primary_localcnt == 0) {
1320				/* This is first write. */
1321				res->hr_primary_localcnt = 1;
1322			}
1323			for (;;) {
1324				mtx_lock(&range_lock);
1325				if (rangelock_islocked(range_sync,
1326				    ggio->gctl_offset, ggio->gctl_length)) {
1327					pjdlog_debug(2,
1328					    "regular: Range offset=%jd length=%zu locked.",
1329					    (intmax_t)ggio->gctl_offset,
1330					    (size_t)ggio->gctl_length);
1331					range_regular_wait = true;
1332					cv_wait(&range_regular_cond, &range_lock);
1333					range_regular_wait = false;
1334					mtx_unlock(&range_lock);
1335					continue;
1336				}
1337				if (rangelock_add(range_regular,
1338				    ggio->gctl_offset, ggio->gctl_length) == -1) {
1339					mtx_unlock(&range_lock);
1340					pjdlog_debug(2,
1341					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
1342					    (intmax_t)ggio->gctl_offset,
1343					    (size_t)ggio->gctl_length);
1344					sleep(1);
1345					continue;
1346				}
1347				mtx_unlock(&range_lock);
1348				break;
1349			}
1350			mtx_lock(&res->hr_amp_lock);
1351			if (activemap_write_start(res->hr_amp,
1352			    ggio->gctl_offset, ggio->gctl_length)) {
1353				res->hr_stat_activemap_update++;
1354				(void)hast_activemap_flush(res);
1355			} else {
1356				mtx_unlock(&res->hr_amp_lock);
1357			}
1358			if (ISMEMSYNC(hio)) {
1359				hio->hio_memsyncacked = false;
1360				refcnt_init(&hio->hio_writecount, ncomps);
1361			}
1362			break;
1363		case BIO_DELETE:
1364			res->hr_stat_delete++;
1365			break;
1366		case BIO_FLUSH:
1367			res->hr_stat_flush++;
1368			break;
1369		}
1370		pjdlog_debug(2,
1371		    "ggate_recv: (%p) Moving request to the send queues.", hio);
1372		refcnt_init(&hio->hio_countdown, ncomps);
1373		for (ii = ncomp; ii < ncomps; ii++)
1374			QUEUE_INSERT1(hio, send, ii);
1375	}
1376	/* NOTREACHED */
1377	return (NULL);
1378}
1379
1380/*
1381 * Thread reads from or writes to local component.
1382 * If local read fails, it redirects it to remote_send thread.
1383 */
1384static void *
1385local_send_thread(void *arg)
1386{
1387	struct hast_resource *res = arg;
1388	struct g_gate_ctl_io *ggio;
1389	struct hio *hio;
1390	unsigned int ncomp, rncomp;
1391	ssize_t ret;
1392
1393	/* Local component is 0 for now. */
1394	ncomp = 0;
1395	/* Remote component is 1 for now. */
1396	rncomp = 1;
1397
1398	for (;;) {
1399		pjdlog_debug(2, "local_send: Taking request.");
1400		QUEUE_TAKE1(hio, send, ncomp, 0);
1401		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
1402		ggio = &hio->hio_ggio;
1403		switch (ggio->gctl_cmd) {
1404		case BIO_READ:
1405			ret = pread(res->hr_localfd, ggio->gctl_data,
1406			    ggio->gctl_length,
1407			    ggio->gctl_offset + res->hr_localoff);
1408			if (ret == ggio->gctl_length)
1409				hio->hio_errors[ncomp] = 0;
1410			else if (!ISSYNCREQ(hio)) {
1411				/*
1412				 * If READ failed, try to read from remote node.
1413				 */
1414				if (ret == -1) {
1415					reqlog(LOG_WARNING, 0, ggio,
1416					    "Local request failed (%s), trying remote node. ",
1417					    strerror(errno));
1418				} else if (ret != ggio->gctl_length) {
1419					reqlog(LOG_WARNING, 0, ggio,
1420					    "Local request failed (%zd != %jd), trying remote node. ",
1421					    ret, (intmax_t)ggio->gctl_length);
1422				}
1423				QUEUE_INSERT1(hio, send, rncomp);
1424				continue;
1425			}
1426			break;
1427		case BIO_WRITE:
1428			ret = pwrite(res->hr_localfd, ggio->gctl_data,
1429			    ggio->gctl_length,
1430			    ggio->gctl_offset + res->hr_localoff);
1431			if (ret == -1) {
1432				hio->hio_errors[ncomp] = errno;
1433				reqlog(LOG_WARNING, 0, ggio,
1434				    "Local request failed (%s): ",
1435				    strerror(errno));
1436			} else if (ret != ggio->gctl_length) {
1437				hio->hio_errors[ncomp] = EIO;
1438				reqlog(LOG_WARNING, 0, ggio,
1439				    "Local request failed (%zd != %jd): ",
1440				    ret, (intmax_t)ggio->gctl_length);
1441			} else {
1442				hio->hio_errors[ncomp] = 0;
1443				if (ISASYNC(hio)) {
1444					ggio->gctl_error = 0;
1445					write_complete(res, hio);
1446				}
1447			}
1448			break;
1449		case BIO_DELETE:
1450			ret = g_delete(res->hr_localfd,
1451			    ggio->gctl_offset + res->hr_localoff,
1452			    ggio->gctl_length);
1453			if (ret == -1) {
1454				hio->hio_errors[ncomp] = errno;
1455				reqlog(LOG_WARNING, 0, ggio,
1456				    "Local request failed (%s): ",
1457				    strerror(errno));
1458			} else {
1459				hio->hio_errors[ncomp] = 0;
1460			}
1461			break;
1462		case BIO_FLUSH:
1463			if (!res->hr_localflush) {
1464				ret = -1;
1465				errno = EOPNOTSUPP;
1466				break;
1467			}
1468			ret = g_flush(res->hr_localfd);
1469			if (ret == -1) {
1470				if (errno == EOPNOTSUPP)
1471					res->hr_localflush = false;
1472				hio->hio_errors[ncomp] = errno;
1473				reqlog(LOG_WARNING, 0, ggio,
1474				    "Local request failed (%s): ",
1475				    strerror(errno));
1476			} else {
1477				hio->hio_errors[ncomp] = 0;
1478			}
1479			break;
1480		}
1481		if (ISMEMSYNCWRITE(hio)) {
1482			if (refcnt_release(&hio->hio_writecount) == 0) {
1483				write_complete(res, hio);
1484			}
1485		}
1486		if (refcnt_release(&hio->hio_countdown) > 0)
1487			continue;
1488		if (ISSYNCREQ(hio)) {
1489			mtx_lock(&sync_lock);
1490			SYNCREQDONE(hio);
1491			mtx_unlock(&sync_lock);
1492			cv_signal(&sync_cond);
1493		} else {
1494			pjdlog_debug(2,
1495			    "local_send: (%p) Moving request to the done queue.",
1496			    hio);
1497			QUEUE_INSERT2(hio, done);
1498		}
1499	}
1500	/* NOTREACHED */
1501	return (NULL);
1502}
1503
1504static void
1505keepalive_send(struct hast_resource *res, unsigned int ncomp)
1506{
1507	struct nv *nv;
1508
1509	rw_rlock(&hio_remote_lock[ncomp]);
1510
1511	if (!ISCONNECTED(res, ncomp)) {
1512		rw_unlock(&hio_remote_lock[ncomp]);
1513		return;
1514	}
1515
1516	PJDLOG_ASSERT(res->hr_remotein != NULL);
1517	PJDLOG_ASSERT(res->hr_remoteout != NULL);
1518
1519	nv = nv_alloc();
1520	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
1521	if (nv_error(nv) != 0) {
1522		rw_unlock(&hio_remote_lock[ncomp]);
1523		nv_free(nv);
1524		pjdlog_debug(1,
1525		    "keepalive_send: Unable to prepare header to send.");
1526		return;
1527	}
1528	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
1529		rw_unlock(&hio_remote_lock[ncomp]);
1530		pjdlog_common(LOG_DEBUG, 1, errno,
1531		    "keepalive_send: Unable to send request");
1532		nv_free(nv);
1533		remote_close(res, ncomp);
1534		return;
1535	}
1536
1537	rw_unlock(&hio_remote_lock[ncomp]);
1538	nv_free(nv);
1539	pjdlog_debug(2, "keepalive_send: Request sent.");
1540}
1541
1542/*
1543 * Thread sends request to secondary node.
1544 */
1545static void *
1546remote_send_thread(void *arg)
1547{
1548	struct hast_resource *res = arg;
1549	struct g_gate_ctl_io *ggio;
1550	time_t lastcheck, now;
1551	struct hio *hio;
1552	struct nv *nv;
1553	unsigned int ncomp;
1554	bool wakeup;
1555	uint64_t offset, length;
1556	uint8_t cmd;
1557	void *data;
1558
1559	/* Remote component is 1 for now. */
1560	ncomp = 1;
1561	lastcheck = time(NULL);
1562
1563	for (;;) {
1564		pjdlog_debug(2, "remote_send: Taking request.");
1565		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
1566		if (hio == NULL) {
1567			now = time(NULL);
1568			if (lastcheck + HAST_KEEPALIVE <= now) {
1569				keepalive_send(res, ncomp);
1570				lastcheck = now;
1571			}
1572			continue;
1573		}
1574		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
1575		ggio = &hio->hio_ggio;
1576		switch (ggio->gctl_cmd) {
1577		case BIO_READ:
1578			cmd = HIO_READ;
1579			data = NULL;
1580			offset = ggio->gctl_offset;
1581			length = ggio->gctl_length;
1582			break;
1583		case BIO_WRITE:
1584			cmd = HIO_WRITE;
1585			data = ggio->gctl_data;
1586			offset = ggio->gctl_offset;
1587			length = ggio->gctl_length;
1588			break;
1589		case BIO_DELETE:
1590			cmd = HIO_DELETE;
1591			data = NULL;
1592			offset = ggio->gctl_offset;
1593			length = ggio->gctl_length;
1594			break;
1595		case BIO_FLUSH:
1596			cmd = HIO_FLUSH;
1597			data = NULL;
1598			offset = 0;
1599			length = 0;
1600			break;
1601		default:
1602			PJDLOG_ABORT("invalid condition");
1603		}
1604		nv = nv_alloc();
1605		nv_add_uint8(nv, cmd, "cmd");
1606		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
1607		nv_add_uint64(nv, offset, "offset");
1608		nv_add_uint64(nv, length, "length");
1609		if (ISMEMSYNCWRITE(hio))
1610			nv_add_uint8(nv, 1, "memsync");
1611		if (nv_error(nv) != 0) {
1612			hio->hio_errors[ncomp] = nv_error(nv);
1613			pjdlog_debug(2,
1614			    "remote_send: (%p) Unable to prepare header to send.",
1615			    hio);
1616			reqlog(LOG_ERR, 0, ggio,
1617			    "Unable to prepare header to send (%s): ",
1618			    strerror(nv_error(nv)));
1619			/* Move failed request immediately to the done queue. */
1620			goto done_queue;
1621		}
1622		/*
1623		 * Protect connection from disappearing.
1624		 */
1625		rw_rlock(&hio_remote_lock[ncomp]);
1626		if (!ISCONNECTED(res, ncomp)) {
1627			rw_unlock(&hio_remote_lock[ncomp]);
1628			hio->hio_errors[ncomp] = ENOTCONN;
1629			goto done_queue;
1630		}
1631		/*
1632		 * Move the request to recv queue before sending it, because
1633		 * in different order we can get reply before we move request
1634		 * to recv queue.
1635		 */
1636		pjdlog_debug(2,
1637		    "remote_send: (%p) Moving request to the recv queue.",
1638		    hio);
1639		mtx_lock(&hio_recv_list_lock[ncomp]);
1640		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
1641		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
1642		hio_recv_list_size[ncomp]++;
1643		mtx_unlock(&hio_recv_list_lock[ncomp]);
1644		if (hast_proto_send(res, res->hr_remoteout, nv, data,
1645		    data != NULL ? length : 0) == -1) {
1646			hio->hio_errors[ncomp] = errno;
1647			rw_unlock(&hio_remote_lock[ncomp]);
1648			pjdlog_debug(2,
1649			    "remote_send: (%p) Unable to send request.", hio);
1650			reqlog(LOG_ERR, 0, ggio,
1651			    "Unable to send request (%s): ",
1652			    strerror(hio->hio_errors[ncomp]));
1653			remote_close(res, ncomp);
1654		} else {
1655			rw_unlock(&hio_remote_lock[ncomp]);
1656		}
1657		nv_free(nv);
1658		if (wakeup)
1659			cv_signal(&hio_recv_list_cond[ncomp]);
1660		continue;
1661done_queue:
1662		nv_free(nv);
1663		if (ISSYNCREQ(hio)) {
1664			if (refcnt_release(&hio->hio_countdown) > 0)
1665				continue;
1666			mtx_lock(&sync_lock);
1667			SYNCREQDONE(hio);
1668			mtx_unlock(&sync_lock);
1669			cv_signal(&sync_cond);
1670			continue;
1671		}
1672		if (ggio->gctl_cmd == BIO_WRITE) {
1673			mtx_lock(&res->hr_amp_lock);
1674			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
1675			    ggio->gctl_length)) {
1676				(void)hast_activemap_flush(res);
1677			} else {
1678				mtx_unlock(&res->hr_amp_lock);
1679			}
1680			if (ISMEMSYNCWRITE(hio)) {
1681				if (refcnt_release(&hio->hio_writecount) == 0) {
1682					if (hio->hio_errors[0] == 0)
1683						write_complete(res, hio);
1684				}
1685			}
1686		}
1687		if (refcnt_release(&hio->hio_countdown) > 0)
1688			continue;
1689		pjdlog_debug(2,
1690		    "remote_send: (%p) Moving request to the done queue.",
1691		    hio);
1692		QUEUE_INSERT2(hio, done);
1693	}
1694	/* NOTREACHED */
1695	return (NULL);
1696}
1697
1698/*
1699 * Thread receives answer from secondary node and passes it to ggate_send
1700 * thread.
1701 */
1702static void *
1703remote_recv_thread(void *arg)
1704{
1705	struct hast_resource *res = arg;
1706	struct g_gate_ctl_io *ggio;
1707	struct hio *hio;
1708	struct nv *nv;
1709	unsigned int ncomp;
1710	uint64_t seq;
1711	bool memsyncack;
1712	int error;
1713
1714	/* Remote component is 1 for now. */
1715	ncomp = 1;
1716
1717	for (;;) {
1718		/* Wait until there is anything to receive. */
1719		mtx_lock(&hio_recv_list_lock[ncomp]);
1720		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1721			pjdlog_debug(2, "remote_recv: No requests, waiting.");
1722			cv_wait(&hio_recv_list_cond[ncomp],
1723			    &hio_recv_list_lock[ncomp]);
1724		}
1725		mtx_unlock(&hio_recv_list_lock[ncomp]);
1726
1727		memsyncack = false;
1728
1729		rw_rlock(&hio_remote_lock[ncomp]);
1730		if (!ISCONNECTED(res, ncomp)) {
1731			rw_unlock(&hio_remote_lock[ncomp]);
1732			/*
1733			 * Connection is dead, so move all pending requests to
1734			 * the done queue (one-by-one).
1735			 */
1736			mtx_lock(&hio_recv_list_lock[ncomp]);
1737			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1738			PJDLOG_ASSERT(hio != NULL);
1739			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1740			    hio_next[ncomp]);
1741			hio_recv_list_size[ncomp]--;
1742			mtx_unlock(&hio_recv_list_lock[ncomp]);
1743			hio->hio_errors[ncomp] = ENOTCONN;
1744			goto done_queue;
1745		}
1746		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
1747			pjdlog_errno(LOG_ERR,
1748			    "Unable to receive reply header");
1749			rw_unlock(&hio_remote_lock[ncomp]);
1750			remote_close(res, ncomp);
1751			continue;
1752		}
1753		rw_unlock(&hio_remote_lock[ncomp]);
1754		seq = nv_get_uint64(nv, "seq");
1755		if (seq == 0) {
1756			pjdlog_error("Header contains no 'seq' field.");
1757			nv_free(nv);
1758			continue;
1759		}
1760		memsyncack = nv_exists(nv, "received");
1761		mtx_lock(&hio_recv_list_lock[ncomp]);
1762		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1763			if (hio->hio_ggio.gctl_seq == seq) {
1764				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1765				    hio_next[ncomp]);
1766				hio_recv_list_size[ncomp]--;
1767				break;
1768			}
1769		}
1770		mtx_unlock(&hio_recv_list_lock[ncomp]);
1771		if (hio == NULL) {
1772			pjdlog_error("Found no request matching received 'seq' field (%ju).",
1773			    (uintmax_t)seq);
1774			nv_free(nv);
1775			continue;
1776		}
1777		ggio = &hio->hio_ggio;
1778		error = nv_get_int16(nv, "error");
1779		if (error != 0) {
1780			/* Request failed on remote side. */
1781			hio->hio_errors[ncomp] = error;
1782			reqlog(LOG_WARNING, 0, ggio,
1783			    "Remote request failed (%s): ", strerror(error));
1784			nv_free(nv);
1785			goto done_queue;
1786		}
1787		switch (ggio->gctl_cmd) {
1788		case BIO_READ:
1789			rw_rlock(&hio_remote_lock[ncomp]);
1790			if (!ISCONNECTED(res, ncomp)) {
1791				rw_unlock(&hio_remote_lock[ncomp]);
1792				nv_free(nv);
1793				goto done_queue;
1794			}
1795			if (hast_proto_recv_data(res, res->hr_remotein, nv,
1796			    ggio->gctl_data, ggio->gctl_length) == -1) {
1797				hio->hio_errors[ncomp] = errno;
1798				pjdlog_errno(LOG_ERR,
1799				    "Unable to receive reply data");
1800				rw_unlock(&hio_remote_lock[ncomp]);
1801				nv_free(nv);
1802				remote_close(res, ncomp);
1803				goto done_queue;
1804			}
1805			rw_unlock(&hio_remote_lock[ncomp]);
1806			break;
1807		case BIO_WRITE:
1808		case BIO_DELETE:
1809		case BIO_FLUSH:
1810			break;
1811		default:
1812			PJDLOG_ABORT("invalid condition");
1813		}
1814		hio->hio_errors[ncomp] = 0;
1815		nv_free(nv);
1816done_queue:
1817		if (ISMEMSYNCWRITE(hio)) {
1818			if (!hio->hio_memsyncacked) {
1819				PJDLOG_ASSERT(memsyncack ||
1820				    hio->hio_errors[ncomp] != 0);
1821				/* Remote ack arrived. */
1822				if (refcnt_release(&hio->hio_writecount) == 0) {
1823					if (hio->hio_errors[0] == 0)
1824						write_complete(res, hio);
1825				}
1826				hio->hio_memsyncacked = true;
1827				if (hio->hio_errors[ncomp] == 0) {
1828					pjdlog_debug(2,
1829					    "remote_recv: (%p) Moving request "
1830					    "back to the recv queue.", hio);
1831					mtx_lock(&hio_recv_list_lock[ncomp]);
1832					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
1833					    hio, hio_next[ncomp]);
1834					hio_recv_list_size[ncomp]++;
1835					mtx_unlock(&hio_recv_list_lock[ncomp]);
1836					continue;
1837				}
1838			} else {
1839				PJDLOG_ASSERT(!memsyncack);
1840				/* Remote final reply arrived. */
1841			}
1842		}
1843		if (refcnt_release(&hio->hio_countdown) > 0)
1844			continue;
1845		if (ISSYNCREQ(hio)) {
1846			mtx_lock(&sync_lock);
1847			SYNCREQDONE(hio);
1848			mtx_unlock(&sync_lock);
1849			cv_signal(&sync_cond);
1850		} else {
1851			pjdlog_debug(2,
1852			    "remote_recv: (%p) Moving request to the done queue.",
1853			    hio);
1854			QUEUE_INSERT2(hio, done);
1855		}
1856	}
1857	/* NOTREACHED */
1858	return (NULL);
1859}
1860
1861/*
1862 * Thread sends answer to the kernel.
1863 */
1864static void *
1865ggate_send_thread(void *arg)
1866{
1867	struct hast_resource *res = arg;
1868	struct g_gate_ctl_io *ggio;
1869	struct hio *hio;
1870	unsigned int ii, ncomps;
1871
1872	ncomps = HAST_NCOMPONENTS;
1873
1874	for (;;) {
1875		pjdlog_debug(2, "ggate_send: Taking request.");
1876		QUEUE_TAKE2(hio, done);
1877		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
1878		ggio = &hio->hio_ggio;
1879		for (ii = 0; ii < ncomps; ii++) {
1880			if (hio->hio_errors[ii] == 0) {
1881				/*
1882				 * One successful request is enough to declare
1883				 * success.
1884				 */
1885				ggio->gctl_error = 0;
1886				break;
1887			}
1888		}
1889		if (ii == ncomps) {
1890			/*
1891			 * None of the requests were successful.
1892			 * Use the error from local component except the
1893			 * case when we did only remote request.
1894			 */
1895			if (ggio->gctl_cmd == BIO_READ &&
1896			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
1897				ggio->gctl_error = hio->hio_errors[1];
1898			else
1899				ggio->gctl_error = hio->hio_errors[0];
1900		}
1901		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
1902			mtx_lock(&res->hr_amp_lock);
1903			if (activemap_write_complete(res->hr_amp,
1904			    ggio->gctl_offset, ggio->gctl_length)) {
1905				res->hr_stat_activemap_update++;
1906				(void)hast_activemap_flush(res);
1907			} else {
1908				mtx_unlock(&res->hr_amp_lock);
1909			}
1910		}
1911		if (ggio->gctl_cmd == BIO_WRITE) {
1912			/*
1913			 * Unlock range we locked.
1914			 */
1915			mtx_lock(&range_lock);
1916			rangelock_del(range_regular, ggio->gctl_offset,
1917			    ggio->gctl_length);
1918			if (range_sync_wait)
1919				cv_signal(&range_sync_cond);
1920			mtx_unlock(&range_lock);
1921			if (!hio->hio_done)
1922				write_complete(res, hio);
1923		} else {
1924			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
1925				primary_exit(EX_OSERR,
1926				    "G_GATE_CMD_DONE failed");
1927			}
1928		}
1929		if (hio->hio_errors[0]) {
1930			switch (ggio->gctl_cmd) {
1931			case BIO_READ:
1932				res->hr_stat_read_error++;
1933				break;
1934			case BIO_WRITE:
1935				res->hr_stat_write_error++;
1936				break;
1937			case BIO_DELETE:
1938				res->hr_stat_delete_error++;
1939				break;
1940			case BIO_FLUSH:
1941				res->hr_stat_flush_error++;
1942				break;
1943			}
1944		}
1945		pjdlog_debug(2,
1946		    "ggate_send: (%p) Moving request to the free queue.", hio);
1947		QUEUE_INSERT2(hio, free);
1948	}
1949	/* NOTREACHED */
1950	return (NULL);
1951}
1952
1953/*
1954 * Thread synchronize local and remote components.
1955 */
1956static void *
1957sync_thread(void *arg __unused)
1958{
1959	struct hast_resource *res = arg;
1960	struct hio *hio;
1961	struct g_gate_ctl_io *ggio;
1962	struct timeval tstart, tend, tdiff;
1963	unsigned int ii, ncomp, ncomps;
1964	off_t offset, length, synced;
1965	bool dorewind, directreads;
1966	int syncext;
1967
1968	ncomps = HAST_NCOMPONENTS;
1969	dorewind = true;
1970	synced = 0;
1971	offset = -1;
1972	directreads = false;
1973
1974	for (;;) {
1975		mtx_lock(&sync_lock);
1976		if (offset >= 0 && !sync_inprogress) {
1977			gettimeofday(&tend, NULL);
1978			timersub(&tend, &tstart, &tdiff);
1979			pjdlog_info("Synchronization interrupted after %#.0T. "
1980			    "%NB synchronized so far.", &tdiff,
1981			    (intmax_t)synced);
1982			event_send(res, EVENT_SYNCINTR);
1983		}
1984		while (!sync_inprogress) {
1985			dorewind = true;
1986			synced = 0;
1987			cv_wait(&sync_cond, &sync_lock);
1988		}
1989		mtx_unlock(&sync_lock);
1990		/*
1991		 * Obtain offset at which we should synchronize.
1992		 * Rewind synchronization if needed.
1993		 */
1994		mtx_lock(&res->hr_amp_lock);
1995		if (dorewind)
1996			activemap_sync_rewind(res->hr_amp);
1997		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1998		if (syncext != -1) {
1999			/*
2000			 * We synchronized entire syncext extent, we can mark
2001			 * it as clean now.
2002			 */
2003			if (activemap_extent_complete(res->hr_amp, syncext))
2004				(void)hast_activemap_flush(res);
2005			else
2006				mtx_unlock(&res->hr_amp_lock);
2007		} else {
2008			mtx_unlock(&res->hr_amp_lock);
2009		}
2010		if (dorewind) {
2011			dorewind = false;
2012			if (offset == -1)
2013				pjdlog_info("Nodes are in sync.");
2014			else {
2015				pjdlog_info("Synchronization started. %NB to go.",
2016				    (intmax_t)(res->hr_extentsize *
2017				    activemap_ndirty(res->hr_amp)));
2018				event_send(res, EVENT_SYNCSTART);
2019				gettimeofday(&tstart, NULL);
2020			}
2021		}
2022		if (offset == -1) {
2023			sync_stop();
2024			pjdlog_debug(1, "Nothing to synchronize.");
2025			/*
2026			 * Synchronization complete, make both localcnt and
2027			 * remotecnt equal.
2028			 */
2029			ncomp = 1;
2030			rw_rlock(&hio_remote_lock[ncomp]);
2031			if (ISCONNECTED(res, ncomp)) {
2032				if (synced > 0) {
2033					int64_t bps;
2034
2035					gettimeofday(&tend, NULL);
2036					timersub(&tend, &tstart, &tdiff);
2037					bps = (int64_t)((double)synced /
2038					    ((double)tdiff.tv_sec +
2039					    (double)tdiff.tv_usec / 1000000));
2040					pjdlog_info("Synchronization complete. "
2041					    "%NB synchronized in %#.0lT (%NB/sec).",
2042					    (intmax_t)synced, &tdiff,
2043					    (intmax_t)bps);
2044					event_send(res, EVENT_SYNCDONE);
2045				}
2046				mtx_lock(&metadata_lock);
2047				if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
2048					directreads = true;
2049				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
2050				res->hr_primary_localcnt =
2051				    res->hr_secondary_remotecnt;
2052				res->hr_primary_remotecnt =
2053				    res->hr_secondary_localcnt;
2054				pjdlog_debug(1,
2055				    "Setting localcnt to %ju and remotecnt to %ju.",
2056				    (uintmax_t)res->hr_primary_localcnt,
2057				    (uintmax_t)res->hr_primary_remotecnt);
2058				(void)metadata_write(res);
2059				mtx_unlock(&metadata_lock);
2060			}
2061			rw_unlock(&hio_remote_lock[ncomp]);
2062			if (directreads) {
2063				directreads = false;
2064				enable_direct_reads(res);
2065			}
2066			continue;
2067		}
2068		pjdlog_debug(2, "sync: Taking free request.");
2069		QUEUE_TAKE2(hio, free);
2070		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
2071		/*
2072		 * Lock the range we are going to synchronize. We don't want
2073		 * race where someone writes between our read and write.
2074		 */
2075		for (;;) {
2076			mtx_lock(&range_lock);
2077			if (rangelock_islocked(range_regular, offset, length)) {
2078				pjdlog_debug(2,
2079				    "sync: Range offset=%jd length=%jd locked.",
2080				    (intmax_t)offset, (intmax_t)length);
2081				range_sync_wait = true;
2082				cv_wait(&range_sync_cond, &range_lock);
2083				range_sync_wait = false;
2084				mtx_unlock(&range_lock);
2085				continue;
2086			}
2087			if (rangelock_add(range_sync, offset, length) == -1) {
2088				mtx_unlock(&range_lock);
2089				pjdlog_debug(2,
2090				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
2091				    (intmax_t)offset, (intmax_t)length);
2092				sleep(1);
2093				continue;
2094			}
2095			mtx_unlock(&range_lock);
2096			break;
2097		}
2098		/*
2099		 * First read the data from synchronization source.
2100		 */
2101		SYNCREQ(hio);
2102		ggio = &hio->hio_ggio;
2103		ggio->gctl_cmd = BIO_READ;
2104		ggio->gctl_offset = offset;
2105		ggio->gctl_length = length;
2106		ggio->gctl_error = 0;
2107		hio->hio_done = false;
2108		hio->hio_replication = res->hr_replication;
2109		for (ii = 0; ii < ncomps; ii++)
2110			hio->hio_errors[ii] = EINVAL;
2111		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2112		    hio);
2113		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2114		    hio);
2115		mtx_lock(&metadata_lock);
2116		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2117			/*
2118			 * This range is up-to-date on local component,
2119			 * so handle request locally.
2120			 */
2121			 /* Local component is 0 for now. */
2122			ncomp = 0;
2123		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2124			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2125			/*
2126			 * This range is out-of-date on local component,
2127			 * so send request to the remote node.
2128			 */
2129			 /* Remote component is 1 for now. */
2130			ncomp = 1;
2131		}
2132		mtx_unlock(&metadata_lock);
2133		refcnt_init(&hio->hio_countdown, 1);
2134		QUEUE_INSERT1(hio, send, ncomp);
2135
2136		/*
2137		 * Let's wait for READ to finish.
2138		 */
2139		mtx_lock(&sync_lock);
2140		while (!ISSYNCREQDONE(hio))
2141			cv_wait(&sync_cond, &sync_lock);
2142		mtx_unlock(&sync_lock);
2143
2144		if (hio->hio_errors[ncomp] != 0) {
2145			pjdlog_error("Unable to read synchronization data: %s.",
2146			    strerror(hio->hio_errors[ncomp]));
2147			goto free_queue;
2148		}
2149
2150		/*
2151		 * We read the data from synchronization source, now write it
2152		 * to synchronization target.
2153		 */
2154		SYNCREQ(hio);
2155		ggio->gctl_cmd = BIO_WRITE;
2156		for (ii = 0; ii < ncomps; ii++)
2157			hio->hio_errors[ii] = EINVAL;
2158		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
2159		    hio);
2160		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2161		    hio);
2162		mtx_lock(&metadata_lock);
2163		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
2164			/*
2165			 * This range is up-to-date on local component,
2166			 * so we update remote component.
2167			 */
2168			 /* Remote component is 1 for now. */
2169			ncomp = 1;
2170		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
2171			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
2172			/*
2173			 * This range is out-of-date on local component,
2174			 * so we update it.
2175			 */
2176			 /* Local component is 0 for now. */
2177			ncomp = 0;
2178		}
2179		mtx_unlock(&metadata_lock);
2180
2181		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
2182		    hio);
2183		refcnt_init(&hio->hio_countdown, 1);
2184		QUEUE_INSERT1(hio, send, ncomp);
2185
2186		/*
2187		 * Let's wait for WRITE to finish.
2188		 */
2189		mtx_lock(&sync_lock);
2190		while (!ISSYNCREQDONE(hio))
2191			cv_wait(&sync_cond, &sync_lock);
2192		mtx_unlock(&sync_lock);
2193
2194		if (hio->hio_errors[ncomp] != 0) {
2195			pjdlog_error("Unable to write synchronization data: %s.",
2196			    strerror(hio->hio_errors[ncomp]));
2197			goto free_queue;
2198		}
2199
2200		synced += length;
2201free_queue:
2202		mtx_lock(&range_lock);
2203		rangelock_del(range_sync, offset, length);
2204		if (range_regular_wait)
2205			cv_signal(&range_regular_cond);
2206		mtx_unlock(&range_lock);
2207		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
2208		    hio);
2209		QUEUE_INSERT2(hio, free);
2210	}
2211	/* NOTREACHED */
2212	return (NULL);
2213}
2214
2215void
2216primary_config_reload(struct hast_resource *res, struct nv *nv)
2217{
2218	unsigned int ii, ncomps;
2219	int modified, vint;
2220	const char *vstr;
2221
2222	pjdlog_info("Reloading configuration...");
2223
2224	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
2225	PJDLOG_ASSERT(gres == res);
2226	nv_assert(nv, "remoteaddr");
2227	nv_assert(nv, "sourceaddr");
2228	nv_assert(nv, "replication");
2229	nv_assert(nv, "checksum");
2230	nv_assert(nv, "compression");
2231	nv_assert(nv, "timeout");
2232	nv_assert(nv, "exec");
2233	nv_assert(nv, "metaflush");
2234
2235	ncomps = HAST_NCOMPONENTS;
2236
2237#define MODIFIED_REMOTEADDR	0x01
2238#define MODIFIED_SOURCEADDR	0x02
2239#define MODIFIED_REPLICATION	0x04
2240#define MODIFIED_CHECKSUM	0x08
2241#define MODIFIED_COMPRESSION	0x10
2242#define MODIFIED_TIMEOUT	0x20
2243#define MODIFIED_EXEC		0x40
2244#define MODIFIED_METAFLUSH	0x80
2245	modified = 0;
2246
2247	vstr = nv_get_string(nv, "remoteaddr");
2248	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
2249		/*
2250		 * Don't copy res->hr_remoteaddr to gres just yet.
2251		 * We want remote_close() to log disconnect from the old
2252		 * addresses, not from the new ones.
2253		 */
2254		modified |= MODIFIED_REMOTEADDR;
2255	}
2256	vstr = nv_get_string(nv, "sourceaddr");
2257	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
2258		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
2259		modified |= MODIFIED_SOURCEADDR;
2260	}
2261	vint = nv_get_int32(nv, "replication");
2262	if (gres->hr_replication != vint) {
2263		gres->hr_replication = vint;
2264		modified |= MODIFIED_REPLICATION;
2265	}
2266	vint = nv_get_int32(nv, "checksum");
2267	if (gres->hr_checksum != vint) {
2268		gres->hr_checksum = vint;
2269		modified |= MODIFIED_CHECKSUM;
2270	}
2271	vint = nv_get_int32(nv, "compression");
2272	if (gres->hr_compression != vint) {
2273		gres->hr_compression = vint;
2274		modified |= MODIFIED_COMPRESSION;
2275	}
2276	vint = nv_get_int32(nv, "timeout");
2277	if (gres->hr_timeout != vint) {
2278		gres->hr_timeout = vint;
2279		modified |= MODIFIED_TIMEOUT;
2280	}
2281	vstr = nv_get_string(nv, "exec");
2282	if (strcmp(gres->hr_exec, vstr) != 0) {
2283		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
2284		modified |= MODIFIED_EXEC;
2285	}
2286	vint = nv_get_int32(nv, "metaflush");
2287	if (gres->hr_metaflush != vint) {
2288		gres->hr_metaflush = vint;
2289		modified |= MODIFIED_METAFLUSH;
2290	}
2291
2292	/*
2293	 * Change timeout for connected sockets.
2294	 * Don't bother if we need to reconnect.
2295	 */
2296	if ((modified & MODIFIED_TIMEOUT) != 0 &&
2297	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
2298		for (ii = 0; ii < ncomps; ii++) {
2299			if (!ISREMOTE(ii))
2300				continue;
2301			rw_rlock(&hio_remote_lock[ii]);
2302			if (!ISCONNECTED(gres, ii)) {
2303				rw_unlock(&hio_remote_lock[ii]);
2304				continue;
2305			}
2306			rw_unlock(&hio_remote_lock[ii]);
2307			if (proto_timeout(gres->hr_remotein,
2308			    gres->hr_timeout) == -1) {
2309				pjdlog_errno(LOG_WARNING,
2310				    "Unable to set connection timeout");
2311			}
2312			if (proto_timeout(gres->hr_remoteout,
2313			    gres->hr_timeout) == -1) {
2314				pjdlog_errno(LOG_WARNING,
2315				    "Unable to set connection timeout");
2316			}
2317		}
2318	}
2319	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
2320		for (ii = 0; ii < ncomps; ii++) {
2321			if (!ISREMOTE(ii))
2322				continue;
2323			remote_close(gres, ii);
2324		}
2325		if (modified & MODIFIED_REMOTEADDR) {
2326			vstr = nv_get_string(nv, "remoteaddr");
2327			strlcpy(gres->hr_remoteaddr, vstr,
2328			    sizeof(gres->hr_remoteaddr));
2329		}
2330	}
2331#undef	MODIFIED_REMOTEADDR
2332#undef	MODIFIED_SOURCEADDR
2333#undef	MODIFIED_REPLICATION
2334#undef	MODIFIED_CHECKSUM
2335#undef	MODIFIED_COMPRESSION
2336#undef	MODIFIED_TIMEOUT
2337#undef	MODIFIED_EXEC
2338#undef	MODIFIED_METAFLUSH
2339
2340	pjdlog_info("Configuration reloaded successfully.");
2341}
2342
2343static void
2344guard_one(struct hast_resource *res, unsigned int ncomp)
2345{
2346	struct proto_conn *in, *out;
2347
2348	if (!ISREMOTE(ncomp))
2349		return;
2350
2351	rw_rlock(&hio_remote_lock[ncomp]);
2352
2353	if (!real_remote(res)) {
2354		rw_unlock(&hio_remote_lock[ncomp]);
2355		return;
2356	}
2357
2358	if (ISCONNECTED(res, ncomp)) {
2359		PJDLOG_ASSERT(res->hr_remotein != NULL);
2360		PJDLOG_ASSERT(res->hr_remoteout != NULL);
2361		rw_unlock(&hio_remote_lock[ncomp]);
2362		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
2363		    res->hr_remoteaddr);
2364		return;
2365	}
2366
2367	PJDLOG_ASSERT(res->hr_remotein == NULL);
2368	PJDLOG_ASSERT(res->hr_remoteout == NULL);
2369	/*
2370	 * Upgrade the lock. It doesn't have to be atomic as no other thread
2371	 * can change connection status from disconnected to connected.
2372	 */
2373	rw_unlock(&hio_remote_lock[ncomp]);
2374	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
2375	    res->hr_remoteaddr);
2376	in = out = NULL;
2377	if (init_remote(res, &in, &out) == 0) {
2378		rw_wlock(&hio_remote_lock[ncomp]);
2379		PJDLOG_ASSERT(res->hr_remotein == NULL);
2380		PJDLOG_ASSERT(res->hr_remoteout == NULL);
2381		PJDLOG_ASSERT(in != NULL && out != NULL);
2382		res->hr_remotein = in;
2383		res->hr_remoteout = out;
2384		rw_unlock(&hio_remote_lock[ncomp]);
2385		pjdlog_info("Successfully reconnected to %s.",
2386		    res->hr_remoteaddr);
2387		sync_start();
2388	} else {
2389		/* Both connections should be NULL. */
2390		PJDLOG_ASSERT(res->hr_remotein == NULL);
2391		PJDLOG_ASSERT(res->hr_remoteout == NULL);
2392		PJDLOG_ASSERT(in == NULL && out == NULL);
2393		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
2394		    res->hr_remoteaddr);
2395	}
2396}
2397
2398/*
2399 * Thread guards remote connections and reconnects when needed, handles
2400 * signals, etc.
2401 */
2402static void *
2403guard_thread(void *arg)
2404{
2405	struct hast_resource *res = arg;
2406	unsigned int ii, ncomps;
2407	struct timespec timeout;
2408	time_t lastcheck, now;
2409	sigset_t mask;
2410	int signo;
2411
2412	ncomps = HAST_NCOMPONENTS;
2413	lastcheck = time(NULL);
2414
2415	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
2416	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
2417	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
2418
2419	timeout.tv_sec = HAST_KEEPALIVE;
2420	timeout.tv_nsec = 0;
2421	signo = -1;
2422
2423	for (;;) {
2424		switch (signo) {
2425		case SIGINT:
2426		case SIGTERM:
2427			sigexit_received = true;
2428			primary_exitx(EX_OK,
2429			    "Termination signal received, exiting.");
2430			break;
2431		default:
2432			break;
2433		}
2434
2435		/*
2436		 * Don't check connections until we fully started,
2437		 * as we may still be looping, waiting for remote node
2438		 * to switch from primary to secondary.
2439		 */
2440		if (fullystarted) {
2441			pjdlog_debug(2, "remote_guard: Checking connections.");
2442			now = time(NULL);
2443			if (lastcheck + HAST_KEEPALIVE <= now) {
2444				for (ii = 0; ii < ncomps; ii++)
2445					guard_one(res, ii);
2446				lastcheck = now;
2447			}
2448		}
2449		signo = sigtimedwait(&mask, NULL, &timeout);
2450	}
2451	/* NOTREACHED */
2452	return (NULL);
2453}
2454