secondary.c revision 211977
1121986Sjhb/*-
2121986Sjhb * Copyright (c) 2009-2010 The FreeBSD Foundation
3121986Sjhb * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4121986Sjhb * All rights reserved.
5121986Sjhb *
6121986Sjhb * This software was developed by Pawel Jakub Dawidek under sponsorship from
7121986Sjhb * the FreeBSD Foundation.
8121986Sjhb *
9121986Sjhb * Redistribution and use in source and binary forms, with or without
10121986Sjhb * modification, are permitted provided that the following conditions
11121986Sjhb * are met:
12121986Sjhb * 1. Redistributions of source code must retain the above copyright
13121986Sjhb *    notice, this list of conditions and the following disclaimer.
14121986Sjhb * 2. Redistributions in binary form must reproduce the above copyright
15121986Sjhb *    notice, this list of conditions and the following disclaimer in the
16121986Sjhb *    documentation and/or other materials provided with the distribution.
17121986Sjhb *
18121986Sjhb * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19121986Sjhb * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20121986Sjhb * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21121986Sjhb * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22121986Sjhb * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23121986Sjhb * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24121986Sjhb * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25121986Sjhb * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26121986Sjhb * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27121986Sjhb * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28121986Sjhb * SUCH DAMAGE.
29121986Sjhb */
30121986Sjhb
31121986Sjhb#include <sys/cdefs.h>
32121986Sjhb__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 211977 2010-08-29 21:41:53Z pjd $");
33121986Sjhb
34121986Sjhb#include <sys/param.h>
35121986Sjhb#include <sys/time.h>
36167240Sjhb#include <sys/bio.h>
37121986Sjhb#include <sys/disk.h>
38167240Sjhb#include <sys/stat.h>
39121986Sjhb
40148538Sjhb#include <assert.h>
41121986Sjhb#include <err.h>
42167240Sjhb#include <errno.h>
43167240Sjhb#include <fcntl.h>
44167240Sjhb#include <libgeom.h>
45121986Sjhb#include <pthread.h>
46121986Sjhb#include <stdint.h>
47121986Sjhb#include <stdio.h>
48214631Sjhb#include <string.h>
49121986Sjhb#include <sysexits.h>
50121986Sjhb#include <unistd.h>
51121986Sjhb
52167747Sjhb#include <activemap.h>
53121986Sjhb#include <nv.h>
54121986Sjhb#include <pjdlog.h>
55121986Sjhb
56121986Sjhb#include "control.h"
57121986Sjhb#include "hast.h"
58121986Sjhb#include "hast_proto.h"
59121986Sjhb#include "hastd.h"
60151979Sjhb#include "hooks.h"
61151979Sjhb#include "metadata.h"
62151979Sjhb#include "proto.h"
63151979Sjhb#include "subr.h"
64121986Sjhb#include "synch.h"
65151897Srwatson
/*
 * One I/O request handled by the secondary worker. A request is taken from
 * the free list by the receive thread and travels through the disk and/or
 * send queues before it is returned to the free list.
 */
struct hio {
	/* Sequence number slot (not referenced by the functions below). */
	uint64_t		 hio_seq;
	/* errno-style status of the request; 0 means success. */
	int			 hio_error;
	/* Request header as received from the primary node. */
	struct nv		*hio_nv;
	/* Data buffer; init_environment() allocates MAXPHYS bytes for it. */
	void			*hio_data;
	/* Command taken from the "cmd" header field (HIO_*). */
	uint8_t			 hio_cmd;
	/* Values of the "offset" and "length" header fields. */
	uint64_t		 hio_offset;
	uint64_t		 hio_length;
	/* Linkage for the free/disk/send queues. */
	TAILQ_ENTRY(hio)	 hio_next;
};
76152461Sandre
/*
 * Free list holds unused request structures. When the free list is empty,
 * the receive thread has to wait until some in-progress request is released.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Disk thread (the one that does I/O requests) takes requests from this list.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Send thread takes requests from this list (either completed by the disk
 * thread or rejected by the receive thread) and replies to the primary node.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;
97121986Sjhb
/*
 * Maximum number of outstanding I/O requests; this many request structures
 * are preallocated by init_environment().
 */
