secondary.c revision 247866
169800Stomsoft/*-
269800Stomsoft * Copyright (c) 2009-2010 The FreeBSD Foundation
369800Stomsoft * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
469800Stomsoft * All rights reserved.
569800Stomsoft *
669800Stomsoft * This software was developed by Pawel Jakub Dawidek under sponsorship from
769800Stomsoft * the FreeBSD Foundation.
869800Stomsoft *
969800Stomsoft * Redistribution and use in source and binary forms, with or without
1069800Stomsoft * modification, are permitted provided that the following conditions
1169800Stomsoft * are met:
1269800Stomsoft * 1. Redistributions of source code must retain the above copyright
1369800Stomsoft *    notice, this list of conditions and the following disclaimer.
1469800Stomsoft * 2. Redistributions in binary form must reproduce the above copyright
1569800Stomsoft *    notice, this list of conditions and the following disclaimer in the
1669800Stomsoft *    documentation and/or other materials provided with the distribution.
1769800Stomsoft *
1869800Stomsoft * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
1969800Stomsoft * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
2069800Stomsoft * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
2169800Stomsoft * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
2269800Stomsoft * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
2369800Stomsoft * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2469800Stomsoft * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2569800Stomsoft * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2669800Stomsoft * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2769800Stomsoft * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2869800Stomsoft * SUCH DAMAGE.
2969800Stomsoft */
3069800Stomsoft
3169800Stomsoft#include <sys/cdefs.h>
3269800Stomsoft__FBSDID("$FreeBSD: stable/9/sbin/hastd/secondary.c 247866 2013-03-06 06:57:18Z trociny $");
3369800Stomsoft
3469800Stomsoft#include <sys/param.h>
3569800Stomsoft#include <sys/time.h>
3669800Stomsoft#include <sys/bio.h>
3769800Stomsoft#include <sys/disk.h>
3869926Stomsoft#include <sys/stat.h>
3969800Stomsoft
4069800Stomsoft#include <err.h>
4169800Stomsoft#include <errno.h>
4269800Stomsoft#include <fcntl.h>
4369800Stomsoft#include <libgeom.h>
4469926Stomsoft#include <pthread.h>
4569800Stomsoft#include <signal.h>
4669800Stomsoft#include <stdint.h>
4769800Stomsoft#include <stdio.h>
4869800Stomsoft#include <string.h>
4969800Stomsoft#include <sysexits.h>
50103949Smike#include <unistd.h>
5169800Stomsoft
5269800Stomsoft#include <activemap.h>
5369800Stomsoft#include <nv.h>
5469800Stomsoft#include <pjdlog.h>
5569800Stomsoft
5669800Stomsoft#include "control.h"
5769800Stomsoft#include "event.h"
5869800Stomsoft#include "hast.h"
5969800Stomsoft#include "hast_proto.h"
6069800Stomsoft#include "hastd.h"
6169800Stomsoft#include "hooks.h"
6269800Stomsoft#include "metadata.h"
6369800Stomsoft#include "proto.h"
6469800Stomsoft#include "subr.h"
6569800Stomsoft#include "synch.h"
6669800Stomsoft
/*
 * A single I/O request received from the primary node. The same structure
 * is moved between the free, disk and send lists declared below.
 */
struct hio {
	/* Sequence number taken from the "seq" field of the request header. */
	uint64_t	 hio_seq;
	/* Result of the request: 0 or an errno value (e.g. EINVAL). */
	int		 hio_error;
	/* Data buffer of MAXPHYS bytes, allocated in init_environment(). */
	void		*hio_data;
	/* Command (HIO_*) taken from the "cmd" field of the request header. */
	uint8_t		 hio_cmd;
	/* Value of the "offset" field of the request header. */
	uint64_t	 hio_offset;
	/* Value of the "length" field of the request header. */
	uint64_t	 hio_length;
	/* Linkage for whichever list currently holds the request. */
	TAILQ_ENTRY(hio) hio_next;
};
7692743Srwatson
/*
 * Resource served by this worker process. Set in hastd_secondary() after
 * fork and used by secondary_exit() to send the disconnect event.
 */
static struct hast_resource *gres;

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * Disk thread (the one that does I/O requests) takes requests from this list.
 */
static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;
/*
 * Requests that are ready to be answered are put on this list; for example
 * recv_thread() queues requests with an invalid header here.
 * NOTE(review): presumably consumed by send_thread() - confirm.
 */
static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/*
 * Maximum number of outstanding I/O requests. This is also the number of
 * hio structures preallocated by init_environment().
 */
#define	HAST_HIO_MAX	256

/* Worker thread entry points; each receives the hast_resource as arg. */
static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
10869800Stomsoft
/*
 * Append request 'req' to the tail of hio_<name>_list while holding the
 * list lock. The list's condition variable is signalled, after the lock has
 * been released, only if the list was empty before the insertion.
 */
