/* secondary.c revision 249236 */
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/9/sbin/hastd/secondary.c 249236 2013-04-07 17:05:16Z trociny $");

#include <sys/param.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/stat.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <pjdlog.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "subr.h"
#include "synch.h"

/*
 * A single request received from the primary node.  A fixed pool of these
 * is preallocated by init_environment() and each one moves between the
 * free, disk and send queues.
 */
struct hio {
	uint64_t	 hio_seq;	/* Sequence number echoed in the reply. */
	int		 hio_error;	/* errno-style result; 0 on success. */
	void		*hio_data;	/* MAXPHYS-sized data buffer. */
	uint8_t		 hio_cmd;	/* HIO_* command from the header. */
	uint64_t	 hio_offset;	/* Offset within the resource data. */
	uint64_t	 hio_length;	/* Request length in bytes. */
	bool		 hio_memsync;	/* Reply carries a "received" flag. */
	TAILQ_ENTRY(hio) hio_next;	/* Linkage in its current queue. */
};

/* Resource served by this worker; set in hastd_secondary(). */
static struct hast_resource *gres;

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Disk thread (the one that does I/O requests) takes requests from this list.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Send thread takes requests from this list and sends the replies back to
 * the primary node.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256

static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
108static void *send_thread(void *arg);
109
110#define	QUEUE_INSERT(name, hio)	do {					\
111	bool _wakeup;							\
112									\
113	mtx_lock(&hio_##name##_list_lock);				\
114	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
115	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
116	mtx_unlock(&hio_##name##_list_lock);				\
117	if (_wakeup)							\
118		cv_signal(&hio_##name##_list_cond);			\
119} while (0)
120#define	QUEUE_TAKE(name, hio)	do {					\
121	mtx_lock(&hio_##name##_list_lock);				\
122	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
123		cv_wait(&hio_##name##_list_cond,			\
124		    &hio_##name##_list_lock);				\
125	}								\
126	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
127	mtx_unlock(&hio_##name##_list_lock);				\
128} while (0)
129
130static void
131hio_clear(struct hio *hio)
132{
133
134	hio->hio_seq = 0;
135	hio->hio_error = 0;
136	hio->hio_cmd = HIO_UNDEF;
137	hio->hio_offset = 0;
138	hio->hio_length = 0;
139	hio->hio_memsync = false;
140}
141
142static void
143hio_copy(const struct hio *srchio, struct hio *dsthio)
144{
145
146	/*
147	 * We don't copy hio_error, hio_data and hio_next fields.
148	 */
149
150	dsthio->hio_seq = srchio->hio_seq;
151	dsthio->hio_cmd = srchio->hio_cmd;
152	dsthio->hio_offset = srchio->hio_offset;
153	dsthio->hio_length = srchio->hio_length;
154	dsthio->hio_memsync = srchio->hio_memsync;
155}
156
157static void
158init_environment(void)
159{
160	struct hio *hio;
161	unsigned int ii;
162
163	/*
164	 * Initialize lists, their locks and theirs condition variables.
165	 */
166	TAILQ_INIT(&hio_free_list);
167	mtx_init(&hio_free_list_lock);
168	cv_init(&hio_free_list_cond);
169	TAILQ_INIT(&hio_disk_list);
170	mtx_init(&hio_disk_list_lock);
171	cv_init(&hio_disk_list_cond);
172	TAILQ_INIT(&hio_send_list);
173	mtx_init(&hio_send_list_lock);
174	cv_init(&hio_send_list_cond);
175
176	/*
177	 * Allocate requests pool and initialize requests.
178	 */
179	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
180		hio = malloc(sizeof(*hio));
181		if (hio == NULL) {
182			pjdlog_exitx(EX_TEMPFAIL,
183			    "Unable to allocate memory (%zu bytes) for hio request.",
184			    sizeof(*hio));
185		}
186		hio->hio_data = malloc(MAXPHYS);
187		if (hio->hio_data == NULL) {
188			pjdlog_exitx(EX_TEMPFAIL,
189			    "Unable to allocate memory (%zu bytes) for gctl_data.",
190			    (size_t)MAXPHYS);
191		}
192		hio_clear(hio);
193		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
194	}
195}
196
197static void
198init_local(struct hast_resource *res)
199{
200
201	if (metadata_read(res, true) == -1)
202		exit(EX_NOINPUT);
203}
204
205static void
206init_remote(struct hast_resource *res, struct nv *nvin)
207{
208	uint64_t resuid;
209	struct nv *nvout;
210	unsigned char *map;
211	size_t mapsize;
212
213#ifdef notyet
214	/* Setup direction. */
215	if (proto_send(res->hr_remoteout, NULL, 0) == -1)
216		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
217#endif
218
219	nvout = nv_alloc();
220	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
221	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
222	resuid = nv_get_uint64(nvin, "resuid");
223	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
224	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
225	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
226	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
227	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
228	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
229	map = malloc(mapsize);
230	if (map == NULL) {
231		pjdlog_exitx(EX_TEMPFAIL,
232		    "Unable to allocate memory (%zu bytes) for activemap.",
233		    mapsize);
234	}
235	/*
236	 * When we work as primary and secondary is missing we will increase
237	 * localcnt in our metadata. When secondary is connected and synced
238	 * we make localcnt be equal to remotecnt, which means nodes are more
239	 * or less in sync.
240	 * Split-brain condition is when both nodes are not able to communicate
241	 * and are both configured as primary nodes. In turn, they can both
242	 * make incompatible changes to the data and we have to detect that.
243	 * Under split-brain condition we will increase our localcnt on first
244	 * write and remote node will increase its localcnt on first write.
245	 * When we connect we can see that primary's localcnt is greater than
246	 * our remotecnt (primary was modified while we weren't watching) and
247	 * our localcnt is greater than primary's remotecnt (we were modified
248	 * while primary wasn't watching).
249	 * There are many possible combinations which are all gathered below.
250	 * Don't pay too much attention to exact numbers, the more important
251	 * is to compare them. We compare secondary's local with primary's
252	 * remote and secondary's remote with primary's local.
253	 * Note that every case where primary's localcnt is smaller than
254	 * secondary's remotecnt and where secondary's localcnt is smaller than
255	 * primary's remotecnt should be impossible in practise. We will perform
256	 * full synchronization then. Those cases are marked with an asterisk.
257	 * Regular synchronization means that only extents marked as dirty are
258	 * synchronized (regular synchronization).
259	 *
260	 * SECONDARY METADATA PRIMARY METADATA
261	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
262	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
263	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
264	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
265	 *                                       regular sync from secondary.
266	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
267	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
268	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
269	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
270	 *                                       regular sync from primary.
271	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
272	 */
273	if (res->hr_resuid == 0) {
274		/*
275		 * Provider is used for the first time. If primary node done no
276		 * writes yet as well (we will find "virgin" argument) then
277		 * there is no need to synchronize anything. If primary node
278		 * done any writes already we have to synchronize everything.
279		 */
280		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
281		res->hr_resuid = resuid;
282		if (metadata_write(res) == -1)
283			exit(EX_NOINPUT);
284		if (nv_exists(nvin, "virgin")) {
285			free(map);
286			map = NULL;
287			mapsize = 0;
288		} else {
289			memset(map, 0xff, mapsize);
290		}
291		nv_add_int8(nvout, 1, "virgin");
292		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
293	} else if (res->hr_resuid != resuid) {
294		char errmsg[256];
295
296		free(map);
297		(void)snprintf(errmsg, sizeof(errmsg),
298		    "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
299		    (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
300		pjdlog_error("%s", errmsg);
301		nv_add_string(nvout, errmsg, "errmsg");
302		if (hast_proto_send(res, res->hr_remotein, nvout,
303		    NULL, 0) == -1) {
304			pjdlog_exit(EX_TEMPFAIL,
305			    "Unable to send response to %s",
306			    res->hr_remoteaddr);
307		}
308		nv_free(nvout);
309		exit(EX_CONFIG);
310	} else if (
311	    /* Is primary out-of-date? */
312	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
313	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
314	    /* Are the nodes more or less in sync? */
315	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
316	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
317	    /* Is secondary out-of-date? */
318	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
319	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
320		/*
321		 * Nodes are more or less in sync or one of the nodes is
322		 * out-of-date.
323		 * It doesn't matter at this point which one, we just have to
324		 * send out local bitmap to the remote node.
325		 */
326		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
327		    (ssize_t)mapsize) {
328			pjdlog_exit(LOG_ERR, "Unable to read activemap");
329		}
330		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
331		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
332			/* Primary is out-of-date, sync from secondary. */
333			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
334		} else {
335			/*
336			 * Secondary is out-of-date or counts match.
337			 * Sync from primary.
338			 */
339			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
340		}
341	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
342	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
343		/*
344		 * Not good, we have split-brain condition.
345		 */
346		free(map);
347		pjdlog_error("Split-brain detected, exiting.");
348		nv_add_string(nvout, "Split-brain condition!", "errmsg");
349		if (hast_proto_send(res, res->hr_remotein, nvout,
350		    NULL, 0) == -1) {
351			pjdlog_exit(EX_TEMPFAIL,
352			    "Unable to send response to %s",
353			    res->hr_remoteaddr);
354		}
355		nv_free(nvout);
356		/* Exit on split-brain. */
357		event_send(res, EVENT_SPLITBRAIN);
358		exit(EX_CONFIG);
359	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
360	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
361		/*
362		 * This should never happen in practise, but we will perform
363		 * full synchronization.
364		 */
365		PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
366		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
367		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
368		    METADATA_SIZE, res->hr_extentsize,
369		    res->hr_local_sectorsize);
370		memset(map, 0xff, mapsize);
371		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
372			/* In this one of five cases sync from secondary. */
373			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
374		} else {
375			/* For the rest four cases sync from primary. */
376			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
377		}
378		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
379		    (uintmax_t)res->hr_primary_localcnt,
380		    (uintmax_t)res->hr_primary_remotecnt,
381		    (uintmax_t)res->hr_secondary_localcnt,
382		    (uintmax_t)res->hr_secondary_remotecnt);
383	}
384	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
385	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
386		pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
387		    res->hr_remoteaddr);
388	}
389	if (map != NULL)
390		free(map);
391	nv_free(nvout);
392#ifdef notyet
393	/* Setup direction. */
394	if (proto_recv(res->hr_remotein, NULL, 0) == -1)
395		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
396#endif
397}
398
399void
400hastd_secondary(struct hast_resource *res, struct nv *nvin)
401{
402	sigset_t mask;
403	pthread_t td;
404	pid_t pid;
405	int error, mode, debuglevel;
406
407	/*
408	 * Create communication channel between parent and child.
409	 */
410	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
411		KEEP_ERRNO((void)pidfile_remove(pfh));
412		pjdlog_exit(EX_OSERR,
413		    "Unable to create control sockets between parent and child");
414	}
415	/*
416	 * Create communication channel between child and parent.
417	 */
418	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
419		KEEP_ERRNO((void)pidfile_remove(pfh));
420		pjdlog_exit(EX_OSERR,
421		    "Unable to create event sockets between child and parent");
422	}
423
424	pid = fork();
425	if (pid == -1) {
426		KEEP_ERRNO((void)pidfile_remove(pfh));
427		pjdlog_exit(EX_OSERR, "Unable to fork");
428	}
429
430	if (pid > 0) {
431		/* This is parent. */
432		proto_close(res->hr_remotein);
433		res->hr_remotein = NULL;
434		proto_close(res->hr_remoteout);
435		res->hr_remoteout = NULL;
436		/* Declare that we are receiver. */
437		proto_recv(res->hr_event, NULL, 0);
438		/* Declare that we are sender. */
439		proto_send(res->hr_ctrl, NULL, 0);
440		res->hr_workerpid = pid;
441		return;
442	}
443
444	gres = res;
445	mode = pjdlog_mode_get();
446	debuglevel = pjdlog_debug_get();
447
448	/* Declare that we are sender. */
449	proto_send(res->hr_event, NULL, 0);
450	/* Declare that we are receiver. */
451	proto_recv(res->hr_ctrl, NULL, 0);
452	descriptors_cleanup(res);
453
454	descriptors_assert(res, mode);
455
456	pjdlog_init(mode);
457	pjdlog_debug_set(debuglevel);
458	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
459	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
460
461	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
462	PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
463
464	/* Error in setting timeout is not critical, but why should it fail? */
465	if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
466		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
467	if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
468		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
469
470	init_local(res);
471	init_environment();
472
473	if (drop_privs(res) != 0)
474		exit(EX_CONFIG);
475	pjdlog_info("Privileges successfully dropped.");
476
477	/*
478	 * Create the control thread before sending any event to the parent,
479	 * as we can deadlock when parent sends control request to worker,
480	 * but worker has no control thread started yet, so parent waits.
481	 * In the meantime worker sends an event to the parent, but parent
482	 * is unable to handle the event, because it waits for control
483	 * request response.
484	 */
485	error = pthread_create(&td, NULL, ctrl_thread, res);
486	PJDLOG_ASSERT(error == 0);
487
488	init_remote(res, nvin);
489	event_send(res, EVENT_CONNECT);
490
491	error = pthread_create(&td, NULL, recv_thread, res);
492	PJDLOG_ASSERT(error == 0);
493	error = pthread_create(&td, NULL, disk_thread, res);
494	PJDLOG_ASSERT(error == 0);
495	(void)send_thread(res);
496}
497
498static void
499reqlog(int loglevel, int debuglevel, int error, struct hio *hio,
500    const char *fmt, ...)
501{
502	char msg[1024];
503	va_list ap;
504	int len;
505
506	va_start(ap, fmt);
507	len = vsnprintf(msg, sizeof(msg), fmt, ap);
508	va_end(ap);
509	if ((size_t)len < sizeof(msg)) {
510		switch (hio->hio_cmd) {
511		case HIO_READ:
512			(void)snprintf(msg + len, sizeof(msg) - len,
513			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
514			    (uintmax_t)hio->hio_length);
515			break;
516		case HIO_DELETE:
517			(void)snprintf(msg + len, sizeof(msg) - len,
518			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
519			    (uintmax_t)hio->hio_length);
520			break;
521		case HIO_FLUSH:
522			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
523			break;
524		case HIO_WRITE:
525			(void)snprintf(msg + len, sizeof(msg) - len,
526			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
527			    (uintmax_t)hio->hio_length);
528			break;
529		case HIO_KEEPALIVE:
530			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
531			break;
532		default:
533			(void)snprintf(msg + len, sizeof(msg) - len,
534			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
535			break;
536		}
537	}
538	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
539}
540
541static int
542requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
543{
544
545	hio->hio_cmd = nv_get_uint8(nv, "cmd");
546	if (hio->hio_cmd == 0) {
547		pjdlog_error("Header contains no 'cmd' field.");
548		hio->hio_error = EINVAL;
549		goto end;
550	}
551	if (hio->hio_cmd != HIO_KEEPALIVE) {
552		hio->hio_seq = nv_get_uint64(nv, "seq");
553		if (hio->hio_seq == 0) {
554			pjdlog_error("Header contains no 'seq' field.");
555			hio->hio_error = EINVAL;
556			goto end;
557		}
558	}
559	switch (hio->hio_cmd) {
560	case HIO_FLUSH:
561	case HIO_KEEPALIVE:
562		break;
563	case HIO_WRITE:
564		hio->hio_memsync = nv_exists(nv, "memsync");
565		/* FALLTHROUGH */
566	case HIO_READ:
567	case HIO_DELETE:
568		hio->hio_offset = nv_get_uint64(nv, "offset");
569		if (nv_error(nv) != 0) {
570			pjdlog_error("Header is missing 'offset' field.");
571			hio->hio_error = EINVAL;
572			goto end;
573		}
574		hio->hio_length = nv_get_uint64(nv, "length");
575		if (nv_error(nv) != 0) {
576			pjdlog_error("Header is missing 'length' field.");
577			hio->hio_error = EINVAL;
578			goto end;
579		}
580		if (hio->hio_length == 0) {
581			pjdlog_error("Data length is zero.");
582			hio->hio_error = EINVAL;
583			goto end;
584		}
585		if (hio->hio_length > MAXPHYS) {
586			pjdlog_error("Data length is too large (%ju > %ju).",
587			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
588			hio->hio_error = EINVAL;
589			goto end;
590		}
591		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
592			pjdlog_error("Offset %ju is not multiple of sector size.",
593			    (uintmax_t)hio->hio_offset);
594			hio->hio_error = EINVAL;
595			goto end;
596		}
597		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
598			pjdlog_error("Length %ju is not multiple of sector size.",
599			    (uintmax_t)hio->hio_length);
600			hio->hio_error = EINVAL;
601			goto end;
602		}
603		if (hio->hio_offset + hio->hio_length >
604		    (uint64_t)res->hr_datasize) {
605			pjdlog_error("Data offset is too large (%ju > %ju).",
606			    (uintmax_t)(hio->hio_offset + hio->hio_length),
607			    (uintmax_t)res->hr_datasize);
608			hio->hio_error = EINVAL;
609			goto end;
610		}
611		break;
612	default:
613		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
614		    hio->hio_cmd);
615		hio->hio_error = EINVAL;
616		goto end;
617	}
618	hio->hio_error = 0;
619end:
620	return (hio->hio_error);
621}
622
623static __dead2 void
624secondary_exit(int exitcode, const char *fmt, ...)
625{
626	va_list ap;
627
628	PJDLOG_ASSERT(exitcode != EX_OK);
629	va_start(ap, fmt);
630	pjdlogv_errno(LOG_ERR, fmt, ap);
631	va_end(ap);
632	event_send(gres, EVENT_DISCONNECT);
633	exit(exitcode);
634}
635
636/*
637 * Thread receives requests from the primary node.
638 */
639static void *
640recv_thread(void *arg)
641{
642	struct hast_resource *res = arg;
643	struct hio *hio, *mshio;
644	struct nv *nv;
645
646	for (;;) {
647		pjdlog_debug(2, "recv: Taking free request.");
648		QUEUE_TAKE(free, hio);
649		pjdlog_debug(2, "recv: (%p) Got request.", hio);
650		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
651			secondary_exit(EX_TEMPFAIL,
652			    "Unable to receive request header");
653		}
654		if (requnpack(res, hio, nv) != 0) {
655			nv_free(nv);
656			pjdlog_debug(2,
657			    "recv: (%p) Moving request to the send queue.",
658			    hio);
659			QUEUE_INSERT(send, hio);
660			continue;
661		}
662		switch (hio->hio_cmd) {
663		case HIO_READ:
664			res->hr_stat_read++;
665			break;
666		case HIO_WRITE:
667			res->hr_stat_write++;
668			break;
669		case HIO_DELETE:
670			res->hr_stat_delete++;
671			break;
672		case HIO_FLUSH:
673			res->hr_stat_flush++;
674			break;
675		case HIO_KEEPALIVE:
676			break;
677		default:
678			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
679			    hio->hio_cmd);
680		}
681		reqlog(LOG_DEBUG, 2, -1, hio,
682		    "recv: (%p) Got request header: ", hio);
683		if (hio->hio_cmd == HIO_KEEPALIVE) {
684			nv_free(nv);
685			pjdlog_debug(2,
686			    "recv: (%p) Moving request to the free queue.",
687			    hio);
688			hio_clear(hio);
689			QUEUE_INSERT(free, hio);
690			continue;
691		} else if (hio->hio_cmd == HIO_WRITE) {
692			if (hast_proto_recv_data(res, res->hr_remotein, nv,
693			    hio->hio_data, MAXPHYS) == -1) {
694				secondary_exit(EX_TEMPFAIL,
695				    "Unable to receive request data");
696			}
697			if (hio->hio_memsync) {
698				/*
699				 * For memsync requests we expect two replies.
700				 * Clone the hio so we can handle both of them.
701				 */
702				pjdlog_debug(2, "recv: Taking free request.");
703				QUEUE_TAKE(free, mshio);
704				pjdlog_debug(2, "recv: (%p) Got request.",
705				    mshio);
706				hio_copy(hio, mshio);
707				mshio->hio_error = 0;
708				/*
709				 * We want to keep 'memsync' tag only on the
710				 * request going onto send queue (mshio).
711				 */
712				hio->hio_memsync = false;
713				pjdlog_debug(2,
714				    "recv: (%p) Moving memsync request to the send queue.",
715				    mshio);
716				QUEUE_INSERT(send, mshio);
717			}
718		}
719		nv_free(nv);
720		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
721		    hio);
722		QUEUE_INSERT(disk, hio);
723	}
724	/* NOTREACHED */
725	return (NULL);
726}
727
728/*
729 * Thread reads from or writes to local component and also handles DELETE and
730 * FLUSH requests.
731 */
732static void *
733disk_thread(void *arg)
734{
735	struct hast_resource *res = arg;
736	struct hio *hio;
737	ssize_t ret;
738	bool clear_activemap, logerror;
739
740	clear_activemap = true;
741
742	for (;;) {
743		pjdlog_debug(2, "disk: Taking request.");
744		QUEUE_TAKE(disk, hio);
745		while (clear_activemap) {
746			unsigned char *map;
747			size_t mapsize;
748
749			/*
750			 * When first request is received, it means that primary
751			 * already received our activemap, merged it and stored
752			 * locally. We can now safely clear our activemap.
753			 */
754			mapsize =
755			    activemap_calc_ondisk_size(res->hr_local_mediasize -
756			    METADATA_SIZE, res->hr_extentsize,
757			    res->hr_local_sectorsize);
758			map = calloc(1, mapsize);
759			if (map == NULL) {
760				pjdlog_warning("Unable to allocate memory to clear local activemap.");
761				break;
762			}
763			if (pwrite(res->hr_localfd, map, mapsize,
764			    METADATA_SIZE) != (ssize_t)mapsize) {
765				pjdlog_errno(LOG_WARNING,
766				    "Unable to store cleared activemap");
767				free(map);
768				res->hr_stat_activemap_write_error++;
769				break;
770			}
771			free(map);
772			clear_activemap = false;
773			pjdlog_debug(1, "Local activemap cleared.");
774			break;
775		}
776		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
777		logerror = true;
778		/* Handle the actual request. */
779		switch (hio->hio_cmd) {
780		case HIO_READ:
781			ret = pread(res->hr_localfd, hio->hio_data,
782			    hio->hio_length,
783			    hio->hio_offset + res->hr_localoff);
784			if (ret == -1)
785				hio->hio_error = errno;
786			else if (ret != (int64_t)hio->hio_length)
787				hio->hio_error = EIO;
788			else
789				hio->hio_error = 0;
790			break;
791		case HIO_WRITE:
792			ret = pwrite(res->hr_localfd, hio->hio_data,
793			    hio->hio_length,
794			    hio->hio_offset + res->hr_localoff);
795			if (ret == -1)
796				hio->hio_error = errno;
797			else if (ret != (int64_t)hio->hio_length)
798				hio->hio_error = EIO;
799			else
800				hio->hio_error = 0;
801			break;
802		case HIO_DELETE:
803			ret = g_delete(res->hr_localfd,
804			    hio->hio_offset + res->hr_localoff,
805			    hio->hio_length);
806			if (ret == -1)
807				hio->hio_error = errno;
808			else
809				hio->hio_error = 0;
810			break;
811		case HIO_FLUSH:
812			if (!res->hr_localflush) {
813				ret = -1;
814				hio->hio_error = EOPNOTSUPP;
815				logerror = false;
816				break;
817			}
818			ret = g_flush(res->hr_localfd);
819			if (ret == -1) {
820				if (errno == EOPNOTSUPP)
821					res->hr_localflush = false;
822				hio->hio_error = errno;
823			} else {
824				hio->hio_error = 0;
825			}
826			break;
827		default:
828			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
829			    hio->hio_cmd);
830		}
831		if (logerror && hio->hio_error != 0) {
832			reqlog(LOG_ERR, 0, hio->hio_error, hio,
833			    "Request failed: ");
834		}
835		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
836		    hio);
837		QUEUE_INSERT(send, hio);
838	}
839	/* NOTREACHED */
840	return (NULL);
841}
842
843/*
844 * Thread sends requests back to primary node.
845 */
846static void *
847send_thread(void *arg)
848{
849	struct hast_resource *res = arg;
850	struct nv *nvout;
851	struct hio *hio;
852	void *data;
853	size_t length;
854
855	for (;;) {
856		pjdlog_debug(2, "send: Taking request.");
857		QUEUE_TAKE(send, hio);
858		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
859		nvout = nv_alloc();
860		/* Copy sequence number. */
861		nv_add_uint64(nvout, hio->hio_seq, "seq");
862		if (hio->hio_memsync) {
863			PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
864			nv_add_int8(nvout, 1, "received");
865		}
866		switch (hio->hio_cmd) {
867		case HIO_READ:
868			if (hio->hio_error == 0) {
869				data = hio->hio_data;
870				length = hio->hio_length;
871				break;
872			}
873			/*
874			 * We send no data in case of an error.
875			 */
876			/* FALLTHROUGH */
877		case HIO_DELETE:
878		case HIO_FLUSH:
879		case HIO_WRITE:
880			data = NULL;
881			length = 0;
882			break;
883		default:
884			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
885			    hio->hio_cmd);
886		}
887		if (hio->hio_error != 0) {
888			switch (hio->hio_cmd) {
889			case HIO_READ:
890				res->hr_stat_read_error++;
891				break;
892			case HIO_WRITE:
893				res->hr_stat_write_error++;
894				break;
895			case HIO_DELETE:
896				res->hr_stat_delete_error++;
897				break;
898			case HIO_FLUSH:
899				res->hr_stat_flush_error++;
900				break;
901			}
902			nv_add_int16(nvout, hio->hio_error, "error");
903		}
904		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
905		    length) == -1) {
906			secondary_exit(EX_TEMPFAIL, "Unable to send reply");
907		}
908		nv_free(nvout);
909		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
910		    hio);
911		hio_clear(hio);
912		QUEUE_INSERT(free, hio);
913	}
914	/* NOTREACHED */
915	return (NULL);
916}
917