/* primary.c — FreeBSD head, sbin/hastd, revision 222467 */
1139825Simp/*- 2107868Siedowse * Copyright (c) 2009 The FreeBSD Foundation 379561Siedowse * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 479561Siedowse * All rights reserved. 579561Siedowse * 679561Siedowse * This software was developed by Pawel Jakub Dawidek under sponsorship from 779561Siedowse * the FreeBSD Foundation. 879561Siedowse * 979561Siedowse * Redistribution and use in source and binary forms, with or without 1079561Siedowse * modification, are permitted provided that the following conditions 1179561Siedowse * are met: 1279561Siedowse * 1. Redistributions of source code must retain the above copyright 1379561Siedowse * notice, this list of conditions and the following disclaimer. 1479561Siedowse * 2. Redistributions in binary form must reproduce the above copyright 1579561Siedowse * notice, this list of conditions and the following disclaimer in the 1679561Siedowse * documentation and/or other materials provided with the distribution. 1779561Siedowse * 1879561Siedowse * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 1979561Siedowse * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 2079561Siedowse * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 2179561Siedowse * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 2279561Siedowse * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 2379561Siedowse * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2479561Siedowse * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25116192Sobrien * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2679561Siedowse * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2779561Siedowse * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2879561Siedowse * SUCH DAMAGE. 
2979561Siedowse */ 30116192Sobrien 31116192Sobrien#include <sys/cdefs.h> 32116192Sobrien__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 222467 2011-05-29 21:20:47Z trociny $"); 3379561Siedowse 3479561Siedowse#include <sys/types.h> 3579561Siedowse#include <sys/time.h> 3679561Siedowse#include <sys/bio.h> 3779561Siedowse#include <sys/disk.h> 3879561Siedowse#include <sys/refcount.h> 3979561Siedowse#include <sys/stat.h> 4084811Sjhb 4179561Siedowse#include <geom/gate/g_gate.h> 4279561Siedowse 4379561Siedowse#include <err.h> 4479561Siedowse#include <errno.h> 4579561Siedowse#include <fcntl.h> 4679561Siedowse#include <libgeom.h> 4779561Siedowse#include <pthread.h> 4879561Siedowse#include <signal.h> 49183080Sjhb#include <stdint.h> 5079561Siedowse#include <stdio.h> 51183080Sjhb#include <string.h> 5292768Sjeff#include <sysexits.h> 5379561Siedowse#include <unistd.h> 5479561Siedowse 5579561Siedowse#include <activemap.h> 5679561Siedowse#include <nv.h> 5779561Siedowse#include <rangelock.h> 5879561Siedowse 5979561Siedowse#include "control.h" 6079561Siedowse#include "event.h" 6179561Siedowse#include "hast.h" 6279561Siedowse#include "hast_proto.h" 6392807Sdwmalone#include "hastd.h" 6479561Siedowse#include "hooks.h" 6592098Siedowse#include "metadata.h" 6679561Siedowse#include "proto.h" 67151897Srwatson#include "pjdlog.h" 6879561Siedowse#include "subr.h" 69141631Sphk#include "synch.h" 7079561Siedowse 7179561Siedowse/* The is only one remote component for now. */ 7279561Siedowse#define ISREMOTE(no) ((no) == 1) 7379561Siedowse 7479561Siedowsestruct hio { 7579561Siedowse /* 7679561Siedowse * Number of components we are still waiting for. 7779561Siedowse * When this field goes to 0, we can send the request back to the 7879561Siedowse * kernel. Each component has to decrease this counter by one 7979561Siedowse * even on failure. 8079561Siedowse */ 8185512Siedowse unsigned int hio_countdown; 8279561Siedowse /* 8379561Siedowse * Each component has a place to store its own error. 
8479561Siedowse * Once the request is handled by all components we can decide if the 8579561Siedowse * request overall is successful or not. 8679561Siedowse */ 8779561Siedowse int *hio_errors; 8879561Siedowse /* 8979561Siedowse * Structure used to communicate with GEOM Gate class. 9079561Siedowse */ 9179561Siedowse struct g_gate_ctl_io hio_ggio; 9279561Siedowse TAILQ_ENTRY(hio) *hio_next; 93178110Sjeff}; 9479561Siedowse#define hio_free_next hio_next[0] 9592768Sjeff#define hio_done_next hio_next[0] 9679561Siedowse 97125854Sdwmalone/* 98125854Sdwmalone * Free list holds unused structures. When free list is empty, we have to wait 99125854Sdwmalone * until some in-progress requests are freed. 100125854Sdwmalone */ 101178110Sjeffstatic TAILQ_HEAD(, hio) hio_free_list; 102183080Sjhbstatic pthread_mutex_t hio_free_list_lock; 103125854Sdwmalonestatic pthread_cond_t hio_free_list_cond; 10479561Siedowse/* 10579561Siedowse * There is one send list for every component. One requests is placed on all 10679561Siedowse * send lists - each component gets the same request, but each component is 10779561Siedowse * responsible for managing his own send list. 10879561Siedowse */ 10979561Siedowsestatic TAILQ_HEAD(, hio) *hio_send_list; 11079561Siedowsestatic pthread_mutex_t *hio_send_list_lock; 111178110Sjeffstatic pthread_cond_t *hio_send_list_cond; 11279561Siedowse/* 113178110Sjeff * There is one recv list for every component, although local components don't 114178110Sjeff * use recv lists as local requests are done synchronously. 115183080Sjhb */ 116183080Sjhbstatic TAILQ_HEAD(, hio) *hio_recv_list; 117183080Sjhbstatic pthread_mutex_t *hio_recv_list_lock; 118183080Sjhbstatic pthread_cond_t *hio_recv_list_cond; 119183080Sjhb/* 120183080Sjhb * Request is placed on done list by the slowest component (the one that 121178110Sjeff * decreased hio_countdown from 1 to 0). 
122178110Sjeff */ 123178110Sjeffstatic TAILQ_HEAD(, hio) hio_done_list; 124178110Sjeffstatic pthread_mutex_t hio_done_list_lock; 125178110Sjeffstatic pthread_cond_t hio_done_list_cond; 126178110Sjeff/* 127178110Sjeff * Structure below are for interaction with sync thread. 128178110Sjeff */ 129187474Sjhbstatic bool sync_inprogress; 130187474Sjhbstatic pthread_mutex_t sync_lock; 131187474Sjhbstatic pthread_cond_t sync_cond; 132187474Sjhb/* 133187474Sjhb * The lock below allows to synchornize access to remote connections. 134187474Sjhb */ 135187474Sjhbstatic pthread_rwlock_t *hio_remote_lock; 136187474Sjhb 137187474Sjhb/* 138187474Sjhb * Lock to synchronize metadata updates. Also synchronize access to 139187474Sjhb * hr_primary_localcnt and hr_primary_remotecnt fields. 140187474Sjhb */ 14179561Siedowsestatic pthread_mutex_t metadata_lock; 14279561Siedowse 143183080Sjhb/* 144183080Sjhb * Maximum number of outstanding I/O requests. 145183080Sjhb */ 146183080Sjhb#define HAST_HIO_MAX 256 147183080Sjhb/* 148183080Sjhb * Number of components. At this point there are only two components: local 149183080Sjhb * and remote, but in the future it might be possible to use multiple local 150183080Sjhb * and remote components. 
151183080Sjhb */ 152183080Sjhb#define HAST_NCOMPONENTS 2 153183080Sjhb 154183080Sjhb#define ISCONNECTED(res, no) \ 155183080Sjhb ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156183080Sjhb 157183080Sjhb#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158183080Sjhb bool _wakeup; \ 159183080Sjhb \ 16079561Siedowse mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161178110Sjeff _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162178110Sjeff TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163178110Sjeff hio_next[(ncomp)]); \ 164178110Sjeff mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165178110Sjeff if (_wakeup) \ 166178110Sjeff cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167183080Sjhb} while (0) 168178110Sjeff#define QUEUE_INSERT2(hio, name) do { \ 169178110Sjeff bool _wakeup; \ 170178110Sjeff \ 171178110Sjeff mtx_lock(&hio_##name##_list_lock); \ 172178110Sjeff _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173178110Sjeff TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174178110Sjeff mtx_unlock(&hio_##name##_list_lock); \ 175178110Sjeff if (_wakeup) \ 176178110Sjeff cv_signal(&hio_##name##_list_cond); \ 177178110Sjeff} while (0) 178178110Sjeff#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 179178110Sjeff bool _last; \ 180178110Sjeff \ 181178110Sjeff mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 182178110Sjeff _last = false; \ 183178110Sjeff while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 184178110Sjeff cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 185178110Sjeff &hio_##name##_list_lock[(ncomp)], (timeout)); \ 186178110Sjeff if ((timeout) != 0) \ 187178110Sjeff _last = true; \ 188178110Sjeff } \ 189185102Sjhb if (hio != NULL) { \ 190178110Sjeff TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 191184205Sdes hio_next[(ncomp)]); \ 192178110Sjeff } \ 193178110Sjeff mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 194178110Sjeff} while (0) 195183080Sjhb#define QUEUE_TAKE2(hio, name) do { \ 
196184651Sjhb mtx_lock(&hio_##name##_list_lock); \ 197184651Sjhb while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 198184651Sjhb cv_wait(&hio_##name##_list_cond, \ 199184651Sjhb &hio_##name##_list_lock); \ 200184651Sjhb } \ 201184651Sjhb TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 202184651Sjhb mtx_unlock(&hio_##name##_list_lock); \ 203184651Sjhb} while (0) 204184651Sjhb 205184651Sjhb#define SYNCREQ(hio) do { \ 206183080Sjhb (hio)->hio_ggio.gctl_unit = -1; \ 207178110Sjeff (hio)->hio_ggio.gctl_seq = 1; \ 208178110Sjeff} while (0) 209178110Sjeff#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 210178110Sjeff#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211178110Sjeff#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212178110Sjeff 213178110Sjeffstatic struct hast_resource *gres; 214178110Sjeff 215178110Sjeffstatic pthread_mutex_t range_lock; 216178110Sjeffstatic struct rangelocks *range_regular; 217178110Sjeffstatic bool range_regular_wait; 218178110Sjeffstatic pthread_cond_t range_regular_cond; 219178110Sjeffstatic struct rangelocks *range_sync; 220178110Sjeffstatic bool range_sync_wait; 221183080Sjhbstatic pthread_cond_t range_sync_cond; 222183080Sjhbstatic bool fullystarted; 223183080Sjhb 224183080Sjhbstatic void *ggate_recv_thread(void *arg); 225183080Sjhbstatic void *local_send_thread(void *arg); 226183080Sjhbstatic void *remote_send_thread(void *arg); 227178110Sjeffstatic void *remote_recv_thread(void *arg); 228183080Sjhbstatic void *ggate_send_thread(void *arg); 229178110Sjeffstatic void *sync_thread(void *arg); 230183080Sjhbstatic void *guard_thread(void *arg); 231178110Sjeff 232183080Sjhbstatic void 233178110Sjeffcleanup(struct hast_resource *res) 234178110Sjeff{ 235183080Sjhb int rerrno; 236183080Sjhb 237183080Sjhb /* Remember errno. */ 238178110Sjeff rerrno = errno; 239178110Sjeff 240178110Sjeff /* Destroy ggate provider if we created one. 
*/ 241178110Sjeff if (res->hr_ggateunit >= 0) { 242178110Sjeff struct g_gate_ctl_destroy ggiod; 243183080Sjhb 244183080Sjhb bzero(&ggiod, sizeof(ggiod)); 245178110Sjeff ggiod.gctl_version = G_GATE_VERSION; 246183080Sjhb ggiod.gctl_unit = res->hr_ggateunit; 247178110Sjeff ggiod.gctl_force = 1; 248183080Sjhb if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 249178110Sjeff pjdlog_errno(LOG_WARNING, 250178110Sjeff "Unable to destroy hast/%s device", 251178110Sjeff res->hr_provname); 252183080Sjhb } 253183080Sjhb res->hr_ggateunit = -1; 254178110Sjeff } 255178110Sjeff 256178110Sjeff /* Restore errno. */ 257178110Sjeff errno = rerrno; 258178110Sjeff} 259178110Sjeff 260178110Sjeffstatic __dead2 void 261178110Sjeffprimary_exit(int exitcode, const char *fmt, ...) 262178110Sjeff{ 263178110Sjeff va_list ap; 264178110Sjeff 265178110Sjeff PJDLOG_ASSERT(exitcode != EX_OK); 266178110Sjeff va_start(ap, fmt); 267178110Sjeff pjdlogv_errno(LOG_ERR, fmt, ap); 268178110Sjeff va_end(ap); 269178110Sjeff cleanup(gres); 270178110Sjeff exit(exitcode); 271178110Sjeff} 272178110Sjeff 273178110Sjeffstatic __dead2 void 274178110Sjeffprimary_exitx(int exitcode, const char *fmt, ...) 275183080Sjhb{ 276178110Sjeff va_list ap; 277178110Sjeff 278178110Sjeff va_start(ap, fmt); 279178110Sjeff pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 280178110Sjeff va_end(ap); 281178110Sjeff cleanup(gres); 282178110Sjeff exit(exitcode); 283178110Sjeff} 284178110Sjeff 285178110Sjeffstatic int 286178110Sjeffhast_activemap_flush(struct hast_resource *res) 287178110Sjeff{ 288178110Sjeff const unsigned char *buf; 289178110Sjeff size_t size; 290178110Sjeff 291178110Sjeff buf = activemap_bitmap(res->hr_amp, &size); 292178110Sjeff PJDLOG_ASSERT(buf != NULL); 293178110Sjeff PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 294183080Sjhb if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 295178110Sjeff (ssize_t)size) { 296178110Sjeff KEEP_ERRNO(pjdlog_errno(LOG_ERR, 297178110Sjeff "Unable to flush activemap to disk")); 298178110Sjeff return (-1); 299178110Sjeff } 300178110Sjeff return (0); 301183080Sjhb} 302183080Sjhb 303183080Sjhbstatic bool 304183080Sjhbreal_remote(const struct hast_resource *res) 305183080Sjhb{ 306183080Sjhb 307183080Sjhb return (strcmp(res->hr_remoteaddr, "none") != 0); 308183080Sjhb} 309183080Sjhb 310183080Sjhbstatic void 311183080Sjhbinit_environment(struct hast_resource *res __unused) 312178110Sjeff{ 313183080Sjhb struct hio *hio; 314183080Sjhb unsigned int ii, ncomps; 315183080Sjhb 316183080Sjhb /* 317183080Sjhb * In the future it might be per-resource value. 318183080Sjhb */ 319183080Sjhb ncomps = HAST_NCOMPONENTS; 320178110Sjeff 321183080Sjhb /* 322178110Sjeff * Allocate memory needed by lists. 
323178110Sjeff */ 324178110Sjeff hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 325178110Sjeff if (hio_send_list == NULL) { 326178110Sjeff primary_exitx(EX_TEMPFAIL, 32779561Siedowse "Unable to allocate %zu bytes of memory for send lists.", 32879561Siedowse sizeof(hio_send_list[0]) * ncomps); 32979561Siedowse } 33079561Siedowse hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 33179561Siedowse if (hio_send_list_lock == NULL) { 33279561Siedowse primary_exitx(EX_TEMPFAIL, 33379561Siedowse "Unable to allocate %zu bytes of memory for send list locks.", 33479561Siedowse sizeof(hio_send_list_lock[0]) * ncomps); 33579561Siedowse } 33679561Siedowse hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 33779561Siedowse if (hio_send_list_cond == NULL) { 33879561Siedowse primary_exitx(EX_TEMPFAIL, 33979561Siedowse "Unable to allocate %zu bytes of memory for send list condition variables.", 340178110Sjeff sizeof(hio_send_list_cond[0]) * ncomps); 341178110Sjeff } 342178110Sjeff hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 343178110Sjeff if (hio_recv_list == NULL) { 344178110Sjeff primary_exitx(EX_TEMPFAIL, 34579561Siedowse "Unable to allocate %zu bytes of memory for recv lists.", 346178110Sjeff sizeof(hio_recv_list[0]) * ncomps); 347178110Sjeff } 348178110Sjeff hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 34979561Siedowse if (hio_recv_list_lock == NULL) { 350178110Sjeff primary_exitx(EX_TEMPFAIL, 35179561Siedowse "Unable to allocate %zu bytes of memory for recv list locks.", 352178110Sjeff sizeof(hio_recv_list_lock[0]) * ncomps); 353178110Sjeff } 35482364Siedowse hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 355178110Sjeff if (hio_recv_list_cond == NULL) { 356178110Sjeff primary_exitx(EX_TEMPFAIL, 35782364Siedowse "Unable to allocate %zu bytes of memory for recv list condition variables.", 35879561Siedowse sizeof(hio_recv_list_cond[0]) * ncomps); 35979561Siedowse } 
36079561Siedowse hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 36179561Siedowse if (hio_remote_lock == NULL) { 36279561Siedowse primary_exitx(EX_TEMPFAIL, 36379561Siedowse "Unable to allocate %zu bytes of memory for remote connections locks.", 36479561Siedowse sizeof(hio_remote_lock[0]) * ncomps); 36579561Siedowse } 36679561Siedowse 36779561Siedowse /* 36879561Siedowse * Initialize lists, their locks and theirs condition variables. 36979561Siedowse */ 370125854Sdwmalone TAILQ_INIT(&hio_free_list); 37179561Siedowse mtx_init(&hio_free_list_lock); 372125854Sdwmalone cv_init(&hio_free_list_cond); 37379561Siedowse for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 374178110Sjeff TAILQ_INIT(&hio_send_list[ii]); 37579561Siedowse mtx_init(&hio_send_list_lock[ii]); 37679561Siedowse cv_init(&hio_send_list_cond[ii]); 377178110Sjeff TAILQ_INIT(&hio_recv_list[ii]); 378125854Sdwmalone mtx_init(&hio_recv_list_lock[ii]); 37979561Siedowse cv_init(&hio_recv_list_cond[ii]); 38079561Siedowse rw_init(&hio_remote_lock[ii]); 381125854Sdwmalone } 38279561Siedowse TAILQ_INIT(&hio_done_list); 383178110Sjeff mtx_init(&hio_done_list_lock); 384178110Sjeff cv_init(&hio_done_list_cond); 385178110Sjeff mtx_init(&metadata_lock); 386178110Sjeff 387178110Sjeff /* 388178110Sjeff * Allocate requests pool and initialize requests. 
389178110Sjeff */ 390178110Sjeff for (ii = 0; ii < HAST_HIO_MAX; ii++) { 391178110Sjeff hio = malloc(sizeof(*hio)); 392178110Sjeff if (hio == NULL) { 393178110Sjeff primary_exitx(EX_TEMPFAIL, 394178110Sjeff "Unable to allocate %zu bytes of memory for hio request.", 395178110Sjeff sizeof(*hio)); 396178110Sjeff } 39779561Siedowse hio->hio_countdown = 0; 39879561Siedowse hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 39979561Siedowse if (hio->hio_errors == NULL) { 40079561Siedowse primary_exitx(EX_TEMPFAIL, 401184205Sdes "Unable allocate %zu bytes of memory for hio errors.", 40279561Siedowse sizeof(hio->hio_errors[0]) * ncomps); 403178110Sjeff } 404178110Sjeff hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 405184205Sdes if (hio->hio_next == NULL) { 40679561Siedowse primary_exitx(EX_TEMPFAIL, 407178110Sjeff "Unable allocate %zu bytes of memory for hio_next field.", 40879561Siedowse sizeof(hio->hio_next[0]) * ncomps); 40979561Siedowse } 410125854Sdwmalone hio->hio_ggio.gctl_version = G_GATE_VERSION; 41179561Siedowse hio->hio_ggio.gctl_data = malloc(MAXPHYS); 41279561Siedowse if (hio->hio_ggio.gctl_data == NULL) { 41379561Siedowse primary_exitx(EX_TEMPFAIL, 41479561Siedowse "Unable to allocate %zu bytes of memory for gctl_data.", 41579561Siedowse MAXPHYS); 41679561Siedowse } 41779561Siedowse hio->hio_ggio.gctl_length = MAXPHYS; 41879561Siedowse hio->hio_ggio.gctl_error = 0; 41979561Siedowse TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 42079561Siedowse } 42179561Siedowse} 42279561Siedowse 42379561Siedowsestatic bool 42479561Siedowseinit_resuid(struct hast_resource *res) 42579561Siedowse{ 42679561Siedowse 42779561Siedowse mtx_lock(&metadata_lock); 42879561Siedowse if (res->hr_resuid != 0) { 42979561Siedowse mtx_unlock(&metadata_lock); 43079561Siedowse return (false); 43179561Siedowse } else { 43279561Siedowse /* Initialize unique resource identifier. 
*/ 43379561Siedowse arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 43479561Siedowse mtx_unlock(&metadata_lock); 43579561Siedowse if (metadata_write(res) < 0) 43679561Siedowse exit(EX_NOINPUT); 43779561Siedowse return (true); 43879561Siedowse } 43979561Siedowse} 44079561Siedowse 44179561Siedowsestatic void 44279561Siedowseinit_local(struct hast_resource *res) 44379561Siedowse{ 44479561Siedowse unsigned char *buf; 44579561Siedowse size_t mapsize; 44679561Siedowse 44779561Siedowse if (metadata_read(res, true) < 0) 44879561Siedowse exit(EX_NOINPUT); 44979561Siedowse mtx_init(&res->hr_amp_lock); 450125854Sdwmalone if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 45179561Siedowse res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 45279561Siedowse primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 453125854Sdwmalone } 454183080Sjhb mtx_init(&range_lock); 45579561Siedowse cv_init(&range_regular_cond); 45679561Siedowse if (rangelock_init(&range_regular) < 0) 45779561Siedowse primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 458178110Sjeff cv_init(&range_sync_cond); 45979561Siedowse if (rangelock_init(&range_sync) < 0) 46079561Siedowse primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 46179561Siedowse mapsize = activemap_ondisk_size(res->hr_amp); 46279561Siedowse buf = calloc(1, mapsize); 46379561Siedowse if (buf == NULL) { 46479561Siedowse primary_exitx(EX_TEMPFAIL, 465178110Sjeff "Unable to allocate buffer for activemap."); 466178110Sjeff } 46779561Siedowse if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 46879561Siedowse (ssize_t)mapsize) { 469178110Sjeff primary_exit(EX_NOINPUT, "Unable to read activemap"); 470178110Sjeff } 47179561Siedowse activemap_copyin(res->hr_amp, buf, mapsize); 472178110Sjeff free(buf); 473183080Sjhb if (res->hr_resuid != 0) 474178110Sjeff return; 475178110Sjeff /* 476178110Sjeff * We're using provider for the first time. 
Initialize local and remote 477178110Sjeff * counters. We don't initialize resuid here, as we want to do it just 478178110Sjeff * in time. The reason for this is that we want to inform secondary 479178110Sjeff * that there were no writes yet, so there is no need to synchronize 480178110Sjeff * anything. 481178110Sjeff */ 482178110Sjeff res->hr_primary_localcnt = 0; 483183080Sjhb res->hr_primary_remotecnt = 0; 484178110Sjeff if (metadata_write(res) < 0) 485183280Sjhb exit(EX_NOINPUT); 486183280Sjhb} 487183280Sjhb 488183280Sjhbstatic int 489183280Sjhbprimary_connect(struct hast_resource *res, struct proto_conn **connp) 490183280Sjhb{ 491183280Sjhb struct proto_conn *conn; 492183280Sjhb int16_t val; 493183280Sjhb 494183280Sjhb val = 1; 495183080Sjhb if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 496183080Sjhb primary_exit(EX_TEMPFAIL, 497183080Sjhb "Unable to send connection request to parent"); 498183080Sjhb } 499178110Sjeff if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 500183080Sjhb primary_exit(EX_TEMPFAIL, 501183080Sjhb "Unable to receive reply to connection request from parent"); 502178110Sjeff } 503178110Sjeff if (val != 0) { 504178110Sjeff errno = val; 505178110Sjeff pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 506178110Sjeff res->hr_remoteaddr); 507178110Sjeff return (-1); 508178110Sjeff } 509184205Sdes if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 510178110Sjeff primary_exit(EX_TEMPFAIL, 511184205Sdes "Unable to receive connection from parent"); 512178110Sjeff } 513183080Sjhb if (proto_connect_wait(conn, res->hr_timeout) < 0) { 514178110Sjeff pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 515183080Sjhb res->hr_remoteaddr); 516178110Sjeff proto_close(conn); 517183080Sjhb return (-1); 51879561Siedowse } 51979561Siedowse /* Error in setting timeout is not critical, but why should it fail? 
*/ 52079561Siedowse if (proto_timeout(conn, res->hr_timeout) < 0) 52179561Siedowse pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 52279561Siedowse 52379561Siedowse *connp = conn; 52479561Siedowse 52579690Siedowse return (0); 52679690Siedowse} 52779561Siedowse 52879561Siedowsestatic int 52979561Siedowseinit_remote(struct hast_resource *res, struct proto_conn **inp, 530178110Sjeff struct proto_conn **outp) 531178110Sjeff{ 53279561Siedowse struct proto_conn *in, *out; 53379561Siedowse struct nv *nvout, *nvin; 53479561Siedowse const unsigned char *token; 53579690Siedowse unsigned char *map; 53679561Siedowse const char *errmsg; 53779561Siedowse int32_t extentsize; 53879561Siedowse int64_t datasize; 53979561Siedowse uint32_t mapsize; 54079561Siedowse size_t size; 54179561Siedowse int error; 54279561Siedowse 543178110Sjeff PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 54479561Siedowse PJDLOG_ASSERT(real_remote(res)); 545178110Sjeff 546178110Sjeff in = out = NULL; 547178110Sjeff errmsg = NULL; 548178110Sjeff 54979561Siedowse if (primary_connect(res, &out) == -1) 55079561Siedowse return (ECONNREFUSED); 551178110Sjeff 55279561Siedowse error = ECONNABORTED; 553178110Sjeff 55479561Siedowse /* 55579561Siedowse * First handshake step. 55679561Siedowse * Setup outgoing connection with remote node. 
55779561Siedowse */ 55879561Siedowse nvout = nv_alloc(); 55979561Siedowse nv_add_string(nvout, res->hr_name, "resource"); 56079561Siedowse if (nv_error(nvout) != 0) { 561178110Sjeff pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 56279561Siedowse "Unable to allocate header for connection with %s", 56379561Siedowse res->hr_remoteaddr); 56479561Siedowse nv_free(nvout); 56579561Siedowse goto close; 56679561Siedowse } 56779561Siedowse if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 56879561Siedowse pjdlog_errno(LOG_WARNING, 56979561Siedowse "Unable to send handshake header to %s", 57079561Siedowse res->hr_remoteaddr); 57179561Siedowse nv_free(nvout); 572178110Sjeff goto close; 57379561Siedowse } 57479561Siedowse nv_free(nvout); 57579561Siedowse if (hast_proto_recv_hdr(out, &nvin) < 0) { 57679561Siedowse pjdlog_errno(LOG_WARNING, 57779561Siedowse "Unable to receive handshake header from %s", 57879561Siedowse res->hr_remoteaddr); 57979561Siedowse goto close; 58079561Siedowse } 58179561Siedowse errmsg = nv_get_string(nvin, "errmsg"); 58279561Siedowse if (errmsg != NULL) { 58379561Siedowse pjdlog_warning("%s", errmsg); 58479561Siedowse if (nv_exists(nvin, "wait")) 58579561Siedowse error = EBUSY; 58679561Siedowse nv_free(nvin); 58779561Siedowse goto close; 58879561Siedowse } 58979561Siedowse token = nv_get_uint8_array(nvin, &size, "token"); 59086350Siedowse if (token == NULL) { 59179561Siedowse pjdlog_warning("Handshake header from %s has no 'token' field.", 59279561Siedowse res->hr_remoteaddr); 59379561Siedowse nv_free(nvin); 59479561Siedowse goto close; 59579561Siedowse } 596149178Siedowse if (size != sizeof(res->hr_token)) { 59779561Siedowse pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 59879561Siedowse res->hr_remoteaddr, size, sizeof(res->hr_token)); 59979561Siedowse nv_free(nvin); 60079561Siedowse goto close; 60179561Siedowse } 60279561Siedowse bcopy(token, res->hr_token, sizeof(res->hr_token)); 60379561Siedowse 
nv_free(nvin); 60479561Siedowse 60579561Siedowse /* 60679561Siedowse * Second handshake step. 60779561Siedowse * Setup incoming connection with remote node. 60879561Siedowse */ 60979561Siedowse if (primary_connect(res, &in) == -1) 61079561Siedowse goto close; 61179561Siedowse 61279561Siedowse nvout = nv_alloc(); 613178110Sjeff nv_add_string(nvout, res->hr_name, "resource"); 614178110Sjeff nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 615178110Sjeff "token"); 616178110Sjeff if (res->hr_resuid == 0) { 61779561Siedowse /* 61879561Siedowse * The resuid field was not yet initialized. 61979561Siedowse * Because we do synchronization inside init_resuid(), it is 62079561Siedowse * possible that someone already initialized it, the function 62179561Siedowse * will return false then, but if we successfully initialized 622178110Sjeff * it, we will get true. True means that there were no writes 623178110Sjeff * to this resource yet and we want to inform secondary that 62479561Siedowse * synchronization is not needed by sending "virgin" argument. 
62579561Siedowse */ 62679561Siedowse if (init_resuid(res)) 62779561Siedowse nv_add_int8(nvout, 1, "virgin"); 62879561Siedowse } 62979561Siedowse nv_add_uint64(nvout, res->hr_resuid, "resuid"); 63079561Siedowse nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 63179561Siedowse nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 63279561Siedowse if (nv_error(nvout) != 0) { 633178110Sjeff pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 634178110Sjeff "Unable to allocate header for connection with %s", 63579561Siedowse res->hr_remoteaddr); 63679561Siedowse nv_free(nvout); 63779561Siedowse goto close; 63879561Siedowse } 63979561Siedowse if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 64079561Siedowse pjdlog_errno(LOG_WARNING, 64179561Siedowse "Unable to send handshake header to %s", 64279561Siedowse res->hr_remoteaddr); 64379561Siedowse nv_free(nvout); 64479561Siedowse goto close; 64579690Siedowse } 64679561Siedowse nv_free(nvout); 647178110Sjeff if (hast_proto_recv_hdr(out, &nvin) < 0) { 64879561Siedowse pjdlog_errno(LOG_WARNING, 64979561Siedowse "Unable to receive handshake header from %s", 65079561Siedowse res->hr_remoteaddr); 65179561Siedowse goto close; 65279561Siedowse } 65379561Siedowse errmsg = nv_get_string(nvin, "errmsg"); 65479561Siedowse if (errmsg != NULL) { 65579561Siedowse pjdlog_warning("%s", errmsg); 65679561Siedowse nv_free(nvin); 65779561Siedowse goto close; 65879561Siedowse } 65979561Siedowse datasize = nv_get_int64(nvin, "datasize"); 660178110Sjeff if (datasize != res->hr_datasize) { 661178110Sjeff pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 662178110Sjeff (intmax_t)res->hr_datasize, (intmax_t)datasize); 66379561Siedowse nv_free(nvin); 66479561Siedowse goto close; 665178110Sjeff } 66679561Siedowse extentsize = nv_get_int32(nvin, "extentsize"); 66779561Siedowse if (extentsize != res->hr_extentsize) { 66879561Siedowse pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	/* Record the secondary's generation counters and the agreed sync direction. */
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	if (nv_exists(nvin, "virgin")) {
		/*
		 * Secondary was reinitialized, bump localcnt if it is 0 as
		 * only we have the data.
		 */
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);

		if (res->hr_primary_localcnt == 0) {
			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);

			mtx_lock(&metadata_lock);
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
	}
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * The remote node has some dirty extents of its own; download
		 * its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
	/* Setup directions. */
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	/*
	 * Hand the connections back to the caller when it asked for them,
	 * otherwise install them as the resource's active remote channels.
	 */
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

