secondary.c revision 260007
1/*-
2 * Copyright (c) 2009-2010 The FreeBSD Foundation
3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
4 * All rights reserved.
5 *
6 * This software was developed by Pawel Jakub Dawidek under sponsorship from
7 * the FreeBSD Foundation.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 *    notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 *    notice, this list of conditions and the following disclaimer in the
16 *    documentation and/or other materials provided with the distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 * SUCH DAMAGE.
29 */
30
31#include <sys/cdefs.h>
32__FBSDID("$FreeBSD: stable/9/sbin/hastd/secondary.c 260007 2013-12-28 19:22:23Z trociny $");
33
34#include <sys/param.h>
35#include <sys/time.h>
36#include <sys/bio.h>
37#include <sys/disk.h>
38#include <sys/stat.h>
39
40#include <err.h>
41#include <errno.h>
42#include <fcntl.h>
43#include <libgeom.h>
44#include <pthread.h>
45#include <signal.h>
46#include <stdint.h>
47#include <stdio.h>
48#include <string.h>
49#include <sysexits.h>
50#include <unistd.h>
51
52#include <activemap.h>
53#include <nv.h>
54#include <pjdlog.h>
55
56#include "control.h"
57#include "event.h"
58#include "hast.h"
59#include "hast_proto.h"
60#include "hastd.h"
61#include "hooks.h"
62#include "metadata.h"
63#include "proto.h"
64#include "subr.h"
65#include "synch.h"
66
/*
 * One I/O request received from the primary node. The same structure is
 * recycled through the free, disk and send queues declared below.
 */
struct hio {
	/* Sequence number from the request header; echoed in the reply. */
	uint64_t	 hio_seq;
	/* Outcome of the request: 0 on success, an errno value otherwise. */
	int		 hio_error;
	/* Payload buffer (MAXPHYS bytes), allocated once per request. */
	void		*hio_data;
	/* Requested operation, one of the HIO_* commands. */
	uint8_t		 hio_cmd;
	/* Offset of the request as given in the request header. */
	uint64_t	 hio_offset;
	/* Length of the request in bytes. */
	uint64_t	 hio_length;
	/*
	 * Set on the request that produces the early "received" reply of
	 * a memsync write.
	 */
	bool		 hio_memsync;
	/* Linkage for whichever queue currently holds the request. */
	TAILQ_ENTRY(hio) hio_next;
};
77
/* Resource handled by this worker process; assigned in hastd_secondary(). */
static struct hast_resource *gres;

/*
 * Idle request structures. recv_thread takes them from here and blocks
 * while the list is empty, i.e. until an in-progress request is recycled.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Requests waiting for disk_thread, which performs the local I/O.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static size_t hio_disk_list_size;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Requests waiting for send_thread, which replies to the primary node.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static size_t hio_send_list_size;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/*
 * Number of request structures in the pool, which bounds the number of
 * outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256

/* Worker thread entry points. */
static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
111
/*
 * Append (hio) to the tail of hio_<name>_list and bump the matching size
 * counter, all while holding the list's lock. Waiters are woken only when
 * the list is about to go from empty to non-empty; the broadcast precedes
 * the insertion, but both happen before the lock is dropped.
 */
