primary.c revision 255716
1258945Sroberto/*- 2280849Scy * Copyright (c) 2009 The FreeBSD Foundation 3258945Sroberto * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4258945Sroberto * All rights reserved. 5258945Sroberto * 6258945Sroberto * This software was developed by Pawel Jakub Dawidek under sponsorship from 7258945Sroberto * the FreeBSD Foundation. 8258945Sroberto * 9258945Sroberto * Redistribution and use in source and binary forms, with or without 10258945Sroberto * modification, are permitted provided that the following conditions 11258945Sroberto * are met: 12258945Sroberto * 1. Redistributions of source code must retain the above copyright 13258945Sroberto * notice, this list of conditions and the following disclaimer. 14258945Sroberto * 2. Redistributions in binary form must reproduce the above copyright 15258945Sroberto * notice, this list of conditions and the following disclaimer in the 16258945Sroberto * documentation and/or other materials provided with the distribution. 17258945Sroberto * 18280849Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19258945Sroberto * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20258945Sroberto * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21258945Sroberto * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22258945Sroberto * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23258945Sroberto * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24258945Sroberto * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25258945Sroberto * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26258945Sroberto * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27258945Sroberto * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28258945Sroberto * SUCH DAMAGE. 29258945Sroberto */ 30258945Sroberto 31258945Sroberto#include <sys/cdefs.h> 32258945Sroberto__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 255716 2013-09-19 20:19:08Z trociny $"); 33258945Sroberto 34258945Sroberto#include <sys/types.h> 35258945Sroberto#include <sys/time.h> 36258945Sroberto#include <sys/bio.h> 37258945Sroberto#include <sys/disk.h> 38258945Sroberto#include <sys/stat.h> 39258945Sroberto 40258945Sroberto#include <geom/gate/g_gate.h> 41258945Sroberto 42258945Sroberto#include <err.h> 43258945Sroberto#include <errno.h> 44258945Sroberto#include <fcntl.h> 45258945Sroberto#include <libgeom.h> 46258945Sroberto#include <pthread.h> 47258945Sroberto#include <signal.h> 48258945Sroberto#include <stdint.h> 49258945Sroberto#include <stdio.h> 50258945Sroberto#include <string.h> 51258945Sroberto#include <sysexits.h> 52258945Sroberto#include <unistd.h> 53258945Sroberto 54258945Sroberto#include <activemap.h> 55258945Sroberto#include <nv.h> 56258945Sroberto#include <rangelock.h> 57258945Sroberto 58258945Sroberto#include "control.h" 59258945Sroberto#include "event.h" 60258945Sroberto#include "hast.h" 61258945Sroberto#include "hast_proto.h" 62258945Sroberto#include "hastd.h" 63258945Sroberto#include "hooks.h" 64258945Sroberto#include "metadata.h" 65258945Sroberto#include "proto.h" 66258945Sroberto#include "pjdlog.h" 67258945Sroberto#include "refcnt.h" 68258945Sroberto#include "subr.h" 69258945Sroberto#include "synch.h" 70280849Scy 71258945Sroberto/* The is only one remote component for now. */ 72280849Scy#define ISREMOTE(no) ((no) == 1) 73280849Scy 74258945Srobertostruct hio { 75280849Scy /* 76258945Sroberto * Number of components we are still waiting for. 77258945Sroberto * When this field goes to 0, we can send the request back to the 78258945Sroberto * kernel. Each component has to decrease this counter by one 79280849Scy * even on failure. 80280849Scy */ 81280849Scy refcnt_t hio_countdown; 82280849Scy /* 83280849Scy * Each component has a place to store its own error. 84280849Scy * Once the request is handled by all components we can decide if the 85280849Scy * request overall is successful or not. 86280849Scy */ 87280849Scy int *hio_errors; 88280849Scy /* 89258945Sroberto * Structure used to communicate with GEOM Gate class. 90280849Scy */ 91280849Scy struct g_gate_ctl_io hio_ggio; 92258945Sroberto /* 93258945Sroberto * Request was already confirmed to GEOM Gate. 94258945Sroberto */ 95258945Sroberto bool hio_done; 96258945Sroberto /* 97258945Sroberto * Remember replication from the time the request was initiated, 98258945Sroberto * so we won't get confused when replication changes on reload. 99258945Sroberto */ 100258945Sroberto int hio_replication; 101258945Sroberto TAILQ_ENTRY(hio) *hio_next; 102258945Sroberto}; 103258945Sroberto#define hio_free_next hio_next[0] 104258945Sroberto#define hio_done_next hio_next[0] 105258945Sroberto 106258945Sroberto/* 107258945Sroberto * Free list holds unused structures. When free list is empty, we have to wait 108258945Sroberto * until some in-progress requests are freed. 109258945Sroberto */ 110258945Srobertostatic TAILQ_HEAD(, hio) hio_free_list; 111258945Srobertostatic pthread_mutex_t hio_free_list_lock; 112258945Srobertostatic pthread_cond_t hio_free_list_cond; 113258945Sroberto/* 114280849Scy * There is one send list for every component. One requests is placed on all 115258945Sroberto * send lists - each component gets the same request, but each component is 116258945Sroberto * responsible for managing his own send list. 117258945Sroberto */ 118258945Srobertostatic TAILQ_HEAD(, hio) *hio_send_list; 119258945Srobertostatic pthread_mutex_t *hio_send_list_lock; 120258945Srobertostatic pthread_cond_t *hio_send_list_cond; 121258945Sroberto/* 122258945Sroberto * There is one recv list for every component, although local components don't 123258945Sroberto * use recv lists as local requests are done synchronously. 124258945Sroberto */ 125258945Srobertostatic TAILQ_HEAD(, hio) *hio_recv_list; 126258945Srobertostatic pthread_mutex_t *hio_recv_list_lock; 127280849Scystatic pthread_cond_t *hio_recv_list_cond; 128258945Sroberto/* 129258945Sroberto * Request is placed on done list by the slowest component (the one that 130258945Sroberto * decreased hio_countdown from 1 to 0). 131258945Sroberto */ 132258945Srobertostatic TAILQ_HEAD(, hio) hio_done_list; 133258945Srobertostatic pthread_mutex_t hio_done_list_lock; 134258945Srobertostatic pthread_cond_t hio_done_list_cond; 135258945Sroberto/* 136258945Sroberto * Structure below are for interaction with sync thread. 137258945Sroberto */ 138258945Srobertostatic bool sync_inprogress; 139258945Srobertostatic pthread_mutex_t sync_lock; 140258945Srobertostatic pthread_cond_t sync_cond; 141258945Sroberto/* 142258945Sroberto * The lock below allows to synchornize access to remote connections. 143258945Sroberto */ 144258945Srobertostatic pthread_rwlock_t *hio_remote_lock; 145258945Sroberto 146258945Sroberto/* 147258945Sroberto * Lock to synchronize metadata updates. Also synchronize access to 148258945Sroberto * hr_primary_localcnt and hr_primary_remotecnt fields. 149258945Sroberto */ 150258945Srobertostatic pthread_mutex_t metadata_lock; 151258945Sroberto 152258945Sroberto/* 153258945Sroberto * Maximum number of outstanding I/O requests. 154258945Sroberto */ 155258945Sroberto#define HAST_HIO_MAX 256 156258945Sroberto/* 157258945Sroberto * Number of components. At this point there are only two components: local 158258945Sroberto * and remote, but in the future it might be possible to use multiple local 159258945Sroberto * and remote components. 160258945Sroberto */ 161258945Sroberto#define HAST_NCOMPONENTS 2 162258945Sroberto 163258945Sroberto#define ISCONNECTED(res, no) \ 164258945Sroberto ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165258945Sroberto 166258945Sroberto#define QUEUE_INSERT1(hio, name, ncomp) do { \ 167258945Sroberto bool _wakeup; \ 168258945Sroberto \ 169258945Sroberto mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170258945Sroberto _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171258945Sroberto TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172258945Sroberto hio_next[(ncomp)]); \ 173258945Sroberto mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174258945Sroberto if (_wakeup) \ 175258945Sroberto cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 176258945Sroberto} while (0) 177258945Sroberto#define QUEUE_INSERT2(hio, name) do { \ 178258945Sroberto bool _wakeup; \ 179258945Sroberto \ 180258945Sroberto mtx_lock(&hio_##name##_list_lock); \ 181258945Sroberto _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182258945Sroberto TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183258945Sroberto mtx_unlock(&hio_##name##_list_lock); \ 184258945Sroberto if (_wakeup) \ 185258945Sroberto cv_broadcast(&hio_##name##_list_cond); \ 186258945Sroberto} while (0) 187258945Sroberto#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188258945Sroberto bool _last; \ 189258945Sroberto \ 190258945Sroberto mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191258945Sroberto _last = false; \ 192258945Sroberto while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193258945Sroberto cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194258945Sroberto &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195258945Sroberto if ((timeout) != 0) \ 196258945Sroberto _last = true; \ 197258945Sroberto } \ 198258945Sroberto if (hio != NULL) { \ 199258945Sroberto TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200258945Sroberto hio_next[(ncomp)]); \ 201258945Sroberto } \ 202258945Sroberto mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203258945Sroberto} while (0) 204258945Sroberto#define QUEUE_TAKE2(hio, name) do { \ 205258945Sroberto mtx_lock(&hio_##name##_list_lock); \ 206258945Sroberto while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207258945Sroberto cv_wait(&hio_##name##_list_cond, \ 208258945Sroberto &hio_##name##_list_lock); \ 209258945Sroberto } \ 210258945Sroberto TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211258945Sroberto mtx_unlock(&hio_##name##_list_lock); \ 212258945Sroberto} while (0) 213258945Sroberto 214258945Sroberto#define SYNCREQ(hio) do { \ 215258945Sroberto (hio)->hio_ggio.gctl_unit = -1; \ 216258945Sroberto (hio)->hio_ggio.gctl_seq = 1; \ 217258945Sroberto} while (0) 218258945Sroberto#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219258945Sroberto#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220258945Sroberto#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221258945Sroberto 222258945Srobertostatic struct hast_resource *gres; 223258945Sroberto 224258945Srobertostatic pthread_mutex_t range_lock; 225258945Srobertostatic struct rangelocks *range_regular; 226258945Srobertostatic bool range_regular_wait; 227258945Srobertostatic pthread_cond_t range_regular_cond; 228258945Srobertostatic struct rangelocks *range_sync; 229258945Srobertostatic bool range_sync_wait; 230258945Srobertostatic pthread_cond_t range_sync_cond; 231258945Srobertostatic bool fullystarted; 232258945Sroberto 233258945Srobertostatic void *ggate_recv_thread(void *arg); 234258945Srobertostatic void *local_send_thread(void *arg); 235258945Srobertostatic void *remote_send_thread(void *arg); 236258945Srobertostatic void *remote_recv_thread(void *arg); 237258945Srobertostatic void *ggate_send_thread(void *arg); 238258945Srobertostatic void *sync_thread(void *arg); 239258945Srobertostatic void *guard_thread(void *arg); 240258945Sroberto 241258945Srobertostatic void 242258945Srobertocleanup(struct hast_resource *res) 243258945Sroberto{ 244258945Sroberto int rerrno; 245258945Sroberto 246258945Sroberto /* Remember errno. */ 247258945Sroberto rerrno = errno; 248258945Sroberto 249258945Sroberto /* Destroy ggate provider if we created one. */ 250258945Sroberto if (res->hr_ggateunit >= 0) { 251258945Sroberto struct g_gate_ctl_destroy ggiod; 252258945Sroberto 253258945Sroberto bzero(&ggiod, sizeof(ggiod)); 254258945Sroberto ggiod.gctl_version = G_GATE_VERSION; 255258945Sroberto ggiod.gctl_unit = res->hr_ggateunit; 256258945Sroberto ggiod.gctl_force = 1; 257258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258258945Sroberto pjdlog_errno(LOG_WARNING, 259258945Sroberto "Unable to destroy hast/%s device", 260258945Sroberto res->hr_provname); 261280849Scy } 262258945Sroberto res->hr_ggateunit = -1; 263258945Sroberto } 264258945Sroberto 265258945Sroberto /* Restore errno. */ 266258945Sroberto errno = rerrno; 267258945Sroberto} 268258945Sroberto 269258945Srobertostatic __dead2 void 270258945Srobertoprimary_exit(int exitcode, const char *fmt, ...) 271258945Sroberto{ 272258945Sroberto va_list ap; 273258945Sroberto 274258945Sroberto PJDLOG_ASSERT(exitcode != EX_OK); 275258945Sroberto va_start(ap, fmt); 276258945Sroberto pjdlogv_errno(LOG_ERR, fmt, ap); 277258945Sroberto va_end(ap); 278258945Sroberto cleanup(gres); 279258945Sroberto exit(exitcode); 280258945Sroberto} 281258945Sroberto 282258945Srobertostatic __dead2 void 283258945Srobertoprimary_exitx(int exitcode, const char *fmt, ...) 284258945Sroberto{ 285258945Sroberto va_list ap; 286258945Sroberto 287258945Sroberto va_start(ap, fmt); 288258945Sroberto pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289258945Sroberto va_end(ap); 290258945Sroberto cleanup(gres); 291258945Sroberto exit(exitcode); 292258945Sroberto} 293258945Sroberto 294258945Sroberto/* Expects res->hr_amp locked, returns unlocked. */ 295280849Scystatic int 296280849Scyhast_activemap_flush(struct hast_resource *res) 297280849Scy{ 298280849Scy const unsigned char *buf; 299280849Scy size_t size; 300280849Scy int ret; 301258945Sroberto 302280849Scy mtx_lock(&res->hr_amp_diskmap_lock); 303280849Scy buf = activemap_bitmap(res->hr_amp, &size); 304258945Sroberto mtx_unlock(&res->hr_amp_lock); 305258945Sroberto PJDLOG_ASSERT(buf != NULL); 306258945Sroberto PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 307258945Sroberto ret = 0; 308258945Sroberto if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 309280849Scy (ssize_t)size) { 310258945Sroberto pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 311258945Sroberto res->hr_stat_activemap_write_error++; 312258945Sroberto ret = -1; 313258945Sroberto } 314258945Sroberto if (ret == 0 && res->hr_metaflush == 1 && 315258945Sroberto g_flush(res->hr_localfd) == -1) { 316258945Sroberto if (errno == EOPNOTSUPP) { 317258945Sroberto pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 318258945Sroberto res->hr_localpath); 319258945Sroberto res->hr_metaflush = 0; 320258945Sroberto } else { 321258945Sroberto pjdlog_errno(LOG_ERR, 322258945Sroberto "Unable to flush disk cache on activemap update"); 323258945Sroberto res->hr_stat_activemap_flush_error++; 324258945Sroberto ret = -1; 325258945Sroberto } 326258945Sroberto } 327258945Sroberto mtx_unlock(&res->hr_amp_diskmap_lock); 328258945Sroberto return (ret); 329258945Sroberto} 330258945Sroberto 331258945Srobertostatic bool 332258945Srobertoreal_remote(const struct hast_resource *res) 333258945Sroberto{ 334258945Sroberto 335258945Sroberto return (strcmp(res->hr_remoteaddr, "none") != 0); 336258945Sroberto} 337280849Scy 338280849Scystatic void 339258945Srobertoinit_environment(struct hast_resource *res __unused) 340258945Sroberto{ 341258945Sroberto struct hio *hio; 342258945Sroberto unsigned int ii, ncomps; 343258945Sroberto 344258945Sroberto /* 345258945Sroberto * In the future it might be per-resource value. 346258945Sroberto */ 347258945Sroberto ncomps = HAST_NCOMPONENTS; 348258945Sroberto 349258945Sroberto /* 350258945Sroberto * Allocate memory needed by lists. 351258945Sroberto */ 352258945Sroberto hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 353258945Sroberto if (hio_send_list == NULL) { 354258945Sroberto primary_exitx(EX_TEMPFAIL, 355258945Sroberto "Unable to allocate %zu bytes of memory for send lists.", 356258945Sroberto sizeof(hio_send_list[0]) * ncomps); 357258945Sroberto } 358280849Scy hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 359258945Sroberto if (hio_send_list_lock == NULL) { 360280849Scy primary_exitx(EX_TEMPFAIL, 361258945Sroberto "Unable to allocate %zu bytes of memory for send list locks.", 362258945Sroberto sizeof(hio_send_list_lock[0]) * ncomps); 363258945Sroberto } 364258945Sroberto hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 365258945Sroberto if (hio_send_list_cond == NULL) { 366258945Sroberto primary_exitx(EX_TEMPFAIL, 367258945Sroberto "Unable to allocate %zu bytes of memory for send list condition variables.", 368258945Sroberto sizeof(hio_send_list_cond[0]) * ncomps); 369258945Sroberto } 370258945Sroberto hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 371258945Sroberto if (hio_recv_list == NULL) { 372258945Sroberto primary_exitx(EX_TEMPFAIL, 373258945Sroberto "Unable to allocate %zu bytes of memory for recv lists.", 374258945Sroberto sizeof(hio_recv_list[0]) * ncomps); 375258945Sroberto } 376258945Sroberto hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 377258945Sroberto if (hio_recv_list_lock == NULL) { 378258945Sroberto primary_exitx(EX_TEMPFAIL, 379258945Sroberto "Unable to allocate %zu bytes of memory for recv list locks.", 380258945Sroberto sizeof(hio_recv_list_lock[0]) * ncomps); 381258945Sroberto } 382258945Sroberto hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 383258945Sroberto if (hio_recv_list_cond == NULL) { 384258945Sroberto primary_exitx(EX_TEMPFAIL, 385258945Sroberto "Unable to allocate %zu bytes of memory for recv list condition variables.", 386258945Sroberto sizeof(hio_recv_list_cond[0]) * ncomps); 387258945Sroberto } 388258945Sroberto hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 389280849Scy if (hio_remote_lock == NULL) { 390258945Sroberto primary_exitx(EX_TEMPFAIL, 391258945Sroberto "Unable to allocate %zu bytes of memory for remote connections locks.", 392258945Sroberto sizeof(hio_remote_lock[0]) * ncomps); 393258945Sroberto } 394258945Sroberto 395258945Sroberto /* 396280849Scy * Initialize lists, their locks and theirs condition variables. 397258945Sroberto */ 398258945Sroberto TAILQ_INIT(&hio_free_list); 399258945Sroberto mtx_init(&hio_free_list_lock); 400258945Sroberto cv_init(&hio_free_list_cond); 401258945Sroberto for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 402258945Sroberto TAILQ_INIT(&hio_send_list[ii]); 403258945Sroberto mtx_init(&hio_send_list_lock[ii]); 404258945Sroberto cv_init(&hio_send_list_cond[ii]); 405280849Scy TAILQ_INIT(&hio_recv_list[ii]); 406258945Sroberto mtx_init(&hio_recv_list_lock[ii]); 407258945Sroberto cv_init(&hio_recv_list_cond[ii]); 408280849Scy rw_init(&hio_remote_lock[ii]); 409258945Sroberto } 410280849Scy TAILQ_INIT(&hio_done_list); 411280849Scy mtx_init(&hio_done_list_lock); 412258945Sroberto cv_init(&hio_done_list_cond); 413258945Sroberto mtx_init(&metadata_lock); 414280849Scy 415280849Scy /* 416280849Scy * Allocate requests pool and initialize requests. 417258945Sroberto */ 418258945Sroberto for (ii = 0; ii < HAST_HIO_MAX; ii++) { 419258945Sroberto hio = malloc(sizeof(*hio)); 420258945Sroberto if (hio == NULL) { 421258945Sroberto primary_exitx(EX_TEMPFAIL, 422258945Sroberto "Unable to allocate %zu bytes of memory for hio request.", 423258945Sroberto sizeof(*hio)); 424258945Sroberto } 425258945Sroberto refcnt_init(&hio->hio_countdown, 0); 426258945Sroberto hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 427258945Sroberto if (hio->hio_errors == NULL) { 428258945Sroberto primary_exitx(EX_TEMPFAIL, 429258945Sroberto "Unable allocate %zu bytes of memory for hio errors.", 430258945Sroberto sizeof(hio->hio_errors[0]) * ncomps); 431258945Sroberto } 432280849Scy hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 433280849Scy if (hio->hio_next == NULL) { 434280849Scy primary_exitx(EX_TEMPFAIL, 435280849Scy "Unable allocate %zu bytes of memory for hio_next field.", 436280849Scy sizeof(hio->hio_next[0]) * ncomps); 437280849Scy } 438280849Scy hio->hio_ggio.gctl_version = G_GATE_VERSION; 439280849Scy hio->hio_ggio.gctl_data = malloc(MAXPHYS); 440280849Scy if (hio->hio_ggio.gctl_data == NULL) { 441280849Scy primary_exitx(EX_TEMPFAIL, 442258945Sroberto "Unable to allocate %zu bytes of memory for gctl_data.", 443258945Sroberto MAXPHYS); 444258945Sroberto } 445258945Sroberto hio->hio_ggio.gctl_length = MAXPHYS; 446258945Sroberto hio->hio_ggio.gctl_error = 0; 447258945Sroberto TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 448280849Scy } 449280849Scy} 450258945Sroberto 451280849Scystatic bool 452258945Srobertoinit_resuid(struct hast_resource *res) 453280849Scy{ 454280849Scy 455258945Sroberto mtx_lock(&metadata_lock); 456258945Sroberto if (res->hr_resuid != 0) { 457280849Scy mtx_unlock(&metadata_lock); 458280849Scy return (false); 459280849Scy } else { 460280849Scy /* Initialize unique resource identifier. */ 461280849Scy arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 462280849Scy mtx_unlock(&metadata_lock); 463280849Scy if (metadata_write(res) == -1) 464280849Scy exit(EX_NOINPUT); 465280849Scy return (true); 466280849Scy } 467280849Scy} 468280849Scy 469280849Scystatic void 470280849Scyinit_local(struct hast_resource *res) 471280849Scy{ 472280849Scy unsigned char *buf; 473280849Scy size_t mapsize; 474280849Scy 475280849Scy if (metadata_read(res, true) == -1) 476280849Scy exit(EX_NOINPUT); 477280849Scy mtx_init(&res->hr_amp_lock); 478280849Scy if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 479280849Scy res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 480280849Scy primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 481280849Scy } 482280849Scy mtx_init(&range_lock); 483280849Scy cv_init(&range_regular_cond); 484280849Scy if (rangelock_init(&range_regular) == -1) 485280849Scy primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 486280849Scy cv_init(&range_sync_cond); 487280849Scy if (rangelock_init(&range_sync) == -1) 488280849Scy primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 489280849Scy mapsize = activemap_ondisk_size(res->hr_amp); 490280849Scy buf = calloc(1, mapsize); 491280849Scy if (buf == NULL) { 492280849Scy primary_exitx(EX_TEMPFAIL, 493280849Scy "Unable to allocate buffer for activemap."); 494280849Scy } 495280849Scy if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 496280849Scy (ssize_t)mapsize) { 497280849Scy primary_exit(EX_NOINPUT, "Unable to read activemap"); 498280849Scy } 499280849Scy activemap_copyin(res->hr_amp, buf, mapsize); 500280849Scy free(buf); 501280849Scy if (res->hr_resuid != 0) 502280849Scy return; 503280849Scy /* 504280849Scy * We're using provider for the first time. Initialize local and remote 505280849Scy * counters. We don't initialize resuid here, as we want to do it just 506280849Scy * in time. The reason for this is that we want to inform secondary 507280849Scy * that there were no writes yet, so there is no need to synchronize 508280849Scy * anything. 509280849Scy */ 510280849Scy res->hr_primary_localcnt = 0; 511280849Scy res->hr_primary_remotecnt = 0; 512280849Scy if (metadata_write(res) == -1) 513280849Scy exit(EX_NOINPUT); 514280849Scy} 515280849Scy 516280849Scystatic int 517280849Scyprimary_connect(struct hast_resource *res, struct proto_conn **connp) 518280849Scy{ 519280849Scy struct proto_conn *conn; 520280849Scy int16_t val; 521280849Scy 522280849Scy val = 1; 523280849Scy if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 524280849Scy primary_exit(EX_TEMPFAIL, 525280849Scy "Unable to send connection request to parent"); 526280849Scy } 527280849Scy if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 528280849Scy primary_exit(EX_TEMPFAIL, 529280849Scy "Unable to receive reply to connection request from parent"); 530280849Scy } 531280849Scy if (val != 0) { 532280849Scy errno = val; 533280849Scy pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534280849Scy res->hr_remoteaddr); 535280849Scy return (-1); 536280849Scy } 537280849Scy if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 538280849Scy primary_exit(EX_TEMPFAIL, 539280849Scy "Unable to receive connection from parent"); 540280849Scy } 541280849Scy if (proto_connect_wait(conn, res->hr_timeout) == -1) { 542280849Scy pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 543280849Scy res->hr_remoteaddr); 544280849Scy proto_close(conn); 545280849Scy return (-1); 546280849Scy } 547280849Scy /* Error in setting timeout is not critical, but why should it fail? */ 548280849Scy if (proto_timeout(conn, res->hr_timeout) == -1) 549280849Scy pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 550280849Scy 551280849Scy *connp = conn; 552280849Scy 553280849Scy return (0); 554280849Scy} 555280849Scy 556280849Scy/* 557280849Scy * Function instructs GEOM_GATE to handle reads directly from within the kernel. 558280849Scy */ 559280849Scystatic void 560280849Scyenable_direct_reads(struct hast_resource *res) 561280849Scy{ 562280849Scy struct g_gate_ctl_modify ggiomodify; 563280849Scy 564280849Scy bzero(&ggiomodify, sizeof(ggiomodify)); 565280849Scy ggiomodify.gctl_version = G_GATE_VERSION; 566280849Scy ggiomodify.gctl_unit = res->hr_ggateunit; 567280849Scy ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 568280849Scy strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 569280849Scy sizeof(ggiomodify.gctl_readprov)); 570280849Scy ggiomodify.gctl_readoffset = res->hr_localoff; 571280849Scy if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 572280849Scy pjdlog_debug(1, "Direct reads enabled."); 573280849Scy else 574280849Scy pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 575280849Scy} 576280849Scy 577280849Scystatic int 578280849Scyinit_remote(struct hast_resource *res, struct proto_conn **inp, 579280849Scy struct proto_conn **outp) 580280849Scy{ 581280849Scy struct proto_conn *in, *out; 582280849Scy struct nv *nvout, *nvin; 583280849Scy const unsigned char *token; 584280849Scy unsigned char *map; 585280849Scy const char *errmsg; 586280849Scy int32_t extentsize; 587280849Scy int64_t datasize; 588280849Scy uint32_t mapsize; 589280849Scy uint8_t version; 590280849Scy size_t size; 591280849Scy int error; 592280849Scy 593280849Scy PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 594280849Scy PJDLOG_ASSERT(real_remote(res)); 595280849Scy 596280849Scy in = out = NULL; 597280849Scy errmsg = NULL; 598280849Scy 599280849Scy if (primary_connect(res, &out) == -1) 600280849Scy return (ECONNREFUSED); 601280849Scy 602258945Sroberto error = ECONNABORTED; 603258945Sroberto 604258945Sroberto /* 605258945Sroberto * First handshake step. 606258945Sroberto * Setup outgoing connection with remote node. 607258945Sroberto */ 608258945Sroberto nvout = nv_alloc(); 609258945Sroberto nv_add_string(nvout, res->hr_name, "resource"); 610258945Sroberto nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 611258945Sroberto if (nv_error(nvout) != 0) { 612258945Sroberto pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 613258945Sroberto "Unable to allocate header for connection with %s", 614258945Sroberto res->hr_remoteaddr); 615258945Sroberto nv_free(nvout); 616258945Sroberto goto close; 617258945Sroberto } 618258945Sroberto if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 619258945Sroberto pjdlog_errno(LOG_WARNING, 620258945Sroberto "Unable to send handshake header to %s", 621258945Sroberto res->hr_remoteaddr); 622258945Sroberto nv_free(nvout); 623258945Sroberto goto close; 624258945Sroberto } 625258945Sroberto nv_free(nvout); 626258945Sroberto if (hast_proto_recv_hdr(out, &nvin) == -1) { 627258945Sroberto pjdlog_errno(LOG_WARNING, 628258945Sroberto "Unable to receive handshake header from %s", 629258945Sroberto res->hr_remoteaddr); 630258945Sroberto goto close; 631258945Sroberto } 632258945Sroberto errmsg = nv_get_string(nvin, "errmsg"); 633258945Sroberto if (errmsg != NULL) { 634258945Sroberto pjdlog_warning("%s", errmsg); 635258945Sroberto if (nv_exists(nvin, "wait")) 636258945Sroberto error = EBUSY; 637258945Sroberto nv_free(nvin); 638258945Sroberto goto close; 639258945Sroberto } 640258945Sroberto version = nv_get_uint8(nvin, "version"); 641258945Sroberto if (version == 0) { 642258945Sroberto /* 643258945Sroberto * If no version is sent, it means this is protocol version 1. 644258945Sroberto */ 645258945Sroberto version = 1; 646258945Sroberto } 647258945Sroberto if (version > HAST_PROTO_VERSION) { 648258945Sroberto pjdlog_warning("Invalid version received (%hhu).", version); 649258945Sroberto nv_free(nvin); 650258945Sroberto goto close; 651258945Sroberto } 652258945Sroberto res->hr_version = version; 653258945Sroberto pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 654258945Sroberto token = nv_get_uint8_array(nvin, &size, "token"); 655258945Sroberto if (token == NULL) { 656258945Sroberto pjdlog_warning("Handshake header from %s has no 'token' field.", 657258945Sroberto res->hr_remoteaddr); 658258945Sroberto nv_free(nvin); 659258945Sroberto goto close; 660258945Sroberto } 661258945Sroberto if (size != sizeof(res->hr_token)) { 662258945Sroberto pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 663258945Sroberto res->hr_remoteaddr, size, sizeof(res->hr_token)); 664258945Sroberto nv_free(nvin); 665258945Sroberto goto close; 666258945Sroberto } 667258945Sroberto bcopy(token, res->hr_token, sizeof(res->hr_token)); 668258945Sroberto nv_free(nvin); 669258945Sroberto 670258945Sroberto /* 671258945Sroberto * Second handshake step. 672258945Sroberto * Setup incoming connection with remote node. 673258945Sroberto */ 674258945Sroberto if (primary_connect(res, &in) == -1) 675258945Sroberto goto close; 676258945Sroberto 677258945Sroberto nvout = nv_alloc(); 678258945Sroberto nv_add_string(nvout, res->hr_name, "resource"); 679258945Sroberto nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 680258945Sroberto "token"); 681258945Sroberto if (res->hr_resuid == 0) { 682258945Sroberto /* 683258945Sroberto * The resuid field was not yet initialized. 684258945Sroberto * Because we do synchronization inside init_resuid(), it is 685258945Sroberto * possible that someone already initialized it, the function 686258945Sroberto * will return false then, but if we successfully initialized 687258945Sroberto * it, we will get true. True means that there were no writes 688258945Sroberto * to this resource yet and we want to inform secondary that 689258945Sroberto * synchronization is not needed by sending "virgin" argument. 690258945Sroberto */ 691258945Sroberto if (init_resuid(res)) 692258945Sroberto nv_add_int8(nvout, 1, "virgin"); 693258945Sroberto } 694258945Sroberto nv_add_uint64(nvout, res->hr_resuid, "resuid"); 695258945Sroberto nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 696258945Sroberto nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 697258945Sroberto if (nv_error(nvout) != 0) { 698258945Sroberto pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 699258945Sroberto "Unable to allocate header for connection with %s", 700280849Scy res->hr_remoteaddr); 701280849Scy nv_free(nvout); 702258945Sroberto goto close; 703280849Scy } 704258945Sroberto if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 705258945Sroberto pjdlog_errno(LOG_WARNING, 706258945Sroberto "Unable to send handshake header to %s", 707280849Scy res->hr_remoteaddr); 708258945Sroberto nv_free(nvout); 709258945Sroberto goto close; 710258945Sroberto } 711258945Sroberto nv_free(nvout); 712258945Sroberto if (hast_proto_recv_hdr(out, &nvin) == -1) { 713258945Sroberto pjdlog_errno(LOG_WARNING, 714258945Sroberto "Unable to receive handshake header from %s", 715258945Sroberto res->hr_remoteaddr); 716258945Sroberto goto close; 717258945Sroberto } 718258945Sroberto errmsg = nv_get_string(nvin, "errmsg"); 719258945Sroberto if (errmsg != NULL) { 720258945Sroberto pjdlog_warning("%s", errmsg); 721258945Sroberto nv_free(nvin); 722258945Sroberto goto close; 723258945Sroberto } 724280849Scy datasize = nv_get_int64(nvin, "datasize"); 725258945Sroberto if (datasize != res->hr_datasize) { 726258945Sroberto pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 727280849Scy (intmax_t)res->hr_datasize, (intmax_t)datasize); 728258945Sroberto nv_free(nvin); 729258945Sroberto goto close; 730258945Sroberto } 731258945Sroberto extentsize = nv_get_int32(nvin, "extentsize"); 732280849Scy if (extentsize != res->hr_extentsize) { 733258945Sroberto pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 734258945Sroberto (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 735258945Sroberto nv_free(nvin); 736258945Sroberto goto close; 737258945Sroberto } 738258945Sroberto res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 739258945Sroberto res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 740258945Sroberto res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 741258945Sroberto if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 742258945Sroberto enable_direct_reads(res); 743258945Sroberto if (nv_exists(nvin, "virgin")) { 744258945Sroberto /* 745258945Sroberto * Secondary was reinitialized, bump localcnt if it is 0 as 746258945Sroberto * only we have the data. 747258945Sroberto */ 748258945Sroberto PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 749258945Sroberto PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 750258945Sroberto 751258945Sroberto if (res->hr_primary_localcnt == 0) { 752258945Sroberto PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 753258945Sroberto 754258945Sroberto mtx_lock(&metadata_lock); 755258945Sroberto res->hr_primary_localcnt++; 756258945Sroberto pjdlog_debug(1, "Increasing localcnt to %ju.", 757258945Sroberto (uintmax_t)res->hr_primary_localcnt); 758258945Sroberto (void)metadata_write(res); 759258945Sroberto mtx_unlock(&metadata_lock); 760258945Sroberto } 761258945Sroberto } 762258945Sroberto map = NULL; 763258945Sroberto mapsize = nv_get_uint32(nvin, "mapsize"); 764258945Sroberto if (mapsize > 0) { 765258945Sroberto map = malloc(mapsize); 766258945Sroberto if (map == NULL) { 767280849Scy pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 768258945Sroberto (uintmax_t)mapsize); 769258945Sroberto nv_free(nvin); 770258945Sroberto goto close; 771258945Sroberto } 772258945Sroberto /* 773258945Sroberto * Remote node have some dirty extents on its own, lets 774258945Sroberto * download its activemap. 775258945Sroberto */ 776258945Sroberto if (hast_proto_recv_data(res, out, nvin, map, 777258945Sroberto mapsize) == -1) { 778280849Scy pjdlog_errno(LOG_ERR, 779258945Sroberto "Unable to receive remote activemap"); 780258945Sroberto nv_free(nvin); 781258945Sroberto free(map); 782258945Sroberto goto close; 783258945Sroberto } 784258945Sroberto /* 785258945Sroberto * Merge local and remote bitmaps. 786258945Sroberto */ 787258945Sroberto activemap_merge(res->hr_amp, map, mapsize); 788258945Sroberto free(map); 789258945Sroberto /* 790258945Sroberto * Now that we merged bitmaps from both nodes, flush it to the 791258945Sroberto * disk before we start to synchronize. 792258945Sroberto */ 793258945Sroberto mtx_lock(&res->hr_amp_lock); 794258945Sroberto (void)hast_activemap_flush(res); 795258945Sroberto } 796258945Sroberto nv_free(nvin); 797258945Sroberto#ifdef notyet 798258945Sroberto /* Setup directions. */ 799258945Sroberto if (proto_send(out, NULL, 0) == -1) 800258945Sroberto pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 801258945Sroberto if (proto_recv(in, NULL, 0) == -1) 802258945Sroberto pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 803258945Sroberto#endif 804258945Sroberto pjdlog_info("Connected to %s.", res->hr_remoteaddr); 805280849Scy if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 806258945Sroberto res->hr_version < 2) { 807258945Sroberto pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 808258945Sroberto res->hr_replication = HAST_REPLICATION_FULLSYNC; 809258945Sroberto } else if (res->hr_replication != res->hr_original_replication) { 810258945Sroberto /* 811258945Sroberto * This is in case hastd disconnected and was upgraded. 812258945Sroberto */ 813258945Sroberto res->hr_replication = res->hr_original_replication; 814258945Sroberto } 815258945Sroberto if (inp != NULL && outp != NULL) { 816258945Sroberto *inp = in; 817258945Sroberto *outp = out; 818258945Sroberto } else { 819258945Sroberto res->hr_remotein = in; 820258945Sroberto res->hr_remoteout = out; 821258945Sroberto } 822258945Sroberto event_send(res, EVENT_CONNECT); 823258945Sroberto return (0); 824258945Srobertoclose: 825258945Sroberto if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 826258945Sroberto event_send(res, EVENT_SPLITBRAIN); 827258945Sroberto proto_close(out); 828258945Sroberto if (in != NULL) 829280849Scy proto_close(in); 830258945Sroberto return (error); 831258945Sroberto} 832258945Sroberto 833258945Srobertostatic void 834258945Srobertosync_start(void) 835258945Sroberto{ 836258945Sroberto 837258945Sroberto mtx_lock(&sync_lock); 838258945Sroberto sync_inprogress = true; 839258945Sroberto mtx_unlock(&sync_lock); 840258945Sroberto cv_signal(&sync_cond); 841258945Sroberto} 842258945Sroberto 843258945Srobertostatic void 844258945Srobertosync_stop(void) 845258945Sroberto{ 846258945Sroberto 847258945Sroberto mtx_lock(&sync_lock); 848258945Sroberto if (sync_inprogress) 849258945Sroberto sync_inprogress = false; 850258945Sroberto mtx_unlock(&sync_lock); 851258945Sroberto} 852258945Sroberto 853258945Srobertostatic void 854258945Srobertoinit_ggate(struct hast_resource *res) 855258945Sroberto{ 856258945Sroberto struct g_gate_ctl_create ggiocreate; 857258945Sroberto struct g_gate_ctl_cancel ggiocancel; 858258945Sroberto 859258945Sroberto /* 860258945Sroberto * We communicate with ggate via /dev/ggctl. Open it. 861258945Sroberto */ 862258945Sroberto res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 863258945Sroberto if (res->hr_ggatefd == -1) 864258945Sroberto primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 865258945Sroberto /* 866258945Sroberto * Create provider before trying to connect, as connection failure 867258945Sroberto * is not critical, but may take some time. 868258945Sroberto */ 869258945Sroberto bzero(&ggiocreate, sizeof(ggiocreate)); 870258945Sroberto ggiocreate.gctl_version = G_GATE_VERSION; 871258945Sroberto ggiocreate.gctl_mediasize = res->hr_datasize; 872258945Sroberto ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 873280849Scy ggiocreate.gctl_flags = 0; 874258945Sroberto ggiocreate.gctl_maxcount = 0; 875258945Sroberto ggiocreate.gctl_timeout = 0; 876258945Sroberto ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 877258945Sroberto snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 878258945Sroberto res->hr_provname); 879258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 880258945Sroberto pjdlog_info("Device hast/%s created.", res->hr_provname); 881258945Sroberto res->hr_ggateunit = ggiocreate.gctl_unit; 882258945Sroberto return; 883258945Sroberto } 884258945Sroberto if (errno != EEXIST) { 885258945Sroberto primary_exit(EX_OSERR, "Unable to create hast/%s device", 886258945Sroberto res->hr_provname); 887258945Sroberto } 888258945Sroberto pjdlog_debug(1, 889258945Sroberto "Device hast/%s already exists, we will try to take it over.", 890258945Sroberto res->hr_provname); 891258945Sroberto /* 892258945Sroberto * If we received EEXIST, we assume that the process who created the 893258945Sroberto * provider died and didn't clean up. In that case we will start from 894258945Sroberto * where he left of. 895258945Sroberto */ 896258945Sroberto bzero(&ggiocancel, sizeof(ggiocancel)); 897280849Scy ggiocancel.gctl_version = G_GATE_VERSION; 898258945Sroberto ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 899258945Sroberto snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 900258945Sroberto res->hr_provname); 901258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 902258945Sroberto pjdlog_info("Device hast/%s recovered.", res->hr_provname); 903258945Sroberto res->hr_ggateunit = ggiocancel.gctl_unit; 904258945Sroberto return; 905258945Sroberto } 906258945Sroberto primary_exit(EX_OSERR, "Unable to take over hast/%s device", 907258945Sroberto res->hr_provname); 908258945Sroberto} 909258945Sroberto 910258945Srobertovoid 911258945Srobertohastd_primary(struct hast_resource *res) 912258945Sroberto{ 913258945Sroberto pthread_t td; 914258945Sroberto pid_t pid; 915258945Sroberto int error, mode, debuglevel; 916258945Sroberto 917258945Sroberto /* 918258945Sroberto * Create communication channel for sending control commands from 919258945Sroberto * parent to child. 920258945Sroberto */ 921258945Sroberto if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 922258945Sroberto /* TODO: There's no need for this to be fatal error. */ 923258945Sroberto KEEP_ERRNO((void)pidfile_remove(pfh)); 924258945Sroberto pjdlog_exit(EX_OSERR, 925258945Sroberto "Unable to create control sockets between parent and child"); 926258945Sroberto } 927258945Sroberto /* 928258945Sroberto * Create communication channel for sending events from child to parent. 929258945Sroberto */ 930258945Sroberto if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 931258945Sroberto /* TODO: There's no need for this to be fatal error. */ 932258945Sroberto KEEP_ERRNO((void)pidfile_remove(pfh)); 933258945Sroberto pjdlog_exit(EX_OSERR, 934258945Sroberto "Unable to create event sockets between child and parent"); 935258945Sroberto } 936258945Sroberto /* 937258945Sroberto * Create communication channel for sending connection requests from 938258945Sroberto * child to parent. 939258945Sroberto */ 940258945Sroberto if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { 941258945Sroberto /* TODO: There's no need for this to be fatal error. */ 942258945Sroberto KEEP_ERRNO((void)pidfile_remove(pfh)); 943258945Sroberto pjdlog_exit(EX_OSERR, 944258945Sroberto "Unable to create connection sockets between child and parent"); 945258945Sroberto } 946258945Sroberto 947258945Sroberto pid = fork(); 948258945Sroberto if (pid == -1) { 949258945Sroberto /* TODO: There's no need for this to be fatal error. */ 950258945Sroberto KEEP_ERRNO((void)pidfile_remove(pfh)); 951258945Sroberto pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 952258945Sroberto } 953258945Sroberto 954258945Sroberto if (pid > 0) { 955258945Sroberto /* This is parent. */ 956258945Sroberto /* Declare that we are receiver. */ 957258945Sroberto proto_recv(res->hr_event, NULL, 0); 958258945Sroberto proto_recv(res->hr_conn, NULL, 0); 959258945Sroberto /* Declare that we are sender. */ 960280849Scy proto_send(res->hr_ctrl, NULL, 0); 961258945Sroberto res->hr_workerpid = pid; 962258945Sroberto return; 963258945Sroberto } 964258945Sroberto 965258945Sroberto gres = res; 966258945Sroberto mode = pjdlog_mode_get(); 967258945Sroberto debuglevel = pjdlog_debug_get(); 968258945Sroberto 969258945Sroberto /* Declare that we are sender. */ 970258945Sroberto proto_send(res->hr_event, NULL, 0); 971258945Sroberto proto_send(res->hr_conn, NULL, 0); 972258945Sroberto /* Declare that we are receiver. */ 973258945Sroberto proto_recv(res->hr_ctrl, NULL, 0); 974258945Sroberto descriptors_cleanup(res); 975258945Sroberto 976258945Sroberto descriptors_assert(res, mode); 977258945Sroberto 978258945Sroberto pjdlog_init(mode); 979258945Sroberto pjdlog_debug_set(debuglevel); 980258945Sroberto pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 981258945Sroberto setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 982258945Sroberto 983258945Sroberto init_local(res); 984258945Sroberto init_ggate(res); 985258945Sroberto init_environment(res); 986258945Sroberto 987258945Sroberto if (drop_privs(res) != 0) { 988258945Sroberto cleanup(res); 989258945Sroberto exit(EX_CONFIG); 990258945Sroberto } 991258945Sroberto pjdlog_info("Privileges successfully dropped."); 992258945Sroberto 993258945Sroberto /* 994258945Sroberto * Create the guard thread first, so we can handle signals from the 995258945Sroberto * very beginning. 996258945Sroberto */ 997258945Sroberto error = pthread_create(&td, NULL, guard_thread, res); 998258945Sroberto PJDLOG_ASSERT(error == 0); 999258945Sroberto /* 1000258945Sroberto * Create the control thread before sending any event to the parent, 1001258945Sroberto * as we can deadlock when parent sends control request to worker, 1002258945Sroberto * but worker has no control thread started yet, so parent waits. 1003258945Sroberto * In the meantime worker sends an event to the parent, but parent 1004258945Sroberto * is unable to handle the event, because it waits for control 1005258945Sroberto * request response. 1006258945Sroberto */ 1007258945Sroberto error = pthread_create(&td, NULL, ctrl_thread, res); 1008258945Sroberto PJDLOG_ASSERT(error == 0); 1009258945Sroberto if (real_remote(res)) { 1010258945Sroberto error = init_remote(res, NULL, NULL); 1011258945Sroberto if (error == 0) { 1012258945Sroberto sync_start(); 1013258945Sroberto } else if (error == EBUSY) { 1014258945Sroberto time_t start = time(NULL); 1015258945Sroberto 1016258945Sroberto pjdlog_warning("Waiting for remote node to become %s for %ds.", 1017258945Sroberto role2str(HAST_ROLE_SECONDARY), 1018258945Sroberto res->hr_timeout); 1019258945Sroberto for (;;) { 1020258945Sroberto sleep(1); 1021280849Scy error = init_remote(res, NULL, NULL); 1022258945Sroberto if (error != EBUSY) 1023258945Sroberto break; 1024258945Sroberto if (time(NULL) > start + res->hr_timeout) 1025258945Sroberto break; 1026258945Sroberto } 1027258945Sroberto if (error == EBUSY) { 1028280849Scy pjdlog_warning("Remote node is still %s, starting anyway.", 1029258945Sroberto role2str(HAST_ROLE_PRIMARY)); 1030258945Sroberto } 1031258945Sroberto } 1032258945Sroberto } 1033258945Sroberto error = pthread_create(&td, NULL, ggate_recv_thread, res); 1034258945Sroberto PJDLOG_ASSERT(error == 0); 1035258945Sroberto error = pthread_create(&td, NULL, local_send_thread, res); 1036258945Sroberto PJDLOG_ASSERT(error == 0); 1037258945Sroberto error = pthread_create(&td, NULL, remote_send_thread, res); 1038258945Sroberto PJDLOG_ASSERT(error == 0); 1039258945Sroberto error = pthread_create(&td, NULL, remote_recv_thread, res); 1040258945Sroberto PJDLOG_ASSERT(error == 0); 1041258945Sroberto error = pthread_create(&td, NULL, ggate_send_thread, res); 1042258945Sroberto PJDLOG_ASSERT(error == 0); 1043258945Sroberto fullystarted = true; 1044258945Sroberto (void)sync_thread(res); 1045258945Sroberto} 1046258945Sroberto 1047258945Srobertostatic void 1048258945Srobertoreqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1049258945Sroberto const char *fmt, ...) 1050258945Sroberto{ 1051258945Sroberto char msg[1024]; 1052258945Sroberto va_list ap; 1053258945Sroberto 1054258945Sroberto va_start(ap, fmt); 1055258945Sroberto (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1056258945Sroberto va_end(ap); 1057258945Sroberto switch (ggio->gctl_cmd) { 1058258945Sroberto case BIO_READ: 1059258945Sroberto (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1060258945Sroberto (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1061258945Sroberto break; 1062258945Sroberto case BIO_DELETE: 1063258945Sroberto (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1064258945Sroberto (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1065258945Sroberto break; 1066258945Sroberto case BIO_FLUSH: 1067280849Scy (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1068258945Sroberto break; 1069258945Sroberto case BIO_WRITE: 1070258945Sroberto (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1071258945Sroberto (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1072258945Sroberto break; 1073258945Sroberto default: 1074258945Sroberto (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1075258945Sroberto (unsigned int)ggio->gctl_cmd); 1076258945Sroberto break; 1077258945Sroberto } 1078258945Sroberto pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1079258945Sroberto} 1080258945Sroberto 1081258945Srobertostatic void 1082258945Srobertoremote_close(struct hast_resource *res, int ncomp) 1083258945Sroberto{ 1084258945Sroberto 1085258945Sroberto rw_wlock(&hio_remote_lock[ncomp]); 1086258945Sroberto /* 1087258945Sroberto * Check for a race between dropping rlock and acquiring wlock - 1088258945Sroberto * another thread can close connection in-between. 1089258945Sroberto */ 1090258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1091258945Sroberto PJDLOG_ASSERT(res->hr_remotein == NULL); 1092258945Sroberto PJDLOG_ASSERT(res->hr_remoteout == NULL); 1093258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1094280849Scy return; 1095258945Sroberto } 1096258945Sroberto 1097258945Sroberto PJDLOG_ASSERT(res->hr_remotein != NULL); 1098258945Sroberto PJDLOG_ASSERT(res->hr_remoteout != NULL); 1099280849Scy 1100258945Sroberto pjdlog_debug(2, "Closing incoming connection to %s.", 1101258945Sroberto res->hr_remoteaddr); 1102258945Sroberto proto_close(res->hr_remotein); 1103258945Sroberto res->hr_remotein = NULL; 1104258945Sroberto pjdlog_debug(2, "Closing outgoing connection to %s.", 1105258945Sroberto res->hr_remoteaddr); 1106280849Scy proto_close(res->hr_remoteout); 1107258945Sroberto res->hr_remoteout = NULL; 1108258945Sroberto 1109258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1110258945Sroberto 1111258945Sroberto pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1112258945Sroberto 1113258945Sroberto /* 1114258945Sroberto * Stop synchronization if in-progress. 1115258945Sroberto */ 1116258945Sroberto sync_stop(); 1117258945Sroberto 1118258945Sroberto event_send(res, EVENT_DISCONNECT); 1119258945Sroberto} 1120258945Sroberto 1121258945Sroberto/* 1122258945Sroberto * Acknowledge write completion to the kernel, but don't update activemap yet. 1123258945Sroberto */ 1124258945Srobertostatic void 1125258945Srobertowrite_complete(struct hast_resource *res, struct hio *hio) 1126258945Sroberto{ 1127258945Sroberto struct g_gate_ctl_io *ggio; 1128258945Sroberto unsigned int ncomp; 1129258945Sroberto 1130258945Sroberto PJDLOG_ASSERT(!hio->hio_done); 1131258945Sroberto 1132258945Sroberto ggio = &hio->hio_ggio; 1133258945Sroberto PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1134258945Sroberto 1135258945Sroberto /* 1136258945Sroberto * Bump local count if this is first write after 1137258945Sroberto * connection failure with remote node. 1138258945Sroberto */ 1139258945Sroberto ncomp = 1; 1140258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 1141258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1142258945Sroberto mtx_lock(&metadata_lock); 1143258945Sroberto if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1144258945Sroberto res->hr_primary_localcnt++; 1145258945Sroberto pjdlog_debug(1, "Increasing localcnt to %ju.", 1146258945Sroberto (uintmax_t)res->hr_primary_localcnt); 1147258945Sroberto (void)metadata_write(res); 1148258945Sroberto } 1149258945Sroberto mtx_unlock(&metadata_lock); 1150258945Sroberto } 1151258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1152258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1153258945Sroberto primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1154258945Sroberto hio->hio_done = true; 1155258945Sroberto} 1156258945Sroberto 1157258945Sroberto/* 1158258945Sroberto * Thread receives ggate I/O requests from the kernel and passes them to 1159258945Sroberto * appropriate threads: 1160258945Sroberto * WRITE - always goes to both local_send and remote_send threads 1161258945Sroberto * READ (when the block is up-to-date on local component) - 1162258945Sroberto * only local_send thread 1163258945Sroberto * READ (when the block isn't up-to-date on local component) - 1164258945Sroberto * only remote_send thread 1165258945Sroberto * DELETE - always goes to both local_send and remote_send threads 1166258945Sroberto * FLUSH - always goes to both local_send and remote_send threads 1167258945Sroberto */ 1168258945Srobertostatic void * 1169258945Srobertoggate_recv_thread(void *arg) 1170258945Sroberto{ 1171258945Sroberto struct hast_resource *res = arg; 1172258945Sroberto struct g_gate_ctl_io *ggio; 1173258945Sroberto struct hio *hio; 1174258945Sroberto unsigned int ii, ncomp, ncomps; 1175258945Sroberto int error; 1176258945Sroberto 1177258945Sroberto for (;;) { 1178258945Sroberto pjdlog_debug(2, "ggate_recv: Taking free request."); 1179258945Sroberto QUEUE_TAKE2(hio, free); 1180258945Sroberto pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1181258945Sroberto ggio = &hio->hio_ggio; 1182258945Sroberto ggio->gctl_unit = res->hr_ggateunit; 1183258945Sroberto ggio->gctl_length = MAXPHYS; 1184258945Sroberto ggio->gctl_error = 0; 1185258945Sroberto hio->hio_done = false; 1186258945Sroberto hio->hio_replication = res->hr_replication; 1187258945Sroberto pjdlog_debug(2, 1188258945Sroberto "ggate_recv: (%p) Waiting for request from the kernel.", 1189258945Sroberto hio); 1190258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1191258945Sroberto if (sigexit_received) 1192258945Sroberto pthread_exit(NULL); 1193258945Sroberto primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1194258945Sroberto } 1195258945Sroberto error = ggio->gctl_error; 1196258945Sroberto switch (error) { 1197258945Sroberto case 0: 1198258945Sroberto break; 1199280849Scy case ECANCELED: 1200258945Sroberto /* Exit gracefully. */ 1201258945Sroberto if (!sigexit_received) { 1202258945Sroberto pjdlog_debug(2, 1203258945Sroberto "ggate_recv: (%p) Received cancel from the kernel.", 1204258945Sroberto hio); 1205258945Sroberto pjdlog_info("Received cancel from the kernel, exiting."); 1206258945Sroberto } 1207258945Sroberto pthread_exit(NULL); 1208258945Sroberto case ENOMEM: 1209258945Sroberto /* 1210258945Sroberto * Buffer too small? Impossible, we allocate MAXPHYS 1211258945Sroberto * bytes - request can't be bigger than that. 1212258945Sroberto */ 1213258945Sroberto /* FALLTHROUGH */ 1214258945Sroberto case ENXIO: 1215258945Sroberto default: 1216258945Sroberto primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1217258945Sroberto strerror(error)); 1218258945Sroberto } 1219258945Sroberto 1220258945Sroberto ncomp = 0; 1221258945Sroberto ncomps = HAST_NCOMPONENTS; 1222258945Sroberto 1223258945Sroberto for (ii = 0; ii < ncomps; ii++) 1224258945Sroberto hio->hio_errors[ii] = EINVAL; 1225258945Sroberto reqlog(LOG_DEBUG, 2, ggio, 1226258945Sroberto "ggate_recv: (%p) Request received from the kernel: ", 1227258945Sroberto hio); 1228258945Sroberto 1229258945Sroberto /* 1230258945Sroberto * Inform all components about new write request. 1231258945Sroberto * For read request prefer local component unless the given 1232258945Sroberto * range is out-of-date, then use remote component. 1233258945Sroberto */ 1234258945Sroberto switch (ggio->gctl_cmd) { 1235258945Sroberto case BIO_READ: 1236258945Sroberto res->hr_stat_read++; 1237258945Sroberto ncomps = 1; 1238258945Sroberto mtx_lock(&metadata_lock); 1239258945Sroberto if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1240258945Sroberto res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1241258945Sroberto /* 1242258945Sroberto * This range is up-to-date on local component, 1243258945Sroberto * so handle request locally. 1244258945Sroberto */ 1245258945Sroberto /* Local component is 0 for now. */ 1246258945Sroberto ncomp = 0; 1247258945Sroberto } else /* if (res->hr_syncsrc == 1248258945Sroberto HAST_SYNCSRC_SECONDARY) */ { 1249258945Sroberto PJDLOG_ASSERT(res->hr_syncsrc == 1250258945Sroberto HAST_SYNCSRC_SECONDARY); 1251258945Sroberto /* 1252258945Sroberto * This range is out-of-date on local component, 1253258945Sroberto * so send request to the remote node. 1254258945Sroberto */ 1255258945Sroberto /* Remote component is 1 for now. */ 1256258945Sroberto ncomp = 1; 1257258945Sroberto } 1258258945Sroberto mtx_unlock(&metadata_lock); 1259258945Sroberto break; 1260258945Sroberto case BIO_WRITE: 1261258945Sroberto res->hr_stat_write++; 1262258945Sroberto if (res->hr_resuid == 0 && 1263258945Sroberto res->hr_primary_localcnt == 0) { 1264258945Sroberto /* This is first write. */ 1265258945Sroberto res->hr_primary_localcnt = 1; 1266258945Sroberto } 1267258945Sroberto for (;;) { 1268258945Sroberto mtx_lock(&range_lock); 1269258945Sroberto if (rangelock_islocked(range_sync, 1270258945Sroberto ggio->gctl_offset, ggio->gctl_length)) { 1271258945Sroberto pjdlog_debug(2, 1272258945Sroberto "regular: Range offset=%jd length=%zu locked.", 1273258945Sroberto (intmax_t)ggio->gctl_offset, 1274258945Sroberto (size_t)ggio->gctl_length); 1275258945Sroberto range_regular_wait = true; 1276258945Sroberto cv_wait(&range_regular_cond, &range_lock); 1277258945Sroberto range_regular_wait = false; 1278258945Sroberto mtx_unlock(&range_lock); 1279258945Sroberto continue; 1280258945Sroberto } 1281258945Sroberto if (rangelock_add(range_regular, 1282258945Sroberto ggio->gctl_offset, ggio->gctl_length) == -1) { 1283258945Sroberto mtx_unlock(&range_lock); 1284258945Sroberto pjdlog_debug(2, 1285258945Sroberto "regular: Range offset=%jd length=%zu is already locked, waiting.", 1286258945Sroberto (intmax_t)ggio->gctl_offset, 1287258945Sroberto (size_t)ggio->gctl_length); 1288258945Sroberto sleep(1); 1289258945Sroberto continue; 1290258945Sroberto } 1291258945Sroberto mtx_unlock(&range_lock); 1292258945Sroberto break; 1293258945Sroberto } 1294258945Sroberto mtx_lock(&res->hr_amp_lock); 1295258945Sroberto if (activemap_write_start(res->hr_amp, 1296258945Sroberto ggio->gctl_offset, ggio->gctl_length)) { 1297258945Sroberto res->hr_stat_activemap_update++; 1298258945Sroberto (void)hast_activemap_flush(res); 1299258945Sroberto } else { 1300258945Sroberto mtx_unlock(&res->hr_amp_lock); 1301258945Sroberto } 1302280849Scy break; 1303258945Sroberto case BIO_DELETE: 1304258945Sroberto res->hr_stat_delete++; 1305258945Sroberto break; 1306258945Sroberto case BIO_FLUSH: 1307258945Sroberto res->hr_stat_flush++; 1308258945Sroberto break; 1309258945Sroberto } 1310258945Sroberto pjdlog_debug(2, 1311258945Sroberto "ggate_recv: (%p) Moving request to the send queues.", hio); 1312258945Sroberto if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1313258945Sroberto ggio->gctl_cmd == BIO_WRITE) { 1314258945Sroberto /* Each remote request needs two responses in memsync. */ 1315258945Sroberto refcnt_init(&hio->hio_countdown, ncomps + 1); 1316258945Sroberto } else { 1317258945Sroberto refcnt_init(&hio->hio_countdown, ncomps); 1318258945Sroberto } 1319258945Sroberto for (ii = ncomp; ii < ncomps; ii++) 1320258945Sroberto QUEUE_INSERT1(hio, send, ii); 1321258945Sroberto } 1322258945Sroberto /* NOTREACHED */ 1323258945Sroberto return (NULL); 1324258945Sroberto} 1325258945Sroberto 1326258945Sroberto/* 1327258945Sroberto * Thread reads from or writes to local component. 1328258945Sroberto * If local read fails, it redirects it to remote_send thread. 1329258945Sroberto */ 1330258945Srobertostatic void * 1331258945Srobertolocal_send_thread(void *arg) 1332258945Sroberto{ 1333258945Sroberto struct hast_resource *res = arg; 1334258945Sroberto struct g_gate_ctl_io *ggio; 1335258945Sroberto struct hio *hio; 1336258945Sroberto unsigned int ncomp, rncomp; 1337258945Sroberto ssize_t ret; 1338258945Sroberto 1339258945Sroberto /* Local component is 0 for now. */ 1340258945Sroberto ncomp = 0; 1341258945Sroberto /* Remote component is 1 for now. */ 1342258945Sroberto rncomp = 1; 1343258945Sroberto 1344258945Sroberto for (;;) { 1345258945Sroberto pjdlog_debug(2, "local_send: Taking request."); 1346258945Sroberto QUEUE_TAKE1(hio, send, ncomp, 0); 1347258945Sroberto pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1348258945Sroberto ggio = &hio->hio_ggio; 1349258945Sroberto switch (ggio->gctl_cmd) { 1350258945Sroberto case BIO_READ: 1351258945Sroberto ret = pread(res->hr_localfd, ggio->gctl_data, 1352258945Sroberto ggio->gctl_length, 1353258945Sroberto ggio->gctl_offset + res->hr_localoff); 1354258945Sroberto if (ret == ggio->gctl_length) 1355258945Sroberto hio->hio_errors[ncomp] = 0; 1356258945Sroberto else if (!ISSYNCREQ(hio)) { 1357258945Sroberto /* 1358258945Sroberto * If READ failed, try to read from remote node. 1359258945Sroberto */ 1360258945Sroberto if (ret == -1) { 1361258945Sroberto reqlog(LOG_WARNING, 0, ggio, 1362258945Sroberto "Local request failed (%s), trying remote node. ", 1363258945Sroberto strerror(errno)); 1364258945Sroberto } else if (ret != ggio->gctl_length) { 1365258945Sroberto reqlog(LOG_WARNING, 0, ggio, 1366258945Sroberto "Local request failed (%zd != %jd), trying remote node. ", 1367258945Sroberto ret, (intmax_t)ggio->gctl_length); 1368258945Sroberto } 1369258945Sroberto QUEUE_INSERT1(hio, send, rncomp); 1370258945Sroberto continue; 1371258945Sroberto } 1372258945Sroberto break; 1373258945Sroberto case BIO_WRITE: 1374258945Sroberto ret = pwrite(res->hr_localfd, ggio->gctl_data, 1375258945Sroberto ggio->gctl_length, 1376258945Sroberto ggio->gctl_offset + res->hr_localoff); 1377258945Sroberto if (ret == -1) { 1378258945Sroberto hio->hio_errors[ncomp] = errno; 1379280849Scy reqlog(LOG_WARNING, 0, ggio, 1380280849Scy "Local request failed (%s): ", 1381280849Scy strerror(errno)); 1382258945Sroberto } else if (ret != ggio->gctl_length) { 1383258945Sroberto hio->hio_errors[ncomp] = EIO; 1384258945Sroberto reqlog(LOG_WARNING, 0, ggio, 1385258945Sroberto "Local request failed (%zd != %jd): ", 1386258945Sroberto ret, (intmax_t)ggio->gctl_length); 1387258945Sroberto } else { 1388258945Sroberto hio->hio_errors[ncomp] = 0; 1389258945Sroberto if (hio->hio_replication == 1390258945Sroberto HAST_REPLICATION_ASYNC) { 1391258945Sroberto ggio->gctl_error = 0; 1392258945Sroberto write_complete(res, hio); 1393258945Sroberto } 1394258945Sroberto } 1395258945Sroberto break; 1396258945Sroberto case BIO_DELETE: 1397258945Sroberto ret = g_delete(res->hr_localfd, 1398258945Sroberto ggio->gctl_offset + res->hr_localoff, 1399258945Sroberto ggio->gctl_length); 1400280849Scy if (ret == -1) { 1401280849Scy hio->hio_errors[ncomp] = errno; 1402280849Scy reqlog(LOG_WARNING, 0, ggio, 1403280849Scy "Local request failed (%s): ", 1404280849Scy strerror(errno)); 1405280849Scy } else { 1406280849Scy hio->hio_errors[ncomp] = 0; 1407280849Scy } 1408280849Scy break; 1409280849Scy case BIO_FLUSH: 1410280849Scy if (!res->hr_localflush) { 1411280849Scy ret = -1; 1412280849Scy errno = EOPNOTSUPP; 1413280849Scy break; 1414280849Scy } 1415280849Scy ret = g_flush(res->hr_localfd); 1416258945Sroberto if (ret == -1) { 1417258945Sroberto if (errno == EOPNOTSUPP) 1418258945Sroberto res->hr_localflush = false; 1419258945Sroberto hio->hio_errors[ncomp] = errno; 1420258945Sroberto reqlog(LOG_WARNING, 0, ggio, 1421258945Sroberto "Local request failed (%s): ", 1422258945Sroberto strerror(errno)); 1423258945Sroberto } else { 1424258945Sroberto hio->hio_errors[ncomp] = 0; 1425258945Sroberto } 1426258945Sroberto break; 1427258945Sroberto } 1428258945Sroberto 1429258945Sroberto if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1430258945Sroberto ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1431258945Sroberto if (refcnt_release(&hio->hio_countdown) > 0) 1432258945Sroberto continue; 1433258945Sroberto } else { 1434258945Sroberto /* 1435258945Sroberto * Depending on hio_countdown value, requests finished 1436258945Sroberto * in the following order: 1437258945Sroberto * 0: remote memsync, remote final, local write 1438258945Sroberto * 1: remote memsync, local write, (remote final) 1439258945Sroberto * 2: local write, (remote memsync), (remote final) 1440280849Scy */ 1441258945Sroberto switch (refcnt_release(&hio->hio_countdown)) { 1442258945Sroberto case 0: 1443258945Sroberto /* 1444258945Sroberto * Local write finished as last. 1445258945Sroberto */ 1446258945Sroberto break; 1447258945Sroberto case 1: 1448258945Sroberto /* 1449258945Sroberto * Local write finished after remote memsync 1450258945Sroberto * reply arrvied. We can complete the write now. 1451258945Sroberto */ 1452258945Sroberto if (hio->hio_errors[0] == 0) 1453258945Sroberto write_complete(res, hio); 1454258945Sroberto continue; 1455258945Sroberto case 2: 1456258945Sroberto /* 1457258945Sroberto * Local write finished as first. 1458258945Sroberto */ 1459258945Sroberto continue; 1460258945Sroberto default: 1461258945Sroberto PJDLOG_ABORT("Invalid hio_countdown."); 1462258945Sroberto } 1463258945Sroberto } 1464258945Sroberto if (ISSYNCREQ(hio)) { 1465258945Sroberto mtx_lock(&sync_lock); 1466258945Sroberto SYNCREQDONE(hio); 1467258945Sroberto mtx_unlock(&sync_lock); 1468258945Sroberto cv_signal(&sync_cond); 1469258945Sroberto } else { 1470258945Sroberto pjdlog_debug(2, 1471258945Sroberto "local_send: (%p) Moving request to the done queue.", 1472258945Sroberto hio); 1473258945Sroberto QUEUE_INSERT2(hio, done); 1474258945Sroberto } 1475258945Sroberto } 1476258945Sroberto /* NOTREACHED */ 1477258945Sroberto return (NULL); 1478258945Sroberto} 1479258945Sroberto 1480258945Srobertostatic void 1481258945Srobertokeepalive_send(struct hast_resource *res, unsigned int ncomp) 1482258945Sroberto{ 1483258945Sroberto struct nv *nv; 1484258945Sroberto 1485258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 1486258945Sroberto 1487258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1488258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1489258945Sroberto return; 1490258945Sroberto } 1491258945Sroberto 1492258945Sroberto PJDLOG_ASSERT(res->hr_remotein != NULL); 1493258945Sroberto PJDLOG_ASSERT(res->hr_remoteout != NULL); 1494258945Sroberto 1495258945Sroberto nv = nv_alloc(); 1496258945Sroberto nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1497258945Sroberto if (nv_error(nv) != 0) { 1498258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1499258945Sroberto nv_free(nv); 1500258945Sroberto pjdlog_debug(1, 1501258945Sroberto "keepalive_send: Unable to prepare header to send."); 1502258945Sroberto return; 1503258945Sroberto } 1504258945Sroberto if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1505258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1506258945Sroberto pjdlog_common(LOG_DEBUG, 1, errno, 1507258945Sroberto "keepalive_send: Unable to send request"); 1508258945Sroberto nv_free(nv); 1509258945Sroberto remote_close(res, ncomp); 1510258945Sroberto return; 1511258945Sroberto } 1512258945Sroberto 1513258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1514258945Sroberto nv_free(nv); 1515258945Sroberto pjdlog_debug(2, "keepalive_send: Request sent."); 1516258945Sroberto} 1517258945Sroberto 1518258945Sroberto/* 1519258945Sroberto * Thread sends request to secondary node. 1520258945Sroberto */ 1521258945Srobertostatic void * 1522258945Srobertoremote_send_thread(void *arg) 1523258945Sroberto{ 1524258945Sroberto struct hast_resource *res = arg; 1525258945Sroberto struct g_gate_ctl_io *ggio; 1526258945Sroberto time_t lastcheck, now; 1527258945Sroberto struct hio *hio; 1528258945Sroberto struct nv *nv; 1529258945Sroberto unsigned int ncomp; 1530258945Sroberto bool wakeup; 1531258945Sroberto uint64_t offset, length; 1532258945Sroberto uint8_t cmd; 1533258945Sroberto void *data; 1534258945Sroberto 1535258945Sroberto /* Remote component is 1 for now. */ 1536258945Sroberto ncomp = 1; 1537258945Sroberto lastcheck = time(NULL); 1538258945Sroberto 1539258945Sroberto for (;;) { 1540258945Sroberto pjdlog_debug(2, "remote_send: Taking request."); 1541258945Sroberto QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1542258945Sroberto if (hio == NULL) { 1543258945Sroberto now = time(NULL); 1544258945Sroberto if (lastcheck + HAST_KEEPALIVE <= now) { 1545258945Sroberto keepalive_send(res, ncomp); 1546258945Sroberto lastcheck = now; 1547258945Sroberto } 1548258945Sroberto continue; 1549258945Sroberto } 1550258945Sroberto pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1551258945Sroberto ggio = &hio->hio_ggio; 1552258945Sroberto switch (ggio->gctl_cmd) { 1553258945Sroberto case BIO_READ: 1554258945Sroberto cmd = HIO_READ; 1555258945Sroberto data = NULL; 1556258945Sroberto offset = ggio->gctl_offset; 1557258945Sroberto length = ggio->gctl_length; 1558258945Sroberto break; 1559258945Sroberto case BIO_WRITE: 1560258945Sroberto cmd = HIO_WRITE; 1561280849Scy data = ggio->gctl_data; 1562258945Sroberto offset = ggio->gctl_offset; 1563258945Sroberto length = ggio->gctl_length; 1564258945Sroberto break; 1565258945Sroberto case BIO_DELETE: 1566258945Sroberto cmd = HIO_DELETE; 1567258945Sroberto data = NULL; 1568258945Sroberto offset = ggio->gctl_offset; 1569258945Sroberto length = ggio->gctl_length; 1570258945Sroberto break; 1571258945Sroberto case BIO_FLUSH: 1572258945Sroberto cmd = HIO_FLUSH; 1573258945Sroberto data = NULL; 1574258945Sroberto offset = 0; 1575258945Sroberto length = 0; 1576258945Sroberto break; 1577258945Sroberto default: 1578258945Sroberto PJDLOG_ABORT("invalid condition"); 1579258945Sroberto } 1580258945Sroberto nv = nv_alloc(); 1581258945Sroberto nv_add_uint8(nv, cmd, "cmd"); 1582258945Sroberto nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1583258945Sroberto nv_add_uint64(nv, offset, "offset"); 1584258945Sroberto nv_add_uint64(nv, length, "length"); 1585280849Scy if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1586258945Sroberto ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { 1587258945Sroberto nv_add_uint8(nv, 1, "memsync"); 1588258945Sroberto } 1589258945Sroberto if (nv_error(nv) != 0) { 1590258945Sroberto hio->hio_errors[ncomp] = nv_error(nv); 1591258945Sroberto pjdlog_debug(2, 1592258945Sroberto "remote_send: (%p) Unable to prepare header to send.", 1593258945Sroberto hio); 1594258945Sroberto reqlog(LOG_ERR, 0, ggio, 1595258945Sroberto "Unable to prepare header to send (%s): ", 1596258945Sroberto strerror(nv_error(nv))); 1597258945Sroberto /* Move failed request immediately to the done queue. */ 1598280849Scy goto done_queue; 1599258945Sroberto } 1600258945Sroberto /* 1601258945Sroberto * Protect connection from disappearing. 1602258945Sroberto */ 1603258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 1604258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1605258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1606258945Sroberto hio->hio_errors[ncomp] = ENOTCONN; 1607258945Sroberto goto done_queue; 1608258945Sroberto } 1609258945Sroberto /* 1610258945Sroberto * Move the request to recv queue before sending it, because 1611258945Sroberto * in different order we can get reply before we move request 1612258945Sroberto * to recv queue. 1613258945Sroberto */ 1614258945Sroberto pjdlog_debug(2, 1615258945Sroberto "remote_send: (%p) Moving request to the recv queue.", 1616258945Sroberto hio); 1617258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1618258945Sroberto wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1619258945Sroberto TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1620258945Sroberto mtx_unlock(&hio_recv_list_lock[ncomp]); 1621258945Sroberto if (hast_proto_send(res, res->hr_remoteout, nv, data, 1622258945Sroberto data != NULL ? length : 0) == -1) { 1623258945Sroberto hio->hio_errors[ncomp] = errno; 1624258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1625258945Sroberto pjdlog_debug(2, 1626258945Sroberto "remote_send: (%p) Unable to send request.", hio); 1627258945Sroberto reqlog(LOG_ERR, 0, ggio, 1628258945Sroberto "Unable to send request (%s): ", 1629258945Sroberto strerror(hio->hio_errors[ncomp])); 1630258945Sroberto remote_close(res, ncomp); 1631258945Sroberto /* 1632258945Sroberto * Take request back from the receive queue and move 1633258945Sroberto * it immediately to the done queue. 1634258945Sroberto */ 1635258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1636258945Sroberto TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1637258945Sroberto hio_next[ncomp]); 1638280849Scy mtx_unlock(&hio_recv_list_lock[ncomp]); 1639258945Sroberto goto done_queue; 1640258945Sroberto } 1641258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1642258945Sroberto nv_free(nv); 1643258945Sroberto if (wakeup) 1644258945Sroberto cv_signal(&hio_recv_list_cond[ncomp]); 1645258945Sroberto continue; 1646258945Srobertodone_queue: 1647258945Sroberto nv_free(nv); 1648258945Sroberto if (ISSYNCREQ(hio)) { 1649258945Sroberto if (refcnt_release(&hio->hio_countdown) > 0) 1650258945Sroberto continue; 1651258945Sroberto mtx_lock(&sync_lock); 1652258945Sroberto SYNCREQDONE(hio); 1653258945Sroberto mtx_unlock(&sync_lock); 1654258945Sroberto cv_signal(&sync_cond); 1655258945Sroberto continue; 1656258945Sroberto } 1657258945Sroberto if (ggio->gctl_cmd == BIO_WRITE) { 1658258945Sroberto mtx_lock(&res->hr_amp_lock); 1659258945Sroberto if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1660258945Sroberto ggio->gctl_length)) { 1661258945Sroberto (void)hast_activemap_flush(res); 1662258945Sroberto } else { 1663258945Sroberto mtx_unlock(&res->hr_amp_lock); 1664258945Sroberto } 1665258945Sroberto if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1666258945Sroberto (void)refcnt_release(&hio->hio_countdown); 1667258945Sroberto } 1668258945Sroberto if (refcnt_release(&hio->hio_countdown) > 0) 1669258945Sroberto continue; 1670258945Sroberto pjdlog_debug(2, 1671258945Sroberto "remote_send: (%p) Moving request to the done queue.", 1672258945Sroberto hio); 1673258945Sroberto QUEUE_INSERT2(hio, done); 1674258945Sroberto } 1675258945Sroberto /* NOTREACHED */ 1676258945Sroberto return (NULL); 1677258945Sroberto} 1678258945Sroberto 1679258945Sroberto/* 1680258945Sroberto * Thread receives answer from secondary node and passes it to ggate_send 1681258945Sroberto * thread. 1682258945Sroberto */ 1683258945Srobertostatic void * 1684258945Srobertoremote_recv_thread(void *arg) 1685258945Sroberto{ 1686258945Sroberto struct hast_resource *res = arg; 1687258945Sroberto struct g_gate_ctl_io *ggio; 1688258945Sroberto struct hio *hio; 1689258945Sroberto struct nv *nv; 1690258945Sroberto unsigned int ncomp; 1691258945Sroberto uint64_t seq; 1692258945Sroberto bool memsyncack; 1693258945Sroberto int error; 1694258945Sroberto 1695258945Sroberto /* Remote component is 1 for now. */ 1696258945Sroberto ncomp = 1; 1697258945Sroberto 1698258945Sroberto for (;;) { 1699258945Sroberto /* Wait until there is anything to receive. */ 1700258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1701258945Sroberto while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1702258945Sroberto pjdlog_debug(2, "remote_recv: No requests, waiting."); 1703258945Sroberto cv_wait(&hio_recv_list_cond[ncomp], 1704258945Sroberto &hio_recv_list_lock[ncomp]); 1705258945Sroberto } 1706258945Sroberto mtx_unlock(&hio_recv_list_lock[ncomp]); 1707258945Sroberto 1708258945Sroberto memsyncack = false; 1709258945Sroberto 1710258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 1711258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1712258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1713258945Sroberto /* 1714258945Sroberto * Connection is dead, so move all pending requests to 1715258945Sroberto * the done queue (one-by-one). 1716258945Sroberto */ 1717258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1718280849Scy hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1719280849Scy PJDLOG_ASSERT(hio != NULL); 1720280849Scy TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1721258945Sroberto hio_next[ncomp]); 1722280849Scy mtx_unlock(&hio_recv_list_lock[ncomp]); 1723280849Scy goto done_queue; 1724280849Scy } 1725280849Scy if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1726280849Scy pjdlog_errno(LOG_ERR, 1727280849Scy "Unable to receive reply header"); 1728280849Scy rw_unlock(&hio_remote_lock[ncomp]); 1729280849Scy remote_close(res, ncomp); 1730280849Scy continue; 1731280849Scy } 1732280849Scy rw_unlock(&hio_remote_lock[ncomp]); 1733280849Scy seq = nv_get_uint64(nv, "seq"); 1734258945Sroberto if (seq == 0) { 1735258945Sroberto pjdlog_error("Header contains no 'seq' field."); 1736258945Sroberto nv_free(nv); 1737258945Sroberto continue; 1738258945Sroberto } 1739258945Sroberto memsyncack = nv_exists(nv, "received"); 1740258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1741258945Sroberto TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1742258945Sroberto if (hio->hio_ggio.gctl_seq == seq) { 1743258945Sroberto TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1744258945Sroberto hio_next[ncomp]); 1745258945Sroberto break; 1746280849Scy } 1747280849Scy } 1748280849Scy mtx_unlock(&hio_recv_list_lock[ncomp]); 1749280849Scy if (hio == NULL) { 1750280849Scy pjdlog_error("Found no request matching received 'seq' field (%ju).", 1751280849Scy (uintmax_t)seq); 1752258945Sroberto nv_free(nv); 1753258945Sroberto continue; 1754258945Sroberto } 1755258945Sroberto ggio = &hio->hio_ggio; 1756258945Sroberto error = nv_get_int16(nv, "error"); 1757258945Sroberto if (error != 0) { 1758258945Sroberto /* Request failed on remote side. */ 1759258945Sroberto hio->hio_errors[ncomp] = error; 1760258945Sroberto reqlog(LOG_WARNING, 0, ggio, 1761258945Sroberto "Remote request failed (%s): ", strerror(error)); 1762258945Sroberto nv_free(nv); 1763258945Sroberto goto done_queue; 1764258945Sroberto } 1765258945Sroberto switch (ggio->gctl_cmd) { 1766258945Sroberto case BIO_READ: 1767258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 1768258945Sroberto if (!ISCONNECTED(res, ncomp)) { 1769258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1770258945Sroberto nv_free(nv); 1771258945Sroberto goto done_queue; 1772258945Sroberto } 1773258945Sroberto if (hast_proto_recv_data(res, res->hr_remotein, nv, 1774258945Sroberto ggio->gctl_data, ggio->gctl_length) == -1) { 1775258945Sroberto hio->hio_errors[ncomp] = errno; 1776258945Sroberto pjdlog_errno(LOG_ERR, 1777258945Sroberto "Unable to receive reply data"); 1778258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1779258945Sroberto nv_free(nv); 1780258945Sroberto remote_close(res, ncomp); 1781258945Sroberto goto done_queue; 1782258945Sroberto } 1783258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 1784258945Sroberto break; 1785258945Sroberto case BIO_WRITE: 1786258945Sroberto case BIO_DELETE: 1787258945Sroberto case BIO_FLUSH: 1788258945Sroberto break; 1789258945Sroberto default: 1790258945Sroberto PJDLOG_ABORT("invalid condition"); 1791258945Sroberto } 1792280849Scy hio->hio_errors[ncomp] = 0; 1793258945Sroberto nv_free(nv); 1794258945Srobertodone_queue: 1795258945Sroberto if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1796258945Sroberto hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1797258945Sroberto if (refcnt_release(&hio->hio_countdown) > 0) 1798258945Sroberto continue; 1799258945Sroberto } else { 1800258945Sroberto /* 1801258945Sroberto * Depending on hio_countdown value, requests finished 1802258945Sroberto * in the following order: 1803258945Sroberto * 1804258945Sroberto * 0: local write, remote memsync, remote final 1805258945Sroberto * or 1806258945Sroberto * 0: remote memsync, local write, remote final 1807258945Sroberto * 1808258945Sroberto * 1: local write, remote memsync, (remote final) 1809258945Sroberto * or 1810258945Sroberto * 1: remote memsync, remote final, (local write) 1811258945Sroberto * 1812258945Sroberto * 2: remote memsync, (local write), (remote final) 1813258945Sroberto * or 1814258945Sroberto * 2: remote memsync, (remote final), (local write) 1815258945Sroberto */ 1816258945Sroberto switch (refcnt_release(&hio->hio_countdown)) { 1817258945Sroberto case 0: 1818258945Sroberto /* 1819258945Sroberto * Remote final reply arrived. 1820258945Sroberto */ 1821258945Sroberto PJDLOG_ASSERT(!memsyncack); 1822258945Sroberto break; 1823258945Sroberto case 1: 1824258945Sroberto if (memsyncack) { 1825258945Sroberto /* 1826258945Sroberto * Local request already finished, so we 1827258945Sroberto * can complete the write. 1828258945Sroberto */ 1829280849Scy if (hio->hio_errors[0] == 0) 1830258945Sroberto write_complete(res, hio); 1831258945Sroberto /* 1832258945Sroberto * We still need to wait for final 1833258945Sroberto * remote reply. 1834258945Sroberto */ 1835258945Sroberto pjdlog_debug(2, 1836258945Sroberto "remote_recv: (%p) Moving request back to the recv queue.", 1837258945Sroberto hio); 1838258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1839258945Sroberto TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1840258945Sroberto hio, hio_next[ncomp]); 1841258945Sroberto mtx_unlock(&hio_recv_list_lock[ncomp]); 1842258945Sroberto } else { 1843258945Sroberto /* 1844258945Sroberto * Remote final reply arrived before 1845258945Sroberto * local write finished. 1846258945Sroberto * Nothing to do in such case. 1847258945Sroberto */ 1848258945Sroberto } 1849258945Sroberto continue; 1850258945Sroberto case 2: 1851258945Sroberto /* 1852258945Sroberto * We received remote memsync reply even before 1853258945Sroberto * local write finished. 1854258945Sroberto */ 1855258945Sroberto PJDLOG_ASSERT(memsyncack); 1856258945Sroberto 1857258945Sroberto pjdlog_debug(2, 1858258945Sroberto "remote_recv: (%p) Moving request back to the recv queue.", 1859258945Sroberto hio); 1860258945Sroberto mtx_lock(&hio_recv_list_lock[ncomp]); 1861258945Sroberto TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, 1862258945Sroberto hio_next[ncomp]); 1863258945Sroberto mtx_unlock(&hio_recv_list_lock[ncomp]); 1864258945Sroberto continue; 1865258945Sroberto default: 1866258945Sroberto PJDLOG_ABORT("Invalid hio_countdown."); 1867258945Sroberto } 1868258945Sroberto } 1869258945Sroberto if (ISSYNCREQ(hio)) { 1870258945Sroberto mtx_lock(&sync_lock); 1871258945Sroberto SYNCREQDONE(hio); 1872258945Sroberto mtx_unlock(&sync_lock); 1873258945Sroberto cv_signal(&sync_cond); 1874258945Sroberto } else { 1875258945Sroberto pjdlog_debug(2, 1876258945Sroberto "remote_recv: (%p) Moving request to the done queue.", 1877258945Sroberto hio); 1878258945Sroberto QUEUE_INSERT2(hio, done); 1879258945Sroberto } 1880258945Sroberto } 1881258945Sroberto /* NOTREACHED */ 1882258945Sroberto return (NULL); 1883258945Sroberto} 1884258945Sroberto 1885258945Sroberto/* 1886258945Sroberto * Thread sends answer to the kernel. 1887258945Sroberto */ 1888258945Srobertostatic void * 1889258945Srobertoggate_send_thread(void *arg) 1890258945Sroberto{ 1891258945Sroberto struct hast_resource *res = arg; 1892258945Sroberto struct g_gate_ctl_io *ggio; 1893258945Sroberto struct hio *hio; 1894258945Sroberto unsigned int ii, ncomps; 1895258945Sroberto 1896258945Sroberto ncomps = HAST_NCOMPONENTS; 1897258945Sroberto 1898258945Sroberto for (;;) { 1899258945Sroberto pjdlog_debug(2, "ggate_send: Taking request."); 1900258945Sroberto QUEUE_TAKE2(hio, done); 1901258945Sroberto pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1902258945Sroberto ggio = &hio->hio_ggio; 1903258945Sroberto for (ii = 0; ii < ncomps; ii++) { 1904258945Sroberto if (hio->hio_errors[ii] == 0) { 1905258945Sroberto /* 1906258945Sroberto * One successful request is enough to declare 1907258945Sroberto * success. 1908258945Sroberto */ 1909258945Sroberto ggio->gctl_error = 0; 1910258945Sroberto break; 1911258945Sroberto } 1912258945Sroberto } 1913258945Sroberto if (ii == ncomps) { 1914258945Sroberto /* 1915258945Sroberto * None of the requests were successful. 1916258945Sroberto * Use the error from local component except the 1917258945Sroberto * case when we did only remote request. 1918258945Sroberto */ 1919258945Sroberto if (ggio->gctl_cmd == BIO_READ && 1920258945Sroberto res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1921258945Sroberto ggio->gctl_error = hio->hio_errors[1]; 1922258945Sroberto else 1923258945Sroberto ggio->gctl_error = hio->hio_errors[0]; 1924258945Sroberto } 1925258945Sroberto if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1926258945Sroberto mtx_lock(&res->hr_amp_lock); 1927258945Sroberto if (activemap_write_complete(res->hr_amp, 1928258945Sroberto ggio->gctl_offset, ggio->gctl_length)) { 1929258945Sroberto res->hr_stat_activemap_update++; 1930258945Sroberto (void)hast_activemap_flush(res); 1931258945Sroberto } else { 1932258945Sroberto mtx_unlock(&res->hr_amp_lock); 1933258945Sroberto } 1934258945Sroberto } 1935258945Sroberto if (ggio->gctl_cmd == BIO_WRITE) { 1936258945Sroberto /* 1937258945Sroberto * Unlock range we locked. 1938258945Sroberto */ 1939258945Sroberto mtx_lock(&range_lock); 1940280849Scy rangelock_del(range_regular, ggio->gctl_offset, 1941258945Sroberto ggio->gctl_length); 1942258945Sroberto if (range_sync_wait) 1943258945Sroberto cv_signal(&range_sync_cond); 1944258945Sroberto mtx_unlock(&range_lock); 1945258945Sroberto if (!hio->hio_done) 1946258945Sroberto write_complete(res, hio); 1947258945Sroberto } else { 1948258945Sroberto if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1949258945Sroberto primary_exit(EX_OSERR, 1950258945Sroberto "G_GATE_CMD_DONE failed"); 1951258945Sroberto } 1952258945Sroberto } 1953258945Sroberto if (hio->hio_errors[0]) { 1954258945Sroberto switch (ggio->gctl_cmd) { 1955258945Sroberto case BIO_READ: 1956258945Sroberto res->hr_stat_read_error++; 1957258945Sroberto break; 1958258945Sroberto case BIO_WRITE: 1959258945Sroberto res->hr_stat_write_error++; 1960258945Sroberto break; 1961258945Sroberto case BIO_DELETE: 1962258945Sroberto res->hr_stat_delete_error++; 1963258945Sroberto break; 1964258945Sroberto case BIO_FLUSH: 1965258945Sroberto res->hr_stat_flush_error++; 1966258945Sroberto break; 1967258945Sroberto } 1968258945Sroberto } 1969258945Sroberto pjdlog_debug(2, 1970258945Sroberto "ggate_send: (%p) Moving request to the free queue.", hio); 1971258945Sroberto QUEUE_INSERT2(hio, free); 1972258945Sroberto } 1973258945Sroberto /* NOTREACHED */ 1974258945Sroberto return (NULL); 1975258945Sroberto} 1976258945Sroberto 1977258945Sroberto/* 1978258945Sroberto * Thread synchronize local and remote components. 1979258945Sroberto */ 1980258945Srobertostatic void * 1981258945Srobertosync_thread(void *arg __unused) 1982258945Sroberto{ 1983258945Sroberto struct hast_resource *res = arg; 1984258945Sroberto struct hio *hio; 1985258945Sroberto struct g_gate_ctl_io *ggio; 1986258945Sroberto struct timeval tstart, tend, tdiff; 1987258945Sroberto unsigned int ii, ncomp, ncomps; 1988258945Sroberto off_t offset, length, synced; 1989258945Sroberto bool dorewind, directreads; 1990258945Sroberto int syncext; 1991258945Sroberto 1992258945Sroberto ncomps = HAST_NCOMPONENTS; 1993258945Sroberto dorewind = true; 1994258945Sroberto synced = 0; 1995258945Sroberto offset = -1; 1996258945Sroberto directreads = false; 1997258945Sroberto 1998258945Sroberto for (;;) { 1999258945Sroberto mtx_lock(&sync_lock); 2000258945Sroberto if (offset >= 0 && !sync_inprogress) { 2001258945Sroberto gettimeofday(&tend, NULL); 2002258945Sroberto timersub(&tend, &tstart, &tdiff); 2003280849Scy pjdlog_info("Synchronization interrupted after %#.0T. " 2004258945Sroberto "%NB synchronized so far.", &tdiff, 2005280849Scy (intmax_t)synced); 2006280849Scy event_send(res, EVENT_SYNCINTR); 2007258945Sroberto } 2008258945Sroberto while (!sync_inprogress) { 2009258945Sroberto dorewind = true; 2010258945Sroberto synced = 0; 2011258945Sroberto cv_wait(&sync_cond, &sync_lock); 2012258945Sroberto } 2013258945Sroberto mtx_unlock(&sync_lock); 2014258945Sroberto /* 2015258945Sroberto * Obtain offset at which we should synchronize. 2016258945Sroberto * Rewind synchronization if needed. 2017258945Sroberto */ 2018258945Sroberto mtx_lock(&res->hr_amp_lock); 2019258945Sroberto if (dorewind) 2020258945Sroberto activemap_sync_rewind(res->hr_amp); 2021258945Sroberto offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2022258945Sroberto if (syncext != -1) { 2023258945Sroberto /* 2024258945Sroberto * We synchronized entire syncext extent, we can mark 2025258945Sroberto * it as clean now. 2026258945Sroberto */ 2027280849Scy if (activemap_extent_complete(res->hr_amp, syncext)) 2028258945Sroberto (void)hast_activemap_flush(res); 2029258945Sroberto else 2030280849Scy mtx_unlock(&res->hr_amp_lock); 2031258945Sroberto } else { 2032280849Scy mtx_unlock(&res->hr_amp_lock); 2033280849Scy } 2034280849Scy if (dorewind) { 2035258945Sroberto dorewind = false; 2036258945Sroberto if (offset == -1) 2037258945Sroberto pjdlog_info("Nodes are in sync."); 2038258945Sroberto else { 2039280849Scy pjdlog_info("Synchronization started. %NB to go.", 2040280849Scy (intmax_t)(res->hr_extentsize * 2041258945Sroberto activemap_ndirty(res->hr_amp))); 2042280849Scy event_send(res, EVENT_SYNCSTART); 2043258945Sroberto gettimeofday(&tstart, NULL); 2044258945Sroberto } 2045258945Sroberto } 2046258945Sroberto if (offset == -1) { 2047258945Sroberto sync_stop(); 2048258945Sroberto pjdlog_debug(1, "Nothing to synchronize."); 2049258945Sroberto /* 2050258945Sroberto * Synchronization complete, make both localcnt and 2051280849Scy * remotecnt equal. 2052280849Scy */ 2053258945Sroberto ncomp = 1; 2054258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 2055258945Sroberto if (ISCONNECTED(res, ncomp)) { 2056258945Sroberto if (synced > 0) { 2057258945Sroberto int64_t bps; 2058280849Scy 2059258945Sroberto gettimeofday(&tend, NULL); 2060258945Sroberto timersub(&tend, &tstart, &tdiff); 2061258945Sroberto bps = (int64_t)((double)synced / 2062258945Sroberto ((double)tdiff.tv_sec + 2063258945Sroberto (double)tdiff.tv_usec / 1000000)); 2064258945Sroberto pjdlog_info("Synchronization complete. " 2065258945Sroberto "%NB synchronized in %#.0lT (%NB/sec).", 2066258945Sroberto (intmax_t)synced, &tdiff, 2067258945Sroberto (intmax_t)bps); 2068258945Sroberto event_send(res, EVENT_SYNCDONE); 2069258945Sroberto } 2070258945Sroberto mtx_lock(&metadata_lock); 2071280849Scy if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2072258945Sroberto directreads = true; 2073258945Sroberto res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2074258945Sroberto res->hr_primary_localcnt = 2075258945Sroberto res->hr_secondary_remotecnt; 2076258945Sroberto res->hr_primary_remotecnt = 2077258945Sroberto res->hr_secondary_localcnt; 2078258945Sroberto pjdlog_debug(1, 2079280849Scy "Setting localcnt to %ju and remotecnt to %ju.", 2080280849Scy (uintmax_t)res->hr_primary_localcnt, 2081258945Sroberto (uintmax_t)res->hr_primary_remotecnt); 2082280849Scy (void)metadata_write(res); 2083258945Sroberto mtx_unlock(&metadata_lock); 2084258945Sroberto } 2085258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 2086258945Sroberto if (directreads) { 2087280849Scy directreads = false; 2088280849Scy enable_direct_reads(res); 2089280849Scy } 2090280849Scy continue; 2091280849Scy } 2092280849Scy pjdlog_debug(2, "sync: Taking free request."); 2093280849Scy QUEUE_TAKE2(hio, free); 2094258945Sroberto pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2095280849Scy /* 2096258945Sroberto * Lock the range we are going to synchronize. We don't want 2097258945Sroberto * race where someone writes between our read and write. 2098258945Sroberto */ 2099280849Scy for (;;) { 2100280849Scy mtx_lock(&range_lock); 2101258945Sroberto if (rangelock_islocked(range_regular, offset, length)) { 2102280849Scy pjdlog_debug(2, 2103258945Sroberto "sync: Range offset=%jd length=%jd locked.", 2104258945Sroberto (intmax_t)offset, (intmax_t)length); 2105258945Sroberto range_sync_wait = true; 2106258945Sroberto cv_wait(&range_sync_cond, &range_lock); 2107258945Sroberto range_sync_wait = false; 2108258945Sroberto mtx_unlock(&range_lock); 2109258945Sroberto continue; 2110258945Sroberto } 2111258945Sroberto if (rangelock_add(range_sync, offset, length) == -1) { 2112258945Sroberto mtx_unlock(&range_lock); 2113258945Sroberto pjdlog_debug(2, 2114258945Sroberto "sync: Range offset=%jd length=%jd is already locked, waiting.", 2115258945Sroberto (intmax_t)offset, (intmax_t)length); 2116258945Sroberto sleep(1); 2117258945Sroberto continue; 2118258945Sroberto } 2119258945Sroberto mtx_unlock(&range_lock); 2120258945Sroberto break; 2121258945Sroberto } 2122258945Sroberto /* 2123258945Sroberto * First read the data from synchronization source. 2124258945Sroberto */ 2125258945Sroberto SYNCREQ(hio); 2126258945Sroberto ggio = &hio->hio_ggio; 2127258945Sroberto ggio->gctl_cmd = BIO_READ; 2128280849Scy ggio->gctl_offset = offset; 2129280849Scy ggio->gctl_length = length; 2130258945Sroberto ggio->gctl_error = 0; 2131258945Sroberto hio->hio_done = false; 2132258945Sroberto hio->hio_replication = res->hr_replication; 2133258945Sroberto for (ii = 0; ii < ncomps; ii++) 2134258945Sroberto hio->hio_errors[ii] = EINVAL; 2135258945Sroberto reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2136258945Sroberto hio); 2137258945Sroberto pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2138258945Sroberto hio); 2139258945Sroberto mtx_lock(&metadata_lock); 2140258945Sroberto if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2141258945Sroberto /* 2142258945Sroberto * This range is up-to-date on local component, 2143280849Scy * so handle request locally. 2144280849Scy */ 2145258945Sroberto /* Local component is 0 for now. */ 2146258945Sroberto ncomp = 0; 2147258945Sroberto } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2148258945Sroberto PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2149258945Sroberto /* 2150258945Sroberto * This range is out-of-date on local component, 2151258945Sroberto * so send request to the remote node. 2152258945Sroberto */ 2153258945Sroberto /* Remote component is 1 for now. */ 2154258945Sroberto ncomp = 1; 2155258945Sroberto } 2156258945Sroberto mtx_unlock(&metadata_lock); 2157258945Sroberto refcnt_init(&hio->hio_countdown, 1); 2158258945Sroberto QUEUE_INSERT1(hio, send, ncomp); 2159258945Sroberto 2160258945Sroberto /* 2161258945Sroberto * Let's wait for READ to finish. 2162258945Sroberto */ 2163258945Sroberto mtx_lock(&sync_lock); 2164258945Sroberto while (!ISSYNCREQDONE(hio)) 2165258945Sroberto cv_wait(&sync_cond, &sync_lock); 2166258945Sroberto mtx_unlock(&sync_lock); 2167258945Sroberto 2168258945Sroberto if (hio->hio_errors[ncomp] != 0) { 2169280849Scy pjdlog_error("Unable to read synchronization data: %s.", 2170280849Scy strerror(hio->hio_errors[ncomp])); 2171258945Sroberto goto free_queue; 2172258945Sroberto } 2173258945Sroberto 2174258945Sroberto /* 2175258945Sroberto * We read the data from synchronization source, now write it 2176258945Sroberto * to synchronization target. 2177258945Sroberto */ 2178258945Sroberto SYNCREQ(hio); 2179258945Sroberto ggio->gctl_cmd = BIO_WRITE; 2180258945Sroberto for (ii = 0; ii < ncomps; ii++) 2181258945Sroberto hio->hio_errors[ii] = EINVAL; 2182258945Sroberto reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2183258945Sroberto hio); 2184258945Sroberto pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2185258945Sroberto hio); 2186258945Sroberto mtx_lock(&metadata_lock); 2187258945Sroberto if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2188258945Sroberto /* 2189258945Sroberto * This range is up-to-date on local component, 2190280849Scy * so we update remote component. 2191280849Scy */ 2192258945Sroberto /* Remote component is 1 for now. */ 2193258945Sroberto ncomp = 1; 2194258945Sroberto } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2195258945Sroberto PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2196258945Sroberto /* 2197258945Sroberto * This range is out-of-date on local component, 2198258945Sroberto * so we update it. 2199258945Sroberto */ 2200258945Sroberto /* Local component is 0 for now. */ 2201258945Sroberto ncomp = 0; 2202258945Sroberto } 2203258945Sroberto mtx_unlock(&metadata_lock); 2204258945Sroberto 2205258945Sroberto pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2206258945Sroberto hio); 2207258945Sroberto refcnt_init(&hio->hio_countdown, 1); 2208258945Sroberto QUEUE_INSERT1(hio, send, ncomp); 2209258945Sroberto 2210258945Sroberto /* 2211258945Sroberto * Let's wait for WRITE to finish. 2212258945Sroberto */ 2213258945Sroberto mtx_lock(&sync_lock); 2214258945Sroberto while (!ISSYNCREQDONE(hio)) 2215258945Sroberto cv_wait(&sync_cond, &sync_lock); 2216258945Sroberto mtx_unlock(&sync_lock); 2217258945Sroberto 2218258945Sroberto if (hio->hio_errors[ncomp] != 0) { 2219258945Sroberto pjdlog_error("Unable to write synchronization data: %s.", 2220258945Sroberto strerror(hio->hio_errors[ncomp])); 2221258945Sroberto goto free_queue; 2222258945Sroberto } 2223258945Sroberto 2224258945Sroberto synced += length; 2225258945Srobertofree_queue: 2226258945Sroberto mtx_lock(&range_lock); 2227258945Sroberto rangelock_del(range_sync, offset, length); 2228258945Sroberto if (range_regular_wait) 2229258945Sroberto cv_signal(&range_regular_cond); 2230258945Sroberto mtx_unlock(&range_lock); 2231258945Sroberto pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2232258945Sroberto hio); 2233258945Sroberto QUEUE_INSERT2(hio, free); 2234258945Sroberto } 2235258945Sroberto /* NOTREACHED */ 2236258945Sroberto return (NULL); 2237258945Sroberto} 2238258945Sroberto 2239280849Scyvoid 2240280849Scyprimary_config_reload(struct hast_resource *res, struct nv *nv) 2241280849Scy{ 2242280849Scy unsigned int ii, ncomps; 2243258945Sroberto int modified, vint; 2244258945Sroberto const char *vstr; 2245258945Sroberto 2246258945Sroberto pjdlog_info("Reloading configuration..."); 2247258945Sroberto 2248258945Sroberto PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2249258945Sroberto PJDLOG_ASSERT(gres == res); 2250258945Sroberto nv_assert(nv, "remoteaddr"); 2251258945Sroberto nv_assert(nv, "sourceaddr"); 2252258945Sroberto nv_assert(nv, "replication"); 2253258945Sroberto nv_assert(nv, "checksum"); 2254258945Sroberto nv_assert(nv, "compression"); 2255280849Scy nv_assert(nv, "timeout"); 2256280849Scy nv_assert(nv, "exec"); 2257280849Scy nv_assert(nv, "metaflush"); 2258280849Scy 2259280849Scy ncomps = HAST_NCOMPONENTS; 2260280849Scy 2261280849Scy#define MODIFIED_REMOTEADDR 0x01 2262280849Scy#define MODIFIED_SOURCEADDR 0x02 2263280849Scy#define MODIFIED_REPLICATION 0x04 2264280849Scy#define MODIFIED_CHECKSUM 0x08 2265280849Scy#define MODIFIED_COMPRESSION 0x10 2266280849Scy#define MODIFIED_TIMEOUT 0x20 2267280849Scy#define MODIFIED_EXEC 0x40 2268280849Scy#define MODIFIED_METAFLUSH 0x80 2269280849Scy modified = 0; 2270280849Scy 2271280849Scy vstr = nv_get_string(nv, "remoteaddr"); 2272280849Scy if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2273280849Scy /* 2274280849Scy * Don't copy res->hr_remoteaddr to gres just yet. 2275280849Scy * We want remote_close() to log disconnect from the old 2276280849Scy * addresses, not from the new ones. 2277280849Scy */ 2278258945Sroberto modified |= MODIFIED_REMOTEADDR; 2279258945Sroberto } 2280258945Sroberto vstr = nv_get_string(nv, "sourceaddr"); 2281258945Sroberto if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2282258945Sroberto strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2283258945Sroberto modified |= MODIFIED_SOURCEADDR; 2284258945Sroberto } 2285258945Sroberto vint = nv_get_int32(nv, "replication"); 2286258945Sroberto if (gres->hr_replication != vint) { 2287258945Sroberto gres->hr_replication = vint; 2288258945Sroberto modified |= MODIFIED_REPLICATION; 2289258945Sroberto } 2290258945Sroberto vint = nv_get_int32(nv, "checksum"); 2291258945Sroberto if (gres->hr_checksum != vint) { 2292258945Sroberto gres->hr_checksum = vint; 2293258945Sroberto modified |= MODIFIED_CHECKSUM; 2294258945Sroberto } 2295258945Sroberto vint = nv_get_int32(nv, "compression"); 2296258945Sroberto if (gres->hr_compression != vint) { 2297258945Sroberto gres->hr_compression = vint; 2298258945Sroberto modified |= MODIFIED_COMPRESSION; 2299258945Sroberto } 2300258945Sroberto vint = nv_get_int32(nv, "timeout"); 2301258945Sroberto if (gres->hr_timeout != vint) { 2302258945Sroberto gres->hr_timeout = vint; 2303258945Sroberto modified |= MODIFIED_TIMEOUT; 2304258945Sroberto } 2305258945Sroberto vstr = nv_get_string(nv, "exec"); 2306258945Sroberto if (strcmp(gres->hr_exec, vstr) != 0) { 2307258945Sroberto strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2308258945Sroberto modified |= MODIFIED_EXEC; 2309258945Sroberto } 2310258945Sroberto vint = nv_get_int32(nv, "metaflush"); 2311258945Sroberto if (gres->hr_metaflush != vint) { 2312258945Sroberto gres->hr_metaflush = vint; 2313258945Sroberto modified |= MODIFIED_METAFLUSH; 2314258945Sroberto } 2315258945Sroberto 2316258945Sroberto /* 2317258945Sroberto * Change timeout for connected sockets. 2318258945Sroberto * Don't bother if we need to reconnect. 2319258945Sroberto */ 2320258945Sroberto if ((modified & MODIFIED_TIMEOUT) != 0 && 2321258945Sroberto (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2322258945Sroberto for (ii = 0; ii < ncomps; ii++) { 2323258945Sroberto if (!ISREMOTE(ii)) 2324258945Sroberto continue; 2325258945Sroberto rw_rlock(&hio_remote_lock[ii]); 2326258945Sroberto if (!ISCONNECTED(gres, ii)) { 2327258945Sroberto rw_unlock(&hio_remote_lock[ii]); 2328258945Sroberto continue; 2329258945Sroberto } 2330258945Sroberto rw_unlock(&hio_remote_lock[ii]); 2331258945Sroberto if (proto_timeout(gres->hr_remotein, 2332258945Sroberto gres->hr_timeout) == -1) { 2333258945Sroberto pjdlog_errno(LOG_WARNING, 2334258945Sroberto "Unable to set connection timeout"); 2335258945Sroberto } 2336258945Sroberto if (proto_timeout(gres->hr_remoteout, 2337258945Sroberto gres->hr_timeout) == -1) { 2338258945Sroberto pjdlog_errno(LOG_WARNING, 2339258945Sroberto "Unable to set connection timeout"); 2340258945Sroberto } 2341258945Sroberto } 2342258945Sroberto } 2343258945Sroberto if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2344258945Sroberto for (ii = 0; ii < ncomps; ii++) { 2345258945Sroberto if (!ISREMOTE(ii)) 2346258945Sroberto continue; 2347258945Sroberto remote_close(gres, ii); 2348258945Sroberto } 2349258945Sroberto if (modified & MODIFIED_REMOTEADDR) { 2350258945Sroberto vstr = nv_get_string(nv, "remoteaddr"); 2351258945Sroberto strlcpy(gres->hr_remoteaddr, vstr, 2352258945Sroberto sizeof(gres->hr_remoteaddr)); 2353258945Sroberto } 2354280849Scy } 2355280849Scy#undef MODIFIED_REMOTEADDR 2356280849Scy#undef MODIFIED_SOURCEADDR 2357280849Scy#undef MODIFIED_REPLICATION 2358280849Scy#undef MODIFIED_CHECKSUM 2359258945Sroberto#undef MODIFIED_COMPRESSION 2360280849Scy#undef MODIFIED_TIMEOUT 2361258945Sroberto#undef MODIFIED_EXEC 2362258945Sroberto#undef MODIFIED_METAFLUSH 2363258945Sroberto 2364258945Sroberto pjdlog_info("Configuration reloaded successfully."); 2365258945Sroberto} 2366258945Sroberto 2367258945Srobertostatic void 2368258945Srobertoguard_one(struct hast_resource *res, unsigned int ncomp) 2369258945Sroberto{ 2370258945Sroberto struct proto_conn *in, *out; 2371258945Sroberto 2372258945Sroberto if (!ISREMOTE(ncomp)) 2373258945Sroberto return; 2374258945Sroberto 2375258945Sroberto rw_rlock(&hio_remote_lock[ncomp]); 2376258945Sroberto 2377258945Sroberto if (!real_remote(res)) { 2378258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 2379258945Sroberto return; 2380258945Sroberto } 2381258945Sroberto 2382258945Sroberto if (ISCONNECTED(res, ncomp)) { 2383258945Sroberto PJDLOG_ASSERT(res->hr_remotein != NULL); 2384258945Sroberto PJDLOG_ASSERT(res->hr_remoteout != NULL); 2385258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 2386258945Sroberto pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2387258945Sroberto res->hr_remoteaddr); 2388258945Sroberto return; 2389258945Sroberto } 2390258945Sroberto 2391258945Sroberto PJDLOG_ASSERT(res->hr_remotein == NULL); 2392258945Sroberto PJDLOG_ASSERT(res->hr_remoteout == NULL); 2393258945Sroberto /* 2394258945Sroberto * Upgrade the lock. It doesn't have to be atomic as no other thread 2395258945Sroberto * can change connection status from disconnected to connected. 2396258945Sroberto */ 2397258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 2398258945Sroberto pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2399258945Sroberto res->hr_remoteaddr); 2400258945Sroberto in = out = NULL; 2401258945Sroberto if (init_remote(res, &in, &out) == 0) { 2402258945Sroberto rw_wlock(&hio_remote_lock[ncomp]); 2403258945Sroberto PJDLOG_ASSERT(res->hr_remotein == NULL); 2404258945Sroberto PJDLOG_ASSERT(res->hr_remoteout == NULL); 2405258945Sroberto PJDLOG_ASSERT(in != NULL && out != NULL); 2406258945Sroberto res->hr_remotein = in; 2407258945Sroberto res->hr_remoteout = out; 2408258945Sroberto rw_unlock(&hio_remote_lock[ncomp]); 2409258945Sroberto pjdlog_info("Successfully reconnected to %s.", 2410258945Sroberto res->hr_remoteaddr); 2411258945Sroberto sync_start(); 2412258945Sroberto } else { 2413258945Sroberto /* Both connections should be NULL. */ 2414258945Sroberto PJDLOG_ASSERT(res->hr_remotein == NULL); 2415258945Sroberto PJDLOG_ASSERT(res->hr_remoteout == NULL); 2416258945Sroberto PJDLOG_ASSERT(in == NULL && out == NULL); 2417258945Sroberto pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2418258945Sroberto res->hr_remoteaddr); 2419258945Sroberto } 2420258945Sroberto} 2421258945Sroberto 2422258945Sroberto/* 2423258945Sroberto * Thread guards remote connections and reconnects when needed, handles 2424258945Sroberto * signals, etc. 2425258945Sroberto */ 2426258945Srobertostatic void * 2427258945Srobertoguard_thread(void *arg) 2428258945Sroberto{ 2429258945Sroberto struct hast_resource *res = arg; 2430258945Sroberto unsigned int ii, ncomps; 2431258945Sroberto struct timespec timeout; 2432258945Sroberto time_t lastcheck, now; 2433258945Sroberto sigset_t mask; 2434258945Sroberto int signo; 2435258945Sroberto 2436258945Sroberto ncomps = HAST_NCOMPONENTS; 2437258945Sroberto lastcheck = time(NULL); 2438258945Sroberto 2439258945Sroberto PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2440258945Sroberto PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2441258945Sroberto PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2442258945Sroberto 2443258945Sroberto timeout.tv_sec = HAST_KEEPALIVE; 2444258945Sroberto timeout.tv_nsec = 0; 2445258945Sroberto signo = -1; 2446258945Sroberto 2447258945Sroberto for (;;) { 2448258945Sroberto switch (signo) { 2449258945Sroberto case SIGINT: 2450258945Sroberto case SIGTERM: 2451258945Sroberto sigexit_received = true; 2452258945Sroberto primary_exitx(EX_OK, 2453258945Sroberto "Termination signal received, exiting."); 2454258945Sroberto break; 2455258945Sroberto default: 2456258945Sroberto break; 2457258945Sroberto } 2458258945Sroberto 2459258945Sroberto /* 2460280849Scy * Don't check connections until we fully started, 2461280849Scy * as we may still be looping, waiting for remote node 2462280849Scy * to switch from primary to secondary. 2463280849Scy */ 2464280849Scy if (fullystarted) { 2465280849Scy pjdlog_debug(2, "remote_guard: Checking connections."); 2466280849Scy now = time(NULL); 2467280849Scy if (lastcheck + HAST_KEEPALIVE <= now) { 2468280849Scy for (ii = 0; ii < ncomps; ii++) 2469280849Scy guard_one(res, ii); 2470280849Scy lastcheck = now; 2471280849Scy } 2472280849Scy } 2473280849Scy signo = sigtimedwait(&mask, NULL, &timeout); 2474280849Scy } 2475280849Scy /* NOTREACHED */ 2476280849Scy return (NULL); 2477280849Scy} 2478258945Sroberto