primary.c revision 210880
1/*-
2 * Copyright (c) 2009 The FreeBSD Foundation
3 * All rights reserved.
4 *
5 * This software was developed by Pawel Jakub Dawidek under sponsorship from
6 * the FreeBSD Foundation.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 * SUCH DAMAGE.
28 */
29
30#include <sys/cdefs.h>
31__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 210880 2010-08-05 18:58:00Z pjd $");
32
33#include <sys/types.h>
34#include <sys/time.h>
35#include <sys/bio.h>
36#include <sys/disk.h>
37#include <sys/refcount.h>
38#include <sys/stat.h>
39
40#include <geom/gate/g_gate.h>
41
42#include <assert.h>
43#include <err.h>
44#include <errno.h>
45#include <fcntl.h>
46#include <libgeom.h>
47#include <pthread.h>
48#include <stdint.h>
49#include <stdio.h>
50#include <string.h>
51#include <sysexits.h>
52#include <unistd.h>
53
54#include <activemap.h>
55#include <nv.h>
56#include <rangelock.h>
57
58#include "control.h"
59#include "hast.h"
60#include "hast_proto.h"
61#include "hastd.h"
62#include "metadata.h"
63#include "proto.h"
64#include "pjdlog.h"
65#include "subr.h"
66#include "synch.h"
67
/*
 * A single I/O request as it travels through the primary's thread
 * pipeline.  The same request may be handed to several components
 * (local disk, remote node) at once.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Per-component queue linkage.  Allocated as an array (one entry
	 * per component) so the request can sit on every component's list
	 * at the same time.
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component), so they simply
 * reuse the first linkage slot.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]
90
91/*
92 * Free list holds unused structures. When free list is empty, we have to wait
93 * until some in-progress requests are freed.
94 */
95static TAILQ_HEAD(, hio) hio_free_list;
96static pthread_mutex_t hio_free_list_lock;
97static pthread_cond_t hio_free_list_cond;
98/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
102 */
103static TAILQ_HEAD(, hio) *hio_send_list;
104static pthread_mutex_t *hio_send_list_lock;
105static pthread_cond_t *hio_send_list_cond;
106/*
107 * There is one recv list for every component, although local components don't
108 * use recv lists as local requests are done synchronously.
109 */
110static TAILQ_HEAD(, hio) *hio_recv_list;
111static pthread_mutex_t *hio_recv_list_lock;
112static pthread_cond_t *hio_recv_list_cond;
113/*
114 * Request is placed on done list by the slowest component (the one that
115 * decreased hio_countdown from 1 to 0).
116 */
117static TAILQ_HEAD(, hio) hio_done_list;
118static pthread_mutex_t hio_done_list_lock;
119static pthread_cond_t hio_done_list_cond;
120/*
 * Structures below are for interaction with the sync thread.
122 */
123static bool sync_inprogress;
124static pthread_mutex_t sync_lock;
125static pthread_cond_t sync_cond;
126/*
 * The lock below allows one to synchronize access to remote connections.
128 */
129static pthread_rwlock_t *hio_remote_lock;
130static pthread_mutex_t hio_guard_lock;
131static pthread_cond_t hio_guard_cond;
132
133/*
134 * Lock to synchronize metadata updates. Also synchronize access to
135 * hr_primary_localcnt and hr_primary_remotecnt fields.
136 */
137static pthread_mutex_t metadata_lock;
138
139/*
140 * Maximum number of outstanding I/O requests.
141 */
142#define	HAST_HIO_MAX	256
143/*
144 * Number of components. At this point there are only two components: local
145 * and remote, but in the future it might be possible to use multiple local
146 * and remote components.
147 */
148#define	HAST_NCOMPONENTS	2
149/*
150 * Number of seconds to sleep before next reconnect try.
151 */
152#define	RECONNECT_SLEEP		5
153
/*
 * Both connections (incoming and outgoing) have to be established for the
 * remote component to be considered connected.  The 'no' argument is
 * currently unused, as there is only one remote component for now.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component queue 'name' (send/recv) of
 * component 'ncomp', waking one consumer only when the queue transitions
 * from empty to non-empty (otherwise a consumer is already running).
 * Arguments are expected to be simple, side-effect-free expressions.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* As above, but for the global queues (free/done). */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Block until a request appears on the per-component queue 'name' of
 * component 'ncomp', then remove it and store it in 'hio'.
 */
#define	QUEUE_TAKE1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)]);			\
	}								\
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* As above, but for the global queues (free/done). */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are distinguished from regular kernel requests
 * by special gctl_unit values: -1 marks an in-flight sync request and
 * -2 a completed one.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
