/* primary.c — FreeBSD head/sbin/hastd/primary.c, revision 259192 */
1204076Spjd/*- 2204076Spjd * Copyright (c) 2009 The FreeBSD Foundation 3219351Spjd * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4204076Spjd * All rights reserved. 5204076Spjd * 6204076Spjd * This software was developed by Pawel Jakub Dawidek under sponsorship from 7204076Spjd * the FreeBSD Foundation. 8204076Spjd * 9204076Spjd * Redistribution and use in source and binary forms, with or without 10204076Spjd * modification, are permitted provided that the following conditions 11204076Spjd * are met: 12204076Spjd * 1. Redistributions of source code must retain the above copyright 13204076Spjd * notice, this list of conditions and the following disclaimer. 14204076Spjd * 2. Redistributions in binary form must reproduce the above copyright 15204076Spjd * notice, this list of conditions and the following disclaimer in the 16204076Spjd * documentation and/or other materials provided with the distribution. 17204076Spjd * 18204076Spjd * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19204076Spjd * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20204076Spjd * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21204076Spjd * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22204076Spjd * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23204076Spjd * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24204076Spjd * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25204076Spjd * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26204076Spjd * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27204076Spjd * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28204076Spjd * SUCH DAMAGE. 
29204076Spjd */ 30204076Spjd 31204076Spjd#include <sys/cdefs.h> 32204076Spjd__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 259192 2013-12-10 19:58:10Z trociny $"); 33204076Spjd 34204076Spjd#include <sys/types.h> 35204076Spjd#include <sys/time.h> 36204076Spjd#include <sys/bio.h> 37204076Spjd#include <sys/disk.h> 38204076Spjd#include <sys/stat.h> 39204076Spjd 40204076Spjd#include <geom/gate/g_gate.h> 41204076Spjd 42204076Spjd#include <err.h> 43204076Spjd#include <errno.h> 44204076Spjd#include <fcntl.h> 45204076Spjd#include <libgeom.h> 46204076Spjd#include <pthread.h> 47211982Spjd#include <signal.h> 48204076Spjd#include <stdint.h> 49204076Spjd#include <stdio.h> 50204076Spjd#include <string.h> 51204076Spjd#include <sysexits.h> 52204076Spjd#include <unistd.h> 53204076Spjd 54204076Spjd#include <activemap.h> 55204076Spjd#include <nv.h> 56204076Spjd#include <rangelock.h> 57204076Spjd 58204076Spjd#include "control.h" 59212038Spjd#include "event.h" 60204076Spjd#include "hast.h" 61204076Spjd#include "hast_proto.h" 62204076Spjd#include "hastd.h" 63211886Spjd#include "hooks.h" 64204076Spjd#include "metadata.h" 65204076Spjd#include "proto.h" 66204076Spjd#include "pjdlog.h" 67246922Spjd#include "refcnt.h" 68204076Spjd#include "subr.h" 69204076Spjd#include "synch.h" 70204076Spjd 71210886Spjd/* The is only one remote component for now. */ 72210886Spjd#define ISREMOTE(no) ((no) == 1) 73210886Spjd 74204076Spjdstruct hio { 75204076Spjd /* 76204076Spjd * Number of components we are still waiting for. 77204076Spjd * When this field goes to 0, we can send the request back to the 78204076Spjd * kernel. Each component has to decrease this counter by one 79204076Spjd * even on failure. 80204076Spjd */ 81249969Sed refcnt_t hio_countdown; 82204076Spjd /* 83204076Spjd * Each component has a place to store its own error. 84204076Spjd * Once the request is handled by all components we can decide if the 85204076Spjd * request overall is successful or not. 
86204076Spjd */ 87204076Spjd int *hio_errors; 88204076Spjd /* 89219818Spjd * Structure used to communicate with GEOM Gate class. 90204076Spjd */ 91204076Spjd struct g_gate_ctl_io hio_ggio; 92226859Spjd /* 93226859Spjd * Request was already confirmed to GEOM Gate. 94226859Spjd */ 95226859Spjd bool hio_done; 96226859Spjd /* 97259191Strociny * Number of components we are still waiting before sending write 98259191Strociny * completion ack to GEOM Gate. Used for memsync. 99259191Strociny */ 100259191Strociny refcnt_t hio_writecount; 101259191Strociny /* 102259191Strociny * Memsync request was acknowleged by remote. 103259191Strociny */ 104259191Strociny bool hio_memsyncacked; 105259191Strociny /* 106226859Spjd * Remember replication from the time the request was initiated, 107226859Spjd * so we won't get confused when replication changes on reload. 108226859Spjd */ 109226859Spjd int hio_replication; 110204076Spjd TAILQ_ENTRY(hio) *hio_next; 111204076Spjd}; 112204076Spjd#define hio_free_next hio_next[0] 113204076Spjd#define hio_done_next hio_next[0] 114204076Spjd 115204076Spjd/* 116204076Spjd * Free list holds unused structures. When free list is empty, we have to wait 117204076Spjd * until some in-progress requests are freed. 118204076Spjd */ 119204076Spjdstatic TAILQ_HEAD(, hio) hio_free_list; 120257155Strocinystatic size_t hio_free_list_size; 121204076Spjdstatic pthread_mutex_t hio_free_list_lock; 122204076Spjdstatic pthread_cond_t hio_free_list_cond; 123204076Spjd/* 124204076Spjd * There is one send list for every component. One requests is placed on all 125204076Spjd * send lists - each component gets the same request, but each component is 126204076Spjd * responsible for managing his own send list. 
127204076Spjd */ 128204076Spjdstatic TAILQ_HEAD(, hio) *hio_send_list; 129257155Strocinystatic size_t *hio_send_list_size; 130204076Spjdstatic pthread_mutex_t *hio_send_list_lock; 131204076Spjdstatic pthread_cond_t *hio_send_list_cond; 132257155Strociny#define hio_send_local_list_size hio_send_list_size[0] 133257155Strociny#define hio_send_remote_list_size hio_send_list_size[1] 134204076Spjd/* 135204076Spjd * There is one recv list for every component, although local components don't 136204076Spjd * use recv lists as local requests are done synchronously. 137204076Spjd */ 138204076Spjdstatic TAILQ_HEAD(, hio) *hio_recv_list; 139257155Strocinystatic size_t *hio_recv_list_size; 140204076Spjdstatic pthread_mutex_t *hio_recv_list_lock; 141204076Spjdstatic pthread_cond_t *hio_recv_list_cond; 142257155Strociny#define hio_recv_remote_list_size hio_recv_list_size[1] 143204076Spjd/* 144204076Spjd * Request is placed on done list by the slowest component (the one that 145204076Spjd * decreased hio_countdown from 1 to 0). 146204076Spjd */ 147204076Spjdstatic TAILQ_HEAD(, hio) hio_done_list; 148257155Strocinystatic size_t hio_done_list_size; 149204076Spjdstatic pthread_mutex_t hio_done_list_lock; 150204076Spjdstatic pthread_cond_t hio_done_list_cond; 151204076Spjd/* 152204076Spjd * Structure below are for interaction with sync thread. 153204076Spjd */ 154204076Spjdstatic bool sync_inprogress; 155204076Spjdstatic pthread_mutex_t sync_lock; 156204076Spjdstatic pthread_cond_t sync_cond; 157204076Spjd/* 158204076Spjd * The lock below allows to synchornize access to remote connections. 159204076Spjd */ 160204076Spjdstatic pthread_rwlock_t *hio_remote_lock; 161204076Spjd 162204076Spjd/* 163204076Spjd * Lock to synchronize metadata updates. Also synchronize access to 164204076Spjd * hr_primary_localcnt and hr_primary_remotecnt fields. 
165204076Spjd */ 166204076Spjdstatic pthread_mutex_t metadata_lock; 167204076Spjd 168204076Spjd/* 169204076Spjd * Maximum number of outstanding I/O requests. 170204076Spjd */ 171204076Spjd#define HAST_HIO_MAX 256 172204076Spjd/* 173204076Spjd * Number of components. At this point there are only two components: local 174204076Spjd * and remote, but in the future it might be possible to use multiple local 175204076Spjd * and remote components. 176204076Spjd */ 177204076Spjd#define HAST_NCOMPONENTS 2 178204076Spjd 179204076Spjd#define ISCONNECTED(res, no) \ 180204076Spjd ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 181204076Spjd 182204076Spjd#define QUEUE_INSERT1(hio, name, ncomp) do { \ 183204076Spjd bool _wakeup; \ 184204076Spjd \ 185204076Spjd mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 186204076Spjd _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 187204076Spjd TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 188204076Spjd hio_next[(ncomp)]); \ 189257155Strociny hio_##name##_list_size[(ncomp)]++; \ 190204076Spjd mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 191204076Spjd if (_wakeup) \ 192255714Strociny cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 193204076Spjd} while (0) 194204076Spjd#define QUEUE_INSERT2(hio, name) do { \ 195204076Spjd bool _wakeup; \ 196204076Spjd \ 197204076Spjd mtx_lock(&hio_##name##_list_lock); \ 198204076Spjd _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 199204076Spjd TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 200257155Strociny hio_##name##_list_size++; \ 201204076Spjd mtx_unlock(&hio_##name##_list_lock); \ 202204076Spjd if (_wakeup) \ 203255714Strociny cv_broadcast(&hio_##name##_list_cond); \ 204204076Spjd} while (0) 205214692Spjd#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 206214692Spjd bool _last; \ 207214692Spjd \ 208204076Spjd mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 209214692Spjd _last = false; \ 210214692Spjd while (((hio) = 
TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 211214692Spjd cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 212214692Spjd &hio_##name##_list_lock[(ncomp)], (timeout)); \ 213219864Spjd if ((timeout) != 0) \ 214214692Spjd _last = true; \ 215204076Spjd } \ 216214692Spjd if (hio != NULL) { \ 217257155Strociny PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0); \ 218257155Strociny hio_##name##_list_size[(ncomp)]--; \ 219214692Spjd TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 220214692Spjd hio_next[(ncomp)]); \ 221214692Spjd } \ 222204076Spjd mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 223204076Spjd} while (0) 224204076Spjd#define QUEUE_TAKE2(hio, name) do { \ 225204076Spjd mtx_lock(&hio_##name##_list_lock); \ 226204076Spjd while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 227204076Spjd cv_wait(&hio_##name##_list_cond, \ 228204076Spjd &hio_##name##_list_lock); \ 229204076Spjd } \ 230257155Strociny PJDLOG_ASSERT(hio_##name##_list_size != 0); \ 231257155Strociny hio_##name##_list_size--; \ 232204076Spjd TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 233204076Spjd mtx_unlock(&hio_##name##_list_lock); \ 234204076Spjd} while (0) 235204076Spjd 236259192Strociny#define ISFULLSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_FULLSYNC) 237259192Strociny#define ISMEMSYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_MEMSYNC) 238259192Strociny#define ISASYNC(hio) ((hio)->hio_replication == HAST_REPLICATION_ASYNC) 239259192Strociny 240209183Spjd#define SYNCREQ(hio) do { \ 241209183Spjd (hio)->hio_ggio.gctl_unit = -1; \ 242209183Spjd (hio)->hio_ggio.gctl_seq = 1; \ 243209183Spjd} while (0) 244204076Spjd#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 245204076Spjd#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 246204076Spjd#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 247204076Spjd 248259192Strociny#define ISMEMSYNCWRITE(hio) (ISMEMSYNC(hio) && \ 249259192Strociny 
(hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) 250259192Strociny 251204076Spjdstatic struct hast_resource *gres; 252204076Spjd 253204076Spjdstatic pthread_mutex_t range_lock; 254204076Spjdstatic struct rangelocks *range_regular; 255204076Spjdstatic bool range_regular_wait; 256204076Spjdstatic pthread_cond_t range_regular_cond; 257204076Spjdstatic struct rangelocks *range_sync; 258204076Spjdstatic bool range_sync_wait; 259204076Spjdstatic pthread_cond_t range_sync_cond; 260220898Spjdstatic bool fullystarted; 261204076Spjd 262204076Spjdstatic void *ggate_recv_thread(void *arg); 263204076Spjdstatic void *local_send_thread(void *arg); 264204076Spjdstatic void *remote_send_thread(void *arg); 265204076Spjdstatic void *remote_recv_thread(void *arg); 266204076Spjdstatic void *ggate_send_thread(void *arg); 267204076Spjdstatic void *sync_thread(void *arg); 268204076Spjdstatic void *guard_thread(void *arg); 269204076Spjd 270211982Spjdstatic void 271257155Strocinyoutput_status_aux(struct nv *nvout) 272257155Strociny{ 273257155Strociny 274257155Strociny nv_add_uint64(nvout, (uint64_t)hio_free_list_size, 275257155Strociny "idle_queue_size"); 276257155Strociny nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size, 277257155Strociny "local_queue_size"); 278257155Strociny nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size, 279257155Strociny "send_queue_size"); 280257155Strociny nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size, 281257155Strociny "recv_queue_size"); 282257155Strociny nv_add_uint64(nvout, (uint64_t)hio_done_list_size, 283257155Strociny "done_queue_size"); 284257155Strociny} 285257155Strociny 286257155Strocinystatic void 287204076Spjdcleanup(struct hast_resource *res) 288204076Spjd{ 289204076Spjd int rerrno; 290204076Spjd 291204076Spjd /* Remember errno. */ 292204076Spjd rerrno = errno; 293204076Spjd 294204076Spjd /* Destroy ggate provider if we created one. 
*/ 295204076Spjd if (res->hr_ggateunit >= 0) { 296204076Spjd struct g_gate_ctl_destroy ggiod; 297204076Spjd 298213533Spjd bzero(&ggiod, sizeof(ggiod)); 299204076Spjd ggiod.gctl_version = G_GATE_VERSION; 300204076Spjd ggiod.gctl_unit = res->hr_ggateunit; 301204076Spjd ggiod.gctl_force = 1; 302229945Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 303213531Spjd pjdlog_errno(LOG_WARNING, 304213531Spjd "Unable to destroy hast/%s device", 305204076Spjd res->hr_provname); 306204076Spjd } 307204076Spjd res->hr_ggateunit = -1; 308204076Spjd } 309204076Spjd 310204076Spjd /* Restore errno. */ 311204076Spjd errno = rerrno; 312204076Spjd} 313204076Spjd 314212899Spjdstatic __dead2 void 315204076Spjdprimary_exit(int exitcode, const char *fmt, ...) 316204076Spjd{ 317204076Spjd va_list ap; 318204076Spjd 319218138Spjd PJDLOG_ASSERT(exitcode != EX_OK); 320204076Spjd va_start(ap, fmt); 321204076Spjd pjdlogv_errno(LOG_ERR, fmt, ap); 322204076Spjd va_end(ap); 323204076Spjd cleanup(gres); 324204076Spjd exit(exitcode); 325204076Spjd} 326204076Spjd 327212899Spjdstatic __dead2 void 328204076Spjdprimary_exitx(int exitcode, const char *fmt, ...) 329204076Spjd{ 330204076Spjd va_list ap; 331204076Spjd 332204076Spjd va_start(ap, fmt); 333204076Spjd pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 334204076Spjd va_end(ap); 335204076Spjd cleanup(gres); 336204076Spjd exit(exitcode); 337204076Spjd} 338204076Spjd 339255716Strociny/* Expects res->hr_amp locked, returns unlocked. 
*/ 340204076Spjdstatic int 341204076Spjdhast_activemap_flush(struct hast_resource *res) 342204076Spjd{ 343204076Spjd const unsigned char *buf; 344204076Spjd size_t size; 345255716Strociny int ret; 346204076Spjd 347255716Strociny mtx_lock(&res->hr_amp_diskmap_lock); 348204076Spjd buf = activemap_bitmap(res->hr_amp, &size); 349255716Strociny mtx_unlock(&res->hr_amp_lock); 350218138Spjd PJDLOG_ASSERT(buf != NULL); 351218138Spjd PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 352255716Strociny ret = 0; 353204076Spjd if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 354204076Spjd (ssize_t)size) { 355225786Spjd pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 356247281Strociny res->hr_stat_activemap_write_error++; 357255716Strociny ret = -1; 358204076Spjd } 359255716Strociny if (ret == 0 && res->hr_metaflush == 1 && 360255716Strociny g_flush(res->hr_localfd) == -1) { 361225830Spjd if (errno == EOPNOTSUPP) { 362225830Spjd pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 363225830Spjd res->hr_localpath); 364225830Spjd res->hr_metaflush = 0; 365225830Spjd } else { 366225830Spjd pjdlog_errno(LOG_ERR, 367225830Spjd "Unable to flush disk cache on activemap update"); 368247281Strociny res->hr_stat_activemap_flush_error++; 369255716Strociny ret = -1; 370225830Spjd } 371225830Spjd } 372255716Strociny mtx_unlock(&res->hr_amp_diskmap_lock); 373255716Strociny return (ret); 374204076Spjd} 375204076Spjd 376210881Spjdstatic bool 377210881Spjdreal_remote(const struct hast_resource *res) 378210881Spjd{ 379210881Spjd 380210881Spjd return (strcmp(res->hr_remoteaddr, "none") != 0); 381210881Spjd} 382210881Spjd 383204076Spjdstatic void 384204076Spjdinit_environment(struct hast_resource *res __unused) 385204076Spjd{ 386204076Spjd struct hio *hio; 387204076Spjd unsigned int ii, ncomps; 388204076Spjd 389204076Spjd /* 390204076Spjd * In the future it might be per-resource value. 
391204076Spjd */ 392204076Spjd ncomps = HAST_NCOMPONENTS; 393204076Spjd 394204076Spjd /* 395204076Spjd * Allocate memory needed by lists. 396204076Spjd */ 397204076Spjd hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 398204076Spjd if (hio_send_list == NULL) { 399204076Spjd primary_exitx(EX_TEMPFAIL, 400204076Spjd "Unable to allocate %zu bytes of memory for send lists.", 401204076Spjd sizeof(hio_send_list[0]) * ncomps); 402204076Spjd } 403257155Strociny hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps); 404257155Strociny if (hio_send_list_size == NULL) { 405257155Strociny primary_exitx(EX_TEMPFAIL, 406257155Strociny "Unable to allocate %zu bytes of memory for send list counters.", 407257155Strociny sizeof(hio_send_list_size[0]) * ncomps); 408257155Strociny } 409204076Spjd hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 410204076Spjd if (hio_send_list_lock == NULL) { 411204076Spjd primary_exitx(EX_TEMPFAIL, 412204076Spjd "Unable to allocate %zu bytes of memory for send list locks.", 413204076Spjd sizeof(hio_send_list_lock[0]) * ncomps); 414204076Spjd } 415204076Spjd hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 416204076Spjd if (hio_send_list_cond == NULL) { 417204076Spjd primary_exitx(EX_TEMPFAIL, 418204076Spjd "Unable to allocate %zu bytes of memory for send list condition variables.", 419204076Spjd sizeof(hio_send_list_cond[0]) * ncomps); 420204076Spjd } 421204076Spjd hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 422204076Spjd if (hio_recv_list == NULL) { 423204076Spjd primary_exitx(EX_TEMPFAIL, 424204076Spjd "Unable to allocate %zu bytes of memory for recv lists.", 425204076Spjd sizeof(hio_recv_list[0]) * ncomps); 426204076Spjd } 427257155Strociny hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps); 428257155Strociny if (hio_recv_list_size == NULL) { 429257155Strociny primary_exitx(EX_TEMPFAIL, 430257155Strociny "Unable to allocate %zu bytes of memory for recv 
list counters.", 431257155Strociny sizeof(hio_recv_list_size[0]) * ncomps); 432257155Strociny } 433204076Spjd hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 434204076Spjd if (hio_recv_list_lock == NULL) { 435204076Spjd primary_exitx(EX_TEMPFAIL, 436204076Spjd "Unable to allocate %zu bytes of memory for recv list locks.", 437204076Spjd sizeof(hio_recv_list_lock[0]) * ncomps); 438204076Spjd } 439204076Spjd hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 440204076Spjd if (hio_recv_list_cond == NULL) { 441204076Spjd primary_exitx(EX_TEMPFAIL, 442204076Spjd "Unable to allocate %zu bytes of memory for recv list condition variables.", 443204076Spjd sizeof(hio_recv_list_cond[0]) * ncomps); 444204076Spjd } 445204076Spjd hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 446204076Spjd if (hio_remote_lock == NULL) { 447204076Spjd primary_exitx(EX_TEMPFAIL, 448204076Spjd "Unable to allocate %zu bytes of memory for remote connections locks.", 449204076Spjd sizeof(hio_remote_lock[0]) * ncomps); 450204076Spjd } 451204076Spjd 452204076Spjd /* 453257155Strociny * Initialize lists, their counters, locks and condition variables. 
454204076Spjd */ 455204076Spjd TAILQ_INIT(&hio_free_list); 456204076Spjd mtx_init(&hio_free_list_lock); 457204076Spjd cv_init(&hio_free_list_cond); 458204076Spjd for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 459204076Spjd TAILQ_INIT(&hio_send_list[ii]); 460257155Strociny hio_send_list_size[ii] = 0; 461204076Spjd mtx_init(&hio_send_list_lock[ii]); 462204076Spjd cv_init(&hio_send_list_cond[ii]); 463204076Spjd TAILQ_INIT(&hio_recv_list[ii]); 464257155Strociny hio_recv_list_size[ii] = 0; 465204076Spjd mtx_init(&hio_recv_list_lock[ii]); 466204076Spjd cv_init(&hio_recv_list_cond[ii]); 467204076Spjd rw_init(&hio_remote_lock[ii]); 468204076Spjd } 469204076Spjd TAILQ_INIT(&hio_done_list); 470204076Spjd mtx_init(&hio_done_list_lock); 471204076Spjd cv_init(&hio_done_list_cond); 472204076Spjd mtx_init(&metadata_lock); 473204076Spjd 474204076Spjd /* 475204076Spjd * Allocate requests pool and initialize requests. 476204076Spjd */ 477204076Spjd for (ii = 0; ii < HAST_HIO_MAX; ii++) { 478204076Spjd hio = malloc(sizeof(*hio)); 479204076Spjd if (hio == NULL) { 480204076Spjd primary_exitx(EX_TEMPFAIL, 481204076Spjd "Unable to allocate %zu bytes of memory for hio request.", 482204076Spjd sizeof(*hio)); 483204076Spjd } 484249969Sed refcnt_init(&hio->hio_countdown, 0); 485204076Spjd hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 486204076Spjd if (hio->hio_errors == NULL) { 487204076Spjd primary_exitx(EX_TEMPFAIL, 488204076Spjd "Unable allocate %zu bytes of memory for hio errors.", 489204076Spjd sizeof(hio->hio_errors[0]) * ncomps); 490204076Spjd } 491204076Spjd hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 492204076Spjd if (hio->hio_next == NULL) { 493204076Spjd primary_exitx(EX_TEMPFAIL, 494204076Spjd "Unable allocate %zu bytes of memory for hio_next field.", 495204076Spjd sizeof(hio->hio_next[0]) * ncomps); 496204076Spjd } 497204076Spjd hio->hio_ggio.gctl_version = G_GATE_VERSION; 498204076Spjd hio->hio_ggio.gctl_data = malloc(MAXPHYS); 499204076Spjd if 
(hio->hio_ggio.gctl_data == NULL) { 500204076Spjd primary_exitx(EX_TEMPFAIL, 501204076Spjd "Unable to allocate %zu bytes of memory for gctl_data.", 502204076Spjd MAXPHYS); 503204076Spjd } 504204076Spjd hio->hio_ggio.gctl_length = MAXPHYS; 505204076Spjd hio->hio_ggio.gctl_error = 0; 506204076Spjd TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 507257155Strociny hio_free_list_size++; 508204076Spjd } 509204076Spjd} 510204076Spjd 511214284Spjdstatic bool 512214284Spjdinit_resuid(struct hast_resource *res) 513214284Spjd{ 514214284Spjd 515214284Spjd mtx_lock(&metadata_lock); 516214284Spjd if (res->hr_resuid != 0) { 517214284Spjd mtx_unlock(&metadata_lock); 518214284Spjd return (false); 519214284Spjd } else { 520214284Spjd /* Initialize unique resource identifier. */ 521214284Spjd arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 522214284Spjd mtx_unlock(&metadata_lock); 523229945Spjd if (metadata_write(res) == -1) 524214284Spjd exit(EX_NOINPUT); 525214284Spjd return (true); 526214284Spjd } 527214284Spjd} 528214284Spjd 529204076Spjdstatic void 530204076Spjdinit_local(struct hast_resource *res) 531204076Spjd{ 532204076Spjd unsigned char *buf; 533204076Spjd size_t mapsize; 534204076Spjd 535229945Spjd if (metadata_read(res, true) == -1) 536204076Spjd exit(EX_NOINPUT); 537204076Spjd mtx_init(&res->hr_amp_lock); 538204076Spjd if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 539229945Spjd res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 540204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 541204076Spjd } 542204076Spjd mtx_init(&range_lock); 543204076Spjd cv_init(&range_regular_cond); 544229945Spjd if (rangelock_init(&range_regular) == -1) 545204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 546204076Spjd cv_init(&range_sync_cond); 547229945Spjd if (rangelock_init(&range_sync) == -1) 548204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 549204076Spjd mapsize = 
activemap_ondisk_size(res->hr_amp); 550204076Spjd buf = calloc(1, mapsize); 551204076Spjd if (buf == NULL) { 552204076Spjd primary_exitx(EX_TEMPFAIL, 553204076Spjd "Unable to allocate buffer for activemap."); 554204076Spjd } 555204076Spjd if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 556204076Spjd (ssize_t)mapsize) { 557204076Spjd primary_exit(EX_NOINPUT, "Unable to read activemap"); 558204076Spjd } 559204076Spjd activemap_copyin(res->hr_amp, buf, mapsize); 560209181Spjd free(buf); 561204076Spjd if (res->hr_resuid != 0) 562204076Spjd return; 563204076Spjd /* 564214284Spjd * We're using provider for the first time. Initialize local and remote 565214284Spjd * counters. We don't initialize resuid here, as we want to do it just 566214284Spjd * in time. The reason for this is that we want to inform secondary 567214284Spjd * that there were no writes yet, so there is no need to synchronize 568214284Spjd * anything. 569204076Spjd */ 570219844Spjd res->hr_primary_localcnt = 0; 571204076Spjd res->hr_primary_remotecnt = 0; 572229945Spjd if (metadata_write(res) == -1) 573204076Spjd exit(EX_NOINPUT); 574204076Spjd} 575204076Spjd 576218218Spjdstatic int 577218218Spjdprimary_connect(struct hast_resource *res, struct proto_conn **connp) 578218218Spjd{ 579218218Spjd struct proto_conn *conn; 580218218Spjd int16_t val; 581218218Spjd 582218218Spjd val = 1; 583229945Spjd if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 584218218Spjd primary_exit(EX_TEMPFAIL, 585218218Spjd "Unable to send connection request to parent"); 586218218Spjd } 587229945Spjd if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 588218218Spjd primary_exit(EX_TEMPFAIL, 589218218Spjd "Unable to receive reply to connection request from parent"); 590218218Spjd } 591218218Spjd if (val != 0) { 592218218Spjd errno = val; 593218218Spjd pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 594218218Spjd res->hr_remoteaddr); 595218218Spjd return (-1); 596218218Spjd } 597229945Spjd if 
(proto_connection_recv(res->hr_conn, true, &conn) == -1) { 598218218Spjd primary_exit(EX_TEMPFAIL, 599218218Spjd "Unable to receive connection from parent"); 600218218Spjd } 601229945Spjd if (proto_connect_wait(conn, res->hr_timeout) == -1) { 602218218Spjd pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 603218218Spjd res->hr_remoteaddr); 604218218Spjd proto_close(conn); 605218218Spjd return (-1); 606218218Spjd } 607218218Spjd /* Error in setting timeout is not critical, but why should it fail? */ 608229945Spjd if (proto_timeout(conn, res->hr_timeout) == -1) 609218218Spjd pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 610218218Spjd 611218218Spjd *connp = conn; 612218218Spjd 613218218Spjd return (0); 614218218Spjd} 615246922Spjd 616238120Spjd/* 617238120Spjd * Function instructs GEOM_GATE to handle reads directly from within the kernel. 618238120Spjd */ 619238120Spjdstatic void 620238120Spjdenable_direct_reads(struct hast_resource *res) 621238120Spjd{ 622238120Spjd struct g_gate_ctl_modify ggiomodify; 623218218Spjd 624238120Spjd bzero(&ggiomodify, sizeof(ggiomodify)); 625238120Spjd ggiomodify.gctl_version = G_GATE_VERSION; 626238120Spjd ggiomodify.gctl_unit = res->hr_ggateunit; 627238120Spjd ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 628238120Spjd strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 629238120Spjd sizeof(ggiomodify.gctl_readprov)); 630238120Spjd ggiomodify.gctl_readoffset = res->hr_localoff; 631238120Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 632238120Spjd pjdlog_debug(1, "Direct reads enabled."); 633238120Spjd else 634238120Spjd pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 635238120Spjd} 636238120Spjd 637220898Spjdstatic int 638205738Spjdinit_remote(struct hast_resource *res, struct proto_conn **inp, 639205738Spjd struct proto_conn **outp) 640204076Spjd{ 641205738Spjd struct proto_conn *in, *out; 642204076Spjd struct nv *nvout, *nvin; 643204076Spjd const unsigned 
char *token; 644204076Spjd unsigned char *map; 645204076Spjd const char *errmsg; 646204076Spjd int32_t extentsize; 647204076Spjd int64_t datasize; 648204076Spjd uint32_t mapsize; 649246922Spjd uint8_t version; 650204076Spjd size_t size; 651220898Spjd int error; 652204076Spjd 653218138Spjd PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 654218138Spjd PJDLOG_ASSERT(real_remote(res)); 655205738Spjd 656205738Spjd in = out = NULL; 657211983Spjd errmsg = NULL; 658205738Spjd 659218218Spjd if (primary_connect(res, &out) == -1) 660220898Spjd return (ECONNREFUSED); 661218218Spjd 662220898Spjd error = ECONNABORTED; 663220898Spjd 664204076Spjd /* 665204076Spjd * First handshake step. 666204076Spjd * Setup outgoing connection with remote node. 667204076Spjd */ 668204076Spjd nvout = nv_alloc(); 669204076Spjd nv_add_string(nvout, res->hr_name, "resource"); 670246922Spjd nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 671204076Spjd if (nv_error(nvout) != 0) { 672204076Spjd pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 673204076Spjd "Unable to allocate header for connection with %s", 674204076Spjd res->hr_remoteaddr); 675204076Spjd nv_free(nvout); 676204076Spjd goto close; 677204076Spjd } 678229945Spjd if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 679204076Spjd pjdlog_errno(LOG_WARNING, 680204076Spjd "Unable to send handshake header to %s", 681204076Spjd res->hr_remoteaddr); 682204076Spjd nv_free(nvout); 683204076Spjd goto close; 684204076Spjd } 685204076Spjd nv_free(nvout); 686229945Spjd if (hast_proto_recv_hdr(out, &nvin) == -1) { 687204076Spjd pjdlog_errno(LOG_WARNING, 688204076Spjd "Unable to receive handshake header from %s", 689204076Spjd res->hr_remoteaddr); 690204076Spjd goto close; 691204076Spjd } 692204076Spjd errmsg = nv_get_string(nvin, "errmsg"); 693204076Spjd if (errmsg != NULL) { 694204076Spjd pjdlog_warning("%s", errmsg); 695220898Spjd if (nv_exists(nvin, "wait")) 696220898Spjd error = EBUSY; 697204076Spjd nv_free(nvin); 
698204076Spjd goto close; 699204076Spjd } 700246922Spjd version = nv_get_uint8(nvin, "version"); 701246922Spjd if (version == 0) { 702246922Spjd /* 703246922Spjd * If no version is sent, it means this is protocol version 1. 704246922Spjd */ 705246922Spjd version = 1; 706246922Spjd } 707246922Spjd if (version > HAST_PROTO_VERSION) { 708246922Spjd pjdlog_warning("Invalid version received (%hhu).", version); 709246922Spjd nv_free(nvin); 710246922Spjd goto close; 711246922Spjd } 712246922Spjd res->hr_version = version; 713246922Spjd pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 714204076Spjd token = nv_get_uint8_array(nvin, &size, "token"); 715204076Spjd if (token == NULL) { 716204076Spjd pjdlog_warning("Handshake header from %s has no 'token' field.", 717204076Spjd res->hr_remoteaddr); 718204076Spjd nv_free(nvin); 719204076Spjd goto close; 720204076Spjd } 721204076Spjd if (size != sizeof(res->hr_token)) { 722204076Spjd pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 723204076Spjd res->hr_remoteaddr, size, sizeof(res->hr_token)); 724204076Spjd nv_free(nvin); 725204076Spjd goto close; 726204076Spjd } 727204076Spjd bcopy(token, res->hr_token, sizeof(res->hr_token)); 728204076Spjd nv_free(nvin); 729204076Spjd 730204076Spjd /* 731204076Spjd * Second handshake step. 732204076Spjd * Setup incoming connection with remote node. 733204076Spjd */ 734218218Spjd if (primary_connect(res, &in) == -1) 735204076Spjd goto close; 736218218Spjd 737204076Spjd nvout = nv_alloc(); 738204076Spjd nv_add_string(nvout, res->hr_name, "resource"); 739204076Spjd nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 740204076Spjd "token"); 741214284Spjd if (res->hr_resuid == 0) { 742214284Spjd /* 743214284Spjd * The resuid field was not yet initialized. 
744214284Spjd * Because we do synchronization inside init_resuid(), it is 745214284Spjd * possible that someone already initialized it, the function 746214284Spjd * will return false then, but if we successfully initialized 747214284Spjd * it, we will get true. True means that there were no writes 748214284Spjd * to this resource yet and we want to inform secondary that 749214284Spjd * synchronization is not needed by sending "virgin" argument. 750214284Spjd */ 751214284Spjd if (init_resuid(res)) 752214284Spjd nv_add_int8(nvout, 1, "virgin"); 753214284Spjd } 754204076Spjd nv_add_uint64(nvout, res->hr_resuid, "resuid"); 755204076Spjd nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 756204076Spjd nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 757204076Spjd if (nv_error(nvout) != 0) { 758204076Spjd pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 759204076Spjd "Unable to allocate header for connection with %s", 760204076Spjd res->hr_remoteaddr); 761204076Spjd nv_free(nvout); 762204076Spjd goto close; 763204076Spjd } 764229945Spjd if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 765204076Spjd pjdlog_errno(LOG_WARNING, 766204076Spjd "Unable to send handshake header to %s", 767204076Spjd res->hr_remoteaddr); 768204076Spjd nv_free(nvout); 769204076Spjd goto close; 770204076Spjd } 771204076Spjd nv_free(nvout); 772229945Spjd if (hast_proto_recv_hdr(out, &nvin) == -1) { 773204076Spjd pjdlog_errno(LOG_WARNING, 774204076Spjd "Unable to receive handshake header from %s", 775204076Spjd res->hr_remoteaddr); 776204076Spjd goto close; 777204076Spjd } 778204076Spjd errmsg = nv_get_string(nvin, "errmsg"); 779204076Spjd if (errmsg != NULL) { 780204076Spjd pjdlog_warning("%s", errmsg); 781204076Spjd nv_free(nvin); 782204076Spjd goto close; 783204076Spjd } 784204076Spjd datasize = nv_get_int64(nvin, "datasize"); 785204076Spjd if (datasize != res->hr_datasize) { 786204076Spjd pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 
787204076Spjd (intmax_t)res->hr_datasize, (intmax_t)datasize); 788204076Spjd nv_free(nvin); 789204076Spjd goto close; 790204076Spjd } 791204076Spjd extentsize = nv_get_int32(nvin, "extentsize"); 792204076Spjd if (extentsize != res->hr_extentsize) { 793204076Spjd pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 794204076Spjd (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 795204076Spjd nv_free(nvin); 796204076Spjd goto close; 797204076Spjd } 798204076Spjd res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 799204076Spjd res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 800204076Spjd res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 801238120Spjd if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 802238120Spjd enable_direct_reads(res); 803220865Spjd if (nv_exists(nvin, "virgin")) { 804220865Spjd /* 805220865Spjd * Secondary was reinitialized, bump localcnt if it is 0 as 806220865Spjd * only we have the data. 807220865Spjd */ 808220865Spjd PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 809220865Spjd PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 810220865Spjd 811220865Spjd if (res->hr_primary_localcnt == 0) { 812220865Spjd PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 813220865Spjd 814220865Spjd mtx_lock(&metadata_lock); 815220865Spjd res->hr_primary_localcnt++; 816220865Spjd pjdlog_debug(1, "Increasing localcnt to %ju.", 817220865Spjd (uintmax_t)res->hr_primary_localcnt); 818220865Spjd (void)metadata_write(res); 819220865Spjd mtx_unlock(&metadata_lock); 820220865Spjd } 821220865Spjd } 822204076Spjd map = NULL; 823204076Spjd mapsize = nv_get_uint32(nvin, "mapsize"); 824204076Spjd if (mapsize > 0) { 825204076Spjd map = malloc(mapsize); 826204076Spjd if (map == NULL) { 827204076Spjd pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 828204076Spjd (uintmax_t)mapsize); 829204076Spjd nv_free(nvin); 830204076Spjd goto close; 831204076Spjd } 832204076Spjd /* 833204076Spjd * Remote node 
have some dirty extents on its own, lets 834204076Spjd * download its activemap. 835204076Spjd */ 836205738Spjd if (hast_proto_recv_data(res, out, nvin, map, 837229945Spjd mapsize) == -1) { 838204076Spjd pjdlog_errno(LOG_ERR, 839204076Spjd "Unable to receive remote activemap"); 840204076Spjd nv_free(nvin); 841204076Spjd free(map); 842204076Spjd goto close; 843204076Spjd } 844257154Strociny mtx_lock(&res->hr_amp_lock); 845204076Spjd /* 846204076Spjd * Merge local and remote bitmaps. 847204076Spjd */ 848204076Spjd activemap_merge(res->hr_amp, map, mapsize); 849204076Spjd free(map); 850204076Spjd /* 851204076Spjd * Now that we merged bitmaps from both nodes, flush it to the 852204076Spjd * disk before we start to synchronize. 853204076Spjd */ 854204076Spjd (void)hast_activemap_flush(res); 855204076Spjd } 856214274Spjd nv_free(nvin); 857223181Strociny#ifdef notyet 858220271Spjd /* Setup directions. */ 859220271Spjd if (proto_send(out, NULL, 0) == -1) 860220271Spjd pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 861220271Spjd if (proto_recv(in, NULL, 0) == -1) 862220271Spjd pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 863223181Strociny#endif 864204076Spjd pjdlog_info("Connected to %s.", res->hr_remoteaddr); 865246922Spjd if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 866246922Spjd res->hr_version < 2) { 867246922Spjd pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 868246922Spjd res->hr_replication = HAST_REPLICATION_FULLSYNC; 869246922Spjd } else if (res->hr_replication != res->hr_original_replication) { 870246922Spjd /* 871246922Spjd * This is in case hastd disconnected and was upgraded. 
872246922Spjd */ 873246922Spjd res->hr_replication = res->hr_original_replication; 874246922Spjd } 875205738Spjd if (inp != NULL && outp != NULL) { 876205738Spjd *inp = in; 877205738Spjd *outp = out; 878205738Spjd } else { 879205738Spjd res->hr_remotein = in; 880205738Spjd res->hr_remoteout = out; 881205738Spjd } 882212038Spjd event_send(res, EVENT_CONNECT); 883220898Spjd return (0); 884205738Spjdclose: 885211983Spjd if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 886212038Spjd event_send(res, EVENT_SPLITBRAIN); 887205738Spjd proto_close(out); 888205738Spjd if (in != NULL) 889205738Spjd proto_close(in); 890220898Spjd return (error); 891205738Spjd} 892205738Spjd 893205738Spjdstatic void 894205738Spjdsync_start(void) 895205738Spjd{ 896205738Spjd 897204076Spjd mtx_lock(&sync_lock); 898204076Spjd sync_inprogress = true; 899204076Spjd mtx_unlock(&sync_lock); 900204076Spjd cv_signal(&sync_cond); 901204076Spjd} 902204076Spjd 903204076Spjdstatic void 904211878Spjdsync_stop(void) 905211878Spjd{ 906211878Spjd 907211878Spjd mtx_lock(&sync_lock); 908211878Spjd if (sync_inprogress) 909211878Spjd sync_inprogress = false; 910211878Spjd mtx_unlock(&sync_lock); 911211878Spjd} 912211878Spjd 913211878Spjdstatic void 914204076Spjdinit_ggate(struct hast_resource *res) 915204076Spjd{ 916204076Spjd struct g_gate_ctl_create ggiocreate; 917204076Spjd struct g_gate_ctl_cancel ggiocancel; 918204076Spjd 919204076Spjd /* 920204076Spjd * We communicate with ggate via /dev/ggctl. Open it. 921204076Spjd */ 922204076Spjd res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 923229945Spjd if (res->hr_ggatefd == -1) 924204076Spjd primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 925204076Spjd /* 926204076Spjd * Create provider before trying to connect, as connection failure 927204076Spjd * is not critical, but may take some time. 
928204076Spjd */ 929213533Spjd bzero(&ggiocreate, sizeof(ggiocreate)); 930204076Spjd ggiocreate.gctl_version = G_GATE_VERSION; 931204076Spjd ggiocreate.gctl_mediasize = res->hr_datasize; 932204076Spjd ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 933204076Spjd ggiocreate.gctl_flags = 0; 934220266Spjd ggiocreate.gctl_maxcount = 0; 935204076Spjd ggiocreate.gctl_timeout = 0; 936204076Spjd ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 937204076Spjd snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 938204076Spjd res->hr_provname); 939204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 940204076Spjd pjdlog_info("Device hast/%s created.", res->hr_provname); 941204076Spjd res->hr_ggateunit = ggiocreate.gctl_unit; 942204076Spjd return; 943204076Spjd } 944204076Spjd if (errno != EEXIST) { 945204076Spjd primary_exit(EX_OSERR, "Unable to create hast/%s device", 946204076Spjd res->hr_provname); 947204076Spjd } 948204076Spjd pjdlog_debug(1, 949204076Spjd "Device hast/%s already exists, we will try to take it over.", 950204076Spjd res->hr_provname); 951204076Spjd /* 952204076Spjd * If we received EEXIST, we assume that the process who created the 953204076Spjd * provider died and didn't clean up. In that case we will start from 954204076Spjd * where he left of. 
955204076Spjd */ 956213533Spjd bzero(&ggiocancel, sizeof(ggiocancel)); 957204076Spjd ggiocancel.gctl_version = G_GATE_VERSION; 958204076Spjd ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 959204076Spjd snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 960204076Spjd res->hr_provname); 961204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 962204076Spjd pjdlog_info("Device hast/%s recovered.", res->hr_provname); 963204076Spjd res->hr_ggateunit = ggiocancel.gctl_unit; 964204076Spjd return; 965204076Spjd } 966204076Spjd primary_exit(EX_OSERR, "Unable to take over hast/%s device", 967204076Spjd res->hr_provname); 968204076Spjd} 969204076Spjd 970204076Spjdvoid 971204076Spjdhastd_primary(struct hast_resource *res) 972204076Spjd{ 973204076Spjd pthread_t td; 974204076Spjd pid_t pid; 975219482Strociny int error, mode, debuglevel; 976204076Spjd 977204076Spjd /* 978218218Spjd * Create communication channel for sending control commands from 979218218Spjd * parent to child. 980204076Spjd */ 981229945Spjd if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 982218042Spjd /* TODO: There's no need for this to be fatal error. */ 983204076Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 984212034Spjd pjdlog_exit(EX_OSERR, 985204076Spjd "Unable to create control sockets between parent and child"); 986204076Spjd } 987212038Spjd /* 988218218Spjd * Create communication channel for sending events from child to parent. 989212038Spjd */ 990229945Spjd if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 991218042Spjd /* TODO: There's no need for this to be fatal error. */ 992212038Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 993212038Spjd pjdlog_exit(EX_OSERR, 994212038Spjd "Unable to create event sockets between child and parent"); 995212038Spjd } 996218218Spjd /* 997218218Spjd * Create communication channel for sending connection requests from 998218218Spjd * child to parent. 
999218218Spjd */ 1000229945Spjd if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 1001218218Spjd /* TODO: There's no need for this to be fatal error. */ 1002218218Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 1003218218Spjd pjdlog_exit(EX_OSERR, 1004218218Spjd "Unable to create connection sockets between child and parent"); 1005218218Spjd } 1006204076Spjd 1007204076Spjd pid = fork(); 1008229744Spjd if (pid == -1) { 1009218042Spjd /* TODO: There's no need for this to be fatal error. */ 1010204076Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 1011212034Spjd pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 1012204076Spjd } 1013204076Spjd 1014204076Spjd if (pid > 0) { 1015204076Spjd /* This is parent. */ 1016212038Spjd /* Declare that we are receiver. */ 1017212038Spjd proto_recv(res->hr_event, NULL, 0); 1018218218Spjd proto_recv(res->hr_conn, NULL, 0); 1019218043Spjd /* Declare that we are sender. */ 1020218043Spjd proto_send(res->hr_ctrl, NULL, 0); 1021204076Spjd res->hr_workerpid = pid; 1022204076Spjd return; 1023204076Spjd } 1024211977Spjd 1025211984Spjd gres = res; 1026257155Strociny res->output_status_aux = output_status_aux; 1027218043Spjd mode = pjdlog_mode_get(); 1028219482Strociny debuglevel = pjdlog_debug_get(); 1029211984Spjd 1030218043Spjd /* Declare that we are sender. */ 1031218043Spjd proto_send(res->hr_event, NULL, 0); 1032218218Spjd proto_send(res->hr_conn, NULL, 0); 1033218043Spjd /* Declare that we are receiver. 
*/ 1034218043Spjd proto_recv(res->hr_ctrl, NULL, 0); 1035218043Spjd descriptors_cleanup(res); 1036204076Spjd 1037218045Spjd descriptors_assert(res, mode); 1038218045Spjd 1039218043Spjd pjdlog_init(mode); 1040219482Strociny pjdlog_debug_set(debuglevel); 1041218043Spjd pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 1042220005Spjd setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 1043204076Spjd 1044204076Spjd init_local(res); 1045213007Spjd init_ggate(res); 1046213007Spjd init_environment(res); 1047217784Spjd 1048221899Spjd if (drop_privs(res) != 0) { 1049218049Spjd cleanup(res); 1050218049Spjd exit(EX_CONFIG); 1051218049Spjd } 1052218214Spjd pjdlog_info("Privileges successfully dropped."); 1053218049Spjd 1054213007Spjd /* 1055213530Spjd * Create the guard thread first, so we can handle signals from the 1056229778Suqs * very beginning. 1057213530Spjd */ 1058213530Spjd error = pthread_create(&td, NULL, guard_thread, res); 1059218138Spjd PJDLOG_ASSERT(error == 0); 1060213530Spjd /* 1061213007Spjd * Create the control thread before sending any event to the parent, 1062213007Spjd * as we can deadlock when parent sends control request to worker, 1063213007Spjd * but worker has no control thread started yet, so parent waits. 1064213007Spjd * In the meantime worker sends an event to the parent, but parent 1065213007Spjd * is unable to handle the event, because it waits for control 1066213007Spjd * request response. 
1067213007Spjd */ 1068213007Spjd error = pthread_create(&td, NULL, ctrl_thread, res); 1069218138Spjd PJDLOG_ASSERT(error == 0); 1070220898Spjd if (real_remote(res)) { 1071220898Spjd error = init_remote(res, NULL, NULL); 1072220898Spjd if (error == 0) { 1073220898Spjd sync_start(); 1074220898Spjd } else if (error == EBUSY) { 1075220898Spjd time_t start = time(NULL); 1076220898Spjd 1077220898Spjd pjdlog_warning("Waiting for remote node to become %s for %ds.", 1078220898Spjd role2str(HAST_ROLE_SECONDARY), 1079220898Spjd res->hr_timeout); 1080220898Spjd for (;;) { 1081220898Spjd sleep(1); 1082220898Spjd error = init_remote(res, NULL, NULL); 1083220898Spjd if (error != EBUSY) 1084220898Spjd break; 1085220898Spjd if (time(NULL) > start + res->hr_timeout) 1086220898Spjd break; 1087220898Spjd } 1088220898Spjd if (error == EBUSY) { 1089220898Spjd pjdlog_warning("Remote node is still %s, starting anyway.", 1090220898Spjd role2str(HAST_ROLE_PRIMARY)); 1091220898Spjd } 1092220898Spjd } 1093220898Spjd } 1094204076Spjd error = pthread_create(&td, NULL, ggate_recv_thread, res); 1095218138Spjd PJDLOG_ASSERT(error == 0); 1096204076Spjd error = pthread_create(&td, NULL, local_send_thread, res); 1097218138Spjd PJDLOG_ASSERT(error == 0); 1098204076Spjd error = pthread_create(&td, NULL, remote_send_thread, res); 1099218138Spjd PJDLOG_ASSERT(error == 0); 1100204076Spjd error = pthread_create(&td, NULL, remote_recv_thread, res); 1101218138Spjd PJDLOG_ASSERT(error == 0); 1102204076Spjd error = pthread_create(&td, NULL, ggate_send_thread, res); 1103218138Spjd PJDLOG_ASSERT(error == 0); 1104220898Spjd fullystarted = true; 1105213530Spjd (void)sync_thread(res); 1106204076Spjd} 1107204076Spjd 1108204076Spjdstatic void 1109246922Spjdreqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1110246922Spjd const char *fmt, ...) 
1111204076Spjd{ 1112204076Spjd char msg[1024]; 1113204076Spjd va_list ap; 1114204076Spjd 1115204076Spjd va_start(ap, fmt); 1116236507Spjd (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1117204076Spjd va_end(ap); 1118236507Spjd switch (ggio->gctl_cmd) { 1119236507Spjd case BIO_READ: 1120236507Spjd (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1121246922Spjd (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1122236507Spjd break; 1123236507Spjd case BIO_DELETE: 1124236507Spjd (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1125246922Spjd (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1126236507Spjd break; 1127236507Spjd case BIO_FLUSH: 1128236507Spjd (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1129236507Spjd break; 1130236507Spjd case BIO_WRITE: 1131236507Spjd (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1132246922Spjd (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1133236507Spjd break; 1134236507Spjd default: 1135236507Spjd (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1136236507Spjd (unsigned int)ggio->gctl_cmd); 1137236507Spjd break; 1138204076Spjd } 1139204076Spjd pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1140204076Spjd} 1141204076Spjd 1142204076Spjdstatic void 1143204076Spjdremote_close(struct hast_resource *res, int ncomp) 1144204076Spjd{ 1145204076Spjd 1146204076Spjd rw_wlock(&hio_remote_lock[ncomp]); 1147204076Spjd /* 1148226855Spjd * Check for a race between dropping rlock and acquiring wlock - 1149204076Spjd * another thread can close connection in-between. 
1150204076Spjd */ 1151204076Spjd if (!ISCONNECTED(res, ncomp)) { 1152218138Spjd PJDLOG_ASSERT(res->hr_remotein == NULL); 1153218138Spjd PJDLOG_ASSERT(res->hr_remoteout == NULL); 1154204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1155204076Spjd return; 1156204076Spjd } 1157204076Spjd 1158218138Spjd PJDLOG_ASSERT(res->hr_remotein != NULL); 1159218138Spjd PJDLOG_ASSERT(res->hr_remoteout != NULL); 1160204076Spjd 1161211881Spjd pjdlog_debug(2, "Closing incoming connection to %s.", 1162204076Spjd res->hr_remoteaddr); 1163204076Spjd proto_close(res->hr_remotein); 1164204076Spjd res->hr_remotein = NULL; 1165211881Spjd pjdlog_debug(2, "Closing outgoing connection to %s.", 1166204076Spjd res->hr_remoteaddr); 1167204076Spjd proto_close(res->hr_remoteout); 1168204076Spjd res->hr_remoteout = NULL; 1169204076Spjd 1170204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1171204076Spjd 1172211881Spjd pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1173211881Spjd 1174204076Spjd /* 1175204076Spjd * Stop synchronization if in-progress. 1176204076Spjd */ 1177211878Spjd sync_stop(); 1178211984Spjd 1179212038Spjd event_send(res, EVENT_DISCONNECT); 1180204076Spjd} 1181204076Spjd 1182204076Spjd/* 1183226859Spjd * Acknowledge write completion to the kernel, but don't update activemap yet. 1184226859Spjd */ 1185226859Spjdstatic void 1186226859Spjdwrite_complete(struct hast_resource *res, struct hio *hio) 1187226859Spjd{ 1188226859Spjd struct g_gate_ctl_io *ggio; 1189226859Spjd unsigned int ncomp; 1190226859Spjd 1191226859Spjd PJDLOG_ASSERT(!hio->hio_done); 1192226859Spjd 1193226859Spjd ggio = &hio->hio_ggio; 1194226859Spjd PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1195226859Spjd 1196226859Spjd /* 1197226859Spjd * Bump local count if this is first write after 1198226859Spjd * connection failure with remote node. 
1199226859Spjd */ 1200226859Spjd ncomp = 1; 1201226859Spjd rw_rlock(&hio_remote_lock[ncomp]); 1202226859Spjd if (!ISCONNECTED(res, ncomp)) { 1203226859Spjd mtx_lock(&metadata_lock); 1204226859Spjd if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1205226859Spjd res->hr_primary_localcnt++; 1206226859Spjd pjdlog_debug(1, "Increasing localcnt to %ju.", 1207226859Spjd (uintmax_t)res->hr_primary_localcnt); 1208226859Spjd (void)metadata_write(res); 1209226859Spjd } 1210226859Spjd mtx_unlock(&metadata_lock); 1211226859Spjd } 1212226859Spjd rw_unlock(&hio_remote_lock[ncomp]); 1213229945Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1214226859Spjd primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1215226859Spjd hio->hio_done = true; 1216226859Spjd} 1217226859Spjd 1218226859Spjd/* 1219204076Spjd * Thread receives ggate I/O requests from the kernel and passes them to 1220204076Spjd * appropriate threads: 1221204076Spjd * WRITE - always goes to both local_send and remote_send threads 1222204076Spjd * READ (when the block is up-to-date on local component) - 1223204076Spjd * only local_send thread 1224204076Spjd * READ (when the block isn't up-to-date on local component) - 1225204076Spjd * only remote_send thread 1226204076Spjd * DELETE - always goes to both local_send and remote_send threads 1227204076Spjd * FLUSH - always goes to both local_send and remote_send threads 1228204076Spjd */ 1229204076Spjdstatic void * 1230204076Spjdggate_recv_thread(void *arg) 1231204076Spjd{ 1232204076Spjd struct hast_resource *res = arg; 1233204076Spjd struct g_gate_ctl_io *ggio; 1234204076Spjd struct hio *hio; 1235204076Spjd unsigned int ii, ncomp, ncomps; 1236204076Spjd int error; 1237204076Spjd 1238204076Spjd for (;;) { 1239204076Spjd pjdlog_debug(2, "ggate_recv: Taking free request."); 1240204076Spjd QUEUE_TAKE2(hio, free); 1241204076Spjd pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1242204076Spjd ggio = &hio->hio_ggio; 1243204076Spjd 
ggio->gctl_unit = res->hr_ggateunit; 1244204076Spjd ggio->gctl_length = MAXPHYS; 1245204076Spjd ggio->gctl_error = 0; 1246226859Spjd hio->hio_done = false; 1247226859Spjd hio->hio_replication = res->hr_replication; 1248204076Spjd pjdlog_debug(2, 1249204076Spjd "ggate_recv: (%p) Waiting for request from the kernel.", 1250204076Spjd hio); 1251229945Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1252204076Spjd if (sigexit_received) 1253204076Spjd pthread_exit(NULL); 1254204076Spjd primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1255204076Spjd } 1256204076Spjd error = ggio->gctl_error; 1257204076Spjd switch (error) { 1258204076Spjd case 0: 1259204076Spjd break; 1260204076Spjd case ECANCELED: 1261204076Spjd /* Exit gracefully. */ 1262204076Spjd if (!sigexit_received) { 1263204076Spjd pjdlog_debug(2, 1264204076Spjd "ggate_recv: (%p) Received cancel from the kernel.", 1265204076Spjd hio); 1266204076Spjd pjdlog_info("Received cancel from the kernel, exiting."); 1267204076Spjd } 1268204076Spjd pthread_exit(NULL); 1269204076Spjd case ENOMEM: 1270204076Spjd /* 1271204076Spjd * Buffer too small? Impossible, we allocate MAXPHYS 1272204076Spjd * bytes - request can't be bigger than that. 1273204076Spjd */ 1274204076Spjd /* FALLTHROUGH */ 1275204076Spjd case ENXIO: 1276204076Spjd default: 1277204076Spjd primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1278204076Spjd strerror(error)); 1279204076Spjd } 1280226859Spjd 1281226859Spjd ncomp = 0; 1282226859Spjd ncomps = HAST_NCOMPONENTS; 1283226859Spjd 1284204076Spjd for (ii = 0; ii < ncomps; ii++) 1285204076Spjd hio->hio_errors[ii] = EINVAL; 1286204076Spjd reqlog(LOG_DEBUG, 2, ggio, 1287204076Spjd "ggate_recv: (%p) Request received from the kernel: ", 1288204076Spjd hio); 1289226859Spjd 1290204076Spjd /* 1291204076Spjd * Inform all components about new write request. 1292204076Spjd * For read request prefer local component unless the given 1293204076Spjd * range is out-of-date, then use remote component. 
1294204076Spjd */ 1295204076Spjd switch (ggio->gctl_cmd) { 1296204076Spjd case BIO_READ: 1297222228Spjd res->hr_stat_read++; 1298226859Spjd ncomps = 1; 1299204076Spjd mtx_lock(&metadata_lock); 1300204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1301204076Spjd res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1302204076Spjd /* 1303204076Spjd * This range is up-to-date on local component, 1304204076Spjd * so handle request locally. 1305204076Spjd */ 1306204076Spjd /* Local component is 0 for now. */ 1307204076Spjd ncomp = 0; 1308204076Spjd } else /* if (res->hr_syncsrc == 1309204076Spjd HAST_SYNCSRC_SECONDARY) */ { 1310218138Spjd PJDLOG_ASSERT(res->hr_syncsrc == 1311204076Spjd HAST_SYNCSRC_SECONDARY); 1312204076Spjd /* 1313204076Spjd * This range is out-of-date on local component, 1314204076Spjd * so send request to the remote node. 1315204076Spjd */ 1316204076Spjd /* Remote component is 1 for now. */ 1317204076Spjd ncomp = 1; 1318204076Spjd } 1319204076Spjd mtx_unlock(&metadata_lock); 1320204076Spjd break; 1321204076Spjd case BIO_WRITE: 1322222228Spjd res->hr_stat_write++; 1323226851Spjd if (res->hr_resuid == 0 && 1324226851Spjd res->hr_primary_localcnt == 0) { 1325226851Spjd /* This is first write. 
*/ 1326219844Spjd res->hr_primary_localcnt = 1; 1327214284Spjd } 1328204076Spjd for (;;) { 1329204076Spjd mtx_lock(&range_lock); 1330204076Spjd if (rangelock_islocked(range_sync, 1331204076Spjd ggio->gctl_offset, ggio->gctl_length)) { 1332204076Spjd pjdlog_debug(2, 1333204076Spjd "regular: Range offset=%jd length=%zu locked.", 1334204076Spjd (intmax_t)ggio->gctl_offset, 1335204076Spjd (size_t)ggio->gctl_length); 1336204076Spjd range_regular_wait = true; 1337204076Spjd cv_wait(&range_regular_cond, &range_lock); 1338204076Spjd range_regular_wait = false; 1339204076Spjd mtx_unlock(&range_lock); 1340204076Spjd continue; 1341204076Spjd } 1342204076Spjd if (rangelock_add(range_regular, 1343229945Spjd ggio->gctl_offset, ggio->gctl_length) == -1) { 1344204076Spjd mtx_unlock(&range_lock); 1345204076Spjd pjdlog_debug(2, 1346204076Spjd "regular: Range offset=%jd length=%zu is already locked, waiting.", 1347204076Spjd (intmax_t)ggio->gctl_offset, 1348204076Spjd (size_t)ggio->gctl_length); 1349204076Spjd sleep(1); 1350204076Spjd continue; 1351204076Spjd } 1352204076Spjd mtx_unlock(&range_lock); 1353204076Spjd break; 1354204076Spjd } 1355204076Spjd mtx_lock(&res->hr_amp_lock); 1356204076Spjd if (activemap_write_start(res->hr_amp, 1357204076Spjd ggio->gctl_offset, ggio->gctl_length)) { 1358222228Spjd res->hr_stat_activemap_update++; 1359204076Spjd (void)hast_activemap_flush(res); 1360255716Strociny } else { 1361255716Strociny mtx_unlock(&res->hr_amp_lock); 1362204076Spjd } 1363259192Strociny if (ISMEMSYNC(hio)) { 1364259191Strociny hio->hio_memsyncacked = false; 1365259191Strociny refcnt_init(&hio->hio_writecount, ncomps); 1366259191Strociny } 1367226859Spjd break; 1368204076Spjd case BIO_DELETE: 1369226859Spjd res->hr_stat_delete++; 1370226859Spjd break; 1371204076Spjd case BIO_FLUSH: 1372226859Spjd res->hr_stat_flush++; 1373204076Spjd break; 1374204076Spjd } 1375226859Spjd pjdlog_debug(2, 1376226859Spjd "ggate_recv: (%p) Moving request to the send queues.", hio); 
1377259191Strociny refcnt_init(&hio->hio_countdown, ncomps); 1378246922Spjd for (ii = ncomp; ii < ncomps; ii++) 1379226859Spjd QUEUE_INSERT1(hio, send, ii); 1380204076Spjd } 1381204076Spjd /* NOTREACHED */ 1382204076Spjd return (NULL); 1383204076Spjd} 1384204076Spjd 1385204076Spjd/* 1386204076Spjd * Thread reads from or writes to local component. 1387204076Spjd * If local read fails, it redirects it to remote_send thread. 1388204076Spjd */ 1389204076Spjdstatic void * 1390204076Spjdlocal_send_thread(void *arg) 1391204076Spjd{ 1392204076Spjd struct hast_resource *res = arg; 1393204076Spjd struct g_gate_ctl_io *ggio; 1394204076Spjd struct hio *hio; 1395204076Spjd unsigned int ncomp, rncomp; 1396204076Spjd ssize_t ret; 1397204076Spjd 1398204076Spjd /* Local component is 0 for now. */ 1399204076Spjd ncomp = 0; 1400204076Spjd /* Remote component is 1 for now. */ 1401204076Spjd rncomp = 1; 1402204076Spjd 1403204076Spjd for (;;) { 1404204076Spjd pjdlog_debug(2, "local_send: Taking request."); 1405214692Spjd QUEUE_TAKE1(hio, send, ncomp, 0); 1406204076Spjd pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1407204076Spjd ggio = &hio->hio_ggio; 1408204076Spjd switch (ggio->gctl_cmd) { 1409204076Spjd case BIO_READ: 1410204076Spjd ret = pread(res->hr_localfd, ggio->gctl_data, 1411204076Spjd ggio->gctl_length, 1412204076Spjd ggio->gctl_offset + res->hr_localoff); 1413204076Spjd if (ret == ggio->gctl_length) 1414204076Spjd hio->hio_errors[ncomp] = 0; 1415222467Strociny else if (!ISSYNCREQ(hio)) { 1416204076Spjd /* 1417204076Spjd * If READ failed, try to read from remote node. 1418204076Spjd */ 1419229945Spjd if (ret == -1) { 1420216479Spjd reqlog(LOG_WARNING, 0, ggio, 1421216479Spjd "Local request failed (%s), trying remote node. ", 1422216479Spjd strerror(errno)); 1423216479Spjd } else if (ret != ggio->gctl_length) { 1424216479Spjd reqlog(LOG_WARNING, 0, ggio, 1425216479Spjd "Local request failed (%zd != %jd), trying remote node. 
", 1426216494Spjd ret, (intmax_t)ggio->gctl_length); 1427216479Spjd } 1428204076Spjd QUEUE_INSERT1(hio, send, rncomp); 1429204076Spjd continue; 1430204076Spjd } 1431204076Spjd break; 1432204076Spjd case BIO_WRITE: 1433204076Spjd ret = pwrite(res->hr_localfd, ggio->gctl_data, 1434204076Spjd ggio->gctl_length, 1435204076Spjd ggio->gctl_offset + res->hr_localoff); 1436229945Spjd if (ret == -1) { 1437204076Spjd hio->hio_errors[ncomp] = errno; 1438216479Spjd reqlog(LOG_WARNING, 0, ggio, 1439216479Spjd "Local request failed (%s): ", 1440216479Spjd strerror(errno)); 1441216479Spjd } else if (ret != ggio->gctl_length) { 1442204076Spjd hio->hio_errors[ncomp] = EIO; 1443216479Spjd reqlog(LOG_WARNING, 0, ggio, 1444216479Spjd "Local request failed (%zd != %jd): ", 1445216494Spjd ret, (intmax_t)ggio->gctl_length); 1446216479Spjd } else { 1447204076Spjd hio->hio_errors[ncomp] = 0; 1448259192Strociny if (ISASYNC(hio)) { 1449226859Spjd ggio->gctl_error = 0; 1450226859Spjd write_complete(res, hio); 1451226859Spjd } 1452216479Spjd } 1453204076Spjd break; 1454204076Spjd case BIO_DELETE: 1455204076Spjd ret = g_delete(res->hr_localfd, 1456204076Spjd ggio->gctl_offset + res->hr_localoff, 1457204076Spjd ggio->gctl_length); 1458229945Spjd if (ret == -1) { 1459204076Spjd hio->hio_errors[ncomp] = errno; 1460216479Spjd reqlog(LOG_WARNING, 0, ggio, 1461216479Spjd "Local request failed (%s): ", 1462216479Spjd strerror(errno)); 1463216479Spjd } else { 1464204076Spjd hio->hio_errors[ncomp] = 0; 1465216479Spjd } 1466204076Spjd break; 1467204076Spjd case BIO_FLUSH: 1468225832Spjd if (!res->hr_localflush) { 1469225832Spjd ret = -1; 1470225832Spjd errno = EOPNOTSUPP; 1471225832Spjd break; 1472225832Spjd } 1473204076Spjd ret = g_flush(res->hr_localfd); 1474229945Spjd if (ret == -1) { 1475225832Spjd if (errno == EOPNOTSUPP) 1476225832Spjd res->hr_localflush = false; 1477204076Spjd hio->hio_errors[ncomp] = errno; 1478216479Spjd reqlog(LOG_WARNING, 0, ggio, 1479216479Spjd "Local request failed (%s): ", 
1480216479Spjd strerror(errno)); 1481216479Spjd } else { 1482204076Spjd hio->hio_errors[ncomp] = 0; 1483216479Spjd } 1484204076Spjd break; 1485204076Spjd } 1486259191Strociny if (ISMEMSYNCWRITE(hio)) { 1487259191Strociny if (refcnt_release(&hio->hio_writecount) == 0) { 1488259191Strociny write_complete(res, hio); 1489246922Spjd } 1490246922Spjd } 1491259191Strociny if (refcnt_release(&hio->hio_countdown) > 0) 1492259191Strociny continue; 1493226856Spjd if (ISSYNCREQ(hio)) { 1494226856Spjd mtx_lock(&sync_lock); 1495226856Spjd SYNCREQDONE(hio); 1496226856Spjd mtx_unlock(&sync_lock); 1497226856Spjd cv_signal(&sync_cond); 1498226856Spjd } else { 1499226856Spjd pjdlog_debug(2, 1500226856Spjd "local_send: (%p) Moving request to the done queue.", 1501226856Spjd hio); 1502226856Spjd QUEUE_INSERT2(hio, done); 1503204076Spjd } 1504204076Spjd } 1505204076Spjd /* NOTREACHED */ 1506204076Spjd return (NULL); 1507204076Spjd} 1508204076Spjd 1509214692Spjdstatic void 1510214692Spjdkeepalive_send(struct hast_resource *res, unsigned int ncomp) 1511214692Spjd{ 1512214692Spjd struct nv *nv; 1513214692Spjd 1514218217Spjd rw_rlock(&hio_remote_lock[ncomp]); 1515218217Spjd 1516218217Spjd if (!ISCONNECTED(res, ncomp)) { 1517218217Spjd rw_unlock(&hio_remote_lock[ncomp]); 1518214692Spjd return; 1519218217Spjd } 1520219864Spjd 1521218138Spjd PJDLOG_ASSERT(res->hr_remotein != NULL); 1522218138Spjd PJDLOG_ASSERT(res->hr_remoteout != NULL); 1523214692Spjd 1524214692Spjd nv = nv_alloc(); 1525214692Spjd nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1526214692Spjd if (nv_error(nv) != 0) { 1527218217Spjd rw_unlock(&hio_remote_lock[ncomp]); 1528214692Spjd nv_free(nv); 1529214692Spjd pjdlog_debug(1, 1530214692Spjd "keepalive_send: Unable to prepare header to send."); 1531214692Spjd return; 1532214692Spjd } 1533229945Spjd if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1534218217Spjd rw_unlock(&hio_remote_lock[ncomp]); 1535214692Spjd pjdlog_common(LOG_DEBUG, 1, errno, 1536214692Spjd 
"keepalive_send: Unable to send request"); 1537214692Spjd nv_free(nv); 1538214692Spjd remote_close(res, ncomp); 1539214692Spjd return; 1540214692Spjd } 1541218217Spjd 1542218217Spjd rw_unlock(&hio_remote_lock[ncomp]); 1543214692Spjd nv_free(nv); 1544214692Spjd pjdlog_debug(2, "keepalive_send: Request sent."); 1545214692Spjd} 1546214692Spjd 1547204076Spjd/* 1548204076Spjd * Thread sends request to secondary node. 1549204076Spjd */ 1550204076Spjdstatic void * 1551204076Spjdremote_send_thread(void *arg) 1552204076Spjd{ 1553204076Spjd struct hast_resource *res = arg; 1554204076Spjd struct g_gate_ctl_io *ggio; 1555214692Spjd time_t lastcheck, now; 1556204076Spjd struct hio *hio; 1557204076Spjd struct nv *nv; 1558204076Spjd unsigned int ncomp; 1559204076Spjd bool wakeup; 1560204076Spjd uint64_t offset, length; 1561204076Spjd uint8_t cmd; 1562204076Spjd void *data; 1563204076Spjd 1564204076Spjd /* Remote component is 1 for now. */ 1565204076Spjd ncomp = 1; 1566219864Spjd lastcheck = time(NULL); 1567204076Spjd 1568204076Spjd for (;;) { 1569204076Spjd pjdlog_debug(2, "remote_send: Taking request."); 1570219721Strociny QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1571214692Spjd if (hio == NULL) { 1572214692Spjd now = time(NULL); 1573219721Strociny if (lastcheck + HAST_KEEPALIVE <= now) { 1574214692Spjd keepalive_send(res, ncomp); 1575214692Spjd lastcheck = now; 1576214692Spjd } 1577214692Spjd continue; 1578214692Spjd } 1579204076Spjd pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1580204076Spjd ggio = &hio->hio_ggio; 1581204076Spjd switch (ggio->gctl_cmd) { 1582204076Spjd case BIO_READ: 1583204076Spjd cmd = HIO_READ; 1584204076Spjd data = NULL; 1585204076Spjd offset = ggio->gctl_offset; 1586204076Spjd length = ggio->gctl_length; 1587204076Spjd break; 1588204076Spjd case BIO_WRITE: 1589204076Spjd cmd = HIO_WRITE; 1590204076Spjd data = ggio->gctl_data; 1591204076Spjd offset = ggio->gctl_offset; 1592204076Spjd length = ggio->gctl_length; 1593204076Spjd break; 
1594204076Spjd case BIO_DELETE: 1595204076Spjd cmd = HIO_DELETE; 1596204076Spjd data = NULL; 1597204076Spjd offset = ggio->gctl_offset; 1598204076Spjd length = ggio->gctl_length; 1599204076Spjd break; 1600204076Spjd case BIO_FLUSH: 1601204076Spjd cmd = HIO_FLUSH; 1602204076Spjd data = NULL; 1603204076Spjd offset = 0; 1604204076Spjd length = 0; 1605204076Spjd break; 1606204076Spjd default: 1607225783Spjd PJDLOG_ABORT("invalid condition"); 1608204076Spjd } 1609204076Spjd nv = nv_alloc(); 1610204076Spjd nv_add_uint8(nv, cmd, "cmd"); 1611204076Spjd nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1612204076Spjd nv_add_uint64(nv, offset, "offset"); 1613204076Spjd nv_add_uint64(nv, length, "length"); 1614259191Strociny if (ISMEMSYNCWRITE(hio)) 1615246922Spjd nv_add_uint8(nv, 1, "memsync"); 1616204076Spjd if (nv_error(nv) != 0) { 1617204076Spjd hio->hio_errors[ncomp] = nv_error(nv); 1618204076Spjd pjdlog_debug(2, 1619204076Spjd "remote_send: (%p) Unable to prepare header to send.", 1620204076Spjd hio); 1621204076Spjd reqlog(LOG_ERR, 0, ggio, 1622204076Spjd "Unable to prepare header to send (%s): ", 1623204076Spjd strerror(nv_error(nv))); 1624204076Spjd /* Move failed request immediately to the done queue. */ 1625204076Spjd goto done_queue; 1626204076Spjd } 1627204076Spjd /* 1628204076Spjd * Protect connection from disappearing. 1629204076Spjd */ 1630204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1631204076Spjd if (!ISCONNECTED(res, ncomp)) { 1632204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1633204076Spjd hio->hio_errors[ncomp] = ENOTCONN; 1634204076Spjd goto done_queue; 1635204076Spjd } 1636204076Spjd /* 1637204076Spjd * Move the request to recv queue before sending it, because 1638204076Spjd * in different order we can get reply before we move request 1639204076Spjd * to recv queue. 
1640204076Spjd */ 1641226852Spjd pjdlog_debug(2, 1642226852Spjd "remote_send: (%p) Moving request to the recv queue.", 1643226852Spjd hio); 1644204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1645204076Spjd wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1646204076Spjd TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1647257155Strociny hio_recv_list_size[ncomp]++; 1648204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1649204076Spjd if (hast_proto_send(res, res->hr_remoteout, nv, data, 1650229945Spjd data != NULL ? length : 0) == -1) { 1651204076Spjd hio->hio_errors[ncomp] = errno; 1652204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1653204076Spjd pjdlog_debug(2, 1654204076Spjd "remote_send: (%p) Unable to send request.", hio); 1655204076Spjd reqlog(LOG_ERR, 0, ggio, 1656204076Spjd "Unable to send request (%s): ", 1657204076Spjd strerror(hio->hio_errors[ncomp])); 1658211979Spjd remote_close(res, ncomp); 1659204076Spjd /* 1660204076Spjd * Take request back from the receive queue and move 1661204076Spjd * it immediately to the done queue. 
1662204076Spjd */ 1663204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1664226852Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1665226852Spjd hio_next[ncomp]); 1666257155Strociny hio_recv_list_size[ncomp]--; 1667204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1668204076Spjd goto done_queue; 1669204076Spjd } 1670204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1671204076Spjd nv_free(nv); 1672204076Spjd if (wakeup) 1673204076Spjd cv_signal(&hio_recv_list_cond[ncomp]); 1674204076Spjd continue; 1675204076Spjddone_queue: 1676204076Spjd nv_free(nv); 1677204076Spjd if (ISSYNCREQ(hio)) { 1678246922Spjd if (refcnt_release(&hio->hio_countdown) > 0) 1679204076Spjd continue; 1680204076Spjd mtx_lock(&sync_lock); 1681204076Spjd SYNCREQDONE(hio); 1682204076Spjd mtx_unlock(&sync_lock); 1683204076Spjd cv_signal(&sync_cond); 1684204076Spjd continue; 1685204076Spjd } 1686204076Spjd if (ggio->gctl_cmd == BIO_WRITE) { 1687204076Spjd mtx_lock(&res->hr_amp_lock); 1688204076Spjd if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1689204076Spjd ggio->gctl_length)) { 1690204076Spjd (void)hast_activemap_flush(res); 1691255716Strociny } else { 1692255716Strociny mtx_unlock(&res->hr_amp_lock); 1693204076Spjd } 1694259191Strociny if (ISMEMSYNCWRITE(hio)) { 1695259191Strociny if (refcnt_release(&hio->hio_writecount) == 0) { 1696259191Strociny if (hio->hio_errors[0] == 0) 1697259191Strociny write_complete(res, hio); 1698259191Strociny } 1699259191Strociny } 1700204076Spjd } 1701246922Spjd if (refcnt_release(&hio->hio_countdown) > 0) 1702204076Spjd continue; 1703204076Spjd pjdlog_debug(2, 1704204076Spjd "remote_send: (%p) Moving request to the done queue.", 1705204076Spjd hio); 1706204076Spjd QUEUE_INSERT2(hio, done); 1707204076Spjd } 1708204076Spjd /* NOTREACHED */ 1709204076Spjd return (NULL); 1710204076Spjd} 1711204076Spjd 1712204076Spjd/* 1713204076Spjd * Thread receives answer from secondary node and passes it to ggate_send 1714204076Spjd * thread. 
1715204076Spjd */ 1716204076Spjdstatic void * 1717204076Spjdremote_recv_thread(void *arg) 1718204076Spjd{ 1719204076Spjd struct hast_resource *res = arg; 1720204076Spjd struct g_gate_ctl_io *ggio; 1721204076Spjd struct hio *hio; 1722204076Spjd struct nv *nv; 1723204076Spjd unsigned int ncomp; 1724204076Spjd uint64_t seq; 1725246922Spjd bool memsyncack; 1726204076Spjd int error; 1727204076Spjd 1728204076Spjd /* Remote component is 1 for now. */ 1729204076Spjd ncomp = 1; 1730204076Spjd 1731204076Spjd for (;;) { 1732204076Spjd /* Wait until there is anything to receive. */ 1733204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1734204076Spjd while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1735204076Spjd pjdlog_debug(2, "remote_recv: No requests, waiting."); 1736204076Spjd cv_wait(&hio_recv_list_cond[ncomp], 1737204076Spjd &hio_recv_list_lock[ncomp]); 1738204076Spjd } 1739204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1740226857Spjd 1741246922Spjd memsyncack = false; 1742246922Spjd 1743204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1744204076Spjd if (!ISCONNECTED(res, ncomp)) { 1745204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1746204076Spjd /* 1747204076Spjd * Connection is dead, so move all pending requests to 1748204076Spjd * the done queue (one-by-one). 
1749204076Spjd */ 1750204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1751204076Spjd hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1752218138Spjd PJDLOG_ASSERT(hio != NULL); 1753204076Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1754204076Spjd hio_next[ncomp]); 1755257155Strociny hio_recv_list_size[ncomp]--; 1756204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1757259191Strociny hio->hio_errors[ncomp] = ENOTCONN; 1758204076Spjd goto done_queue; 1759204076Spjd } 1760229945Spjd if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1761204076Spjd pjdlog_errno(LOG_ERR, 1762204076Spjd "Unable to receive reply header"); 1763204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1764204076Spjd remote_close(res, ncomp); 1765204076Spjd continue; 1766204076Spjd } 1767204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1768204076Spjd seq = nv_get_uint64(nv, "seq"); 1769204076Spjd if (seq == 0) { 1770204076Spjd pjdlog_error("Header contains no 'seq' field."); 1771204076Spjd nv_free(nv); 1772204076Spjd continue; 1773204076Spjd } 1774246922Spjd memsyncack = nv_exists(nv, "received"); 1775204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1776204076Spjd TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1777204076Spjd if (hio->hio_ggio.gctl_seq == seq) { 1778204076Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1779204076Spjd hio_next[ncomp]); 1780257155Strociny hio_recv_list_size[ncomp]--; 1781204076Spjd break; 1782204076Spjd } 1783204076Spjd } 1784204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1785204076Spjd if (hio == NULL) { 1786204076Spjd pjdlog_error("Found no request matching received 'seq' field (%ju).", 1787204076Spjd (uintmax_t)seq); 1788204076Spjd nv_free(nv); 1789204076Spjd continue; 1790204076Spjd } 1791226852Spjd ggio = &hio->hio_ggio; 1792204076Spjd error = nv_get_int16(nv, "error"); 1793204076Spjd if (error != 0) { 1794204076Spjd /* Request failed on remote side. 
*/ 1795216478Spjd hio->hio_errors[ncomp] = error; 1796226852Spjd reqlog(LOG_WARNING, 0, ggio, 1797216479Spjd "Remote request failed (%s): ", strerror(error)); 1798204076Spjd nv_free(nv); 1799204076Spjd goto done_queue; 1800204076Spjd } 1801204076Spjd switch (ggio->gctl_cmd) { 1802204076Spjd case BIO_READ: 1803204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1804204076Spjd if (!ISCONNECTED(res, ncomp)) { 1805204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1806204076Spjd nv_free(nv); 1807204076Spjd goto done_queue; 1808204076Spjd } 1809204076Spjd if (hast_proto_recv_data(res, res->hr_remotein, nv, 1810229945Spjd ggio->gctl_data, ggio->gctl_length) == -1) { 1811204076Spjd hio->hio_errors[ncomp] = errno; 1812204076Spjd pjdlog_errno(LOG_ERR, 1813204076Spjd "Unable to receive reply data"); 1814204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1815204076Spjd nv_free(nv); 1816204076Spjd remote_close(res, ncomp); 1817204076Spjd goto done_queue; 1818204076Spjd } 1819204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1820204076Spjd break; 1821204076Spjd case BIO_WRITE: 1822204076Spjd case BIO_DELETE: 1823204076Spjd case BIO_FLUSH: 1824204076Spjd break; 1825204076Spjd default: 1826225783Spjd PJDLOG_ABORT("invalid condition"); 1827204076Spjd } 1828204076Spjd hio->hio_errors[ncomp] = 0; 1829204076Spjd nv_free(nv); 1830204076Spjddone_queue: 1831259191Strociny if (ISMEMSYNCWRITE(hio)) { 1832259191Strociny if (!hio->hio_memsyncacked) { 1833259191Strociny PJDLOG_ASSERT(memsyncack || 1834259191Strociny hio->hio_errors[ncomp] != 0); 1835259191Strociny /* Remote ack arrived. 
*/ 1836259191Strociny if (refcnt_release(&hio->hio_writecount) == 0) { 1837246922Spjd if (hio->hio_errors[0] == 0) 1838246922Spjd write_complete(res, hio); 1839259191Strociny } 1840259191Strociny hio->hio_memsyncacked = true; 1841259191Strociny if (hio->hio_errors[ncomp] == 0) { 1842246922Spjd pjdlog_debug(2, 1843259191Strociny "remote_recv: (%p) Moving request " 1844259191Strociny "back to the recv queue.", hio); 1845246922Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1846246922Spjd TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1847246922Spjd hio, hio_next[ncomp]); 1848257155Strociny hio_recv_list_size[ncomp]++; 1849246922Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1850259191Strociny continue; 1851246922Spjd } 1852259191Strociny } else { 1853259191Strociny PJDLOG_ASSERT(!memsyncack); 1854259191Strociny /* Remote final reply arrived. */ 1855246922Spjd } 1856246922Spjd } 1857259191Strociny if (refcnt_release(&hio->hio_countdown) > 0) 1858259191Strociny continue; 1859226856Spjd if (ISSYNCREQ(hio)) { 1860226856Spjd mtx_lock(&sync_lock); 1861226856Spjd SYNCREQDONE(hio); 1862226856Spjd mtx_unlock(&sync_lock); 1863226856Spjd cv_signal(&sync_cond); 1864226856Spjd } else { 1865226856Spjd pjdlog_debug(2, 1866226856Spjd "remote_recv: (%p) Moving request to the done queue.", 1867226856Spjd hio); 1868226856Spjd QUEUE_INSERT2(hio, done); 1869204076Spjd } 1870204076Spjd } 1871204076Spjd /* NOTREACHED */ 1872204076Spjd return (NULL); 1873204076Spjd} 1874204076Spjd 1875204076Spjd/* 1876204076Spjd * Thread sends answer to the kernel. 
1877204076Spjd */ 1878204076Spjdstatic void * 1879204076Spjdggate_send_thread(void *arg) 1880204076Spjd{ 1881204076Spjd struct hast_resource *res = arg; 1882204076Spjd struct g_gate_ctl_io *ggio; 1883204076Spjd struct hio *hio; 1884226859Spjd unsigned int ii, ncomps; 1885204076Spjd 1886204076Spjd ncomps = HAST_NCOMPONENTS; 1887204076Spjd 1888204076Spjd for (;;) { 1889204076Spjd pjdlog_debug(2, "ggate_send: Taking request."); 1890204076Spjd QUEUE_TAKE2(hio, done); 1891204076Spjd pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1892204076Spjd ggio = &hio->hio_ggio; 1893204076Spjd for (ii = 0; ii < ncomps; ii++) { 1894204076Spjd if (hio->hio_errors[ii] == 0) { 1895204076Spjd /* 1896204076Spjd * One successful request is enough to declare 1897204076Spjd * success. 1898204076Spjd */ 1899204076Spjd ggio->gctl_error = 0; 1900204076Spjd break; 1901204076Spjd } 1902204076Spjd } 1903204076Spjd if (ii == ncomps) { 1904204076Spjd /* 1905204076Spjd * None of the requests were successful. 1906219879Strociny * Use the error from local component except the 1907219879Strociny * case when we did only remote request. 1908204076Spjd */ 1909219879Strociny if (ggio->gctl_cmd == BIO_READ && 1910219879Strociny res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1911219879Strociny ggio->gctl_error = hio->hio_errors[1]; 1912219879Strociny else 1913219879Strociny ggio->gctl_error = hio->hio_errors[0]; 1914204076Spjd } 1915204076Spjd if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1916204076Spjd mtx_lock(&res->hr_amp_lock); 1917223655Strociny if (activemap_write_complete(res->hr_amp, 1918223974Strociny ggio->gctl_offset, ggio->gctl_length)) { 1919223655Strociny res->hr_stat_activemap_update++; 1920223655Strociny (void)hast_activemap_flush(res); 1921255716Strociny } else { 1922255716Strociny mtx_unlock(&res->hr_amp_lock); 1923223655Strociny } 1924204076Spjd } 1925204076Spjd if (ggio->gctl_cmd == BIO_WRITE) { 1926204076Spjd /* 1927204076Spjd * Unlock range we locked. 
1928204076Spjd */ 1929204076Spjd mtx_lock(&range_lock); 1930204076Spjd rangelock_del(range_regular, ggio->gctl_offset, 1931204076Spjd ggio->gctl_length); 1932204076Spjd if (range_sync_wait) 1933204076Spjd cv_signal(&range_sync_cond); 1934204076Spjd mtx_unlock(&range_lock); 1935226859Spjd if (!hio->hio_done) 1936226859Spjd write_complete(res, hio); 1937226859Spjd } else { 1938229945Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1939226859Spjd primary_exit(EX_OSERR, 1940226859Spjd "G_GATE_CMD_DONE failed"); 1941204076Spjd } 1942204076Spjd } 1943247281Strociny if (hio->hio_errors[0]) { 1944247281Strociny switch (ggio->gctl_cmd) { 1945247281Strociny case BIO_READ: 1946247281Strociny res->hr_stat_read_error++; 1947247281Strociny break; 1948247281Strociny case BIO_WRITE: 1949247281Strociny res->hr_stat_write_error++; 1950247281Strociny break; 1951247281Strociny case BIO_DELETE: 1952247281Strociny res->hr_stat_delete_error++; 1953247281Strociny break; 1954247281Strociny case BIO_FLUSH: 1955247281Strociny res->hr_stat_flush_error++; 1956247281Strociny break; 1957247281Strociny } 1958247281Strociny } 1959204076Spjd pjdlog_debug(2, 1960204076Spjd "ggate_send: (%p) Moving request to the free queue.", hio); 1961204076Spjd QUEUE_INSERT2(hio, free); 1962204076Spjd } 1963204076Spjd /* NOTREACHED */ 1964204076Spjd return (NULL); 1965204076Spjd} 1966204076Spjd 1967204076Spjd/* 1968204076Spjd * Thread synchronize local and remote components. 
1969204076Spjd */ 1970204076Spjdstatic void * 1971204076Spjdsync_thread(void *arg __unused) 1972204076Spjd{ 1973204076Spjd struct hast_resource *res = arg; 1974204076Spjd struct hio *hio; 1975204076Spjd struct g_gate_ctl_io *ggio; 1976219372Spjd struct timeval tstart, tend, tdiff; 1977204076Spjd unsigned int ii, ncomp, ncomps; 1978204076Spjd off_t offset, length, synced; 1979238120Spjd bool dorewind, directreads; 1980204076Spjd int syncext; 1981204076Spjd 1982204076Spjd ncomps = HAST_NCOMPONENTS; 1983204076Spjd dorewind = true; 1984211897Spjd synced = 0; 1985211897Spjd offset = -1; 1986238120Spjd directreads = false; 1987204076Spjd 1988204076Spjd for (;;) { 1989204076Spjd mtx_lock(&sync_lock); 1990211897Spjd if (offset >= 0 && !sync_inprogress) { 1991219372Spjd gettimeofday(&tend, NULL); 1992219372Spjd timersub(&tend, &tstart, &tdiff); 1993219372Spjd pjdlog_info("Synchronization interrupted after %#.0T. " 1994219372Spjd "%NB synchronized so far.", &tdiff, 1995211879Spjd (intmax_t)synced); 1996212038Spjd event_send(res, EVENT_SYNCINTR); 1997211879Spjd } 1998204076Spjd while (!sync_inprogress) { 1999204076Spjd dorewind = true; 2000204076Spjd synced = 0; 2001204076Spjd cv_wait(&sync_cond, &sync_lock); 2002204076Spjd } 2003204076Spjd mtx_unlock(&sync_lock); 2004204076Spjd /* 2005204076Spjd * Obtain offset at which we should synchronize. 2006204076Spjd * Rewind synchronization if needed. 2007204076Spjd */ 2008204076Spjd mtx_lock(&res->hr_amp_lock); 2009204076Spjd if (dorewind) 2010204076Spjd activemap_sync_rewind(res->hr_amp); 2011204076Spjd offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2012204076Spjd if (syncext != -1) { 2013204076Spjd /* 2014204076Spjd * We synchronized entire syncext extent, we can mark 2015204076Spjd * it as clean now. 
2016204076Spjd */ 2017204076Spjd if (activemap_extent_complete(res->hr_amp, syncext)) 2018204076Spjd (void)hast_activemap_flush(res); 2019255716Strociny else 2020255716Strociny mtx_unlock(&res->hr_amp_lock); 2021255716Strociny } else { 2022255716Strociny mtx_unlock(&res->hr_amp_lock); 2023204076Spjd } 2024204076Spjd if (dorewind) { 2025204076Spjd dorewind = false; 2026229945Spjd if (offset == -1) 2027204076Spjd pjdlog_info("Nodes are in sync."); 2028204076Spjd else { 2029219372Spjd pjdlog_info("Synchronization started. %NB to go.", 2030219372Spjd (intmax_t)(res->hr_extentsize * 2031204076Spjd activemap_ndirty(res->hr_amp))); 2032212038Spjd event_send(res, EVENT_SYNCSTART); 2033219372Spjd gettimeofday(&tstart, NULL); 2034204076Spjd } 2035204076Spjd } 2036229945Spjd if (offset == -1) { 2037211878Spjd sync_stop(); 2038204076Spjd pjdlog_debug(1, "Nothing to synchronize."); 2039204076Spjd /* 2040204076Spjd * Synchronization complete, make both localcnt and 2041204076Spjd * remotecnt equal. 2042204076Spjd */ 2043204076Spjd ncomp = 1; 2044204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 2045204076Spjd if (ISCONNECTED(res, ncomp)) { 2046204076Spjd if (synced > 0) { 2047219372Spjd int64_t bps; 2048219372Spjd 2049219372Spjd gettimeofday(&tend, NULL); 2050219372Spjd timersub(&tend, &tstart, &tdiff); 2051219372Spjd bps = (int64_t)((double)synced / 2052219372Spjd ((double)tdiff.tv_sec + 2053219372Spjd (double)tdiff.tv_usec / 1000000)); 2054204076Spjd pjdlog_info("Synchronization complete. 
" 2055219372Spjd "%NB synchronized in %#.0lT (%NB/sec).", 2056219372Spjd (intmax_t)synced, &tdiff, 2057219372Spjd (intmax_t)bps); 2058212038Spjd event_send(res, EVENT_SYNCDONE); 2059204076Spjd } 2060204076Spjd mtx_lock(&metadata_lock); 2061238120Spjd if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2062238120Spjd directreads = true; 2063204076Spjd res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2064204076Spjd res->hr_primary_localcnt = 2065219882Strociny res->hr_secondary_remotecnt; 2066219882Strociny res->hr_primary_remotecnt = 2067204076Spjd res->hr_secondary_localcnt; 2068204076Spjd pjdlog_debug(1, 2069204076Spjd "Setting localcnt to %ju and remotecnt to %ju.", 2070204076Spjd (uintmax_t)res->hr_primary_localcnt, 2071219882Strociny (uintmax_t)res->hr_primary_remotecnt); 2072204076Spjd (void)metadata_write(res); 2073204076Spjd mtx_unlock(&metadata_lock); 2074204076Spjd } 2075204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 2076238120Spjd if (directreads) { 2077238120Spjd directreads = false; 2078238120Spjd enable_direct_reads(res); 2079238120Spjd } 2080204076Spjd continue; 2081204076Spjd } 2082204076Spjd pjdlog_debug(2, "sync: Taking free request."); 2083204076Spjd QUEUE_TAKE2(hio, free); 2084204076Spjd pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2085204076Spjd /* 2086204076Spjd * Lock the range we are going to synchronize. We don't want 2087204076Spjd * race where someone writes between our read and write. 
2088204076Spjd */ 2089204076Spjd for (;;) { 2090204076Spjd mtx_lock(&range_lock); 2091204076Spjd if (rangelock_islocked(range_regular, offset, length)) { 2092204076Spjd pjdlog_debug(2, 2093204076Spjd "sync: Range offset=%jd length=%jd locked.", 2094204076Spjd (intmax_t)offset, (intmax_t)length); 2095204076Spjd range_sync_wait = true; 2096204076Spjd cv_wait(&range_sync_cond, &range_lock); 2097204076Spjd range_sync_wait = false; 2098204076Spjd mtx_unlock(&range_lock); 2099204076Spjd continue; 2100204076Spjd } 2101229945Spjd if (rangelock_add(range_sync, offset, length) == -1) { 2102204076Spjd mtx_unlock(&range_lock); 2103204076Spjd pjdlog_debug(2, 2104204076Spjd "sync: Range offset=%jd length=%jd is already locked, waiting.", 2105204076Spjd (intmax_t)offset, (intmax_t)length); 2106204076Spjd sleep(1); 2107204076Spjd continue; 2108204076Spjd } 2109204076Spjd mtx_unlock(&range_lock); 2110204076Spjd break; 2111204076Spjd } 2112204076Spjd /* 2113204076Spjd * First read the data from synchronization source. 2114204076Spjd */ 2115204076Spjd SYNCREQ(hio); 2116204076Spjd ggio = &hio->hio_ggio; 2117204076Spjd ggio->gctl_cmd = BIO_READ; 2118204076Spjd ggio->gctl_offset = offset; 2119204076Spjd ggio->gctl_length = length; 2120204076Spjd ggio->gctl_error = 0; 2121226859Spjd hio->hio_done = false; 2122226859Spjd hio->hio_replication = res->hr_replication; 2123204076Spjd for (ii = 0; ii < ncomps; ii++) 2124204076Spjd hio->hio_errors[ii] = EINVAL; 2125204076Spjd reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2126204076Spjd hio); 2127204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2128204076Spjd hio); 2129204076Spjd mtx_lock(&metadata_lock); 2130204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2131204076Spjd /* 2132204076Spjd * This range is up-to-date on local component, 2133204076Spjd * so handle request locally. 2134204076Spjd */ 2135204076Spjd /* Local component is 0 for now. 
*/ 2136204076Spjd ncomp = 0; 2137204076Spjd } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2138218138Spjd PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2139204076Spjd /* 2140204076Spjd * This range is out-of-date on local component, 2141204076Spjd * so send request to the remote node. 2142204076Spjd */ 2143204076Spjd /* Remote component is 1 for now. */ 2144204076Spjd ncomp = 1; 2145204076Spjd } 2146204076Spjd mtx_unlock(&metadata_lock); 2147249969Sed refcnt_init(&hio->hio_countdown, 1); 2148204076Spjd QUEUE_INSERT1(hio, send, ncomp); 2149204076Spjd 2150204076Spjd /* 2151204076Spjd * Let's wait for READ to finish. 2152204076Spjd */ 2153204076Spjd mtx_lock(&sync_lock); 2154204076Spjd while (!ISSYNCREQDONE(hio)) 2155204076Spjd cv_wait(&sync_cond, &sync_lock); 2156204076Spjd mtx_unlock(&sync_lock); 2157204076Spjd 2158204076Spjd if (hio->hio_errors[ncomp] != 0) { 2159204076Spjd pjdlog_error("Unable to read synchronization data: %s.", 2160204076Spjd strerror(hio->hio_errors[ncomp])); 2161204076Spjd goto free_queue; 2162204076Spjd } 2163204076Spjd 2164204076Spjd /* 2165204076Spjd * We read the data from synchronization source, now write it 2166204076Spjd * to synchronization target. 2167204076Spjd */ 2168204076Spjd SYNCREQ(hio); 2169204076Spjd ggio->gctl_cmd = BIO_WRITE; 2170204076Spjd for (ii = 0; ii < ncomps; ii++) 2171204076Spjd hio->hio_errors[ii] = EINVAL; 2172204076Spjd reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2173204076Spjd hio); 2174204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2175204076Spjd hio); 2176204076Spjd mtx_lock(&metadata_lock); 2177204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2178204076Spjd /* 2179204076Spjd * This range is up-to-date on local component, 2180204076Spjd * so we update remote component. 2181204076Spjd */ 2182204076Spjd /* Remote component is 1 for now. 
*/ 2183204076Spjd ncomp = 1; 2184204076Spjd } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2185218138Spjd PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2186204076Spjd /* 2187204076Spjd * This range is out-of-date on local component, 2188204076Spjd * so we update it. 2189204076Spjd */ 2190204076Spjd /* Local component is 0 for now. */ 2191204076Spjd ncomp = 0; 2192204076Spjd } 2193204076Spjd mtx_unlock(&metadata_lock); 2194204076Spjd 2195226857Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2196204076Spjd hio); 2197249969Sed refcnt_init(&hio->hio_countdown, 1); 2198204076Spjd QUEUE_INSERT1(hio, send, ncomp); 2199204076Spjd 2200204076Spjd /* 2201204076Spjd * Let's wait for WRITE to finish. 2202204076Spjd */ 2203204076Spjd mtx_lock(&sync_lock); 2204204076Spjd while (!ISSYNCREQDONE(hio)) 2205204076Spjd cv_wait(&sync_cond, &sync_lock); 2206204076Spjd mtx_unlock(&sync_lock); 2207204076Spjd 2208204076Spjd if (hio->hio_errors[ncomp] != 0) { 2209204076Spjd pjdlog_error("Unable to write synchronization data: %s.", 2210204076Spjd strerror(hio->hio_errors[ncomp])); 2211204076Spjd goto free_queue; 2212204076Spjd } 2213211880Spjd 2214211880Spjd synced += length; 2215204076Spjdfree_queue: 2216204076Spjd mtx_lock(&range_lock); 2217204076Spjd rangelock_del(range_sync, offset, length); 2218204076Spjd if (range_regular_wait) 2219204076Spjd cv_signal(&range_regular_cond); 2220204076Spjd mtx_unlock(&range_lock); 2221204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2222204076Spjd hio); 2223204076Spjd QUEUE_INSERT2(hio, free); 2224204076Spjd } 2225204076Spjd /* NOTREACHED */ 2226204076Spjd return (NULL); 2227204076Spjd} 2228204076Spjd 2229217784Spjdvoid 2230217784Spjdprimary_config_reload(struct hast_resource *res, struct nv *nv) 2231210886Spjd{ 2232210886Spjd unsigned int ii, ncomps; 2233217784Spjd int modified, vint; 2234217784Spjd const char *vstr; 2235210886Spjd 2236210886Spjd pjdlog_info("Reloading 
configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	/* Every reloadable setting must be present in the nvlist. */
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

	/*
	 * Local bit flags recording which settings actually changed, so we
	 * only perform the (possibly disruptive) follow-up actions below
	 * for settings whose values differ from the current configuration.
	 */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC	0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			/*
			 * Apply the new timeout to both directions of the
			 * remote connection; failure here is non-fatal and
			 * only logged.
			 */
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	/*
	 * An address change requires dropping the remote connections;
	 * the guard thread will reconnect using the new addresses.
	 */
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			/*
			 * Now that remote_close() has logged the disconnect
			 * from the old address, it is safe to install the
			 * new one.
			 */
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC
#undef	MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the state of a single remote component and reconnect it if it is
 * down.  Local components (!ISREMOTE) are ignored.  Called periodically
 * from guard_thread().
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		/* Connection is healthy; both directions must be set. */
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		/* Reconnect succeeded: publish the new connections. */
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		/* Resynchronize after the outage. */
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	/* SIGINT/SIGTERM are consumed synchronously via sigtimedwait(2). */
	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_sec = HAST_KEEPALIVE;
	timeout.tv_nsec = 0;
	signo = -1;	/* No signal received yet on first iteration. */

	for (;;) {
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		/*
		 * Don't check connections until we fully started,
		 * as we may still be looping, waiting for remote node
		 * to switch from primary to secondary.
		 */
		if (fullystarted) {
			pjdlog_debug(2, "remote_guard: Checking connections.");
			now = time(NULL);
			/* Rate-limit checks to one per HAST_KEEPALIVE secs. */
			if (lastcheck + HAST_KEEPALIVE <= now) {
				for (ii = 0; ii < ncomps; ii++)
					guard_one(res, ii);
				lastcheck = now;
			}
		}
		/* Sleep until a signal arrives or the timeout expires. */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}