primary.c revision 256027
1251212Spfg/*- 2219639Smm * Copyright (c) 2009 The FreeBSD Foundation 3219639Smm * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4219639Smm * All rights reserved. 5219639Smm * 6219639Smm * This software was developed by Pawel Jakub Dawidek under sponsorship from 7219639Smm * the FreeBSD Foundation. 8219639Smm * 9219639Smm * Redistribution and use in source and binary forms, with or without 10219639Smm * modification, are permitted provided that the following conditions 11219639Smm * are met: 12219639Smm * 1. Redistributions of source code must retain the above copyright 13219639Smm * notice, this list of conditions and the following disclaimer. 14219639Smm * 2. Redistributions in binary form must reproduce the above copyright 15219639Smm * notice, this list of conditions and the following disclaimer in the 16219639Smm * documentation and/or other materials provided with the distribution. 17219639Smm * 18219639Smm * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19219639Smm * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20219639Smm * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21219639Smm * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22219639Smm * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23219639Smm * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24219639Smm * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25219639Smm * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26219639Smm * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27219639Smm * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28219639Smm * SUCH DAMAGE. 
29219639Smm */ 30219639Smm 31219639Smm#include <sys/cdefs.h> 32219639Smm__FBSDID("$FreeBSD: stable/9/sbin/hastd/primary.c 256027 2013-10-03 18:52:04Z trociny $"); 33251212Spfg 34251212Spfg#include <sys/types.h> 35251212Spfg#include <sys/time.h> 36251212Spfg#include <sys/bio.h> 37251212Spfg#include <sys/disk.h> 38237021Spfg#include <sys/stat.h> 39219639Smm 40237021Spfg#include <geom/gate/g_gate.h> 41237021Spfg 42237021Spfg#include <err.h> 43237021Spfg#include <errno.h> 44237021Spfg#include <fcntl.h> 45237021Spfg#include <libgeom.h> 46237021Spfg#include <pthread.h> 47237021Spfg#include <signal.h> 48237021Spfg#include <stdint.h> 49237021Spfg#include <stdio.h> 50237021Spfg#include <string.h> 51237021Spfg#include <sysexits.h> 52237021Spfg#include <unistd.h> 53237021Spfg 54237021Spfg#include <activemap.h> 55237021Spfg#include <nv.h> 56237021Spfg#include <rangelock.h> 57237021Spfg 58237021Spfg#include "control.h" 59237021Spfg#include "event.h" 60237021Spfg#include "hast.h" 61237021Spfg#include "hast_proto.h" 62237021Spfg#include "hastd.h" 63237021Spfg#include "hooks.h" 64237021Spfg#include "metadata.h" 65237021Spfg#include "proto.h" 66237021Spfg#include "pjdlog.h" 67237021Spfg#include "refcnt.h" 68237021Spfg#include "subr.h" 69237021Spfg#include "synch.h" 70237021Spfg 71237021Spfg/* The is only one remote component for now. */ 72237021Spfg#define ISREMOTE(no) ((no) == 1) 73237021Spfg 74237021Spfgstruct hio { 75237021Spfg /* 76237021Spfg * Number of components we are still waiting for. 77237021Spfg * When this field goes to 0, we can send the request back to the 78237021Spfg * kernel. Each component has to decrease this counter by one 79237021Spfg * even on failure. 80237021Spfg */ 81237021Spfg unsigned int hio_countdown; 82237021Spfg /* 83237021Spfg * Each component has a place to store its own error. 84237021Spfg * Once the request is handled by all components we can decide if the 85237021Spfg * request overall is successful or not. 
86237021Spfg */ 87237021Spfg int *hio_errors; 88237021Spfg /* 89237021Spfg * Structure used to communicate with GEOM Gate class. 90237021Spfg */ 91237021Spfg struct g_gate_ctl_io hio_ggio; 92237021Spfg /* 93237021Spfg * Request was already confirmed to GEOM Gate. 94237021Spfg */ 95237021Spfg bool hio_done; 96237021Spfg /* 97237021Spfg * Remember replication from the time the request was initiated, 98237021Spfg * so we won't get confused when replication changes on reload. 99237021Spfg */ 100237021Spfg int hio_replication; 101237021Spfg TAILQ_ENTRY(hio) *hio_next; 102237021Spfg}; 103237021Spfg#define hio_free_next hio_next[0] 104237021Spfg#define hio_done_next hio_next[0] 105237021Spfg 106237021Spfg/* 107237021Spfg * Free list holds unused structures. When free list is empty, we have to wait 108237021Spfg * until some in-progress requests are freed. 109237021Spfg */ 110237021Spfgstatic TAILQ_HEAD(, hio) hio_free_list; 111237021Spfgstatic pthread_mutex_t hio_free_list_lock; 112237021Spfgstatic pthread_cond_t hio_free_list_cond; 113237021Spfg/* 114237021Spfg * There is one send list for every component. One requests is placed on all 115237021Spfg * send lists - each component gets the same request, but each component is 116237021Spfg * responsible for managing his own send list. 117237021Spfg */ 118237021Spfgstatic TAILQ_HEAD(, hio) *hio_send_list; 119237021Spfgstatic pthread_mutex_t *hio_send_list_lock; 120237021Spfgstatic pthread_cond_t *hio_send_list_cond; 121237021Spfg/* 122237021Spfg * There is one recv list for every component, although local components don't 123237021Spfg * use recv lists as local requests are done synchronously. 
124237021Spfg */ 125237021Spfgstatic TAILQ_HEAD(, hio) *hio_recv_list; 126237021Spfgstatic pthread_mutex_t *hio_recv_list_lock; 127237021Spfgstatic pthread_cond_t *hio_recv_list_cond; 128237021Spfg/* 129237021Spfg * Request is placed on done list by the slowest component (the one that 130237021Spfg * decreased hio_countdown from 1 to 0). 131237021Spfg */ 132237021Spfgstatic TAILQ_HEAD(, hio) hio_done_list; 133237021Spfgstatic pthread_mutex_t hio_done_list_lock; 134237021Spfgstatic pthread_cond_t hio_done_list_cond; 135237021Spfg/* 136237021Spfg * Structure below are for interaction with sync thread. 137237021Spfg */ 138237021Spfgstatic bool sync_inprogress; 139237021Spfgstatic pthread_mutex_t sync_lock; 140237021Spfgstatic pthread_cond_t sync_cond; 141237021Spfg/* 142237021Spfg * The lock below allows to synchornize access to remote connections. 143237021Spfg */ 144237021Spfgstatic pthread_rwlock_t *hio_remote_lock; 145237021Spfg 146237021Spfg/* 147237021Spfg * Lock to synchronize metadata updates. Also synchronize access to 148237021Spfg * hr_primary_localcnt and hr_primary_remotecnt fields. 149237021Spfg */ 150237021Spfgstatic pthread_mutex_t metadata_lock; 151237021Spfg 152237021Spfg/* 153237021Spfg * Maximum number of outstanding I/O requests. 154237021Spfg */ 155237021Spfg#define HAST_HIO_MAX 256 156237021Spfg/* 157237021Spfg * Number of components. At this point there are only two components: local 158237021Spfg * and remote, but in the future it might be possible to use multiple local 159237021Spfg * and remote components. 
160237021Spfg */ 161237021Spfg#define HAST_NCOMPONENTS 2 162237021Spfg 163237021Spfg#define ISCONNECTED(res, no) \ 164237021Spfg ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165237021Spfg 166237021Spfg#define QUEUE_INSERT1(hio, name, ncomp) do { \ 167237021Spfg bool _wakeup; \ 168237021Spfg \ 169237021Spfg mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170237021Spfg _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171237021Spfg TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172237021Spfg hio_next[(ncomp)]); \ 173237021Spfg mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174237021Spfg if (_wakeup) \ 175237021Spfg cv_broadcast(&hio_##name##_list_cond[(ncomp)]); \ 176237021Spfg} while (0) 177237021Spfg#define QUEUE_INSERT2(hio, name) do { \ 178237021Spfg bool _wakeup; \ 179237021Spfg \ 180237021Spfg mtx_lock(&hio_##name##_list_lock); \ 181237021Spfg _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182237021Spfg TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183237021Spfg mtx_unlock(&hio_##name##_list_lock); \ 184237021Spfg if (_wakeup) \ 185237021Spfg cv_broadcast(&hio_##name##_list_cond); \ 186237021Spfg} while (0) 187237021Spfg#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188237021Spfg bool _last; \ 189237021Spfg \ 190237021Spfg mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191237021Spfg _last = false; \ 192237021Spfg while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193237021Spfg cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194237021Spfg &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195237021Spfg if ((timeout) != 0) \ 196237021Spfg _last = true; \ 197237021Spfg } \ 198237021Spfg if (hio != NULL) { \ 199237021Spfg TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200237021Spfg hio_next[(ncomp)]); \ 201237021Spfg } \ 202237021Spfg mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203237021Spfg} while (0) 204237021Spfg#define QUEUE_TAKE2(hio, name) do { \ 205237021Spfg 
mtx_lock(&hio_##name##_list_lock); \ 206237021Spfg while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207237021Spfg cv_wait(&hio_##name##_list_cond, \ 208237021Spfg &hio_##name##_list_lock); \ 209237021Spfg } \ 210237021Spfg TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211237021Spfg mtx_unlock(&hio_##name##_list_lock); \ 212237021Spfg} while (0) 213237021Spfg 214237021Spfg#define SYNCREQ(hio) do { \ 215237021Spfg (hio)->hio_ggio.gctl_unit = -1; \ 216237021Spfg (hio)->hio_ggio.gctl_seq = 1; \ 217237021Spfg} while (0) 218237021Spfg#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 219237021Spfg#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220237021Spfg#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221237021Spfg 222237021Spfgstatic struct hast_resource *gres; 223237021Spfg 224237021Spfgstatic pthread_mutex_t range_lock; 225237021Spfgstatic struct rangelocks *range_regular; 226237021Spfgstatic bool range_regular_wait; 227237021Spfgstatic pthread_cond_t range_regular_cond; 228237021Spfgstatic struct rangelocks *range_sync; 229static bool range_sync_wait; 230static pthread_cond_t range_sync_cond; 231static bool fullystarted; 232 233static void *ggate_recv_thread(void *arg); 234static void *local_send_thread(void *arg); 235static void *remote_send_thread(void *arg); 236static void *remote_recv_thread(void *arg); 237static void *ggate_send_thread(void *arg); 238static void *sync_thread(void *arg); 239static void *guard_thread(void *arg); 240 241static void 242cleanup(struct hast_resource *res) 243{ 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. 
*/ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267} 268 269static __dead2 void 270primary_exit(int exitcode, const char *fmt, ...) 271{ 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280} 281 282static __dead2 void 283primary_exitx(int exitcode, const char *fmt, ...) 284{ 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292} 293 294/* Expects res->hr_amp locked, returns unlocked. */ 295static int 296hast_activemap_flush(struct hast_resource *res) 297{ 298 const unsigned char *buf; 299 size_t size; 300 int ret; 301 302 mtx_lock(&res->hr_amp_diskmap_lock); 303 buf = activemap_bitmap(res->hr_amp, &size); 304 mtx_unlock(&res->hr_amp_lock); 305 PJDLOG_ASSERT(buf != NULL); 306 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 307 ret = 0; 308 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 309 (ssize_t)size) { 310 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 311 res->hr_stat_activemap_write_error++; 312 ret = -1; 313 } 314 if (ret == 0 && res->hr_metaflush == 1 && 315 g_flush(res->hr_localfd) == -1) { 316 if (errno == EOPNOTSUPP) { 317 pjdlog_warning("The %s provider doesn't support flushing write cache. 
Disabling it.", 318 res->hr_localpath); 319 res->hr_metaflush = 0; 320 } else { 321 pjdlog_errno(LOG_ERR, 322 "Unable to flush disk cache on activemap update"); 323 res->hr_stat_activemap_flush_error++; 324 ret = -1; 325 } 326 } 327 mtx_unlock(&res->hr_amp_diskmap_lock); 328 return (ret); 329} 330 331static bool 332real_remote(const struct hast_resource *res) 333{ 334 335 return (strcmp(res->hr_remoteaddr, "none") != 0); 336} 337 338static void 339init_environment(struct hast_resource *res __unused) 340{ 341 struct hio *hio; 342 unsigned int ii, ncomps; 343 344 /* 345 * In the future it might be per-resource value. 346 */ 347 ncomps = HAST_NCOMPONENTS; 348 349 /* 350 * Allocate memory needed by lists. 351 */ 352 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 353 if (hio_send_list == NULL) { 354 primary_exitx(EX_TEMPFAIL, 355 "Unable to allocate %zu bytes of memory for send lists.", 356 sizeof(hio_send_list[0]) * ncomps); 357 } 358 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 359 if (hio_send_list_lock == NULL) { 360 primary_exitx(EX_TEMPFAIL, 361 "Unable to allocate %zu bytes of memory for send list locks.", 362 sizeof(hio_send_list_lock[0]) * ncomps); 363 } 364 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 365 if (hio_send_list_cond == NULL) { 366 primary_exitx(EX_TEMPFAIL, 367 "Unable to allocate %zu bytes of memory for send list condition variables.", 368 sizeof(hio_send_list_cond[0]) * ncomps); 369 } 370 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 371 if (hio_recv_list == NULL) { 372 primary_exitx(EX_TEMPFAIL, 373 "Unable to allocate %zu bytes of memory for recv lists.", 374 sizeof(hio_recv_list[0]) * ncomps); 375 } 376 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 377 if (hio_recv_list_lock == NULL) { 378 primary_exitx(EX_TEMPFAIL, 379 "Unable to allocate %zu bytes of memory for recv list locks.", 380 sizeof(hio_recv_list_lock[0]) * ncomps); 381 } 382 
hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 383 if (hio_recv_list_cond == NULL) { 384 primary_exitx(EX_TEMPFAIL, 385 "Unable to allocate %zu bytes of memory for recv list condition variables.", 386 sizeof(hio_recv_list_cond[0]) * ncomps); 387 } 388 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 389 if (hio_remote_lock == NULL) { 390 primary_exitx(EX_TEMPFAIL, 391 "Unable to allocate %zu bytes of memory for remote connections locks.", 392 sizeof(hio_remote_lock[0]) * ncomps); 393 } 394 395 /* 396 * Initialize lists, their locks and theirs condition variables. 397 */ 398 TAILQ_INIT(&hio_free_list); 399 mtx_init(&hio_free_list_lock); 400 cv_init(&hio_free_list_cond); 401 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 402 TAILQ_INIT(&hio_send_list[ii]); 403 mtx_init(&hio_send_list_lock[ii]); 404 cv_init(&hio_send_list_cond[ii]); 405 TAILQ_INIT(&hio_recv_list[ii]); 406 mtx_init(&hio_recv_list_lock[ii]); 407 cv_init(&hio_recv_list_cond[ii]); 408 rw_init(&hio_remote_lock[ii]); 409 } 410 TAILQ_INIT(&hio_done_list); 411 mtx_init(&hio_done_list_lock); 412 cv_init(&hio_done_list_cond); 413 mtx_init(&metadata_lock); 414 415 /* 416 * Allocate requests pool and initialize requests. 
417 */ 418 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 419 hio = malloc(sizeof(*hio)); 420 if (hio == NULL) { 421 primary_exitx(EX_TEMPFAIL, 422 "Unable to allocate %zu bytes of memory for hio request.", 423 sizeof(*hio)); 424 } 425 hio->hio_countdown = 0; 426 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 427 if (hio->hio_errors == NULL) { 428 primary_exitx(EX_TEMPFAIL, 429 "Unable allocate %zu bytes of memory for hio errors.", 430 sizeof(hio->hio_errors[0]) * ncomps); 431 } 432 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 433 if (hio->hio_next == NULL) { 434 primary_exitx(EX_TEMPFAIL, 435 "Unable allocate %zu bytes of memory for hio_next field.", 436 sizeof(hio->hio_next[0]) * ncomps); 437 } 438 hio->hio_ggio.gctl_version = G_GATE_VERSION; 439 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 440 if (hio->hio_ggio.gctl_data == NULL) { 441 primary_exitx(EX_TEMPFAIL, 442 "Unable to allocate %zu bytes of memory for gctl_data.", 443 MAXPHYS); 444 } 445 hio->hio_ggio.gctl_length = MAXPHYS; 446 hio->hio_ggio.gctl_error = 0; 447 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 448 } 449} 450 451static bool 452init_resuid(struct hast_resource *res) 453{ 454 455 mtx_lock(&metadata_lock); 456 if (res->hr_resuid != 0) { 457 mtx_unlock(&metadata_lock); 458 return (false); 459 } else { 460 /* Initialize unique resource identifier. 
*/ 461 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 462 mtx_unlock(&metadata_lock); 463 if (metadata_write(res) == -1) 464 exit(EX_NOINPUT); 465 return (true); 466 } 467} 468 469static void 470init_local(struct hast_resource *res) 471{ 472 unsigned char *buf; 473 size_t mapsize; 474 475 if (metadata_read(res, true) == -1) 476 exit(EX_NOINPUT); 477 mtx_init(&res->hr_amp_lock); 478 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 479 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 480 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 481 } 482 mtx_init(&range_lock); 483 cv_init(&range_regular_cond); 484 if (rangelock_init(&range_regular) == -1) 485 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 486 cv_init(&range_sync_cond); 487 if (rangelock_init(&range_sync) == -1) 488 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 489 mapsize = activemap_ondisk_size(res->hr_amp); 490 buf = calloc(1, mapsize); 491 if (buf == NULL) { 492 primary_exitx(EX_TEMPFAIL, 493 "Unable to allocate buffer for activemap."); 494 } 495 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 496 (ssize_t)mapsize) { 497 primary_exit(EX_NOINPUT, "Unable to read activemap"); 498 } 499 activemap_copyin(res->hr_amp, buf, mapsize); 500 free(buf); 501 if (res->hr_resuid != 0) 502 return; 503 /* 504 * We're using provider for the first time. Initialize local and remote 505 * counters. We don't initialize resuid here, as we want to do it just 506 * in time. The reason for this is that we want to inform secondary 507 * that there were no writes yet, so there is no need to synchronize 508 * anything. 
509 */ 510 res->hr_primary_localcnt = 0; 511 res->hr_primary_remotecnt = 0; 512 if (metadata_write(res) == -1) 513 exit(EX_NOINPUT); 514} 515 516static int 517primary_connect(struct hast_resource *res, struct proto_conn **connp) 518{ 519 struct proto_conn *conn; 520 int16_t val; 521 522 val = 1; 523 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 524 primary_exit(EX_TEMPFAIL, 525 "Unable to send connection request to parent"); 526 } 527 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 528 primary_exit(EX_TEMPFAIL, 529 "Unable to receive reply to connection request from parent"); 530 } 531 if (val != 0) { 532 errno = val; 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 return (-1); 536 } 537 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 538 primary_exit(EX_TEMPFAIL, 539 "Unable to receive connection from parent"); 540 } 541 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 542 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 543 res->hr_remoteaddr); 544 proto_close(conn); 545 return (-1); 546 } 547 /* Error in setting timeout is not critical, but why should it fail? */ 548 if (proto_timeout(conn, res->hr_timeout) == -1) 549 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 550 551 *connp = conn; 552 553 return (0); 554} 555 556/* 557 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 
558 */ 559static void 560enable_direct_reads(struct hast_resource *res) 561{ 562 struct g_gate_ctl_modify ggiomodify; 563 564 bzero(&ggiomodify, sizeof(ggiomodify)); 565 ggiomodify.gctl_version = G_GATE_VERSION; 566 ggiomodify.gctl_unit = res->hr_ggateunit; 567 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 568 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 569 sizeof(ggiomodify.gctl_readprov)); 570 ggiomodify.gctl_readoffset = res->hr_localoff; 571 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 572 pjdlog_debug(1, "Direct reads enabled."); 573 else 574 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 575} 576 577static int 578init_remote(struct hast_resource *res, struct proto_conn **inp, 579 struct proto_conn **outp) 580{ 581 struct proto_conn *in, *out; 582 struct nv *nvout, *nvin; 583 const unsigned char *token; 584 unsigned char *map; 585 const char *errmsg; 586 int32_t extentsize; 587 int64_t datasize; 588 uint32_t mapsize; 589 uint8_t version; 590 size_t size; 591 int error; 592 593 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 594 PJDLOG_ASSERT(real_remote(res)); 595 596 in = out = NULL; 597 errmsg = NULL; 598 599 if (primary_connect(res, &out) == -1) 600 return (ECONNREFUSED); 601 602 error = ECONNABORTED; 603 604 /* 605 * First handshake step. 606 * Setup outgoing connection with remote node. 
607 */ 608 nvout = nv_alloc(); 609 nv_add_string(nvout, res->hr_name, "resource"); 610 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 611 if (nv_error(nvout) != 0) { 612 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 613 "Unable to allocate header for connection with %s", 614 res->hr_remoteaddr); 615 nv_free(nvout); 616 goto close; 617 } 618 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 619 pjdlog_errno(LOG_WARNING, 620 "Unable to send handshake header to %s", 621 res->hr_remoteaddr); 622 nv_free(nvout); 623 goto close; 624 } 625 nv_free(nvout); 626 if (hast_proto_recv_hdr(out, &nvin) == -1) { 627 pjdlog_errno(LOG_WARNING, 628 "Unable to receive handshake header from %s", 629 res->hr_remoteaddr); 630 goto close; 631 } 632 errmsg = nv_get_string(nvin, "errmsg"); 633 if (errmsg != NULL) { 634 pjdlog_warning("%s", errmsg); 635 if (nv_exists(nvin, "wait")) 636 error = EBUSY; 637 nv_free(nvin); 638 goto close; 639 } 640 version = nv_get_uint8(nvin, "version"); 641 if (version == 0) { 642 /* 643 * If no version is sent, it means this is protocol version 1. 644 */ 645 version = 1; 646 } 647 if (version > HAST_PROTO_VERSION) { 648 pjdlog_warning("Invalid version received (%hhu).", version); 649 nv_free(nvin); 650 goto close; 651 } 652 res->hr_version = version; 653 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 654 token = nv_get_uint8_array(nvin, &size, "token"); 655 if (token == NULL) { 656 pjdlog_warning("Handshake header from %s has no 'token' field.", 657 res->hr_remoteaddr); 658 nv_free(nvin); 659 goto close; 660 } 661 if (size != sizeof(res->hr_token)) { 662 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 663 res->hr_remoteaddr, size, sizeof(res->hr_token)); 664 nv_free(nvin); 665 goto close; 666 } 667 bcopy(token, res->hr_token, sizeof(res->hr_token)); 668 nv_free(nvin); 669 670 /* 671 * Second handshake step. 672 * Setup incoming connection with remote node. 
673 */ 674 if (primary_connect(res, &in) == -1) 675 goto close; 676 677 nvout = nv_alloc(); 678 nv_add_string(nvout, res->hr_name, "resource"); 679 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 680 "token"); 681 if (res->hr_resuid == 0) { 682 /* 683 * The resuid field was not yet initialized. 684 * Because we do synchronization inside init_resuid(), it is 685 * possible that someone already initialized it, the function 686 * will return false then, but if we successfully initialized 687 * it, we will get true. True means that there were no writes 688 * to this resource yet and we want to inform secondary that 689 * synchronization is not needed by sending "virgin" argument. 690 */ 691 if (init_resuid(res)) 692 nv_add_int8(nvout, 1, "virgin"); 693 } 694 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 695 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 696 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 697 if (nv_error(nvout) != 0) { 698 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 699 "Unable to allocate header for connection with %s", 700 res->hr_remoteaddr); 701 nv_free(nvout); 702 goto close; 703 } 704 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 705 pjdlog_errno(LOG_WARNING, 706 "Unable to send handshake header to %s", 707 res->hr_remoteaddr); 708 nv_free(nvout); 709 goto close; 710 } 711 nv_free(nvout); 712 if (hast_proto_recv_hdr(out, &nvin) == -1) { 713 pjdlog_errno(LOG_WARNING, 714 "Unable to receive handshake header from %s", 715 res->hr_remoteaddr); 716 goto close; 717 } 718 errmsg = nv_get_string(nvin, "errmsg"); 719 if (errmsg != NULL) { 720 pjdlog_warning("%s", errmsg); 721 nv_free(nvin); 722 goto close; 723 } 724 datasize = nv_get_int64(nvin, "datasize"); 725 if (datasize != res->hr_datasize) { 726 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 727 (intmax_t)res->hr_datasize, (intmax_t)datasize); 728 nv_free(nvin); 729 goto close; 730 } 731 extentsize = 
nv_get_int32(nvin, "extentsize"); 732 if (extentsize != res->hr_extentsize) { 733 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 734 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 735 nv_free(nvin); 736 goto close; 737 } 738 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 739 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 740 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 741 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 742 enable_direct_reads(res); 743 if (nv_exists(nvin, "virgin")) { 744 /* 745 * Secondary was reinitialized, bump localcnt if it is 0 as 746 * only we have the data. 747 */ 748 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 749 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 750 751 if (res->hr_primary_localcnt == 0) { 752 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 753 754 mtx_lock(&metadata_lock); 755 res->hr_primary_localcnt++; 756 pjdlog_debug(1, "Increasing localcnt to %ju.", 757 (uintmax_t)res->hr_primary_localcnt); 758 (void)metadata_write(res); 759 mtx_unlock(&metadata_lock); 760 } 761 } 762 map = NULL; 763 mapsize = nv_get_uint32(nvin, "mapsize"); 764 if (mapsize > 0) { 765 map = malloc(mapsize); 766 if (map == NULL) { 767 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 768 (uintmax_t)mapsize); 769 nv_free(nvin); 770 goto close; 771 } 772 /* 773 * Remote node have some dirty extents on its own, lets 774 * download its activemap. 775 */ 776 if (hast_proto_recv_data(res, out, nvin, map, 777 mapsize) == -1) { 778 pjdlog_errno(LOG_ERR, 779 "Unable to receive remote activemap"); 780 nv_free(nvin); 781 free(map); 782 goto close; 783 } 784 /* 785 * Merge local and remote bitmaps. 786 */ 787 activemap_merge(res->hr_amp, map, mapsize); 788 free(map); 789 /* 790 * Now that we merged bitmaps from both nodes, flush it to the 791 * disk before we start to synchronize. 
792 */ 793 mtx_lock(&res->hr_amp_lock); 794 (void)hast_activemap_flush(res); 795 } 796 nv_free(nvin); 797#ifdef notyet 798 /* Setup directions. */ 799 if (proto_send(out, NULL, 0) == -1) 800 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 801 if (proto_recv(in, NULL, 0) == -1) 802 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 803#endif 804 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 805 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 806 res->hr_version < 2) { 807 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 808 res->hr_replication = HAST_REPLICATION_FULLSYNC; 809 } else if (res->hr_replication != res->hr_original_replication) { 810 /* 811 * This is in case hastd disconnected and was upgraded. 812 */ 813 res->hr_replication = res->hr_original_replication; 814 } 815 if (inp != NULL && outp != NULL) { 816 *inp = in; 817 *outp = out; 818 } else { 819 res->hr_remotein = in; 820 res->hr_remoteout = out; 821 } 822 event_send(res, EVENT_CONNECT); 823 return (0); 824close: 825 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 826 event_send(res, EVENT_SPLITBRAIN); 827 proto_close(out); 828 if (in != NULL) 829 proto_close(in); 830 return (error); 831} 832 833static void 834sync_start(void) 835{ 836 837 mtx_lock(&sync_lock); 838 sync_inprogress = true; 839 mtx_unlock(&sync_lock); 840 cv_signal(&sync_cond); 841} 842 843static void 844sync_stop(void) 845{ 846 847 mtx_lock(&sync_lock); 848 if (sync_inprogress) 849 sync_inprogress = false; 850 mtx_unlock(&sync_lock); 851} 852 853static void 854init_ggate(struct hast_resource *res) 855{ 856 struct g_gate_ctl_create ggiocreate; 857 struct g_gate_ctl_cancel ggiocancel; 858 859 /* 860 * We communicate with ggate via /dev/ggctl. Open it. 
861 */ 862 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 863 if (res->hr_ggatefd == -1) 864 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 865 /* 866 * Create provider before trying to connect, as connection failure 867 * is not critical, but may take some time. 868 */ 869 bzero(&ggiocreate, sizeof(ggiocreate)); 870 ggiocreate.gctl_version = G_GATE_VERSION; 871 ggiocreate.gctl_mediasize = res->hr_datasize; 872 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 873 ggiocreate.gctl_flags = 0; 874 ggiocreate.gctl_maxcount = 0; 875 ggiocreate.gctl_timeout = 0; 876 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 877 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 878 res->hr_provname); 879 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 880 pjdlog_info("Device hast/%s created.", res->hr_provname); 881 res->hr_ggateunit = ggiocreate.gctl_unit; 882 return; 883 } 884 if (errno != EEXIST) { 885 primary_exit(EX_OSERR, "Unable to create hast/%s device", 886 res->hr_provname); 887 } 888 pjdlog_debug(1, 889 "Device hast/%s already exists, we will try to take it over.", 890 res->hr_provname); 891 /* 892 * If we received EEXIST, we assume that the process who created the 893 * provider died and didn't clean up. In that case we will start from 894 * where he left of. 
 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	/* Cancel the stale unit; on success we inherit it. */
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Fork a worker process for the given resource.  The parent records the
 * worker PID and returns.  The child sets up logging, the local component,
 * the ggate device and all worker threads, and then turns itself into the
 * synchronization thread (sync_thread()) - it does not return here.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to
	 * parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	/* Capture logging setup before it is re-initialized below. */
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/*
	 * Child: declare channel directions as the exact mirror of the
	 * parent's declarations above.
	 */
	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/*
			 * EBUSY: remote node is still primary.  Poll once a
			 * second, for at most hr_timeout seconds, for it to
			 * become secondary before starting degraded.
			 */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	/* The main thread becomes the synchronization thread. */
	(void)sync_thread(res);
}

