secondary.c revision 220271
164921Smarcel/*-
264921Smarcel * Copyright (c) 2009-2010 The FreeBSD Foundation
364921Smarcel * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
464921Smarcel * All rights reserved.
564921Smarcel *
664921Smarcel * This software was developed by Pawel Jakub Dawidek under sponsorship from
764921Smarcel * the FreeBSD Foundation.
864921Smarcel *
9111798Sdes * Redistribution and use in source and binary forms, with or without
1064921Smarcel * modification, are permitted provided that the following conditions
1164921Smarcel * are met:
1264921Smarcel * 1. Redistributions of source code must retain the above copyright
1364921Smarcel *    notice, this list of conditions and the following disclaimer.
1464921Smarcel * 2. Redistributions in binary form must reproduce the above copyright
1565067Smarcel *    notice, this list of conditions and the following disclaimer in the
1664921Smarcel *    documentation and/or other materials provided with the distribution.
1764921Smarcel *
1864921Smarcel * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
1964921Smarcel * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
2064921Smarcel * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
2164921Smarcel * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
2264921Smarcel * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
2364921Smarcel * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2464921Smarcel * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2564921Smarcel * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2664921Smarcel * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2764921Smarcel * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2864921Smarcel * SUCH DAMAGE.
29115705Sobrien */
30115705Sobrien
31115705Sobrien#include <sys/cdefs.h>
3264921Smarcel__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 220271 2011-04-02 09:25:13Z pjd $");
3376166Smarkm
34224778Srwatson#include <sys/param.h>
35162472Snetchild#include <sys/time.h>
36162472Snetchild#include <sys/bio.h>
37140992Ssobomax#include <sys/disk.h>
3884811Sjhb#include <sys/stat.h>
39140992Ssobomax
4064921Smarcel#include <err.h>
4176166Smarkm#include <errno.h>
42161310Snetchild#include <fcntl.h>
43164033Srwatson#include <libgeom.h>
4464921Smarcel#include <pthread.h>
45161310Snetchild#include <signal.h>
4676166Smarkm#include <stdint.h>
4776166Smarkm#include <stdio.h>
48134838Sdfr#include <string.h>
49102814Siedowse#include <sysexits.h>
5064921Smarcel#include <unistd.h>
5164921Smarcel
52161310Snetchild#include <activemap.h>
53166188Sjeff#include <nv.h>
5464921Smarcel#include <pjdlog.h>
5564921Smarcel
5664921Smarcel#include "control.h"
5764921Smarcel#include "event.h"
5864921Smarcel#include "hast.h"
5964921Smarcel#include "hast_proto.h"
6067238Sgallatin#include "hastd.h"
6167238Sgallatin#include "hooks.h"
6267238Sgallatin#include "metadata.h"
6367238Sgallatin#include "proto.h"
6464921Smarcel#include "subr.h"
6568583Smarcel#include "synch.h"
6664921Smarcel
/*
 * One request as it travels through the secondary worker.  Requests are
 * preallocated by init_environment() and moved between the free, disk and
 * send queues through the hio_next linkage.
 */
struct hio {
	/* Sequence number; not referenced by the code visible in this part. */
	uint64_t	 hio_seq;
	/* 0 on success, otherwise an errno value (EINVAL, EIO, ...). */
	int		 hio_error;
	/* Request header as received by hast_proto_recv_hdr(). */
	struct nv	*hio_nv;
	/* Data buffer of MAXPHYS bytes allocated in init_environment(). */
	void		*hio_data;
	/* Command taken from the "cmd" header field (HIO_*). */
	uint8_t		 hio_cmd;
	/* Offset taken from the "offset" header field. */
	uint64_t	 hio_offset;
	/* Length taken from the "length" header field. */
	uint64_t	 hio_length;
	/* Linkage used by the QUEUE_INSERT()/QUEUE_TAKE() macros. */
	TAILQ_ENTRY(hio) hio_next;
};
77161310Snetchild
/*
 * Resource handled by this worker process.  It is set in hastd_secondary()
 * and used by secondary_exit(), which has no resource argument of its own.
 */
static struct hast_resource *gres;

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Disk thread (the one that does I/O requests) takes requests from this list.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Requests that are done (local I/O finished in disk_thread() or the header
 * failed validation in recv_thread()) are put on this list, from where
 * send_thread() is expected to pick them up.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/*
 * Maximum number of outstanding I/O requests, i.e. the number of hio
 * structures preallocated by init_environment().
 */
#define	HAST_HIO_MAX	256

static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
10967051Sgallatin
/*
 * Append request 'req' to the tail of the hio_<name>_list queue.  The
 * queue's condition variable is signalled, after the lock has been dropped,
 * only when the queue was empty before the insertion.
 */
