/* primary.c — FreeBSD SVN revision 219882 */
1262569Simp/*- 2262569Simp * Copyright (c) 2009 The FreeBSD Foundation 3262569Simp * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4262569Simp * All rights reserved. 5262569Simp * 6262569Simp * This software was developed by Pawel Jakub Dawidek under sponsorship from 7262569Simp * the FreeBSD Foundation. 8262569Simp * 9262569Simp * Redistribution and use in source and binary forms, with or without 10262569Simp * modification, are permitted provided that the following conditions 11262569Simp * are met: 12262569Simp * 1. Redistributions of source code must retain the above copyright 13262569Simp * notice, this list of conditions and the following disclaimer. 14262569Simp * 2. Redistributions in binary form must reproduce the above copyright 15270864Simp * notice, this list of conditions and the following disclaimer in the 16262569Simp * documentation and/or other materials provided with the distribution. 17262569Simp * 18262569Simp * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19262569Simp * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20262569Simp * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21262569Simp * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22262569Simp * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23262569Simp * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24262569Simp * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25262569Simp * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26262569Simp * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27262569Simp * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28262569Simp * SUCH DAMAGE. 
29262569Simp */ 30262569Simp 31262569Simp#include <sys/cdefs.h> 32262569Simp__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 219882 2011-03-22 20:27:26Z trociny $"); 33262569Simp 34262569Simp#include <sys/types.h> 35262569Simp#include <sys/time.h> 36262569Simp#include <sys/bio.h> 37262569Simp#include <sys/disk.h> 38262569Simp#include <sys/refcount.h> 39262569Simp#include <sys/stat.h> 40262569Simp 41262569Simp#include <geom/gate/g_gate.h> 42262569Simp 43262569Simp#include <err.h> 44262569Simp#include <errno.h> 45262569Simp#include <fcntl.h> 46262569Simp#include <libgeom.h> 47262569Simp#include <pthread.h> 48262569Simp#include <signal.h> 49262569Simp#include <stdint.h> 50262569Simp#include <stdio.h> 51262569Simp#include <string.h> 52270864Simp#include <sysexits.h> 53270864Simp#include <unistd.h> 54270864Simp 55270864Simp#include <activemap.h> 56270864Simp#include <nv.h> 57270864Simp#include <rangelock.h> 58270864Simp 59270864Simp#include "control.h" 60270864Simp#include "event.h" 61270864Simp#include "hast.h" 62270864Simp#include "hast_proto.h" 63270864Simp#include "hastd.h" 64270864Simp#include "hooks.h" 65270864Simp#include "metadata.h" 66270864Simp#include "proto.h" 67270864Simp#include "pjdlog.h" 68270864Simp#include "subr.h" 69270864Simp#include "synch.h" 70270864Simp 71270864Simp/* The is only one remote component for now. */ 72284090Sian#define ISREMOTE(no) ((no) == 1) 73284090Sian 74284090Sianstruct hio { 75284090Sian /* 76284090Sian * Number of components we are still waiting for. 77262569Simp * When this field goes to 0, we can send the request back to the 78262569Simp * kernel. Each component has to decrease this counter by one 79262569Simp * even on failure. 80262569Simp */ 81262569Simp unsigned int hio_countdown; 82262569Simp /* 83262569Simp * Each component has a place to store its own error. 84262569Simp * Once the request is handled by all components we can decide if the 85262569Simp * request overall is successful or not. 
86262569Simp */ 87262569Simp int *hio_errors; 88262569Simp /* 89262569Simp * Structure used to communicate with GEOM Gate class. 90262569Simp */ 91262569Simp struct g_gate_ctl_io hio_ggio; 92262569Simp TAILQ_ENTRY(hio) *hio_next; 93262569Simp}; 94262569Simp#define hio_free_next hio_next[0] 95262569Simp#define hio_done_next hio_next[0] 96262569Simp 97262569Simp/* 98262569Simp * Free list holds unused structures. When free list is empty, we have to wait 99262569Simp * until some in-progress requests are freed. 100262569Simp */ 101262569Simpstatic TAILQ_HEAD(, hio) hio_free_list; 102262569Simpstatic pthread_mutex_t hio_free_list_lock; 103270864Simpstatic pthread_cond_t hio_free_list_cond; 104262569Simp/* 105270864Simp * There is one send list for every component. One requests is placed on all 106270864Simp * send lists - each component gets the same request, but each component is 107270864Simp * responsible for managing his own send list. 108270864Simp */ 109270864Simpstatic TAILQ_HEAD(, hio) *hio_send_list; 110270864Simpstatic pthread_mutex_t *hio_send_list_lock; 111270864Simpstatic pthread_cond_t *hio_send_list_cond; 112270864Simp/* 113270864Simp * There is one recv list for every component, although local components don't 114270864Simp * use recv lists as local requests are done synchronously. 115270864Simp */ 116270864Simpstatic TAILQ_HEAD(, hio) *hio_recv_list; 117270864Simpstatic pthread_mutex_t *hio_recv_list_lock; 118270864Simpstatic pthread_cond_t *hio_recv_list_cond; 119270864Simp/* 120270864Simp * Request is placed on done list by the slowest component (the one that 121270864Simp * decreased hio_countdown from 1 to 0). 122270864Simp */ 123270864Simpstatic TAILQ_HEAD(, hio) hio_done_list; 124270864Simpstatic pthread_mutex_t hio_done_list_lock; 125270864Simpstatic pthread_cond_t hio_done_list_cond; 126270864Simp/* 127270864Simp * Structure below are for interaction with sync thread. 
128270864Simp */ 129270864Simpstatic bool sync_inprogress; 130270864Simpstatic pthread_mutex_t sync_lock; 131270864Simpstatic pthread_cond_t sync_cond; 132270864Simp/* 133270864Simp * The lock below allows to synchornize access to remote connections. 134270864Simp */ 135270864Simpstatic pthread_rwlock_t *hio_remote_lock; 136270864Simp 137270864Simp/* 138270864Simp * Lock to synchronize metadata updates. Also synchronize access to 139270864Simp * hr_primary_localcnt and hr_primary_remotecnt fields. 140270864Simp */ 141270864Simpstatic pthread_mutex_t metadata_lock; 142270864Simp 143270864Simp/* 144270864Simp * Maximum number of outstanding I/O requests. 145270864Simp */ 146270864Simp#define HAST_HIO_MAX 256 147270864Simp/* 148270864Simp * Number of components. At this point there are only two components: local 149270864Simp * and remote, but in the future it might be possible to use multiple local 150270864Simp * and remote components. 151270864Simp */ 152270864Simp#define HAST_NCOMPONENTS 2 153270864Simp 154270864Simp#define ISCONNECTED(res, no) \ 155270864Simp ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156270864Simp 157270864Simp#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158270864Simp bool _wakeup; \ 159270864Simp \ 160270864Simp mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161270864Simp _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162270864Simp TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163270864Simp hio_next[(ncomp)]); \ 164270864Simp mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165270864Simp if (_wakeup) \ 166270864Simp cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167270864Simp} while (0) 168270864Simp#define QUEUE_INSERT2(hio, name) do { \ 169270864Simp bool _wakeup; \ 170270864Simp \ 171270864Simp mtx_lock(&hio_##name##_list_lock); \ 172270864Simp _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173270864Simp TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174270864Simp 
mtx_unlock(&hio_##name##_list_lock); \ 175270864Simp if (_wakeup) \ 176270864Simp cv_signal(&hio_##name##_list_cond); \ 177270864Simp} while (0) 178270864Simp#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 179270864Simp bool _last; \ 180270864Simp \ 181270864Simp mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 182270864Simp _last = false; \ 183270864Simp while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 184270864Simp cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 185270864Simp &hio_##name##_list_lock[(ncomp)], (timeout)); \ 186270864Simp if ((timeout) != 0) \ 187270864Simp _last = true; \ 188270864Simp } \ 189270864Simp if (hio != NULL) { \ 190270864Simp TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 191270864Simp hio_next[(ncomp)]); \ 192270864Simp } \ 193270864Simp mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 194270864Simp} while (0) 195270864Simp#define QUEUE_TAKE2(hio, name) do { \ 196270864Simp mtx_lock(&hio_##name##_list_lock); \ 197270864Simp while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 198270864Simp cv_wait(&hio_##name##_list_cond, \ 199270864Simp &hio_##name##_list_lock); \ 200270864Simp } \ 201270864Simp TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 202270864Simp mtx_unlock(&hio_##name##_list_lock); \ 203270864Simp} while (0) 204270864Simp 205270864Simp#define SYNCREQ(hio) do { \ 206270864Simp (hio)->hio_ggio.gctl_unit = -1; \ 207270864Simp (hio)->hio_ggio.gctl_seq = 1; \ 208270864Simp} while (0) 209270864Simp#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 210270864Simp#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211270864Simp#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212270864Simp 213270864Simpstatic struct hast_resource *gres; 214270864Simp 215270864Simpstatic pthread_mutex_t range_lock; 216270864Simpstatic struct rangelocks *range_regular; 217270864Simpstatic bool range_regular_wait; 218270864Simpstatic pthread_cond_t 
range_regular_cond; 219270864Simpstatic struct rangelocks *range_sync; 220270864Simpstatic bool range_sync_wait; 221270864Simpstatic pthread_cond_t range_sync_cond; 222270864Simp 223270864Simpstatic void *ggate_recv_thread(void *arg); 224270864Simpstatic void *local_send_thread(void *arg); 225270864Simpstatic void *remote_send_thread(void *arg); 226270864Simpstatic void *remote_recv_thread(void *arg); 227270864Simpstatic void *ggate_send_thread(void *arg); 228270864Simpstatic void *sync_thread(void *arg); 229270864Simpstatic void *guard_thread(void *arg); 230270864Simp 231270864Simpstatic void 232270864Simpcleanup(struct hast_resource *res) 233270864Simp{ 234270864Simp int rerrno; 235270864Simp 236270864Simp /* Remember errno. */ 237270864Simp rerrno = errno; 238270864Simp 239270864Simp /* Destroy ggate provider if we created one. */ 240270864Simp if (res->hr_ggateunit >= 0) { 241270864Simp struct g_gate_ctl_destroy ggiod; 242270864Simp 243270864Simp bzero(&ggiod, sizeof(ggiod)); 244270864Simp ggiod.gctl_version = G_GATE_VERSION; 245270864Simp ggiod.gctl_unit = res->hr_ggateunit; 246270864Simp ggiod.gctl_force = 1; 247270864Simp if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 248270864Simp pjdlog_errno(LOG_WARNING, 249270864Simp "Unable to destroy hast/%s device", 250270864Simp res->hr_provname); 251270864Simp } 252270864Simp res->hr_ggateunit = -1; 253270864Simp } 254270864Simp 255270864Simp /* Restore errno. */ 256270864Simp errno = rerrno; 257270864Simp} 258270864Simp 259270864Simpstatic __dead2 void 260270864Simpprimary_exit(int exitcode, const char *fmt, ...) 261270864Simp{ 262270864Simp va_list ap; 263270864Simp 264270864Simp PJDLOG_ASSERT(exitcode != EX_OK); 265270864Simp va_start(ap, fmt); 266270864Simp pjdlogv_errno(LOG_ERR, fmt, ap); 267270864Simp va_end(ap); 268270864Simp cleanup(gres); 269270864Simp exit(exitcode); 270270864Simp} 271270864Simp 272270864Simpstatic __dead2 void 273270864Simpprimary_exitx(int exitcode, const char *fmt, ...) 
274270864Simp{ 275270864Simp va_list ap; 276270864Simp 277270864Simp va_start(ap, fmt); 278270864Simp pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 279270864Simp va_end(ap); 280270864Simp cleanup(gres); 281270864Simp exit(exitcode); 282270864Simp} 283270864Simp 284270864Simpstatic int 285270864Simphast_activemap_flush(struct hast_resource *res) 286270864Simp{ 287270864Simp const unsigned char *buf; 288270864Simp size_t size; 289270864Simp 290270864Simp buf = activemap_bitmap(res->hr_amp, &size); 291270864Simp PJDLOG_ASSERT(buf != NULL); 292270864Simp PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 293270864Simp if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 294270864Simp (ssize_t)size) { 295270864Simp KEEP_ERRNO(pjdlog_errno(LOG_ERR, 296270864Simp "Unable to flush activemap to disk")); 297270864Simp return (-1); 298270864Simp } 299270864Simp return (0); 300270864Simp} 301270864Simp 302270864Simpstatic bool 303270864Simpreal_remote(const struct hast_resource *res) 304270864Simp{ 305270864Simp 306270864Simp return (strcmp(res->hr_remoteaddr, "none") != 0); 307270864Simp} 308270864Simp 309270864Simpstatic void 310270864Simpinit_environment(struct hast_resource *res __unused) 311270864Simp{ 312270864Simp struct hio *hio; 313270864Simp unsigned int ii, ncomps; 314270864Simp 315270864Simp /* 316270864Simp * In the future it might be per-resource value. 317270864Simp */ 318270864Simp ncomps = HAST_NCOMPONENTS; 319270864Simp 320270864Simp /* 321270864Simp * Allocate memory needed by lists. 
322270864Simp */ 323270864Simp hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 324270864Simp if (hio_send_list == NULL) { 325270864Simp primary_exitx(EX_TEMPFAIL, 326270864Simp "Unable to allocate %zu bytes of memory for send lists.", 327270864Simp sizeof(hio_send_list[0]) * ncomps); 328270864Simp } 329270864Simp hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 330270864Simp if (hio_send_list_lock == NULL) { 331270864Simp primary_exitx(EX_TEMPFAIL, 332270864Simp "Unable to allocate %zu bytes of memory for send list locks.", 333270864Simp sizeof(hio_send_list_lock[0]) * ncomps); 334270864Simp } 335270864Simp hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 336270864Simp if (hio_send_list_cond == NULL) { 337270864Simp primary_exitx(EX_TEMPFAIL, 338270864Simp "Unable to allocate %zu bytes of memory for send list condition variables.", 339270864Simp sizeof(hio_send_list_cond[0]) * ncomps); 340270864Simp } 341270864Simp hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 342270864Simp if (hio_recv_list == NULL) { 343270864Simp primary_exitx(EX_TEMPFAIL, 344270864Simp "Unable to allocate %zu bytes of memory for recv lists.", 345270864Simp sizeof(hio_recv_list[0]) * ncomps); 346270864Simp } 347270864Simp hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 348270864Simp if (hio_recv_list_lock == NULL) { 349270864Simp primary_exitx(EX_TEMPFAIL, 350270864Simp "Unable to allocate %zu bytes of memory for recv list locks.", 351270864Simp sizeof(hio_recv_list_lock[0]) * ncomps); 352270864Simp } 353270864Simp hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 354270864Simp if (hio_recv_list_cond == NULL) { 355270864Simp primary_exitx(EX_TEMPFAIL, 356270864Simp "Unable to allocate %zu bytes of memory for recv list condition variables.", 357262569Simp sizeof(hio_recv_list_cond[0]) * ncomps); 358262569Simp } 359262569Simp hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 
360262569Simp if (hio_remote_lock == NULL) { 361262569Simp primary_exitx(EX_TEMPFAIL, 362262569Simp "Unable to allocate %zu bytes of memory for remote connections locks.", 363262569Simp sizeof(hio_remote_lock[0]) * ncomps); 364262569Simp } 365262569Simp 366262569Simp /* 367262569Simp * Initialize lists, their locks and theirs condition variables. 368262569Simp */ 369262569Simp TAILQ_INIT(&hio_free_list); 370262569Simp mtx_init(&hio_free_list_lock); 371262569Simp cv_init(&hio_free_list_cond); 372262569Simp for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 373270864Simp TAILQ_INIT(&hio_send_list[ii]); 374262569Simp mtx_init(&hio_send_list_lock[ii]); 375262569Simp cv_init(&hio_send_list_cond[ii]); 376262569Simp TAILQ_INIT(&hio_recv_list[ii]); 377262569Simp mtx_init(&hio_recv_list_lock[ii]); 378262569Simp cv_init(&hio_recv_list_cond[ii]); 379262569Simp rw_init(&hio_remote_lock[ii]); 380262569Simp } 381262569Simp TAILQ_INIT(&hio_done_list); 382270864Simp mtx_init(&hio_done_list_lock); 383270864Simp cv_init(&hio_done_list_cond); 384262569Simp mtx_init(&metadata_lock); 385262569Simp 386262569Simp /* 387262569Simp * Allocate requests pool and initialize requests. 
388262569Simp */ 389262569Simp for (ii = 0; ii < HAST_HIO_MAX; ii++) { 390262569Simp hio = malloc(sizeof(*hio)); 391262569Simp if (hio == NULL) { 392270864Simp primary_exitx(EX_TEMPFAIL, 393270864Simp "Unable to allocate %zu bytes of memory for hio request.", 394262569Simp sizeof(*hio)); 395262569Simp } 396262569Simp hio->hio_countdown = 0; 397262569Simp hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 398262569Simp if (hio->hio_errors == NULL) { 399262569Simp primary_exitx(EX_TEMPFAIL, 400262569Simp "Unable allocate %zu bytes of memory for hio errors.", 401262569Simp sizeof(hio->hio_errors[0]) * ncomps); 402262569Simp } 403262569Simp hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 404262569Simp if (hio->hio_next == NULL) { 405262569Simp primary_exitx(EX_TEMPFAIL, 406262569Simp "Unable allocate %zu bytes of memory for hio_next field.", 407262569Simp sizeof(hio->hio_next[0]) * ncomps); 408262569Simp } 409262569Simp hio->hio_ggio.gctl_version = G_GATE_VERSION; 410262569Simp hio->hio_ggio.gctl_data = malloc(MAXPHYS); 411262569Simp if (hio->hio_ggio.gctl_data == NULL) { 412262569Simp primary_exitx(EX_TEMPFAIL, 413262569Simp "Unable to allocate %zu bytes of memory for gctl_data.", 414262569Simp MAXPHYS); 415262569Simp } 416262569Simp hio->hio_ggio.gctl_length = MAXPHYS; 417262569Simp hio->hio_ggio.gctl_error = 0; 418262569Simp TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 419262569Simp } 420262569Simp} 421262569Simp 422262569Simpstatic bool 423262569Simpinit_resuid(struct hast_resource *res) 424262569Simp{ 425262569Simp 426262569Simp mtx_lock(&metadata_lock); 427262569Simp if (res->hr_resuid != 0) { 428262569Simp mtx_unlock(&metadata_lock); 429262569Simp return (false); 430262569Simp } else { 431262569Simp /* Initialize unique resource identifier. 
*/ 432262569Simp arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 433262569Simp mtx_unlock(&metadata_lock); 434262569Simp if (metadata_write(res) < 0) 435262569Simp exit(EX_NOINPUT); 436262569Simp return (true); 437262569Simp } 438262569Simp} 439262569Simp 440262569Simpstatic void 441262569Simpinit_local(struct hast_resource *res) 442262569Simp{ 443262569Simp unsigned char *buf; 444262569Simp size_t mapsize; 445262569Simp 446262569Simp if (metadata_read(res, true) < 0) 447262569Simp exit(EX_NOINPUT); 448262569Simp mtx_init(&res->hr_amp_lock); 449262569Simp if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 450262569Simp res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 451262569Simp primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 452262569Simp } 453262569Simp mtx_init(&range_lock); 454262569Simp cv_init(&range_regular_cond); 455262569Simp if (rangelock_init(&range_regular) < 0) 456262569Simp primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 457262569Simp cv_init(&range_sync_cond); 458262569Simp if (rangelock_init(&range_sync) < 0) 459262569Simp primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 460262569Simp mapsize = activemap_ondisk_size(res->hr_amp); 461262569Simp buf = calloc(1, mapsize); 462262569Simp if (buf == NULL) { 463262569Simp primary_exitx(EX_TEMPFAIL, 464262569Simp "Unable to allocate buffer for activemap."); 465262569Simp } 466262569Simp if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 467262569Simp (ssize_t)mapsize) { 468262569Simp primary_exit(EX_NOINPUT, "Unable to read activemap"); 469262569Simp } 470262569Simp activemap_copyin(res->hr_amp, buf, mapsize); 471262569Simp free(buf); 472262569Simp if (res->hr_resuid != 0) 473262569Simp return; 474262569Simp /* 475262569Simp * We're using provider for the first time. Initialize local and remote 476262569Simp * counters. We don't initialize resuid here, as we want to do it just 477262569Simp * in time. 
The reason for this is that we want to inform secondary 478262569Simp * that there were no writes yet, so there is no need to synchronize 479262569Simp * anything. 480262569Simp */ 481262569Simp res->hr_primary_localcnt = 0; 482262569Simp res->hr_primary_remotecnt = 0; 483262569Simp if (metadata_write(res) < 0) 484262569Simp exit(EX_NOINPUT); 485262569Simp} 486262569Simp 487262569Simpstatic int 488262569Simpprimary_connect(struct hast_resource *res, struct proto_conn **connp) 489262569Simp{ 490262569Simp struct proto_conn *conn; 491262569Simp int16_t val; 492262569Simp 493262569Simp val = 1; 494262569Simp if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 495262569Simp primary_exit(EX_TEMPFAIL, 496262569Simp "Unable to send connection request to parent"); 497262569Simp } 498262569Simp if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 499262569Simp primary_exit(EX_TEMPFAIL, 500262569Simp "Unable to receive reply to connection request from parent"); 501262569Simp } 502262569Simp if (val != 0) { 503262569Simp errno = val; 504262569Simp pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 505262569Simp res->hr_remoteaddr); 506262569Simp return (-1); 507262569Simp } 508262569Simp if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 509262569Simp primary_exit(EX_TEMPFAIL, 510262569Simp "Unable to receive connection from parent"); 511262569Simp } 512262569Simp if (proto_connect_wait(conn, HAST_TIMEOUT) < 0) { 513262569Simp pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 514262569Simp res->hr_remoteaddr); 515262569Simp proto_close(conn); 516262569Simp return (-1); 517262569Simp } 518262569Simp /* Error in setting timeout is not critical, but why should it fail? 
*/ 519262569Simp if (proto_timeout(conn, res->hr_timeout) < 0) 520262569Simp pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 521262569Simp 522262569Simp *connp = conn; 523262569Simp 524262569Simp return (0); 525262569Simp} 526262569Simp 527262569Simpstatic bool 528262569Simpinit_remote(struct hast_resource *res, struct proto_conn **inp, 529262569Simp struct proto_conn **outp) 530262569Simp{ 531262569Simp struct proto_conn *in, *out; 532262569Simp struct nv *nvout, *nvin; 533262569Simp const unsigned char *token; 534262569Simp unsigned char *map; 535262569Simp const char *errmsg; 536262569Simp int32_t extentsize; 537262569Simp int64_t datasize; 538262569Simp uint32_t mapsize; 539262569Simp size_t size; 540262569Simp 541262569Simp PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 542262569Simp PJDLOG_ASSERT(real_remote(res)); 543262569Simp 544262569Simp in = out = NULL; 545262569Simp errmsg = NULL; 546262569Simp 547262569Simp if (primary_connect(res, &out) == -1) 548262569Simp return (false); 549262569Simp 550262569Simp /* 551262569Simp * First handshake step. 552262569Simp * Setup outgoing connection with remote node. 
553262569Simp */ 554262569Simp nvout = nv_alloc(); 555262569Simp nv_add_string(nvout, res->hr_name, "resource"); 556262569Simp if (nv_error(nvout) != 0) { 557262569Simp pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 558262569Simp "Unable to allocate header for connection with %s", 559262569Simp res->hr_remoteaddr); 560262569Simp nv_free(nvout); 561262569Simp goto close; 562262569Simp } 563262569Simp if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 564262569Simp pjdlog_errno(LOG_WARNING, 565262569Simp "Unable to send handshake header to %s", 566262569Simp res->hr_remoteaddr); 567262569Simp nv_free(nvout); 568262569Simp goto close; 569262569Simp } 570262569Simp nv_free(nvout); 571262569Simp if (hast_proto_recv_hdr(out, &nvin) < 0) { 572262569Simp pjdlog_errno(LOG_WARNING, 573262569Simp "Unable to receive handshake header from %s", 574262569Simp res->hr_remoteaddr); 575262569Simp goto close; 576262569Simp } 577262569Simp errmsg = nv_get_string(nvin, "errmsg"); 578262569Simp if (errmsg != NULL) { 579262569Simp pjdlog_warning("%s", errmsg); 580262569Simp nv_free(nvin); 581262569Simp goto close; 582262569Simp } 583262569Simp token = nv_get_uint8_array(nvin, &size, "token"); 584262569Simp if (token == NULL) { 585262569Simp pjdlog_warning("Handshake header from %s has no 'token' field.", 586262569Simp res->hr_remoteaddr); 587262569Simp nv_free(nvin); 588262569Simp goto close; 589262569Simp } 590262569Simp if (size != sizeof(res->hr_token)) { 591262569Simp pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 592262569Simp res->hr_remoteaddr, size, sizeof(res->hr_token)); 593262569Simp nv_free(nvin); 594262569Simp goto close; 595262569Simp } 596262569Simp bcopy(token, res->hr_token, sizeof(res->hr_token)); 597262569Simp nv_free(nvin); 598262569Simp 599262569Simp /* 600262569Simp * Second handshake step. 601262569Simp * Setup incoming connection with remote node. 
602262569Simp */ 603262569Simp if (primary_connect(res, &in) == -1) 604262569Simp goto close; 605262569Simp 606262569Simp nvout = nv_alloc(); 607262569Simp nv_add_string(nvout, res->hr_name, "resource"); 608262569Simp nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 609262569Simp "token"); 610262569Simp if (res->hr_resuid == 0) { 611262569Simp /* 612262569Simp * The resuid field was not yet initialized. 613262569Simp * Because we do synchronization inside init_resuid(), it is 614262569Simp * possible that someone already initialized it, the function 615262569Simp * will return false then, but if we successfully initialized 616262569Simp * it, we will get true. True means that there were no writes 617262569Simp * to this resource yet and we want to inform secondary that 618262569Simp * synchronization is not needed by sending "virgin" argument. 619262569Simp */ 620262569Simp if (init_resuid(res)) 621262569Simp nv_add_int8(nvout, 1, "virgin"); 622262569Simp } 623262569Simp nv_add_uint64(nvout, res->hr_resuid, "resuid"); 624262569Simp nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 625262569Simp nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 626262569Simp if (nv_error(nvout) != 0) { 627262569Simp pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 628262569Simp "Unable to allocate header for connection with %s", 629262569Simp res->hr_remoteaddr); 630262569Simp nv_free(nvout); 631262569Simp goto close; 632262569Simp } 633262569Simp if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 634262569Simp pjdlog_errno(LOG_WARNING, 635262569Simp "Unable to send handshake header to %s", 636262569Simp res->hr_remoteaddr); 637262569Simp nv_free(nvout); 638262569Simp goto close; 639262569Simp } 640262569Simp nv_free(nvout); 641262569Simp if (hast_proto_recv_hdr(out, &nvin) < 0) { 642262569Simp pjdlog_errno(LOG_WARNING, 643262569Simp "Unable to receive handshake header from %s", 644262569Simp res->hr_remoteaddr); 645262569Simp goto close; 
646262569Simp } 647262569Simp errmsg = nv_get_string(nvin, "errmsg"); 648262569Simp if (errmsg != NULL) { 649262569Simp pjdlog_warning("%s", errmsg); 650262569Simp nv_free(nvin); 651262569Simp goto close; 652262569Simp } 653262569Simp datasize = nv_get_int64(nvin, "datasize"); 654262569Simp if (datasize != res->hr_datasize) { 655262569Simp pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 656262569Simp (intmax_t)res->hr_datasize, (intmax_t)datasize); 657262569Simp nv_free(nvin); 658262569Simp goto close; 659262569Simp } 660262569Simp extentsize = nv_get_int32(nvin, "extentsize"); 661262569Simp if (extentsize != res->hr_extentsize) { 662262569Simp pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 663262569Simp (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 664262569Simp nv_free(nvin); 665262569Simp goto close; 666262569Simp } 667262569Simp res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 668262569Simp res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 669262569Simp res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 670262569Simp map = NULL; 671262569Simp mapsize = nv_get_uint32(nvin, "mapsize"); 672262569Simp if (mapsize > 0) { 673262569Simp map = malloc(mapsize); 674262569Simp if (map == NULL) { 675262569Simp pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 676262569Simp (uintmax_t)mapsize); 677262569Simp nv_free(nvin); 678262569Simp goto close; 679262569Simp } 680262569Simp /* 681262569Simp * Remote node have some dirty extents on its own, lets 682262569Simp * download its activemap. 683262569Simp */ 684262569Simp if (hast_proto_recv_data(res, out, nvin, map, 685262569Simp mapsize) < 0) { 686262569Simp pjdlog_errno(LOG_ERR, 687262569Simp "Unable to receive remote activemap"); 688262569Simp nv_free(nvin); 689262569Simp free(map); 690262569Simp goto close; 691262569Simp } 692262569Simp /* 693262569Simp * Merge local and remote bitmaps. 
	 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	/* Hand the connections to the caller or stash them in the resource. */
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (true);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/*
 * Mark synchronization as in progress and wake up the sync thread,
 * which sleeps on sync_cond.
 */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/*
 * Request interruption of an in-progress synchronization, if any.
 */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create the hast/<name> ggate provider through which the kernel will
 * send us I/O requests, or take over a leftover provider from a dead
 * predecessor.  Exits the process on unrecoverable failure.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: set up parent<->worker IPC channels,
 * fork the worker process and, inside the worker, start all I/O threads.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	/* This is the worker child from here on. */
	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (primary)", res->hr_name);

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(true) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res) && init_remote(res, NULL, NULL))
		sync_start();
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	/* The sync thread runs in the worker's main thread. */
	(void)sync_thread(res);
}