/*
 * Mark synchronization as in progress and wake up sync_thread, which waits
 * on sync_cond for sync_inprogress to become true.
 */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/*
 * Request that an in-progress synchronization stop.  sync_thread notices
 * the cleared flag on its next pass and interrupts the sync.
 */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create the ggate provider (hast/<provname>) for this resource, or take
 * over a leftover one from a previous, not cleanly terminated instance.
 * Exits the process on unrecoverable failure.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	/* Any error other than "provider already exists" is fatal. */
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Start the primary role for the given resource: create the parent/child
 * communication channels, fork a worker process, and (in the child) set up
 * the local component, the ggate provider and all worker threads, finally
 * running sync_thread in the main thread.  The parent returns after
 * recording the worker PID.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
		/* TODO: There's no need for this to be fatal error.
 */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	/* Child (worker) from here on. */
	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	/* Close descriptors inherited from the parent that we don't need. */
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/*
			 * Remote node is still primary; poll once a second
			 * until it switches roles or we time out.
			 */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	/* The main thread becomes the synchronization thread. */
	(void)sync_thread(res);
}

/*
 * printf-like request logger: formats the caller's message and appends a
 * short description of the ggate request (command, offset, length) before
 * handing it to pjdlog_common().
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	/*
	 * Append the request description only when the prefix fit.  A
	 * negative vsnprintf() return converts to a huge size_t here, so
	 * errors also skip the suffix.
	 */
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both connections to the remote component, stop any in-progress
 * synchronization and notify the parent about the disconnect.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *     only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *     only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Preset per-component errors; each worker overwrites its slot. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads go to exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0) {
				/*
				 * This is first write, initialize localcnt and
				 * resuid.
				 */
				res->hr_primary_localcnt = 1;
				(void)init_resuid(res);
			}
			/*
			 * Lock the range being written: wait (on the condvar)
			 * while the synchronizer holds it, and retry with
			 * sleep(1) when another regular request overlaps.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before the write is issued. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			switch (ggio->gctl_cmd) {
			case BIO_DELETE:
				res->hr_stat_delete++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush++;
				break;
			}
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* WRITE/DELETE/FLUSH go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 */
				if (ret < 0) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. 