#define	QUEUE_INSERT(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	if (TAILQ_FIRST(&hio_##name##_list) == NULL)			\
		cv_broadcast(&hio_##name##_list_cond);			\
	hio_##name##_list_size++;					\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
/*
 * Remove the first element of hio_<name>_list and store it in (hio),
 * waiting on the list's condition variable for as long as the list is
 * empty. The size counter is decremented under the same lock.
 */
#define	QUEUE_TAKE(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
	hio_##name##_list_size--;					\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
131
132static void
133output_status_aux(struct nv *nvout)
134{
135
136	nv_add_uint64(nvout, (uint64_t)hio_free_list_size, "idle_queue_size");
137	nv_add_uint64(nvout, (uint64_t)hio_disk_list_size, "local_queue_size");
138	nv_add_uint64(nvout, (uint64_t)hio_send_list_size, "send_queue_size");
139}
140
141static void
142hio_clear(struct hio *hio)
143{
144
145	hio->hio_seq = 0;
146	hio->hio_error = 0;
147	hio->hio_cmd = HIO_UNDEF;
148	hio->hio_offset = 0;
149	hio->hio_length = 0;
150	hio->hio_memsync = false;
151}
152
153static void
154hio_copy(const struct hio *srchio, struct hio *dsthio)
155{
156
157	/*
158	 * We don't copy hio_error, hio_data and hio_next fields.
159	 */
160
161	dsthio->hio_seq = srchio->hio_seq;
162	dsthio->hio_cmd = srchio->hio_cmd;
163	dsthio->hio_offset = srchio->hio_offset;
164	dsthio->hio_length = srchio->hio_length;
165	dsthio->hio_memsync = srchio->hio_memsync;
166}
167
168static void
169init_environment(void)
170{
171	struct hio *hio;
172	unsigned int ii;
173
174	/*
175	 * Initialize lists, their locks and theirs condition variables.
176	 */
177	TAILQ_INIT(&hio_free_list);
178	mtx_init(&hio_free_list_lock);
179	cv_init(&hio_free_list_cond);
180	TAILQ_INIT(&hio_disk_list);
181	mtx_init(&hio_disk_list_lock);
182	cv_init(&hio_disk_list_cond);
183	TAILQ_INIT(&hio_send_list);
184	mtx_init(&hio_send_list_lock);
185	cv_init(&hio_send_list_cond);
186
187	/*
188	 * Allocate requests pool and initialize requests.
189	 */
190	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
191		hio = malloc(sizeof(*hio));
192		if (hio == NULL) {
193			pjdlog_exitx(EX_TEMPFAIL,
194			    "Unable to allocate memory (%zu bytes) for hio request.",
195			    sizeof(*hio));
196		}
197		hio->hio_data = malloc(MAXPHYS);
198		if (hio->hio_data == NULL) {
199			pjdlog_exitx(EX_TEMPFAIL,
200			    "Unable to allocate memory (%zu bytes) for gctl_data.",
201			    (size_t)MAXPHYS);
202		}
203		hio_clear(hio);
204		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
205		hio_free_list_size++;
206	}
207}
208
/*
 * Load the on-disk metadata of the local component via metadata_read().
 * The worker exits with EX_NOINPUT when that fails.
 */