/*
 * Log a message at the given level, appending a human-readable
 * description of the ggate I/O request (command, offset, length) to the
 * caller-supplied printf-style prefix.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	/*
	 * NOTE(review): if vsnprintf() fails it returns a negative value;
	 * the (size_t) cast then makes the condition false so nothing is
	 * appended, but msg contents are indeterminate when logged below.
	 * Presumably the format strings used here can never fail - confirm.
	 */
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).",
			    (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both connections (in and out) to the remote node for the given
 * remote component, stop synchronization and notify via an event.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *     only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *     only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully.
			 */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Start pessimistic; each component overwrites its slot. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads are serviced by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			if (res->hr_resuid == 0) {
				/*
				 * This is first write, initialize localcnt and
				 * resuid.
				 */
				res->hr_primary_localcnt = 1;
				(void)init_resuid(res);
			}
			/*
			 * Wait until the range is not being synchronized,
			 * then take the regular range lock for it.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before the write starts. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* Writes/deletes/flushes go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now.
	 */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 */
				if (ret < 0) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				/* Short write - report it as an I/O error. */
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}
		/* Last component to finish hands the request back. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Send a HIO_KEEPALIVE packet to the secondary node to verify that the
 * connection is still alive; close the connection on send failure.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
		/*
		 * Drop the rlock before remote_close(), which takes the
		 * wlock on the same lock.
		 */
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now.
	 */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/* Queue take timed out - consider a keepalive probe. */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate command into a HAST protocol command. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write never reached the remote node - make sure
			 * the extent stays marked for future synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Match the reply with a pending request by sequence number. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* Reads carry a data payload after the header. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Last component to finish hands the request back. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