#define	QUEUE_INSERT(name, req)	do {					\
	bool _was_empty;						\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_was_empty = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (req), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_was_empty)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Remove the first request from hio_<name>_list and store it in 'req'.
 * Waits on the list's condition variable for as long as the list is empty.
 */
#define	QUEUE_TAKE(name, req)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(req) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((req) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (req), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
12869800Stomsoft
12969800Stomsoftstatic void
13069800Stomsofthio_clear(struct hio *hio)
13169800Stomsoft{
13269800Stomsoft
13369800Stomsoft	hio->hio_seq = 0;
13469800Stomsoft	hio->hio_error = 0;
13569800Stomsoft	hio->hio_cmd = HIO_UNDEF;
13669800Stomsoft	hio->hio_offset = 0;
13769800Stomsoft	hio->hio_length = 0;
13869800Stomsoft}
13969800Stomsoft
14069800Stomsoftstatic void
14169800Stomsoftinit_environment(void)
14269800Stomsoft{
14369800Stomsoft	struct hio *hio;
14469800Stomsoft	unsigned int ii;
14569800Stomsoft
14669800Stomsoft	/*
14769800Stomsoft	 * Initialize lists, their locks and theirs condition variables.
14869800Stomsoft	 */
14969800Stomsoft	TAILQ_INIT(&hio_free_list);
15069800Stomsoft	mtx_init(&hio_free_list_lock);
15169800Stomsoft	cv_init(&hio_free_list_cond);
15269800Stomsoft	TAILQ_INIT(&hio_disk_list);
15369800Stomsoft	mtx_init(&hio_disk_list_lock);
15469800Stomsoft	cv_init(&hio_disk_list_cond);
15569800Stomsoft	TAILQ_INIT(&hio_send_list);
15669800Stomsoft	mtx_init(&hio_send_list_lock);
15769800Stomsoft	cv_init(&hio_send_list_cond);
15869800Stomsoft
15969800Stomsoft	/*
16069800Stomsoft	 * Allocate requests pool and initialize requests.
16177885Stomsoft	 */
16277885Stomsoft	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
16369800Stomsoft		hio = malloc(sizeof(*hio));
16469800Stomsoft		if (hio == NULL) {
16569800Stomsoft			pjdlog_exitx(EX_TEMPFAIL,
16669800Stomsoft			    "Unable to allocate memory (%zu bytes) for hio request.",
16769800Stomsoft			    sizeof(*hio));
16869800Stomsoft		}
16969800Stomsoft		hio->hio_data = malloc(MAXPHYS);
17069800Stomsoft		if (hio->hio_data == NULL) {
17169800Stomsoft			pjdlog_exitx(EX_TEMPFAIL,
17269800Stomsoft			    "Unable to allocate memory (%zu bytes) for gctl_data.",
17369800Stomsoft			    (size_t)MAXPHYS);
17469800Stomsoft		}
17569800Stomsoft		hio_clear(hio);
17669800Stomsoft		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
17769800Stomsoft	}
17869800Stomsoft}
17969800Stomsoft
/*
 * Read the on-disk metadata of the local provider via metadata_read().
 * The process exits with EX_NOINPUT when that call reports failure (-1).
 */
