/* primary.c, revision 223655 */
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 223655 2011-06-28 21:01:32Z trociny $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request as it travels through the primary: received from the
 * kernel (GEOM Gate), handed to each component (local disk, remote node) and
 * finally sent back to the kernel.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int *hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io hio_ggio;
	/* One queue linkage per component, allocated in init_environment(). */
	TAILQ_ENTRY(hio) *hio_next;
};
/* The free and done lists are single queues; reuse the first linkage slot. */
#define hio_free_next hio_next[0]
#define hio_done_next hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
122136849Sscottl */ 123136849Sscottlstatic TAILQ_HEAD(, hio) hio_done_list; 124136849Sscottlstatic pthread_mutex_t hio_done_list_lock; 125136849Sscottlstatic pthread_cond_t hio_done_list_cond; 126136849Sscottl/* 127136849Sscottl * Structure below are for interaction with sync thread. 128136849Sscottl */ 129136849Sscottlstatic bool sync_inprogress; 130136849Sscottlstatic pthread_mutex_t sync_lock; 131136849Sscottlstatic pthread_cond_t sync_cond; 132136849Sscottl/* 133136849Sscottl * The lock below allows to synchornize access to remote connections. 134136849Sscottl */ 135136849Sscottlstatic pthread_rwlock_t *hio_remote_lock; 136136849Sscottl 137136849Sscottl/* 138136849Sscottl * Lock to synchronize metadata updates. Also synchronize access to 139136849Sscottl * hr_primary_localcnt and hr_primary_remotecnt fields. 140136849Sscottl */ 141136849Sscottlstatic pthread_mutex_t metadata_lock; 142136849Sscottl 143136849Sscottl/* 144136849Sscottl * Maximum number of outstanding I/O requests. 145136849Sscottl */ 146136849Sscottl#define HAST_HIO_MAX 256 147136849Sscottl/* 148136849Sscottl * Number of components. At this point there are only two components: local 149136849Sscottl * and remote, but in the future it might be possible to use multiple local 150136849Sscottl * and remote components. 
151136849Sscottl */ 152136849Sscottl#define HAST_NCOMPONENTS 2 153136849Sscottl 154136849Sscottl#define ISCONNECTED(res, no) \ 155136849Sscottl ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156136849Sscottl 157136849Sscottl#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158136849Sscottl bool _wakeup; \ 159136849Sscottl \ 160136849Sscottl mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161136849Sscottl _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162136849Sscottl TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163136849Sscottl hio_next[(ncomp)]); \ 164136849Sscottl mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165190809Sdelphij if (_wakeup) \ 166136849Sscottl cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167136849Sscottl} while (0) 168136849Sscottl#define QUEUE_INSERT2(hio, name) do { \ 169136849Sscottl bool _wakeup; \ 170136849Sscottl \ 171136849Sscottl mtx_lock(&hio_##name##_list_lock); \ 172136849Sscottl _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173136849Sscottl TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174136849Sscottl mtx_unlock(&hio_##name##_list_lock); \ 175136849Sscottl if (_wakeup) \ 176136849Sscottl cv_signal(&hio_##name##_list_cond); \ 177136849Sscottl} while (0) 178136849Sscottl#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 179136849Sscottl bool _last; \ 180136849Sscottl \ 181136849Sscottl mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 182136849Sscottl _last = false; \ 183136849Sscottl while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 184136849Sscottl cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 185136849Sscottl &hio_##name##_list_lock[(ncomp)], (timeout)); \ 186136849Sscottl if ((timeout) != 0) \ 187136849Sscottl _last = true; \ 188136849Sscottl } \ 189136849Sscottl if (hio != NULL) { \ 190136849Sscottl TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 191136849Sscottl hio_next[(ncomp)]); \ 192136849Sscottl } \ 193136849Sscottl 
mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 194136849Sscottl} while (0) 195136849Sscottl#define QUEUE_TAKE2(hio, name) do { \ 196136849Sscottl mtx_lock(&hio_##name##_list_lock); \ 197136849Sscottl while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 198136849Sscottl cv_wait(&hio_##name##_list_cond, \ 199136849Sscottl &hio_##name##_list_lock); \ 200136849Sscottl } \ 201136849Sscottl TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 202136849Sscottl mtx_unlock(&hio_##name##_list_lock); \ 203136849Sscottl} while (0) 204136849Sscottl 205136849Sscottl#define SYNCREQ(hio) do { \ 206136849Sscottl (hio)->hio_ggio.gctl_unit = -1; \ 207136849Sscottl (hio)->hio_ggio.gctl_seq = 1; \ 208136849Sscottl} while (0) 209136849Sscottl#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 210136849Sscottl#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211136849Sscottl#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212136849Sscottl 213136849Sscottlstatic struct hast_resource *gres; 214136849Sscottl 215136849Sscottlstatic pthread_mutex_t range_lock; 216136849Sscottlstatic struct rangelocks *range_regular; 217136849Sscottlstatic bool range_regular_wait; 218136849Sscottlstatic pthread_cond_t range_regular_cond; 219136849Sscottlstatic struct rangelocks *range_sync; 220136849Sscottlstatic bool range_sync_wait; 221136849Sscottlstatic pthread_cond_t range_sync_cond; 222136849Sscottlstatic bool fullystarted; 223136849Sscottl 224190809Sdelphijstatic void *ggate_recv_thread(void *arg); 225190809Sdelphijstatic void *local_send_thread(void *arg); 226190809Sdelphijstatic void *remote_send_thread(void *arg); 227190809Sdelphijstatic void *remote_recv_thread(void *arg); 228190809Sdelphijstatic void *ggate_send_thread(void *arg); 229136849Sscottlstatic void *sync_thread(void *arg); 230136849Sscottlstatic void *guard_thread(void *arg); 231136849Sscottl 232136849Sscottlstatic void 233136849Sscottlcleanup(struct hast_resource *res) 
234136849Sscottl{ 235136849Sscottl int rerrno; 236136849Sscottl 237136849Sscottl /* Remember errno. */ 238136849Sscottl rerrno = errno; 239136849Sscottl 240136849Sscottl /* Destroy ggate provider if we created one. */ 241136849Sscottl if (res->hr_ggateunit >= 0) { 242136849Sscottl struct g_gate_ctl_destroy ggiod; 243136849Sscottl 244136849Sscottl bzero(&ggiod, sizeof(ggiod)); 245136849Sscottl ggiod.gctl_version = G_GATE_VERSION; 246136849Sscottl ggiod.gctl_unit = res->hr_ggateunit; 247136849Sscottl ggiod.gctl_force = 1; 248136849Sscottl if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 249136849Sscottl pjdlog_errno(LOG_WARNING, 250136849Sscottl "Unable to destroy hast/%s device", 251136849Sscottl res->hr_provname); 252136849Sscottl } 253136849Sscottl res->hr_ggateunit = -1; 254136849Sscottl } 255136849Sscottl 256136849Sscottl /* Restore errno. */ 257136849Sscottl errno = rerrno; 258136849Sscottl} 259136849Sscottl 260136849Sscottlstatic __dead2 void 261136849Sscottlprimary_exit(int exitcode, const char *fmt, ...) 262136849Sscottl{ 263136849Sscottl va_list ap; 264136849Sscottl 265136849Sscottl PJDLOG_ASSERT(exitcode != EX_OK); 266136849Sscottl va_start(ap, fmt); 267136849Sscottl pjdlogv_errno(LOG_ERR, fmt, ap); 268136849Sscottl va_end(ap); 269136849Sscottl cleanup(gres); 270136849Sscottl exit(exitcode); 271136849Sscottl} 272136849Sscottl 273136849Sscottlstatic __dead2 void 274136849Sscottlprimary_exitx(int exitcode, const char *fmt, ...) 275136849Sscottl{ 276136849Sscottl va_list ap; 277136849Sscottl 278136849Sscottl va_start(ap, fmt); 279136849Sscottl pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 280136849Sscottl va_end(ap); 281136849Sscottl cleanup(gres); 282136849Sscottl exit(exitcode); 283136849Sscottl} 284136849Sscottl 285136849Sscottlstatic int 286136849Sscottlhast_activemap_flush(struct hast_resource *res) 287136849Sscottl{ 288136849Sscottl const unsigned char *buf; 289136849Sscottl size_t size; 290136849Sscottl 291136849Sscottl buf = activemap_bitmap(res->hr_amp, &size); 292136849Sscottl PJDLOG_ASSERT(buf != NULL); 293136849Sscottl PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 294136849Sscottl if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 295136849Sscottl (ssize_t)size) { 296136849Sscottl KEEP_ERRNO(pjdlog_errno(LOG_ERR, 297136849Sscottl "Unable to flush activemap to disk")); 298136849Sscottl return (-1); 299136849Sscottl } 300136849Sscottl return (0); 301136849Sscottl} 302136849Sscottl 303136849Sscottlstatic bool 304136849Sscottlreal_remote(const struct hast_resource *res) 305136849Sscottl{ 306136849Sscottl 307136849Sscottl return (strcmp(res->hr_remoteaddr, "none") != 0); 308136849Sscottl} 309136849Sscottl 310136849Sscottlstatic void 311136849Sscottlinit_environment(struct hast_resource *res __unused) 312136849Sscottl{ 313136849Sscottl struct hio *hio; 314136849Sscottl unsigned int ii, ncomps; 315136849Sscottl 316136849Sscottl /* 317136849Sscottl * In the future it might be per-resource value. 318136849Sscottl */ 319136849Sscottl ncomps = HAST_NCOMPONENTS; 320136849Sscottl 321136849Sscottl /* 322136849Sscottl * Allocate memory needed by lists. 
323136849Sscottl */ 324136849Sscottl hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 325136849Sscottl if (hio_send_list == NULL) { 326136849Sscottl primary_exitx(EX_TEMPFAIL, 327136849Sscottl "Unable to allocate %zu bytes of memory for send lists.", 328136849Sscottl sizeof(hio_send_list[0]) * ncomps); 329136849Sscottl } 330136849Sscottl hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 331136849Sscottl if (hio_send_list_lock == NULL) { 332136849Sscottl primary_exitx(EX_TEMPFAIL, 333136849Sscottl "Unable to allocate %zu bytes of memory for send list locks.", 334136849Sscottl sizeof(hio_send_list_lock[0]) * ncomps); 335136849Sscottl } 336136849Sscottl hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 337136849Sscottl if (hio_send_list_cond == NULL) { 338136849Sscottl primary_exitx(EX_TEMPFAIL, 339136849Sscottl "Unable to allocate %zu bytes of memory for send list condition variables.", 340136849Sscottl sizeof(hio_send_list_cond[0]) * ncomps); 341136849Sscottl } 342136849Sscottl hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 343136849Sscottl if (hio_recv_list == NULL) { 344136849Sscottl primary_exitx(EX_TEMPFAIL, 345136849Sscottl "Unable to allocate %zu bytes of memory for recv lists.", 346136849Sscottl sizeof(hio_recv_list[0]) * ncomps); 347136849Sscottl } 348136849Sscottl hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 349136849Sscottl if (hio_recv_list_lock == NULL) { 350136849Sscottl primary_exitx(EX_TEMPFAIL, 351136849Sscottl "Unable to allocate %zu bytes of memory for recv list locks.", 352136849Sscottl sizeof(hio_recv_list_lock[0]) * ncomps); 353136849Sscottl } 354136849Sscottl hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 355136849Sscottl if (hio_recv_list_cond == NULL) { 356136849Sscottl primary_exitx(EX_TEMPFAIL, 357136849Sscottl "Unable to allocate %zu bytes of memory for recv list condition variables.", 358136849Sscottl sizeof(hio_recv_list_cond[0]) * 
ncomps); 359136849Sscottl } 360136849Sscottl hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 361136849Sscottl if (hio_remote_lock == NULL) { 362136849Sscottl primary_exitx(EX_TEMPFAIL, 363136849Sscottl "Unable to allocate %zu bytes of memory for remote connections locks.", 364136849Sscottl sizeof(hio_remote_lock[0]) * ncomps); 365136849Sscottl } 366136849Sscottl 367136849Sscottl /* 368136849Sscottl * Initialize lists, their locks and theirs condition variables. 369136849Sscottl */ 370136849Sscottl TAILQ_INIT(&hio_free_list); 371136849Sscottl mtx_init(&hio_free_list_lock); 372136849Sscottl cv_init(&hio_free_list_cond); 373136849Sscottl for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 374136849Sscottl TAILQ_INIT(&hio_send_list[ii]); 375136849Sscottl mtx_init(&hio_send_list_lock[ii]); 376190809Sdelphij cv_init(&hio_send_list_cond[ii]); 377190809Sdelphij TAILQ_INIT(&hio_recv_list[ii]); 378136849Sscottl mtx_init(&hio_recv_list_lock[ii]); 379190809Sdelphij cv_init(&hio_recv_list_cond[ii]); 380190809Sdelphij rw_init(&hio_remote_lock[ii]); 381136849Sscottl } 382190809Sdelphij TAILQ_INIT(&hio_done_list); 383136849Sscottl mtx_init(&hio_done_list_lock); 384136849Sscottl cv_init(&hio_done_list_cond); 385136849Sscottl mtx_init(&metadata_lock); 386136849Sscottl 387136849Sscottl /* 388136849Sscottl * Allocate requests pool and initialize requests. 
389136849Sscottl */ 390136849Sscottl for (ii = 0; ii < HAST_HIO_MAX; ii++) { 391136849Sscottl hio = malloc(sizeof(*hio)); 392136849Sscottl if (hio == NULL) { 393136849Sscottl primary_exitx(EX_TEMPFAIL, 394136849Sscottl "Unable to allocate %zu bytes of memory for hio request.", 395136849Sscottl sizeof(*hio)); 396136849Sscottl } 397136849Sscottl hio->hio_countdown = 0; 398136849Sscottl hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 399136849Sscottl if (hio->hio_errors == NULL) { 400136849Sscottl primary_exitx(EX_TEMPFAIL, 401136849Sscottl "Unable allocate %zu bytes of memory for hio errors.", 402136849Sscottl sizeof(hio->hio_errors[0]) * ncomps); 403136849Sscottl } 404136849Sscottl hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 405136849Sscottl if (hio->hio_next == NULL) { 406136849Sscottl primary_exitx(EX_TEMPFAIL, 407136849Sscottl "Unable allocate %zu bytes of memory for hio_next field.", 408136849Sscottl sizeof(hio->hio_next[0]) * ncomps); 409136849Sscottl } 410136849Sscottl hio->hio_ggio.gctl_version = G_GATE_VERSION; 411136849Sscottl hio->hio_ggio.gctl_data = malloc(MAXPHYS); 412136849Sscottl if (hio->hio_ggio.gctl_data == NULL) { 413136849Sscottl primary_exitx(EX_TEMPFAIL, 414136849Sscottl "Unable to allocate %zu bytes of memory for gctl_data.", 415136849Sscottl MAXPHYS); 416136849Sscottl } 417136849Sscottl hio->hio_ggio.gctl_length = MAXPHYS; 418136849Sscottl hio->hio_ggio.gctl_error = 0; 419136849Sscottl TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 420136849Sscottl } 421136849Sscottl} 422136849Sscottl 423136849Sscottlstatic bool 424136849Sscottlinit_resuid(struct hast_resource *res) 425136849Sscottl{ 426136849Sscottl 427136849Sscottl mtx_lock(&metadata_lock); 428136849Sscottl if (res->hr_resuid != 0) { 429136849Sscottl mtx_unlock(&metadata_lock); 430136849Sscottl return (false); 431136849Sscottl } else { 432136849Sscottl /* Initialize unique resource identifier. 
		 */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) < 0)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read on-disk metadata, build the in-memory activemap from the on-disk
 * bitmap and initialize the range locks used to serialize regular vs.
 * synchronization I/O.  Exits on any failure.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	/* Load the on-disk activemap bitmap into the in-memory activemap. */
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Ask the parent process to establish one TCP connection to the secondary
 * and hand the socket over to us.  On success *connp holds the connected
 * proto_conn.  Returns 0 on success, -1 on (non-fatal) connection failure;
 * exits on communication failure with the parent.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	/* val==1 is the "please connect" request understood by the parent. */
	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	/* Non-zero reply is the errno from the parent's connect attempt. */
	if (val != 0) {
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, res->hr_timeout) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}

