secondary.c revision 211977
11590Srgrimes/*-
21590Srgrimes * Copyright (c) 2009-2010 The FreeBSD Foundation
31590Srgrimes * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
41590Srgrimes * All rights reserved.
51590Srgrimes *
61590Srgrimes * This software was developed by Pawel Jakub Dawidek under sponsorship from
71590Srgrimes * the FreeBSD Foundation.
81590Srgrimes *
91590Srgrimes * Redistribution and use in source and binary forms, with or without
101590Srgrimes * modification, are permitted provided that the following conditions
111590Srgrimes * are met:
121590Srgrimes * 1. Redistributions of source code must retain the above copyright
131590Srgrimes *    notice, this list of conditions and the following disclaimer.
141590Srgrimes * 2. Redistributions in binary form must reproduce the above copyright
151590Srgrimes *    notice, this list of conditions and the following disclaimer in the
161590Srgrimes *    documentation and/or other materials provided with the distribution.
171590Srgrimes *
181590Srgrimes * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
191590Srgrimes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
201590Srgrimes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
211590Srgrimes * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
221590Srgrimes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
231590Srgrimes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
241590Srgrimes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
251590Srgrimes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
261590Srgrimes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
271590Srgrimes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
281590Srgrimes * SUCH DAMAGE.
291590Srgrimes */
301590Srgrimes
311590Srgrimes#include <sys/cdefs.h>
321590Srgrimes__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 211977 2010-08-29 21:41:53Z pjd $");
331590Srgrimes
341590Srgrimes#include <sys/param.h>
351590Srgrimes#include <sys/time.h>
361590Srgrimes#include <sys/bio.h>
3794609Sdwmalone#include <sys/disk.h>
3894609Sdwmalone#include <sys/stat.h>
3994609Sdwmalone
4094609Sdwmalone#include <assert.h>
4194609Sdwmalone#include <err.h>
4294609Sdwmalone#include <errno.h>
4387712Smarkm#include <fcntl.h>
4487712Smarkm#include <libgeom.h>
4587712Smarkm#include <pthread.h>
461590Srgrimes#include <stdint.h>
471590Srgrimes#include <stdio.h>
481590Srgrimes#include <string.h>
491590Srgrimes#include <sysexits.h>
5087712Smarkm#include <unistd.h>
5187712Smarkm
521590Srgrimes#include <activemap.h>
531590Srgrimes#include <nv.h>
541590Srgrimes#include <pjdlog.h>
551590Srgrimes
5687712Smarkm#include "control.h"
5787712Smarkm#include "hast.h"
581590Srgrimes#include "hast_proto.h"
591590Srgrimes#include "hastd.h"
6092922Simp#include "hooks.h"
6192922Simp#include "metadata.h"
621590Srgrimes#include "proto.h"
631590Srgrimes#include "subr.h"
641590Srgrimes#include "synch.h"
651590Srgrimes
/*
 * One request received from the primary.  A fixed pool of these is
 * preallocated in init_environment(); each one is moved between the free,
 * disk and send lists through hio_next.
 */
struct hio {
	/* NOTE(review): not referenced in this file; replies copy "seq" from hio_nv. */
	uint64_t		 hio_seq;
	/* errno-style status reported back to the primary (0 on success). */
	int			 hio_error;
	/* Request header as received from the primary. */
	struct nv		*hio_nv;
	/* Payload buffer (MAXPHYS bytes) for READ results and WRITE data. */
	void			*hio_data;
	/* HIO_* command taken from the header's "cmd" field. */
	uint8_t			 hio_cmd;
	/* Request offset; hr_localoff is added before touching the disk. */
	uint64_t		 hio_offset;
	/* Request length in bytes. */
	uint64_t		 hio_length;
	/* Linkage for whichever request list currently holds this entry. */
	TAILQ_ENTRY(hio)	 hio_next;
};
761590Srgrimes
/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Disk thread (the one that does I/O requests) takes requests from this list.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Send thread takes requests from this list and sends the replies back to
 * the primary.  Requests are put here by the disk thread once they have been
 * executed, and by the recv thread when requnpack() rejects a request header.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/*
 * Maximum number of outstanding I/O requests (size of the preallocated
 * request pool).
 */
#define	HAST_HIO_MAX	256