1584 */ 1585static void * 1586ggate_send_thread(void *arg) 1587{ 1588 struct hast_resource *res = arg; 1589 struct g_gate_ctl_io *ggio; 1590 struct hio *hio; 1591 unsigned int ii, ncomp, ncomps; 1592 1593 ncomps = HAST_NCOMPONENTS; 1594 1595 for (;;) { 1596 pjdlog_debug(2, "ggate_send: Taking request."); 1597 QUEUE_TAKE2(hio, done); 1598 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1599 ggio = &hio->hio_ggio; 1600 for (ii = 0; ii < ncomps; ii++) { 1601 if (hio->hio_errors[ii] == 0) { 1602 /* 1603 * One successful request is enough to declare 1604 * success. 1605 */ 1606 ggio->gctl_error = 0; 1607 break; 1608 } 1609 } 1610 if (ii == ncomps) { 1611 /* 1612 * None of the requests were successful. 1613 * Use the error from local component except the 1614 * case when we did only remote request. 1615 */ 1616 if (ggio->gctl_cmd == BIO_READ && 1617 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1618 ggio->gctl_error = hio->hio_errors[1]; 1619 else 1620 ggio->gctl_error = hio->hio_errors[0]; 1621 } 1622 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1623 mtx_lock(&res->hr_amp_lock); 1624 activemap_write_complete(res->hr_amp, 1625 ggio->gctl_offset, ggio->gctl_length); 1626 mtx_unlock(&res->hr_amp_lock); 1627 } 1628 if (ggio->gctl_cmd == BIO_WRITE) { 1629 /* 1630 * Unlock range we locked. 1631 */ 1632 mtx_lock(&range_lock); 1633 rangelock_del(range_regular, ggio->gctl_offset, 1634 ggio->gctl_length); 1635 if (range_sync_wait) 1636 cv_signal(&range_sync_cond); 1637 mtx_unlock(&range_lock); 1638 /* 1639 * Bump local count if this is first write after 1640 * connection failure with remote node. 
1641 */ 1642 ncomp = 1; 1643 rw_rlock(&hio_remote_lock[ncomp]); 1644 if (!ISCONNECTED(res, ncomp)) { 1645 mtx_lock(&metadata_lock); 1646 if (res->hr_primary_localcnt == 1647 res->hr_secondary_remotecnt) { 1648 res->hr_primary_localcnt++; 1649 pjdlog_debug(1, 1650 "Increasing localcnt to %ju.", 1651 (uintmax_t)res->hr_primary_localcnt); 1652 (void)metadata_write(res); 1653 } 1654 mtx_unlock(&metadata_lock); 1655 } 1656 rw_unlock(&hio_remote_lock[ncomp]); 1657 } 1658 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1659 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1660 pjdlog_debug(2, 1661 "ggate_send: (%p) Moving request to the free queue.", hio); 1662 QUEUE_INSERT2(hio, free); 1663 } 1664 /* NOTREACHED */ 1665 return (NULL); 1666} 1667 1668/* 1669 * Thread synchronize local and remote components. 1670 */ 1671static void * 1672sync_thread(void *arg __unused) 1673{ 1674 struct hast_resource *res = arg; 1675 struct hio *hio; 1676 struct g_gate_ctl_io *ggio; 1677 struct timeval tstart, tend, tdiff; 1678 unsigned int ii, ncomp, ncomps; 1679 off_t offset, length, synced; 1680 bool dorewind; 1681 int syncext; 1682 1683 ncomps = HAST_NCOMPONENTS; 1684 dorewind = true; 1685 synced = 0; 1686 offset = -1; 1687 1688 for (;;) { 1689 mtx_lock(&sync_lock); 1690 if (offset >= 0 && !sync_inprogress) { 1691 gettimeofday(&tend, NULL); 1692 timersub(&tend, &tstart, &tdiff); 1693 pjdlog_info("Synchronization interrupted after %#.0T. " 1694 "%NB synchronized so far.", &tdiff, 1695 (intmax_t)synced); 1696 event_send(res, EVENT_SYNCINTR); 1697 } 1698 while (!sync_inprogress) { 1699 dorewind = true; 1700 synced = 0; 1701 cv_wait(&sync_cond, &sync_lock); 1702 } 1703 mtx_unlock(&sync_lock); 1704 /* 1705 * Obtain offset at which we should synchronize. 1706 * Rewind synchronization if needed. 
1707 */ 1708 mtx_lock(&res->hr_amp_lock); 1709 if (dorewind) 1710 activemap_sync_rewind(res->hr_amp); 1711 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1712 if (syncext != -1) { 1713 /* 1714 * We synchronized entire syncext extent, we can mark 1715 * it as clean now. 1716 */ 1717 if (activemap_extent_complete(res->hr_amp, syncext)) 1718 (void)hast_activemap_flush(res); 1719 } 1720 mtx_unlock(&res->hr_amp_lock); 1721 if (dorewind) { 1722 dorewind = false; 1723 if (offset < 0) 1724 pjdlog_info("Nodes are in sync."); 1725 else { 1726 pjdlog_info("Synchronization started. %NB to go.", 1727 (intmax_t)(res->hr_extentsize * 1728 activemap_ndirty(res->hr_amp))); 1729 event_send(res, EVENT_SYNCSTART); 1730 gettimeofday(&tstart, NULL); 1731 } 1732 } 1733 if (offset < 0) { 1734 sync_stop(); 1735 pjdlog_debug(1, "Nothing to synchronize."); 1736 /* 1737 * Synchronization complete, make both localcnt and 1738 * remotecnt equal. 1739 */ 1740 ncomp = 1; 1741 rw_rlock(&hio_remote_lock[ncomp]); 1742 if (ISCONNECTED(res, ncomp)) { 1743 if (synced > 0) { 1744 int64_t bps; 1745 1746 gettimeofday(&tend, NULL); 1747 timersub(&tend, &tstart, &tdiff); 1748 bps = (int64_t)((double)synced / 1749 ((double)tdiff.tv_sec + 1750 (double)tdiff.tv_usec / 1000000)); 1751 pjdlog_info("Synchronization complete. 
" 1752 "%NB synchronized in %#.0lT (%NB/sec).", 1753 (intmax_t)synced, &tdiff, 1754 (intmax_t)bps); 1755 event_send(res, EVENT_SYNCDONE); 1756 } 1757 mtx_lock(&metadata_lock); 1758 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1759 res->hr_primary_localcnt = 1760 res->hr_secondary_remotecnt; 1761 res->hr_primary_remotecnt = 1762 res->hr_secondary_localcnt; 1763 pjdlog_debug(1, 1764 "Setting localcnt to %ju and remotecnt to %ju.", 1765 (uintmax_t)res->hr_primary_localcnt, 1766 (uintmax_t)res->hr_primary_remotecnt); 1767 (void)metadata_write(res); 1768 mtx_unlock(&metadata_lock); 1769 } 1770 rw_unlock(&hio_remote_lock[ncomp]); 1771 continue; 1772 } 1773 pjdlog_debug(2, "sync: Taking free request."); 1774 QUEUE_TAKE2(hio, free); 1775 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1776 /* 1777 * Lock the range we are going to synchronize. We don't want 1778 * race where someone writes between our read and write. 1779 */ 1780 for (;;) { 1781 mtx_lock(&range_lock); 1782 if (rangelock_islocked(range_regular, offset, length)) { 1783 pjdlog_debug(2, 1784 "sync: Range offset=%jd length=%jd locked.", 1785 (intmax_t)offset, (intmax_t)length); 1786 range_sync_wait = true; 1787 cv_wait(&range_sync_cond, &range_lock); 1788 range_sync_wait = false; 1789 mtx_unlock(&range_lock); 1790 continue; 1791 } 1792 if (rangelock_add(range_sync, offset, length) < 0) { 1793 mtx_unlock(&range_lock); 1794 pjdlog_debug(2, 1795 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1796 (intmax_t)offset, (intmax_t)length); 1797 sleep(1); 1798 continue; 1799 } 1800 mtx_unlock(&range_lock); 1801 break; 1802 } 1803 /* 1804 * First read the data from synchronization source. 
1805 */ 1806 SYNCREQ(hio); 1807 ggio = &hio->hio_ggio; 1808 ggio->gctl_cmd = BIO_READ; 1809 ggio->gctl_offset = offset; 1810 ggio->gctl_length = length; 1811 ggio->gctl_error = 0; 1812 for (ii = 0; ii < ncomps; ii++) 1813 hio->hio_errors[ii] = EINVAL; 1814 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1815 hio); 1816 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1817 hio); 1818 mtx_lock(&metadata_lock); 1819 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1820 /* 1821 * This range is up-to-date on local component, 1822 * so handle request locally. 1823 */ 1824 /* Local component is 0 for now. */ 1825 ncomp = 0; 1826 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1827 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1828 /* 1829 * This range is out-of-date on local component, 1830 * so send request to the remote node. 1831 */ 1832 /* Remote component is 1 for now. */ 1833 ncomp = 1; 1834 } 1835 mtx_unlock(&metadata_lock); 1836 refcount_init(&hio->hio_countdown, 1); 1837 QUEUE_INSERT1(hio, send, ncomp); 1838 1839 /* 1840 * Let's wait for READ to finish. 1841 */ 1842 mtx_lock(&sync_lock); 1843 while (!ISSYNCREQDONE(hio)) 1844 cv_wait(&sync_cond, &sync_lock); 1845 mtx_unlock(&sync_lock); 1846 1847 if (hio->hio_errors[ncomp] != 0) { 1848 pjdlog_error("Unable to read synchronization data: %s.", 1849 strerror(hio->hio_errors[ncomp])); 1850 goto free_queue; 1851 } 1852 1853 /* 1854 * We read the data from synchronization source, now write it 1855 * to synchronization target. 
1856 */ 1857 SYNCREQ(hio); 1858 ggio->gctl_cmd = BIO_WRITE; 1859 for (ii = 0; ii < ncomps; ii++) 1860 hio->hio_errors[ii] = EINVAL; 1861 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1862 hio); 1863 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1864 hio); 1865 mtx_lock(&metadata_lock); 1866 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1867 /* 1868 * This range is up-to-date on local component, 1869 * so we update remote component. 1870 */ 1871 /* Remote component is 1 for now. */ 1872 ncomp = 1; 1873 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1874 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1875 /* 1876 * This range is out-of-date on local component, 1877 * so we update it. 1878 */ 1879 /* Local component is 0 for now. */ 1880 ncomp = 0; 1881 } 1882 mtx_unlock(&metadata_lock); 1883 1884 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1885 hio); 1886 refcount_init(&hio->hio_countdown, 1); 1887 QUEUE_INSERT1(hio, send, ncomp); 1888 1889 /* 1890 * Let's wait for WRITE to finish. 
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		/*
		 * Release the range we synchronized and wake up any regular
		 * I/O that was waiting for it.
		 */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Apply a reloaded configuration to a primary-role resource.
 *
 * Each value from the given nv list is compared against the current
 * global resource (gres); changed settings are recorded in the
 * `modified' bitmask.  A timeout-only change is applied to the live
 * connections in place, while address or replication-mode changes
 * force the remote components to be closed (the guard thread will
 * later reconnect them, see guard_one()).
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");

	ncomps = HAST_NCOMPONENTS;

#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			/*
			 * NOTE(review): hr_remotein/hr_remoteout are used
			 * here after hio_remote_lock has been dropped --
			 * confirm the connections cannot be torn down
			 * concurrently at this point.
			 */
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			/*
			 * Now that remote_close() has logged the old
			 * addresses, install the new remote address.
			 */
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef MODIFIED_REMOTEADDR
#undef MODIFIED_SOURCEADDR
#undef MODIFIED_REPLICATION
#undef MODIFIED_CHECKSUM
#undef MODIFIED_COMPRESSION
#undef MODIFIED_TIMEOUT
#undef MODIFIED_EXEC

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the state of a single component.  For a remote component
 * (ISREMOTE()) whose connection has been lost, attempt to reconnect;
 * local components are ignored.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	/* Disconnected: both directions must be down. */
	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out)) {
		/* Publish the fresh connections under the write lock. */
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		/* Connection is back; restart synchronization. */
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread that guards remote connections, reconnecting them when needed,
 * and handles termination signals (SIGINT/SIGTERM).
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	/* Wake up at least once every HAST_KEEPALIVE seconds. */
	timeout.tv_sec = HAST_KEEPALIVE;
	timeout.tv_nsec = 0;
	signo = -1;

	for (;;) {
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		pjdlog_debug(2, "remote_guard: Checking connections.");
		now = time(NULL);
		if (lastcheck + HAST_KEEPALIVE <= now) {
			for (ii = 0; ii < ncomps; ii++)
				guard_one(res, ii);
			lastcheck = now;
		}
		/* Block until a masked signal arrives or the timeout expires. */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}