/*
 * Perform the two-step handshake with the secondary: establish the outgoing
 * connection, exchange token and counters, then establish the incoming
 * connection and merge the remote activemap.  On success returns 0 and
 * stores the connections either via inp/outp (both non-NULL) or directly in
 * the resource.  On failure returns an errno-style error (ECONNREFUSED,
 * ECONNABORTED or EBUSY).
 */
static int
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;
	int error;

	/* Callers pass either both pointers or neither. */
	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	if (primary_connect(res, &out) == -1)
		return (ECONNREFUSED);

	error = ECONNABORTED;

	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		/* The secondary asked us to retry later. */
		if (nv_exists(nvin, "wait"))
			error = EBUSY;
		nv_free(nvin);
		goto close;
	}
	/*
	 * The token identifies this handshake so the second connection can
	 * be matched with the first one by the secondary.
	 */
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (primary_connect(res, &in) == -1)
		goto close;

	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* Reply still arrives on 'out': directions are not set up yet. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	/* Both nodes must agree on provider geometry. */
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	if (nv_exists(nvin, "virgin")) {
		/*
		 * Secondary was reinitialized, bump localcnt if it is 0 as
		 * only we have the data.
		 */
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);

		if (res->hr_primary_localcnt == 0) {
			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);

			mtx_lock(&metadata_lock);
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
	}
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * The remote node has some dirty extents of its own; let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
#ifdef notyet
	/* Setup directions. */
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