static void
init_local(struct hast_resource *res)
{
	int rv;

	rv = metadata_read(res, true);
	if (rv == -1)
		exit(EX_NOINPUT);
}
216
217static void
218init_remote(struct hast_resource *res, struct nv *nvin)
219{
220	uint64_t resuid;
221	struct nv *nvout;
222	unsigned char *map;
223	size_t mapsize;
224
225#ifdef notyet
226	/* Setup direction. */
227	if (proto_send(res->hr_remoteout, NULL, 0) == -1)
228		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
229#endif
230
231	nvout = nv_alloc();
232	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
233	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
234	resuid = nv_get_uint64(nvin, "resuid");
235	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
236	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
237	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
238	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
239	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
240	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
241	map = malloc(mapsize);
242	if (map == NULL) {
243		pjdlog_exitx(EX_TEMPFAIL,
244		    "Unable to allocate memory (%zu bytes) for activemap.",
245		    mapsize);
246	}
247	/*
248	 * When we work as primary and secondary is missing we will increase
249	 * localcnt in our metadata. When secondary is connected and synced
250	 * we make localcnt be equal to remotecnt, which means nodes are more
251	 * or less in sync.
252	 * Split-brain condition is when both nodes are not able to communicate
253	 * and are both configured as primary nodes. In turn, they can both
254	 * make incompatible changes to the data and we have to detect that.
255	 * Under split-brain condition we will increase our localcnt on first
256	 * write and remote node will increase its localcnt on first write.
257	 * When we connect we can see that primary's localcnt is greater than
258	 * our remotecnt (primary was modified while we weren't watching) and
259	 * our localcnt is greater than primary's remotecnt (we were modified
260	 * while primary wasn't watching).
261	 * There are many possible combinations which are all gathered below.
262	 * Don't pay too much attention to exact numbers, the more important
263	 * is to compare them. We compare secondary's local with primary's
264	 * remote and secondary's remote with primary's local.
265	 * Note that every case where primary's localcnt is smaller than
266	 * secondary's remotecnt and where secondary's localcnt is smaller than
267	 * primary's remotecnt should be impossible in practise. We will perform
268	 * full synchronization then. Those cases are marked with an asterisk.
269	 * Regular synchronization means that only extents marked as dirty are
270	 * synchronized (regular synchronization).
271	 *
272	 * SECONDARY METADATA PRIMARY METADATA
273	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
274	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
275	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
276	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
277	 *                                       regular sync from secondary.
278	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
279	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
280	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
281	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
282	 *                                       regular sync from primary.
283	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
284	 */
285	if (res->hr_resuid == 0) {
286		/*
287		 * Provider is used for the first time. If primary node done no
288		 * writes yet as well (we will find "virgin" argument) then
289		 * there is no need to synchronize anything. If primary node
290		 * done any writes already we have to synchronize everything.
291		 */
292		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
293		res->hr_resuid = resuid;
294		if (metadata_write(res) == -1)
295			exit(EX_NOINPUT);
296		if (nv_exists(nvin, "virgin")) {
297			free(map);
298			map = NULL;
299			mapsize = 0;
300		} else {
301			memset(map, 0xff, mapsize);
302		}
303		nv_add_int8(nvout, 1, "virgin");
304		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
305	} else if (res->hr_resuid != resuid) {
306		char errmsg[256];
307
308		free(map);
309		(void)snprintf(errmsg, sizeof(errmsg),
310		    "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
311		    (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
312		pjdlog_error("%s", errmsg);
313		nv_add_string(nvout, errmsg, "errmsg");
314		if (hast_proto_send(res, res->hr_remotein, nvout,
315		    NULL, 0) == -1) {
316			pjdlog_exit(EX_TEMPFAIL,
317			    "Unable to send response to %s",
318			    res->hr_remoteaddr);
319		}
320		nv_free(nvout);
321		exit(EX_CONFIG);
322	} else if (
323	    /* Is primary out-of-date? */
324	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
325	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
326	    /* Are the nodes more or less in sync? */
327	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
328	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
329	    /* Is secondary out-of-date? */
330	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
331	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
332		/*
333		 * Nodes are more or less in sync or one of the nodes is
334		 * out-of-date.
335		 * It doesn't matter at this point which one, we just have to
336		 * send out local bitmap to the remote node.
337		 */
338		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
339		    (ssize_t)mapsize) {
340			pjdlog_exit(LOG_ERR, "Unable to read activemap");
341		}
342		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
343		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
344			/* Primary is out-of-date, sync from secondary. */
345			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
346		} else {
347			/*
348			 * Secondary is out-of-date or counts match.
349			 * Sync from primary.
350			 */
351			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
352		}
353	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
354	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
355		/*
356		 * Not good, we have split-brain condition.
357		 */
358		free(map);
359		pjdlog_error("Split-brain detected, exiting.");
360		nv_add_string(nvout, "Split-brain condition!", "errmsg");
361		if (hast_proto_send(res, res->hr_remotein, nvout,
362		    NULL, 0) == -1) {
363			pjdlog_exit(EX_TEMPFAIL,
364			    "Unable to send response to %s",
365			    res->hr_remoteaddr);
366		}
367		nv_free(nvout);
368		/* Exit on split-brain. */
369		event_send(res, EVENT_SPLITBRAIN);
370		exit(EX_CONFIG);
371	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
372	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
373		/*
374		 * This should never happen in practise, but we will perform
375		 * full synchronization.
376		 */
377		PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
378		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
379		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
380		    METADATA_SIZE, res->hr_extentsize,
381		    res->hr_local_sectorsize);
382		memset(map, 0xff, mapsize);
383		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
384			/* In this one of five cases sync from secondary. */
385			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
386		} else {
387			/* For the rest four cases sync from primary. */
388			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
389		}
390		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
391		    (uintmax_t)res->hr_primary_localcnt,
392		    (uintmax_t)res->hr_primary_remotecnt,
393		    (uintmax_t)res->hr_secondary_localcnt,
394		    (uintmax_t)res->hr_secondary_remotecnt);
395	}
396	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
397	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
398		pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
399		    res->hr_remoteaddr);
400	}
401	if (map != NULL)
402		free(map);
403	nv_free(nvout);
404#ifdef notyet
405	/* Setup direction. */
406	if (proto_recv(res->hr_remotein, NULL, 0) == -1)
407		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
408#endif
409}
410
411void
412hastd_secondary(struct hast_resource *res, struct nv *nvin)
413{
414	sigset_t mask;
415	pthread_t td;
416	pid_t pid;
417	int error, mode, debuglevel;
418
419	/*
420	 * Create communication channel between parent and child.
421	 */
422	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
423		KEEP_ERRNO((void)pidfile_remove(pfh));
424		pjdlog_exit(EX_OSERR,
425		    "Unable to create control sockets between parent and child");
426	}
427	/*
428	 * Create communication channel between child and parent.
429	 */
430	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
431		KEEP_ERRNO((void)pidfile_remove(pfh));
432		pjdlog_exit(EX_OSERR,
433		    "Unable to create event sockets between child and parent");
434	}
435
436	pid = fork();
437	if (pid == -1) {
438		KEEP_ERRNO((void)pidfile_remove(pfh));
439		pjdlog_exit(EX_OSERR, "Unable to fork");
440	}
441
442	if (pid > 0) {
443		/* This is parent. */
444		proto_close(res->hr_remotein);
445		res->hr_remotein = NULL;
446		proto_close(res->hr_remoteout);
447		res->hr_remoteout = NULL;
448		/* Declare that we are receiver. */
449		proto_recv(res->hr_event, NULL, 0);
450		/* Declare that we are sender. */
451		proto_send(res->hr_ctrl, NULL, 0);
452		res->hr_workerpid = pid;
453		return;
454	}
455
456	gres = res;
457	res->output_status_aux = output_status_aux;
458	mode = pjdlog_mode_get();
459	debuglevel = pjdlog_debug_get();
460
461	/* Declare that we are sender. */
462	proto_send(res->hr_event, NULL, 0);
463	/* Declare that we are receiver. */
464	proto_recv(res->hr_ctrl, NULL, 0);
465	descriptors_cleanup(res);
466
467	descriptors_assert(res, mode);
468
469	pjdlog_init(mode);
470	pjdlog_debug_set(debuglevel);
471	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
472	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
473
474	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
475	PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
476
477	/* Error in setting timeout is not critical, but why should it fail? */
478	if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
479		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
480	if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
481		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
482
483	init_local(res);
484	init_environment();
485
486	if (drop_privs(res) != 0)
487		exit(EX_CONFIG);
488	pjdlog_info("Privileges successfully dropped.");
489
490	/*
491	 * Create the control thread before sending any event to the parent,
492	 * as we can deadlock when parent sends control request to worker,
493	 * but worker has no control thread started yet, so parent waits.
494	 * In the meantime worker sends an event to the parent, but parent
495	 * is unable to handle the event, because it waits for control
496	 * request response.
497	 */
498	error = pthread_create(&td, NULL, ctrl_thread, res);
499	PJDLOG_ASSERT(error == 0);
500
501	init_remote(res, nvin);
502	event_send(res, EVENT_CONNECT);
503
504	error = pthread_create(&td, NULL, recv_thread, res);
505	PJDLOG_ASSERT(error == 0);
506	error = pthread_create(&td, NULL, disk_thread, res);
507	PJDLOG_ASSERT(error == 0);
508	(void)send_thread(res);
509}
510
511static void
512reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
513    const char *fmt, ...)
514{
515	char msg[1024];
516	va_list ap;
517	int len;
518
519	va_start(ap, fmt);
520	len = vsnprintf(msg, sizeof(msg), fmt, ap);
521	va_end(ap);
522	if ((size_t)len < sizeof(msg)) {
523		switch (hio->hio_cmd) {
524		case HIO_READ:
525			(void)snprintf(msg + len, sizeof(msg) - len,
526			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
527			    (uintmax_t)hio->hio_length);
528			break;
529		case HIO_DELETE:
530			(void)snprintf(msg + len, sizeof(msg) - len,
531			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
532			    (uintmax_t)hio->hio_length);
533			break;
534		case HIO_FLUSH:
535			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
536			break;
537		case HIO_WRITE:
538			(void)snprintf(msg + len, sizeof(msg) - len,
539			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
540			    (uintmax_t)hio->hio_length);
541			break;
542		case HIO_KEEPALIVE:
543			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
544			break;
545		default:
546			(void)snprintf(msg + len, sizeof(msg) - len,
547			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
548			break;
549		}
550	}
551	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
552}
553
554static int
555requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
556{
557
558	hio->hio_cmd = nv_get_uint8(nv, "cmd");
559	if (hio->hio_cmd == 0) {
560		pjdlog_error("Header contains no 'cmd' field.");
561		hio->hio_error = EINVAL;
562		goto end;
563	}
564	if (hio->hio_cmd != HIO_KEEPALIVE) {
565		hio->hio_seq = nv_get_uint64(nv, "seq");
566		if (hio->hio_seq == 0) {
567			pjdlog_error("Header contains no 'seq' field.");
568			hio->hio_error = EINVAL;
569			goto end;
570		}
571	}
572	switch (hio->hio_cmd) {
573	case HIO_FLUSH:
574	case HIO_KEEPALIVE:
575		break;
576	case HIO_WRITE:
577		hio->hio_memsync = nv_exists(nv, "memsync");
578		/* FALLTHROUGH */
579	case HIO_READ:
580	case HIO_DELETE:
581		hio->hio_offset = nv_get_uint64(nv, "offset");
582		if (nv_error(nv) != 0) {
583			pjdlog_error("Header is missing 'offset' field.");
584			hio->hio_error = EINVAL;
585			goto end;
586		}
587		hio->hio_length = nv_get_uint64(nv, "length");
588		if (nv_error(nv) != 0) {
589			pjdlog_error("Header is missing 'length' field.");
590			hio->hio_error = EINVAL;
591			goto end;
592		}
593		if (hio->hio_length == 0) {
594			pjdlog_error("Data length is zero.");
595			hio->hio_error = EINVAL;
596			goto end;
597		}
598		if (hio->hio_cmd != HIO_DELETE && hio->hio_length > MAXPHYS) {
599			pjdlog_error("Data length is too large (%ju > %ju).",
600			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
601			hio->hio_error = EINVAL;
602			goto end;
603		}
604		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
605			pjdlog_error("Offset %ju is not multiple of sector size.",
606			    (uintmax_t)hio->hio_offset);
607			hio->hio_error = EINVAL;
608			goto end;
609		}
610		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
611			pjdlog_error("Length %ju is not multiple of sector size.",
612			    (uintmax_t)hio->hio_length);
613			hio->hio_error = EINVAL;
614			goto end;
615		}
616		if (hio->hio_offset + hio->hio_length >
617		    (uint64_t)res->hr_datasize) {
618			pjdlog_error("Data offset is too large (%ju > %ju).",
619			    (uintmax_t)(hio->hio_offset + hio->hio_length),
620			    (uintmax_t)res->hr_datasize);
621			hio->hio_error = EINVAL;
622			goto end;
623		}
624		break;
625	default:
626		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
627		    hio->hio_cmd);
628		hio->hio_error = EINVAL;
629		goto end;
630	}
631	hio->hio_error = 0;
632end:
633	return (hio->hio_error);
634}
635
636static __dead2 void
637secondary_exit(int exitcode, const char *fmt, ...)
638{
639	va_list ap;
640
641	PJDLOG_ASSERT(exitcode != EX_OK);
642	va_start(ap, fmt);
643	pjdlogv_errno(LOG_ERR, fmt, ap);
644	va_end(ap);
645	event_send(gres, EVENT_DISCONNECT);
646	exit(exitcode);
647}
648
649/*
650 * Thread receives requests from the primary node.
651 */
652static void *
653recv_thread(void *arg)
654{
655	struct hast_resource *res = arg;
656	struct hio *hio, *mshio;
657	struct nv *nv;
658
659	for (;;) {
660		pjdlog_debug(2, "recv: Taking free request.");
661		QUEUE_TAKE(free, hio);
662		pjdlog_debug(2, "recv: (%p) Got request.", hio);
663		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
664			secondary_exit(EX_TEMPFAIL,
665			    "Unable to receive request header");
666		}
667		if (requnpack(res, hio, nv) != 0) {
668			nv_free(nv);
669			pjdlog_debug(2,
670			    "recv: (%p) Moving request to the send queue.",
671			    hio);
672			QUEUE_INSERT(send, hio);
673			continue;
674		}
675		switch (hio->hio_cmd) {
676		case HIO_READ:
677			res->hr_stat_read++;
678			break;
679		case HIO_WRITE:
680			res->hr_stat_write++;
681			break;
682		case HIO_DELETE:
683			res->hr_stat_delete++;
684			break;
685		case HIO_FLUSH:
686			res->hr_stat_flush++;
687			break;
688		case HIO_KEEPALIVE:
689			break;
690		default:
691			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
692			    hio->hio_cmd);
693		}
694		reqlog(LOG_DEBUG, 2, -1, hio,
695		    "recv: (%p) Got request header: ", hio);
696		if (hio->hio_cmd == HIO_KEEPALIVE) {
697			nv_free(nv);
698			pjdlog_debug(2,
699			    "recv: (%p) Moving request to the free queue.",
700			    hio);
701			hio_clear(hio);
702			QUEUE_INSERT(free, hio);
703			continue;
704		} else if (hio->hio_cmd == HIO_WRITE) {
705			if (hast_proto_recv_data(res, res->hr_remotein, nv,
706			    hio->hio_data, MAXPHYS) == -1) {
707				secondary_exit(EX_TEMPFAIL,
708				    "Unable to receive request data");
709			}
710			if (hio->hio_memsync) {
711				/*
712				 * For memsync requests we expect two replies.
713				 * Clone the hio so we can handle both of them.
714				 */
715				pjdlog_debug(2, "recv: Taking free request.");
716				QUEUE_TAKE(free, mshio);
717				pjdlog_debug(2, "recv: (%p) Got request.",
718				    mshio);
719				hio_copy(hio, mshio);
720				mshio->hio_error = 0;
721				/*
722				 * We want to keep 'memsync' tag only on the
723				 * request going onto send queue (mshio).
724				 */
725				hio->hio_memsync = false;
726				pjdlog_debug(2,
727				    "recv: (%p) Moving memsync request to the send queue.",
728				    mshio);
729				QUEUE_INSERT(send, mshio);
730			}
731		}
732		nv_free(nv);
733		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
734		    hio);
735		QUEUE_INSERT(disk, hio);
736	}
737	/* NOTREACHED */
738	return (NULL);
739}
740
741/*
742 * Thread reads from or writes to local component and also handles DELETE and
743 * FLUSH requests.
744 */
745static void *
746disk_thread(void *arg)
747{
748	struct hast_resource *res = arg;
749	struct hio *hio;
750	ssize_t ret;
751	bool clear_activemap, logerror;
752
753	clear_activemap = true;
754
755	for (;;) {
756		pjdlog_debug(2, "disk: Taking request.");
757		QUEUE_TAKE(disk, hio);
758		while (clear_activemap) {
759			unsigned char *map;
760			size_t mapsize;
761
762			/*
763			 * When first request is received, it means that primary
764			 * already received our activemap, merged it and stored
765			 * locally. We can now safely clear our activemap.
766			 */
767			mapsize =
768			    activemap_calc_ondisk_size(res->hr_local_mediasize -
769			    METADATA_SIZE, res->hr_extentsize,
770			    res->hr_local_sectorsize);
771			map = calloc(1, mapsize);
772			if (map == NULL) {
773				pjdlog_warning("Unable to allocate memory to clear local activemap.");
774				break;
775			}
776			if (pwrite(res->hr_localfd, map, mapsize,
777			    METADATA_SIZE) != (ssize_t)mapsize) {
778				pjdlog_errno(LOG_WARNING,
779				    "Unable to store cleared activemap");
780				free(map);
781				res->hr_stat_activemap_write_error++;
782				break;
783			}
784			free(map);
785			clear_activemap = false;
786			pjdlog_debug(1, "Local activemap cleared.");
787			break;
788		}
789		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
790		logerror = true;
791		/* Handle the actual request. */
792		switch (hio->hio_cmd) {
793		case HIO_READ:
794			ret = pread(res->hr_localfd, hio->hio_data,
795			    hio->hio_length,
796			    hio->hio_offset + res->hr_localoff);
797			if (ret == -1)
798				hio->hio_error = errno;
799			else if (ret != (int64_t)hio->hio_length)
800				hio->hio_error = EIO;
801			else
802				hio->hio_error = 0;
803			break;
804		case HIO_WRITE:
805			ret = pwrite(res->hr_localfd, hio->hio_data,
806			    hio->hio_length,
807			    hio->hio_offset + res->hr_localoff);
808			if (ret == -1)
809				hio->hio_error = errno;
810			else if (ret != (int64_t)hio->hio_length)
811				hio->hio_error = EIO;
812			else
813				hio->hio_error = 0;
814			break;
815		case HIO_DELETE:
816			ret = g_delete(res->hr_localfd,
817			    hio->hio_offset + res->hr_localoff,
818			    hio->hio_length);
819			if (ret == -1)
820				hio->hio_error = errno;
821			else
822				hio->hio_error = 0;
823			break;
824		case HIO_FLUSH:
825			if (!res->hr_localflush) {
826				ret = -1;
827				hio->hio_error = EOPNOTSUPP;
828				logerror = false;
829				break;
830			}
831			ret = g_flush(res->hr_localfd);
832			if (ret == -1) {
833				if (errno == EOPNOTSUPP)
834					res->hr_localflush = false;
835				hio->hio_error = errno;
836			} else {
837				hio->hio_error = 0;
838			}
839			break;
840		default:
841			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
842			    hio->hio_cmd);
843		}
844		if (logerror && hio->hio_error != 0) {
845			reqlog(LOG_ERR, 0, hio->hio_error, hio,
846			    "Request failed: ");
847		}
848		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
849		    hio);
850		QUEUE_INSERT(send, hio);
851	}
852	/* NOTREACHED */
853	return (NULL);
854}
855
856/*
857 * Thread sends requests back to primary node.
858 */
859static void *
860send_thread(void *arg)
861{
862	struct hast_resource *res = arg;
863	struct nv *nvout;
864	struct hio *hio;
865	void *data;
866	size_t length;
867
868	for (;;) {
869		pjdlog_debug(2, "send: Taking request.");
870		QUEUE_TAKE(send, hio);
871		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
872		nvout = nv_alloc();
873		/* Copy sequence number. */
874		nv_add_uint64(nvout, hio->hio_seq, "seq");
875		if (hio->hio_memsync) {
876			PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
877			nv_add_int8(nvout, 1, "received");
878		}
879		switch (hio->hio_cmd) {
880		case HIO_READ:
881			if (hio->hio_error == 0) {
882				data = hio->hio_data;
883				length = hio->hio_length;
884				break;
885			}
886			/*
887			 * We send no data in case of an error.
888			 */
889			/* FALLTHROUGH */
890		case HIO_DELETE:
891		case HIO_FLUSH:
892		case HIO_WRITE:
893			data = NULL;
894			length = 0;
895			break;
896		default:
897			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
898			    hio->hio_cmd);
899		}
900		if (hio->hio_error != 0) {
901			switch (hio->hio_cmd) {
902			case HIO_READ:
903				res->hr_stat_read_error++;
904				break;
905			case HIO_WRITE:
906				res->hr_stat_write_error++;
907				break;
908			case HIO_DELETE:
909				res->hr_stat_delete_error++;
910				break;
911			case HIO_FLUSH:
912				res->hr_stat_flush_error++;
913				break;
914			}
915			nv_add_int16(nvout, hio->hio_error, "error");
916		}
917		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
918		    length) == -1) {
919			secondary_exit(EX_TEMPFAIL, "Unable to send reply");
920		}
921		nv_free(nvout);
922		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
923		    hio);
924		hio_clear(hio);
925		QUEUE_INSERT(free, hio);
926	}
927	/* NOTREACHED */
928	return (NULL);
929}
930