static void
init_local(struct hast_resource *res)
{

	if (metadata_read(res, true) != -1)
		return;
	exit(EX_NOINPUT);
}
18769800Stomsoft
18869800Stomsoftstatic void
18969800Stomsoftinit_remote(struct hast_resource *res, struct nv *nvin)
19069800Stomsoft{
19169800Stomsoft	uint64_t resuid;
19269800Stomsoft	struct nv *nvout;
19369800Stomsoft	unsigned char *map;
19469800Stomsoft	size_t mapsize;
19569800Stomsoft
19669800Stomsoft#ifdef notyet
19769800Stomsoft	/* Setup direction. */
19869800Stomsoft	if (proto_send(res->hr_remoteout, NULL, 0) == -1)
19969800Stomsoft		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
20069800Stomsoft#endif
20169800Stomsoft
20269800Stomsoft	nvout = nv_alloc();
20369800Stomsoft	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
20469800Stomsoft	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
20569800Stomsoft	resuid = nv_get_uint64(nvin, "resuid");
20669800Stomsoft	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
20769800Stomsoft	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
20869800Stomsoft	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
20969800Stomsoft	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
21069800Stomsoft	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
21169800Stomsoft	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
21269800Stomsoft	map = malloc(mapsize);
21369800Stomsoft	if (map == NULL) {
21469800Stomsoft		pjdlog_exitx(EX_TEMPFAIL,
21569800Stomsoft		    "Unable to allocate memory (%zu bytes) for activemap.",
21669800Stomsoft		    mapsize);
21769800Stomsoft	}
21869800Stomsoft	/*
21969800Stomsoft	 * When we work as primary and secondary is missing we will increase
22069800Stomsoft	 * localcnt in our metadata. When secondary is connected and synced
22169800Stomsoft	 * we make localcnt be equal to remotecnt, which means nodes are more
22269800Stomsoft	 * or less in sync.
22369800Stomsoft	 * Split-brain condition is when both nodes are not able to communicate
22469800Stomsoft	 * and are both configured as primary nodes. In turn, they can both
22569800Stomsoft	 * make incompatible changes to the data and we have to detect that.
22669800Stomsoft	 * Under split-brain condition we will increase our localcnt on first
22769800Stomsoft	 * write and remote node will increase its localcnt on first write.
22869800Stomsoft	 * When we connect we can see that primary's localcnt is greater than
22969800Stomsoft	 * our remotecnt (primary was modified while we weren't watching) and
23069800Stomsoft	 * our localcnt is greater than primary's remotecnt (we were modified
23169800Stomsoft	 * while primary wasn't watching).
23269800Stomsoft	 * There are many possible combinations which are all gathered below.
23369800Stomsoft	 * Don't pay too much attention to exact numbers, the more important
23469800Stomsoft	 * is to compare them. We compare secondary's local with primary's
23569800Stomsoft	 * remote and secondary's remote with primary's local.
23669800Stomsoft	 * Note that every case where primary's localcnt is smaller than
23769800Stomsoft	 * secondary's remotecnt and where secondary's localcnt is smaller than
23869800Stomsoft	 * primary's remotecnt should be impossible in practise. We will perform
23969800Stomsoft	 * full synchronization then. Those cases are marked with an asterisk.
24069800Stomsoft	 * Regular synchronization means that only extents marked as dirty are
24169800Stomsoft	 * synchronized (regular synchronization).
24269800Stomsoft	 *
24369800Stomsoft	 * SECONDARY METADATA PRIMARY METADATA
24469800Stomsoft	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
24569800Stomsoft	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
24669800Stomsoft	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
24769800Stomsoft	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
24869800Stomsoft	 *                                       regular sync from secondary.
24969800Stomsoft	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
25069800Stomsoft	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
25169800Stomsoft	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
25269800Stomsoft	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
25369800Stomsoft	 *                                       regular sync from primary.
25469800Stomsoft	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
25569800Stomsoft	 */
25669800Stomsoft	if (res->hr_resuid == 0) {
25769800Stomsoft		/*
25869800Stomsoft		 * Provider is used for the first time. If primary node done no
25969800Stomsoft		 * writes yet as well (we will find "virgin" argument) then
26069800Stomsoft		 * there is no need to synchronize anything. If primary node
26169800Stomsoft		 * done any writes already we have to synchronize everything.
26269800Stomsoft		 */
26369800Stomsoft		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
26469800Stomsoft		res->hr_resuid = resuid;
26569800Stomsoft		if (metadata_write(res) == -1)
26669800Stomsoft			exit(EX_NOINPUT);
26769800Stomsoft		if (nv_exists(nvin, "virgin")) {
26869800Stomsoft			free(map);
26969800Stomsoft			map = NULL;
27069800Stomsoft			mapsize = 0;
27169800Stomsoft		} else {
27269800Stomsoft			memset(map, 0xff, mapsize);
27369800Stomsoft		}
27469800Stomsoft		nv_add_int8(nvout, 1, "virgin");
27569800Stomsoft		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
27669800Stomsoft	} else if (res->hr_resuid != resuid) {
27769800Stomsoft		char errmsg[256];
27869800Stomsoft
27969800Stomsoft		free(map);
28069800Stomsoft		(void)snprintf(errmsg, sizeof(errmsg),
28169800Stomsoft		    "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
28269800Stomsoft		    (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
28369800Stomsoft		pjdlog_error("%s", errmsg);
28469800Stomsoft		nv_add_string(nvout, errmsg, "errmsg");
28569800Stomsoft		if (hast_proto_send(res, res->hr_remotein, nvout,
28669800Stomsoft		    NULL, 0) == -1) {
28769800Stomsoft			pjdlog_exit(EX_TEMPFAIL,
28869800Stomsoft			    "Unable to send response to %s",
28969800Stomsoft			    res->hr_remoteaddr);
29069800Stomsoft		}
29169800Stomsoft		nv_free(nvout);
29269800Stomsoft		exit(EX_CONFIG);
29369800Stomsoft	} else if (
29469800Stomsoft	    /* Is primary out-of-date? */
29569800Stomsoft	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
29669800Stomsoft	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
29769800Stomsoft	    /* Are the nodes more or less in sync? */
29869800Stomsoft	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
29969800Stomsoft	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
30069800Stomsoft	    /* Is secondary out-of-date? */
30169800Stomsoft	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
30269800Stomsoft	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
30369800Stomsoft		/*
30469800Stomsoft		 * Nodes are more or less in sync or one of the nodes is
30569800Stomsoft		 * out-of-date.
30669800Stomsoft		 * It doesn't matter at this point which one, we just have to
30769800Stomsoft		 * send out local bitmap to the remote node.
30869800Stomsoft		 */
30969800Stomsoft		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
31069800Stomsoft		    (ssize_t)mapsize) {
31169800Stomsoft			pjdlog_exit(LOG_ERR, "Unable to read activemap");
31269800Stomsoft		}
31369800Stomsoft		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
31469800Stomsoft		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
31569800Stomsoft			/* Primary is out-of-date, sync from secondary. */
31669800Stomsoft			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
31769800Stomsoft		} else {
31869800Stomsoft			/*
31969800Stomsoft			 * Secondary is out-of-date or counts match.
32069800Stomsoft			 * Sync from primary.
32169800Stomsoft			 */
32269800Stomsoft			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
32369800Stomsoft		}
32469800Stomsoft	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
32569800Stomsoft	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
32669800Stomsoft		/*
32769800Stomsoft		 * Not good, we have split-brain condition.
32869800Stomsoft		 */
32969800Stomsoft		free(map);
33069800Stomsoft		pjdlog_error("Split-brain detected, exiting.");
33169800Stomsoft		nv_add_string(nvout, "Split-brain condition!", "errmsg");
33269800Stomsoft		if (hast_proto_send(res, res->hr_remotein, nvout,
33369800Stomsoft		    NULL, 0) == -1) {
33469800Stomsoft			pjdlog_exit(EX_TEMPFAIL,
33569800Stomsoft			    "Unable to send response to %s",
33677885Stomsoft			    res->hr_remoteaddr);
33777885Stomsoft		}
33869800Stomsoft		nv_free(nvout);
33969800Stomsoft		/* Exit on split-brain. */
34069800Stomsoft		event_send(res, EVENT_SPLITBRAIN);
34169800Stomsoft		exit(EX_CONFIG);
34269800Stomsoft	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
34369800Stomsoft	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
34469800Stomsoft		/*
34569800Stomsoft		 * This should never happen in practise, but we will perform
34669800Stomsoft		 * full synchronization.
34769800Stomsoft		 */
34869800Stomsoft		PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
34969800Stomsoft		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
35069800Stomsoft		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
35169800Stomsoft		    METADATA_SIZE, res->hr_extentsize,
35269800Stomsoft		    res->hr_local_sectorsize);
35369800Stomsoft		memset(map, 0xff, mapsize);
35469800Stomsoft		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
35569800Stomsoft			/* In this one of five cases sync from secondary. */
35669800Stomsoft			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
35769800Stomsoft		} else {
35869800Stomsoft			/* For the rest four cases sync from primary. */
35969800Stomsoft			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
36069800Stomsoft		}
36169800Stomsoft		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
36269800Stomsoft		    (uintmax_t)res->hr_primary_localcnt,
36369800Stomsoft		    (uintmax_t)res->hr_primary_remotecnt,
36469800Stomsoft		    (uintmax_t)res->hr_secondary_localcnt,
36569800Stomsoft		    (uintmax_t)res->hr_secondary_remotecnt);
36669800Stomsoft	}
36769800Stomsoft	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
36869800Stomsoft	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) {
36969800Stomsoft		pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
37069800Stomsoft		    res->hr_remoteaddr);
37169800Stomsoft	}
37269800Stomsoft	if (map != NULL)
37369800Stomsoft		free(map);
37469800Stomsoft	nv_free(nvout);
37569800Stomsoft#ifdef notyet
37669800Stomsoft	/* Setup direction. */
37769800Stomsoft	if (proto_recv(res->hr_remotein, NULL, 0) == -1)
37869800Stomsoft		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
37969800Stomsoft#endif
38069800Stomsoft}
38169800Stomsoft
38269800Stomsoftvoid
38369800Stomsofthastd_secondary(struct hast_resource *res, struct nv *nvin)
38469800Stomsoft{
38569800Stomsoft	sigset_t mask;
38669800Stomsoft	pthread_t td;
38769800Stomsoft	pid_t pid;
38869800Stomsoft	int error, mode, debuglevel;
38969800Stomsoft
39069800Stomsoft	/*
39169800Stomsoft	 * Create communication channel between parent and child.
39269800Stomsoft	 */
39369800Stomsoft	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
39469800Stomsoft		KEEP_ERRNO((void)pidfile_remove(pfh));
39569800Stomsoft		pjdlog_exit(EX_OSERR,
39669800Stomsoft		    "Unable to create control sockets between parent and child");
39769800Stomsoft	}
39869800Stomsoft	/*
39969800Stomsoft	 * Create communication channel between child and parent.
40069800Stomsoft	 */
40169800Stomsoft	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
40269800Stomsoft		KEEP_ERRNO((void)pidfile_remove(pfh));
40369800Stomsoft		pjdlog_exit(EX_OSERR,
40469800Stomsoft		    "Unable to create event sockets between child and parent");
40569800Stomsoft	}
40669800Stomsoft
40769800Stomsoft	pid = fork();
40869800Stomsoft	if (pid == -1) {
40969800Stomsoft		KEEP_ERRNO((void)pidfile_remove(pfh));
41069800Stomsoft		pjdlog_exit(EX_OSERR, "Unable to fork");
41169800Stomsoft	}
41269800Stomsoft
41369800Stomsoft	if (pid > 0) {
41469800Stomsoft		/* This is parent. */
41569800Stomsoft		proto_close(res->hr_remotein);
41669800Stomsoft		res->hr_remotein = NULL;
41769800Stomsoft		proto_close(res->hr_remoteout);
41869800Stomsoft		res->hr_remoteout = NULL;
41969800Stomsoft		/* Declare that we are receiver. */
42069800Stomsoft		proto_recv(res->hr_event, NULL, 0);
42169800Stomsoft		/* Declare that we are sender. */
42269800Stomsoft		proto_send(res->hr_ctrl, NULL, 0);
42369800Stomsoft		res->hr_workerpid = pid;
42469800Stomsoft		return;
42569800Stomsoft	}
42669800Stomsoft
42769800Stomsoft	gres = res;
42869800Stomsoft	mode = pjdlog_mode_get();
42969800Stomsoft	debuglevel = pjdlog_debug_get();
43069800Stomsoft
43169800Stomsoft	/* Declare that we are sender. */
43269800Stomsoft	proto_send(res->hr_event, NULL, 0);
43369800Stomsoft	/* Declare that we are receiver. */
43469800Stomsoft	proto_recv(res->hr_ctrl, NULL, 0);
43569800Stomsoft	descriptors_cleanup(res);
43669800Stomsoft
43769800Stomsoft	descriptors_assert(res, mode);
43869800Stomsoft
43969800Stomsoft	pjdlog_init(mode);
44069800Stomsoft	pjdlog_debug_set(debuglevel);
44169800Stomsoft	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
44269800Stomsoft	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
44369800Stomsoft
44469800Stomsoft	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
44569800Stomsoft	PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
44669800Stomsoft
44769800Stomsoft	/* Error in setting timeout is not critical, but why should it fail? */
44869800Stomsoft	if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1)
44969800Stomsoft		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
45069800Stomsoft	if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1)
45169800Stomsoft		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
45269800Stomsoft
45369800Stomsoft	init_local(res);
45469800Stomsoft	init_environment();
45569800Stomsoft
45669800Stomsoft	if (drop_privs(res) != 0)
45769800Stomsoft		exit(EX_CONFIG);
45869800Stomsoft	pjdlog_info("Privileges successfully dropped.");
45969800Stomsoft
460103949Smike	/*
46169800Stomsoft	 * Create the control thread before sending any event to the parent,
46269800Stomsoft	 * as we can deadlock when parent sends control request to worker,
46369800Stomsoft	 * but worker has no control thread started yet, so parent waits.
46469800Stomsoft	 * In the meantime worker sends an event to the parent, but parent
46569800Stomsoft	 * is unable to handle the event, because it waits for control
46669800Stomsoft	 * request response.
46769800Stomsoft	 */
46869800Stomsoft	error = pthread_create(&td, NULL, ctrl_thread, res);
46969800Stomsoft	PJDLOG_ASSERT(error == 0);
47069800Stomsoft
47169800Stomsoft	init_remote(res, nvin);
47269800Stomsoft	event_send(res, EVENT_CONNECT);
47369800Stomsoft
47469800Stomsoft	error = pthread_create(&td, NULL, recv_thread, res);
47569800Stomsoft	PJDLOG_ASSERT(error == 0);
47669800Stomsoft	error = pthread_create(&td, NULL, disk_thread, res);
47769800Stomsoft	PJDLOG_ASSERT(error == 0);
47869800Stomsoft	(void)send_thread(res);
47969800Stomsoft}
48069800Stomsoft
48169800Stomsoftstatic void
48269800Stomsoftreqlog(int loglevel, int debuglevel, int error, struct hio *hio,
48369800Stomsoft    const char *fmt, ...)
48469800Stomsoft{
48569800Stomsoft	char msg[1024];
48669800Stomsoft	va_list ap;
48769800Stomsoft	int len;
48869800Stomsoft
48969800Stomsoft	va_start(ap, fmt);
49069800Stomsoft	len = vsnprintf(msg, sizeof(msg), fmt, ap);
49169800Stomsoft	va_end(ap);
49269800Stomsoft	if ((size_t)len < sizeof(msg)) {
49369800Stomsoft		switch (hio->hio_cmd) {
49469800Stomsoft		case HIO_READ:
49569800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len,
49669800Stomsoft			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
49769800Stomsoft			    (uintmax_t)hio->hio_length);
49869800Stomsoft			break;
49969800Stomsoft		case HIO_DELETE:
50069800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len,
50169800Stomsoft			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
50269800Stomsoft			    (uintmax_t)hio->hio_length);
50369800Stomsoft			break;
504103949Smike		case HIO_FLUSH:
50569800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
50669800Stomsoft			break;
50769800Stomsoft		case HIO_WRITE:
50869800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len,
50969800Stomsoft			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
51069800Stomsoft			    (uintmax_t)hio->hio_length);
51169800Stomsoft			break;
51269800Stomsoft		case HIO_KEEPALIVE:
51369800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
51469800Stomsoft			break;
51569800Stomsoft		default:
51669800Stomsoft			(void)snprintf(msg + len, sizeof(msg) - len,
51769800Stomsoft			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
51869800Stomsoft			break;
51969800Stomsoft		}
52069800Stomsoft	}
52169800Stomsoft	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
52269800Stomsoft}
52369800Stomsoft
52469800Stomsoftstatic int
52569800Stomsoftrequnpack(struct hast_resource *res, struct hio *hio, struct nv *nv)
52669800Stomsoft{
52769800Stomsoft
52869800Stomsoft	hio->hio_cmd = nv_get_uint8(nv, "cmd");
52969800Stomsoft	if (hio->hio_cmd == 0) {
53069800Stomsoft		pjdlog_error("Header contains no 'cmd' field.");
53169800Stomsoft		hio->hio_error = EINVAL;
53269800Stomsoft		goto end;
53369800Stomsoft	}
53469800Stomsoft	if (hio->hio_cmd != HIO_KEEPALIVE) {
53569800Stomsoft		hio->hio_seq = nv_get_uint64(nv, "seq");
53669800Stomsoft		if (hio->hio_seq == 0) {
53777885Stomsoft			pjdlog_error("Header contains no 'seq' field.");
53869800Stomsoft			hio->hio_error = EINVAL;
53969800Stomsoft			goto end;
54069800Stomsoft		}
54169800Stomsoft	}
54269800Stomsoft	switch (hio->hio_cmd) {
54369800Stomsoft	case HIO_FLUSH:
54469800Stomsoft	case HIO_KEEPALIVE:
54569800Stomsoft		break;
54669800Stomsoft	case HIO_READ:
54777885Stomsoft	case HIO_WRITE:
54869800Stomsoft	case HIO_DELETE:
54977885Stomsoft		hio->hio_offset = nv_get_uint64(nv, "offset");
55069800Stomsoft		if (nv_error(nv) != 0) {
55169800Stomsoft			pjdlog_error("Header is missing 'offset' field.");
55269800Stomsoft			hio->hio_error = EINVAL;
55369800Stomsoft			goto end;
55469800Stomsoft		}
55569800Stomsoft		hio->hio_length = nv_get_uint64(nv, "length");
55669800Stomsoft		if (nv_error(nv) != 0) {
55769800Stomsoft			pjdlog_error("Header is missing 'length' field.");
55869800Stomsoft			hio->hio_error = EINVAL;
55969800Stomsoft			goto end;
56069800Stomsoft		}
56169800Stomsoft		if (hio->hio_length == 0) {
56269800Stomsoft			pjdlog_error("Data length is zero.");
56369800Stomsoft			hio->hio_error = EINVAL;
56469800Stomsoft			goto end;
56569800Stomsoft		}
56677885Stomsoft		if (hio->hio_length > MAXPHYS) {
56769800Stomsoft			pjdlog_error("Data length is too large (%ju > %ju).",
56869800Stomsoft			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
56969800Stomsoft			hio->hio_error = EINVAL;
57069800Stomsoft			goto end;
57169800Stomsoft		}
57269800Stomsoft		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
57369800Stomsoft			pjdlog_error("Offset %ju is not multiple of sector size.",
57469800Stomsoft			    (uintmax_t)hio->hio_offset);
57569800Stomsoft			hio->hio_error = EINVAL;
57669800Stomsoft			goto end;
57777885Stomsoft		}
57869800Stomsoft		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
57977885Stomsoft			pjdlog_error("Length %ju is not multiple of sector size.",
58069800Stomsoft			    (uintmax_t)hio->hio_length);
58169800Stomsoft			hio->hio_error = EINVAL;
58269800Stomsoft			goto end;
58369800Stomsoft		}
58469800Stomsoft		if (hio->hio_offset + hio->hio_length >
58569800Stomsoft		    (uint64_t)res->hr_datasize) {
58669800Stomsoft			pjdlog_error("Data offset is too large (%ju > %ju).",
58769800Stomsoft			    (uintmax_t)(hio->hio_offset + hio->hio_length),
58869800Stomsoft			    (uintmax_t)res->hr_datasize);
58969800Stomsoft			hio->hio_error = EINVAL;
59069800Stomsoft			goto end;
59169800Stomsoft		}
59269800Stomsoft		break;
59369800Stomsoft	default:
59469800Stomsoft		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
59569800Stomsoft		    hio->hio_cmd);
59669800Stomsoft		hio->hio_error = EINVAL;
59769800Stomsoft		goto end;
59869800Stomsoft	}
59969800Stomsoft	hio->hio_error = 0;
60069800Stomsoftend:
60169800Stomsoft	return (hio->hio_error);
60269800Stomsoft}
60369800Stomsoft
60469800Stomsoftstatic __dead2 void
60569800Stomsoftsecondary_exit(int exitcode, const char *fmt, ...)
60669800Stomsoft{
60769800Stomsoft	va_list ap;
60869800Stomsoft
60969800Stomsoft	PJDLOG_ASSERT(exitcode != EX_OK);
61069800Stomsoft	va_start(ap, fmt);
61169800Stomsoft	pjdlogv_errno(LOG_ERR, fmt, ap);
61269800Stomsoft	va_end(ap);
61369800Stomsoft	event_send(gres, EVENT_DISCONNECT);
61469800Stomsoft	exit(exitcode);
61569800Stomsoft}
61669800Stomsoft
61769800Stomsoft/*
61869800Stomsoft * Thread receives requests from the primary node.
61969800Stomsoft */
62069800Stomsoftstatic void *
62169800Stomsoftrecv_thread(void *arg)
62269800Stomsoft{
62369800Stomsoft	struct hast_resource *res = arg;
62469800Stomsoft	struct hio *hio;
62569800Stomsoft	struct nv *nv;
62669800Stomsoft
62769800Stomsoft	for (;;) {
62869800Stomsoft		pjdlog_debug(2, "recv: Taking free request.");
62969800Stomsoft		QUEUE_TAKE(free, hio);
63069800Stomsoft		pjdlog_debug(2, "recv: (%p) Got request.", hio);
63169800Stomsoft		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
63269800Stomsoft			secondary_exit(EX_TEMPFAIL,
63369800Stomsoft			    "Unable to receive request header");
63469800Stomsoft		}
63569800Stomsoft		if (requnpack(res, hio, nv) != 0) {
63669800Stomsoft			nv_free(nv);
63769800Stomsoft			pjdlog_debug(2,
63869800Stomsoft			    "recv: (%p) Moving request to the send queue.",
63969800Stomsoft			    hio);
64069800Stomsoft			QUEUE_INSERT(send, hio);
64169800Stomsoft			continue;
64269800Stomsoft		}
64369800Stomsoft		switch (hio->hio_cmd) {
64469800Stomsoft		case HIO_READ:
64569800Stomsoft			res->hr_stat_read++;
64669800Stomsoft			break;
64769800Stomsoft		case HIO_WRITE:
64869800Stomsoft			res->hr_stat_write++;
64969800Stomsoft			break;
65069800Stomsoft		case HIO_DELETE:
65169800Stomsoft			res->hr_stat_delete++;
65269800Stomsoft			break;
65369800Stomsoft		case HIO_FLUSH:
65469800Stomsoft			res->hr_stat_flush++;
65569800Stomsoft			break;
65669800Stomsoft		case HIO_KEEPALIVE:
65769800Stomsoft			break;
65869800Stomsoft		default:
65969800Stomsoft			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
66069800Stomsoft			    hio->hio_cmd);
66169800Stomsoft		}
66269800Stomsoft		reqlog(LOG_DEBUG, 2, -1, hio,
66369800Stomsoft		    "recv: (%p) Got request header: ", hio);
66469800Stomsoft		if (hio->hio_cmd == HIO_KEEPALIVE) {
66569800Stomsoft			nv_free(nv);
66669800Stomsoft			pjdlog_debug(2,
66769800Stomsoft			    "recv: (%p) Moving request to the free queue.",
66869800Stomsoft			    hio);
66969800Stomsoft			hio_clear(hio);
67069800Stomsoft			QUEUE_INSERT(free, hio);
67169800Stomsoft			continue;
67269800Stomsoft		} else if (hio->hio_cmd == HIO_WRITE) {
67369800Stomsoft			if (hast_proto_recv_data(res, res->hr_remotein, nv,
67469800Stomsoft			    hio->hio_data, MAXPHYS) == -1) {
67569800Stomsoft				secondary_exit(EX_TEMPFAIL,
67669800Stomsoft				    "Unable to receive request data");
67769800Stomsoft			}
67869800Stomsoft		}
67969800Stomsoft		nv_free(nv);
68069800Stomsoft		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
68169800Stomsoft		    hio);
68269800Stomsoft		QUEUE_INSERT(disk, hio);
68369800Stomsoft	}
68469800Stomsoft	/* NOTREACHED */
68569800Stomsoft	return (NULL);
68669800Stomsoft}
68769800Stomsoft
68869800Stomsoft/*
68969800Stomsoft * Thread reads from or writes to local component and also handles DELETE and
69069800Stomsoft * FLUSH requests.
69169800Stomsoft */
69269800Stomsoftstatic void *
69369800Stomsoftdisk_thread(void *arg)
69469800Stomsoft{
69569800Stomsoft	struct hast_resource *res = arg;
69669800Stomsoft	struct hio *hio;
69769800Stomsoft	ssize_t ret;
69869800Stomsoft	bool clear_activemap, logerror;
69969800Stomsoft
70069800Stomsoft	clear_activemap = true;
70169800Stomsoft
70269800Stomsoft	for (;;) {
70369800Stomsoft		pjdlog_debug(2, "disk: Taking request.");
704		QUEUE_TAKE(disk, hio);
705		while (clear_activemap) {
706			unsigned char *map;
707			size_t mapsize;
708
709			/*
710			 * When first request is received, it means that primary
711			 * already received our activemap, merged it and stored
712			 * locally. We can now safely clear our activemap.
713			 */
714			mapsize =
715			    activemap_calc_ondisk_size(res->hr_local_mediasize -
716			    METADATA_SIZE, res->hr_extentsize,
717			    res->hr_local_sectorsize);
718			map = calloc(1, mapsize);
719			if (map == NULL) {
720				pjdlog_warning("Unable to allocate memory to clear local activemap.");
721				break;
722			}
723			if (pwrite(res->hr_localfd, map, mapsize,
724			    METADATA_SIZE) != (ssize_t)mapsize) {
725				pjdlog_errno(LOG_WARNING,
726				    "Unable to store cleared activemap");
727				free(map);
728				res->hr_stat_activemap_write_error++;
729				break;
730			}
731			free(map);
732			clear_activemap = false;
733			pjdlog_debug(1, "Local activemap cleared.");
734			break;
735		}
736		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
737		logerror = true;
738		/* Handle the actual request. */
739		switch (hio->hio_cmd) {
740		case HIO_READ:
741			ret = pread(res->hr_localfd, hio->hio_data,
742			    hio->hio_length,
743			    hio->hio_offset + res->hr_localoff);
744			if (ret == -1)
745				hio->hio_error = errno;
746			else if (ret != (int64_t)hio->hio_length)
747				hio->hio_error = EIO;
748			else
749				hio->hio_error = 0;
750			break;
751		case HIO_WRITE:
752			ret = pwrite(res->hr_localfd, hio->hio_data,
753			    hio->hio_length,
754			    hio->hio_offset + res->hr_localoff);
755			if (ret == -1)
756				hio->hio_error = errno;
757			else if (ret != (int64_t)hio->hio_length)
758				hio->hio_error = EIO;
759			else
760				hio->hio_error = 0;
761			break;
762		case HIO_DELETE:
763			ret = g_delete(res->hr_localfd,
764			    hio->hio_offset + res->hr_localoff,
765			    hio->hio_length);
766			if (ret == -1)
767				hio->hio_error = errno;
768			else
769				hio->hio_error = 0;
770			break;
771		case HIO_FLUSH:
772			if (!res->hr_localflush) {
773				ret = -1;
774				hio->hio_error = EOPNOTSUPP;
775				logerror = false;
776				break;
777			}
778			ret = g_flush(res->hr_localfd);
779			if (ret == -1) {
780				if (errno == EOPNOTSUPP)
781					res->hr_localflush = false;
782				hio->hio_error = errno;
783			} else {
784				hio->hio_error = 0;
785			}
786			break;
787		default:
788			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
789			    hio->hio_cmd);
790		}
791		if (logerror && hio->hio_error != 0) {
792			reqlog(LOG_ERR, 0, hio->hio_error, hio,
793			    "Request failed: ");
794		}
795		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
796		    hio);
797		QUEUE_INSERT(send, hio);
798	}
799	/* NOTREACHED */
800	return (NULL);
801}
802
803/*
804 * Thread sends requests back to primary node.
805 */
806static void *
807send_thread(void *arg)
808{
809	struct hast_resource *res = arg;
810	struct nv *nvout;
811	struct hio *hio;
812	void *data;
813	size_t length;
814
815	for (;;) {
816		pjdlog_debug(2, "send: Taking request.");
817		QUEUE_TAKE(send, hio);
818		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
819		nvout = nv_alloc();
820		/* Copy sequence number. */
821		nv_add_uint64(nvout, hio->hio_seq, "seq");
822		switch (hio->hio_cmd) {
823		case HIO_READ:
824			if (hio->hio_error == 0) {
825				data = hio->hio_data;
826				length = hio->hio_length;
827				break;
828			}
829			/*
830			 * We send no data in case of an error.
831			 */
832			/* FALLTHROUGH */
833		case HIO_DELETE:
834		case HIO_FLUSH:
835		case HIO_WRITE:
836			data = NULL;
837			length = 0;
838			break;
839		default:
840			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
841			    hio->hio_cmd);
842		}
843		if (hio->hio_error != 0) {
844			switch (hio->hio_cmd) {
845			case HIO_READ:
846				res->hr_stat_read_error++;
847				break;
848			case HIO_WRITE:
849				res->hr_stat_write_error++;
850				break;
851			case HIO_DELETE:
852				res->hr_stat_delete_error++;
853				break;
854			case HIO_FLUSH:
855				res->hr_stat_flush_error++;
856				break;
857			}
858			nv_add_int16(nvout, hio->hio_error, "error");
859		}
860		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
861		    length) == -1) {
862			secondary_exit(EX_TEMPFAIL, "Unable to send reply");
863		}
864		nv_free(nvout);
865		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
866		    hio);
867		hio_clear(hio);
868		QUEUE_INSERT(free, hio);
869	}
870	/* NOTREACHED */
871	return (NULL);
872}
873