/*
 * Log a description of a ggate request: the caller-supplied printf-style
 * prefix followed by the decoded command, offset and length.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
    const char *fmt, ...)
1050{ 1051 char msg[1024]; 1052 va_list ap; 1053 1054 va_start(ap, fmt); 1055 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1056 va_end(ap); 1057 switch (ggio->gctl_cmd) { 1058 case BIO_READ: 1059 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1060 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1061 break; 1062 case BIO_DELETE: 1063 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1064 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1065 break; 1066 case BIO_FLUSH: 1067 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1068 break; 1069 case BIO_WRITE: 1070 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1071 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1072 break; 1073 default: 1074 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1075 (unsigned int)ggio->gctl_cmd); 1076 break; 1077 } 1078 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1079} 1080 1081static void 1082remote_close(struct hast_resource *res, int ncomp) 1083{ 1084 1085 rw_wlock(&hio_remote_lock[ncomp]); 1086 /* 1087 * Check for a race between dropping rlock and acquiring wlock - 1088 * another thread can close connection in-between. 1089 */ 1090 if (!ISCONNECTED(res, ncomp)) { 1091 PJDLOG_ASSERT(res->hr_remotein == NULL); 1092 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1093 rw_unlock(&hio_remote_lock[ncomp]); 1094 return; 1095 } 1096 1097 PJDLOG_ASSERT(res->hr_remotein != NULL); 1098 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1099 1100 pjdlog_debug(2, "Closing incoming connection to %s.", 1101 res->hr_remoteaddr); 1102 proto_close(res->hr_remotein); 1103 res->hr_remotein = NULL; 1104 pjdlog_debug(2, "Closing outgoing connection to %s.", 1105 res->hr_remoteaddr); 1106 proto_close(res->hr_remoteout); 1107 res->hr_remoteout = NULL; 1108 1109 rw_unlock(&hio_remote_lock[ncomp]); 1110 1111 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1112 1113 /* 1114 * Stop synchronization if in-progress. 
 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Acknowledge write completion to the kernel, but don't update activemap yet.
 */