/* Mark synchronization as in progress and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop; it checks the flag on its own. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create (or take over) the GEOM Gate provider hast/<name> backing this
 * resource.  Exits on unrecoverable errors.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
790136849Sscottl */ 791136849Sscottl bzero(&ggiocreate, sizeof(ggiocreate)); 792149871Sscottl ggiocreate.gctl_version = G_GATE_VERSION; 793136849Sscottl ggiocreate.gctl_mediasize = res->hr_datasize; 794136849Sscottl ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 795136849Sscottl ggiocreate.gctl_flags = 0; 796136849Sscottl ggiocreate.gctl_maxcount = 0; 797136849Sscottl ggiocreate.gctl_timeout = 0; 798136849Sscottl ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 799136849Sscottl snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 800136849Sscottl res->hr_provname); 801136849Sscottl if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 802136849Sscottl pjdlog_info("Device hast/%s created.", res->hr_provname); 803136849Sscottl res->hr_ggateunit = ggiocreate.gctl_unit; 804136849Sscottl return; 805136849Sscottl } 806136849Sscottl if (errno != EEXIST) { 807136849Sscottl primary_exit(EX_OSERR, "Unable to create hast/%s device", 808136849Sscottl res->hr_provname); 809136849Sscottl } 810136849Sscottl pjdlog_debug(1, 811136849Sscottl "Device hast/%s already exists, we will try to take it over.", 812136849Sscottl res->hr_provname); 813136849Sscottl /* 814136849Sscottl * If we received EEXIST, we assume that the process who created the 815136849Sscottl * provider died and didn't clean up. In that case we will start from 816136849Sscottl * where he left of. 
817136849Sscottl */ 818149871Sscottl bzero(&ggiocancel, sizeof(ggiocancel)); 819136849Sscottl ggiocancel.gctl_version = G_GATE_VERSION; 820136849Sscottl ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 821136849Sscottl snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 822136849Sscottl res->hr_provname); 823136849Sscottl if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 824136849Sscottl pjdlog_info("Device hast/%s recovered.", res->hr_provname); 825136849Sscottl res->hr_ggateunit = ggiocancel.gctl_unit; 826136849Sscottl return; 827136849Sscottl } 828136849Sscottl primary_exit(EX_OSERR, "Unable to take over hast/%s device", 829136849Sscottl res->hr_provname); 830136849Sscottl} 831136849Sscottl 832136849Sscottlvoid 833136849Sscottlhastd_primary(struct hast_resource *res) 834136849Sscottl{ 835136849Sscottl pthread_t td; 836136849Sscottl pid_t pid; 837136849Sscottl int error, mode, debuglevel; 838136849Sscottl 839136849Sscottl /* 840136849Sscottl * Create communication channel for sending control commands from 841136849Sscottl * parent to child. 842136849Sscottl */ 843136849Sscottl if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 844136849Sscottl /* TODO: There's no need for this to be fatal error. */ 845136849Sscottl KEEP_ERRNO((void)pidfile_remove(pfh)); 846136849Sscottl pjdlog_exit(EX_OSERR, 847136849Sscottl "Unable to create control sockets between parent and child"); 848136849Sscottl } 849136849Sscottl /* 850136849Sscottl * Create communication channel for sending events from child to parent. 851136849Sscottl */ 852136849Sscottl if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 853136849Sscottl /* TODO: There's no need for this to be fatal error. 
 */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	/* This is the worker child. */
	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/*
			 * Remote node has not switched to secondary yet;
			 * retry once a second until it does or hr_timeout
			 * seconds have elapsed.
			 */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	/* Start the I/O worker threads. */
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	/* The main thread itself becomes the synchronization thread. */
	(void)sync_thread(res);
}

