/* secondary.c revision 223181 */
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
2998937Sdes */ 3098937Sdes 3198937Sdes#include <sys/cdefs.h> 3298937Sdes__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 223181 2011-06-17 07:07:26Z trociny $"); 3398937Sdes 3498937Sdes#include <sys/param.h> 3598937Sdes#include <sys/time.h> 3698937Sdes#include <sys/bio.h> 3798937Sdes#include <sys/disk.h> 3898937Sdes#include <sys/stat.h> 3998937Sdes 4098937Sdes#include <err.h> 4198937Sdes#include <errno.h> 4298937Sdes#include <fcntl.h> 4398937Sdes#include <libgeom.h> 4498937Sdes#include <pthread.h> 4598937Sdes#include <signal.h> 4698937Sdes#include <stdint.h> 4798937Sdes#include <stdio.h> 4898937Sdes#include <string.h> 4998937Sdes#include <sysexits.h> 5098937Sdes#include <unistd.h> 5198937Sdes 5298937Sdes#include <activemap.h> 5398937Sdes#include <nv.h> 5498937Sdes#include <pjdlog.h> 5598937Sdes 5698937Sdes#include "control.h" 5798937Sdes#include "event.h" 5898937Sdes#include "hast.h" 5998937Sdes#include "hast_proto.h" 6098937Sdes#include "hastd.h" 6198937Sdes#include "hooks.h" 6298937Sdes#include "metadata.h" 6398937Sdes#include "proto.h" 6498937Sdes#include "subr.h" 6598937Sdes#include "synch.h" 6698937Sdes 6798937Sdesstruct hio { 6898937Sdes uint64_t hio_seq; 6998937Sdes int hio_error; 7098937Sdes struct nv *hio_nv; 7198937Sdes void *hio_data; 7298937Sdes uint8_t hio_cmd; 7398937Sdes uint64_t hio_offset; 7498937Sdes uint64_t hio_length; 7598937Sdes TAILQ_ENTRY(hio) hio_next; 7698937Sdes}; 7798937Sdes 7898937Sdesstatic struct hast_resource *gres; 7998937Sdes 8098937Sdes/* 8198937Sdes * Free list holds unused structures. When free list is empty, we have to wait 8298937Sdes * until some in-progress requests are freed. 8398937Sdes */ 8498937Sdesstatic TAILQ_HEAD(, hio) hio_free_list; 8598937Sdesstatic pthread_mutex_t hio_free_list_lock; 8698937Sdesstatic pthread_cond_t hio_free_list_cond; 8798937Sdes/* 8898937Sdes * Disk thread (the one that do I/O requests) takes requests from this list. 
8998937Sdes */ 9098937Sdesstatic TAILQ_HEAD(, hio) hio_disk_list; 9198937Sdesstatic pthread_mutex_t hio_disk_list_lock; 9298937Sdesstatic pthread_cond_t hio_disk_list_cond; 9398937Sdes/* 9498937Sdes * There is one recv list for every component, although local components don't 9598937Sdes * use recv lists as local requests are done synchronously. 9698937Sdes */ 9798937Sdesstatic TAILQ_HEAD(, hio) hio_send_list; 9898937Sdesstatic pthread_mutex_t hio_send_list_lock; 9998937Sdesstatic pthread_cond_t hio_send_list_cond; 10098937Sdes 10198937Sdes/* 10298937Sdes * Maximum number of outstanding I/O requests. 10398937Sdes */ 10498937Sdes#define HAST_HIO_MAX 256 10598937Sdes 10698937Sdesstatic void *recv_thread(void *arg); 10798937Sdesstatic void *disk_thread(void *arg); 10898937Sdesstatic void *send_thread(void *arg); 10998937Sdes 11098937Sdes#define QUEUE_INSERT(name, hio) do { \ 11198937Sdes bool _wakeup; \ 11298937Sdes \ 11398937Sdes mtx_lock(&hio_##name##_list_lock); \ 11498937Sdes _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 11598937Sdes TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 11698937Sdes mtx_unlock(&hio_##name##_list_lock); \ 11798937Sdes if (_wakeup) \ 11898937Sdes cv_signal(&hio_##name##_list_cond); \ 11998937Sdes} while (0) 12098937Sdes#define QUEUE_TAKE(name, hio) do { \ 12198937Sdes mtx_lock(&hio_##name##_list_lock); \ 12298937Sdes while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 12398937Sdes cv_wait(&hio_##name##_list_cond, \ 12498937Sdes &hio_##name##_list_lock); \ 12598937Sdes } \ 12698937Sdes TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 12798937Sdes mtx_unlock(&hio_##name##_list_lock); \ 12898937Sdes} while (0) 12998937Sdes 13098937Sdesstatic void 13198937Sdesinit_environment(void) 13298937Sdes{ 13398937Sdes struct hio *hio; 13498937Sdes unsigned int ii; 13598937Sdes 13698937Sdes /* 13798937Sdes * Initialize lists, their locks and theirs condition variables. 
13898937Sdes */ 13998937Sdes TAILQ_INIT(&hio_free_list); 14098937Sdes mtx_init(&hio_free_list_lock); 14198937Sdes cv_init(&hio_free_list_cond); 14298937Sdes TAILQ_INIT(&hio_disk_list); 14398937Sdes mtx_init(&hio_disk_list_lock); 14498937Sdes cv_init(&hio_disk_list_cond); 14598937Sdes TAILQ_INIT(&hio_send_list); 14698937Sdes mtx_init(&hio_send_list_lock); 14798937Sdes cv_init(&hio_send_list_cond); 14898937Sdes 14998937Sdes /* 15098937Sdes * Allocate requests pool and initialize requests. 15198937Sdes */ 15298937Sdes for (ii = 0; ii < HAST_HIO_MAX; ii++) { 15398937Sdes hio = malloc(sizeof(*hio)); 15498937Sdes if (hio == NULL) { 15598937Sdes pjdlog_exitx(EX_TEMPFAIL, 15698937Sdes "Unable to allocate memory (%zu bytes) for hio request.", 15798937Sdes sizeof(*hio)); 15898937Sdes } 15998937Sdes hio->hio_error = 0; 16098937Sdes hio->hio_data = malloc(MAXPHYS); 161124211Sdes if (hio->hio_data == NULL) { 16299768Sdes pjdlog_exitx(EX_TEMPFAIL, 16398937Sdes "Unable to allocate memory (%zu bytes) for gctl_data.", 16498937Sdes (size_t)MAXPHYS); 16598937Sdes } 16698937Sdes TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 16798937Sdes } 16898937Sdes} 16998937Sdes 17098937Sdesstatic void 17198937Sdesinit_local(struct hast_resource *res) 17298937Sdes{ 17398937Sdes 17498937Sdes if (metadata_read(res, true) < 0) 17598937Sdes exit(EX_NOINPUT); 17698937Sdes} 17798937Sdes 17898937Sdesstatic void 17998937Sdesinit_remote(struct hast_resource *res, struct nv *nvin) 18098937Sdes{ 18198937Sdes uint64_t resuid; 18298937Sdes struct nv *nvout; 18398937Sdes unsigned char *map; 18498937Sdes size_t mapsize; 18598937Sdes 18698937Sdes#ifdef notyet 18798937Sdes /* Setup direction. 
*/ 18898937Sdes if (proto_send(res->hr_remoteout, NULL, 0) == -1) 18998937Sdes pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 19098937Sdes#endif 19198937Sdes 19298937Sdes map = NULL; 19398937Sdes mapsize = 0; 19498937Sdes nvout = nv_alloc(); 19598937Sdes nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 19698937Sdes nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 19798937Sdes resuid = nv_get_uint64(nvin, "resuid"); 19898937Sdes res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 19998937Sdes res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 20098937Sdes nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 20198937Sdes nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 20298937Sdes mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 20398937Sdes METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 20498937Sdes map = malloc(mapsize); 20598937Sdes if (map == NULL) { 20698937Sdes pjdlog_exitx(EX_TEMPFAIL, 20798937Sdes "Unable to allocate memory (%zu bytes) for activemap.", 20898937Sdes mapsize); 20998937Sdes } 21098937Sdes /* 21198937Sdes * When we work as primary and secondary is missing we will increase 21298937Sdes * localcnt in our metadata. When secondary is connected and synced 21398937Sdes * we make localcnt be equal to remotecnt, which means nodes are more 21498937Sdes * or less in sync. 21598937Sdes * Split-brain condition is when both nodes are not able to communicate 21698937Sdes * and are both configured as primary nodes. In turn, they can both 21798937Sdes * make incompatible changes to the data and we have to detect that. 21898937Sdes * Under split-brain condition we will increase our localcnt on first 21998937Sdes * write and remote node will increase its localcnt on first write. 
22098937Sdes * When we connect we can see that primary's localcnt is greater than 22198937Sdes * our remotecnt (primary was modified while we weren't watching) and 22298937Sdes * our localcnt is greater than primary's remotecnt (we were modified 22398937Sdes * while primary wasn't watching). 22498937Sdes * There are many possible combinations which are all gathered below. 22598937Sdes * Don't pay too much attention to exact numbers, the more important 22698937Sdes * is to compare them. We compare secondary's local with primary's 22798937Sdes * remote and secondary's remote with primary's local. 22898937Sdes * Note that every case where primary's localcnt is smaller than 22998937Sdes * secondary's remotecnt and where secondary's localcnt is smaller than 23098937Sdes * primary's remotecnt should be impossible in practise. We will perform 23198937Sdes * full synchronization then. Those cases are marked with an asterisk. 23298937Sdes * Regular synchronization means that only extents marked as dirty are 23398937Sdes * synchronized (regular synchronization). 23498937Sdes * 23598937Sdes * SECONDARY METADATA PRIMARY METADATA 23698937Sdes * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 23798937Sdes * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 23898937Sdes * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 23998937Sdes * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 24098937Sdes * regular sync from secondary. 24198937Sdes * local=3 remote=3 local=3 remote=3 Regular sync just in case. 24298937Sdes * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 24398937Sdes * local=3 remote=3 local=4 remote=2 Split-brain condition. 24498937Sdes * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 24598937Sdes * regular sync from primary. 24698937Sdes * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 
24798937Sdes */ 24898937Sdes if (res->hr_resuid == 0) { 24998937Sdes /* 25098937Sdes * Provider is used for the first time. If primary node done no 25198937Sdes * writes yet as well (we will find "virgin" argument) then 25298937Sdes * there is no need to synchronize anything. If primary node 25398937Sdes * done any writes already we have to synchronize everything. 25498937Sdes */ 25598937Sdes PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 25698937Sdes res->hr_resuid = resuid; 25798937Sdes if (metadata_write(res) < 0) 25898937Sdes exit(EX_NOINPUT); 25998937Sdes if (nv_exists(nvin, "virgin")) { 26098937Sdes free(map); 26198937Sdes map = NULL; 26298937Sdes mapsize = 0; 26398937Sdes } else { 26498937Sdes memset(map, 0xff, mapsize); 26598937Sdes } 26698937Sdes nv_add_int8(nvout, 1, "virgin"); 26798937Sdes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 26898937Sdes } else if (res->hr_resuid != resuid) { 26998937Sdes char errmsg[256]; 27098937Sdes 27198937Sdes (void)snprintf(errmsg, sizeof(errmsg), 27298937Sdes "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 27398937Sdes (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 27498937Sdes pjdlog_error("%s", errmsg); 27598937Sdes nv_add_string(nvout, errmsg, "errmsg"); 27698937Sdes if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) < 0) { 27798937Sdes pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", 27898937Sdes res->hr_remoteaddr); 27998937Sdes } 28098937Sdes nv_free(nvout); 28198937Sdes exit(EX_CONFIG); 28298937Sdes } else if ( 28398937Sdes /* Is primary is out-of-date? */ 28498937Sdes (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 28598937Sdes res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 28698937Sdes /* Nodes are more or less in sync? */ 28798937Sdes (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 28898937Sdes res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 28998937Sdes /* Is secondary is out-of-date? 
*/ 29098937Sdes (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 29198937Sdes res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 29298937Sdes /* 29398937Sdes * Nodes are more or less in sync or one of the nodes is 29498937Sdes * out-of-date. 29598937Sdes * It doesn't matter at this point which one, we just have to 29698937Sdes * send out local bitmap to the remote node. 29798937Sdes */ 29898937Sdes if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 29998937Sdes (ssize_t)mapsize) { 30098937Sdes pjdlog_exit(LOG_ERR, "Unable to read activemap"); 30198937Sdes } 30298937Sdes if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 30398937Sdes res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 30498937Sdes /* Primary is out-of-date, sync from secondary. */ 30598937Sdes nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 30698937Sdes } else { 30798937Sdes /* 30898937Sdes * Secondary is out-of-date or counts match. 30998937Sdes * Sync from primary. 31098937Sdes */ 31198937Sdes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 31298937Sdes } 31398937Sdes } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 31498937Sdes res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 31598937Sdes /* 31698937Sdes * Not good, we have split-brain condition. 31798937Sdes */ 31898937Sdes pjdlog_error("Split-brain detected, exiting."); 31998937Sdes nv_add_string(nvout, "Split-brain condition!", "errmsg"); 32098937Sdes free(map); 32198937Sdes map = NULL; 32298937Sdes mapsize = 0; 32398937Sdes } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 32498937Sdes res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 32598937Sdes /* 32698937Sdes * This should never happen in practise, but we will perform 32798937Sdes * full synchronization. 
32898937Sdes */ 32998937Sdes PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 33098937Sdes res->hr_primary_localcnt < res->hr_secondary_remotecnt); 33198937Sdes mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 33298937Sdes METADATA_SIZE, res->hr_extentsize, 33398937Sdes res->hr_local_sectorsize); 33498937Sdes memset(map, 0xff, mapsize); 33598937Sdes if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 33698937Sdes /* In this one of five cases sync from secondary. */ 33798937Sdes nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 33898937Sdes } else { 33998937Sdes /* For the rest four cases sync from primary. */ 34098937Sdes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 34198937Sdes } 34298937Sdes pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 34398937Sdes (uintmax_t)res->hr_primary_localcnt, 34498937Sdes (uintmax_t)res->hr_primary_remotecnt, 34598937Sdes (uintmax_t)res->hr_secondary_localcnt, 34698937Sdes (uintmax_t)res->hr_secondary_remotecnt); 34798937Sdes } 34898937Sdes nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 34998937Sdes if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 35098937Sdes pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 35198937Sdes res->hr_remoteaddr); 35298937Sdes } 35398937Sdes if (map != NULL) 35498937Sdes free(map); 35598937Sdes nv_free(nvout); 35698937Sdes#ifdef notyet 35798937Sdes /* Setup direction. */ 35898937Sdes if (proto_recv(res->hr_remotein, NULL, 0) == -1) 35998937Sdes pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 36098937Sdes#endif 36198937Sdes if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 36298937Sdes res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 36398937Sdes /* Exit on split-brain. 
*/ 36498937Sdes event_send(res, EVENT_SPLITBRAIN); 36598937Sdes exit(EX_CONFIG); 36698937Sdes } 36798937Sdes} 36898937Sdes 36998937Sdesvoid 37098937Sdeshastd_secondary(struct hast_resource *res, struct nv *nvin) 37198937Sdes{ 37298937Sdes sigset_t mask; 37398937Sdes pthread_t td; 37498937Sdes pid_t pid; 37598937Sdes int error, mode, debuglevel; 37698937Sdes 37798937Sdes /* 37898937Sdes * Create communication channel between parent and child. 37998937Sdes */ 38098937Sdes if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 38198937Sdes KEEP_ERRNO((void)pidfile_remove(pfh)); 38298937Sdes pjdlog_exit(EX_OSERR, 38398937Sdes "Unable to create control sockets between parent and child"); 38498937Sdes } 38598937Sdes /* 38698937Sdes * Create communication channel between child and parent. 38798937Sdes */ 38898937Sdes if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 38998937Sdes KEEP_ERRNO((void)pidfile_remove(pfh)); 39098937Sdes pjdlog_exit(EX_OSERR, 39198937Sdes "Unable to create event sockets between child and parent"); 39298937Sdes } 39398937Sdes 39498937Sdes pid = fork(); 39598937Sdes if (pid < 0) { 39698937Sdes KEEP_ERRNO((void)pidfile_remove(pfh)); 39798937Sdes pjdlog_exit(EX_OSERR, "Unable to fork"); 39898937Sdes } 39998937Sdes 40098937Sdes if (pid > 0) { 40198937Sdes /* This is parent. */ 40298937Sdes proto_close(res->hr_remotein); 40398937Sdes res->hr_remotein = NULL; 40498937Sdes proto_close(res->hr_remoteout); 40598937Sdes res->hr_remoteout = NULL; 40698937Sdes /* Declare that we are receiver. */ 40798937Sdes proto_recv(res->hr_event, NULL, 0); 40898937Sdes /* Declare that we are sender. */ 40998937Sdes proto_send(res->hr_ctrl, NULL, 0); 41098937Sdes res->hr_workerpid = pid; 41198937Sdes return; 412124211Sdes } 41398937Sdes 41498937Sdes gres = res; 41598937Sdes mode = pjdlog_mode_get(); 41698937Sdes debuglevel = pjdlog_debug_get(); 41798937Sdes 41898937Sdes /* Declare that we are sender. 
*/ 41998937Sdes proto_send(res->hr_event, NULL, 0); 42098937Sdes /* Declare that we are receiver. */ 42198937Sdes proto_recv(res->hr_ctrl, NULL, 0); 42298937Sdes descriptors_cleanup(res); 42398937Sdes 42498937Sdes descriptors_assert(res, mode); 42598937Sdes 42698937Sdes pjdlog_init(mode); 42798937Sdes pjdlog_debug_set(debuglevel); 42898937Sdes pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 42998937Sdes setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 43098937Sdes 43198937Sdes PJDLOG_VERIFY(sigemptyset(&mask) == 0); 43298937Sdes PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 43398937Sdes 43498937Sdes /* Error in setting timeout is not critical, but why should it fail? */ 43598937Sdes if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) < 0) 43698937Sdes pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 43798937Sdes if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 43898937Sdes pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 43998937Sdes 44098937Sdes init_local(res); 44198937Sdes init_environment(); 44298937Sdes 44398937Sdes if (drop_privs(res) != 0) 44498937Sdes exit(EX_CONFIG); 44598937Sdes pjdlog_info("Privileges successfully dropped."); 44698937Sdes 44798937Sdes /* 44898937Sdes * Create the control thread before sending any event to the parent, 44998937Sdes * as we can deadlock when parent sends control request to worker, 45098937Sdes * but worker has no control thread started yet, so parent waits. 45198937Sdes * In the meantime worker sends an event to the parent, but parent 45298937Sdes * is unable to handle the event, because it waits for control 45398937Sdes * request response. 
45498937Sdes */ 45598937Sdes error = pthread_create(&td, NULL, ctrl_thread, res); 45698937Sdes PJDLOG_ASSERT(error == 0); 45798937Sdes 45898937Sdes init_remote(res, nvin); 45998937Sdes event_send(res, EVENT_CONNECT); 46098937Sdes 46198937Sdes error = pthread_create(&td, NULL, recv_thread, res); 46298937Sdes PJDLOG_ASSERT(error == 0); 46398937Sdes error = pthread_create(&td, NULL, disk_thread, res); 46498937Sdes PJDLOG_ASSERT(error == 0); 46598937Sdes (void)send_thread(res); 46698937Sdes} 46798937Sdes 46898937Sdesstatic void 46998937Sdesreqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 47098937Sdes{ 47198937Sdes char msg[1024]; 47298937Sdes va_list ap; 47398937Sdes int len; 47498937Sdes 47598937Sdes va_start(ap, fmt); 47698937Sdes len = vsnprintf(msg, sizeof(msg), fmt, ap); 47798937Sdes va_end(ap); 47898937Sdes if ((size_t)len < sizeof(msg)) { 47998937Sdes switch (hio->hio_cmd) { 48098937Sdes case HIO_READ: 48198937Sdes (void)snprintf(msg + len, sizeof(msg) - len, 48298937Sdes "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 48398937Sdes (uintmax_t)hio->hio_length); 48498937Sdes break; 48598937Sdes case HIO_DELETE: 48698937Sdes (void)snprintf(msg + len, sizeof(msg) - len, 48798937Sdes "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 48898937Sdes (uintmax_t)hio->hio_length); 48998937Sdes break; 49098937Sdes case HIO_FLUSH: 49198937Sdes (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 49298937Sdes break; 49398937Sdes case HIO_WRITE: 49498937Sdes (void)snprintf(msg + len, sizeof(msg) - len, 49598937Sdes "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 49698937Sdes (uintmax_t)hio->hio_length); 49798937Sdes break; 49898937Sdes case HIO_KEEPALIVE: 49998937Sdes (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 50098937Sdes break; 50198937Sdes default: 50298937Sdes (void)snprintf(msg + len, sizeof(msg) - len, 50398937Sdes "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 50498937Sdes break; 50598937Sdes } 50698937Sdes } 
50798937Sdes pjdlog_common(loglevel, debuglevel, error, "%s", msg); 50898937Sdes} 50998937Sdes 51098937Sdesstatic int 51198937Sdesrequnpack(struct hast_resource *res, struct hio *hio) 51298937Sdes{ 51398937Sdes 51498937Sdes hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 51598937Sdes if (hio->hio_cmd == 0) { 51698937Sdes pjdlog_error("Header contains no 'cmd' field."); 51798937Sdes hio->hio_error = EINVAL; 51898937Sdes goto end; 51998937Sdes } 52098937Sdes switch (hio->hio_cmd) { 52198937Sdes case HIO_FLUSH: 52298937Sdes case HIO_KEEPALIVE: 52398937Sdes break; 52498937Sdes case HIO_READ: 52598937Sdes case HIO_WRITE: 52698937Sdes case HIO_DELETE: 52798937Sdes hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 52898937Sdes if (nv_error(hio->hio_nv) != 0) { 52998937Sdes pjdlog_error("Header is missing 'offset' field."); 53098937Sdes hio->hio_error = EINVAL; 53198937Sdes goto end; 53298937Sdes } 53398937Sdes hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 53498937Sdes if (nv_error(hio->hio_nv) != 0) { 53598937Sdes pjdlog_error("Header is missing 'length' field."); 53698937Sdes hio->hio_error = EINVAL; 53798937Sdes goto end; 53898937Sdes } 53998937Sdes if (hio->hio_length == 0) { 54098937Sdes pjdlog_error("Data length is zero."); 54198937Sdes hio->hio_error = EINVAL; 54298937Sdes goto end; 54398937Sdes } 54498937Sdes if (hio->hio_length > MAXPHYS) { 54598937Sdes pjdlog_error("Data length is too large (%ju > %ju).", 54698937Sdes (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 54798937Sdes hio->hio_error = EINVAL; 54898937Sdes goto end; 54998937Sdes } 55098937Sdes if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 55198937Sdes pjdlog_error("Offset %ju is not multiple of sector size.", 55298937Sdes (uintmax_t)hio->hio_offset); 55398937Sdes hio->hio_error = EINVAL; 55498937Sdes goto end; 55598937Sdes } 55698937Sdes if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 55798937Sdes pjdlog_error("Length %ju is not multiple of sector size.", 
55898937Sdes (uintmax_t)hio->hio_length); 55998937Sdes hio->hio_error = EINVAL; 56098937Sdes goto end; 56198937Sdes } 56298937Sdes if (hio->hio_offset + hio->hio_length > 56398937Sdes (uint64_t)res->hr_datasize) { 56498937Sdes pjdlog_error("Data offset is too large (%ju > %ju).", 56598937Sdes (uintmax_t)(hio->hio_offset + hio->hio_length), 56698937Sdes (uintmax_t)res->hr_datasize); 56798937Sdes hio->hio_error = EINVAL; 56898937Sdes goto end; 56998937Sdes } 57098937Sdes break; 57198937Sdes default: 57298937Sdes pjdlog_error("Header contains invalid 'cmd' (%hhu).", 57398937Sdes hio->hio_cmd); 57498937Sdes hio->hio_error = EINVAL; 57598937Sdes goto end; 57698937Sdes } 57798937Sdes hio->hio_error = 0; 57898937Sdesend: 57998937Sdes return (hio->hio_error); 58098937Sdes} 58198937Sdes 58298937Sdesstatic __dead2 void 58398937Sdessecondary_exit(int exitcode, const char *fmt, ...) 58498937Sdes{ 58598937Sdes va_list ap; 58698937Sdes 58798937Sdes PJDLOG_ASSERT(exitcode != EX_OK); 58898937Sdes va_start(ap, fmt); 58998937Sdes pjdlogv_errno(LOG_ERR, fmt, ap); 59098937Sdes va_end(ap); 59198937Sdes event_send(gres, EVENT_DISCONNECT); 59298937Sdes exit(exitcode); 59398937Sdes} 59498937Sdes 59598937Sdes/* 59698937Sdes * Thread receives requests from the primary node. 
59798937Sdes */ 59898937Sdesstatic void * 59998937Sdesrecv_thread(void *arg) 60098937Sdes{ 60198937Sdes struct hast_resource *res = arg; 60298937Sdes struct hio *hio; 60398937Sdes 60498937Sdes for (;;) { 60598937Sdes pjdlog_debug(2, "recv: Taking free request."); 60698937Sdes QUEUE_TAKE(free, hio); 60798937Sdes pjdlog_debug(2, "recv: (%p) Got request.", hio); 608113911Sdes if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 609113911Sdes secondary_exit(EX_TEMPFAIL, 610113911Sdes "Unable to receive request header"); 61198937Sdes } 61298937Sdes if (requnpack(res, hio) != 0) { 61398937Sdes pjdlog_debug(2, 61498937Sdes "recv: (%p) Moving request to the send queue.", 61598937Sdes hio); 61698937Sdes QUEUE_INSERT(send, hio); 61798937Sdes continue; 61898937Sdes } 61998937Sdes switch (hio->hio_cmd) { 62098937Sdes case HIO_READ: 62198937Sdes res->hr_stat_read++; 62298937Sdes break; 62398937Sdes case HIO_WRITE: 624106130Sdes res->hr_stat_write++; 62598937Sdes break; 62698937Sdes case HIO_DELETE: 62798937Sdes res->hr_stat_delete++; 62898937Sdes break; 62998937Sdes case HIO_FLUSH: 630106130Sdes res->hr_stat_flush++; 63198937Sdes break; 63298937Sdes } 63398937Sdes reqlog(LOG_DEBUG, 2, -1, hio, 63498937Sdes "recv: (%p) Got request header: ", hio); 63598937Sdes if (hio->hio_cmd == HIO_KEEPALIVE) { 63698937Sdes pjdlog_debug(2, 63798937Sdes "recv: (%p) Moving request to the free queue.", 63898937Sdes hio); 63998937Sdes nv_free(hio->hio_nv); 64098937Sdes QUEUE_INSERT(free, hio); 64198937Sdes continue; 64298937Sdes } else if (hio->hio_cmd == HIO_WRITE) { 64398937Sdes if (hast_proto_recv_data(res, res->hr_remotein, 64498937Sdes hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 64598937Sdes secondary_exit(EX_TEMPFAIL, 64698937Sdes "Unable to receive request data"); 64798937Sdes } 64898937Sdes } 64998937Sdes pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 65098937Sdes hio); 65198937Sdes QUEUE_INSERT(disk, hio); 65298937Sdes } 65398937Sdes /* NOTREACHED */ 
65498937Sdes return (NULL); 65598937Sdes} 65699768Sdes 65799768Sdes/* 65898937Sdes * Thread reads from or writes to local component and also handles DELETE and 65998937Sdes * FLUSH requests. 66098937Sdes */ 66198937Sdesstatic void * 66298937Sdesdisk_thread(void *arg) 66398937Sdes{ 664113911Sdes struct hast_resource *res = arg; 665113911Sdes struct hio *hio; 666113911Sdes ssize_t ret; 667113911Sdes bool clear_activemap; 668113911Sdes 669113911Sdes clear_activemap = true; 670113911Sdes 671113911Sdes for (;;) { 672113911Sdes pjdlog_debug(2, "disk: Taking request."); 673113911Sdes QUEUE_TAKE(disk, hio); 674113911Sdes while (clear_activemap) { 675113911Sdes unsigned char *map; 676113911Sdes size_t mapsize; 67798937Sdes 67898937Sdes /* 67998937Sdes * When first request is received, it means that primary 68098937Sdes * already received our activemap, merged it and stored 68198937Sdes * locally. We can now safely clear our activemap. 68298937Sdes */ 68398937Sdes mapsize = 68498937Sdes activemap_calc_ondisk_size(res->hr_local_mediasize - 68598937Sdes METADATA_SIZE, res->hr_extentsize, 68698937Sdes res->hr_local_sectorsize); 68798937Sdes map = calloc(1, mapsize); 68898937Sdes if (map == NULL) { 68998937Sdes pjdlog_warning("Unable to allocate memory to clear local activemap."); 69098937Sdes break; 69198937Sdes } 69298937Sdes if (pwrite(res->hr_localfd, map, mapsize, 69398937Sdes METADATA_SIZE) != (ssize_t)mapsize) { 69498937Sdes pjdlog_errno(LOG_WARNING, 69598937Sdes "Unable to store cleared activemap"); 69698937Sdes free(map); 69798937Sdes break; 69898937Sdes } 69998937Sdes free(map); 70098937Sdes clear_activemap = false; 70198937Sdes pjdlog_debug(1, "Local activemap cleared."); 70298937Sdes } 70398937Sdes reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 70498937Sdes /* Handle the actual request. 
*/ 705113911Sdes switch (hio->hio_cmd) { 706113911Sdes case HIO_READ: 707113911Sdes ret = pread(res->hr_localfd, hio->hio_data, 70898937Sdes hio->hio_length, 70998937Sdes hio->hio_offset + res->hr_localoff); 71098937Sdes if (ret < 0) 71198937Sdes hio->hio_error = errno; 71298937Sdes else if (ret != (int64_t)hio->hio_length) 71398937Sdes hio->hio_error = EIO; 71498937Sdes else 71598937Sdes hio->hio_error = 0; 71698937Sdes break; 71798937Sdes case HIO_WRITE: 71898937Sdes ret = pwrite(res->hr_localfd, hio->hio_data, 71998937Sdes hio->hio_length, 72098937Sdes hio->hio_offset + res->hr_localoff); 72198937Sdes if (ret < 0) 72298937Sdes hio->hio_error = errno; 72398937Sdes else if (ret != (int64_t)hio->hio_length) 72498937Sdes hio->hio_error = EIO; 72598937Sdes else 72698937Sdes hio->hio_error = 0; 72798937Sdes break; 72898937Sdes case HIO_DELETE: 72998937Sdes ret = g_delete(res->hr_localfd, 73098937Sdes hio->hio_offset + res->hr_localoff, 73198937Sdes hio->hio_length); 73298937Sdes if (ret < 0) 73398937Sdes hio->hio_error = errno; 73498937Sdes else 73598937Sdes hio->hio_error = 0; 73698937Sdes break; 73798937Sdes case HIO_FLUSH: 73898937Sdes ret = g_flush(res->hr_localfd); 73998937Sdes if (ret < 0) 74098937Sdes hio->hio_error = errno; 74198937Sdes else 74298937Sdes hio->hio_error = 0; 74398937Sdes break; 744113911Sdes } 745113911Sdes if (hio->hio_error != 0) { 746113911Sdes reqlog(LOG_ERR, 0, hio->hio_error, hio, 747113911Sdes "Request failed: "); 748113911Sdes } 749113911Sdes pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 750113911Sdes hio); 751113911Sdes QUEUE_INSERT(send, hio); 752113911Sdes } 753113911Sdes /* NOTREACHED */ 754113911Sdes return (NULL); 755113911Sdes} 756113911Sdes 75798937Sdes/* 75898937Sdes * Thread sends requests back to primary node. 
75998937Sdes */ 76098937Sdesstatic void * 76198937Sdessend_thread(void *arg) 76298937Sdes{ 76398937Sdes struct hast_resource *res = arg; 76498937Sdes struct nv *nvout; 76598937Sdes struct hio *hio; 76698937Sdes void *data; 76798937Sdes size_t length; 76898937Sdes 76998937Sdes for (;;) { 77098937Sdes pjdlog_debug(2, "send: Taking request."); 77198937Sdes QUEUE_TAKE(send, hio); 77298937Sdes reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 77398937Sdes nvout = nv_alloc(); 77498937Sdes /* Copy sequence number. */ 77598937Sdes nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 77698937Sdes switch (hio->hio_cmd) { 77798937Sdes case HIO_READ: 77898937Sdes if (hio->hio_error == 0) { 77998937Sdes data = hio->hio_data; 78098937Sdes length = hio->hio_length; 78198937Sdes break; 78298937Sdes } 78398937Sdes /* 78498937Sdes * We send no data in case of an error. 78598937Sdes */ 78698937Sdes /* FALLTHROUGH */ 78798937Sdes case HIO_DELETE: 78898937Sdes case HIO_FLUSH: 78998937Sdes case HIO_WRITE: 79098937Sdes data = NULL; 79198937Sdes length = 0; 79298937Sdes break; 79398937Sdes default: 79498937Sdes abort(); 79598937Sdes break; 79698937Sdes } 79798937Sdes if (hio->hio_error != 0) 79898937Sdes nv_add_int16(nvout, hio->hio_error, "error"); 79998937Sdes if (hast_proto_send(res, res->hr_remoteout, nvout, data, 80098937Sdes length) < 0) { 80198937Sdes secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 80298937Sdes } 80398937Sdes nv_free(nvout); 80498937Sdes pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 80598937Sdes hio); 80698937Sdes nv_free(hio->hio_nv); 80798937Sdes hio->hio_error = 0; 80898937Sdes QUEUE_INSERT(free, hio); 80998937Sdes } 81098937Sdes /* NOTREACHED */ 81198937Sdes return (NULL); 81298937Sdes} 81398937Sdes