/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
29305138Sdim */ 30145522Sdarrenr 31145522Sdarrenr#include <sys/cdefs.h> 32344833Scy__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 225832 2011-09-28 13:19:47Z pjd $"); 33145522Sdarrenr 34145522Sdarrenr#include <sys/types.h> 3553642Sguido#include <sys/time.h> 3653642Sguido#include <sys/bio.h> 3753642Sguido#include <sys/disk.h> 3853642Sguido#include <sys/refcount.h> 3953642Sguido#include <sys/stat.h> 4053642Sguido 41145522Sdarrenr#include <geom/gate/g_gate.h> 42145522Sdarrenr 4392685Sdarrenr#include <err.h> 4492685Sdarrenr#include <errno.h> 4592685Sdarrenr#include <fcntl.h> 4692685Sdarrenr#include <libgeom.h> 4792685Sdarrenr#include <pthread.h> 4892685Sdarrenr#include <signal.h> 4992685Sdarrenr#include <stdint.h> 5092685Sdarrenr#include <stdio.h> 5192685Sdarrenr#include <string.h> 5292685Sdarrenr#include <sysexits.h> 53145522Sdarrenr#include <unistd.h> 5453642Sguido 5553642Sguido#include <activemap.h> 5692685Sdarrenr#include <nv.h> 5792685Sdarrenr#include <rangelock.h> 5892685Sdarrenr 59369246Scy#include "control.h" 60369246Scy#include "event.h" 61369246Scy#include "hast.h" 62369246Scy#include "hast_proto.h" 63369246Scy#include "hastd.h" 64369246Scy#include "hooks.h" 65369246Scy#include "metadata.h" 66369246Scy#include "proto.h" 67369246Scy#include "pjdlog.h" 68369246Scy#include "subr.h" 6992685Sdarrenr#include "synch.h" 70255332Scy 71255332Scy/* The is only one remote component for now. */ 72255332Scy#define ISREMOTE(no) ((no) == 1) 73255332Scy 74255332Scystruct hio { 75255332Scy /* 76145522Sdarrenr * Number of components we are still waiting for. 77145522Sdarrenr * When this field goes to 0, we can send the request back to the 78145522Sdarrenr * kernel. Each component has to decrease this counter by one 79145522Sdarrenr * even on failure. 80145522Sdarrenr */ 81369246Scy unsigned int hio_countdown; 82369246Scy /* 83369246Scy * Each component has a place to store its own error. 
84369246Scy * Once the request is handled by all components we can decide if the 85369246Scy * request overall is successful or not. 86369246Scy */ 8760857Sdarrenr int *hio_errors; 88369246Scy /* 89369246Scy * Structure used to communicate with GEOM Gate class. 90369246Scy */ 91369246Scy struct g_gate_ctl_io hio_ggio; 92369246Scy TAILQ_ENTRY(hio) *hio_next; 93145522Sdarrenr}; 94369246Scy#define hio_free_next hio_next[0] 95369246Scy#define hio_done_next hio_next[0] 96369246Scy 97369246Scy/* 98255332Scy * Free list holds unused structures. When free list is empty, we have to wait 9953642Sguido * until some in-progress requests are freed. 100145522Sdarrenr */ 101145522Sdarrenrstatic TAILQ_HEAD(, hio) hio_free_list; 10253642Sguidostatic pthread_mutex_t hio_free_list_lock; 103369246Scystatic pthread_cond_t hio_free_list_cond; 104369246Scy/* 105369246Scy * There is one send list for every component. One requests is placed on all 106369246Scy * send lists - each component gets the same request, but each component is 107369246Scy * responsible for managing his own send list. 108369246Scy */ 109145522Sdarrenrstatic TAILQ_HEAD(, hio) *hio_send_list; 110369246Scystatic pthread_mutex_t *hio_send_list_lock; 111369246Scystatic pthread_cond_t *hio_send_list_cond; 112369246Scy/* 113145522Sdarrenr * There is one recv list for every component, although local components don't 114369246Scy * use recv lists as local requests are done synchronously. 115369246Scy */ 116369246Scystatic TAILQ_HEAD(, hio) *hio_recv_list; 117369246Scystatic pthread_mutex_t *hio_recv_list_lock; 118369246Scystatic pthread_cond_t *hio_recv_list_cond; 119369246Scy/* 120369246Scy * Request is placed on done list by the slowest component (the one that 121369246Scy * decreased hio_countdown from 1 to 0). 
122369246Scy */ 123281192Sglebiusstatic TAILQ_HEAD(, hio) hio_done_list; 124281192Sglebiusstatic pthread_mutex_t hio_done_list_lock; 125369246Scystatic pthread_cond_t hio_done_list_cond; 126369246Scy/* 127369246Scy * Structure below are for interaction with sync thread. 128369246Scy */ 129369246Scystatic bool sync_inprogress; 130369246Scystatic pthread_mutex_t sync_lock; 131369246Scystatic pthread_cond_t sync_cond; 132172776Sdarrenr/* 133255332Scy * The lock below allows to synchornize access to remote connections. 134255332Scy */ 135172776Sdarrenrstatic pthread_rwlock_t *hio_remote_lock; 136172776Sdarrenr 137369246Scy/* 138369246Scy * Lock to synchronize metadata updates. Also synchronize access to 139145522Sdarrenr * hr_primary_localcnt and hr_primary_remotecnt fields. 140369246Scy */ 141369246Scystatic pthread_mutex_t metadata_lock; 142369246Scy 143145522Sdarrenr/* 144369246Scy * Maximum number of outstanding I/O requests. 145369246Scy */ 146369246Scy#define HAST_HIO_MAX 256 147369246Scy/* 148145522Sdarrenr * Number of components. At this point there are only two components: local 149369246Scy * and remote, but in the future it might be possible to use multiple local 150369246Scy * and remote components. 
151255332Scy */ 152255332Scy#define HAST_NCOMPONENTS 2 153255332Scy 154255332Scy#define ISCONNECTED(res, no) \ 155255332Scy ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156369246Scy 157369246Scy#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158255332Scy bool _wakeup; \ 159369246Scy \ 160255332Scy mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161369246Scy _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162369246Scy TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163369246Scy hio_next[(ncomp)]); \ 164369246Scy mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165369246Scy if (_wakeup) \ 166369246Scy cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167369246Scy} while (0) 168369246Scy#define QUEUE_INSERT2(hio, name) do { \ 169369246Scy bool _wakeup; \ 170369246Scy \ 171369246Scy mtx_lock(&hio_##name##_list_lock); \ 172369246Scy _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173369246Scy TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174369246Scy mtx_unlock(&hio_##name##_list_lock); \ 175369246Scy if (_wakeup) \ 176369246Scy cv_signal(&hio_##name##_list_cond); \ 177369246Scy} while (0) 178145522Sdarrenr#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 179145522Sdarrenr bool _last; \ 180281192Sglebius \ 181369246Scy mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 182281192Sglebius _last = false; \ 183281192Sglebius while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 184281192Sglebius cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 185281192Sglebius &hio_##name##_list_lock[(ncomp)], (timeout)); \ 186281192Sglebius if ((timeout) != 0) \ 187281192Sglebius _last = true; \ 188281192Sglebius } \ 189281192Sglebius if (hio != NULL) { \ 190281192Sglebius TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 191281192Sglebius hio_next[(ncomp)]); \ 192281192Sglebius } \ 193369246Scy mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 194281192Sglebius} while (0) 195145522Sdarrenr#define QUEUE_TAKE2(hio, name) do { \ 
196369246Scy mtx_lock(&hio_##name##_list_lock); \ 197369246Scy while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 198369246Scy cv_wait(&hio_##name##_list_cond, \ 199369246Scy &hio_##name##_list_lock); \ 200369246Scy } \ 201281192Sglebius TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 202145522Sdarrenr mtx_unlock(&hio_##name##_list_lock); \ 203145522Sdarrenr} while (0) 204145522Sdarrenr 205145522Sdarrenr#define SYNCREQ(hio) do { \ 206145522Sdarrenr (hio)->hio_ggio.gctl_unit = -1; \ 207145522Sdarrenr (hio)->hio_ggio.gctl_seq = 1; \ 208145522Sdarrenr} while (0) 209145522Sdarrenr#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 210145522Sdarrenr#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211145522Sdarrenr#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212145522Sdarrenr 213145522Sdarrenrstatic struct hast_resource *gres; 214145522Sdarrenr 215145522Sdarrenrstatic pthread_mutex_t range_lock; 216145522Sdarrenrstatic struct rangelocks *range_regular; 217153876Sguidostatic bool range_regular_wait; 218153876Sguidostatic pthread_cond_t range_regular_cond; 219153876Sguidostatic struct rangelocks *range_sync; 220153876Sguidostatic bool range_sync_wait; 221153876Sguidostatic pthread_cond_t range_sync_cond; 222145522Sdarrenrstatic bool fullystarted; 223145522Sdarrenr 224145522Sdarrenrstatic void *ggate_recv_thread(void *arg); 225145522Sdarrenrstatic void *local_send_thread(void *arg); 226145522Sdarrenrstatic void *remote_send_thread(void *arg); 227255332Scystatic void *remote_recv_thread(void *arg); 228369246Scystatic void *ggate_send_thread(void *arg); 229369246Scystatic void *sync_thread(void *arg); 230369246Scystatic void *guard_thread(void *arg); 231369246Scy 232369246Scystatic void 233369246Scycleanup(struct hast_resource *res) 234255332Scy{ 235255332Scy int rerrno; 236145522Sdarrenr 237145522Sdarrenr /* Remember errno. 
*/ 238145522Sdarrenr rerrno = errno; 239145522Sdarrenr 240145522Sdarrenr /* Destroy ggate provider if we created one. */ 241145522Sdarrenr if (res->hr_ggateunit >= 0) { 242145522Sdarrenr struct g_gate_ctl_destroy ggiod; 243145522Sdarrenr 244145522Sdarrenr bzero(&ggiod, sizeof(ggiod)); 245145522Sdarrenr ggiod.gctl_version = G_GATE_VERSION; 246145522Sdarrenr ggiod.gctl_unit = res->hr_ggateunit; 247145522Sdarrenr ggiod.gctl_force = 1; 248145522Sdarrenr if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 249145522Sdarrenr pjdlog_errno(LOG_WARNING, 250145522Sdarrenr "Unable to destroy hast/%s device", 251145522Sdarrenr res->hr_provname); 252145522Sdarrenr } 253145522Sdarrenr res->hr_ggateunit = -1; 254255332Scy } 255145522Sdarrenr 256145522Sdarrenr /* Restore errno. */ 257145522Sdarrenr errno = rerrno; 258255332Scy} 259145522Sdarrenr 260145522Sdarrenrstatic __dead2 void 261145522Sdarrenrprimary_exit(int exitcode, const char *fmt, ...) 262145522Sdarrenr{ 263145522Sdarrenr va_list ap; 264145522Sdarrenr 265255332Scy PJDLOG_ASSERT(exitcode != EX_OK); 266145522Sdarrenr va_start(ap, fmt); 267145522Sdarrenr pjdlogv_errno(LOG_ERR, fmt, ap); 268145522Sdarrenr va_end(ap); 269255332Scy cleanup(gres); 270145522Sdarrenr exit(exitcode); 271145522Sdarrenr} 272145522Sdarrenr 273145522Sdarrenrstatic __dead2 void 274145522Sdarrenrprimary_exitx(int exitcode, const char *fmt, ...) 275145522Sdarrenr{ 276145522Sdarrenr va_list ap; 277145522Sdarrenr 278145522Sdarrenr va_start(ap, fmt); 279145522Sdarrenr pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 280145522Sdarrenr va_end(ap); 281145522Sdarrenr cleanup(gres); 282145522Sdarrenr exit(exitcode); 283145522Sdarrenr} 284344833Scy 285145522Sdarrenrstatic int 286145522Sdarrenrhast_activemap_flush(struct hast_resource *res) 287145522Sdarrenr{ 288145522Sdarrenr const unsigned char *buf; 289145522Sdarrenr size_t size; 290145522Sdarrenr 291145522Sdarrenr buf = activemap_bitmap(res->hr_amp, &size); 292255332Scy PJDLOG_ASSERT(buf != NULL); 293255332Scy PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 294255332Scy if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 295145522Sdarrenr (ssize_t)size) { 296145522Sdarrenr pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 297255332Scy return (-1); 298145522Sdarrenr } 299145522Sdarrenr if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 300145522Sdarrenr if (errno == EOPNOTSUPP) { 301145522Sdarrenr pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 302145522Sdarrenr res->hr_localpath); 303145522Sdarrenr res->hr_metaflush = 0; 304145522Sdarrenr } else { 305145522Sdarrenr pjdlog_errno(LOG_ERR, 306255332Scy "Unable to flush disk cache on activemap update"); 307255332Scy return (-1); 308145522Sdarrenr } 309255332Scy } 310145522Sdarrenr return (0); 311145522Sdarrenr} 312145522Sdarrenr 313145522Sdarrenrstatic bool 314255332Scyreal_remote(const struct hast_resource *res) 315255332Scy{ 316255332Scy 317255332Scy return (strcmp(res->hr_remoteaddr, "none") != 0); 318255332Scy} 319255332Scy 320255332Scystatic void 321255332Scyinit_environment(struct hast_resource *res __unused) 322255332Scy{ 323255332Scy struct hio *hio; 324255332Scy unsigned int ii, ncomps; 325255332Scy 326255332Scy /* 327255332Scy * In the future it might be per-resource value. 328255332Scy */ 329255332Scy ncomps = HAST_NCOMPONENTS; 330255332Scy 331145522Sdarrenr /* 332255332Scy * Allocate memory needed by lists. 
333255332Scy */ 334255332Scy hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 335255332Scy if (hio_send_list == NULL) { 336255332Scy primary_exitx(EX_TEMPFAIL, 337255332Scy "Unable to allocate %zu bytes of memory for send lists.", 338255332Scy sizeof(hio_send_list[0]) * ncomps); 339255332Scy } 340145522Sdarrenr hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 341145522Sdarrenr if (hio_send_list_lock == NULL) { 342161356Sguido primary_exitx(EX_TEMPFAIL, 343145522Sdarrenr "Unable to allocate %zu bytes of memory for send list locks.", 344145522Sdarrenr sizeof(hio_send_list_lock[0]) * ncomps); 345170268Sdarrenr } 346145522Sdarrenr hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 347145522Sdarrenr if (hio_send_list_cond == NULL) { 348145522Sdarrenr primary_exitx(EX_TEMPFAIL, 349145522Sdarrenr "Unable to allocate %zu bytes of memory for send list condition variables.", 350145522Sdarrenr sizeof(hio_send_list_cond[0]) * ncomps); 351145522Sdarrenr } 352145522Sdarrenr hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 353145522Sdarrenr if (hio_recv_list == NULL) { 354255332Scy primary_exitx(EX_TEMPFAIL, 355255332Scy "Unable to allocate %zu bytes of memory for recv lists.", 356153876Sguido sizeof(hio_recv_list[0]) * ncomps); 357153876Sguido } 358145522Sdarrenr hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 359145522Sdarrenr if (hio_recv_list_lock == NULL) { 360145522Sdarrenr primary_exitx(EX_TEMPFAIL, 361145522Sdarrenr "Unable to allocate %zu bytes of memory for recv list locks.", 362145522Sdarrenr sizeof(hio_recv_list_lock[0]) * ncomps); 363172776Sdarrenr } 364369245Sgit2svn hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 365369245Sgit2svn if (hio_recv_list_cond == NULL) { 366369245Sgit2svn primary_exitx(EX_TEMPFAIL, 367369245Sgit2svn "Unable to allocate %zu bytes of memory for recv list condition variables.", 368369245Sgit2svn sizeof(hio_recv_list_cond[0]) * ncomps); 
369369245Sgit2svn } 370145522Sdarrenr hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 371255332Scy if (hio_remote_lock == NULL) { 372255332Scy primary_exitx(EX_TEMPFAIL, 373145522Sdarrenr "Unable to allocate %zu bytes of memory for remote connections locks.", 374145522Sdarrenr sizeof(hio_remote_lock[0]) * ncomps); 375255332Scy } 376255332Scy 377255332Scy /* 378255332Scy * Initialize lists, their locks and theirs condition variables. 379145522Sdarrenr */ 380145522Sdarrenr TAILQ_INIT(&hio_free_list); 381145522Sdarrenr mtx_init(&hio_free_list_lock); 382145522Sdarrenr cv_init(&hio_free_list_cond); 383314251Scy for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 384314251Scy TAILQ_INIT(&hio_send_list[ii]); 385145522Sdarrenr mtx_init(&hio_send_list_lock[ii]); 386145522Sdarrenr cv_init(&hio_send_list_cond[ii]); 387145522Sdarrenr TAILQ_INIT(&hio_recv_list[ii]); 388145522Sdarrenr mtx_init(&hio_recv_list_lock[ii]); 389145522Sdarrenr cv_init(&hio_recv_list_cond[ii]); 390145522Sdarrenr rw_init(&hio_remote_lock[ii]); 391145522Sdarrenr } 392145522Sdarrenr TAILQ_INIT(&hio_done_list); 393145522Sdarrenr mtx_init(&hio_done_list_lock); 394145522Sdarrenr cv_init(&hio_done_list_cond); 395145522Sdarrenr mtx_init(&metadata_lock); 396369245Sgit2svn 397369245Sgit2svn /* 398369245Sgit2svn * Allocate requests pool and initialize requests. 
399369245Sgit2svn */ 400369245Sgit2svn for (ii = 0; ii < HAST_HIO_MAX; ii++) { 401369245Sgit2svn hio = malloc(sizeof(*hio)); 402369245Sgit2svn if (hio == NULL) { 403369245Sgit2svn primary_exitx(EX_TEMPFAIL, 404369245Sgit2svn "Unable to allocate %zu bytes of memory for hio request.", 405369245Sgit2svn sizeof(*hio)); 406145522Sdarrenr } 407145522Sdarrenr hio->hio_countdown = 0; 408145522Sdarrenr hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 409255332Scy if (hio->hio_errors == NULL) { 410255332Scy primary_exitx(EX_TEMPFAIL, 411145522Sdarrenr "Unable allocate %zu bytes of memory for hio errors.", 412145522Sdarrenr sizeof(hio->hio_errors[0]) * ncomps); 413145522Sdarrenr } 414145522Sdarrenr hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 415145522Sdarrenr if (hio->hio_next == NULL) { 416145522Sdarrenr primary_exitx(EX_TEMPFAIL, 417145522Sdarrenr "Unable allocate %zu bytes of memory for hio_next field.", 418145522Sdarrenr sizeof(hio->hio_next[0]) * ncomps); 419145522Sdarrenr } 420145522Sdarrenr hio->hio_ggio.gctl_version = G_GATE_VERSION; 421145522Sdarrenr hio->hio_ggio.gctl_data = malloc(MAXPHYS); 422358666Scy if (hio->hio_ggio.gctl_data == NULL) { 423145522Sdarrenr primary_exitx(EX_TEMPFAIL, 424145640Sdarrenr "Unable to allocate %zu bytes of memory for gctl_data.", 425145640Sdarrenr MAXPHYS); 426369273Scy } 427255332Scy hio->hio_ggio.gctl_length = MAXPHYS; 428255332Scy hio->hio_ggio.gctl_error = 0; 429255332Scy TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 430255332Scy } 431369273Scy} 432145522Sdarrenr 433145522Sdarrenrstatic bool 43460857Sdarrenrinit_resuid(struct hast_resource *res) 435344833Scy{ 43660857Sdarrenr 437145522Sdarrenr mtx_lock(&metadata_lock); 438344833Scy if (res->hr_resuid != 0) { 439145522Sdarrenr mtx_unlock(&metadata_lock); 440145522Sdarrenr return (false); 44160857Sdarrenr } else { 44260857Sdarrenr /* Initialize unique resource identifier. 
*/ 44360857Sdarrenr arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 44460857Sdarrenr mtx_unlock(&metadata_lock); 44553642Sguido if (metadata_write(res) < 0) 446145522Sdarrenr exit(EX_NOINPUT); 44753642Sguido return (true); 44853642Sguido } 449145522Sdarrenr} 450369272Scy 451145522Sdarrenrstatic void 452145522Sdarrenrinit_local(struct hast_resource *res) 453145522Sdarrenr{ 454145522Sdarrenr unsigned char *buf; 455145522Sdarrenr size_t mapsize; 456145522Sdarrenr 457369246Scy if (metadata_read(res, true) < 0) 458369246Scy exit(EX_NOINPUT); 459369246Scy mtx_init(&res->hr_amp_lock); 460369246Scy if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 461369246Scy res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 462369246Scy primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 463369246Scy } 464369246Scy mtx_init(&range_lock); 465145522Sdarrenr cv_init(&range_regular_cond); 466369246Scy if (rangelock_init(&range_regular) < 0) 467369246Scy primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 468145522Sdarrenr cv_init(&range_sync_cond); 469369246Scy if (rangelock_init(&range_sync) < 0) 470369246Scy primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 471145522Sdarrenr mapsize = activemap_ondisk_size(res->hr_amp); 472369246Scy buf = calloc(1, mapsize); 473369246Scy if (buf == NULL) { 474369246Scy primary_exitx(EX_TEMPFAIL, 475369246Scy "Unable to allocate buffer for activemap."); 476369246Scy } 477369246Scy if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 478145522Sdarrenr (ssize_t)mapsize) { 479145522Sdarrenr primary_exit(EX_NOINPUT, "Unable to read activemap"); 480369246Scy } 481369246Scy activemap_copyin(res->hr_amp, buf, mapsize); 482369246Scy free(buf); 483369246Scy if (res->hr_resuid != 0) 484369246Scy return; 485369246Scy /* 486369246Scy * We're using provider for the first time. Initialize local and remote 487369246Scy * counters. 
We don't initialize resuid here, as we want to do it just 488369246Scy * in time. The reason for this is that we want to inform secondary 489369246Scy * that there were no writes yet, so there is no need to synchronize 490369246Scy * anything. 491369246Scy */ 492369246Scy res->hr_primary_localcnt = 0; 493369246Scy res->hr_primary_remotecnt = 0; 494369246Scy if (metadata_write(res) < 0) 495369246Scy exit(EX_NOINPUT); 496369246Scy} 497369246Scy 498369246Scystatic int 499369246Scyprimary_connect(struct hast_resource *res, struct proto_conn **connp) 500369246Scy{ 501369246Scy struct proto_conn *conn; 502369246Scy int16_t val; 503369246Scy 504369246Scy val = 1; 505145522Sdarrenr if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 506145522Sdarrenr primary_exit(EX_TEMPFAIL, 507255754Scy "Unable to send connection request to parent"); 508255754Scy } 509172776Sdarrenr if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 510172776Sdarrenr primary_exit(EX_TEMPFAIL, 511172776Sdarrenr "Unable to receive reply to connection request from parent"); 512145522Sdarrenr } 513145522Sdarrenr if (val != 0) { 514145522Sdarrenr errno = val; 515145522Sdarrenr pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 516145522Sdarrenr res->hr_remoteaddr); 517145522Sdarrenr return (-1); 518255332Scy } 519255332Scy if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 520255332Scy primary_exit(EX_TEMPFAIL, 521260715Sglebius "Unable to receive connection from parent"); 522255332Scy } 523255332Scy if (proto_connect_wait(conn, res->hr_timeout) < 0) { 524255332Scy pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525255332Scy res->hr_remoteaddr); 526255332Scy proto_close(conn); 527255332Scy return (-1); 528255332Scy } 529260715Sglebius /* Error in setting timeout is not critical, but why should it fail? 
*/ 530255332Scy if (proto_timeout(conn, res->hr_timeout) < 0) 531255332Scy pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 532255332Scy 533255332Scy *connp = conn; 534255332Scy 535255332Scy return (0); 536255332Scy} 537255332Scy 538255332Scystatic int 539255332Scyinit_remote(struct hast_resource *res, struct proto_conn **inp, 540255332Scy struct proto_conn **outp) 541255332Scy{ 542255332Scy struct proto_conn *in, *out; 543255332Scy struct nv *nvout, *nvin; 544255332Scy const unsigned char *token; 545255332Scy unsigned char *map; 546255332Scy const char *errmsg; 547255332Scy int32_t extentsize; 548255332Scy int64_t datasize; 549255332Scy uint32_t mapsize; 550255332Scy size_t size; 551255332Scy int error; 552255332Scy 553255332Scy PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 554255332Scy PJDLOG_ASSERT(real_remote(res)); 555255332Scy 556255332Scy in = out = NULL; 557145522Sdarrenr errmsg = NULL; 558145522Sdarrenr 559145522Sdarrenr if (primary_connect(res, &out) == -1) 560145522Sdarrenr return (ECONNREFUSED); 561145522Sdarrenr 562145522Sdarrenr error = ECONNABORTED; 563145522Sdarrenr 564145522Sdarrenr /* 565145522Sdarrenr * First handshake step. 566145522Sdarrenr * Setup outgoing connection with remote node. 
567344837Scy */ 568145522Sdarrenr nvout = nv_alloc(); 569145522Sdarrenr nv_add_string(nvout, res->hr_name, "resource"); 570145522Sdarrenr if (nv_error(nvout) != 0) { 571145522Sdarrenr pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 572145522Sdarrenr "Unable to allocate header for connection with %s", 573145522Sdarrenr res->hr_remoteaddr); 574183397Sed nv_free(nvout); 575145522Sdarrenr goto close; 576145522Sdarrenr } 577145522Sdarrenr if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 578145522Sdarrenr pjdlog_errno(LOG_WARNING, 579172776Sdarrenr "Unable to send handshake header to %s", 580255332Scy res->hr_remoteaddr); 581145522Sdarrenr nv_free(nvout); 582145522Sdarrenr goto close; 583145522Sdarrenr } 584369245Sgit2svn nv_free(nvout); 585172776Sdarrenr if (hast_proto_recv_hdr(out, &nvin) < 0) { 586255332Scy pjdlog_errno(LOG_WARNING, 587145522Sdarrenr "Unable to receive handshake header from %s", 588145522Sdarrenr res->hr_remoteaddr); 589145522Sdarrenr goto close; 590172776Sdarrenr } 591172776Sdarrenr errmsg = nv_get_string(nvin, "errmsg"); 592172776Sdarrenr if (errmsg != NULL) { 593172776Sdarrenr pjdlog_warning("%s", errmsg); 594172776Sdarrenr if (nv_exists(nvin, "wait")) 595145522Sdarrenr error = EBUSY; 596145522Sdarrenr nv_free(nvin); 597170268Sdarrenr goto close; 598170268Sdarrenr } 599170268Sdarrenr token = nv_get_uint8_array(nvin, &size, "token"); 600170268Sdarrenr if (token == NULL) { 601170268Sdarrenr pjdlog_warning("Handshake header from %s has no 'token' field.", 60253642Sguido res->hr_remoteaddr); 603145522Sdarrenr nv_free(nvin); 604145522Sdarrenr goto close; 605145522Sdarrenr } 606145522Sdarrenr if (size != sizeof(res->hr_token)) { 607145522Sdarrenr pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 608145522Sdarrenr res->hr_remoteaddr, size, sizeof(res->hr_token)); 609145522Sdarrenr nv_free(nvin); 610145522Sdarrenr goto close; 611145522Sdarrenr } 612145522Sdarrenr bcopy(token, res->hr_token, 
sizeof(res->hr_token)); 613145522Sdarrenr nv_free(nvin); 614145522Sdarrenr 615145522Sdarrenr /* 616145522Sdarrenr * Second handshake step. 617145522Sdarrenr * Setup incoming connection with remote node. 618145522Sdarrenr */ 619145522Sdarrenr if (primary_connect(res, &in) == -1) 620145522Sdarrenr goto close; 621145522Sdarrenr 622145522Sdarrenr nvout = nv_alloc(); 623145522Sdarrenr nv_add_string(nvout, res->hr_name, "resource"); 624145522Sdarrenr nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 625145522Sdarrenr "token"); 626145522Sdarrenr if (res->hr_resuid == 0) { 627145522Sdarrenr /* 628145522Sdarrenr * The resuid field was not yet initialized. 629314251Scy * Because we do synchronization inside init_resuid(), it is 630145522Sdarrenr * possible that someone already initialized it, the function 631145522Sdarrenr * will return false then, but if we successfully initialized 632145522Sdarrenr * it, we will get true. True means that there were no writes 633145522Sdarrenr * to this resource yet and we want to inform secondary that 634145522Sdarrenr * synchronization is not needed by sending "virgin" argument. 
635145522Sdarrenr */ 636145522Sdarrenr if (init_resuid(res)) 637145522Sdarrenr nv_add_int8(nvout, 1, "virgin"); 638145522Sdarrenr } 639145522Sdarrenr nv_add_uint64(nvout, res->hr_resuid, "resuid"); 640145522Sdarrenr nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 641145522Sdarrenr nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 642145522Sdarrenr if (nv_error(nvout) != 0) { 643153876Sguido pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 644153876Sguido "Unable to allocate header for connection with %s", 645153876Sguido res->hr_remoteaddr); 646153876Sguido nv_free(nvout); 647153876Sguido goto close; 648153876Sguido } 649145522Sdarrenr if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 650145522Sdarrenr pjdlog_errno(LOG_WARNING, 651145522Sdarrenr "Unable to send handshake header to %s", 652145522Sdarrenr res->hr_remoteaddr); 653145522Sdarrenr nv_free(nvout); 654145522Sdarrenr goto close; 655145522Sdarrenr } 656145522Sdarrenr nv_free(nvout); 657145522Sdarrenr if (hast_proto_recv_hdr(out, &nvin) < 0) { 658145522Sdarrenr pjdlog_errno(LOG_WARNING, 659145522Sdarrenr "Unable to receive handshake header from %s", 660145522Sdarrenr res->hr_remoteaddr); 661145522Sdarrenr goto close; 662145522Sdarrenr } 663145522Sdarrenr errmsg = nv_get_string(nvin, "errmsg"); 664145522Sdarrenr if (errmsg != NULL) { 665145522Sdarrenr pjdlog_warning("%s", errmsg); 666145522Sdarrenr nv_free(nvin); 667145522Sdarrenr goto close; 668145522Sdarrenr } 669145522Sdarrenr datasize = nv_get_int64(nvin, "datasize"); 670145522Sdarrenr if (datasize != res->hr_datasize) { 671145522Sdarrenr pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 672145522Sdarrenr (intmax_t)res->hr_datasize, (intmax_t)datasize); 673145522Sdarrenr nv_free(nvin); 674145522Sdarrenr goto close; 675145522Sdarrenr } 676255332Scy extentsize = nv_get_int32(nvin, "extentsize"); 677145522Sdarrenr if (extentsize != res->hr_extentsize) { 678255332Scy pjdlog_warning("Extent size differs between 
nodes (local=%zd, remote=%zd).", 679255332Scy (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 680255332Scy nv_free(nvin); 681145522Sdarrenr goto close; 682145522Sdarrenr } 683145522Sdarrenr res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 684145522Sdarrenr res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 685145522Sdarrenr res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 686145522Sdarrenr if (nv_exists(nvin, "virgin")) { 687145522Sdarrenr /* 688145522Sdarrenr * Secondary was reinitialized, bump localcnt if it is 0 as 689145522Sdarrenr * only we have the data. 690145522Sdarrenr */ 691145522Sdarrenr PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 692145522Sdarrenr PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 693145522Sdarrenr 694145522Sdarrenr if (res->hr_primary_localcnt == 0) { 695145522Sdarrenr PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 696145522Sdarrenr 697170268Sdarrenr mtx_lock(&metadata_lock); 698145522Sdarrenr res->hr_primary_localcnt++; 699145522Sdarrenr pjdlog_debug(1, "Increasing localcnt to %ju.", 700145522Sdarrenr (uintmax_t)res->hr_primary_localcnt); 701145522Sdarrenr (void)metadata_write(res); 702145522Sdarrenr mtx_unlock(&metadata_lock); 703145522Sdarrenr } 704145522Sdarrenr } 705145522Sdarrenr map = NULL; 706145522Sdarrenr mapsize = nv_get_uint32(nvin, "mapsize"); 707145522Sdarrenr if (mapsize > 0) { 708145522Sdarrenr map = malloc(mapsize); 709145522Sdarrenr if (map == NULL) { 710145522Sdarrenr pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 711145522Sdarrenr (uintmax_t)mapsize); 712145522Sdarrenr nv_free(nvin); 713145522Sdarrenr goto close; 714145522Sdarrenr } 715145522Sdarrenr /* 716358666Scy * Remote node have some dirty extents on its own, lets 717145522Sdarrenr * download its activemap. 
718255332Scy */ 719145522Sdarrenr if (hast_proto_recv_data(res, out, nvin, map, 720145522Sdarrenr mapsize) < 0) { 72153642Sguido pjdlog_errno(LOG_ERR, 72253642Sguido "Unable to receive remote activemap"); 72353642Sguido nv_free(nvin); 72453642Sguido free(map); 72553642Sguido goto close; 72653642Sguido } 72753642Sguido /* 72853642Sguido * Merge local and remote bitmaps. 72953642Sguido */ 73053642Sguido activemap_merge(res->hr_amp, map, mapsize); 73153642Sguido free(map); 73253642Sguido /* 73353642Sguido * Now that we merged bitmaps from both nodes, flush it to the 73453642Sguido * disk before we start to synchronize. 73553642Sguido */ 73653642Sguido (void)hast_activemap_flush(res); 73753642Sguido } 73853642Sguido nv_free(nvin); 73953642Sguido#ifdef notyet 74053642Sguido /* Setup directions. */ 74153642Sguido if (proto_send(out, NULL, 0) == -1) 74253642Sguido pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 74353642Sguido if (proto_recv(in, NULL, 0) == -1) 74453642Sguido pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 74553642Sguido#endif 74653642Sguido pjdlog_info("Connected to %s.", res->hr_remoteaddr); 74753642Sguido if (inp != NULL && outp != NULL) { 74853642Sguido *inp = in; 74953642Sguido *outp = out; 75053642Sguido } else { 75153642Sguido res->hr_remotein = in; 75253642Sguido res->hr_remoteout = out; 75353642Sguido } 754145522Sdarrenr event_send(res, EVENT_CONNECT); 755255332Scy return (0); 756145522Sdarrenrclose: 75753642Sguido if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 758145522Sdarrenr event_send(res, EVENT_SPLITBRAIN); 75953642Sguido proto_close(out); 760145522Sdarrenr if (in != NULL) 76153642Sguido proto_close(in); 762145522Sdarrenr return (error); 76353642Sguido} 764145522Sdarrenr 765145522Sdarrenrstatic void 766145522Sdarrenrsync_start(void) 76753642Sguido{ 768145522Sdarrenr 769145522Sdarrenr mtx_lock(&sync_lock); 770145522Sdarrenr sync_inprogress = true; 771145522Sdarrenr 
mtx_unlock(&sync_lock); 772145522Sdarrenr cv_signal(&sync_cond); 77353642Sguido} 774145522Sdarrenr 77553642Sguidostatic void 776145522Sdarrenrsync_stop(void) 777145522Sdarrenr{ 77853642Sguido 77953642Sguido mtx_lock(&sync_lock); 78053642Sguido if (sync_inprogress) 781145522Sdarrenr sync_inprogress = false; 782145522Sdarrenr mtx_unlock(&sync_lock); 783145522Sdarrenr} 78453642Sguido 785145522Sdarrenrstatic void 78653642Sguidoinit_ggate(struct hast_resource *res) 787145522Sdarrenr{ 78853642Sguido struct g_gate_ctl_create ggiocreate; 789145522Sdarrenr struct g_gate_ctl_cancel ggiocancel; 79053642Sguido 791145522Sdarrenr /* 792145522Sdarrenr * We communicate with ggate via /dev/ggctl. Open it. 793145522Sdarrenr */ 794145522Sdarrenr res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 795145522Sdarrenr if (res->hr_ggatefd < 0) 796145522Sdarrenr primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 797145522Sdarrenr /* 798145522Sdarrenr * Create provider before trying to connect, as connection failure 799145522Sdarrenr * is not critical, but may take some time. 
800145522Sdarrenr */ 801145522Sdarrenr bzero(&ggiocreate, sizeof(ggiocreate)); 80253642Sguido ggiocreate.gctl_version = G_GATE_VERSION; 803255332Scy ggiocreate.gctl_mediasize = res->hr_datasize; 804255332Scy ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 80553642Sguido ggiocreate.gctl_flags = 0; 806369246Scy ggiocreate.gctl_maxcount = 0; 807369246Scy ggiocreate.gctl_timeout = 0; 808349653Scy ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 80953642Sguido snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 81053642Sguido res->hr_provname); 81153642Sguido if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 81253642Sguido pjdlog_info("Device hast/%s created.", res->hr_provname); 81353642Sguido res->hr_ggateunit = ggiocreate.gctl_unit; 81453642Sguido return; 81553642Sguido } 81653642Sguido if (errno != EEXIST) { 81753642Sguido primary_exit(EX_OSERR, "Unable to create hast/%s device", 81853642Sguido res->hr_provname); 81953642Sguido } 82053642Sguido pjdlog_debug(1, 82153642Sguido "Device hast/%s already exists, we will try to take it over.", 82253642Sguido res->hr_provname); 82353642Sguido /* 82453642Sguido * If we received EEXIST, we assume that the process who created the 82553642Sguido * provider died and didn't clean up. In that case we will start from 82653642Sguido * where he left of. 
82753642Sguido */ 82853642Sguido bzero(&ggiocancel, sizeof(ggiocancel)); 82953642Sguido ggiocancel.gctl_version = G_GATE_VERSION; 83053642Sguido ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 83153642Sguido snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 83253642Sguido res->hr_provname); 83353642Sguido if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 83453642Sguido pjdlog_info("Device hast/%s recovered.", res->hr_provname); 83553642Sguido res->hr_ggateunit = ggiocancel.gctl_unit; 83653642Sguido return; 83753642Sguido } 83853642Sguido primary_exit(EX_OSERR, "Unable to take over hast/%s device", 83953642Sguido res->hr_provname); 84053642Sguido} 84153642Sguido 84253642Sguidovoid 84353642Sguidohastd_primary(struct hast_resource *res) 84453642Sguido{ 84553642Sguido pthread_t td; 84653642Sguido pid_t pid; 84753642Sguido int error, mode, debuglevel; 848145522Sdarrenr 849145522Sdarrenr /* 850145522Sdarrenr * Create communication channel for sending control commands from 85153642Sguido * parent to child. 85253642Sguido */ 85353642Sguido if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 85453642Sguido /* TODO: There's no need for this to be fatal error. */ 85553642Sguido KEEP_ERRNO((void)pidfile_remove(pfh)); 85653642Sguido pjdlog_exit(EX_OSERR, 85753642Sguido "Unable to create control sockets between parent and child"); 85853642Sguido } 85953642Sguido /* 86053642Sguido * Create communication channel for sending events from child to parent. 86153642Sguido */ 86253642Sguido if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 86353642Sguido /* TODO: There's no need for this to be fatal error. */ 86453642Sguido KEEP_ERRNO((void)pidfile_remove(pfh)); 86553642Sguido pjdlog_exit(EX_OSERR, 86653642Sguido "Unable to create event sockets between child and parent"); 86753642Sguido } 86853642Sguido /* 86953642Sguido * Create communication channel for sending connection requests from 87053642Sguido * child to parent. 
87153642Sguido */ 87253642Sguido if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) { 87353642Sguido /* TODO: There's no need for this to be fatal error. */ 87453642Sguido KEEP_ERRNO((void)pidfile_remove(pfh)); 87553642Sguido pjdlog_exit(EX_OSERR, 87653642Sguido "Unable to create connection sockets between child and parent"); 87753642Sguido } 87853642Sguido 87953642Sguido pid = fork(); 88053642Sguido if (pid < 0) { 88153642Sguido /* TODO: There's no need for this to be fatal error. */ 88253642Sguido KEEP_ERRNO((void)pidfile_remove(pfh)); 88353642Sguido pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 88453642Sguido } 88553642Sguido 88653642Sguido if (pid > 0) { 88753642Sguido /* This is parent. */ 88853642Sguido /* Declare that we are receiver. */ 88953642Sguido proto_recv(res->hr_event, NULL, 0); 89053642Sguido proto_recv(res->hr_conn, NULL, 0); 89153642Sguido /* Declare that we are sender. */ 89253642Sguido proto_send(res->hr_ctrl, NULL, 0); 89353642Sguido res->hr_workerpid = pid; 89453642Sguido return; 89553642Sguido } 89653642Sguido 89753642Sguido gres = res; 89853642Sguido mode = pjdlog_mode_get(); 899145522Sdarrenr debuglevel = pjdlog_debug_get(); 900145522Sdarrenr 901145522Sdarrenr /* Declare that we are sender. */ 902255332Scy proto_send(res->hr_event, NULL, 0); 903255332Scy proto_send(res->hr_conn, NULL, 0); 904255332Scy /* Declare that we are receiver. 
*/ 905145522Sdarrenr proto_recv(res->hr_ctrl, NULL, 0); 906255332Scy descriptors_cleanup(res); 907145522Sdarrenr 908145522Sdarrenr descriptors_assert(res, mode); 909145522Sdarrenr 910145522Sdarrenr pjdlog_init(mode); 911145522Sdarrenr pjdlog_debug_set(debuglevel); 912145522Sdarrenr pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 913145522Sdarrenr setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 914145522Sdarrenr 915145522Sdarrenr init_local(res); 916145522Sdarrenr init_ggate(res); 91792685Sdarrenr init_environment(res); 91892685Sdarrenr 91992685Sdarrenr if (drop_privs(res) != 0) { 92092685Sdarrenr cleanup(res); 92192685Sdarrenr exit(EX_CONFIG); 92292685Sdarrenr } 923145522Sdarrenr pjdlog_info("Privileges successfully dropped."); 924145522Sdarrenr 925145522Sdarrenr /* 92692685Sdarrenr * Create the guard thread first, so we can handle signals from the 92792685Sdarrenr * very begining. 92892685Sdarrenr */ 929145522Sdarrenr error = pthread_create(&td, NULL, guard_thread, res); 930145522Sdarrenr PJDLOG_ASSERT(error == 0); 93153642Sguido /* 932145522Sdarrenr * Create the control thread before sending any event to the parent, 933145522Sdarrenr * as we can deadlock when parent sends control request to worker, 93453642Sguido * but worker has no control thread started yet, so parent waits. 935153876Sguido * In the meantime worker sends an event to the parent, but parent 936153876Sguido * is unable to handle the event, because it waits for control 93753642Sguido * request response. 
938153876Sguido */ 93953642Sguido error = pthread_create(&td, NULL, ctrl_thread, res); 94053642Sguido PJDLOG_ASSERT(error == 0); 94153642Sguido if (real_remote(res)) { 94253642Sguido error = init_remote(res, NULL, NULL); 94353642Sguido if (error == 0) { 94453642Sguido sync_start(); 94592685Sdarrenr } else if (error == EBUSY) { 94692685Sdarrenr time_t start = time(NULL); 94792685Sdarrenr 94892685Sdarrenr pjdlog_warning("Waiting for remote node to become %s for %ds.", 94992685Sdarrenr role2str(HAST_ROLE_SECONDARY), 95092685Sdarrenr res->hr_timeout); 95192685Sdarrenr for (;;) { 95292685Sdarrenr sleep(1); 95392685Sdarrenr error = init_remote(res, NULL, NULL); 95492685Sdarrenr if (error != EBUSY) 95592685Sdarrenr break; 95692685Sdarrenr if (time(NULL) > start + res->hr_timeout) 95792685Sdarrenr break; 95892685Sdarrenr } 95992685Sdarrenr if (error == EBUSY) { 96092685Sdarrenr pjdlog_warning("Remote node is still %s, starting anyway.", 96192685Sdarrenr role2str(HAST_ROLE_PRIMARY)); 96292685Sdarrenr } 96392685Sdarrenr } 96492685Sdarrenr } 96592685Sdarrenr error = pthread_create(&td, NULL, ggate_recv_thread, res); 96692685Sdarrenr PJDLOG_ASSERT(error == 0); 96792685Sdarrenr error = pthread_create(&td, NULL, local_send_thread, res); 96892685Sdarrenr PJDLOG_ASSERT(error == 0); 96992685Sdarrenr error = pthread_create(&td, NULL, remote_send_thread, res); 97092685Sdarrenr PJDLOG_ASSERT(error == 0); 97192685Sdarrenr error = pthread_create(&td, NULL, remote_recv_thread, res); 97292685Sdarrenr PJDLOG_ASSERT(error == 0); 97392685Sdarrenr error = pthread_create(&td, NULL, ggate_send_thread, res); 97492685Sdarrenr PJDLOG_ASSERT(error == 0); 97592685Sdarrenr fullystarted = true; 97692685Sdarrenr (void)sync_thread(res); 97792685Sdarrenr} 97892685Sdarrenr 97992685Sdarrenrstatic void 98092685Sdarrenrreqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
98192685Sdarrenr{ 98292685Sdarrenr char msg[1024]; 98392685Sdarrenr va_list ap; 98492685Sdarrenr int len; 98592685Sdarrenr 98692685Sdarrenr va_start(ap, fmt); 98792685Sdarrenr len = vsnprintf(msg, sizeof(msg), fmt, ap); 98892685Sdarrenr va_end(ap); 98992685Sdarrenr if ((size_t)len < sizeof(msg)) { 99092685Sdarrenr switch (ggio->gctl_cmd) { 99192685Sdarrenr case BIO_READ: 99292685Sdarrenr (void)snprintf(msg + len, sizeof(msg) - len, 99392685Sdarrenr "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 99492685Sdarrenr (uintmax_t)ggio->gctl_length); 99592685Sdarrenr break; 99692685Sdarrenr case BIO_DELETE: 99792685Sdarrenr (void)snprintf(msg + len, sizeof(msg) - len, 99892685Sdarrenr "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 99992685Sdarrenr (uintmax_t)ggio->gctl_length); 100092685Sdarrenr break; 100192685Sdarrenr case BIO_FLUSH: 100292685Sdarrenr (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 100392685Sdarrenr break; 100492685Sdarrenr case BIO_WRITE: 100592685Sdarrenr (void)snprintf(msg + len, sizeof(msg) - len, 100692685Sdarrenr "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 100792685Sdarrenr (uintmax_t)ggio->gctl_length); 100892685Sdarrenr break; 100992685Sdarrenr default: 101092685Sdarrenr (void)snprintf(msg + len, sizeof(msg) - len, 101192685Sdarrenr "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 101292685Sdarrenr break; 101392685Sdarrenr } 101492685Sdarrenr } 101592685Sdarrenr pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 101692685Sdarrenr} 101792685Sdarrenr 101892685Sdarrenrstatic void 101992685Sdarrenrremote_close(struct hast_resource *res, int ncomp) 102092685Sdarrenr{ 102192685Sdarrenr 102292685Sdarrenr rw_wlock(&hio_remote_lock[ncomp]); 102392685Sdarrenr /* 102492685Sdarrenr * A race is possible between dropping rlock and acquiring wlock - 102592685Sdarrenr * another thread can close connection in-between. 
102692685Sdarrenr */ 102792685Sdarrenr if (!ISCONNECTED(res, ncomp)) { 102892685Sdarrenr PJDLOG_ASSERT(res->hr_remotein == NULL); 102992685Sdarrenr PJDLOG_ASSERT(res->hr_remoteout == NULL); 103092685Sdarrenr rw_unlock(&hio_remote_lock[ncomp]); 103192685Sdarrenr return; 103292685Sdarrenr } 103392685Sdarrenr 103492685Sdarrenr PJDLOG_ASSERT(res->hr_remotein != NULL); 103592685Sdarrenr PJDLOG_ASSERT(res->hr_remoteout != NULL); 103692685Sdarrenr 103792685Sdarrenr pjdlog_debug(2, "Closing incoming connection to %s.", 103892685Sdarrenr res->hr_remoteaddr); 103992685Sdarrenr proto_close(res->hr_remotein); 104092685Sdarrenr res->hr_remotein = NULL; 104192685Sdarrenr pjdlog_debug(2, "Closing outgoing connection to %s.", 104292685Sdarrenr res->hr_remoteaddr); 104392685Sdarrenr proto_close(res->hr_remoteout); 104492685Sdarrenr res->hr_remoteout = NULL; 104592685Sdarrenr 104692685Sdarrenr rw_unlock(&hio_remote_lock[ncomp]); 104792685Sdarrenr 104892685Sdarrenr pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 104992685Sdarrenr 105092685Sdarrenr /* 105192685Sdarrenr * Stop synchronization if in-progress. 
105292685Sdarrenr */ 105392685Sdarrenr sync_stop(); 105492685Sdarrenr 105592685Sdarrenr event_send(res, EVENT_DISCONNECT); 105692685Sdarrenr} 105792685Sdarrenr 105892685Sdarrenr/* 105992685Sdarrenr * Thread receives ggate I/O requests from the kernel and passes them to 106092685Sdarrenr * appropriate threads: 106192685Sdarrenr * WRITE - always goes to both local_send and remote_send threads 106292685Sdarrenr * READ (when the block is up-to-date on local component) - 106392685Sdarrenr * only local_send thread 106492685Sdarrenr * READ (when the block isn't up-to-date on local component) - 106592685Sdarrenr * only remote_send thread 106692685Sdarrenr * DELETE - always goes to both local_send and remote_send threads 106792685Sdarrenr * FLUSH - always goes to both local_send and remote_send threads 106892685Sdarrenr */ 106992685Sdarrenrstatic void * 107092685Sdarrenrggate_recv_thread(void *arg) 107192685Sdarrenr{ 107292685Sdarrenr struct hast_resource *res = arg; 107392685Sdarrenr struct g_gate_ctl_io *ggio; 107492685Sdarrenr struct hio *hio; 107592685Sdarrenr unsigned int ii, ncomp, ncomps; 107692685Sdarrenr int error; 107792685Sdarrenr 107892685Sdarrenr ncomps = HAST_NCOMPONENTS; 107992685Sdarrenr 108092685Sdarrenr for (;;) { 108192685Sdarrenr pjdlog_debug(2, "ggate_recv: Taking free request."); 108292685Sdarrenr QUEUE_TAKE2(hio, free); 108392685Sdarrenr pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 108492685Sdarrenr ggio = &hio->hio_ggio; 108592685Sdarrenr ggio->gctl_unit = res->hr_ggateunit; 108692685Sdarrenr ggio->gctl_length = MAXPHYS; 108792685Sdarrenr ggio->gctl_error = 0; 108892685Sdarrenr pjdlog_debug(2, 108992685Sdarrenr "ggate_recv: (%p) Waiting for request from the kernel.", 109092685Sdarrenr hio); 109192685Sdarrenr if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 109292685Sdarrenr if (sigexit_received) 109392685Sdarrenr pthread_exit(NULL); 109492685Sdarrenr primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 109592685Sdarrenr } 
109692685Sdarrenr error = ggio->gctl_error; 109792685Sdarrenr switch (error) { 109892685Sdarrenr case 0: 109992685Sdarrenr break; 110092685Sdarrenr case ECANCELED: 110192685Sdarrenr /* Exit gracefully. */ 110292685Sdarrenr if (!sigexit_received) { 110392685Sdarrenr pjdlog_debug(2, 110492685Sdarrenr "ggate_recv: (%p) Received cancel from the kernel.", 1105255332Scy hio); 1106255332Scy pjdlog_info("Received cancel from the kernel, exiting."); 1107255332Scy } 1108255332Scy pthread_exit(NULL); 1109255332Scy case ENOMEM: 1110255332Scy /* 1111255332Scy * Buffer too small? Impossible, we allocate MAXPHYS 1112255332Scy * bytes - request can't be bigger than that. 1113255332Scy */ 1114255332Scy /* FALLTHROUGH */ 1115255332Scy case ENXIO: 1116255332Scy default: 1117255332Scy primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1118255332Scy strerror(error)); 1119255332Scy } 1120255332Scy for (ii = 0; ii < ncomps; ii++) 1121255332Scy hio->hio_errors[ii] = EINVAL; 1122255332Scy reqlog(LOG_DEBUG, 2, ggio, 1123255332Scy "ggate_recv: (%p) Request received from the kernel: ", 1124255332Scy hio); 1125255332Scy /* 1126255332Scy * Inform all components about new write request. 1127255332Scy * For read request prefer local component unless the given 1128255332Scy * range is out-of-date, then use remote component. 1129255332Scy */ 1130255332Scy switch (ggio->gctl_cmd) { 1131255332Scy case BIO_READ: 1132255332Scy res->hr_stat_read++; 1133255332Scy pjdlog_debug(2, 1134255332Scy "ggate_recv: (%p) Moving request to the send queue.", 1135255332Scy hio); 1136255332Scy refcount_init(&hio->hio_countdown, 1); 113772006Sdarrenr mtx_lock(&metadata_lock); 113872006Sdarrenr if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 113972006Sdarrenr res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 114072006Sdarrenr /* 114172006Sdarrenr * This range is up-to-date on local component, 114272006Sdarrenr * so handle request locally. 114372006Sdarrenr */ 114472006Sdarrenr /* Local component is 0 for now. 
*/ 114572006Sdarrenr ncomp = 0; 114672006Sdarrenr } else /* if (res->hr_syncsrc == 114772006Sdarrenr HAST_SYNCSRC_SECONDARY) */ { 1148145522Sdarrenr PJDLOG_ASSERT(res->hr_syncsrc == 1149145522Sdarrenr HAST_SYNCSRC_SECONDARY); 1150145522Sdarrenr /* 1151170268Sdarrenr * This range is out-of-date on local component, 1152170268Sdarrenr * so send request to the remote node. 1153170268Sdarrenr */ 1154170268Sdarrenr /* Remote component is 1 for now. */ 1155145522Sdarrenr ncomp = 1; 1156170268Sdarrenr } 1157170268Sdarrenr mtx_unlock(&metadata_lock); 1158145522Sdarrenr QUEUE_INSERT1(hio, send, ncomp); 1159170268Sdarrenr break; 1160170268Sdarrenr case BIO_WRITE: 1161170268Sdarrenr res->hr_stat_write++; 1162145522Sdarrenr if (res->hr_resuid == 0) { 1163170268Sdarrenr /* 1164170268Sdarrenr * This is first write, initialize localcnt and 1165170268Sdarrenr * resuid. 1166145522Sdarrenr */ 116772006Sdarrenr res->hr_primary_localcnt = 1; 1168145522Sdarrenr (void)init_resuid(res); 1169145522Sdarrenr } 1170145522Sdarrenr for (;;) { 1171145522Sdarrenr mtx_lock(&range_lock); 1172145522Sdarrenr if (rangelock_islocked(range_sync, 1173145522Sdarrenr ggio->gctl_offset, ggio->gctl_length)) { 1174145522Sdarrenr pjdlog_debug(2, 1175145522Sdarrenr "regular: Range offset=%jd length=%zu locked.", 1176145522Sdarrenr (intmax_t)ggio->gctl_offset, 1177145522Sdarrenr (size_t)ggio->gctl_length); 1178145522Sdarrenr range_regular_wait = true; 1179145522Sdarrenr cv_wait(&range_regular_cond, &range_lock); 1180145522Sdarrenr range_regular_wait = false; 1181145522Sdarrenr mtx_unlock(&range_lock); 1182145522Sdarrenr continue; 1183145522Sdarrenr } 1184145522Sdarrenr if (rangelock_add(range_regular, 1185145522Sdarrenr ggio->gctl_offset, ggio->gctl_length) < 0) { 1186145522Sdarrenr mtx_unlock(&range_lock); 1187145522Sdarrenr pjdlog_debug(2, 1188145522Sdarrenr "regular: Range offset=%jd length=%zu is already locked, waiting.", 1189145522Sdarrenr (intmax_t)ggio->gctl_offset, 1190145522Sdarrenr 
(size_t)ggio->gctl_length); 1191145522Sdarrenr sleep(1); 1192145522Sdarrenr continue; 1193145522Sdarrenr } 1194145522Sdarrenr mtx_unlock(&range_lock); 1195145522Sdarrenr break; 1196145522Sdarrenr } 1197145522Sdarrenr mtx_lock(&res->hr_amp_lock); 1198145522Sdarrenr if (activemap_write_start(res->hr_amp, 1199145522Sdarrenr ggio->gctl_offset, ggio->gctl_length)) { 1200153876Sguido res->hr_stat_activemap_update++; 1201145522Sdarrenr (void)hast_activemap_flush(res); 1202145522Sdarrenr } 1203145522Sdarrenr mtx_unlock(&res->hr_amp_lock); 1204145522Sdarrenr /* FALLTHROUGH */ 1205145522Sdarrenr case BIO_DELETE: 1206145522Sdarrenr case BIO_FLUSH: 1207145522Sdarrenr switch (ggio->gctl_cmd) { 1208145522Sdarrenr case BIO_DELETE: 1209145522Sdarrenr res->hr_stat_delete++; 1210145522Sdarrenr break; 1211145522Sdarrenr case BIO_FLUSH: 1212145522Sdarrenr res->hr_stat_flush++; 1213255332Scy break; 1214255332Scy } 1215255332Scy pjdlog_debug(2, 1216255332Scy "ggate_recv: (%p) Moving request to the send queues.", 1217145522Sdarrenr hio); 1218145522Sdarrenr refcount_init(&hio->hio_countdown, ncomps); 1219145522Sdarrenr for (ii = 0; ii < ncomps; ii++) 1220145522Sdarrenr QUEUE_INSERT1(hio, send, ii); 1221145522Sdarrenr break; 1222145522Sdarrenr } 1223255332Scy } 1224255332Scy /* NOTREACHED */ 1225255332Scy return (NULL); 1226255332Scy} 1227255332Scy 1228255332Scy/* 1229255332Scy * Thread reads from or writes to local component. 1230255332Scy * If local read fails, it redirects it to remote_send thread. 1231255332Scy */ 1232255332Scystatic void * 1233255332Scylocal_send_thread(void *arg) 1234255332Scy{ 1235255332Scy struct hast_resource *res = arg; 1236255332Scy struct g_gate_ctl_io *ggio; 1237255332Scy struct hio *hio; 1238255332Scy unsigned int ncomp, rncomp; 1239255332Scy ssize_t ret; 1240255332Scy 1241255332Scy /* Local component is 0 for now. */ 1242255332Scy ncomp = 0; 1243255332Scy /* Remote component is 1 for now. 
*/ 1244255332Scy rncomp = 1; 1245255332Scy 1246255332Scy for (;;) { 1247255332Scy pjdlog_debug(2, "local_send: Taking request."); 1248255332Scy QUEUE_TAKE1(hio, send, ncomp, 0); 1249255332Scy pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1250255332Scy ggio = &hio->hio_ggio; 1251255332Scy switch (ggio->gctl_cmd) { 1252255332Scy case BIO_READ: 1253255332Scy ret = pread(res->hr_localfd, ggio->gctl_data, 1254255332Scy ggio->gctl_length, 125553642Sguido ggio->gctl_offset + res->hr_localoff); 1256 if (ret == ggio->gctl_length) 1257 hio->hio_errors[ncomp] = 0; 1258 else if (!ISSYNCREQ(hio)) { 1259 /* 1260 * If READ failed, try to read from remote node. 1261 */ 1262 if (ret < 0) { 1263 reqlog(LOG_WARNING, 0, ggio, 1264 "Local request failed (%s), trying remote node. ", 1265 strerror(errno)); 1266 } else if (ret != ggio->gctl_length) { 1267 reqlog(LOG_WARNING, 0, ggio, 1268 "Local request failed (%zd != %jd), trying remote node. ", 1269 ret, (intmax_t)ggio->gctl_length); 1270 } 1271 QUEUE_INSERT1(hio, send, rncomp); 1272 continue; 1273 } 1274 break; 1275 case BIO_WRITE: 1276 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1277 ggio->gctl_length, 1278 ggio->gctl_offset + res->hr_localoff); 1279 if (ret < 0) { 1280 hio->hio_errors[ncomp] = errno; 1281 reqlog(LOG_WARNING, 0, ggio, 1282 "Local request failed (%s): ", 1283 strerror(errno)); 1284 } else if (ret != ggio->gctl_length) { 1285 hio->hio_errors[ncomp] = EIO; 1286 reqlog(LOG_WARNING, 0, ggio, 1287 "Local request failed (%zd != %jd): ", 1288 ret, (intmax_t)ggio->gctl_length); 1289 } else { 1290 hio->hio_errors[ncomp] = 0; 1291 } 1292 break; 1293 case BIO_DELETE: 1294 ret = g_delete(res->hr_localfd, 1295 ggio->gctl_offset + res->hr_localoff, 1296 ggio->gctl_length); 1297 if (ret < 0) { 1298 hio->hio_errors[ncomp] = errno; 1299 reqlog(LOG_WARNING, 0, ggio, 1300 "Local request failed (%s): ", 1301 strerror(errno)); 1302 } else { 1303 hio->hio_errors[ncomp] = 0; 1304 } 1305 break; 1306 case BIO_FLUSH: 1307 if 
(!res->hr_localflush) { 1308 ret = -1; 1309 errno = EOPNOTSUPP; 1310 break; 1311 } 1312 ret = g_flush(res->hr_localfd); 1313 if (ret < 0) { 1314 if (errno == EOPNOTSUPP) 1315 res->hr_localflush = false; 1316 hio->hio_errors[ncomp] = errno; 1317 reqlog(LOG_WARNING, 0, ggio, 1318 "Local request failed (%s): ", 1319 strerror(errno)); 1320 } else { 1321 hio->hio_errors[ncomp] = 0; 1322 } 1323 break; 1324 } 1325 if (refcount_release(&hio->hio_countdown)) { 1326 if (ISSYNCREQ(hio)) { 1327 mtx_lock(&sync_lock); 1328 SYNCREQDONE(hio); 1329 mtx_unlock(&sync_lock); 1330 cv_signal(&sync_cond); 1331 } else { 1332 pjdlog_debug(2, 1333 "local_send: (%p) Moving request to the done queue.", 1334 hio); 1335 QUEUE_INSERT2(hio, done); 1336 } 1337 } 1338 } 1339 /* NOTREACHED */ 1340 return (NULL); 1341} 1342 1343static void 1344keepalive_send(struct hast_resource *res, unsigned int ncomp) 1345{ 1346 struct nv *nv; 1347 1348 rw_rlock(&hio_remote_lock[ncomp]); 1349 1350 if (!ISCONNECTED(res, ncomp)) { 1351 rw_unlock(&hio_remote_lock[ncomp]); 1352 return; 1353 } 1354 1355 PJDLOG_ASSERT(res->hr_remotein != NULL); 1356 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1357 1358 nv = nv_alloc(); 1359 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1360 if (nv_error(nv) != 0) { 1361 rw_unlock(&hio_remote_lock[ncomp]); 1362 nv_free(nv); 1363 pjdlog_debug(1, 1364 "keepalive_send: Unable to prepare header to send."); 1365 return; 1366 } 1367 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1368 rw_unlock(&hio_remote_lock[ncomp]); 1369 pjdlog_common(LOG_DEBUG, 1, errno, 1370 "keepalive_send: Unable to send request"); 1371 nv_free(nv); 1372 remote_close(res, ncomp); 1373 return; 1374 } 1375 1376 rw_unlock(&hio_remote_lock[ncomp]); 1377 nv_free(nv); 1378 pjdlog_debug(2, "keepalive_send: Request sent."); 1379} 1380 1381/* 1382 * Thread sends request to secondary node. 
1383 */ 1384static void * 1385remote_send_thread(void *arg) 1386{ 1387 struct hast_resource *res = arg; 1388 struct g_gate_ctl_io *ggio; 1389 time_t lastcheck, now; 1390 struct hio *hio; 1391 struct nv *nv; 1392 unsigned int ncomp; 1393 bool wakeup; 1394 uint64_t offset, length; 1395 uint8_t cmd; 1396 void *data; 1397 1398 /* Remote component is 1 for now. */ 1399 ncomp = 1; 1400 lastcheck = time(NULL); 1401 1402 for (;;) { 1403 pjdlog_debug(2, "remote_send: Taking request."); 1404 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1405 if (hio == NULL) { 1406 now = time(NULL); 1407 if (lastcheck + HAST_KEEPALIVE <= now) { 1408 keepalive_send(res, ncomp); 1409 lastcheck = now; 1410 } 1411 continue; 1412 } 1413 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1414 ggio = &hio->hio_ggio; 1415 switch (ggio->gctl_cmd) { 1416 case BIO_READ: 1417 cmd = HIO_READ; 1418 data = NULL; 1419 offset = ggio->gctl_offset; 1420 length = ggio->gctl_length; 1421 break; 1422 case BIO_WRITE: 1423 cmd = HIO_WRITE; 1424 data = ggio->gctl_data; 1425 offset = ggio->gctl_offset; 1426 length = ggio->gctl_length; 1427 break; 1428 case BIO_DELETE: 1429 cmd = HIO_DELETE; 1430 data = NULL; 1431 offset = ggio->gctl_offset; 1432 length = ggio->gctl_length; 1433 break; 1434 case BIO_FLUSH: 1435 cmd = HIO_FLUSH; 1436 data = NULL; 1437 offset = 0; 1438 length = 0; 1439 break; 1440 default: 1441 PJDLOG_ABORT("invalid condition"); 1442 } 1443 nv = nv_alloc(); 1444 nv_add_uint8(nv, cmd, "cmd"); 1445 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1446 nv_add_uint64(nv, offset, "offset"); 1447 nv_add_uint64(nv, length, "length"); 1448 if (nv_error(nv) != 0) { 1449 hio->hio_errors[ncomp] = nv_error(nv); 1450 pjdlog_debug(2, 1451 "remote_send: (%p) Unable to prepare header to send.", 1452 hio); 1453 reqlog(LOG_ERR, 0, ggio, 1454 "Unable to prepare header to send (%s): ", 1455 strerror(nv_error(nv))); 1456 /* Move failed request immediately to the done queue. 
*/ 1457 goto done_queue; 1458 } 1459 pjdlog_debug(2, 1460 "remote_send: (%p) Moving request to the recv queue.", 1461 hio); 1462 /* 1463 * Protect connection from disappearing. 1464 */ 1465 rw_rlock(&hio_remote_lock[ncomp]); 1466 if (!ISCONNECTED(res, ncomp)) { 1467 rw_unlock(&hio_remote_lock[ncomp]); 1468 hio->hio_errors[ncomp] = ENOTCONN; 1469 goto done_queue; 1470 } 1471 /* 1472 * Move the request to recv queue before sending it, because 1473 * in different order we can get reply before we move request 1474 * to recv queue. 1475 */ 1476 mtx_lock(&hio_recv_list_lock[ncomp]); 1477 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1478 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1479 mtx_unlock(&hio_recv_list_lock[ncomp]); 1480 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1481 data != NULL ? length : 0) < 0) { 1482 hio->hio_errors[ncomp] = errno; 1483 rw_unlock(&hio_remote_lock[ncomp]); 1484 pjdlog_debug(2, 1485 "remote_send: (%p) Unable to send request.", hio); 1486 reqlog(LOG_ERR, 0, ggio, 1487 "Unable to send request (%s): ", 1488 strerror(hio->hio_errors[ncomp])); 1489 remote_close(res, ncomp); 1490 /* 1491 * Take request back from the receive queue and move 1492 * it immediately to the done queue. 
1493 */ 1494 mtx_lock(&hio_recv_list_lock[ncomp]); 1495 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1496 mtx_unlock(&hio_recv_list_lock[ncomp]); 1497 goto done_queue; 1498 } 1499 rw_unlock(&hio_remote_lock[ncomp]); 1500 nv_free(nv); 1501 if (wakeup) 1502 cv_signal(&hio_recv_list_cond[ncomp]); 1503 continue; 1504done_queue: 1505 nv_free(nv); 1506 if (ISSYNCREQ(hio)) { 1507 if (!refcount_release(&hio->hio_countdown)) 1508 continue; 1509 mtx_lock(&sync_lock); 1510 SYNCREQDONE(hio); 1511 mtx_unlock(&sync_lock); 1512 cv_signal(&sync_cond); 1513 continue; 1514 } 1515 if (ggio->gctl_cmd == BIO_WRITE) { 1516 mtx_lock(&res->hr_amp_lock); 1517 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1518 ggio->gctl_length)) { 1519 (void)hast_activemap_flush(res); 1520 } 1521 mtx_unlock(&res->hr_amp_lock); 1522 } 1523 if (!refcount_release(&hio->hio_countdown)) 1524 continue; 1525 pjdlog_debug(2, 1526 "remote_send: (%p) Moving request to the done queue.", 1527 hio); 1528 QUEUE_INSERT2(hio, done); 1529 } 1530 /* NOTREACHED */ 1531 return (NULL); 1532} 1533 1534/* 1535 * Thread receives answer from secondary node and passes it to ggate_send 1536 * thread. 1537 */ 1538static void * 1539remote_recv_thread(void *arg) 1540{ 1541 struct hast_resource *res = arg; 1542 struct g_gate_ctl_io *ggio; 1543 struct hio *hio; 1544 struct nv *nv; 1545 unsigned int ncomp; 1546 uint64_t seq; 1547 int error; 1548 1549 /* Remote component is 1 for now. */ 1550 ncomp = 1; 1551 1552 for (;;) { 1553 /* Wait until there is anything to receive. 
*/ 1554 mtx_lock(&hio_recv_list_lock[ncomp]); 1555 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1556 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1557 cv_wait(&hio_recv_list_cond[ncomp], 1558 &hio_recv_list_lock[ncomp]); 1559 } 1560 mtx_unlock(&hio_recv_list_lock[ncomp]); 1561 rw_rlock(&hio_remote_lock[ncomp]); 1562 if (!ISCONNECTED(res, ncomp)) { 1563 rw_unlock(&hio_remote_lock[ncomp]); 1564 /* 1565 * Connection is dead, so move all pending requests to 1566 * the done queue (one-by-one). 1567 */ 1568 mtx_lock(&hio_recv_list_lock[ncomp]); 1569 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1570 PJDLOG_ASSERT(hio != NULL); 1571 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1572 hio_next[ncomp]); 1573 mtx_unlock(&hio_recv_list_lock[ncomp]); 1574 goto done_queue; 1575 } 1576 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1577 pjdlog_errno(LOG_ERR, 1578 "Unable to receive reply header"); 1579 rw_unlock(&hio_remote_lock[ncomp]); 1580 remote_close(res, ncomp); 1581 continue; 1582 } 1583 rw_unlock(&hio_remote_lock[ncomp]); 1584 seq = nv_get_uint64(nv, "seq"); 1585 if (seq == 0) { 1586 pjdlog_error("Header contains no 'seq' field."); 1587 nv_free(nv); 1588 continue; 1589 } 1590 mtx_lock(&hio_recv_list_lock[ncomp]); 1591 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1592 if (hio->hio_ggio.gctl_seq == seq) { 1593 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1594 hio_next[ncomp]); 1595 break; 1596 } 1597 } 1598 mtx_unlock(&hio_recv_list_lock[ncomp]); 1599 if (hio == NULL) { 1600 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1601 (uintmax_t)seq); 1602 nv_free(nv); 1603 continue; 1604 } 1605 error = nv_get_int16(nv, "error"); 1606 if (error != 0) { 1607 /* Request failed on remote side. 
*/ 1608 hio->hio_errors[ncomp] = error; 1609 reqlog(LOG_WARNING, 0, &hio->hio_ggio, 1610 "Remote request failed (%s): ", strerror(error)); 1611 nv_free(nv); 1612 goto done_queue; 1613 } 1614 ggio = &hio->hio_ggio; 1615 switch (ggio->gctl_cmd) { 1616 case BIO_READ: 1617 rw_rlock(&hio_remote_lock[ncomp]); 1618 if (!ISCONNECTED(res, ncomp)) { 1619 rw_unlock(&hio_remote_lock[ncomp]); 1620 nv_free(nv); 1621 goto done_queue; 1622 } 1623 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1624 ggio->gctl_data, ggio->gctl_length) < 0) { 1625 hio->hio_errors[ncomp] = errno; 1626 pjdlog_errno(LOG_ERR, 1627 "Unable to receive reply data"); 1628 rw_unlock(&hio_remote_lock[ncomp]); 1629 nv_free(nv); 1630 remote_close(res, ncomp); 1631 goto done_queue; 1632 } 1633 rw_unlock(&hio_remote_lock[ncomp]); 1634 break; 1635 case BIO_WRITE: 1636 case BIO_DELETE: 1637 case BIO_FLUSH: 1638 break; 1639 default: 1640 PJDLOG_ABORT("invalid condition"); 1641 } 1642 hio->hio_errors[ncomp] = 0; 1643 nv_free(nv); 1644done_queue: 1645 if (refcount_release(&hio->hio_countdown)) { 1646 if (ISSYNCREQ(hio)) { 1647 mtx_lock(&sync_lock); 1648 SYNCREQDONE(hio); 1649 mtx_unlock(&sync_lock); 1650 cv_signal(&sync_cond); 1651 } else { 1652 pjdlog_debug(2, 1653 "remote_recv: (%p) Moving request to the done queue.", 1654 hio); 1655 QUEUE_INSERT2(hio, done); 1656 } 1657 } 1658 } 1659 /* NOTREACHED */ 1660 return (NULL); 1661} 1662 1663/* 1664 * Thread sends answer to the kernel. 
1665 */ 1666static void * 1667ggate_send_thread(void *arg) 1668{ 1669 struct hast_resource *res = arg; 1670 struct g_gate_ctl_io *ggio; 1671 struct hio *hio; 1672 unsigned int ii, ncomp, ncomps; 1673 1674 ncomps = HAST_NCOMPONENTS; 1675 1676 for (;;) { 1677 pjdlog_debug(2, "ggate_send: Taking request."); 1678 QUEUE_TAKE2(hio, done); 1679 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1680 ggio = &hio->hio_ggio; 1681 for (ii = 0; ii < ncomps; ii++) { 1682 if (hio->hio_errors[ii] == 0) { 1683 /* 1684 * One successful request is enough to declare 1685 * success. 1686 */ 1687 ggio->gctl_error = 0; 1688 break; 1689 } 1690 } 1691 if (ii == ncomps) { 1692 /* 1693 * None of the requests were successful. 1694 * Use the error from local component except the 1695 * case when we did only remote request. 1696 */ 1697 if (ggio->gctl_cmd == BIO_READ && 1698 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1699 ggio->gctl_error = hio->hio_errors[1]; 1700 else 1701 ggio->gctl_error = hio->hio_errors[0]; 1702 } 1703 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1704 mtx_lock(&res->hr_amp_lock); 1705 if (activemap_write_complete(res->hr_amp, 1706 ggio->gctl_offset, ggio->gctl_length)) { 1707 res->hr_stat_activemap_update++; 1708 (void)hast_activemap_flush(res); 1709 } 1710 mtx_unlock(&res->hr_amp_lock); 1711 } 1712 if (ggio->gctl_cmd == BIO_WRITE) { 1713 /* 1714 * Unlock range we locked. 1715 */ 1716 mtx_lock(&range_lock); 1717 rangelock_del(range_regular, ggio->gctl_offset, 1718 ggio->gctl_length); 1719 if (range_sync_wait) 1720 cv_signal(&range_sync_cond); 1721 mtx_unlock(&range_lock); 1722 /* 1723 * Bump local count if this is first write after 1724 * connection failure with remote node. 
1725 */ 1726 ncomp = 1; 1727 rw_rlock(&hio_remote_lock[ncomp]); 1728 if (!ISCONNECTED(res, ncomp)) { 1729 mtx_lock(&metadata_lock); 1730 if (res->hr_primary_localcnt == 1731 res->hr_secondary_remotecnt) { 1732 res->hr_primary_localcnt++; 1733 pjdlog_debug(1, 1734 "Increasing localcnt to %ju.", 1735 (uintmax_t)res->hr_primary_localcnt); 1736 (void)metadata_write(res); 1737 } 1738 mtx_unlock(&metadata_lock); 1739 } 1740 rw_unlock(&hio_remote_lock[ncomp]); 1741 } 1742 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1743 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1744 pjdlog_debug(2, 1745 "ggate_send: (%p) Moving request to the free queue.", hio); 1746 QUEUE_INSERT2(hio, free); 1747 } 1748 /* NOTREACHED */ 1749 return (NULL); 1750} 1751 1752/* 1753 * Thread synchronize local and remote components. 1754 */ 1755static void * 1756sync_thread(void *arg __unused) 1757{ 1758 struct hast_resource *res = arg; 1759 struct hio *hio; 1760 struct g_gate_ctl_io *ggio; 1761 struct timeval tstart, tend, tdiff; 1762 unsigned int ii, ncomp, ncomps; 1763 off_t offset, length, synced; 1764 bool dorewind; 1765 int syncext; 1766 1767 ncomps = HAST_NCOMPONENTS; 1768 dorewind = true; 1769 synced = 0; 1770 offset = -1; 1771 1772 for (;;) { 1773 mtx_lock(&sync_lock); 1774 if (offset >= 0 && !sync_inprogress) { 1775 gettimeofday(&tend, NULL); 1776 timersub(&tend, &tstart, &tdiff); 1777 pjdlog_info("Synchronization interrupted after %#.0T. " 1778 "%NB synchronized so far.", &tdiff, 1779 (intmax_t)synced); 1780 event_send(res, EVENT_SYNCINTR); 1781 } 1782 while (!sync_inprogress) { 1783 dorewind = true; 1784 synced = 0; 1785 cv_wait(&sync_cond, &sync_lock); 1786 } 1787 mtx_unlock(&sync_lock); 1788 /* 1789 * Obtain offset at which we should synchronize. 1790 * Rewind synchronization if needed. 
1791 */ 1792 mtx_lock(&res->hr_amp_lock); 1793 if (dorewind) 1794 activemap_sync_rewind(res->hr_amp); 1795 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1796 if (syncext != -1) { 1797 /* 1798 * We synchronized entire syncext extent, we can mark 1799 * it as clean now. 1800 */ 1801 if (activemap_extent_complete(res->hr_amp, syncext)) 1802 (void)hast_activemap_flush(res); 1803 } 1804 mtx_unlock(&res->hr_amp_lock); 1805 if (dorewind) { 1806 dorewind = false; 1807 if (offset < 0) 1808 pjdlog_info("Nodes are in sync."); 1809 else { 1810 pjdlog_info("Synchronization started. %NB to go.", 1811 (intmax_t)(res->hr_extentsize * 1812 activemap_ndirty(res->hr_amp))); 1813 event_send(res, EVENT_SYNCSTART); 1814 gettimeofday(&tstart, NULL); 1815 } 1816 } 1817 if (offset < 0) { 1818 sync_stop(); 1819 pjdlog_debug(1, "Nothing to synchronize."); 1820 /* 1821 * Synchronization complete, make both localcnt and 1822 * remotecnt equal. 1823 */ 1824 ncomp = 1; 1825 rw_rlock(&hio_remote_lock[ncomp]); 1826 if (ISCONNECTED(res, ncomp)) { 1827 if (synced > 0) { 1828 int64_t bps; 1829 1830 gettimeofday(&tend, NULL); 1831 timersub(&tend, &tstart, &tdiff); 1832 bps = (int64_t)((double)synced / 1833 ((double)tdiff.tv_sec + 1834 (double)tdiff.tv_usec / 1000000)); 1835 pjdlog_info("Synchronization complete. 
" 1836 "%NB synchronized in %#.0lT (%NB/sec).", 1837 (intmax_t)synced, &tdiff, 1838 (intmax_t)bps); 1839 event_send(res, EVENT_SYNCDONE); 1840 } 1841 mtx_lock(&metadata_lock); 1842 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1843 res->hr_primary_localcnt = 1844 res->hr_secondary_remotecnt; 1845 res->hr_primary_remotecnt = 1846 res->hr_secondary_localcnt; 1847 pjdlog_debug(1, 1848 "Setting localcnt to %ju and remotecnt to %ju.", 1849 (uintmax_t)res->hr_primary_localcnt, 1850 (uintmax_t)res->hr_primary_remotecnt); 1851 (void)metadata_write(res); 1852 mtx_unlock(&metadata_lock); 1853 } 1854 rw_unlock(&hio_remote_lock[ncomp]); 1855 continue; 1856 } 1857 pjdlog_debug(2, "sync: Taking free request."); 1858 QUEUE_TAKE2(hio, free); 1859 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1860 /* 1861 * Lock the range we are going to synchronize. We don't want 1862 * race where someone writes between our read and write. 1863 */ 1864 for (;;) { 1865 mtx_lock(&range_lock); 1866 if (rangelock_islocked(range_regular, offset, length)) { 1867 pjdlog_debug(2, 1868 "sync: Range offset=%jd length=%jd locked.", 1869 (intmax_t)offset, (intmax_t)length); 1870 range_sync_wait = true; 1871 cv_wait(&range_sync_cond, &range_lock); 1872 range_sync_wait = false; 1873 mtx_unlock(&range_lock); 1874 continue; 1875 } 1876 if (rangelock_add(range_sync, offset, length) < 0) { 1877 mtx_unlock(&range_lock); 1878 pjdlog_debug(2, 1879 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1880 (intmax_t)offset, (intmax_t)length); 1881 sleep(1); 1882 continue; 1883 } 1884 mtx_unlock(&range_lock); 1885 break; 1886 } 1887 /* 1888 * First read the data from synchronization source. 
1889 */ 1890 SYNCREQ(hio); 1891 ggio = &hio->hio_ggio; 1892 ggio->gctl_cmd = BIO_READ; 1893 ggio->gctl_offset = offset; 1894 ggio->gctl_length = length; 1895 ggio->gctl_error = 0; 1896 for (ii = 0; ii < ncomps; ii++) 1897 hio->hio_errors[ii] = EINVAL; 1898 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1899 hio); 1900 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1901 hio); 1902 mtx_lock(&metadata_lock); 1903 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1904 /* 1905 * This range is up-to-date on local component, 1906 * so handle request locally. 1907 */ 1908 /* Local component is 0 for now. */ 1909 ncomp = 0; 1910 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1911 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1912 /* 1913 * This range is out-of-date on local component, 1914 * so send request to the remote node. 1915 */ 1916 /* Remote component is 1 for now. */ 1917 ncomp = 1; 1918 } 1919 mtx_unlock(&metadata_lock); 1920 refcount_init(&hio->hio_countdown, 1); 1921 QUEUE_INSERT1(hio, send, ncomp); 1922 1923 /* 1924 * Let's wait for READ to finish. 1925 */ 1926 mtx_lock(&sync_lock); 1927 while (!ISSYNCREQDONE(hio)) 1928 cv_wait(&sync_cond, &sync_lock); 1929 mtx_unlock(&sync_lock); 1930 1931 if (hio->hio_errors[ncomp] != 0) { 1932 pjdlog_error("Unable to read synchronization data: %s.", 1933 strerror(hio->hio_errors[ncomp])); 1934 goto free_queue; 1935 } 1936 1937 /* 1938 * We read the data from synchronization source, now write it 1939 * to synchronization target. 
1940 */ 1941 SYNCREQ(hio); 1942 ggio->gctl_cmd = BIO_WRITE; 1943 for (ii = 0; ii < ncomps; ii++) 1944 hio->hio_errors[ii] = EINVAL; 1945 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1946 hio); 1947 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1948 hio); 1949 mtx_lock(&metadata_lock); 1950 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1951 /* 1952 * This range is up-to-date on local component, 1953 * so we update remote component. 1954 */ 1955 /* Remote component is 1 for now. */ 1956 ncomp = 1; 1957 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1958 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1959 /* 1960 * This range is out-of-date on local component, 1961 * so we update it. 1962 */ 1963 /* Local component is 0 for now. */ 1964 ncomp = 0; 1965 } 1966 mtx_unlock(&metadata_lock); 1967 1968 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1969 hio); 1970 refcount_init(&hio->hio_countdown, 1); 1971 QUEUE_INSERT1(hio, send, ncomp); 1972 1973 /* 1974 * Let's wait for WRITE to finish. 
1975 */ 1976 mtx_lock(&sync_lock); 1977 while (!ISSYNCREQDONE(hio)) 1978 cv_wait(&sync_cond, &sync_lock); 1979 mtx_unlock(&sync_lock); 1980 1981 if (hio->hio_errors[ncomp] != 0) { 1982 pjdlog_error("Unable to write synchronization data: %s.", 1983 strerror(hio->hio_errors[ncomp])); 1984 goto free_queue; 1985 } 1986 1987 synced += length; 1988free_queue: 1989 mtx_lock(&range_lock); 1990 rangelock_del(range_sync, offset, length); 1991 if (range_regular_wait) 1992 cv_signal(&range_regular_cond); 1993 mtx_unlock(&range_lock); 1994 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1995 hio); 1996 QUEUE_INSERT2(hio, free); 1997 } 1998 /* NOTREACHED */ 1999 return (NULL); 2000} 2001 2002void 2003primary_config_reload(struct hast_resource *res, struct nv *nv) 2004{ 2005 unsigned int ii, ncomps; 2006 int modified, vint; 2007 const char *vstr; 2008 2009 pjdlog_info("Reloading configuration..."); 2010 2011 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2012 PJDLOG_ASSERT(gres == res); 2013 nv_assert(nv, "remoteaddr"); 2014 nv_assert(nv, "sourceaddr"); 2015 nv_assert(nv, "replication"); 2016 nv_assert(nv, "checksum"); 2017 nv_assert(nv, "compression"); 2018 nv_assert(nv, "timeout"); 2019 nv_assert(nv, "exec"); 2020 nv_assert(nv, "metaflush"); 2021 2022 ncomps = HAST_NCOMPONENTS; 2023 2024#define MODIFIED_REMOTEADDR 0x01 2025#define MODIFIED_SOURCEADDR 0x02 2026#define MODIFIED_REPLICATION 0x04 2027#define MODIFIED_CHECKSUM 0x08 2028#define MODIFIED_COMPRESSION 0x10 2029#define MODIFIED_TIMEOUT 0x20 2030#define MODIFIED_EXEC 0x40 2031#define MODIFIED_METAFLUSH 0x80 2032 modified = 0; 2033 2034 vstr = nv_get_string(nv, "remoteaddr"); 2035 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2036 /* 2037 * Don't copy res->hr_remoteaddr to gres just yet. 2038 * We want remote_close() to log disconnect from the old 2039 * addresses, not from the new ones. 
2040 */ 2041 modified |= MODIFIED_REMOTEADDR; 2042 } 2043 vstr = nv_get_string(nv, "sourceaddr"); 2044 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2045 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2046 modified |= MODIFIED_SOURCEADDR; 2047 } 2048 vint = nv_get_int32(nv, "replication"); 2049 if (gres->hr_replication != vint) { 2050 gres->hr_replication = vint; 2051 modified |= MODIFIED_REPLICATION; 2052 } 2053 vint = nv_get_int32(nv, "checksum"); 2054 if (gres->hr_checksum != vint) { 2055 gres->hr_checksum = vint; 2056 modified |= MODIFIED_CHECKSUM; 2057 } 2058 vint = nv_get_int32(nv, "compression"); 2059 if (gres->hr_compression != vint) { 2060 gres->hr_compression = vint; 2061 modified |= MODIFIED_COMPRESSION; 2062 } 2063 vint = nv_get_int32(nv, "timeout"); 2064 if (gres->hr_timeout != vint) { 2065 gres->hr_timeout = vint; 2066 modified |= MODIFIED_TIMEOUT; 2067 } 2068 vstr = nv_get_string(nv, "exec"); 2069 if (strcmp(gres->hr_exec, vstr) != 0) { 2070 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2071 modified |= MODIFIED_EXEC; 2072 } 2073 vint = nv_get_int32(nv, "metaflush"); 2074 if (gres->hr_metaflush != vint) { 2075 gres->hr_metaflush = vint; 2076 modified |= MODIFIED_METAFLUSH; 2077 } 2078 2079 /* 2080 * Change timeout for connected sockets. 2081 * Don't bother if we need to reconnect. 
2082 */ 2083 if ((modified & MODIFIED_TIMEOUT) != 0 && 2084 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2085 MODIFIED_REPLICATION)) == 0) { 2086 for (ii = 0; ii < ncomps; ii++) { 2087 if (!ISREMOTE(ii)) 2088 continue; 2089 rw_rlock(&hio_remote_lock[ii]); 2090 if (!ISCONNECTED(gres, ii)) { 2091 rw_unlock(&hio_remote_lock[ii]); 2092 continue; 2093 } 2094 rw_unlock(&hio_remote_lock[ii]); 2095 if (proto_timeout(gres->hr_remotein, 2096 gres->hr_timeout) < 0) { 2097 pjdlog_errno(LOG_WARNING, 2098 "Unable to set connection timeout"); 2099 } 2100 if (proto_timeout(gres->hr_remoteout, 2101 gres->hr_timeout) < 0) { 2102 pjdlog_errno(LOG_WARNING, 2103 "Unable to set connection timeout"); 2104 } 2105 } 2106 } 2107 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2108 MODIFIED_REPLICATION)) != 0) { 2109 for (ii = 0; ii < ncomps; ii++) { 2110 if (!ISREMOTE(ii)) 2111 continue; 2112 remote_close(gres, ii); 2113 } 2114 if (modified & MODIFIED_REMOTEADDR) { 2115 vstr = nv_get_string(nv, "remoteaddr"); 2116 strlcpy(gres->hr_remoteaddr, vstr, 2117 sizeof(gres->hr_remoteaddr)); 2118 } 2119 } 2120#undef MODIFIED_REMOTEADDR 2121#undef MODIFIED_SOURCEADDR 2122#undef MODIFIED_REPLICATION 2123#undef MODIFIED_CHECKSUM 2124#undef MODIFIED_COMPRESSION 2125#undef MODIFIED_TIMEOUT 2126#undef MODIFIED_EXEC 2127#undef MODIFIED_METAFLUSH 2128 2129 pjdlog_info("Configuration reloaded successfully."); 2130} 2131 2132static void 2133guard_one(struct hast_resource *res, unsigned int ncomp) 2134{ 2135 struct proto_conn *in, *out; 2136 2137 if (!ISREMOTE(ncomp)) 2138 return; 2139 2140 rw_rlock(&hio_remote_lock[ncomp]); 2141 2142 if (!real_remote(res)) { 2143 rw_unlock(&hio_remote_lock[ncomp]); 2144 return; 2145 } 2146 2147 if (ISCONNECTED(res, ncomp)) { 2148 PJDLOG_ASSERT(res->hr_remotein != NULL); 2149 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2150 rw_unlock(&hio_remote_lock[ncomp]); 2151 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2152 res->hr_remoteaddr); 2153 
return; 2154 } 2155 2156 PJDLOG_ASSERT(res->hr_remotein == NULL); 2157 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2158 /* 2159 * Upgrade the lock. It doesn't have to be atomic as no other thread 2160 * can change connection status from disconnected to connected. 2161 */ 2162 rw_unlock(&hio_remote_lock[ncomp]); 2163 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2164 res->hr_remoteaddr); 2165 in = out = NULL; 2166 if (init_remote(res, &in, &out) == 0) { 2167 rw_wlock(&hio_remote_lock[ncomp]); 2168 PJDLOG_ASSERT(res->hr_remotein == NULL); 2169 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2170 PJDLOG_ASSERT(in != NULL && out != NULL); 2171 res->hr_remotein = in; 2172 res->hr_remoteout = out; 2173 rw_unlock(&hio_remote_lock[ncomp]); 2174 pjdlog_info("Successfully reconnected to %s.", 2175 res->hr_remoteaddr); 2176 sync_start(); 2177 } else { 2178 /* Both connections should be NULL. */ 2179 PJDLOG_ASSERT(res->hr_remotein == NULL); 2180 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2181 PJDLOG_ASSERT(in == NULL && out == NULL); 2182 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2183 res->hr_remoteaddr); 2184 } 2185} 2186 2187/* 2188 * Thread guards remote connections and reconnects when needed, handles 2189 * signals, etc. 
2190 */ 2191static void * 2192guard_thread(void *arg) 2193{ 2194 struct hast_resource *res = arg; 2195 unsigned int ii, ncomps; 2196 struct timespec timeout; 2197 time_t lastcheck, now; 2198 sigset_t mask; 2199 int signo; 2200 2201 ncomps = HAST_NCOMPONENTS; 2202 lastcheck = time(NULL); 2203 2204 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2205 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2206 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2207 2208 timeout.tv_sec = HAST_KEEPALIVE; 2209 timeout.tv_nsec = 0; 2210 signo = -1; 2211 2212 for (;;) { 2213 switch (signo) { 2214 case SIGINT: 2215 case SIGTERM: 2216 sigexit_received = true; 2217 primary_exitx(EX_OK, 2218 "Termination signal received, exiting."); 2219 break; 2220 default: 2221 break; 2222 } 2223 2224 /* 2225 * Don't check connections until we fully started, 2226 * as we may still be looping, waiting for remote node 2227 * to switch from primary to secondary. 2228 */ 2229 if (fullystarted) { 2230 pjdlog_debug(2, "remote_guard: Checking connections."); 2231 now = time(NULL); 2232 if (lastcheck + HAST_KEEPALIVE <= now) { 2233 for (ii = 0; ii < ncomps; ii++) 2234 guard_one(res, ii); 2235 lastcheck = now; 2236 } 2237 } 2238 signo = sigtimedwait(&mask, NULL, &timeout); 2239 } 2240 /* NOTREACHED */ 2241 return (NULL); 2242} 2243