#define	QUEUE_INSERT(name, req)	do {					\
	bool _was_empty;						\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_was_empty = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (req), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_was_empty)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Remove the first request from the hio_<name>_list queue and store it in
 * 'req'.  While the queue is empty the caller sleeps on the queue's
 * condition variable.
 */
#define	QUEUE_TAKE(name, req)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(req) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((req) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (req), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
129140992Ssobomax
130140992Ssobomaxstatic void
131140992Ssobomaxinit_environment(void)
13264921Smarcel{
133141468Sjhb	struct hio *hio;
13464921Smarcel	unsigned int ii;
13564921Smarcel
13672543Sjlemon	/*
137140992Ssobomax	 * Initialize lists, their locks and theirs condition variables.
13864921Smarcel	 */
13964921Smarcel	TAILQ_INIT(&hio_free_list);
140140992Ssobomax	mtx_init(&hio_free_list_lock);
141140992Ssobomax	cv_init(&hio_free_list_cond);
142140992Ssobomax	TAILQ_INIT(&hio_disk_list);
143140992Ssobomax	mtx_init(&hio_disk_list_lock);
144148623Ssobomax	cv_init(&hio_disk_list_cond);
145161310Snetchild	TAILQ_INIT(&hio_send_list);
146161310Snetchild	mtx_init(&hio_send_list_lock);
147161310Snetchild	cv_init(&hio_send_list_cond);
148161310Snetchild
149161310Snetchild	/*
150161310Snetchild	 * Allocate requests pool and initialize requests.
151217896Sdchagin	 */
152217896Sdchagin	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
153140992Ssobomax		hio = malloc(sizeof(*hio));
15464921Smarcel		if (hio == NULL) {
15564921Smarcel			pjdlog_exitx(EX_TEMPFAIL,
15683221Smarcel			    "Unable to allocate memory (%zu bytes) for hio request.",
15783221Smarcel			    sizeof(*hio));
15883221Smarcel		}
15983221Smarcel		hio->hio_error = 0;
16083221Smarcel		hio->hio_data = malloc(MAXPHYS);
16164921Smarcel		if (hio->hio_data == NULL) {
16283366Sjulian			pjdlog_exitx(EX_TEMPFAIL,
16364921Smarcel			    "Unable to allocate memory (%zu bytes) for gctl_data.",
16483221Smarcel			    (size_t)MAXPHYS);
16583221Smarcel		}
16683221Smarcel		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
16783221Smarcel	}
16883221Smarcel}
16983221Smarcel
/*
 * Load the metadata of the given resource through metadata_read().
 * The process exits with EX_NOINPUT when that call reports a failure.
 */