205
206static struct hast_resource *gres;
207
208static pthread_mutex_t range_lock;
209static struct rangelocks *range_regular;
210static bool range_regular_wait;
211static pthread_cond_t range_regular_cond;
212static struct rangelocks *range_sync;
213static bool range_sync_wait;
214static pthread_cond_t range_sync_cond;
215
216static void *ggate_recv_thread(void *arg);
217static void *local_send_thread(void *arg);
218static void *remote_send_thread(void *arg);
219static void *remote_recv_thread(void *arg);
220static void *ggate_send_thread(void *arg);
221static void *sync_thread(void *arg);
222static void *guard_thread(void *arg);
223
224static void sighandler(int sig);
225
226static void
227cleanup(struct hast_resource *res)
228{
229	int rerrno;
230
231	/* Remember errno. */
232	rerrno = errno;
233
234	/*
235	 * Close descriptor to /dev/hast/<name>
236	 * to work-around race in the kernel.
237	 */
238	close(res->hr_localfd);
239
240	/* Destroy ggate provider if we created one. */
241	if (res->hr_ggateunit >= 0) {
242		struct g_gate_ctl_destroy ggiod;
243
244		ggiod.gctl_version = G_GATE_VERSION;
245		ggiod.gctl_unit = res->hr_ggateunit;
246		ggiod.gctl_force = 1;
247		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
248			pjdlog_warning("Unable to destroy hast/%s device",
249			    res->hr_provname);
250		}
251		res->hr_ggateunit = -1;
252	}
253
254	/* Restore errno. */
255	errno = rerrno;
256}
257
258static void
259primary_exit(int exitcode, const char *fmt, ...)
260{
261	va_list ap;
262
263	assert(exitcode != EX_OK);
264	va_start(ap, fmt);
265	pjdlogv_errno(LOG_ERR, fmt, ap);
266	va_end(ap);
267	cleanup(gres);
268	exit(exitcode);
269}
270
271static void
272primary_exitx(int exitcode, const char *fmt, ...)
273{
274	va_list ap;
275
276	va_start(ap, fmt);
277	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
278	va_end(ap);
279	cleanup(gres);
280	exit(exitcode);
281}
282
283static int
284hast_activemap_flush(struct hast_resource *res)
285{
286	const unsigned char *buf;
287	size_t size;
288
289	buf = activemap_bitmap(res->hr_amp, &size);
290	assert(buf != NULL);
291	assert((size % res->hr_local_sectorsize) == 0);
292	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
293	    (ssize_t)size) {
294		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
295		    "Unable to flush activemap to disk"));
296		return (-1);
297	}
298	return (0);
299}
300
301static void
302init_environment(struct hast_resource *res __unused)
303{
304	struct hio *hio;
305	unsigned int ii, ncomps;
306
307	/*
308	 * In the future it might be per-resource value.
309	 */
310	ncomps = HAST_NCOMPONENTS;
311
312	/*
313	 * Allocate memory needed by lists.
314	 */
315	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
316	if (hio_send_list == NULL) {
317		primary_exitx(EX_TEMPFAIL,
318		    "Unable to allocate %zu bytes of memory for send lists.",
319		    sizeof(hio_send_list[0]) * ncomps);
320	}
321	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
322	if (hio_send_list_lock == NULL) {
323		primary_exitx(EX_TEMPFAIL,
324		    "Unable to allocate %zu bytes of memory for send list locks.",
325		    sizeof(hio_send_list_lock[0]) * ncomps);
326	}
327	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
328	if (hio_send_list_cond == NULL) {
329		primary_exitx(EX_TEMPFAIL,
330		    "Unable to allocate %zu bytes of memory for send list condition variables.",
331		    sizeof(hio_send_list_cond[0]) * ncomps);
332	}
333	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
334	if (hio_recv_list == NULL) {
335		primary_exitx(EX_TEMPFAIL,
336		    "Unable to allocate %zu bytes of memory for recv lists.",
337		    sizeof(hio_recv_list[0]) * ncomps);
338	}
339	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
340	if (hio_recv_list_lock == NULL) {
341		primary_exitx(EX_TEMPFAIL,
342		    "Unable to allocate %zu bytes of memory for recv list locks.",
343		    sizeof(hio_recv_list_lock[0]) * ncomps);
344	}
345	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
346	if (hio_recv_list_cond == NULL) {
347		primary_exitx(EX_TEMPFAIL,
348		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
349		    sizeof(hio_recv_list_cond[0]) * ncomps);
350	}
351	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
352	if (hio_remote_lock == NULL) {
353		primary_exitx(EX_TEMPFAIL,
354		    "Unable to allocate %zu bytes of memory for remote connections locks.",
355		    sizeof(hio_remote_lock[0]) * ncomps);
356	}
357
358	/*
359	 * Initialize lists, their locks and theirs condition variables.
360	 */
361	TAILQ_INIT(&hio_free_list);
362	mtx_init(&hio_free_list_lock);
363	cv_init(&hio_free_list_cond);
364	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
365		TAILQ_INIT(&hio_send_list[ii]);
366		mtx_init(&hio_send_list_lock[ii]);
367		cv_init(&hio_send_list_cond[ii]);
368		TAILQ_INIT(&hio_recv_list[ii]);
369		mtx_init(&hio_recv_list_lock[ii]);
370		cv_init(&hio_recv_list_cond[ii]);
371		rw_init(&hio_remote_lock[ii]);
372	}
373	TAILQ_INIT(&hio_done_list);
374	mtx_init(&hio_done_list_lock);
375	cv_init(&hio_done_list_cond);
376	mtx_init(&hio_guard_lock);
377	cv_init(&hio_guard_cond);
378	mtx_init(&metadata_lock);
379
380	/*
381	 * Allocate requests pool and initialize requests.
382	 */
383	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
384		hio = malloc(sizeof(*hio));
385		if (hio == NULL) {
386			primary_exitx(EX_TEMPFAIL,
387			    "Unable to allocate %zu bytes of memory for hio request.",
388			    sizeof(*hio));
389		}
390		hio->hio_countdown = 0;
391		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
392		if (hio->hio_errors == NULL) {
393			primary_exitx(EX_TEMPFAIL,
394			    "Unable allocate %zu bytes of memory for hio errors.",
395			    sizeof(hio->hio_errors[0]) * ncomps);
396		}
397		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
398		if (hio->hio_next == NULL) {
399			primary_exitx(EX_TEMPFAIL,
400			    "Unable allocate %zu bytes of memory for hio_next field.",
401			    sizeof(hio->hio_next[0]) * ncomps);
402		}
403		hio->hio_ggio.gctl_version = G_GATE_VERSION;
404		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
405		if (hio->hio_ggio.gctl_data == NULL) {
406			primary_exitx(EX_TEMPFAIL,
407			    "Unable to allocate %zu bytes of memory for gctl_data.",
408			    MAXPHYS);
409		}
410		hio->hio_ggio.gctl_length = MAXPHYS;
411		hio->hio_ggio.gctl_error = 0;
412		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
413	}
414
415	/*
416	 * Turn on signals handling.
417	 */
418	signal(SIGINT, sighandler);
419	signal(SIGTERM, sighandler);
420}
421
/*
 * Initialize the local component: read on-disk metadata, create the
 * activemap and the regular/sync range locks, load the stored activemap
 * and, when the provider is used for the first time, generate the
 * resource unique identifier.  All failures here are fatal.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	/* Read and verify metadata; exits on failure. */
	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	/* The activemap is stored on disk right after the metadata. */
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	/* Non-zero resuid means the provider was initialized before. */
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}
466
467static bool
468init_remote(struct hast_resource *res, struct proto_conn **inp,
469    struct proto_conn **outp)
470{
471	struct proto_conn *in, *out;
472	struct nv *nvout, *nvin;
473	const unsigned char *token;
474	unsigned char *map;
475	const char *errmsg;
476	int32_t extentsize;
477	int64_t datasize;
478	uint32_t mapsize;
479	size_t size;
480
481	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
482
483	in = out = NULL;
484
485	/* Prepare outgoing connection with remote node. */
486	if (proto_client(res->hr_remoteaddr, &out) < 0) {
487		primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
488		    res->hr_remoteaddr);
489	}
490	/* Try to connect, but accept failure. */
491	if (proto_connect(out) < 0) {
492		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
493		    res->hr_remoteaddr);
494		goto close;
495	}
496	/* Error in setting timeout is not critical, but why should it fail? */
497	if (proto_timeout(out, res->hr_timeout) < 0)
498		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
499	/*
500	 * First handshake step.
501	 * Setup outgoing connection with remote node.
502	 */
503	nvout = nv_alloc();
504	nv_add_string(nvout, res->hr_name, "resource");
505	if (nv_error(nvout) != 0) {
506		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
507		    "Unable to allocate header for connection with %s",
508		    res->hr_remoteaddr);
509		nv_free(nvout);
510		goto close;
511	}
512	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
513		pjdlog_errno(LOG_WARNING,
514		    "Unable to send handshake header to %s",
515		    res->hr_remoteaddr);
516		nv_free(nvout);
517		goto close;
518	}
519	nv_free(nvout);
520	if (hast_proto_recv_hdr(out, &nvin) < 0) {
521		pjdlog_errno(LOG_WARNING,
522		    "Unable to receive handshake header from %s",
523		    res->hr_remoteaddr);
524		goto close;
525	}
526	errmsg = nv_get_string(nvin, "errmsg");
527	if (errmsg != NULL) {
528		pjdlog_warning("%s", errmsg);
529		nv_free(nvin);
530		goto close;
531	}
532	token = nv_get_uint8_array(nvin, &size, "token");
533	if (token == NULL) {
534		pjdlog_warning("Handshake header from %s has no 'token' field.",
535		    res->hr_remoteaddr);
536		nv_free(nvin);
537		goto close;
538	}
539	if (size != sizeof(res->hr_token)) {
540		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
541		    res->hr_remoteaddr, size, sizeof(res->hr_token));
542		nv_free(nvin);
543		goto close;
544	}
545	bcopy(token, res->hr_token, sizeof(res->hr_token));
546	nv_free(nvin);
547
548	/*
549	 * Second handshake step.
550	 * Setup incoming connection with remote node.
551	 */
552	if (proto_client(res->hr_remoteaddr, &in) < 0) {
553		pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
554		    res->hr_remoteaddr);
555	}
556	/* Try to connect, but accept failure. */
557	if (proto_connect(in) < 0) {
558		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
559		    res->hr_remoteaddr);
560		goto close;
561	}
562	/* Error in setting timeout is not critical, but why should it fail? */
563	if (proto_timeout(in, res->hr_timeout) < 0)
564		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
565	nvout = nv_alloc();
566	nv_add_string(nvout, res->hr_name, "resource");
567	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
568	    "token");
569	nv_add_uint64(nvout, res->hr_resuid, "resuid");
570	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
571	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
572	if (nv_error(nvout) != 0) {
573		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
574		    "Unable to allocate header for connection with %s",
575		    res->hr_remoteaddr);
576		nv_free(nvout);
577		goto close;
578	}
579	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
580		pjdlog_errno(LOG_WARNING,
581		    "Unable to send handshake header to %s",
582		    res->hr_remoteaddr);
583		nv_free(nvout);
584		goto close;
585	}
586	nv_free(nvout);
587	if (hast_proto_recv_hdr(out, &nvin) < 0) {
588		pjdlog_errno(LOG_WARNING,
589		    "Unable to receive handshake header from %s",
590		    res->hr_remoteaddr);
591		goto close;
592	}
593	errmsg = nv_get_string(nvin, "errmsg");
594	if (errmsg != NULL) {
595		pjdlog_warning("%s", errmsg);
596		nv_free(nvin);
597		goto close;
598	}
599	datasize = nv_get_int64(nvin, "datasize");
600	if (datasize != res->hr_datasize) {
601		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
602		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
603		nv_free(nvin);
604		goto close;
605	}
606	extentsize = nv_get_int32(nvin, "extentsize");
607	if (extentsize != res->hr_extentsize) {
608		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
609		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
610		nv_free(nvin);
611		goto close;
612	}
613	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
614	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
615	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
616	map = NULL;
617	mapsize = nv_get_uint32(nvin, "mapsize");
618	if (mapsize > 0) {
619		map = malloc(mapsize);
620		if (map == NULL) {
621			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
622			    (uintmax_t)mapsize);
623			nv_free(nvin);
624			goto close;
625		}
626		/*
627		 * Remote node have some dirty extents on its own, lets
628		 * download its activemap.
629		 */
630		if (hast_proto_recv_data(res, out, nvin, map,
631		    mapsize) < 0) {
632			pjdlog_errno(LOG_ERR,
633			    "Unable to receive remote activemap");
634			nv_free(nvin);
635			free(map);
636			goto close;
637		}
638		/*
639		 * Merge local and remote bitmaps.
640		 */
641		activemap_merge(res->hr_amp, map, mapsize);
642		free(map);
643		/*
644		 * Now that we merged bitmaps from both nodes, flush it to the
645		 * disk before we start to synchronize.
646		 */
647		(void)hast_activemap_flush(res);
648	}
649	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
650	if (inp != NULL && outp != NULL) {
651		*inp = in;
652		*outp = out;
653	} else {
654		res->hr_remotein = in;
655		res->hr_remoteout = out;
656	}
657	return (true);
658close:
659	proto_close(out);
660	if (in != NULL)
661		proto_close(in);
662	return (false);
663}
664
/*
 * Mark synchronization as in progress and wake the sync thread.
 * The flag is set under sync_lock; the signal is sent after the mutex
 * is released (sync_thread is not visible in this chunk — presumably
 * it rechecks sync_inprogress under the lock, so this order is safe).
 */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}