static void
write_complete(struct hast_resource *res, struct hio *hio)
{
	struct g_gate_ctl_io *ggio;
	unsigned int ncomp;

	PJDLOG_ASSERT(!hio->hio_done);

	ggio = &hio->hio_ggio;
	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);

	/*
	 * Bump local count if this is first write after
	 * connection failure with remote node.
	 */
	ncomp = 1;	/* Remote component. */
	rw_rlock(&hio_remote_lock[ncomp]);
	if (!ISCONNECTED(res, ncomp)) {
		mtx_lock(&metadata_lock);
		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
		}
		mtx_unlock(&metadata_lock);
	}
	rw_unlock(&hio_remote_lock[ncomp]);
	/* Complete the request in the kernel. */
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
	hio->hio_done = true;
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		hio->hio_replication = res->hr_replication;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		/* Blocks in the kernel until a request arrives. */
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}

		ncomp = 0;
		ncomps = HAST_NCOMPONENTS;

		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);

		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			ncomps = 1;
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0 &&
			    res->hr_primary_localcnt == 0) {
				/* This is first write. */
				res->hr_primary_localcnt = 1;
			}
			/*
			 * Wait until the range is not being synchronized,
			 * then mark it as busy for regular I/O.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) == -1) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/*
			 * NOTE(review): only the no-flush branch unlocks
			 * hr_amp_lock here; hast_activemap_flush() is
			 * apparently expected to drop it itself (the same
			 * pattern is used everywhere in this file) - confirm
			 * against its definition.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			} else {
				mtx_unlock(&res->hr_amp_lock);
			}
			break;
		case BIO_DELETE:
			res->hr_stat_delete++;
			break;
		case BIO_FLUSH:
			res->hr_stat_flush++;
			break;
		}
		pjdlog_debug(2,
		    "ggate_recv: (%p) Moving request to the send queues.", hio);
		hio->hio_countdown = ncomps;
		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
		    ggio->gctl_cmd == BIO_WRITE) {
			/*
Each remote request needs two responses in memsync. */
			hio->hio_countdown++;
		}
		for (ii = ncomp; ii < ncomps; ii++)
			QUEUE_INSERT1(hio, send, ii);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 */
				if (ret == -1) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				/* Redirect the read to the remote component. */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
				/*
				 * In async mode the write is acknowledged to
				 * the kernel as soon as the local write is
				 * done, without waiting for the remote node.
				 */
				if (hio->hio_replication ==
				    HAST_REPLICATION_ASYNC) {
					ggio->gctl_error = 0;
					write_complete(res, hio);
				}
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			if (!res->hr_localflush) {
				/* Provider doesn't support BIO_FLUSH. */
				ret = -1;
				errno = EOPNOTSUPP;
				break;
			}
			ret = g_flush(res->hr_localfd);
			if (ret == -1) {
				if (errno == EOPNOTSUPP)
					res->hr_localflush = false;
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}

		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
			/* Not a memsync write: plain countdown release. */
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
		} else {
			/*
			 * Depending on hio_countdown value, requests finished
			 * in the following order:
			 * 0: remote memsync, remote final, local write
			 * 1: remote memsync, local write, (remote final)
			 *
 2: local write, (remote memsync), (remote final)
			 */
			switch (refcnt_release(&hio->hio_countdown)) {
			case 0:
				/*
				 * Local write finished as last.
				 */
				break;
			case 1:
				/*
				 * Local write finished after remote memsync
				 * reply arrived. We can complete the write now.
				 */
				if (hio->hio_errors[0] == 0)
					write_complete(res, hio);
				continue;
			case 2:
				/*
				 * Local write finished as first.
				 */
				continue;
			default:
				PJDLOG_ABORT("Invalid hio_countdown.");
			}
		}
		if (ISSYNCREQ(hio)) {
			/* Wake up sync_thread() waiting for this request. */
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "local_send: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Send a HIO_KEEPALIVE packet to the secondary node.  Errors while
 * preparing the packet are only logged; a send failure closes the
 * remote connection.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/* Queue-take timed out; send keepalive if one is due. */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate request into a HAST protocol request. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		/* Ask the secondary for an early "received" ack in memsync. */
		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
			nv_add_uint8(nv, 1, "memsync");
		}
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) == -1) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		/* Wake remote_recv_thread() if the queue was empty before. */
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * The write failed remotely; mark the extent as
			 * needing synchronization if required.
			 * NOTE(review): hast_activemap_flush() apparently
			 * drops hr_amp_lock itself (else branch unlocks
			 * explicitly) - confirm against its definition.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			} else {
				mtx_unlock(&res->hr_amp_lock);
			}
			/*
			 * A memsync write holds an extra countdown reference
			 * for the early ack that will now never arrive.
			 */
			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
				(void)refcnt_release(&hio->hio_countdown);
		}
		if (refcnt_release(&hio->hio_countdown) > 0)
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	bool memsyncack;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);

		memsyncack = false;

		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* "received" marks an early memsync ack, not the final reply. */
		memsyncack = nv_exists(nv, "received");
		/* Match the reply to a pending request by sequence number. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		ggio = &hio->hio_ggio;
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) == -1) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
			/* Not a memsync write: plain countdown release. */
			if (refcnt_release(&hio->hio_countdown) > 0)
				continue;
		} else {
			/*
			 * Depending on hio_countdown value, requests finished
			 * in the following order:
			 *
			 * 0: local write, remote memsync, remote final
			 * or
			 * 0: remote memsync, local write, remote final
			 *
			 * 1: local write, remote memsync, (remote final)
			 * or
			 * 1: remote memsync, remote final, (local write)
			 *
			 * 2: remote memsync, (local write), (remote final)
			 * or
			 * 2: remote memsync, (remote final), (local write)
			 */
			switch (refcnt_release(&hio->hio_countdown)) {
			case 0:
				/*
				 * Remote final reply arrived.
				 */
				PJDLOG_ASSERT(!memsyncack);
				break;
			case 1:
				if (memsyncack) {
					/*
					 * Local request already finished, so we
					 * can complete the write.
					 */
					if (hio->hio_errors[0] == 0)
						write_complete(res, hio);
					/*
					 * We still need to wait for final
					 * remote reply.
					 */
					pjdlog_debug(2,
					    "remote_recv: (%p) Moving request back to the recv queue.",
					    hio);
					mtx_lock(&hio_recv_list_lock[ncomp]);
					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
					    hio, hio_next[ncomp]);
					mtx_unlock(&hio_recv_list_lock[ncomp]);
				} else {
					/*
					 * Remote final reply arrived before
					 * local write finished.
					 * Nothing to do in such case.
					 */
				}
				continue;
			case 2:
				/*
				 * We received remote memsync reply even before
				 * local write finished.
				 */
				PJDLOG_ASSERT(memsyncack);

				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request back to the recv queue.",
				    hio);
				mtx_lock(&hio_recv_list_lock[ncomp]);
				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				mtx_unlock(&hio_recv_list_lock[ncomp]);
				continue;
			default:
				PJDLOG_ABORT("Invalid hio_countdown.");
			}
		}
		if (ISSYNCREQ(hio)) {
			/* Wake up sync_thread() waiting for this request. */
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "remote_recv: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Mark the extent as clean in the activemap. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			} else {
				mtx_unlock(&res->hr_amp_lock);
			}
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Writes may have been acknowledged to the kernel
			 * already (async/memsync paths); only complete once.
			 */
			if (!hio->hio_done)
				write_complete(res, hio);
		} else {
			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
				primary_exit(EX_OSERR,
				    "G_GATE_CMD_DONE failed");
			}
		}
		/* Account per-command error statistics on local failure. */
		if (hio->hio_errors[0]) {
			switch (ggio->gctl_cmd) {
			case BIO_READ:
				res->hr_stat_read_error++;
				break;
			case BIO_WRITE:
				res->hr_stat_write_error++;
				break;
			case BIO_DELETE:
				res->hr_stat_delete_error++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush_error++;
				break;
			}
		}
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronize local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind, directreads;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	offset = -1;
	directreads = false;

	for (;;) {
		mtx_lock(&sync_lock);
		/* offset >= 0 means a synchronization run was in progress. */
		if (offset >= 0 && !sync_inprogress) {
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
		 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We synchronized entire syncext extent, we can mark
			 * it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
			else
				mtx_unlock(&res->hr_amp_lock);
		} else {
			mtx_unlock(&res->hr_amp_lock);
		}
		if (dorewind) {
			dorewind = false;
			if (offset == -1)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %NB to go.",
				    (intmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
				event_send(res, EVENT_SYNCSTART);
				gettimeofday(&tstart, NULL);
			}
		}
		if (offset == -1) {
			/*
			 * No more dirty extents - synchronization is done.
			 * Report statistics and make counters consistent.
			 */
			sync_stop();
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization complete, make both localcnt and
			 * remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					int64_t bps;

					gettimeofday(&tend, NULL);
					timersub(&tend, &tstart, &tdiff);
					bps = (int64_t)((double)synced /
					    ((double)tdiff.tv_sec +
					    (double)tdiff.tv_usec / 1000000));
					pjdlog_info("Synchronization complete. "
					    "%NB synchronized in %#.0lT (%NB/sec).",
					    (intmax_t)synced, &tdiff,
					    (intmax_t)bps);
					event_send(res, EVENT_SYNCDONE);
				}
				mtx_lock(&metadata_lock);
				if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
					directreads = true;
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_remotecnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_localcnt;
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_primary_remotecnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			if (directreads) {
				directreads = false;
				enable_direct_reads(res);
			}
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * a race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				/*
				 * A regular (kernel-originated) request holds
				 * the range; wait until it signals us.
				 */
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) == -1) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
		/*
		 * First read the data from synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		hio->hio_replication = res->hr_replication;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so handle request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so send request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		/* Exactly one component handles a sync request. */
		hio->hio_countdown = 1;
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		/*
		 * We read the data from synchronization source, now write it
		 * to synchronization target.
		 */
		SYNCREQ(hio);
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so we update remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so we update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		/* Exactly one component handles a sync request. */
		hio->hio_countdown = 1;
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for WRITE to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		/*
		 * Release the synchronized range and wake up any regular
		 * request that was waiting for it, then recycle the request.
		 */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Apply a new configuration (delivered as an nv list) to the running
 * primary worker.  Only settings that actually differ from the current
 * ones are applied.  A remote or source address change forces the remote
 * connections to be closed; the guard thread will then reconnect using
 * the new addresses.
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

/* Bitmask of the settings changed by this reload. */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		/*
		 * Drop existing remote connections; the new remote address
		 * is copied in only after remote_close() logged the
		 * disconnect from the old one.
		 */
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC
#undef	MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the state of a single component.  Only remote components are
 * examined: if the remote connection is up we are done, otherwise try
 * to reconnect.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein
	    == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		/* Restart synchronization now that the remote is back. */
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_sec = HAST_KEEPALIVE;
	timeout.tv_nsec = 0;
	/* No signal received yet. */
	signo = -1;

	for (;;) {
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		/*
		 * Don't check connections until we fully started,
		 * as we may still be looping, waiting for remote node
		 * to switch from primary to secondary.
		 */
		if (fullystarted) {
			pjdlog_debug(2, "remote_guard: Checking connections.");
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				for (ii = 0; ii < ncomps; ii++)
					guard_one(res, ii);
				lastcheck = now;
			}
		}
		/* Wait for a signal or until the keepalive period expires. */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}