/*
 * Log a ggate request: the caller-supplied printf-style prefix is
 * followed by a textual description of the request (command, offset,
 * length).  loglevel/debuglevel are passed through to pjdlog_common().
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
971136849Sscottl{ 972136849Sscottl char msg[1024]; 973136849Sscottl va_list ap; 974136849Sscottl int len; 975136849Sscottl 976136849Sscottl va_start(ap, fmt); 977136849Sscottl len = vsnprintf(msg, sizeof(msg), fmt, ap); 978136849Sscottl va_end(ap); 979136849Sscottl if ((size_t)len < sizeof(msg)) { 980136849Sscottl switch (ggio->gctl_cmd) { 981136849Sscottl case BIO_READ: 982136849Sscottl (void)snprintf(msg + len, sizeof(msg) - len, 983136849Sscottl "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 984136849Sscottl (uintmax_t)ggio->gctl_length); 985136849Sscottl break; 986136849Sscottl case BIO_DELETE: 987136849Sscottl (void)snprintf(msg + len, sizeof(msg) - len, 988136849Sscottl "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 989136849Sscottl (uintmax_t)ggio->gctl_length); 990136849Sscottl break; 991136849Sscottl case BIO_FLUSH: 992136849Sscottl (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 993136849Sscottl break; 994136849Sscottl case BIO_WRITE: 995136849Sscottl (void)snprintf(msg + len, sizeof(msg) - len, 996136849Sscottl "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 997136849Sscottl (uintmax_t)ggio->gctl_length); 998136849Sscottl break; 999136849Sscottl default: 1000136849Sscottl (void)snprintf(msg + len, sizeof(msg) - len, 1001136849Sscottl "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 1002136849Sscottl break; 1003136849Sscottl } 1004136849Sscottl } 1005136849Sscottl pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1006136849Sscottl} 1007136849Sscottl 1008136849Sscottlstatic void 1009136849Sscottlremote_close(struct hast_resource *res, int ncomp) 1010136849Sscottl{ 1011136849Sscottl 1012136849Sscottl rw_wlock(&hio_remote_lock[ncomp]); 1013136849Sscottl /* 1014136849Sscottl * A race is possible between dropping rlock and acquiring wlock - 1015136849Sscottl * another thread can close connection in-between. 
1016136849Sscottl */ 1017136849Sscottl if (!ISCONNECTED(res, ncomp)) { 1018136849Sscottl PJDLOG_ASSERT(res->hr_remotein == NULL); 1019136849Sscottl PJDLOG_ASSERT(res->hr_remoteout == NULL); 1020136849Sscottl rw_unlock(&hio_remote_lock[ncomp]); 1021136849Sscottl return; 1022136849Sscottl } 1023136849Sscottl 1024136849Sscottl PJDLOG_ASSERT(res->hr_remotein != NULL); 1025136849Sscottl PJDLOG_ASSERT(res->hr_remoteout != NULL); 1026136849Sscottl 1027136849Sscottl pjdlog_debug(2, "Closing incoming connection to %s.", 1028136849Sscottl res->hr_remoteaddr); 1029136849Sscottl proto_close(res->hr_remotein); 1030136849Sscottl res->hr_remotein = NULL; 1031136849Sscottl pjdlog_debug(2, "Closing outgoing connection to %s.", 1032136849Sscottl res->hr_remoteaddr); 1033136849Sscottl proto_close(res->hr_remoteout); 1034136849Sscottl res->hr_remoteout = NULL; 1035136849Sscottl 1036136849Sscottl rw_unlock(&hio_remote_lock[ncomp]); 1037136849Sscottl 1038136849Sscottl pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1039136849Sscottl 1040136849Sscottl /* 1041136849Sscottl * Stop synchronization if in-progress. 
1042136849Sscottl */ 1043136849Sscottl sync_stop(); 1044136849Sscottl 1045136849Sscottl event_send(res, EVENT_DISCONNECT); 1046136849Sscottl} 1047136849Sscottl 1048136849Sscottl/* 1049136849Sscottl * Thread receives ggate I/O requests from the kernel and passes them to 1050136849Sscottl * appropriate threads: 1051136849Sscottl * WRITE - always goes to both local_send and remote_send threads 1052136849Sscottl * READ (when the block is up-to-date on local component) - 1053136849Sscottl * only local_send thread 1054136849Sscottl * READ (when the block isn't up-to-date on local component) - 1055136849Sscottl * only remote_send thread 1056136849Sscottl * DELETE - always goes to both local_send and remote_send threads 1057136849Sscottl * FLUSH - always goes to both local_send and remote_send threads 1058136849Sscottl */ 1059136849Sscottlstatic void * 1060149871Sscottlggate_recv_thread(void *arg) 1061136849Sscottl{ 1062136849Sscottl struct hast_resource *res = arg; 1063136849Sscottl struct g_gate_ctl_io *ggio; 1064136849Sscottl struct hio *hio; 1065136849Sscottl unsigned int ii, ncomp, ncomps; 1066136849Sscottl int error; 1067136849Sscottl 1068136849Sscottl ncomps = HAST_NCOMPONENTS; 1069136849Sscottl 1070136849Sscottl for (;;) { 1071136849Sscottl pjdlog_debug(2, "ggate_recv: Taking free request."); 1072136849Sscottl QUEUE_TAKE2(hio, free); 1073136849Sscottl pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1074136849Sscottl ggio = &hio->hio_ggio; 1075136849Sscottl ggio->gctl_unit = res->hr_ggateunit; 1076136849Sscottl ggio->gctl_length = MAXPHYS; 1077136849Sscottl ggio->gctl_error = 0; 1078136849Sscottl pjdlog_debug(2, 1079136849Sscottl "ggate_recv: (%p) Waiting for request from the kernel.", 1080136849Sscottl hio); 1081136849Sscottl if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 1082136849Sscottl if (sigexit_received) 1083136849Sscottl pthread_exit(NULL); 1084136849Sscottl primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1085136849Sscottl } 
		/* The ioctl succeeded; per-request status is in gctl_error. */
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/*
		 * Preset every component's error; each worker thread
		 * overwrites its own slot with the real outcome.
		 */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads are handled by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0) {
				/*
				 * This is first write, initialize localcnt and
				 * resuid.
				 */
				res->hr_primary_localcnt = 1;
				(void)init_resuid(res);
			}
			/*
			 * Grab a regular-I/O range lock for the request;
			 * spin/wait until it does not collide with the
			 * range currently being synchronized.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before the write proceeds. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			switch (ggio->gctl_cmd) {
			case BIO_DELETE:
				res->hr_stat_delete++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush++;
				break;
			}
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* WRITE/DELETE/FLUSH go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 * Both a hard error and a short read are
				 * redirected; sync requests are never
				 * redirected.
				 */
				if (ret < 0) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				/* Short write - report it as an I/O error. */
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}
		/* Last component to finish completes the request. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				/* Wake up the sync thread. */
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Send a keepalive packet to the remote node on component ncomp, if it
 * is still connected.  On send failure the connection is closed.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein !=
NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		/* Sending failed - tear the connection down. */
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now.
	 */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		/* NULL is returned after HAST_KEEPALIVE seconds of idling. */
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate command into a HIO_* protocol request. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		/* Only signal if the queue was empty before our insert. */
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			/* We were the last component - wake the sync thread. */
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write will not reach the secondary; make sure
			 * the extent stays marked for synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Find the outstanding request this reply belongs to. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		/* hio is NULL when the loop above found no match. */
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side.
			 */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* A successful READ reply carries the data payload. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Last component to finish completes the request. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
1650 */ 1651static void * 1652ggate_send_thread(void *arg) 1653{ 1654 struct hast_resource *res = arg; 1655 struct g_gate_ctl_io *ggio; 1656 struct hio *hio; 1657 unsigned int ii, ncomp, ncomps; 1658 1659 ncomps = HAST_NCOMPONENTS; 1660 1661 for (;;) { 1662 pjdlog_debug(2, "ggate_send: Taking request."); 1663 QUEUE_TAKE2(hio, done); 1664 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1665 ggio = &hio->hio_ggio; 1666 for (ii = 0; ii < ncomps; ii++) { 1667 if (hio->hio_errors[ii] == 0) { 1668 /* 1669 * One successful request is enough to declare 1670 * success. 1671 */ 1672 ggio->gctl_error = 0; 1673 break; 1674 } 1675 } 1676 if (ii == ncomps) { 1677 /* 1678 * None of the requests were successful. 1679 * Use the error from local component except the 1680 * case when we did only remote request. 1681 */ 1682 if (ggio->gctl_cmd == BIO_READ && 1683 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1684 ggio->gctl_error = hio->hio_errors[1]; 1685 else 1686 ggio->gctl_error = hio->hio_errors[0]; 1687 } 1688 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1689 mtx_lock(&res->hr_amp_lock); 1690 if (activemap_write_complete(res->hr_amp, 1691 ggio->gctl_offset, ggio->gctl_length)) { 1692 res->hr_stat_activemap_update++; 1693 (void)hast_activemap_flush(res); 1694 } 1695 mtx_unlock(&res->hr_amp_lock); 1696 } 1697 if (ggio->gctl_cmd == BIO_WRITE) { 1698 /* 1699 * Unlock range we locked. 1700 */ 1701 mtx_lock(&range_lock); 1702 rangelock_del(range_regular, ggio->gctl_offset, 1703 ggio->gctl_length); 1704 if (range_sync_wait) 1705 cv_signal(&range_sync_cond); 1706 mtx_unlock(&range_lock); 1707 /* 1708 * Bump local count if this is first write after 1709 * connection failure with remote node. 
1710 */ 1711 ncomp = 1; 1712 rw_rlock(&hio_remote_lock[ncomp]); 1713 if (!ISCONNECTED(res, ncomp)) { 1714 mtx_lock(&metadata_lock); 1715 if (res->hr_primary_localcnt == 1716 res->hr_secondary_remotecnt) { 1717 res->hr_primary_localcnt++; 1718 pjdlog_debug(1, 1719 "Increasing localcnt to %ju.", 1720 (uintmax_t)res->hr_primary_localcnt); 1721 (void)metadata_write(res); 1722 } 1723 mtx_unlock(&metadata_lock); 1724 } 1725 rw_unlock(&hio_remote_lock[ncomp]); 1726 } 1727 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1728 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1729 pjdlog_debug(2, 1730 "ggate_send: (%p) Moving request to the free queue.", hio); 1731 QUEUE_INSERT2(hio, free); 1732 } 1733 /* NOTREACHED */ 1734 return (NULL); 1735} 1736 1737/* 1738 * Thread synchronize local and remote components. 1739 */ 1740static void * 1741sync_thread(void *arg __unused) 1742{ 1743 struct hast_resource *res = arg; 1744 struct hio *hio; 1745 struct g_gate_ctl_io *ggio; 1746 struct timeval tstart, tend, tdiff; 1747 unsigned int ii, ncomp, ncomps; 1748 off_t offset, length, synced; 1749 bool dorewind; 1750 int syncext; 1751 1752 ncomps = HAST_NCOMPONENTS; 1753 dorewind = true; 1754 synced = 0; 1755 offset = -1; 1756 1757 for (;;) { 1758 mtx_lock(&sync_lock); 1759 if (offset >= 0 && !sync_inprogress) { 1760 gettimeofday(&tend, NULL); 1761 timersub(&tend, &tstart, &tdiff); 1762 pjdlog_info("Synchronization interrupted after %#.0T. " 1763 "%NB synchronized so far.", &tdiff, 1764 (intmax_t)synced); 1765 event_send(res, EVENT_SYNCINTR); 1766 } 1767 while (!sync_inprogress) { 1768 dorewind = true; 1769 synced = 0; 1770 cv_wait(&sync_cond, &sync_lock); 1771 } 1772 mtx_unlock(&sync_lock); 1773 /* 1774 * Obtain offset at which we should synchronize. 1775 * Rewind synchronization if needed. 
1776 */ 1777 mtx_lock(&res->hr_amp_lock); 1778 if (dorewind) 1779 activemap_sync_rewind(res->hr_amp); 1780 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1781 if (syncext != -1) { 1782 /* 1783 * We synchronized entire syncext extent, we can mark 1784 * it as clean now. 1785 */ 1786 if (activemap_extent_complete(res->hr_amp, syncext)) 1787 (void)hast_activemap_flush(res); 1788 } 1789 mtx_unlock(&res->hr_amp_lock); 1790 if (dorewind) { 1791 dorewind = false; 1792 if (offset < 0) 1793 pjdlog_info("Nodes are in sync."); 1794 else { 1795 pjdlog_info("Synchronization started. %NB to go.", 1796 (intmax_t)(res->hr_extentsize * 1797 activemap_ndirty(res->hr_amp))); 1798 event_send(res, EVENT_SYNCSTART); 1799 gettimeofday(&tstart, NULL); 1800 } 1801 } 1802 if (offset < 0) { 1803 sync_stop(); 1804 pjdlog_debug(1, "Nothing to synchronize."); 1805 /* 1806 * Synchronization complete, make both localcnt and 1807 * remotecnt equal. 1808 */ 1809 ncomp = 1; 1810 rw_rlock(&hio_remote_lock[ncomp]); 1811 if (ISCONNECTED(res, ncomp)) { 1812 if (synced > 0) { 1813 int64_t bps; 1814 1815 gettimeofday(&tend, NULL); 1816 timersub(&tend, &tstart, &tdiff); 1817 bps = (int64_t)((double)synced / 1818 ((double)tdiff.tv_sec + 1819 (double)tdiff.tv_usec / 1000000)); 1820 pjdlog_info("Synchronization complete. 
" 1821 "%NB synchronized in %#.0lT (%NB/sec).", 1822 (intmax_t)synced, &tdiff, 1823 (intmax_t)bps); 1824 event_send(res, EVENT_SYNCDONE); 1825 } 1826 mtx_lock(&metadata_lock); 1827 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1828 res->hr_primary_localcnt = 1829 res->hr_secondary_remotecnt; 1830 res->hr_primary_remotecnt = 1831 res->hr_secondary_localcnt; 1832 pjdlog_debug(1, 1833 "Setting localcnt to %ju and remotecnt to %ju.", 1834 (uintmax_t)res->hr_primary_localcnt, 1835 (uintmax_t)res->hr_primary_remotecnt); 1836 (void)metadata_write(res); 1837 mtx_unlock(&metadata_lock); 1838 } 1839 rw_unlock(&hio_remote_lock[ncomp]); 1840 continue; 1841 } 1842 pjdlog_debug(2, "sync: Taking free request."); 1843 QUEUE_TAKE2(hio, free); 1844 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1845 /* 1846 * Lock the range we are going to synchronize. We don't want 1847 * race where someone writes between our read and write. 1848 */ 1849 for (;;) { 1850 mtx_lock(&range_lock); 1851 if (rangelock_islocked(range_regular, offset, length)) { 1852 pjdlog_debug(2, 1853 "sync: Range offset=%jd length=%jd locked.", 1854 (intmax_t)offset, (intmax_t)length); 1855 range_sync_wait = true; 1856 cv_wait(&range_sync_cond, &range_lock); 1857 range_sync_wait = false; 1858 mtx_unlock(&range_lock); 1859 continue; 1860 } 1861 if (rangelock_add(range_sync, offset, length) < 0) { 1862 mtx_unlock(&range_lock); 1863 pjdlog_debug(2, 1864 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1865 (intmax_t)offset, (intmax_t)length); 1866 sleep(1); 1867 continue; 1868 } 1869 mtx_unlock(&range_lock); 1870 break; 1871 } 1872 /* 1873 * First read the data from synchronization source. 
1874 */ 1875 SYNCREQ(hio); 1876 ggio = &hio->hio_ggio; 1877 ggio->gctl_cmd = BIO_READ; 1878 ggio->gctl_offset = offset; 1879 ggio->gctl_length = length; 1880 ggio->gctl_error = 0; 1881 for (ii = 0; ii < ncomps; ii++) 1882 hio->hio_errors[ii] = EINVAL; 1883 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1884 hio); 1885 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1886 hio); 1887 mtx_lock(&metadata_lock); 1888 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1889 /* 1890 * This range is up-to-date on local component, 1891 * so handle request locally. 1892 */ 1893 /* Local component is 0 for now. */ 1894 ncomp = 0; 1895 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1896 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1897 /* 1898 * This range is out-of-date on local component, 1899 * so send request to the remote node. 1900 */ 1901 /* Remote component is 1 for now. */ 1902 ncomp = 1; 1903 } 1904 mtx_unlock(&metadata_lock); 1905 refcount_init(&hio->hio_countdown, 1); 1906 QUEUE_INSERT1(hio, send, ncomp); 1907 1908 /* 1909 * Let's wait for READ to finish. 1910 */ 1911 mtx_lock(&sync_lock); 1912 while (!ISSYNCREQDONE(hio)) 1913 cv_wait(&sync_cond, &sync_lock); 1914 mtx_unlock(&sync_lock); 1915 1916 if (hio->hio_errors[ncomp] != 0) { 1917 pjdlog_error("Unable to read synchronization data: %s.", 1918 strerror(hio->hio_errors[ncomp])); 1919 goto free_queue; 1920 } 1921 1922 /* 1923 * We read the data from synchronization source, now write it 1924 * to synchronization target. 
1925 */ 1926 SYNCREQ(hio); 1927 ggio->gctl_cmd = BIO_WRITE; 1928 for (ii = 0; ii < ncomps; ii++) 1929 hio->hio_errors[ii] = EINVAL; 1930 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1931 hio); 1932 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1933 hio); 1934 mtx_lock(&metadata_lock); 1935 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1936 /* 1937 * This range is up-to-date on local component, 1938 * so we update remote component. 1939 */ 1940 /* Remote component is 1 for now. */ 1941 ncomp = 1; 1942 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1943 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1944 /* 1945 * This range is out-of-date on local component, 1946 * so we update it. 1947 */ 1948 /* Local component is 0 for now. */ 1949 ncomp = 0; 1950 } 1951 mtx_unlock(&metadata_lock); 1952 1953 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1954 hio); 1955 refcount_init(&hio->hio_countdown, 1); 1956 QUEUE_INSERT1(hio, send, ncomp); 1957 1958 /* 1959 * Let's wait for WRITE to finish. 
1960 */ 1961 mtx_lock(&sync_lock); 1962 while (!ISSYNCREQDONE(hio)) 1963 cv_wait(&sync_cond, &sync_lock); 1964 mtx_unlock(&sync_lock); 1965 1966 if (hio->hio_errors[ncomp] != 0) { 1967 pjdlog_error("Unable to write synchronization data: %s.", 1968 strerror(hio->hio_errors[ncomp])); 1969 goto free_queue; 1970 } 1971 1972 synced += length; 1973free_queue: 1974 mtx_lock(&range_lock); 1975 rangelock_del(range_sync, offset, length); 1976 if (range_regular_wait) 1977 cv_signal(&range_regular_cond); 1978 mtx_unlock(&range_lock); 1979 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1980 hio); 1981 QUEUE_INSERT2(hio, free); 1982 } 1983 /* NOTREACHED */ 1984 return (NULL); 1985} 1986 1987void 1988primary_config_reload(struct hast_resource *res, struct nv *nv) 1989{ 1990 unsigned int ii, ncomps; 1991 int modified, vint; 1992 const char *vstr; 1993 1994 pjdlog_info("Reloading configuration..."); 1995 1996 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 1997 PJDLOG_ASSERT(gres == res); 1998 nv_assert(nv, "remoteaddr"); 1999 nv_assert(nv, "sourceaddr"); 2000 nv_assert(nv, "replication"); 2001 nv_assert(nv, "checksum"); 2002 nv_assert(nv, "compression"); 2003 nv_assert(nv, "timeout"); 2004 nv_assert(nv, "exec"); 2005 2006 ncomps = HAST_NCOMPONENTS; 2007 2008#define MODIFIED_REMOTEADDR 0x01 2009#define MODIFIED_SOURCEADDR 0x02 2010#define MODIFIED_REPLICATION 0x04 2011#define MODIFIED_CHECKSUM 0x08 2012#define MODIFIED_COMPRESSION 0x10 2013#define MODIFIED_TIMEOUT 0x20 2014#define MODIFIED_EXEC 0x40 2015 modified = 0; 2016 2017 vstr = nv_get_string(nv, "remoteaddr"); 2018 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2019 /* 2020 * Don't copy res->hr_remoteaddr to gres just yet. 2021 * We want remote_close() to log disconnect from the old 2022 * addresses, not from the new ones. 
2023 */ 2024 modified |= MODIFIED_REMOTEADDR; 2025 } 2026 vstr = nv_get_string(nv, "sourceaddr"); 2027 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2028 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2029 modified |= MODIFIED_SOURCEADDR; 2030 } 2031 vint = nv_get_int32(nv, "replication"); 2032 if (gres->hr_replication != vint) { 2033 gres->hr_replication = vint; 2034 modified |= MODIFIED_REPLICATION; 2035 } 2036 vint = nv_get_int32(nv, "checksum"); 2037 if (gres->hr_checksum != vint) { 2038 gres->hr_checksum = vint; 2039 modified |= MODIFIED_CHECKSUM; 2040 } 2041 vint = nv_get_int32(nv, "compression"); 2042 if (gres->hr_compression != vint) { 2043 gres->hr_compression = vint; 2044 modified |= MODIFIED_COMPRESSION; 2045 } 2046 vint = nv_get_int32(nv, "timeout"); 2047 if (gres->hr_timeout != vint) { 2048 gres->hr_timeout = vint; 2049 modified |= MODIFIED_TIMEOUT; 2050 } 2051 vstr = nv_get_string(nv, "exec"); 2052 if (strcmp(gres->hr_exec, vstr) != 0) { 2053 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2054 modified |= MODIFIED_EXEC; 2055 } 2056 2057 /* 2058 * Change timeout for connected sockets. 2059 * Don't bother if we need to reconnect. 
2060 */ 2061 if ((modified & MODIFIED_TIMEOUT) != 0 && 2062 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2063 MODIFIED_REPLICATION)) == 0) { 2064 for (ii = 0; ii < ncomps; ii++) { 2065 if (!ISREMOTE(ii)) 2066 continue; 2067 rw_rlock(&hio_remote_lock[ii]); 2068 if (!ISCONNECTED(gres, ii)) { 2069 rw_unlock(&hio_remote_lock[ii]); 2070 continue; 2071 } 2072 rw_unlock(&hio_remote_lock[ii]); 2073 if (proto_timeout(gres->hr_remotein, 2074 gres->hr_timeout) < 0) { 2075 pjdlog_errno(LOG_WARNING, 2076 "Unable to set connection timeout"); 2077 } 2078 if (proto_timeout(gres->hr_remoteout, 2079 gres->hr_timeout) < 0) { 2080 pjdlog_errno(LOG_WARNING, 2081 "Unable to set connection timeout"); 2082 } 2083 } 2084 } 2085 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2086 MODIFIED_REPLICATION)) != 0) { 2087 for (ii = 0; ii < ncomps; ii++) { 2088 if (!ISREMOTE(ii)) 2089 continue; 2090 remote_close(gres, ii); 2091 } 2092 if (modified & MODIFIED_REMOTEADDR) { 2093 vstr = nv_get_string(nv, "remoteaddr"); 2094 strlcpy(gres->hr_remoteaddr, vstr, 2095 sizeof(gres->hr_remoteaddr)); 2096 } 2097 } 2098#undef MODIFIED_REMOTEADDR 2099#undef MODIFIED_SOURCEADDR 2100#undef MODIFIED_REPLICATION 2101#undef MODIFIED_CHECKSUM 2102#undef MODIFIED_COMPRESSION 2103#undef MODIFIED_TIMEOUT 2104#undef MODIFIED_EXEC 2105 2106 pjdlog_info("Configuration reloaded successfully."); 2107} 2108 2109static void 2110guard_one(struct hast_resource *res, unsigned int ncomp) 2111{ 2112 struct proto_conn *in, *out; 2113 2114 if (!ISREMOTE(ncomp)) 2115 return; 2116 2117 rw_rlock(&hio_remote_lock[ncomp]); 2118 2119 if (!real_remote(res)) { 2120 rw_unlock(&hio_remote_lock[ncomp]); 2121 return; 2122 } 2123 2124 if (ISCONNECTED(res, ncomp)) { 2125 PJDLOG_ASSERT(res->hr_remotein != NULL); 2126 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2127 rw_unlock(&hio_remote_lock[ncomp]); 2128 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2129 res->hr_remoteaddr); 2130 return; 2131 } 2132 2133 
	/* Disconnected: both connection pointers must already be clear. */
	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		/* Publish the fresh connections under the write lock. */
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
2167 */ 2168static void * 2169guard_thread(void *arg) 2170{ 2171 struct hast_resource *res = arg; 2172 unsigned int ii, ncomps; 2173 struct timespec timeout; 2174 time_t lastcheck, now; 2175 sigset_t mask; 2176 int signo; 2177 2178 ncomps = HAST_NCOMPONENTS; 2179 lastcheck = time(NULL); 2180 2181 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2182 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2183 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2184 2185 timeout.tv_sec = HAST_KEEPALIVE; 2186 timeout.tv_nsec = 0; 2187 signo = -1; 2188 2189 for (;;) { 2190 switch (signo) { 2191 case SIGINT: 2192 case SIGTERM: 2193 sigexit_received = true; 2194 primary_exitx(EX_OK, 2195 "Termination signal received, exiting."); 2196 break; 2197 default: 2198 break; 2199 } 2200 2201 /* 2202 * Don't check connections until we fully started, 2203 * as we may still be looping, waiting for remote node 2204 * to switch from primary to secondary. 2205 */ 2206 if (fullystarted) { 2207 pjdlog_debug(2, "remote_guard: Checking connections."); 2208 now = time(NULL); 2209 if (lastcheck + HAST_KEEPALIVE <= now) { 2210 for (ii = 0; ii < ncomps; ii++) 2211 guard_one(res, ii); 2212 lastcheck = now; 2213 } 2214 } 2215 signo = sigtimedwait(&mask, NULL, &timeout); 2216 } 2217 /* NOTREACHED */ 2218 return (NULL); 2219} 2220