674
675static void
676init_ggate(struct hast_resource *res)
677{
678	struct g_gate_ctl_create ggiocreate;
679	struct g_gate_ctl_cancel ggiocancel;
680
681	/*
682	 * We communicate with ggate via /dev/ggctl. Open it.
683	 */
684	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
685	if (res->hr_ggatefd < 0)
686		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
687	/*
688	 * Create provider before trying to connect, as connection failure
689	 * is not critical, but may take some time.
690	 */
691	ggiocreate.gctl_version = G_GATE_VERSION;
692	ggiocreate.gctl_mediasize = res->hr_datasize;
693	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
694	ggiocreate.gctl_flags = 0;
695	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
696	ggiocreate.gctl_timeout = 0;
697	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
698	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
699	    res->hr_provname);
700	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
701	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
702		pjdlog_info("Device hast/%s created.", res->hr_provname);
703		res->hr_ggateunit = ggiocreate.gctl_unit;
704		return;
705	}
706	if (errno != EEXIST) {
707		primary_exit(EX_OSERR, "Unable to create hast/%s device",
708		    res->hr_provname);
709	}
710	pjdlog_debug(1,
711	    "Device hast/%s already exists, we will try to take it over.",
712	    res->hr_provname);
713	/*
714	 * If we received EEXIST, we assume that the process who created the
715	 * provider died and didn't clean up. In that case we will start from
716	 * where he left of.
717	 */
718	ggiocancel.gctl_version = G_GATE_VERSION;
719	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
720	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
721	    res->hr_provname);
722	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
723		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
724		res->hr_ggateunit = ggiocancel.gctl_unit;
725		return;
726	}
727	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
728	    res->hr_provname);
729}
730
731void
732hastd_primary(struct hast_resource *res)
733{
734	pthread_t td;
735	pid_t pid;
736	int error;
737
738	gres = res;
739
740	/*
741	 * Create communication channel between parent and child.
742	 */
743	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
744		KEEP_ERRNO((void)pidfile_remove(pfh));
745		primary_exit(EX_OSERR,
746		    "Unable to create control sockets between parent and child");
747	}
748
749	pid = fork();
750	if (pid < 0) {
751		KEEP_ERRNO((void)pidfile_remove(pfh));
752		primary_exit(EX_TEMPFAIL, "Unable to fork");
753	}
754
755	if (pid > 0) {
756		/* This is parent. */
757		res->hr_workerpid = pid;
758		return;
759	}
760	(void)pidfile_close(pfh);
761
762	setproctitle("%s (primary)", res->hr_name);
763
764	signal(SIGHUP, SIG_DFL);
765	signal(SIGCHLD, SIG_DFL);
766
767	init_local(res);
768	if (init_remote(res, NULL, NULL))
769		sync_start();
770	init_ggate(res);
771	init_environment(res);
772	error = pthread_create(&td, NULL, ggate_recv_thread, res);
773	assert(error == 0);
774	error = pthread_create(&td, NULL, local_send_thread, res);
775	assert(error == 0);
776	error = pthread_create(&td, NULL, remote_send_thread, res);
777	assert(error == 0);
778	error = pthread_create(&td, NULL, remote_recv_thread, res);
779	assert(error == 0);
780	error = pthread_create(&td, NULL, ggate_send_thread, res);
781	assert(error == 0);
782	error = pthread_create(&td, NULL, sync_thread, res);
783	assert(error == 0);
784	error = pthread_create(&td, NULL, ctrl_thread, res);
785	assert(error == 0);
786	(void)guard_thread(res);
787}
788
789static void
790reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
791{
792	char msg[1024];
793	va_list ap;
794	int len;
795
796	va_start(ap, fmt);
797	len = vsnprintf(msg, sizeof(msg), fmt, ap);
798	va_end(ap);
799	if ((size_t)len < sizeof(msg)) {
800		switch (ggio->gctl_cmd) {
801		case BIO_READ:
802			(void)snprintf(msg + len, sizeof(msg) - len,
803			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
804			    (uintmax_t)ggio->gctl_length);
805			break;
806		case BIO_DELETE:
807			(void)snprintf(msg + len, sizeof(msg) - len,
808			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
809			    (uintmax_t)ggio->gctl_length);
810			break;
811		case BIO_FLUSH:
812			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
813			break;
814		case BIO_WRITE:
815			(void)snprintf(msg + len, sizeof(msg) - len,
816			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
817			    (uintmax_t)ggio->gctl_length);
818			break;
819		default:
820			(void)snprintf(msg + len, sizeof(msg) - len,
821			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
822			break;
823		}
824	}
825	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
826}
827
/*
 * Tear down both remote connections of component 'ncomp', abort any
 * synchronization in progress, and kick the guard thread so that it can
 * start reconnecting immediately.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	/* Still connected; both directions must be present. */
	assert(res->hr_remotein != NULL);
	assert(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing old incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing old outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	/*
	 * Stop synchronization if in-progress.
	 */
	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);

	/*
	 * Wake up guard thread, so it can immediately start reconnect.
	 */
	mtx_lock(&hio_guard_lock);
	cv_signal(&hio_guard_cond);
	mtx_unlock(&hio_guard_lock);
}
873
874/*
875 * Thread receives ggate I/O requests from the kernel and passes them to
876 * appropriate threads:
877 * WRITE - always goes to both local_send and remote_send threads
878 * READ (when the block is up-to-date on local component) -
879 *	only local_send thread
880 * READ (when the block isn't up-to-date on local component) -
881 *	only remote_send thread
882 * DELETE - always goes to both local_send and remote_send threads
883 * FLUSH - always goes to both local_send and remote_send threads
884 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	/* Main loop: fetch a free hio, fill it from the kernel, dispatch. */
	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		/* Reset fields the previous use of this hio may have set. */
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		/* Blocks until the kernel has a request for us. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/*
		 * Preset every component's error slot; each component that
		 * actually handles the request overwrites its own slot.
		 */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* READ is handled by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				 /* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				 /* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Take a regular-range lock over the written range,
			 * waiting out any overlapping sync-range lock first.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/*
			 * Mark the extent dirty in the activemap; flush the
			 * map to disk first if this changed the bitmap.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* WRITE/DELETE/FLUSH go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}
1024
1025/*
1026 * Thread reads from or writes to local component.
1027 * If local read fails, it redirects it to remote_send thread.
1028 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 * Note: hio_countdown is not released here; the
				 * request is handed to the remote send queue
				 * and the remote side completes it.
				 */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			/* A short write (ret != gctl_length) is reported as EIO. */
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/*
		 * If we were the last component to finish this request,
		 * complete it: for synchronization requests mark the request
		 * done and wake sync_thread, otherwise hand the request to
		 * ggate_send_thread via the done queue.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}
1108
1109/*
1110 * Thread sends request to secondary node.
1111 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/*
		 * Translate the kernel BIO command into an on-the-wire HIO
		 * command. Only WRITE carries a data payload; FLUSH has no
		 * offset/length.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			assert(!"invalid condition");
			abort();
		}
		/* Build the request header as a name/value list. */
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		/* Only wake remote_recv_thread if the queue was empty. */
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		/*
		 * The request failed before reaching the remote node.
		 * Complete this component's share of the work here.
		 */
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Remote write failed: flush the activemap to disk
			 * if this range still needs synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}
