secondary.c revision 223181
198937Sdes/*-
298937Sdes * Copyright (c) 2009-2010 The FreeBSD Foundation
398937Sdes * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
498937Sdes * All rights reserved.
598937Sdes *
698937Sdes * This software was developed by Pawel Jakub Dawidek under sponsorship from
798937Sdes * the FreeBSD Foundation.
898937Sdes *
998937Sdes * Redistribution and use in source and binary forms, with or without
1098937Sdes * modification, are permitted provided that the following conditions
1198937Sdes * are met:
1298937Sdes * 1. Redistributions of source code must retain the above copyright
1398937Sdes *    notice, this list of conditions and the following disclaimer.
1498937Sdes * 2. Redistributions in binary form must reproduce the above copyright
1598937Sdes *    notice, this list of conditions and the following disclaimer in the
1698937Sdes *    documentation and/or other materials provided with the distribution.
1798937Sdes *
1898937Sdes * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
1998937Sdes * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
2098937Sdes * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
2198937Sdes * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
2298937Sdes * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
2398937Sdes * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2498937Sdes * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2598937Sdes * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2698937Sdes * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2798937Sdes * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2898937Sdes * SUCH DAMAGE.
2998937Sdes */
3098937Sdes
3198937Sdes#include <sys/cdefs.h>
3298937Sdes__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 223181 2011-06-17 07:07:26Z trociny $");
3398937Sdes
3498937Sdes#include <sys/param.h>
3598937Sdes#include <sys/time.h>
3698937Sdes#include <sys/bio.h>
3798937Sdes#include <sys/disk.h>
3898937Sdes#include <sys/stat.h>
3998937Sdes
4098937Sdes#include <err.h>
4198937Sdes#include <errno.h>
4298937Sdes#include <fcntl.h>
4398937Sdes#include <libgeom.h>
4498937Sdes#include <pthread.h>
4598937Sdes#include <signal.h>
4698937Sdes#include <stdint.h>
4798937Sdes#include <stdio.h>
4898937Sdes#include <string.h>
4998937Sdes#include <sysexits.h>
5098937Sdes#include <unistd.h>
5198937Sdes
5298937Sdes#include <activemap.h>
5398937Sdes#include <nv.h>
5498937Sdes#include <pjdlog.h>
5598937Sdes
5698937Sdes#include "control.h"
5798937Sdes#include "event.h"
5898937Sdes#include "hast.h"
5998937Sdes#include "hast_proto.h"
6098937Sdes#include "hastd.h"
6198937Sdes#include "hooks.h"
6298937Sdes#include "metadata.h"
6398937Sdes#include "proto.h"
6498937Sdes#include "subr.h"
6598937Sdes#include "synch.h"
6698937Sdes
/*
 * One request received from the primary node.  A fixed pool of these is
 * allocated at startup and recycled through the free, disk and send queues.
 */
struct hio {
	uint64_t	 hio_seq;	/* Sequence number field (the reply copies
					   "seq" from hio_nv instead). */
	int		 hio_error;	/* 0, or an errno value describing why the
					   request was rejected or failed. */
	struct nv	*hio_nv;	/* Request header received from the primary. */
	void		*hio_data;	/* MAXPHYS-byte buffer for READ/WRITE data. */
	uint8_t		 hio_cmd;	/* HIO_* command decoded from the header. */
	uint64_t	 hio_offset;	/* Byte offset within the data area. */
	uint64_t	 hio_length;	/* Byte count of the request. */
	TAILQ_ENTRY(hio) hio_next;	/* Linkage for whichever queue holds it. */
};
7798937Sdes
/* Resource served by this worker process; assigned in hastd_secondary(). */
static struct hast_resource *gres;

/*
 * Request queues.  A struct hio is linked on at most one of these at a time;
 * otherwise it is held by the thread currently working on it.
 *
 *   free - unused requests; recv_thread() sleeps here when the pool of
 *          HAST_HIO_MAX requests is exhausted;
 *   disk - validated requests waiting for disk_thread() to do local I/O;
 *   send - finished or rejected requests waiting for send_thread() to reply
 *          to the primary node.
 *
 * Each queue has its own mutex and condition variable, named so that
 * QUEUE_INSERT() and QUEUE_TAKE() can find them from the queue name.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;

static TAILQ_HEAD(, hio) hio_disk_list;
static pthread_mutex_t hio_disk_list_lock;
static pthread_cond_t hio_disk_list_cond;

static TAILQ_HEAD(, hio) hio_send_list;
static pthread_mutex_t hio_send_list_lock;
static pthread_cond_t hio_send_list_cond;

/* Size of the preallocated request pool, i.e. the outstanding-request cap. */
#define	HAST_HIO_MAX	256