static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
10687712Smarkm
/*
 * QUEUE_INSERT(name, hp): append request 'hp' to the tail of hio_<name>_list
 * while holding hio_<name>_list_lock.  hio_<name>_list_cond is signalled,
 * after the lock has been dropped, only when the list was empty before the
 * insertion (the only state in which QUEUE_TAKE() waits on it).
 */
#define	QUEUE_INSERT(name, hp)	do {					\
	bool _qi_was_empty;						\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_qi_was_empty = (TAILQ_FIRST(&hio_##name##_list) == NULL);	\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hp), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_qi_was_empty)						\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * QUEUE_TAKE(name, hp): remove the head of hio_<name>_list and store it in
 * 'hp', waiting on hio_<name>_list_cond for as long as the list is empty.
 */
#define	QUEUE_TAKE(name, hp)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hp) = TAILQ_FIRST(&hio_##name##_list);			\
		if ((hp) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hp), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
1261590Srgrimes
1271590Srgrimesstatic void
12874876Sdwmaloneinit_environment(void)
12974876Sdwmalone{
13074876Sdwmalone	struct hio *hio;
1311590Srgrimes	unsigned int ii;
13274876Sdwmalone
13374876Sdwmalone	/*
13474876Sdwmalone	 * Initialize lists, their locks and theirs condition variables.
13574876Sdwmalone	 */
13674876Sdwmalone	TAILQ_INIT(&hio_free_list);
13774876Sdwmalone	mtx_init(&hio_free_list_lock);
13874876Sdwmalone	cv_init(&hio_free_list_cond);
13974876Sdwmalone	TAILQ_INIT(&hio_disk_list);
14074876Sdwmalone	mtx_init(&hio_disk_list_lock);
14174876Sdwmalone	cv_init(&hio_disk_list_cond);
14274876Sdwmalone	TAILQ_INIT(&hio_send_list);
14374876Sdwmalone	mtx_init(&hio_send_list_lock);
14474876Sdwmalone	cv_init(&hio_send_list_cond);
14574876Sdwmalone
14674876Sdwmalone	/*
14774876Sdwmalone	 * Allocate requests pool and initialize requests.
14874876Sdwmalone	 */
14974876Sdwmalone	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
15074876Sdwmalone		hio = malloc(sizeof(*hio));
15174876Sdwmalone		if (hio == NULL) {
15274876Sdwmalone			pjdlog_exitx(EX_TEMPFAIL,
15374876Sdwmalone			    "Unable to allocate memory (%zu bytes) for hio request.",
15474876Sdwmalone			    sizeof(*hio));
1551590Srgrimes		}
15674876Sdwmalone		hio->hio_error = 0;
15774876Sdwmalone		hio->hio_data = malloc(MAXPHYS);
15874876Sdwmalone		if (hio->hio_data == NULL) {
15974876Sdwmalone			pjdlog_exitx(EX_TEMPFAIL,
16074876Sdwmalone			    "Unable to allocate memory (%zu bytes) for gctl_data.",
16174876Sdwmalone			    (size_t)MAXPHYS);
16274876Sdwmalone		}
1631590Srgrimes		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
16474876Sdwmalone	}
16574876Sdwmalone}
16674876Sdwmalone
/*
 * Read the local resource metadata; the process exits with EX_NOINPUT if
 * metadata_read() reports failure.
 */
static void
init_local(struct hast_resource *res)
{

	if (metadata_read(res, true) >= 0)
		return;
	exit(EX_NOINPUT);
}
17417833Sadam
17574876Sdwmalonestatic void
17674876Sdwmaloneinit_remote(struct hast_resource *res, struct nv *nvin)
17774876Sdwmalone{
17874876Sdwmalone	uint64_t resuid;
1791590Srgrimes	struct nv *nvout;
1801590Srgrimes	unsigned char *map;
1811590Srgrimes	size_t mapsize;
1821590Srgrimes
1831590Srgrimes	map = NULL;
1841590Srgrimes	mapsize = 0;
1851590Srgrimes	nvout = nv_alloc();
1861590Srgrimes	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
1871590Srgrimes	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
1881590Srgrimes	resuid = nv_get_uint64(nvin, "resuid");
1891590Srgrimes	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
1901590Srgrimes	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
1911590Srgrimes	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
1921590Srgrimes	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
1931590Srgrimes	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
1941590Srgrimes	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
1951590Srgrimes	map = malloc(mapsize);
1961590Srgrimes	if (map == NULL) {
1971590Srgrimes		pjdlog_exitx(EX_TEMPFAIL,
1981590Srgrimes		    "Unable to allocate memory (%zu bytes) for activemap.",
1991590Srgrimes		    mapsize);
2001590Srgrimes	}
2011590Srgrimes	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
20269552Sasmodai	/*
20369552Sasmodai	 * When we work as primary and secondary is missing we will increase
20469552Sasmodai	 * localcnt in our metadata. When secondary is connected and synced
2051590Srgrimes	 * we make localcnt be equal to remotecnt, which means nodes are more
2061590Srgrimes	 * or less in sync.
2071590Srgrimes	 * Split-brain condition is when both nodes are not able to communicate
2081590Srgrimes	 * and are both configured as primary nodes. In turn, they can both
2091590Srgrimes	 * make incompatible changes to the data and we have to detect that.
2101590Srgrimes	 * Under split-brain condition we will increase our localcnt on first
2111590Srgrimes	 * write and remote node will increase its localcnt on first write.
2121590Srgrimes	 * When we connect we can see that primary's localcnt is greater than
2131590Srgrimes	 * our remotecnt (primary was modified while we weren't watching) and
2141590Srgrimes	 * our localcnt is greater than primary's remotecnt (we were modified
2151590Srgrimes	 * while primary wasn't watching).
2161590Srgrimes	 * There are many possible combinations which are all gathered below.
21717825Speter	 * Don't pay too much attention to exact numbers, the more important
2181590Srgrimes	 * is to compare them. We compare secondary's local with primary's
2191590Srgrimes	 * remote and secondary's remote with primary's local.
2201590Srgrimes	 * Note that every case where primary's localcnt is smaller than
2211590Srgrimes	 * secondary's remotecnt and where secondary's localcnt is smaller than
2221590Srgrimes	 * primary's remotecnt should be impossible in practise. We will perform
2231590Srgrimes	 * full synchronization then. Those cases are marked with an asterisk.
2241590Srgrimes	 * Regular synchronization means that only extents marked as dirty are
22594609Sdwmalone	 * synchronized (regular synchronization).
22694609Sdwmalone	 *
22794609Sdwmalone	 * SECONDARY METADATA PRIMARY METADATA
22894609Sdwmalone	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
2291590Srgrimes	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
2301590Srgrimes	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
2311590Srgrimes	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
2321590Srgrimes	 *                                       regular sync from secondary.
2331590Srgrimes	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
2341590Srgrimes	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
23517339Sadam	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
23617339Sadam	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
23717339Sadam	 *                                       regular sync from primary.
23817339Sadam	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
23917339Sadam	 */
2401590Srgrimes	if (res->hr_resuid == 0) {
2411590Srgrimes		/*
2421590Srgrimes		 * Provider is used for the first time. Initialize everything.
2431590Srgrimes		 */
2441590Srgrimes		assert(res->hr_secondary_localcnt == 0);
2451590Srgrimes		res->hr_resuid = resuid;
2461590Srgrimes		if (metadata_write(res) < 0)
2471590Srgrimes			exit(EX_NOINPUT);
2481590Srgrimes		memset(map, 0xff, mapsize);
2491590Srgrimes		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
2501590Srgrimes	} else if (
2511590Srgrimes	    /* Is primary is out-of-date? */
2521590Srgrimes	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
2531590Srgrimes	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
2541590Srgrimes	    /* Node are more or less in sync? */
2551590Srgrimes	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
2561590Srgrimes	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
25737453Sbde	    /* Is secondary is out-of-date? */
2581590Srgrimes	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
2591590Srgrimes	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
2601590Srgrimes		/*
2611590Srgrimes		 * Nodes are more or less in sync or one of the nodes is
2621590Srgrimes		 * out-of-date.
2631590Srgrimes		 * It doesn't matter at this point which one, we just have to
2641590Srgrimes		 * send out local bitmap to the remote node.
2651590Srgrimes		 */
2661590Srgrimes		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
2671590Srgrimes		    (ssize_t)mapsize) {
2681590Srgrimes			pjdlog_exit(LOG_ERR, "Unable to read activemap");
2691590Srgrimes		}
2701590Srgrimes		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
2711590Srgrimes		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
2721590Srgrimes			/* Primary is out-of-date, sync from secondary. */
2731590Srgrimes			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
2741590Srgrimes		} else {
2751590Srgrimes			/*
2761590Srgrimes			 * Secondary is out-of-date or counts match.
2771590Srgrimes			 * Sync from primary.
2781590Srgrimes			 */
2791590Srgrimes			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
2801590Srgrimes		}
2811590Srgrimes	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
2821590Srgrimes	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
2831590Srgrimes		/*
2841590Srgrimes		 * Not good, we have split-brain condition.
2851590Srgrimes		 */
2861590Srgrimes		pjdlog_error("Split-brain detected, exiting.");
2871590Srgrimes		nv_add_string(nvout, "Split-brain condition!", "errmsg");
2881590Srgrimes		free(map);
2891590Srgrimes		map = NULL;
2901590Srgrimes		mapsize = 0;
2911590Srgrimes	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
2921590Srgrimes	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
2931590Srgrimes		/*
2941590Srgrimes		 * This should never happen in practise, but we will perform
2951590Srgrimes		 * full synchronization.
296		 */
297		assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
298		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
299		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
300		    METADATA_SIZE, res->hr_extentsize,
301		    res->hr_local_sectorsize);
302		memset(map, 0xff, mapsize);
303		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
304			/* In this one of five cases sync from secondary. */
305			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
306		} else {
307			/* For the rest four cases sync from primary. */
308			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
309		}
310		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
311		    (uintmax_t)res->hr_primary_localcnt,
312		    (uintmax_t)res->hr_primary_remotecnt,
313		    (uintmax_t)res->hr_secondary_localcnt,
314		    (uintmax_t)res->hr_secondary_remotecnt);
315	}
316	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) {
317		pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s",
318		    res->hr_remoteaddr);
319		nv_free(nvout);
320		exit(EX_TEMPFAIL);
321	}
322	nv_free(nvout);
323	if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
324	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
325		/* Exit on split-brain. */
326		exit(EX_CONFIG);
327	}
328}
329
330void
331hastd_secondary(struct hast_resource *res, struct nv *nvin)
332{
333	pthread_t td;
334	pid_t pid;
335	int error;
336
337	/*
338	 * Create communication channel between parent and child.
339	 */
340	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
341		KEEP_ERRNO((void)pidfile_remove(pfh));
342		pjdlog_exit(EX_OSERR,
343		    "Unable to create control sockets between parent and child");
344	}
345
346	pid = fork();
347	if (pid < 0) {
348		KEEP_ERRNO((void)pidfile_remove(pfh));
349		pjdlog_exit(EX_OSERR, "Unable to fork");
350	}
351
352	if (pid > 0) {
353		/* This is parent. */
354		proto_close(res->hr_remotein);
355		res->hr_remotein = NULL;
356		proto_close(res->hr_remoteout);
357		res->hr_remoteout = NULL;
358		res->hr_workerpid = pid;
359		return;
360	}
361
362	(void)pidfile_close(pfh);
363	hook_fini();
364
365	setproctitle("%s (secondary)", res->hr_name);
366
367	signal(SIGHUP, SIG_DFL);
368	signal(SIGCHLD, SIG_DFL);
369
370	/* Error in setting timeout is not critical, but why should it fail? */
371	if (proto_timeout(res->hr_remotein, 0) < 0)
372		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
373	if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0)
374		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
375
376	init_local(res);
377	init_remote(res, nvin);
378	init_environment();
379
380	error = pthread_create(&td, NULL, recv_thread, res);
381	assert(error == 0);
382	error = pthread_create(&td, NULL, disk_thread, res);
383	assert(error == 0);
384	error = pthread_create(&td, NULL, send_thread, res);
385	assert(error == 0);
386	(void)ctrl_thread(res);
387}
388
389static void
390reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...)
391{
392	char msg[1024];
393	va_list ap;
394	int len;
395
396	va_start(ap, fmt);
397	len = vsnprintf(msg, sizeof(msg), fmt, ap);
398	va_end(ap);
399	if ((size_t)len < sizeof(msg)) {
400		switch (hio->hio_cmd) {
401		case HIO_READ:
402			(void)snprintf(msg + len, sizeof(msg) - len,
403			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
404			    (uintmax_t)hio->hio_length);
405			break;
406		case HIO_DELETE:
407			(void)snprintf(msg + len, sizeof(msg) - len,
408			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
409			    (uintmax_t)hio->hio_length);
410			break;
411		case HIO_FLUSH:
412			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
413			break;
414		case HIO_WRITE:
415			(void)snprintf(msg + len, sizeof(msg) - len,
416			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
417			    (uintmax_t)hio->hio_length);
418			break;
419		case HIO_KEEPALIVE:
420			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
421			break;
422		default:
423			(void)snprintf(msg + len, sizeof(msg) - len,
424			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
425			break;
426		}
427	}
428	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
429}
430
431static int
432requnpack(struct hast_resource *res, struct hio *hio)
433{
434
435	hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd");
436	if (hio->hio_cmd == 0) {
437		pjdlog_error("Header contains no 'cmd' field.");
438		hio->hio_error = EINVAL;
439		goto end;
440	}
441	switch (hio->hio_cmd) {
442	case HIO_KEEPALIVE:
443		break;
444	case HIO_READ:
445	case HIO_WRITE:
446	case HIO_DELETE:
447		hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset");
448		if (nv_error(hio->hio_nv) != 0) {
449			pjdlog_error("Header is missing 'offset' field.");
450			hio->hio_error = EINVAL;
451			goto end;
452		}
453		hio->hio_length = nv_get_uint64(hio->hio_nv, "length");
454		if (nv_error(hio->hio_nv) != 0) {
455			pjdlog_error("Header is missing 'length' field.");
456			hio->hio_error = EINVAL;
457			goto end;
458		}
459		if (hio->hio_length == 0) {
460			pjdlog_error("Data length is zero.");
461			hio->hio_error = EINVAL;
462			goto end;
463		}
464		if (hio->hio_length > MAXPHYS) {
465			pjdlog_error("Data length is too large (%ju > %ju).",
466			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
467			hio->hio_error = EINVAL;
468			goto end;
469		}
470		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
471			pjdlog_error("Offset %ju is not multiple of sector size.",
472			    (uintmax_t)hio->hio_offset);
473			hio->hio_error = EINVAL;
474			goto end;
475		}
476		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
477			pjdlog_error("Length %ju is not multiple of sector size.",
478			    (uintmax_t)hio->hio_length);
479			hio->hio_error = EINVAL;
480			goto end;
481		}
482		if (hio->hio_offset + hio->hio_length >
483		    (uint64_t)res->hr_datasize) {
484			pjdlog_error("Data offset is too large (%ju > %ju).",
485			    (uintmax_t)(hio->hio_offset + hio->hio_length),
486			    (uintmax_t)res->hr_datasize);
487			hio->hio_error = EINVAL;
488			goto end;
489		}
490		break;
491	default:
492		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
493		    hio->hio_cmd);
494		hio->hio_error = EINVAL;
495		goto end;
496	}
497	hio->hio_error = 0;
498end:
499	return (hio->hio_error);
500}
501
502/*
503 * Thread receives requests from the primary node.
504 */
505static void *
506recv_thread(void *arg)
507{
508	struct hast_resource *res = arg;
509	struct hio *hio;
510
511	for (;;) {
512		pjdlog_debug(2, "recv: Taking free request.");
513		QUEUE_TAKE(free, hio);
514		pjdlog_debug(2, "recv: (%p) Got request.", hio);
515		if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) {
516			pjdlog_exit(EX_TEMPFAIL,
517			    "Unable to receive request header");
518		}
519		if (requnpack(res, hio) != 0) {
520			pjdlog_debug(2,
521			    "recv: (%p) Moving request to the send queue.",
522			    hio);
523			QUEUE_INSERT(send, hio);
524			continue;
525		}
526		reqlog(LOG_DEBUG, 2, -1, hio,
527		    "recv: (%p) Got request header: ", hio);
528		if (hio->hio_cmd == HIO_KEEPALIVE) {
529			pjdlog_debug(2,
530			    "recv: (%p) Moving request to the free queue.",
531			    hio);
532			nv_free(hio->hio_nv);
533			QUEUE_INSERT(free, hio);
534			continue;
535		} else if (hio->hio_cmd == HIO_WRITE) {
536			if (hast_proto_recv_data(res, res->hr_remotein,
537			    hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
538				pjdlog_exit(EX_TEMPFAIL,
539				    "Unable to receive reply data");
540			}
541		}
542		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
543		    hio);
544		QUEUE_INSERT(disk, hio);
545	}
546	/* NOTREACHED */
547	return (NULL);
548}
549
550/*
551 * Thread reads from or writes to local component and also handles DELETE and
552 * FLUSH requests.
553 */
554static void *
555disk_thread(void *arg)
556{
557	struct hast_resource *res = arg;
558	struct hio *hio;
559	ssize_t ret;
560	bool clear_activemap;
561
562	clear_activemap = true;
563
564	for (;;) {
565		pjdlog_debug(2, "disk: Taking request.");
566		QUEUE_TAKE(disk, hio);
567		while (clear_activemap) {
568			unsigned char *map;
569			size_t mapsize;
570
571			/*
572			 * When first request is received, it means that primary
573			 * already received our activemap, merged it and stored
574			 * locally. We can now safely clear our activemap.
575			 */
576			mapsize =
577			    activemap_calc_ondisk_size(res->hr_local_mediasize -
578			    METADATA_SIZE, res->hr_extentsize,
579			    res->hr_local_sectorsize);
580			map = calloc(1, mapsize);
581			if (map == NULL) {
582				pjdlog_warning("Unable to allocate memory to clear local activemap.");
583				break;
584			}
585			if (pwrite(res->hr_localfd, map, mapsize,
586			    METADATA_SIZE) != (ssize_t)mapsize) {
587				pjdlog_errno(LOG_WARNING,
588				    "Unable to store cleared activemap");
589				free(map);
590				break;
591			}
592			free(map);
593			clear_activemap = false;
594			pjdlog_debug(1, "Local activemap cleared.");
595		}
596		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
597		/* Handle the actual request. */
598		switch (hio->hio_cmd) {
599		case HIO_READ:
600			ret = pread(res->hr_localfd, hio->hio_data,
601			    hio->hio_length,
602			    hio->hio_offset + res->hr_localoff);
603			if (ret < 0)
604				hio->hio_error = errno;
605			else if (ret != (int64_t)hio->hio_length)
606				hio->hio_error = EIO;
607			else
608				hio->hio_error = 0;
609			break;
610		case HIO_WRITE:
611			ret = pwrite(res->hr_localfd, hio->hio_data,
612			    hio->hio_length,
613			    hio->hio_offset + res->hr_localoff);
614			if (ret < 0)
615				hio->hio_error = errno;
616			else if (ret != (int64_t)hio->hio_length)
617				hio->hio_error = EIO;
618			else
619				hio->hio_error = 0;
620			break;
621		case HIO_DELETE:
622			ret = g_delete(res->hr_localfd,
623			    hio->hio_offset + res->hr_localoff,
624			    hio->hio_length);
625			if (ret < 0)
626				hio->hio_error = errno;
627			else
628				hio->hio_error = 0;
629			break;
630		case HIO_FLUSH:
631			ret = g_flush(res->hr_localfd);
632			if (ret < 0)
633				hio->hio_error = errno;
634			else
635				hio->hio_error = 0;
636			break;
637		}
638		if (hio->hio_error != 0) {
639			reqlog(LOG_ERR, 0, hio->hio_error, hio,
640			    "Request failed: ");
641		}
642		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
643		    hio);
644		QUEUE_INSERT(send, hio);
645	}
646	/* NOTREACHED */
647	return (NULL);
648}
649
650/*
651 * Thread sends requests back to primary node.
652 */
653static void *
654send_thread(void *arg)
655{
656	struct hast_resource *res = arg;
657	struct nv *nvout;
658	struct hio *hio;
659	void *data;
660	size_t length;
661
662	for (;;) {
663		pjdlog_debug(2, "send: Taking request.");
664		QUEUE_TAKE(send, hio);
665		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
666		nvout = nv_alloc();
667		/* Copy sequence number. */
668		nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq");
669		switch (hio->hio_cmd) {
670		case HIO_READ:
671			if (hio->hio_error == 0) {
672				data = hio->hio_data;
673				length = hio->hio_length;
674				break;
675			}
676			/*
677			 * We send no data in case of an error.
678			 */
679			/* FALLTHROUGH */
680		case HIO_DELETE:
681		case HIO_FLUSH:
682		case HIO_WRITE:
683			data = NULL;
684			length = 0;
685			break;
686		default:
687			abort();
688			break;
689		}
690		if (hio->hio_error != 0)
691			nv_add_int16(nvout, hio->hio_error, "error");
692		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
693		    length) < 0) {
694			pjdlog_exit(EX_TEMPFAIL, "Unable to send reply.");
695		}
696		nv_free(nvout);
697		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
698		    hio);
699		nv_free(hio->hio_nv);
700		hio->hio_error = 0;
701		QUEUE_INSERT(free, hio);
702	}
703	/* NOTREACHED */
704	return (NULL);
705}
706