1252
1253/*
1254 * Thread receives answer from secondary node and passes it to ggate_send
1255 * thread.
1256 */
1257static void *
1258remote_recv_thread(void *arg)
1259{
1260	struct hast_resource *res = arg;
1261	struct g_gate_ctl_io *ggio;
1262	struct hio *hio;
1263	struct nv *nv;
1264	unsigned int ncomp;
1265	uint64_t seq;
1266	int error;
1267
1268	/* Remote component is 1 for now. */
1269	ncomp = 1;
1270
1271	for (;;) {
1272		/* Wait until there is anything to receive. */
1273		mtx_lock(&hio_recv_list_lock[ncomp]);
1274		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
1275			pjdlog_debug(2, "remote_recv: No requests, waiting.");
1276			cv_wait(&hio_recv_list_cond[ncomp],
1277			    &hio_recv_list_lock[ncomp]);
1278		}
1279		mtx_unlock(&hio_recv_list_lock[ncomp]);
1280		rw_rlock(&hio_remote_lock[ncomp]);
1281		if (!ISCONNECTED(res, ncomp)) {
1282			rw_unlock(&hio_remote_lock[ncomp]);
1283			/*
1284			 * Connection is dead, so move all pending requests to
1285			 * the done queue (one-by-one).
1286			 */
1287			mtx_lock(&hio_recv_list_lock[ncomp]);
1288			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
1289			assert(hio != NULL);
1290			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1291			    hio_next[ncomp]);
1292			mtx_unlock(&hio_recv_list_lock[ncomp]);
1293			goto done_queue;
1294		}
1295		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
1296			pjdlog_errno(LOG_ERR,
1297			    "Unable to receive reply header");
1298			rw_unlock(&hio_remote_lock[ncomp]);
1299			remote_close(res, ncomp);
1300			continue;
1301		}
1302		rw_unlock(&hio_remote_lock[ncomp]);
1303		seq = nv_get_uint64(nv, "seq");
1304		if (seq == 0) {
1305			pjdlog_error("Header contains no 'seq' field.");
1306			nv_free(nv);
1307			continue;
1308		}
1309		mtx_lock(&hio_recv_list_lock[ncomp]);
1310		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
1311			if (hio->hio_ggio.gctl_seq == seq) {
1312				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
1313				    hio_next[ncomp]);
1314				break;
1315			}
1316		}
1317		mtx_unlock(&hio_recv_list_lock[ncomp]);
1318		if (hio == NULL) {
1319			pjdlog_error("Found no request matching received 'seq' field (%ju).",
1320			    (uintmax_t)seq);
1321			nv_free(nv);
1322			continue;
1323		}
1324		error = nv_get_int16(nv, "error");
1325		if (error != 0) {
1326			/* Request failed on remote side. */
1327			hio->hio_errors[ncomp] = 0;
1328			nv_free(nv);
1329			goto done_queue;
1330		}
1331		ggio = &hio->hio_ggio;
1332		switch (ggio->gctl_cmd) {
1333		case BIO_READ:
1334			rw_rlock(&hio_remote_lock[ncomp]);
1335			if (!ISCONNECTED(res, ncomp)) {
1336				rw_unlock(&hio_remote_lock[ncomp]);
1337				nv_free(nv);
1338				goto done_queue;
1339			}
1340			if (hast_proto_recv_data(res, res->hr_remotein, nv,
1341			    ggio->gctl_data, ggio->gctl_length) < 0) {
1342				hio->hio_errors[ncomp] = errno;
1343				pjdlog_errno(LOG_ERR,
1344				    "Unable to receive reply data");
1345				rw_unlock(&hio_remote_lock[ncomp]);
1346				nv_free(nv);
1347				remote_close(res, ncomp);
1348				goto done_queue;
1349			}
1350			rw_unlock(&hio_remote_lock[ncomp]);
1351			break;
1352		case BIO_WRITE:
1353		case BIO_DELETE:
1354		case BIO_FLUSH:
1355			break;
1356		default:
1357			assert(!"invalid condition");
1358			abort();
1359		}
1360		hio->hio_errors[ncomp] = 0;
1361		nv_free(nv);
1362done_queue:
1363		if (refcount_release(&hio->hio_countdown)) {
1364			if (ISSYNCREQ(hio)) {
1365				mtx_lock(&sync_lock);
1366				SYNCREQDONE(hio);
1367				mtx_unlock(&sync_lock);
1368				cv_signal(&sync_cond);
1369			} else {
1370				pjdlog_debug(2,
1371				    "remote_recv: (%p) Moving request to the done queue.",
1372				    hio);
1373				QUEUE_INSERT2(hio, done);
1374			}
1375		}
1376	}
1377	/* NOTREACHED */
1378	return (NULL);
1379}
1380
1381/*
1382 * Thread sends answer to the kernel.
1383 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/*
		 * Aggregate per-component results into a single status for
		 * the kernel.
		 */
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use first error.
			 */
			ggio->gctl_error = hio->hio_errors[0];
		}
		/* Mark the successfully written range complete in the activemap. */
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			mtx_lock(&res->hr_amp_lock);
			activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length);
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			/* Wake sync_thread if it waits for this range. */
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Bump local count if this is first write after
			 * connection failure with remote node.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				mtx_lock(&metadata_lock);
				if (res->hr_primary_localcnt ==
				    res->hr_secondary_remotecnt) {
					res->hr_primary_localcnt++;
					pjdlog_debug(1,
					    "Increasing localcnt to %ju.",
					    (uintmax_t)res->hr_primary_localcnt);
					(void)metadata_write(res);
				}
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
		}
		/* Hand the completed request back to the kernel. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}
1461
1462/*
1463 * Thread synchronize local and remote components.
1464 */
1465static void *
1466sync_thread(void *arg __unused)
1467{
1468	struct hast_resource *res = arg;
1469	struct hio *hio;
1470	struct g_gate_ctl_io *ggio;
1471	unsigned int ii, ncomp, ncomps;
1472	off_t offset, length, synced;
1473	bool dorewind;
1474	int syncext;
1475
1476	ncomps = HAST_NCOMPONENTS;
1477	dorewind = true;
1478	synced = 0;
1479
1480	for (;;) {
1481		mtx_lock(&sync_lock);
1482		while (!sync_inprogress) {
1483			dorewind = true;
1484			synced = 0;
1485			cv_wait(&sync_cond, &sync_lock);
1486		}
1487		mtx_unlock(&sync_lock);
1488		/*
1489		 * Obtain offset at which we should synchronize.
1490		 * Rewind synchronization if needed.
1491		 */
1492		mtx_lock(&res->hr_amp_lock);
1493		if (dorewind)
1494			activemap_sync_rewind(res->hr_amp);
1495		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
1496		if (syncext != -1) {
1497			/*
1498			 * We synchronized entire syncext extent, we can mark
1499			 * it as clean now.
1500			 */
1501			if (activemap_extent_complete(res->hr_amp, syncext))
1502				(void)hast_activemap_flush(res);
1503		}
1504		mtx_unlock(&res->hr_amp_lock);
1505		if (dorewind) {
1506			dorewind = false;
1507			if (offset < 0)
1508				pjdlog_info("Nodes are in sync.");
1509			else {
1510				pjdlog_info("Synchronization started. %ju bytes to go.",
1511				    (uintmax_t)(res->hr_extentsize *
1512				    activemap_ndirty(res->hr_amp)));
1513			}
1514		}
1515		if (offset < 0) {
1516			mtx_lock(&sync_lock);
1517			sync_inprogress = false;
1518			mtx_unlock(&sync_lock);
1519			pjdlog_debug(1, "Nothing to synchronize.");
1520			/*
1521			 * Synchronization complete, make both localcnt and
1522			 * remotecnt equal.
1523			 */
1524			ncomp = 1;
1525			rw_rlock(&hio_remote_lock[ncomp]);
1526			if (ISCONNECTED(res, ncomp)) {
1527				if (synced > 0) {
1528					pjdlog_info("Synchronization complete. "
1529					    "%jd bytes synchronized.",
1530					    (intmax_t)synced);
1531				}
1532				mtx_lock(&metadata_lock);
1533				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
1534				res->hr_primary_localcnt =
1535				    res->hr_secondary_localcnt;
1536				res->hr_primary_remotecnt =
1537				    res->hr_secondary_remotecnt;
1538				pjdlog_debug(1,
1539				    "Setting localcnt to %ju and remotecnt to %ju.",
1540				    (uintmax_t)res->hr_primary_localcnt,
1541				    (uintmax_t)res->hr_secondary_localcnt);
1542				(void)metadata_write(res);
1543				mtx_unlock(&metadata_lock);
1544			} else if (synced > 0) {
1545				pjdlog_info("Synchronization interrupted. "
1546				    "%jd bytes synchronized so far.",
1547				    (intmax_t)synced);
1548			}
1549			rw_unlock(&hio_remote_lock[ncomp]);
1550			continue;
1551		}
1552		pjdlog_debug(2, "sync: Taking free request.");
1553		QUEUE_TAKE2(hio, free);
1554		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
1555		/*
1556		 * Lock the range we are going to synchronize. We don't want
1557		 * race where someone writes between our read and write.
1558		 */
1559		for (;;) {
1560			mtx_lock(&range_lock);
1561			if (rangelock_islocked(range_regular, offset, length)) {
1562				pjdlog_debug(2,
1563				    "sync: Range offset=%jd length=%jd locked.",
1564				    (intmax_t)offset, (intmax_t)length);
1565				range_sync_wait = true;
1566				cv_wait(&range_sync_cond, &range_lock);
1567				range_sync_wait = false;
1568				mtx_unlock(&range_lock);
1569				continue;
1570			}
1571			if (rangelock_add(range_sync, offset, length) < 0) {
1572				mtx_unlock(&range_lock);
1573				pjdlog_debug(2,
1574				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
1575				    (intmax_t)offset, (intmax_t)length);
1576				sleep(1);
1577				continue;
1578			}
1579			mtx_unlock(&range_lock);
1580			break;
1581		}
1582		/*
1583		 * First read the data from synchronization source.
1584		 */
1585		SYNCREQ(hio);
1586		ggio = &hio->hio_ggio;
1587		ggio->gctl_cmd = BIO_READ;
1588		ggio->gctl_offset = offset;
1589		ggio->gctl_length = length;
1590		ggio->gctl_error = 0;
1591		for (ii = 0; ii < ncomps; ii++)
1592			hio->hio_errors[ii] = EINVAL;
1593		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1594		    hio);
1595		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1596		    hio);
1597		mtx_lock(&metadata_lock);
1598		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1599			/*
1600			 * This range is up-to-date on local component,
1601			 * so handle request locally.
1602			 */
1603			 /* Local component is 0 for now. */
1604			ncomp = 0;
1605		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1606			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1607			/*
1608			 * This range is out-of-date on local component,
1609			 * so send request to the remote node.
1610			 */
1611			 /* Remote component is 1 for now. */
1612			ncomp = 1;
1613		}
1614		mtx_unlock(&metadata_lock);
1615		refcount_init(&hio->hio_countdown, 1);
1616		QUEUE_INSERT1(hio, send, ncomp);
1617
1618		/*
1619		 * Let's wait for READ to finish.
1620		 */
1621		mtx_lock(&sync_lock);
1622		while (!ISSYNCREQDONE(hio))
1623			cv_wait(&sync_cond, &sync_lock);
1624		mtx_unlock(&sync_lock);
1625
1626		if (hio->hio_errors[ncomp] != 0) {
1627			pjdlog_error("Unable to read synchronization data: %s.",
1628			    strerror(hio->hio_errors[ncomp]));
1629			goto free_queue;
1630		}
1631
1632		/*
1633		 * We read the data from synchronization source, now write it
1634		 * to synchronization target.
1635		 */
1636		SYNCREQ(hio);
1637		ggio->gctl_cmd = BIO_WRITE;
1638		for (ii = 0; ii < ncomps; ii++)
1639			hio->hio_errors[ii] = EINVAL;
1640		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
1641		    hio);
1642		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
1643		    hio);
1644		mtx_lock(&metadata_lock);
1645		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
1646			/*
1647			 * This range is up-to-date on local component,
1648			 * so we update remote component.
1649			 */
1650			 /* Remote component is 1 for now. */
1651			ncomp = 1;
1652		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
1653			assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
1654			/*
1655			 * This range is out-of-date on local component,
1656			 * so we update it.
1657			 */
1658			 /* Local component is 0 for now. */
1659			ncomp = 0;
1660		}
1661		mtx_unlock(&metadata_lock);
1662
1663		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
1664		    hio);
1665		refcount_init(&hio->hio_countdown, 1);
1666		QUEUE_INSERT1(hio, send, ncomp);
1667
1668		/*
1669		 * Let's wait for WRITE to finish.
1670		 */
1671		mtx_lock(&sync_lock);
1672		while (!ISSYNCREQDONE(hio))
1673			cv_wait(&sync_cond, &sync_lock);
1674		mtx_unlock(&sync_lock);
1675
1676		if (hio->hio_errors[ncomp] != 0) {
1677			pjdlog_error("Unable to write synchronization data: %s.",
1678			    strerror(hio->hio_errors[ncomp]));
1679			goto free_queue;
1680		}
1681free_queue:
1682		mtx_lock(&range_lock);
1683		rangelock_del(range_sync, offset, length);
1684		if (range_regular_wait)
1685			cv_signal(&range_regular_cond);
1686		mtx_unlock(&range_lock);
1687
1688		synced += length;
1689
1690		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
1691		    hio);
1692		QUEUE_INSERT2(hio, free);
1693	}
1694	/* NOTREACHED */
1695	return (NULL);
1696}
1697
1698static void
1699sighandler(int sig)
1700{
1701	bool unlock;
1702
1703	switch (sig) {
1704	case SIGINT:
1705	case SIGTERM:
1706		sigexit_received = true;
1707		break;
1708	default:
1709		assert(!"invalid condition");
1710	}
1711	/*
1712	 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't
1713	 * want to risk deadlock.
1714	 */
1715	unlock = mtx_trylock(&hio_guard_lock);
1716	cv_signal(&hio_guard_cond);
1717	if (unlock)
1718		mtx_unlock(&hio_guard_lock);
1719}
1720
1721/*
1722 * Thread guards remote connections and reconnects when needed, handles
1723 * signals, etc.
1724 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct proto_conn *in, *out;
	unsigned int ii, ncomps;
	int timeout;

	ncomps = HAST_NCOMPONENTS;
	/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

	for (;;) {
		/* A termination signal was delivered; exit cleanly. */
		if (sigexit_received) {
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
		}
		/*
		 * If all the connection will be fine, we will sleep until
		 * someone wakes us up.
		 * If any of the connections will be broken and we won't be
		 * able to connect, we will sleep only for RECONNECT_SLEEP
		 * seconds so we can retry soon.
		 */
		timeout = 0;
		pjdlog_debug(2, "remote_guard: Checking connections.");
		mtx_lock(&hio_guard_lock);
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (ISCONNECTED(res, ii)) {
				assert(res->hr_remotein != NULL);
				assert(res->hr_remoteout != NULL);
				rw_unlock(&hio_remote_lock[ii]);
				pjdlog_debug(2,
				    "remote_guard: Connection to %s is ok.",
				    res->hr_remoteaddr);
			} else {
				assert(res->hr_remotein == NULL);
				assert(res->hr_remoteout == NULL);
				/*
				 * Upgrade the lock. It doesn't have to be
				 * atomic as no other thread can change
				 * connection status from disconnected to
				 * connected.
				 */
				rw_unlock(&hio_remote_lock[ii]);
				pjdlog_debug(2,
				    "remote_guard: Reconnecting to %s.",
				    res->hr_remoteaddr);
				in = out = NULL;
				if (init_remote(res, &in, &out)) {
					rw_wlock(&hio_remote_lock[ii]);
					assert(res->hr_remotein == NULL);
					assert(res->hr_remoteout == NULL);
					assert(in != NULL && out != NULL);
					res->hr_remotein = in;
					res->hr_remoteout = out;
					rw_unlock(&hio_remote_lock[ii]);
					pjdlog_info("Successfully reconnected to %s.",
					    res->hr_remoteaddr);
					/* Restart synchronization after reconnect. */
					sync_start();
				} else {
					/* Both connections should be NULL. */
					assert(res->hr_remotein == NULL);
					assert(res->hr_remoteout == NULL);
					assert(in == NULL && out == NULL);
					pjdlog_debug(2,
					    "remote_guard: Reconnect to %s failed.",
					    res->hr_remoteaddr);
					timeout = RECONNECT_SLEEP;
				}
			}
		}
		/* Wait for a wakeup (e.g. from sighandler) or for the timeout. */
		(void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout);
		mtx_unlock(&hio_guard_lock);
	}
#undef	ISREMOTE
	/* NOTREACHED */
	return (NULL);
}
1807