", 1257 ret, (intmax_t)ggio->gctl_length); 1258 } 1259 QUEUE_INSERT1(hio, send, rncomp); 1260 continue; 1261 } 1262 break; 1263 case BIO_WRITE: 1264 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1265 ggio->gctl_length, 1266 ggio->gctl_offset + res->hr_localoff); 1267 if (ret < 0) { 1268 hio->hio_errors[ncomp] = errno; 1269 reqlog(LOG_WARNING, 0, ggio, 1270 "Local request failed (%s): ", 1271 strerror(errno)); 1272 } else if (ret != ggio->gctl_length) { 1273 hio->hio_errors[ncomp] = EIO; 1274 reqlog(LOG_WARNING, 0, ggio, 1275 "Local request failed (%zd != %jd): ", 1276 ret, (intmax_t)ggio->gctl_length); 1277 } else { 1278 hio->hio_errors[ncomp] = 0; 1279 } 1280 break; 1281 case BIO_DELETE: 1282 ret = g_delete(res->hr_localfd, 1283 ggio->gctl_offset + res->hr_localoff, 1284 ggio->gctl_length); 1285 if (ret < 0) { 1286 hio->hio_errors[ncomp] = errno; 1287 reqlog(LOG_WARNING, 0, ggio, 1288 "Local request failed (%s): ", 1289 strerror(errno)); 1290 } else { 1291 hio->hio_errors[ncomp] = 0; 1292 } 1293 break; 1294 case BIO_FLUSH: 1295 ret = g_flush(res->hr_localfd); 1296 if (ret < 0) { 1297 hio->hio_errors[ncomp] = errno; 1298 reqlog(LOG_WARNING, 0, ggio, 1299 "Local request failed (%s): ", 1300 strerror(errno)); 1301 } else { 1302 hio->hio_errors[ncomp] = 0; 1303 } 1304 break; 1305 } 1306 if (refcount_release(&hio->hio_countdown)) { 1307 if (ISSYNCREQ(hio)) { 1308 mtx_lock(&sync_lock); 1309 SYNCREQDONE(hio); 1310 mtx_unlock(&sync_lock); 1311 cv_signal(&sync_cond); 1312 } else { 1313 pjdlog_debug(2, 1314 "local_send: (%p) Moving request to the done queue.", 1315 hio); 1316 QUEUE_INSERT2(hio, done); 1317 } 1318 } 1319 } 1320 /* NOTREACHED */ 1321 return (NULL); 1322} 1323 1324static void 1325keepalive_send(struct hast_resource *res, unsigned int ncomp) 1326{ 1327 struct nv *nv; 1328 1329 rw_rlock(&hio_remote_lock[ncomp]); 1330 1331 if (!ISCONNECTED(res, ncomp)) { 1332 rw_unlock(&hio_remote_lock[ncomp]); 1333 return; 1334 } 1335 1336 PJDLOG_ASSERT(res->hr_remotein != 
NULL); 1337 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1338 1339 nv = nv_alloc(); 1340 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1341 if (nv_error(nv) != 0) { 1342 rw_unlock(&hio_remote_lock[ncomp]); 1343 nv_free(nv); 1344 pjdlog_debug(1, 1345 "keepalive_send: Unable to prepare header to send."); 1346 return; 1347 } 1348 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1349 rw_unlock(&hio_remote_lock[ncomp]); 1350 pjdlog_common(LOG_DEBUG, 1, errno, 1351 "keepalive_send: Unable to send request"); 1352 nv_free(nv); 1353 remote_close(res, ncomp); 1354 return; 1355 } 1356 1357 rw_unlock(&hio_remote_lock[ncomp]); 1358 nv_free(nv); 1359 pjdlog_debug(2, "keepalive_send: Request sent."); 1360} 1361 1362/* 1363 * Thread sends request to secondary node. 1364 */ 1365static void * 1366remote_send_thread(void *arg) 1367{ 1368 struct hast_resource *res = arg; 1369 struct g_gate_ctl_io *ggio; 1370 time_t lastcheck, now; 1371 struct hio *hio; 1372 struct nv *nv; 1373 unsigned int ncomp; 1374 bool wakeup; 1375 uint64_t offset, length; 1376 uint8_t cmd; 1377 void *data; 1378 1379 /* Remote component is 1 for now. 
 */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		/* Wake up at least every HAST_KEEPALIVE seconds. */
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/* Timed out with no work - maybe send a keepalive. */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate request into a HAST protocol request. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write failed remotely; flush the activemap if
			 * this extent now needs synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Match the reply with the pending request by sequence number. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* Only reads carry a data payload in the reply. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Last component to finish moves the request onwards. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* The write completed; mark the extent complete. */
			mtx_lock(&res->hr_amp_lock);
			activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length);
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Bump local count if this is first write after
			 * connection failure with remote node.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				mtx_lock(&metadata_lock);
				if (res->hr_primary_localcnt ==
				    res->hr_secondary_remotecnt) {
					res->hr_primary_localcnt++;
					pjdlog_debug(1,
					    "Increasing localcnt to %ju.",
					    (uintmax_t)res->hr_primary_localcnt);
					(void)metadata_write(res);
				}
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
		}
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronizes local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	/* offset == -1 means "no synchronization pass started yet". */
	offset = -1;

	for (;;) {
		mtx_lock(&sync_lock);
		if (offset >= 0 && !sync_inprogress) {
			/* Synchronization was interrupted via sync_stop(). */
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
1771 */ 1772 mtx_lock(&res->hr_amp_lock); 1773 if (dorewind) 1774 activemap_sync_rewind(res->hr_amp); 1775 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1776 if (syncext != -1) { 1777 /* 1778 * We synchronized entire syncext extent, we can mark 1779 * it as clean now. 1780 */ 1781 if (activemap_extent_complete(res->hr_amp, syncext)) 1782 (void)hast_activemap_flush(res); 1783 } 1784 mtx_unlock(&res->hr_amp_lock); 1785 if (dorewind) { 1786 dorewind = false; 1787 if (offset < 0) 1788 pjdlog_info("Nodes are in sync."); 1789 else { 1790 pjdlog_info("Synchronization started. %NB to go.", 1791 (intmax_t)(res->hr_extentsize * 1792 activemap_ndirty(res->hr_amp))); 1793 event_send(res, EVENT_SYNCSTART); 1794 gettimeofday(&tstart, NULL); 1795 } 1796 } 1797 if (offset < 0) { 1798 sync_stop(); 1799 pjdlog_debug(1, "Nothing to synchronize."); 1800 /* 1801 * Synchronization complete, make both localcnt and 1802 * remotecnt equal. 1803 */ 1804 ncomp = 1; 1805 rw_rlock(&hio_remote_lock[ncomp]); 1806 if (ISCONNECTED(res, ncomp)) { 1807 if (synced > 0) { 1808 int64_t bps; 1809 1810 gettimeofday(&tend, NULL); 1811 timersub(&tend, &tstart, &tdiff); 1812 bps = (int64_t)((double)synced / 1813 ((double)tdiff.tv_sec + 1814 (double)tdiff.tv_usec / 1000000)); 1815 pjdlog_info("Synchronization complete. 
" 1816 "%NB synchronized in %#.0lT (%NB/sec).", 1817 (intmax_t)synced, &tdiff, 1818 (intmax_t)bps); 1819 event_send(res, EVENT_SYNCDONE); 1820 } 1821 mtx_lock(&metadata_lock); 1822 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1823 res->hr_primary_localcnt = 1824 res->hr_secondary_remotecnt; 1825 res->hr_primary_remotecnt = 1826 res->hr_secondary_localcnt; 1827 pjdlog_debug(1, 1828 "Setting localcnt to %ju and remotecnt to %ju.", 1829 (uintmax_t)res->hr_primary_localcnt, 1830 (uintmax_t)res->hr_primary_remotecnt); 1831 (void)metadata_write(res); 1832 mtx_unlock(&metadata_lock); 1833 } 1834 rw_unlock(&hio_remote_lock[ncomp]); 1835 continue; 1836 } 1837 pjdlog_debug(2, "sync: Taking free request."); 1838 QUEUE_TAKE2(hio, free); 1839 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1840 /* 1841 * Lock the range we are going to synchronize. We don't want 1842 * race where someone writes between our read and write. 1843 */ 1844 for (;;) { 1845 mtx_lock(&range_lock); 1846 if (rangelock_islocked(range_regular, offset, length)) { 1847 pjdlog_debug(2, 1848 "sync: Range offset=%jd length=%jd locked.", 1849 (intmax_t)offset, (intmax_t)length); 1850 range_sync_wait = true; 1851 cv_wait(&range_sync_cond, &range_lock); 1852 range_sync_wait = false; 1853 mtx_unlock(&range_lock); 1854 continue; 1855 } 1856 if (rangelock_add(range_sync, offset, length) < 0) { 1857 mtx_unlock(&range_lock); 1858 pjdlog_debug(2, 1859 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1860 (intmax_t)offset, (intmax_t)length); 1861 sleep(1); 1862 continue; 1863 } 1864 mtx_unlock(&range_lock); 1865 break; 1866 } 1867 /* 1868 * First read the data from synchronization source. 
1869 */ 1870 SYNCREQ(hio); 1871 ggio = &hio->hio_ggio; 1872 ggio->gctl_cmd = BIO_READ; 1873 ggio->gctl_offset = offset; 1874 ggio->gctl_length = length; 1875 ggio->gctl_error = 0; 1876 for (ii = 0; ii < ncomps; ii++) 1877 hio->hio_errors[ii] = EINVAL; 1878 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1879 hio); 1880 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1881 hio); 1882 mtx_lock(&metadata_lock); 1883 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1884 /* 1885 * This range is up-to-date on local component, 1886 * so handle request locally. 1887 */ 1888 /* Local component is 0 for now. */ 1889 ncomp = 0; 1890 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1891 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1892 /* 1893 * This range is out-of-date on local component, 1894 * so send request to the remote node. 1895 */ 1896 /* Remote component is 1 for now. */ 1897 ncomp = 1; 1898 } 1899 mtx_unlock(&metadata_lock); 1900 refcount_init(&hio->hio_countdown, 1); 1901 QUEUE_INSERT1(hio, send, ncomp); 1902 1903 /* 1904 * Let's wait for READ to finish. 1905 */ 1906 mtx_lock(&sync_lock); 1907 while (!ISSYNCREQDONE(hio)) 1908 cv_wait(&sync_cond, &sync_lock); 1909 mtx_unlock(&sync_lock); 1910 1911 if (hio->hio_errors[ncomp] != 0) { 1912 pjdlog_error("Unable to read synchronization data: %s.", 1913 strerror(hio->hio_errors[ncomp])); 1914 goto free_queue; 1915 } 1916 1917 /* 1918 * We read the data from synchronization source, now write it 1919 * to synchronization target. 
1920 */ 1921 SYNCREQ(hio); 1922 ggio->gctl_cmd = BIO_WRITE; 1923 for (ii = 0; ii < ncomps; ii++) 1924 hio->hio_errors[ii] = EINVAL; 1925 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1926 hio); 1927 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1928 hio); 1929 mtx_lock(&metadata_lock); 1930 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1931 /* 1932 * This range is up-to-date on local component, 1933 * so we update remote component. 1934 */ 1935 /* Remote component is 1 for now. */ 1936 ncomp = 1; 1937 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1938 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1939 /* 1940 * This range is out-of-date on local component, 1941 * so we update it. 1942 */ 1943 /* Local component is 0 for now. */ 1944 ncomp = 0; 1945 } 1946 mtx_unlock(&metadata_lock); 1947 1948 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1949 hio); 1950 refcount_init(&hio->hio_countdown, 1); 1951 QUEUE_INSERT1(hio, send, ncomp); 1952 1953 /* 1954 * Let's wait for WRITE to finish. 
1955 */ 1956 mtx_lock(&sync_lock); 1957 while (!ISSYNCREQDONE(hio)) 1958 cv_wait(&sync_cond, &sync_lock); 1959 mtx_unlock(&sync_lock); 1960 1961 if (hio->hio_errors[ncomp] != 0) { 1962 pjdlog_error("Unable to write synchronization data: %s.", 1963 strerror(hio->hio_errors[ncomp])); 1964 goto free_queue; 1965 } 1966 1967 synced += length; 1968free_queue: 1969 mtx_lock(&range_lock); 1970 rangelock_del(range_sync, offset, length); 1971 if (range_regular_wait) 1972 cv_signal(&range_regular_cond); 1973 mtx_unlock(&range_lock); 1974 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1975 hio); 1976 QUEUE_INSERT2(hio, free); 1977 } 1978 /* NOTREACHED */ 1979 return (NULL); 1980} 1981 1982void 1983primary_config_reload(struct hast_resource *res, struct nv *nv) 1984{ 1985 unsigned int ii, ncomps; 1986 int modified, vint; 1987 const char *vstr; 1988 1989 pjdlog_info("Reloading configuration..."); 1990 1991 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 1992 PJDLOG_ASSERT(gres == res); 1993 nv_assert(nv, "remoteaddr"); 1994 nv_assert(nv, "sourceaddr"); 1995 nv_assert(nv, "replication"); 1996 nv_assert(nv, "checksum"); 1997 nv_assert(nv, "compression"); 1998 nv_assert(nv, "timeout"); 1999 nv_assert(nv, "exec"); 2000 2001 ncomps = HAST_NCOMPONENTS; 2002 2003#define MODIFIED_REMOTEADDR 0x01 2004#define MODIFIED_SOURCEADDR 0x02 2005#define MODIFIED_REPLICATION 0x04 2006#define MODIFIED_CHECKSUM 0x08 2007#define MODIFIED_COMPRESSION 0x10 2008#define MODIFIED_TIMEOUT 0x20 2009#define MODIFIED_EXEC 0x40 2010 modified = 0; 2011 2012 vstr = nv_get_string(nv, "remoteaddr"); 2013 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2014 /* 2015 * Don't copy res->hr_remoteaddr to gres just yet. 2016 * We want remote_close() to log disconnect from the old 2017 * addresses, not from the new ones. 
2018 */ 2019 modified |= MODIFIED_REMOTEADDR; 2020 } 2021 vstr = nv_get_string(nv, "sourceaddr"); 2022 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2023 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2024 modified |= MODIFIED_SOURCEADDR; 2025 } 2026 vint = nv_get_int32(nv, "replication"); 2027 if (gres->hr_replication != vint) { 2028 gres->hr_replication = vint; 2029 modified |= MODIFIED_REPLICATION; 2030 } 2031 vint = nv_get_int32(nv, "checksum"); 2032 if (gres->hr_checksum != vint) { 2033 gres->hr_checksum = vint; 2034 modified |= MODIFIED_CHECKSUM; 2035 } 2036 vint = nv_get_int32(nv, "compression"); 2037 if (gres->hr_compression != vint) { 2038 gres->hr_compression = vint; 2039 modified |= MODIFIED_COMPRESSION; 2040 } 2041 vint = nv_get_int32(nv, "timeout"); 2042 if (gres->hr_timeout != vint) { 2043 gres->hr_timeout = vint; 2044 modified |= MODIFIED_TIMEOUT; 2045 } 2046 vstr = nv_get_string(nv, "exec"); 2047 if (strcmp(gres->hr_exec, vstr) != 0) { 2048 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2049 modified |= MODIFIED_EXEC; 2050 } 2051 2052 /* 2053 * Change timeout for connected sockets. 2054 * Don't bother if we need to reconnect. 
2055 */ 2056 if ((modified & MODIFIED_TIMEOUT) != 0 && 2057 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2058 MODIFIED_REPLICATION)) == 0) { 2059 for (ii = 0; ii < ncomps; ii++) { 2060 if (!ISREMOTE(ii)) 2061 continue; 2062 rw_rlock(&hio_remote_lock[ii]); 2063 if (!ISCONNECTED(gres, ii)) { 2064 rw_unlock(&hio_remote_lock[ii]); 2065 continue; 2066 } 2067 rw_unlock(&hio_remote_lock[ii]); 2068 if (proto_timeout(gres->hr_remotein, 2069 gres->hr_timeout) < 0) { 2070 pjdlog_errno(LOG_WARNING, 2071 "Unable to set connection timeout"); 2072 } 2073 if (proto_timeout(gres->hr_remoteout, 2074 gres->hr_timeout) < 0) { 2075 pjdlog_errno(LOG_WARNING, 2076 "Unable to set connection timeout"); 2077 } 2078 } 2079 } 2080 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2081 MODIFIED_REPLICATION)) != 0) { 2082 for (ii = 0; ii < ncomps; ii++) { 2083 if (!ISREMOTE(ii)) 2084 continue; 2085 remote_close(gres, ii); 2086 } 2087 if (modified & MODIFIED_REMOTEADDR) { 2088 vstr = nv_get_string(nv, "remoteaddr"); 2089 strlcpy(gres->hr_remoteaddr, vstr, 2090 sizeof(gres->hr_remoteaddr)); 2091 } 2092 } 2093#undef MODIFIED_REMOTEADDR 2094#undef MODIFIED_SOURCEADDR 2095#undef MODIFIED_REPLICATION 2096#undef MODIFIED_CHECKSUM 2097#undef MODIFIED_COMPRESSION 2098#undef MODIFIED_TIMEOUT 2099#undef MODIFIED_EXEC 2100 2101 pjdlog_info("Configuration reloaded successfully."); 2102} 2103 2104static void 2105guard_one(struct hast_resource *res, unsigned int ncomp) 2106{ 2107 struct proto_conn *in, *out; 2108 2109 if (!ISREMOTE(ncomp)) 2110 return; 2111 2112 rw_rlock(&hio_remote_lock[ncomp]); 2113 2114 if (!real_remote(res)) { 2115 rw_unlock(&hio_remote_lock[ncomp]); 2116 return; 2117 } 2118 2119 if (ISCONNECTED(res, ncomp)) { 2120 PJDLOG_ASSERT(res->hr_remotein != NULL); 2121 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2122 rw_unlock(&hio_remote_lock[ncomp]); 2123 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2124 res->hr_remoteaddr); 2125 return; 2126 } 2127 2128 
PJDLOG_ASSERT(res->hr_remotein == NULL); 2129 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2130 /* 2131 * Upgrade the lock. It doesn't have to be atomic as no other thread 2132 * can change connection status from disconnected to connected. 2133 */ 2134 rw_unlock(&hio_remote_lock[ncomp]); 2135 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2136 res->hr_remoteaddr); 2137 in = out = NULL; 2138 if (init_remote(res, &in, &out) == 0) { 2139 rw_wlock(&hio_remote_lock[ncomp]); 2140 PJDLOG_ASSERT(res->hr_remotein == NULL); 2141 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2142 PJDLOG_ASSERT(in != NULL && out != NULL); 2143 res->hr_remotein = in; 2144 res->hr_remoteout = out; 2145 rw_unlock(&hio_remote_lock[ncomp]); 2146 pjdlog_info("Successfully reconnected to %s.", 2147 res->hr_remoteaddr); 2148 sync_start(); 2149 } else { 2150 /* Both connections should be NULL. */ 2151 PJDLOG_ASSERT(res->hr_remotein == NULL); 2152 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2153 PJDLOG_ASSERT(in == NULL && out == NULL); 2154 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2155 res->hr_remoteaddr); 2156 } 2157} 2158 2159/* 2160 * Thread guards remote connections and reconnects when needed, handles 2161 * signals, etc. 
2162 */ 2163static void * 2164guard_thread(void *arg) 2165{ 2166 struct hast_resource *res = arg; 2167 unsigned int ii, ncomps; 2168 struct timespec timeout; 2169 time_t lastcheck, now; 2170 sigset_t mask; 2171 int signo; 2172 2173 ncomps = HAST_NCOMPONENTS; 2174 lastcheck = time(NULL); 2175 2176 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2177 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2178 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2179 2180 timeout.tv_sec = HAST_KEEPALIVE; 2181 timeout.tv_nsec = 0; 2182 signo = -1; 2183 2184 for (;;) { 2185 switch (signo) { 2186 case SIGINT: 2187 case SIGTERM: 2188 sigexit_received = true; 2189 primary_exitx(EX_OK, 2190 "Termination signal received, exiting."); 2191 break; 2192 default: 2193 break; 2194 } 2195 2196 /* 2197 * Don't check connections until we fully started, 2198 * as we may still be looping, waiting for remote node 2199 * to switch from primary to secondary. 2200 */ 2201 if (fullystarted) { 2202 pjdlog_debug(2, "remote_guard: Checking connections."); 2203 now = time(NULL); 2204 if (lastcheck + HAST_KEEPALIVE <= now) { 2205 for (ii = 0; ii < ncomps; ii++) 2206 guard_one(res, ii); 2207 lastcheck = now; 2208 } 2209 } 2210 signo = sigtimedwait(&mask, NULL, &timeout); 2211 } 2212 /* NOTREACHED */ 2213 return (NULL); 2214} 2215