static void *recv_thread(void *arg);
static void *disk_thread(void *arg);
static void *send_thread(void *arg);
10998937Sdes
/*
 * QUEUE_INSERT(name, hio): append 'hio' to hio_<name>_list under its mutex.
 * A consumer can only be blocked in QUEUE_TAKE() while the list is empty, so
 * the condition variable is signalled (after dropping the mutex) only when
 * the list was empty before this insertion.  In this file every list has a
 * single consumer thread, which is why a signal rather than a broadcast is
 * used.
 *
 * QUEUE_TAKE(name, hio): remove the head of hio_<name>_list into 'hio',
 * sleeping on the list's condition variable for as long as it is empty.
 */
#define	QUEUE_INSERT(name, hio)	do {					\
	bool _was_empty;						\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_was_empty = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_was_empty)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
#define	QUEUE_TAKE(name, hio)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	for (;;) {							\
		(hio) = TAILQ_FIRST(&hio_##name##_list);		\
		if ((hio) != NULL)					\
			break;						\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next);		\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)
12998937Sdes
13098937Sdesstatic void
13198937Sdesinit_environment(void)
13298937Sdes{
13398937Sdes	struct hio *hio;
13498937Sdes	unsigned int ii;
13598937Sdes
13698937Sdes	/*
13798937Sdes	 * Initialize lists, their locks and theirs condition variables.
13898937Sdes	 */
13998937Sdes	TAILQ_INIT(&hio_free_list);
14098937Sdes	mtx_init(&hio_free_list_lock);
14198937Sdes	cv_init(&hio_free_list_cond);
14298937Sdes	TAILQ_INIT(&hio_disk_list);
14398937Sdes	mtx_init(&hio_disk_list_lock);
14498937Sdes	cv_init(&hio_disk_list_cond);
14598937Sdes	TAILQ_INIT(&hio_send_list);
14698937Sdes	mtx_init(&hio_send_list_lock);
14798937Sdes	cv_init(&hio_send_list_cond);
14898937Sdes
14998937Sdes	/*
15098937Sdes	 * Allocate requests pool and initialize requests.
15198937Sdes	 */
15298937Sdes	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
15398937Sdes		hio = malloc(sizeof(*hio));
15498937Sdes		if (hio == NULL) {
15598937Sdes			pjdlog_exitx(EX_TEMPFAIL,
15698937Sdes			    "Unable to allocate memory (%zu bytes) for hio request.",
15798937Sdes			    sizeof(*hio));
15898937Sdes		}
15998937Sdes		hio->hio_error = 0;
16098937Sdes		hio->hio_data = malloc(MAXPHYS);
161124211Sdes		if (hio->hio_data == NULL) {
16299768Sdes			pjdlog_exitx(EX_TEMPFAIL,
16398937Sdes			    "Unable to allocate memory (%zu bytes) for gctl_data.",
16498937Sdes			    (size_t)MAXPHYS);
16598937Sdes		}
16698937Sdes		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next);
16798937Sdes	}
16898937Sdes}
16998937Sdes
/*
 * Load this node's metadata for 'res' via metadata_read(); exit the worker
 * with EX_NOINPUT if that reports failure (a negative return value).
 */