#define	HAST_HIO_MAX	256

/* Worker thread entry points; each one receives the resource as its arg. */
static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
106121986Sjhb
/*
 * Append request (hio) to the tail of the hio_<name>_list queue.
 * The queue's condition variable is signalled only when the queue was empty
 * before the insert; the signal is sent after the queue lock is released.
 */
#define	QUEUE_INSERT(name, hio)	do {					\
	bool _was_empty;						\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_was_empty = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_was_empty)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Remove the first request from the hio_<name>_list queue and store it in
 * (hio). Blocks on the queue's condition variable while the queue is empty.
 */
#define	QUEUE_TAKE(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
126169391Sjhb
127169391Sjhbstatic void
128156124Sjhbinit_environment(void)
129121986Sjhb{
130156124Sjhb	struct hio *hio;
131156124Sjhb	unsigned int ii;
132156124Sjhb
133248085Smarius	/*
134148538Sjhb	 * Initialize lists, their locks and theirs condition variables.
135148538Sjhb	 */
136148538Sjhb	TAILQ_INIT(&hio_free_list);
137148538Sjhb	mtx_init(&hio_free_list_lock);
138148538Sjhb	cv_init(&hio_free_list_cond);
139133017Sscottl	TAILQ_INIT(&hio_disk_list);
140133017Sscottl	mtx_init(&hio_disk_list_lock);
141133017Sscottl	cv_init(&hio_disk_list_cond);
142133017Sscottl	TAILQ_INIT(&hio_send_list);
143133017Sscottl	mtx_init(&hio_send_list_lock);
144133017Sscottl	cv_init(&hio_send_list_cond);
145121986Sjhb
146121986Sjhb	/*
147121986Sjhb	 * Allocate requests pool and initialize requests.
148121986Sjhb	 */
149121986Sjhb	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
150121986Sjhb		hio = malloc(sizeof(*hio));
151121986Sjhb		if (hio == NULL) {
152121986Sjhb			pjdlog_exitx(EX_TEMPFAIL,
153121986Sjhb			    "Unable to allocate memory (%zu bytes) for hio request.",
154121986Sjhb			    sizeof(*hio));
155121986Sjhb		}
156121986Sjhb		hio->hio_error = 0;
157121986Sjhb		hio->hio_data = malloc(MAXPHYS);
158121986Sjhb		if (hio->hio_data == NULL) {
159121986Sjhb			pjdlog_exitx(EX_TEMPFAIL,
160121986Sjhb			    "Unable to allocate memory (%zu bytes) for gctl_data.",
161121986Sjhb			    (size_t)MAXPHYS);
162121986Sjhb		}
163130980Sjhb		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
164130980Sjhb	}
165130980Sjhb}
166130980Sjhb
/*
 * Load the resource's metadata through metadata_read(); terminate the
 * process with EX_NOINPUT when that call reports a failure.
 */