static void
init_local(struct hast_resource *res)
{
	int rv;

	rv = metadata_read(res, true);
	if (rv >= 0)
		return;
	exit(EX_NOINPUT);
}
17783221Smarcel
17883221Smarcelstatic void
17983221Smarcelinit_remote(struct hast_resource *res, struct nv *nvin)
18083366Sjulian{
18183221Smarcel	uint64_t resuid;
18283221Smarcel	struct nv *nvout;
18383221Smarcel	unsigned char *map;
18483221Smarcel	size_t mapsize;
18583221Smarcel
18683221Smarcel	/* Setup direction. */
18783221Smarcel	if (proto_send(res->hr_remoteout, NULL, 0) == -1)
18883221Smarcel		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
189111797Sdes
19083221Smarcel	map = NULL;
19183221Smarcel	mapsize = 0;
19283366Sjulian	nvout = nv_alloc();
19383221Smarcel	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
19483221Smarcel	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
19583221Smarcel	resuid = nv_get_uint64(nvin, "resuid");
19683221Smarcel	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
19783221Smarcel	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
19883221Smarcel	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
19983221Smarcel	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
20083221Smarcel	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
20183366Sjulian	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
20283221Smarcel	map = malloc(mapsize);
20383221Smarcel	if (map == NULL) {
20483221Smarcel		pjdlog_exitx(EX_TEMPFAIL,
20583221Smarcel		    "Unable to allocate memory (%zu bytes) for activemap.",
20683221Smarcel		    mapsize);
20783221Smarcel	}
20883221Smarcel	/*
20983221Smarcel	 * When we work as primary and secondary is missing we will increase
21083221Smarcel	 * localcnt in our metadata. When secondary is connected and synced
21183221Smarcel	 * we make localcnt be equal to remotecnt, which means nodes are more
21283221Smarcel	 * or less in sync.
21383221Smarcel	 * Split-brain condition is when both nodes are not able to communicate
21483221Smarcel	 * and are both configured as primary nodes. In turn, they can both
215111797Sdes	 * make incompatible changes to the data and we have to detect that.
21683221Smarcel	 * Under split-brain condition we will increase our localcnt on first
21783221Smarcel	 * write and remote node will increase its localcnt on first write.
21883221Smarcel	 * When we connect we can see that primary's localcnt is greater than
21983221Smarcel	 * our remotecnt (primary was modified while we weren't watching) and
22083221Smarcel	 * our localcnt is greater than primary's remotecnt (we were modified
22183221Smarcel	 * while primary wasn't watching).
22283221Smarcel	 * There are many possible combinations which are all gathered below.
22383221Smarcel	 * Don't pay too much attention to exact numbers, the more important
22483366Sjulian	 * is to compare them. We compare secondary's local with primary's
22583221Smarcel	 * remote and secondary's remote with primary's local.
22683221Smarcel	 * Note that every case where primary's localcnt is smaller than
22783221Smarcel	 * secondary's remotecnt and where secondary's localcnt is smaller than
22883221Smarcel	 * primary's remotecnt should be impossible in practise. We will perform
22983221Smarcel	 * full synchronization then. Those cases are marked with an asterisk.
23083221Smarcel	 * Regular synchronization means that only extents marked as dirty are
23183366Sjulian	 * synchronized (regular synchronization).
23283221Smarcel	 *
23383221Smarcel	 * SECONDARY METADATA PRIMARY METADATA
23483221Smarcel	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
23583221Smarcel	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
23683221Smarcel	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
23783221Smarcel	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
23883221Smarcel	 *                                       regular sync from secondary.
23983366Sjulian	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
24083221Smarcel	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
24183221Smarcel	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
24283221Smarcel	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
24383221Smarcel	 *                                       regular sync from primary.
24483221Smarcel	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
24583221Smarcel	 */
24683221Smarcel	if (res->hr_resuid == 0) {
24783221Smarcel		/*
24883366Sjulian		 * Provider is used for the first time. If primary node done no
24983221Smarcel		 * writes yet as well (we will find "virgin" argument) then
25083221Smarcel		 * there is no need to synchronize anything. If primary node
25183221Smarcel		 * done any writes already we have to synchronize everything.
25283221Smarcel		 */
25383221Smarcel		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
25483366Sjulian		res->hr_resuid = resuid;
25583221Smarcel		if (metadata_write(res) < 0)
25683221Smarcel			exit(EX_NOINPUT);
25783221Smarcel		if (nv_exists(nvin, "virgin")) {
25883221Smarcel			free(map);
25983221Smarcel			map = NULL;
26083221Smarcel			mapsize = 0;
26183221Smarcel		} else {
26283366Sjulian			memset(map, 0xff, mapsize);
26383221Smarcel		}
26483221Smarcel		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
26583221Smarcel	} else if (res->hr_resuid != resuid) {
26683221Smarcel		char errmsg[256];
26783221Smarcel
26883221Smarcel		(void)snprintf(errmsg, sizeof(errmsg),
26983221Smarcel		    "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
27083366Sjulian		    (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
27183221Smarcel		pjdlog_error("%s", errmsg);
27283221Smarcel		nv_add_string(nvout, errmsg, "errmsg");
27383221Smarcel		if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) {
27483221Smarcel			pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s",
27583221Smarcel			    res->hr_remoteaddr);
27683221Smarcel		}
27764921Smarcel		nv_free(nvout);
27864921Smarcel		exit(EX_CONFIG);
27964921Smarcel	} else if (
28083366Sjulian	    /* Is primary is out-of-date? */
28164921Smarcel	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
28283221Smarcel	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
28383221Smarcel	    /* Nodes are more or less in sync? */
28464921Smarcel	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
28564921Smarcel	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
28683221Smarcel	    /* Is secondary is out-of-date? */
28783221Smarcel	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
28891437Speter	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
28964921Smarcel		/*
29064921Smarcel		 * Nodes are more or less in sync or one of the nodes is
291111797Sdes		 * out-of-date.
29264921Smarcel		 * It doesn't matter at this point which one, we just have to
29364921Smarcel		 * send out local bitmap to the remote node.
29464921Smarcel		 */
29564921Smarcel		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
29664921Smarcel		    (ssize_t)mapsize) {
29764921Smarcel			pjdlog_exit(LOG_ERR, "Unable to read activemap");
29864921Smarcel		}
29964921Smarcel		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
30083366Sjulian		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
30164921Smarcel			/* Primary is out-of-date, sync from secondary. */
30264921Smarcel			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
30364921Smarcel		} else {
304218612Sdchagin			/*
305218612Sdchagin			 * Secondary is out-of-date or counts match.
306218612Sdchagin			 * Sync from primary.
307218612Sdchagin			 */
308218612Sdchagin			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
309218612Sdchagin		}
310218612Sdchagin	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
311218612Sdchagin	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
312218612Sdchagin		/*
313218612Sdchagin		 * Not good, we have split-brain condition.
314218612Sdchagin		 */
315218612Sdchagin		pjdlog_error("Split-brain detected, exiting.");
316218612Sdchagin		nv_add_string(nvout, "Split-brain condition!", "errmsg");
317218612Sdchagin		free(map);
318218612Sdchagin		map = NULL;
319218612Sdchagin		mapsize = 0;
320218612Sdchagin	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
321218612Sdchagin	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
322218612Sdchagin		/*
323218612Sdchagin		 * This should never happen in practise, but we will perform
324218612Sdchagin		 * full synchronization.
325218612Sdchagin		 */
326218612Sdchagin		PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
327218612Sdchagin		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
328218612Sdchagin		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
329218612Sdchagin		    METADATA_SIZE, res->hr_extentsize,
330218612Sdchagin		    res->hr_local_sectorsize);
331218612Sdchagin		memset(map, 0xff, mapsize);
332218612Sdchagin		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
333218612Sdchagin			/* In this one of five cases sync from secondary. */
334218612Sdchagin			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
335218612Sdchagin		} else {
336218612Sdchagin			/* For the rest four cases sync from primary. */
337218612Sdchagin			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
338218612Sdchagin		}
339218612Sdchagin		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
340218612Sdchagin		    (uintmax_t)res->hr_primary_localcnt,
341218612Sdchagin		    (uintmax_t)res->hr_primary_remotecnt,
342218612Sdchagin		    (uintmax_t)res->hr_secondary_localcnt,
343218612Sdchagin		    (uintmax_t)res->hr_secondary_remotecnt);
344218612Sdchagin	}
345218612Sdchagin	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
346218612Sdchagin	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) {
347218612Sdchagin		pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
348218612Sdchagin		    res->hr_remoteaddr);
349218612Sdchagin	}
350218612Sdchagin	if (map != NULL)
351218612Sdchagin		free(map);
352218612Sdchagin	nv_free(nvout);
353218612Sdchagin	/* Setup direction. */
354218612Sdchagin	if (proto_recv(res->hr_remotein, NULL, 0) == -1)
355218612Sdchagin		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
356218612Sdchagin	if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
357218612Sdchagin	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
358218612Sdchagin		/* Exit on split-brain. */
35964921Smarcel		event_send(res, EVENT_SPLITBRAIN);
360218613Sdchagin		exit(EX_CONFIG);
361218613Sdchagin	}
362218613Sdchagin}
363218613Sdchagin
364218613Sdchaginvoid
365218613Sdchaginhastd_secondary(struct hast_resource *res, struct nv *nvin)
366218613Sdchagin{
367218613Sdchagin	sigset_t mask;
36864921Smarcel	pthread_t td;
36964921Smarcel	pid_t pid;
37064921Smarcel	int error, mode, debuglevel;
37164921Smarcel
372104893Ssobomax	/*
373104893Ssobomax	 * Create communication channel between parent and child.
374104893Ssobomax	 */
375104893Ssobomax	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
376104893Ssobomax		KEEP_ERRNO((void)pidfile_remove(pfh));
377111798Sdes		pjdlog_exit(EX_OSERR,
378111798Sdes		    "Unable to create control sockets between parent and child");
379111798Sdes	}
380104893Ssobomax	/*
381104893Ssobomax	 * Create communication channel between child and parent.
382198554Sjhb	 */
383198554Sjhb	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
384198554Sjhb		KEEP_ERRNO((void)pidfile_remove(pfh));
385104893Ssobomax		pjdlog_exit(EX_OSERR,
386104893Ssobomax		    "Unable to create event sockets between child and parent");
387104893Ssobomax	}
38883366Sjulian
38964921Smarcel	pid = fork();
39064921Smarcel	if (pid < 0) {
39183221Smarcel		KEEP_ERRNO((void)pidfile_remove(pfh));
39264921Smarcel		pjdlog_exit(EX_OSERR, "Unable to fork");
393111797Sdes	}
39464921Smarcel
39564921Smarcel	if (pid > 0) {
39664921Smarcel		/* This is parent. */
39764921Smarcel		proto_close(res->hr_remotein);
39872543Sjlemon		res->hr_remotein = NULL;
39972543Sjlemon		proto_close(res->hr_remoteout);
400104984Sbde		res->hr_remoteout = NULL;
401166727Sjkim		/* Declare that we are receiver. */
40264921Smarcel		proto_recv(res->hr_event, NULL, 0);
40364921Smarcel		/* Declare that we are sender. */
404198554Sjhb		proto_send(res->hr_ctrl, NULL, 0);
405198554Sjhb		res->hr_workerpid = pid;
406198554Sjhb		return;
407104893Ssobomax	}
408104893Ssobomax
409104893Ssobomax	gres = res;
410198554Sjhb	mode = pjdlog_mode_get();
411198554Sjhb	debuglevel = pjdlog_debug_get();
412104893Ssobomax
413104893Ssobomax	/* Declare that we are sender. */
414104893Ssobomax	proto_send(res->hr_event, NULL, 0);
415104893Ssobomax	/* Declare that we are receiver. */
416104893Ssobomax	proto_recv(res->hr_ctrl, NULL, 0);
417104893Ssobomax	descriptors_cleanup(res);
418104893Ssobomax
419104893Ssobomax	descriptors_assert(res, mode);
420104893Ssobomax
421104893Ssobomax	pjdlog_init(mode);
422104893Ssobomax	pjdlog_debug_set(debuglevel);
423112630Smdodd	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
424162472Snetchild	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
425255219Spjd
426104893Ssobomax	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
427112630Smdodd	PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
42864921Smarcel
429162472Snetchild	/* Error in setting timeout is not critical, but why should it fail? */
430162472Snetchild	if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) < 0)
431162472Snetchild		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
432162472Snetchild	if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0)
433162472Snetchild		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
434162472Snetchild
435198554Sjhb	init_local(res);
436162479Snetchild	init_environment();
437162472Snetchild
438198554Sjhb	if (drop_privs(true) != 0)
43964921Smarcel		exit(EX_CONFIG);
440198554Sjhb	pjdlog_info("Privileges successfully dropped.");
44164921Smarcel
442198554Sjhb	/*
44364921Smarcel	 * Create the control thread before sending any event to the parent,
444208994Skan	 * as we can deadlock when parent sends control request to worker,
445208994Skan	 * but worker has no control thread started yet, so parent waits.
446208994Skan	 * In the meantime worker sends an event to the parent, but parent
447208994Skan	 * is unable to handle the event, because it waits for control
448208994Skan	 * request response.
44964921Smarcel	 */
450208994Skan	error = pthread_create(&td, NULL, ctrl_thread, res);
45173213Sdillon	PJDLOG_ASSERT(error == 0);
452198554Sjhb
45364921Smarcel	init_remote(res, nvin);
45464921Smarcel	event_send(res, EVENT_CONNECT);
455166727Sjkim
456166727Sjkim	error = pthread_create(&td, NULL, recv_thread, res);
457166727Sjkim	PJDLOG_ASSERT(error == 0);
458166727Sjkim	error = pthread_create(&td, NULL, disk_thread, res);
459166727Sjkim	PJDLOG_ASSERT(error == 0);
460198554Sjhb	(void)send_thread(res);
461166727Sjkim}
462166727Sjkim
463166727Sjkimstatic void
464167048Sjkimreqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...)
465198554Sjhb{
466167048Sjkim	char msg[1024];
467166727Sjkim	va_list ap;
468166727Sjkim	int len;
469166727Sjkim
470166727Sjkim	va_start(ap, fmt);
471166727Sjkim	len = vsnprintf(msg, sizeof(msg), fmt, ap);
472224778Srwatson	va_end(ap);
473224778Srwatson	if ((size_t)len < sizeof(msg)) {
474224778Srwatson		switch (hio->hio_cmd) {
475166727Sjkim		case HIO_READ:
476166727Sjkim			(void)snprintf(msg + len, sizeof(msg) - len,
477255219Spjd			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
478255219Spjd			    (uintmax_t)hio->hio_length);
479255219Spjd			break;
480166727Sjkim		case HIO_DELETE:
481166727Sjkim			(void)snprintf(msg + len, sizeof(msg) - len,
482166727Sjkim			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
483166727Sjkim			    (uintmax_t)hio->hio_length);
484166727Sjkim			break;
485166727Sjkim		case HIO_FLUSH:
486166727Sjkim			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
487166727Sjkim			break;
488166727Sjkim		case HIO_WRITE:
489166727Sjkim			(void)snprintf(msg + len, sizeof(msg) - len,
490166727Sjkim			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
491166727Sjkim			    (uintmax_t)hio->hio_length);
492166727Sjkim			break;
493166727Sjkim		case HIO_KEEPALIVE:
494166727Sjkim			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
495198554Sjhb			break;
496161365Snetchild		default:
497198554Sjhb			(void)snprintf(msg + len, sizeof(msg) - len,
49864921Smarcel			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
49964921Smarcel			break;
50064921Smarcel		}
50164921Smarcel	}
50264921Smarcel	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
50364921Smarcel}
50464921Smarcel
50564921Smarcelstatic int
50664921Smarcelrequnpack(struct hast_resource *res, struct hio *hio)
50764921Smarcel{
50864921Smarcel
50964921Smarcel	hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd");
510166944Snetchild	if (hio->hio_cmd == 0) {
51164921Smarcel		pjdlog_error("Header contains no 'cmd' field.");
51264921Smarcel		hio->hio_error = EINVAL;
51364921Smarcel		goto end;
51464921Smarcel	}
51564921Smarcel	switch (hio->hio_cmd) {
51664921Smarcel	case HIO_KEEPALIVE:
51764921Smarcel		break;
518198554Sjhb	case HIO_READ:
519161365Snetchild	case HIO_WRITE:
520161365Snetchild	case HIO_DELETE:
52167238Sgallatin		hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset");
52267238Sgallatin		if (nv_error(hio->hio_nv) != 0) {
52367238Sgallatin			pjdlog_error("Header is missing 'offset' field.");
52467238Sgallatin			hio->hio_error = EINVAL;
52567238Sgallatin			goto end;
52667238Sgallatin		}
52767238Sgallatin		hio->hio_length = nv_get_uint64(hio->hio_nv, "length");
52867238Sgallatin		if (nv_error(hio->hio_nv) != 0) {
52967238Sgallatin			pjdlog_error("Header is missing 'length' field.");
53067238Sgallatin			hio->hio_error = EINVAL;
53167238Sgallatin			goto end;
53267238Sgallatin		}
53367238Sgallatin		if (hio->hio_length == 0) {
53467238Sgallatin			pjdlog_error("Data length is zero.");
535125454Sjhb			hio->hio_error = EINVAL;
53667238Sgallatin			goto end;
537125454Sjhb		}
538125454Sjhb		if (hio->hio_length > MAXPHYS) {
53967238Sgallatin			pjdlog_error("Data length is too large (%ju > %ju).",
54067238Sgallatin			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
541176193Sjkim			hio->hio_error = EINVAL;
542176193Sjkim			goto end;
543176193Sjkim		}
544176193Sjkim		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
545176193Sjkim			pjdlog_error("Offset %ju is not multiple of sector size.",
546176193Sjkim			    (uintmax_t)hio->hio_offset);
547176193Sjkim			hio->hio_error = EINVAL;
548176193Sjkim			goto end;
549198554Sjhb		}
550198554Sjhb		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
551198554Sjhb			pjdlog_error("Length %ju is not multiple of sector size.",
552176193Sjkim			    (uintmax_t)hio->hio_length);
553198554Sjhb			hio->hio_error = EINVAL;
554198554Sjhb			goto end;
555176193Sjkim		}
556176193Sjkim		if (hio->hio_offset + hio->hio_length >
55764921Smarcel		    (uint64_t)res->hr_datasize) {
558198554Sjhb			pjdlog_error("Data offset is too large (%ju > %ju).",
559198554Sjhb			    (uintmax_t)(hio->hio_offset + hio->hio_length),
56064921Smarcel			    (uintmax_t)res->hr_datasize);
561198554Sjhb			hio->hio_error = EINVAL;
56264921Smarcel			goto end;
56364921Smarcel		}
56472543Sjlemon		break;
565112630Smdodd	default:
566112630Smdodd		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
56772543Sjlemon		    hio->hio_cmd);
56872543Sjlemon		hio->hio_error = EINVAL;
56964921Smarcel		goto end;
570225617Skmacy	}
571112630Smdodd	hio->hio_error = 0;
572112630Smdoddend:
573112630Smdodd	return (hio->hio_error);
574112630Smdodd}
575112630Smdodd
576112630Smdoddstatic __dead2 void
57764921Smarcelsecondary_exit(int exitcode, const char *fmt, ...)
57864921Smarcel{
57964921Smarcel	va_list ap;
580166727Sjkim
581166727Sjkim	PJDLOG_ASSERT(exitcode != EX_OK);
582166727Sjkim	va_start(ap, fmt);
583166727Sjkim	pjdlogv_errno(LOG_ERR, fmt, ap);
584166727Sjkim	va_end(ap);
585166727Sjkim	event_send(gres, EVENT_DISCONNECT);
586166727Sjkim	exit(exitcode);
587166727Sjkim}
588166727Sjkim
589225617Skmacy/*
590166727Sjkim * Thread receives requests from the primary node.
591166727Sjkim */
592166727Sjkimstatic void *
59383366Sjulianrecv_thread(void *arg)
59464921Smarcel{
595140862Ssobomax	struct hast_resource *res = arg;
596140862Ssobomax	struct hio *hio;
59764921Smarcel
598140862Ssobomax	for (;;) {
599140862Ssobomax		pjdlog_debug(2, "recv: Taking free request.");
600140862Ssobomax		QUEUE_TAKE(free, hio);
601140862Ssobomax		pjdlog_debug(2, "recv: (%p) Got request.", hio);
602140862Ssobomax		if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) {
60364921Smarcel			secondary_exit(EX_TEMPFAIL,
60464921Smarcel			    "Unable to receive request header");
60564921Smarcel		}
60683366Sjulian		if (requnpack(res, hio) != 0) {
60764921Smarcel			pjdlog_debug(2,
60864921Smarcel			    "recv: (%p) Moving request to the send queue.",
60964921Smarcel			    hio);
61064921Smarcel			QUEUE_INSERT(send, hio);
61164921Smarcel			continue;
612164033Srwatson		}
61364921Smarcel		reqlog(LOG_DEBUG, 2, -1, hio,
61491406Sjhb		    "recv: (%p) Got request header: ", hio);
61583981Srwatson		if (hio->hio_cmd == HIO_KEEPALIVE) {
61683366Sjulian			pjdlog_debug(2,
61764921Smarcel			    "recv: (%p) Moving request to the free queue.",
61864921Smarcel			    hio);
61964921Smarcel			nv_free(hio->hio_nv);
62064921Smarcel			QUEUE_INSERT(free, hio);
62164921Smarcel			continue;
622105441Smarkm		} else if (hio->hio_cmd == HIO_WRITE) {
62364921Smarcel			if (hast_proto_recv_data(res, res->hr_remotein,
62464921Smarcel			    hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
625140862Ssobomax				secondary_exit(EX_TEMPFAIL,
62683221Smarcel				    "Unable to receive request data");
627140862Ssobomax			}
628173937Skib		}
62964921Smarcel		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
63064921Smarcel		    hio);
63164921Smarcel		QUEUE_INSERT(disk, hio);
632140862Ssobomax	}
633140862Ssobomax	/* NOTREACHED */
634140862Ssobomax	return (NULL);
635140862Ssobomax}
63683366Sjulian
63764921Smarcel/*
638173937Skib * Thread reads from or writes to local component and also handles DELETE and
639173937Skib * FLUSH requests.
640173937Skib */
641173937Skibstatic void *
642173937Skibdisk_thread(void *arg)
643173937Skib{
644173937Skib	struct hast_resource *res = arg;
645173937Skib	struct hio *hio;
64664921Smarcel	ssize_t ret;
64764921Smarcel	bool clear_activemap;
64864921Smarcel
64964921Smarcel	clear_activemap = true;
65064921Smarcel
65164921Smarcel	for (;;) {
65264921Smarcel		pjdlog_debug(2, "disk: Taking request.");
65364921Smarcel		QUEUE_TAKE(disk, hio);
65464921Smarcel		while (clear_activemap) {
655140862Ssobomax			unsigned char *map;
656140862Ssobomax			size_t mapsize;
657140862Ssobomax
658140862Ssobomax			/*
659140862Ssobomax			 * When first request is received, it means that primary
660140862Ssobomax			 * already received our activemap, merged it and stored
661140862Ssobomax			 * locally. We can now safely clear our activemap.
662140862Ssobomax			 */
66364921Smarcel			mapsize =
664140862Ssobomax			    activemap_calc_ondisk_size(res->hr_local_mediasize -
665140862Ssobomax			    METADATA_SIZE, res->hr_extentsize,
666140862Ssobomax			    res->hr_local_sectorsize);
667140862Ssobomax			map = calloc(1, mapsize);
668140862Ssobomax			if (map == NULL) {
669140862Ssobomax				pjdlog_warning("Unable to allocate memory to clear local activemap.");
67064921Smarcel				break;
67164921Smarcel			}
672195074Sjhb			if (pwrite(res->hr_localfd, map, mapsize,
67364921Smarcel			    METADATA_SIZE) != (ssize_t)mapsize) {
67464921Smarcel				pjdlog_errno(LOG_WARNING,
67564921Smarcel				    "Unable to store cleared activemap");
67664921Smarcel				free(map);
67764921Smarcel				break;
67864921Smarcel			}
67964921Smarcel			free(map);
68064921Smarcel			clear_activemap = false;
68164921Smarcel			pjdlog_debug(1, "Local activemap cleared.");
68264921Smarcel		}
68364921Smarcel		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
68464921Smarcel		/* Handle the actual request. */
68583366Sjulian		switch (hio->hio_cmd) {
68664921Smarcel		case HIO_READ:
68783221Smarcel			ret = pread(res->hr_localfd, hio->hio_data,
68883221Smarcel			    hio->hio_length,
68964921Smarcel			    hio->hio_offset + res->hr_localoff);
69064921Smarcel			if (ret < 0)
69164921Smarcel				hio->hio_error = errno;
69272543Sjlemon			else if (ret != (int64_t)hio->hio_length)
69372543Sjlemon				hio->hio_error = EIO;
69472543Sjlemon			else
69564921Smarcel				hio->hio_error = 0;
69664921Smarcel			break;
69764921Smarcel		case HIO_WRITE:
698111797Sdes			ret = pwrite(res->hr_localfd, hio->hio_data,
69964921Smarcel			    hio->hio_length,
70064921Smarcel			    hio->hio_offset + res->hr_localoff);
70164921Smarcel			if (ret < 0)
70264921Smarcel				hio->hio_error = errno;
70364921Smarcel			else if (ret != (int64_t)hio->hio_length)
70464921Smarcel				hio->hio_error = EIO;
70564921Smarcel			else
70664921Smarcel				hio->hio_error = 0;
70764921Smarcel			break;
70883366Sjulian		case HIO_DELETE:
70964921Smarcel			ret = g_delete(res->hr_localfd,
71064921Smarcel			    hio->hio_offset + res->hr_localoff,
71164921Smarcel			    hio->hio_length);
71264921Smarcel			if (ret < 0)
71364921Smarcel				hio->hio_error = errno;
71464921Smarcel			else
71564921Smarcel				hio->hio_error = 0;
716111797Sdes			break;
71764921Smarcel		case HIO_FLUSH:
71864921Smarcel			ret = g_flush(res->hr_localfd);
71964921Smarcel			if (ret < 0)
72064921Smarcel				hio->hio_error = errno;
72164921Smarcel			else
72264921Smarcel				hio->hio_error = 0;
72364921Smarcel			break;
72464921Smarcel		}
72564921Smarcel		if (hio->hio_error != 0) {
72664921Smarcel			reqlog(LOG_ERR, 0, hio->hio_error, hio,
72764921Smarcel			    "Request failed: ");
72883366Sjulian		}
72964921Smarcel		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
730102814Siedowse		    hio);
73183221Smarcel		QUEUE_INSERT(send, hio);
73264921Smarcel	}
73364921Smarcel	/* NOTREACHED */
73472543Sjlemon	return (NULL);
73572543Sjlemon}
73664921Smarcel
73764921Smarcel/*
73864921Smarcel * Thread sends requests back to primary node.
73964921Smarcel */
740102814Siedowsestatic void *
741102814Siedowsesend_thread(void *arg)
74264921Smarcel{
74364921Smarcel	struct hast_resource *res = arg;
74464921Smarcel	struct nv *nvout;
745105441Smarkm	struct hio *hio;
74664921Smarcel	void *data;
74783221Smarcel	size_t length;
748102814Siedowse
74964921Smarcel	for (;;) {
75064921Smarcel		pjdlog_debug(2, "send: Taking request.");
75164921Smarcel		QUEUE_TAKE(send, hio);
75272543Sjlemon		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
75372543Sjlemon		nvout = nv_alloc();
75472543Sjlemon		/* Copy sequence number. */
75564921Smarcel		nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq");
75664921Smarcel		switch (hio->hio_cmd) {
75783221Smarcel		case HIO_READ:
75864921Smarcel			if (hio->hio_error == 0) {
75964921Smarcel				data = hio->hio_data;
76083221Smarcel				length = hio->hio_length;
76164921Smarcel				break;
76264921Smarcel			}
76364921Smarcel			/*
764102814Siedowse			 * We send no data in case of an error.
765102814Siedowse			 */
76664921Smarcel			/* FALLTHROUGH */
76764921Smarcel		case HIO_DELETE:
76864921Smarcel		case HIO_FLUSH:
76983366Sjulian		case HIO_WRITE:
77064921Smarcel			data = NULL;
77183366Sjulian			length = 0;
772102814Siedowse			break;
77364921Smarcel		default:
77464921Smarcel			abort();
77572543Sjlemon			break;
77672543Sjlemon		}
77764921Smarcel		if (hio->hio_error != 0)
77864921Smarcel			nv_add_int16(nvout, hio->hio_error, "error");
77971494Sjhb		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
780112888Sjeff		    length) < 0) {
78171494Sjhb			secondary_exit(EX_TEMPFAIL, "Unable to send reply.");
782102814Siedowse		}
78364921Smarcel		nv_free(nvout);
78464921Smarcel		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
78564921Smarcel		    hio);
78683366Sjulian		nv_free(hio->hio_nv);
78764921Smarcel		hio->hio_error = 0;
788102814Siedowse		QUEUE_INSERT(free, hio);
78983221Smarcel	}
79064921Smarcel	/* NOTREACHED */
79164921Smarcel	return (NULL);
79264921Smarcel}
79372543Sjlemon