static void
init_local(struct hast_resource *res)
{

	if (metadata_read(res, true) >= 0)
		return;
	exit(EX_NOINPUT);
}
17798937Sdes
17898937Sdesstatic void
17998937Sdesinit_remote(struct hast_resource *res, struct nv *nvin)
18098937Sdes{
18198937Sdes	uint64_t resuid;
18298937Sdes	struct nv *nvout;
18398937Sdes	unsigned char *map;
18498937Sdes	size_t mapsize;
18598937Sdes
18698937Sdes#ifdef notyet
18798937Sdes	/* Setup direction. */
18898937Sdes	if (proto_send(res->hr_remoteout, NULL, 0) == -1)
18998937Sdes		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
19098937Sdes#endif
19198937Sdes
19298937Sdes	map = NULL;
19398937Sdes	mapsize = 0;
19498937Sdes	nvout = nv_alloc();
19598937Sdes	nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize");
19698937Sdes	nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize");
19798937Sdes	resuid = nv_get_uint64(nvin, "resuid");
19898937Sdes	res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt");
19998937Sdes	res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt");
20098937Sdes	nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt");
20198937Sdes	nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt");
20298937Sdes	mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
20398937Sdes	    METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize);
20498937Sdes	map = malloc(mapsize);
20598937Sdes	if (map == NULL) {
20698937Sdes		pjdlog_exitx(EX_TEMPFAIL,
20798937Sdes		    "Unable to allocate memory (%zu bytes) for activemap.",
20898937Sdes		    mapsize);
20998937Sdes	}
21098937Sdes	/*
21198937Sdes	 * When we work as primary and secondary is missing we will increase
21298937Sdes	 * localcnt in our metadata. When secondary is connected and synced
21398937Sdes	 * we make localcnt be equal to remotecnt, which means nodes are more
21498937Sdes	 * or less in sync.
21598937Sdes	 * Split-brain condition is when both nodes are not able to communicate
21698937Sdes	 * and are both configured as primary nodes. In turn, they can both
21798937Sdes	 * make incompatible changes to the data and we have to detect that.
21898937Sdes	 * Under split-brain condition we will increase our localcnt on first
21998937Sdes	 * write and remote node will increase its localcnt on first write.
22098937Sdes	 * When we connect we can see that primary's localcnt is greater than
22198937Sdes	 * our remotecnt (primary was modified while we weren't watching) and
22298937Sdes	 * our localcnt is greater than primary's remotecnt (we were modified
22398937Sdes	 * while primary wasn't watching).
22498937Sdes	 * There are many possible combinations which are all gathered below.
22598937Sdes	 * Don't pay too much attention to exact numbers, the more important
22698937Sdes	 * is to compare them. We compare secondary's local with primary's
22798937Sdes	 * remote and secondary's remote with primary's local.
22898937Sdes	 * Note that every case where primary's localcnt is smaller than
22998937Sdes	 * secondary's remotecnt and where secondary's localcnt is smaller than
23098937Sdes	 * primary's remotecnt should be impossible in practise. We will perform
23198937Sdes	 * full synchronization then. Those cases are marked with an asterisk.
23298937Sdes	 * Regular synchronization means that only extents marked as dirty are
23398937Sdes	 * synchronized (regular synchronization).
23498937Sdes	 *
23598937Sdes	 * SECONDARY METADATA PRIMARY METADATA
23698937Sdes	 * local=3 remote=3   local=2 remote=2*  ?! Full sync from secondary.
23798937Sdes	 * local=3 remote=3   local=2 remote=3*  ?! Full sync from primary.
23898937Sdes	 * local=3 remote=3   local=2 remote=4*  ?! Full sync from primary.
23998937Sdes	 * local=3 remote=3   local=3 remote=2   Primary is out-of-date,
24098937Sdes	 *                                       regular sync from secondary.
24198937Sdes	 * local=3 remote=3   local=3 remote=3   Regular sync just in case.
24298937Sdes	 * local=3 remote=3   local=3 remote=4*  ?! Full sync from primary.
24398937Sdes	 * local=3 remote=3   local=4 remote=2   Split-brain condition.
24498937Sdes	 * local=3 remote=3   local=4 remote=3   Secondary out-of-date,
24598937Sdes	 *                                       regular sync from primary.
24698937Sdes	 * local=3 remote=3   local=4 remote=4*  ?! Full sync from primary.
24798937Sdes	 */
24898937Sdes	if (res->hr_resuid == 0) {
24998937Sdes		/*
25098937Sdes		 * Provider is used for the first time. If primary node done no
25198937Sdes		 * writes yet as well (we will find "virgin" argument) then
25298937Sdes		 * there is no need to synchronize anything. If primary node
25398937Sdes		 * done any writes already we have to synchronize everything.
25498937Sdes		 */
25598937Sdes		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);
25698937Sdes		res->hr_resuid = resuid;
25798937Sdes		if (metadata_write(res) < 0)
25898937Sdes			exit(EX_NOINPUT);
25998937Sdes		if (nv_exists(nvin, "virgin")) {
26098937Sdes			free(map);
26198937Sdes			map = NULL;
26298937Sdes			mapsize = 0;
26398937Sdes		} else {
26498937Sdes			memset(map, 0xff, mapsize);
26598937Sdes		}
26698937Sdes		nv_add_int8(nvout, 1, "virgin");
26798937Sdes		nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
26898937Sdes	} else if (res->hr_resuid != resuid) {
26998937Sdes		char errmsg[256];
27098937Sdes
27198937Sdes		(void)snprintf(errmsg, sizeof(errmsg),
27298937Sdes		    "Resource unique ID mismatch (primary=%ju, secondary=%ju).",
27398937Sdes		    (uintmax_t)resuid, (uintmax_t)res->hr_resuid);
27498937Sdes		pjdlog_error("%s", errmsg);
27598937Sdes		nv_add_string(nvout, errmsg, "errmsg");
27698937Sdes		if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) {
27798937Sdes			pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s",
27898937Sdes			    res->hr_remoteaddr);
27998937Sdes		}
28098937Sdes		nv_free(nvout);
28198937Sdes		exit(EX_CONFIG);
28298937Sdes	} else if (
28398937Sdes	    /* Is primary is out-of-date? */
28498937Sdes	    (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
28598937Sdes	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
28698937Sdes	    /* Nodes are more or less in sync? */
28798937Sdes	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
28898937Sdes	     res->hr_secondary_remotecnt == res->hr_primary_localcnt) ||
28998937Sdes	    /* Is secondary is out-of-date? */
29098937Sdes	    (res->hr_secondary_localcnt == res->hr_primary_remotecnt &&
29198937Sdes	     res->hr_secondary_remotecnt < res->hr_primary_localcnt)) {
29298937Sdes		/*
29398937Sdes		 * Nodes are more or less in sync or one of the nodes is
29498937Sdes		 * out-of-date.
29598937Sdes		 * It doesn't matter at this point which one, we just have to
29698937Sdes		 * send out local bitmap to the remote node.
29798937Sdes		 */
29898937Sdes		if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) !=
29998937Sdes		    (ssize_t)mapsize) {
30098937Sdes			pjdlog_exit(LOG_ERR, "Unable to read activemap");
30198937Sdes		}
30298937Sdes		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
30398937Sdes		     res->hr_secondary_remotecnt == res->hr_primary_localcnt) {
30498937Sdes			/* Primary is out-of-date, sync from secondary. */
30598937Sdes			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
30698937Sdes		} else {
30798937Sdes			/*
30898937Sdes			 * Secondary is out-of-date or counts match.
30998937Sdes			 * Sync from primary.
31098937Sdes			 */
31198937Sdes			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
31298937Sdes		}
31398937Sdes	} else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
31498937Sdes	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
31598937Sdes		/*
31698937Sdes		 * Not good, we have split-brain condition.
31798937Sdes		 */
31898937Sdes		pjdlog_error("Split-brain detected, exiting.");
31998937Sdes		nv_add_string(nvout, "Split-brain condition!", "errmsg");
32098937Sdes		free(map);
32198937Sdes		map = NULL;
32298937Sdes		mapsize = 0;
32398937Sdes	} else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
32498937Sdes	    res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ {
32598937Sdes		/*
32698937Sdes		 * This should never happen in practise, but we will perform
32798937Sdes		 * full synchronization.
32898937Sdes		 */
32998937Sdes		PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt ||
33098937Sdes		    res->hr_primary_localcnt < res->hr_secondary_remotecnt);
33198937Sdes		mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize -
33298937Sdes		    METADATA_SIZE, res->hr_extentsize,
33398937Sdes		    res->hr_local_sectorsize);
33498937Sdes		memset(map, 0xff, mapsize);
33598937Sdes		if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) {
33698937Sdes			/* In this one of five cases sync from secondary. */
33798937Sdes			nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc");
33898937Sdes		} else {
33998937Sdes			/* For the rest four cases sync from primary. */
34098937Sdes			nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc");
34198937Sdes		}
34298937Sdes		pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).",
34398937Sdes		    (uintmax_t)res->hr_primary_localcnt,
34498937Sdes		    (uintmax_t)res->hr_primary_remotecnt,
34598937Sdes		    (uintmax_t)res->hr_secondary_localcnt,
34698937Sdes		    (uintmax_t)res->hr_secondary_remotecnt);
34798937Sdes	}
34898937Sdes	nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize");
34998937Sdes	if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) {
35098937Sdes		pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s",
35198937Sdes		    res->hr_remoteaddr);
35298937Sdes	}
35398937Sdes	if (map != NULL)
35498937Sdes		free(map);
35598937Sdes	nv_free(nvout);
35698937Sdes#ifdef notyet
35798937Sdes	/* Setup direction. */
35898937Sdes	if (proto_recv(res->hr_remotein, NULL, 0) == -1)
35998937Sdes		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
36098937Sdes#endif
36198937Sdes	if (res->hr_secondary_localcnt > res->hr_primary_remotecnt &&
36298937Sdes	     res->hr_primary_localcnt > res->hr_secondary_remotecnt) {
36398937Sdes		/* Exit on split-brain. */
36498937Sdes		event_send(res, EVENT_SPLITBRAIN);
36598937Sdes		exit(EX_CONFIG);
36698937Sdes	}
36798937Sdes}
36898937Sdes
36998937Sdesvoid
37098937Sdeshastd_secondary(struct hast_resource *res, struct nv *nvin)
37198937Sdes{
37298937Sdes	sigset_t mask;
37398937Sdes	pthread_t td;
37498937Sdes	pid_t pid;
37598937Sdes	int error, mode, debuglevel;
37698937Sdes
37798937Sdes	/*
37898937Sdes	 * Create communication channel between parent and child.
37998937Sdes	 */
38098937Sdes	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
38198937Sdes		KEEP_ERRNO((void)pidfile_remove(pfh));
38298937Sdes		pjdlog_exit(EX_OSERR,
38398937Sdes		    "Unable to create control sockets between parent and child");
38498937Sdes	}
38598937Sdes	/*
38698937Sdes	 * Create communication channel between child and parent.
38798937Sdes	 */
38898937Sdes	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
38998937Sdes		KEEP_ERRNO((void)pidfile_remove(pfh));
39098937Sdes		pjdlog_exit(EX_OSERR,
39198937Sdes		    "Unable to create event sockets between child and parent");
39298937Sdes	}
39398937Sdes
39498937Sdes	pid = fork();
39598937Sdes	if (pid < 0) {
39698937Sdes		KEEP_ERRNO((void)pidfile_remove(pfh));
39798937Sdes		pjdlog_exit(EX_OSERR, "Unable to fork");
39898937Sdes	}
39998937Sdes
40098937Sdes	if (pid > 0) {
40198937Sdes		/* This is parent. */
40298937Sdes		proto_close(res->hr_remotein);
40398937Sdes		res->hr_remotein = NULL;
40498937Sdes		proto_close(res->hr_remoteout);
40598937Sdes		res->hr_remoteout = NULL;
40698937Sdes		/* Declare that we are receiver. */
40798937Sdes		proto_recv(res->hr_event, NULL, 0);
40898937Sdes		/* Declare that we are sender. */
40998937Sdes		proto_send(res->hr_ctrl, NULL, 0);
41098937Sdes		res->hr_workerpid = pid;
41198937Sdes		return;
412124211Sdes	}
41398937Sdes
41498937Sdes	gres = res;
41598937Sdes	mode = pjdlog_mode_get();
41698937Sdes	debuglevel = pjdlog_debug_get();
41798937Sdes
41898937Sdes	/* Declare that we are sender. */
41998937Sdes	proto_send(res->hr_event, NULL, 0);
42098937Sdes	/* Declare that we are receiver. */
42198937Sdes	proto_recv(res->hr_ctrl, NULL, 0);
42298937Sdes	descriptors_cleanup(res);
42398937Sdes
42498937Sdes	descriptors_assert(res, mode);
42598937Sdes
42698937Sdes	pjdlog_init(mode);
42798937Sdes	pjdlog_debug_set(debuglevel);
42898937Sdes	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
42998937Sdes	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));
43098937Sdes
43198937Sdes	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
43298937Sdes	PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
43398937Sdes
43498937Sdes	/* Error in setting timeout is not critical, but why should it fail? */
43598937Sdes	if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) < 0)
43698937Sdes		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
43798937Sdes	if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0)
43898937Sdes		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
43998937Sdes
44098937Sdes	init_local(res);
44198937Sdes	init_environment();
44298937Sdes
44398937Sdes	if (drop_privs(res) != 0)
44498937Sdes		exit(EX_CONFIG);
44598937Sdes	pjdlog_info("Privileges successfully dropped.");
44698937Sdes
44798937Sdes	/*
44898937Sdes	 * Create the control thread before sending any event to the parent,
44998937Sdes	 * as we can deadlock when parent sends control request to worker,
45098937Sdes	 * but worker has no control thread started yet, so parent waits.
45198937Sdes	 * In the meantime worker sends an event to the parent, but parent
45298937Sdes	 * is unable to handle the event, because it waits for control
45398937Sdes	 * request response.
45498937Sdes	 */
45598937Sdes	error = pthread_create(&td, NULL, ctrl_thread, res);
45698937Sdes	PJDLOG_ASSERT(error == 0);
45798937Sdes
45898937Sdes	init_remote(res, nvin);
45998937Sdes	event_send(res, EVENT_CONNECT);
46098937Sdes
46198937Sdes	error = pthread_create(&td, NULL, recv_thread, res);
46298937Sdes	PJDLOG_ASSERT(error == 0);
46398937Sdes	error = pthread_create(&td, NULL, disk_thread, res);
46498937Sdes	PJDLOG_ASSERT(error == 0);
46598937Sdes	(void)send_thread(res);
46698937Sdes}
46798937Sdes
46898937Sdesstatic void
46998937Sdesreqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...)
47098937Sdes{
47198937Sdes	char msg[1024];
47298937Sdes	va_list ap;
47398937Sdes	int len;
47498937Sdes
47598937Sdes	va_start(ap, fmt);
47698937Sdes	len = vsnprintf(msg, sizeof(msg), fmt, ap);
47798937Sdes	va_end(ap);
47898937Sdes	if ((size_t)len < sizeof(msg)) {
47998937Sdes		switch (hio->hio_cmd) {
48098937Sdes		case HIO_READ:
48198937Sdes			(void)snprintf(msg + len, sizeof(msg) - len,
48298937Sdes			    "READ(%ju, %ju).", (uintmax_t)hio->hio_offset,
48398937Sdes			    (uintmax_t)hio->hio_length);
48498937Sdes			break;
48598937Sdes		case HIO_DELETE:
48698937Sdes			(void)snprintf(msg + len, sizeof(msg) - len,
48798937Sdes			    "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset,
48898937Sdes			    (uintmax_t)hio->hio_length);
48998937Sdes			break;
49098937Sdes		case HIO_FLUSH:
49198937Sdes			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
49298937Sdes			break;
49398937Sdes		case HIO_WRITE:
49498937Sdes			(void)snprintf(msg + len, sizeof(msg) - len,
49598937Sdes			    "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset,
49698937Sdes			    (uintmax_t)hio->hio_length);
49798937Sdes			break;
49898937Sdes		case HIO_KEEPALIVE:
49998937Sdes			(void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE.");
50098937Sdes			break;
50198937Sdes		default:
50298937Sdes			(void)snprintf(msg + len, sizeof(msg) - len,
50398937Sdes			    "UNKNOWN(%u).", (unsigned int)hio->hio_cmd);
50498937Sdes			break;
50598937Sdes		}
50698937Sdes	}
50798937Sdes	pjdlog_common(loglevel, debuglevel, error, "%s", msg);
50898937Sdes}
50998937Sdes
51098937Sdesstatic int
51198937Sdesrequnpack(struct hast_resource *res, struct hio *hio)
51298937Sdes{
51398937Sdes
51498937Sdes	hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd");
51598937Sdes	if (hio->hio_cmd == 0) {
51698937Sdes		pjdlog_error("Header contains no 'cmd' field.");
51798937Sdes		hio->hio_error = EINVAL;
51898937Sdes		goto end;
51998937Sdes	}
52098937Sdes	switch (hio->hio_cmd) {
52198937Sdes	case HIO_FLUSH:
52298937Sdes	case HIO_KEEPALIVE:
52398937Sdes		break;
52498937Sdes	case HIO_READ:
52598937Sdes	case HIO_WRITE:
52698937Sdes	case HIO_DELETE:
52798937Sdes		hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset");
52898937Sdes		if (nv_error(hio->hio_nv) != 0) {
52998937Sdes			pjdlog_error("Header is missing 'offset' field.");
53098937Sdes			hio->hio_error = EINVAL;
53198937Sdes			goto end;
53298937Sdes		}
53398937Sdes		hio->hio_length = nv_get_uint64(hio->hio_nv, "length");
53498937Sdes		if (nv_error(hio->hio_nv) != 0) {
53598937Sdes			pjdlog_error("Header is missing 'length' field.");
53698937Sdes			hio->hio_error = EINVAL;
53798937Sdes			goto end;
53898937Sdes		}
53998937Sdes		if (hio->hio_length == 0) {
54098937Sdes			pjdlog_error("Data length is zero.");
54198937Sdes			hio->hio_error = EINVAL;
54298937Sdes			goto end;
54398937Sdes		}
54498937Sdes		if (hio->hio_length > MAXPHYS) {
54598937Sdes			pjdlog_error("Data length is too large (%ju > %ju).",
54698937Sdes			    (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS);
54798937Sdes			hio->hio_error = EINVAL;
54898937Sdes			goto end;
54998937Sdes		}
55098937Sdes		if ((hio->hio_offset % res->hr_local_sectorsize) != 0) {
55198937Sdes			pjdlog_error("Offset %ju is not multiple of sector size.",
55298937Sdes			    (uintmax_t)hio->hio_offset);
55398937Sdes			hio->hio_error = EINVAL;
55498937Sdes			goto end;
55598937Sdes		}
55698937Sdes		if ((hio->hio_length % res->hr_local_sectorsize) != 0) {
55798937Sdes			pjdlog_error("Length %ju is not multiple of sector size.",
55898937Sdes			    (uintmax_t)hio->hio_length);
55998937Sdes			hio->hio_error = EINVAL;
56098937Sdes			goto end;
56198937Sdes		}
56298937Sdes		if (hio->hio_offset + hio->hio_length >
56398937Sdes		    (uint64_t)res->hr_datasize) {
56498937Sdes			pjdlog_error("Data offset is too large (%ju > %ju).",
56598937Sdes			    (uintmax_t)(hio->hio_offset + hio->hio_length),
56698937Sdes			    (uintmax_t)res->hr_datasize);
56798937Sdes			hio->hio_error = EINVAL;
56898937Sdes			goto end;
56998937Sdes		}
57098937Sdes		break;
57198937Sdes	default:
57298937Sdes		pjdlog_error("Header contains invalid 'cmd' (%hhu).",
57398937Sdes		    hio->hio_cmd);
57498937Sdes		hio->hio_error = EINVAL;
57598937Sdes		goto end;
57698937Sdes	}
57798937Sdes	hio->hio_error = 0;
57898937Sdesend:
57998937Sdes	return (hio->hio_error);
58098937Sdes}
58198937Sdes
58298937Sdesstatic __dead2 void
58398937Sdessecondary_exit(int exitcode, const char *fmt, ...)
58498937Sdes{
58598937Sdes	va_list ap;
58698937Sdes
58798937Sdes	PJDLOG_ASSERT(exitcode != EX_OK);
58898937Sdes	va_start(ap, fmt);
58998937Sdes	pjdlogv_errno(LOG_ERR, fmt, ap);
59098937Sdes	va_end(ap);
59198937Sdes	event_send(gres, EVENT_DISCONNECT);
59298937Sdes	exit(exitcode);
59398937Sdes}
59498937Sdes
59598937Sdes/*
59698937Sdes * Thread receives requests from the primary node.
59798937Sdes */
59898937Sdesstatic void *
59998937Sdesrecv_thread(void *arg)
60098937Sdes{
60198937Sdes	struct hast_resource *res = arg;
60298937Sdes	struct hio *hio;
60398937Sdes
60498937Sdes	for (;;) {
60598937Sdes		pjdlog_debug(2, "recv: Taking free request.");
60698937Sdes		QUEUE_TAKE(free, hio);
60798937Sdes		pjdlog_debug(2, "recv: (%p) Got request.", hio);
608113911Sdes		if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) {
609113911Sdes			secondary_exit(EX_TEMPFAIL,
610113911Sdes			    "Unable to receive request header");
61198937Sdes		}
61298937Sdes		if (requnpack(res, hio) != 0) {
61398937Sdes			pjdlog_debug(2,
61498937Sdes			    "recv: (%p) Moving request to the send queue.",
61598937Sdes			    hio);
61698937Sdes			QUEUE_INSERT(send, hio);
61798937Sdes			continue;
61898937Sdes		}
61998937Sdes		switch (hio->hio_cmd) {
62098937Sdes		case HIO_READ:
62198937Sdes			res->hr_stat_read++;
62298937Sdes			break;
62398937Sdes		case HIO_WRITE:
624106130Sdes			res->hr_stat_write++;
62598937Sdes			break;
62698937Sdes		case HIO_DELETE:
62798937Sdes			res->hr_stat_delete++;
62898937Sdes			break;
62998937Sdes		case HIO_FLUSH:
630106130Sdes			res->hr_stat_flush++;
63198937Sdes			break;
63298937Sdes		}
63398937Sdes		reqlog(LOG_DEBUG, 2, -1, hio,
63498937Sdes		    "recv: (%p) Got request header: ", hio);
63598937Sdes		if (hio->hio_cmd == HIO_KEEPALIVE) {
63698937Sdes			pjdlog_debug(2,
63798937Sdes			    "recv: (%p) Moving request to the free queue.",
63898937Sdes			    hio);
63998937Sdes			nv_free(hio->hio_nv);
64098937Sdes			QUEUE_INSERT(free, hio);
64198937Sdes			continue;
64298937Sdes		} else if (hio->hio_cmd == HIO_WRITE) {
64398937Sdes			if (hast_proto_recv_data(res, res->hr_remotein,
64498937Sdes			    hio->hio_nv, hio->hio_data, MAXPHYS) < 0) {
64598937Sdes				secondary_exit(EX_TEMPFAIL,
64698937Sdes				    "Unable to receive request data");
64798937Sdes			}
64898937Sdes		}
64998937Sdes		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
65098937Sdes		    hio);
65198937Sdes		QUEUE_INSERT(disk, hio);
65298937Sdes	}
65398937Sdes	/* NOTREACHED */
65498937Sdes	return (NULL);
65598937Sdes}
65699768Sdes
65799768Sdes/*
65898937Sdes * Thread reads from or writes to local component and also handles DELETE and
65998937Sdes * FLUSH requests.
66098937Sdes */
66198937Sdesstatic void *
66298937Sdesdisk_thread(void *arg)
66398937Sdes{
664113911Sdes	struct hast_resource *res = arg;
665113911Sdes	struct hio *hio;
666113911Sdes	ssize_t ret;
667113911Sdes	bool clear_activemap;
668113911Sdes
669113911Sdes	clear_activemap = true;
670113911Sdes
671113911Sdes	for (;;) {
672113911Sdes		pjdlog_debug(2, "disk: Taking request.");
673113911Sdes		QUEUE_TAKE(disk, hio);
674113911Sdes		while (clear_activemap) {
675113911Sdes			unsigned char *map;
676113911Sdes			size_t mapsize;
67798937Sdes
67898937Sdes			/*
67998937Sdes			 * When first request is received, it means that primary
68098937Sdes			 * already received our activemap, merged it and stored
68198937Sdes			 * locally. We can now safely clear our activemap.
68298937Sdes			 */
68398937Sdes			mapsize =
68498937Sdes			    activemap_calc_ondisk_size(res->hr_local_mediasize -
68598937Sdes			    METADATA_SIZE, res->hr_extentsize,
68698937Sdes			    res->hr_local_sectorsize);
68798937Sdes			map = calloc(1, mapsize);
68898937Sdes			if (map == NULL) {
68998937Sdes				pjdlog_warning("Unable to allocate memory to clear local activemap.");
69098937Sdes				break;
69198937Sdes			}
69298937Sdes			if (pwrite(res->hr_localfd, map, mapsize,
69398937Sdes			    METADATA_SIZE) != (ssize_t)mapsize) {
69498937Sdes				pjdlog_errno(LOG_WARNING,
69598937Sdes				    "Unable to store cleared activemap");
69698937Sdes				free(map);
69798937Sdes				break;
69898937Sdes			}
69998937Sdes			free(map);
70098937Sdes			clear_activemap = false;
70198937Sdes			pjdlog_debug(1, "Local activemap cleared.");
70298937Sdes		}
70398937Sdes		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
70498937Sdes		/* Handle the actual request. */
705113911Sdes		switch (hio->hio_cmd) {
706113911Sdes		case HIO_READ:
707113911Sdes			ret = pread(res->hr_localfd, hio->hio_data,
70898937Sdes			    hio->hio_length,
70998937Sdes			    hio->hio_offset + res->hr_localoff);
71098937Sdes			if (ret < 0)
71198937Sdes				hio->hio_error = errno;
71298937Sdes			else if (ret != (int64_t)hio->hio_length)
71398937Sdes				hio->hio_error = EIO;
71498937Sdes			else
71598937Sdes				hio->hio_error = 0;
71698937Sdes			break;
71798937Sdes		case HIO_WRITE:
71898937Sdes			ret = pwrite(res->hr_localfd, hio->hio_data,
71998937Sdes			    hio->hio_length,
72098937Sdes			    hio->hio_offset + res->hr_localoff);
72198937Sdes			if (ret < 0)
72298937Sdes				hio->hio_error = errno;
72398937Sdes			else if (ret != (int64_t)hio->hio_length)
72498937Sdes				hio->hio_error = EIO;
72598937Sdes			else
72698937Sdes				hio->hio_error = 0;
72798937Sdes			break;
72898937Sdes		case HIO_DELETE:
72998937Sdes			ret = g_delete(res->hr_localfd,
73098937Sdes			    hio->hio_offset + res->hr_localoff,
73198937Sdes			    hio->hio_length);
73298937Sdes			if (ret < 0)
73398937Sdes				hio->hio_error = errno;
73498937Sdes			else
73598937Sdes				hio->hio_error = 0;
73698937Sdes			break;
73798937Sdes		case HIO_FLUSH:
73898937Sdes			ret = g_flush(res->hr_localfd);
73998937Sdes			if (ret < 0)
74098937Sdes				hio->hio_error = errno;
74198937Sdes			else
74298937Sdes				hio->hio_error = 0;
74398937Sdes			break;
744113911Sdes		}
745113911Sdes		if (hio->hio_error != 0) {
746113911Sdes			reqlog(LOG_ERR, 0, hio->hio_error, hio,
747113911Sdes			    "Request failed: ");
748113911Sdes		}
749113911Sdes		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
750113911Sdes		    hio);
751113911Sdes		QUEUE_INSERT(send, hio);
752113911Sdes	}
753113911Sdes	/* NOTREACHED */
754113911Sdes	return (NULL);
755113911Sdes}
756113911Sdes
75798937Sdes/*
75898937Sdes * Thread sends requests back to primary node.
75998937Sdes */
76098937Sdesstatic void *
76198937Sdessend_thread(void *arg)
76298937Sdes{
76398937Sdes	struct hast_resource *res = arg;
76498937Sdes	struct nv *nvout;
76598937Sdes	struct hio *hio;
76698937Sdes	void *data;
76798937Sdes	size_t length;
76898937Sdes
76998937Sdes	for (;;) {
77098937Sdes		pjdlog_debug(2, "send: Taking request.");
77198937Sdes		QUEUE_TAKE(send, hio);
77298937Sdes		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
77398937Sdes		nvout = nv_alloc();
77498937Sdes		/* Copy sequence number. */
77598937Sdes		nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq");
77698937Sdes		switch (hio->hio_cmd) {
77798937Sdes		case HIO_READ:
77898937Sdes			if (hio->hio_error == 0) {
77998937Sdes				data = hio->hio_data;
78098937Sdes				length = hio->hio_length;
78198937Sdes				break;
78298937Sdes			}
78398937Sdes			/*
78498937Sdes			 * We send no data in case of an error.
78598937Sdes			 */
78698937Sdes			/* FALLTHROUGH */
78798937Sdes		case HIO_DELETE:
78898937Sdes		case HIO_FLUSH:
78998937Sdes		case HIO_WRITE:
79098937Sdes			data = NULL;
79198937Sdes			length = 0;
79298937Sdes			break;
79398937Sdes		default:
79498937Sdes			abort();
79598937Sdes			break;
79698937Sdes		}
79798937Sdes		if (hio->hio_error != 0)
79898937Sdes			nv_add_int16(nvout, hio->hio_error, "error");
79998937Sdes		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
80098937Sdes		    length) < 0) {
80198937Sdes			secondary_exit(EX_TEMPFAIL, "Unable to send reply.");
80298937Sdes		}
80398937Sdes		nv_free(nvout);
80498937Sdes		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
80598937Sdes		    hio);
80698937Sdes		nv_free(hio->hio_nv);
80798937Sdes		hio->hio_error = 0;
80898937Sdes		QUEUE_INSERT(free, hio);
80998937Sdes	}
81098937Sdes	/* NOTREACHED */
81198937Sdes	return (NULL);
81298937Sdes}
81398937Sdes