static void
init_local(struct hast_resource *res)
{

	if (metadata_read(res, true) >= 0)
		return;
	exit(EX_NOINPUT);
}
174130980Sjhb
175130980Sjhbstatic void
176130980Sjhbinit_remote(struct hast_resource *res, struct nv *nvin)
177130980Sjhb{
178130980Sjhb	uint64_t resuid;
179121986Sjhb	struct nv *nvout;
180151979Sjhb	unsigned char *map;
181130980Sjhb	size_t mapsize;
182130980Sjhb
183151979Sjhb	map = NULL;
184151979Sjhb	mapsize = 0;
185130980Sjhb	nvout = nv_alloc();
186130980Sjhb	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
187151979Sjhb	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
188130980Sjhb	resuid = nv_get_uint64(nvin, "resuid");
189130980Sjhb	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
190151979Sjhb	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
191130980Sjhb	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
192130980Sjhb	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
193151979Sjhb	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
194130980Sjhb	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
195130980Sjhb	map = malloc(mapsize);
196130980Sjhb	if (map == NULL) {
197130980Sjhb		pjdlog_exitx(EX_TEMPFAIL,
198151979Sjhb		    "Unable to allocate memory (%zu bytes) for activemap.",
199130980Sjhb		    mapsize);
200130980Sjhb	}
201130980Sjhb	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
202130980Sjhb	/*
203121986Sjhb	 * When we work as primary and secondary is missing we will increase
204121986Sjhb	 * localcnt in our metadata. When secondary is connected and synced
205121986Sjhb	 * we make localcnt be equal to remotecnt, which means nodes are more
206121986Sjhb	 * or less in sync.
207121986Sjhb	 * Split-brain condition is when both nodes are not able to communicate
208121986Sjhb	 * and are both configured as primary nodes. In turn, they can both
209121986Sjhb	 * make incompatible changes to the data and we have to detect that.
210121986Sjhb	 * Under split-brain condition we will increase our localcnt on first
211157541Sjhb	 * write and remote node will increase its localcnt on first write.
212121986Sjhb	 * When we connect we can see that primary's localcnt is greater than
213121986Sjhb	 * our remotecnt (primary was modified while we weren't watching) and
214121986Sjhb	 * our localcnt is greater than primary's remotecnt (we were modified
215121986Sjhb	 * while primary wasn't watching).
216121986Sjhb	 * There are many possible combinations which are all gathered below.
217121986Sjhb	 * Don't pay too much attention to exact numbers, the more important
218121986Sjhb	 * is to compare them. We compare secondary's local with primary's
219121986Sjhb	 * remote and secondary's remote with primary's local.
220133017Sscottl	 * Note that every case where primary's localcnt is smaller than
221121986Sjhb	 * secondary's remotecnt and where secondary's localcnt is smaller than
222121986Sjhb	 * primary's remotecnt should be impossible in practise. We will perform
223121986Sjhb	 * full synchronization then. Those cases are marked with an asterisk.
224121986Sjhb	 * Regular synchronization means that only extents marked as dirty are
225121986Sjhb	 * synchronized (regular synchronization).
226121986Sjhb	 *
227121986Sjhb	 * SECONDARY METADATA PRIMARY METADATA
228157541Sjhb	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
229121986Sjhb	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
230121986Sjhb	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
231121986Sjhb	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
232121986Sjhb	 *                                       regular sync from secondary.
233133017Sscottl	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
234133017Sscottl	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
235133017Sscottl	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
236133017Sscottl	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
237121986Sjhb	 *                                       regular sync from primary.
238121986Sjhb	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
239121986Sjhb	 */
240121986Sjhb	if (res->hr_resuid == 0) {
241121986Sjhb		/*
242121986Sjhb		 * Provider is used for the first time. Initialize everything.
243122148Sjhb		 */
244133017Sscottl		assert(res->hr_secondary_localcnt == 0);
245121986Sjhb		res->hr_resuid = resuid;
246121986Sjhb		if (metadata_write(res) < 0)
247121986Sjhb			exit(EX_NOINPUT);
248129964Sjhb		memset(map, 0xff, mapsize);
249129964Sjhb		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
250129964Sjhb	} else if (
251129964Sjhb	    /* Is primary is out-of-date? */
252129964Sjhb	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
253129964Sjhb	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
254129964Sjhb	    /* Node are more or less in sync? */
255129964Sjhb	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
256129964Sjhb	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
257151979Sjhb	    /* Is secondary is out-of-date? */
258151979Sjhb	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
259151979Sjhb	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
260151979Sjhb		/*
261208915Sjhb		 * Nodes are more or less in sync or one of the nodes is
262151979Sjhb		 * out-of-date.
263151979Sjhb		 * It doesn't matter at this point which one, we just have to
264129964Sjhb		 * send out local bitmap to the remote node.
265129964Sjhb		 */
266129964Sjhb		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
267129964Sjhb		    (ssize_t)mapsize) {
268129964Sjhb			pjdlog_exit(LOG_ERR, "Unable to read activemap");
269129964Sjhb		}
270129964Sjhb		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
271129964Sjhb		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
272129964Sjhb			/* Primary is out-of-date, sync from secondary. */
273129964Sjhb			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
274156124Sjhb		} else {
275156124Sjhb			/*
276129964Sjhb			 * Secondary is out-of-date or counts match.
277129964Sjhb			 * Sync from primary.
278129964Sjhb			 */
279129964Sjhb			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
280129964Sjhb		}
281129964Sjhb	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
282129964Sjhb	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
283129964Sjhb		/*
284129964Sjhb		 * Not good, we have split-brain condition.
285129964Sjhb		 */
286129964Sjhb		pjdlog_error("Split-brain detected, exiting.");
287129964Sjhb		nv_add_string(nvout, "Split-brain condition!", "errmsg");
288151979Sjhb		free(map);
289151979Sjhb		map = NULL;
290129964Sjhb		mapsize = 0;
291148538Sjhb	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
292129964Sjhb	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
293129964Sjhb		/*
294151979Sjhb		 * This should never happen in practise, but we will perform
295129964Sjhb		 * full synchronization.
296129964Sjhb		 */
297129964Sjhb		assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
298129964Sjhb		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
299151979Sjhb		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
300129964Sjhb		    METADATA_SIZE, res->hr_extentsize,
301129964Sjhb		    res->hr_local_sectorsize);
302129964Sjhb		memset(map, 0xff, mapsize);
303129964Sjhb		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
304129964Sjhb			/* In this one of five cases sync from secondary. */
305151979Sjhb			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
306151979Sjhb		} else {
307156124Sjhb			/* For the rest four cases sync from primary. */
308129964Sjhb			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
309129964Sjhb		}
310129964Sjhb		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
311157541Sjhb		    (uintmax_t)res->hr_primary_localcnt,
312129964Sjhb		    (uintmax_t)res->hr_primary_remotecnt,
313129964Sjhb		    (uintmax_t)res->hr_secondary_localcnt,
314129964Sjhb		    (uintmax_t)res->hr_secondary_remotecnt);
315129964Sjhb	}
316129964Sjhb	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) {
317129964Sjhb		pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s",
318129964Sjhb		    res->hr_remoteaddr);
319195249Sjhb		nv_free(nvout);
320156124Sjhb		exit(EX_TEMPFAIL);
321121986Sjhb	}
322156124Sjhb	nv_free(nvout);
323156124Sjhb	if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
324195415Sjhb	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
325187880Sjeff		/* Exit on split-brain. */
326121986Sjhb		exit(EX_CONFIG);
327187880Sjeff	}
328187880Sjeff}
329187880Sjeff
330187880Sjeffvoid
331187880Sjeffhastd_secondary(struct hast_resource *res, struct nv *nvin)
332187880Sjeff{
333187880Sjeff	pthread_t td;
334187880Sjeff	pid_t pid;
335187880Sjeff	int error;
336187880Sjeff
337187880Sjeff	/*
338187880Sjeff	 * Create communication channel between parent and child.
339195249Sjhb	 */
340187880Sjeff	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
341187880Sjeff		KEEP_ERRNO((void)pidfile_remove(pfh));
342187880Sjeff		pjdlog_exit(EX_OSERR,
343187880Sjeff		    "Unable to create control sockets between parent and child");
344187880Sjeff	}
345195415Sjhb
346195415Sjhb	pid = fork();
347195249Sjhb	if (pid < 0) {
348195249Sjhb		KEEP_ERRNO((void)pidfile_remove(pfh));
349208915Sjhb		pjdlog_exit(EX_OSERR, "Unable to fork");
350208915Sjhb	}
351208915Sjhb
352208915Sjhb	if (pid > 0) {
353208915Sjhb		/* This is parent. */
354208915Sjhb		proto_close(res->hr_remotein);
355208915Sjhb		res->hr_remotein = NULL;
356208991Smav		proto_close(res->hr_remoteout);
357208915Sjhb		res->hr_remoteout = NULL;
358208915Sjhb		res->hr_workerpid = pid;
359216679Sjhb		return;
360208915Sjhb	}
361216679Sjhb
362208915Sjhb	(void)pidfile_close(pfh);
363208915Sjhb	hook_fini();
364195415Sjhb
365195415Sjhb	setproctitle("%s (secondary)", res->hr_name);
366195415Sjhb
367195415Sjhb	signal(SIGHUP, SIG_DFL);
368121986Sjhb	signal(SIGCHLD, SIG_DFL);
369187880Sjeff
370187880Sjeff	/* Error in setting timeout is not critical, but why should it fail? */
371151979Sjhb	if (proto_timeout(res->hr_remotein, 0) < 0)
372187880Sjeff		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
373187880Sjeff	if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0)
374121986Sjhb		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
375129964Sjhb
376208915Sjhb	init_local(res);
377208915Sjhb	init_remote(res, nvin);
378187880Sjeff	init_environment();
379187880Sjeff
380187880Sjeff	error = pthread_create(&td, NULL, recv_thread, res);
381187880Sjeff	assert(error == 0);
382195415Sjhb	error = pthread_create(&td, NULL, disk_thread, res);
383195415Sjhb	assert(error == 0);
384195415Sjhb	error = pthread_create(&td, NULL, send_thread, res);
385187880Sjeff	assert(error == 0);
386195415Sjhb	(void)ctrl_thread(res);
387195249Sjhb}
388121986Sjhb
389121986Sjhbstatic void
390121986Sjhbreqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...)
391121986Sjhb{
392121986Sjhb	char msg[1024];
393121986Sjhb	va_list ap;
394121986Sjhb	int len;
395187880Sjeff
396195249Sjhb	va_start(ap, fmt);
397195249Sjhb	len = vsnprintf(msg, sizeof(msg), fmt, ap);
398195249Sjhb	va_end(ap);
399187880Sjeff	if ((size_t)len < sizeof(msg)) {
400121986Sjhb		switch (hio->hio_cmd) {
401121986Sjhb		case HIO_READ:
402187880Sjeff			(void)snprintf(msg + len, sizeof(msg) - len,
403169391Sjhb			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
404169391Sjhb			    (uintmax_t)hio->hio_length);
405169391Sjhb			break;
406169391Sjhb		case HIO_DELETE:
407169391Sjhb			(void)snprintf(msg + len, sizeof(msg) - len,
408169391Sjhb			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
409169391Sjhb			    (uintmax_t)hio->hio_length);
410169391Sjhb			break;
411169391Sjhb		case HIO_FLUSH:
412187880Sjeff			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
413208915Sjhb			break;
414169391Sjhb		case HIO_WRITE:
415169391Sjhb			(void)snprintf(msg + len, sizeof(msg) - len,
416169391Sjhb			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
417208915Sjhb			    (uintmax_t)hio->hio_length);
418187880Sjeff			break;
419169391Sjhb		case HIO_KEEPALIVE:
420169391Sjhb			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
421169391Sjhb			break;
422121986Sjhb		default:
423121986Sjhb			(void)snprintf(msg + len, sizeof(msg) - len,
424121986Sjhb			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
425121986Sjhb			break;
426121986Sjhb		}
427121986Sjhb	}
428151979Sjhb	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
429121986Sjhb}
430121986Sjhb
431121986Sjhbstatic int
432121986Sjhbrequnpack(struct hast_resource *res, struct hio *hio)
433121986Sjhb{
434121986Sjhb
435121986Sjhb	hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd");
436151979Sjhb	if (hio->hio_cmd == 0) {
437151979Sjhb		pjdlog_error("Header contains no 'cmd' field.");
438121986Sjhb		hio->hio_error = EINVAL;
439121986Sjhb		goto end;
440121986Sjhb	}
441128931Sjhb	switch (hio->hio_cmd) {
442128931Sjhb	case HIO_KEEPALIVE:
443128931Sjhb		break;
444128931Sjhb	case HIO_READ:
445128931Sjhb	case HIO_WRITE:
446128931Sjhb	case HIO_DELETE:
447130984Sjhb		hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset");
448128931Sjhb		if (nv_error(hio->hio_nv) != 0) {
449128931Sjhb			pjdlog_error("Header is missing 'offset' field.");
450128931Sjhb			hio->hio_error = EINVAL;
451128931Sjhb			goto end;
452128931Sjhb		}
453130984Sjhb		hio->hio_length = nv_get_uint64(hio->hio_nv, "length");
454130984Sjhb		if (nv_error(hio->hio_nv) != 0) {
455130984Sjhb			pjdlog_error("Header is missing 'length' field.");
456130984Sjhb			hio->hio_error = EINVAL;
457140452Sjhb			goto end;
458128931Sjhb		}
459208915Sjhb		if (hio->hio_length == 0) {
460130984Sjhb			pjdlog_error("Data length is zero.");
461130984Sjhb			hio->hio_error = EINVAL;
462130984Sjhb			goto end;
463130984Sjhb		}
464130984Sjhb		if (hio->hio_length > MAXPHYS) {
465130984Sjhb			pjdlog_error("Data length is too large (%ju > %ju).",
466130984Sjhb			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
467130984Sjhb			hio->hio_error = EINVAL;
468130984Sjhb			goto end;
469130984Sjhb		}
470130984Sjhb		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
471130984Sjhb			pjdlog_error("Offset %ju is not multiple of sector size.",
472130984Sjhb			    (uintmax_t)hio->hio_offset);
473130984Sjhb			hio->hio_error = EINVAL;
474130984Sjhb			goto end;
475130984Sjhb		}
476130984Sjhb		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
477130984Sjhb			pjdlog_error("Length %ju is not multiple of sector size.",
478130984Sjhb			    (uintmax_t)hio->hio_length);
479130984Sjhb			hio->hio_error = EINVAL;
480130984Sjhb			goto end;
481208915Sjhb		}
482128931Sjhb		if (hio->hio_offset + hio->hio_length >
483128931Sjhb		    (uint64_t)res->hr_datasize) {
484128931Sjhb			pjdlog_error("Data offset is too large (%ju > %ju).",
485121986Sjhb			    (uintmax_t)(hio->hio_offset + hio->hio_length),
486163219Sjhb			    (uintmax_t)res->hr_datasize);
487121986Sjhb			hio->hio_error = EINVAL;
488163219Sjhb			goto end;
489163219Sjhb		}
490121986Sjhb		break;
491208915Sjhb	default:
492163219Sjhb		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
493163219Sjhb		    hio->hio_cmd);
494208915Sjhb		hio->hio_error = EINVAL;
495121986Sjhb		goto end;
496121986Sjhb	}
497121986Sjhb	hio->hio_error = 0;
498121986Sjhbend:
499121986Sjhb	return (hio->hio_error);
500121986Sjhb}
501167247Sjhb
502121986Sjhb/*
503121986Sjhb * Thread receives requests from the primary node.
504121986Sjhb */
505121986Sjhbstatic void *
506121986Sjhbrecv_thread(void *arg)
507121986Sjhb{
508121986Sjhb	struct hast_resource *res = arg;
509145054Sjhb	struct hio *hio;
510156920Sjhb
511121986Sjhb	for (;;) {
512145054Sjhb		pjdlog_debug(2, "recv: Taking free request.");
513121986Sjhb		QUEUE_TAKE(free, hio);
514145054Sjhb		pjdlog_debug(2, "recv: (%p) Got request.", hio);
515145054Sjhb		if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) {
516152528Sjhb			pjdlog_exit(EX_TEMPFAIL,
517145057Sjhb			    "Unable to receive request header");
518145054Sjhb		}
519145054Sjhb		if (requnpack(res, hio) != 0) {
520145054Sjhb			pjdlog_debug(2,
521145054Sjhb			    "recv: (%p) Moving request to the send queue.",
522145054Sjhb			    hio);
523121986Sjhb			QUEUE_INSERT(send, hio);
524121986Sjhb			continue;
525121986Sjhb		}
526121986Sjhb		reqlog(LOG_DEBUG, 2, -1, hio,
527121986Sjhb		    "recv: (%p) Got request header: ", hio);
528195249Sjhb		if (hio->hio_cmd == HIO_KEEPALIVE) {
529121986Sjhb			pjdlog_debug(2,
530121986Sjhb			    "recv: (%p) Moving request to the free queue.",
531121986Sjhb			    hio);
532121986Sjhb			nv_free(hio->hio_nv);
533121986Sjhb			QUEUE_INSERT(free, hio);
534121986Sjhb			continue;
535121986Sjhb		} else if (hio->hio_cmd == HIO_WRITE) {
536121986Sjhb			if (hast_proto_recv_data(res, res->hr_remotein,
537121986Sjhb			    hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
538121986Sjhb				pjdlog_exit(EX_TEMPFAIL,
539121986Sjhb				    "Unable to receive reply data");
540121986Sjhb			}
541170340Sjhb		}
542121986Sjhb		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
543121986Sjhb		    hio);
544121986Sjhb		QUEUE_INSERT(disk, hio);
545122124Sjhb	}
546121986Sjhb	/* NOTREACHED */
547121986Sjhb	return (NULL);
548167747Sjhb}
549121986Sjhb
550121986Sjhb/*
551121986Sjhb * Thread reads from or writes to local component and also handles DELETE and
552121986Sjhb * FLUSH requests.
553121986Sjhb */
554121986Sjhbstatic void *
555121986Sjhbdisk_thread(void *arg)
556121986Sjhb{
557121986Sjhb	struct hast_resource *res = arg;
558121986Sjhb	struct hio *hio;
559121986Sjhb	ssize_t ret;
560151979Sjhb	bool clear_activemap;
561121986Sjhb
562121986Sjhb	clear_activemap = true;
563145080Sjhb
564142256Sjhb	for (;;) {
565130980Sjhb		pjdlog_debug(2, "disk: Taking request.");
566121986Sjhb		QUEUE_TAKE(disk, hio);
567151979Sjhb		while (clear_activemap) {
568130980Sjhb			unsigned char *map;
569151979Sjhb			size_t mapsize;
570130980Sjhb
571121986Sjhb			/*
572121986Sjhb			 * When first request is received, it means that primary
573121986Sjhb			 * already received our activemap, merged it and stored
574121986Sjhb			 * locally. We can now safely clear our activemap.
575130980Sjhb			 */
576121986Sjhb			mapsize =
577121986Sjhb			    activemap_calc_ondisk_size(res->hr_local_mediasize -
578121986Sjhb			    METADATA_SIZE, res->hr_extentsize,
579121986Sjhb			    res->hr_local_sectorsize);
580121986Sjhb			map = calloc(1, mapsize);
581121986Sjhb			if (map == NULL) {
582156124Sjhb				pjdlog_warning("Unable to allocate memory to clear local activemap.");
583156124Sjhb				break;
584121986Sjhb			}
585156124Sjhb			if (pwrite(res->hr_localfd, map, mapsize,
586121986Sjhb			    METADATA_SIZE) != (ssize_t)mapsize) {
587121986Sjhb				pjdlog_errno(LOG_WARNING,
588121986Sjhb				    "Unable to store cleared activemap");
589121986Sjhb				free(map);
590121986Sjhb				break;
591121986Sjhb			}
592121986Sjhb			free(map);
593121986Sjhb			clear_activemap = false;
594121986Sjhb			pjdlog_debug(1, "Local activemap cleared.");
595121986Sjhb		}
596121986Sjhb		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
597121986Sjhb		/* Handle the actual request. */
598121986Sjhb		switch (hio->hio_cmd) {
599121986Sjhb		case HIO_READ:
600121986Sjhb			ret = pread(res->hr_localfd, hio->hio_data,
601121986Sjhb			    hio->hio_length,
602151979Sjhb			    hio->hio_offset + res->hr_localoff);
603121986Sjhb			if (ret < 0)
604121986Sjhb				hio->hio_error = errno;
605121986Sjhb			else if (ret != (int64_t)hio->hio_length)
606121986Sjhb				hio->hio_error = EIO;
607121986Sjhb			else
608121986Sjhb				hio->hio_error = 0;
609121986Sjhb			break;
610121986Sjhb		case HIO_WRITE:
611121986Sjhb			ret = pwrite(res->hr_localfd, hio->hio_data,
612121986Sjhb			    hio->hio_length,
613151979Sjhb			    hio->hio_offset + res->hr_localoff);
614121986Sjhb			if (ret < 0)
615151979Sjhb				hio->hio_error = errno;
616121986Sjhb			else if (ret != (int64_t)hio->hio_length)
617121986Sjhb				hio->hio_error = EIO;
618121986Sjhb			else
619121986Sjhb				hio->hio_error = 0;
620121986Sjhb			break;
621121986Sjhb		case HIO_DELETE:
622121986Sjhb			ret = g_delete(res->hr_localfd,
623121986Sjhb			    hio->hio_offset + res->hr_localoff,
624121986Sjhb			    hio->hio_length);
625121986Sjhb			if (ret < 0)
626121986Sjhb				hio->hio_error = errno;
627121986Sjhb			else
628121986Sjhb				hio->hio_error = 0;
629151979Sjhb			break;
630121986Sjhb		case HIO_FLUSH:
631151979Sjhb			ret = g_flush(res->hr_localfd);
632121986Sjhb			if (ret < 0)
633121986Sjhb				hio->hio_error = errno;
634121986Sjhb			else
635121986Sjhb				hio->hio_error = 0;
636121986Sjhb			break;
637121986Sjhb		}
638121986Sjhb		if (hio->hio_error != 0) {
639130980Sjhb			reqlog(LOG_ERR, 0, hio->hio_error, hio,
640130980Sjhb			    "Request failed: ");
641130980Sjhb		}
642130980Sjhb		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
643130980Sjhb		    hio);
644130980Sjhb		QUEUE_INSERT(send, hio);
645130980Sjhb	}
646130980Sjhb	/* NOTREACHED */
647130980Sjhb	return (NULL);
648151979Sjhb}
649130980Sjhb
650164358Sjhb/*
651164358Sjhb * Thread sends requests back to primary node.
652130980Sjhb */
653130980Sjhbstatic void *
654130980Sjhbsend_thread(void *arg)
655130980Sjhb{
656130980Sjhb	struct hast_resource *res = arg;
657130980Sjhb	struct nv *nvout;
658130980Sjhb	struct hio *hio;
659130980Sjhb	void *data;
660121986Sjhb	size_t length;
661121986Sjhb
662121986Sjhb	for (;;) {
663121986Sjhb		pjdlog_debug(2, "send: Taking request.");
664121986Sjhb		QUEUE_TAKE(send, hio);
665121986Sjhb		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
666121986Sjhb		nvout = nv_alloc();
667151979Sjhb		/* Copy sequence number. */
668130980Sjhb		nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq");
669151979Sjhb		switch (hio->hio_cmd) {
670121986Sjhb		case HIO_READ:
671130980Sjhb			if (hio->hio_error == 0) {
672151979Sjhb				data = hio->hio_data;
673121986Sjhb				length = hio->hio_length;
674121986Sjhb				break;
675121986Sjhb			}
676121986Sjhb			/*
677121986Sjhb			 * We send no data in case of an error.
678121986Sjhb			 */
679121986Sjhb			/* FALLTHROUGH */
680121986Sjhb		case HIO_DELETE:
681121986Sjhb		case HIO_FLUSH:
682121986Sjhb		case HIO_WRITE:
683121986Sjhb			data = NULL;
684121986Sjhb			length = 0;
685121986Sjhb			break;
686121986Sjhb		default:
687121986Sjhb			abort();
688121986Sjhb			break;
689121986Sjhb		}
690151979Sjhb		if (hio->hio_error != 0)
691130980Sjhb			nv_add_int16(nvout, hio->hio_error, "error");
692151979Sjhb		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
693121986Sjhb		    length) < 0) {
694130980Sjhb			pjdlog_exit(EX_TEMPFAIL, "Unable to send reply.");
695151979Sjhb		}
696121986Sjhb		nv_free(nvout);
697121986Sjhb		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
698121986Sjhb		    hio);
699121986Sjhb		nv_free(hio->hio_nv);
700121986Sjhb		hio->hio_error = 0;
701121986Sjhb		QUEUE_INSERT(free, hio);
702121986Sjhb	}
703121986Sjhb	/* NOTREACHED */
704121986Sjhb	return (NULL);
705121986Sjhb}
706121986Sjhb