primary.c revision 213530
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31#include <sys/cdefs.h> 32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 213530 2010-10-07 18:19:02Z pjd $"); 33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 41#include <geom/gate/g_gate.h> 42 43#include <assert.h> 44#include <err.h> 45#include <errno.h> 46#include <fcntl.h> 47#include <libgeom.h> 48#include <pthread.h> 49#include <signal.h> 50#include <stdint.h> 51#include <stdio.h> 52#include <string.h> 53#include <sysexits.h> 54#include <unistd.h> 55 56#include <activemap.h> 57#include <nv.h> 58#include <rangelock.h> 59 60#include "control.h" 61#include "event.h" 62#include "hast.h" 63#include "hast_proto.h" 64#include "hastd.h" 65#include "hooks.h" 66#include "metadata.h" 67#include "proto.h" 68#include "pjdlog.h" 69#include "subr.h" 70#include "synch.h" 71 72/* The is only one remote component for now. */ 73#define ISREMOTE(no) ((no) == 1) 74 75struct hio { 76 /* 77 * Number of components we are still waiting for. 78 * When this field goes to 0, we can send the request back to the 79 * kernel. Each component has to decrease this counter by one 80 * even on failure. 81 */ 82 unsigned int hio_countdown; 83 /* 84 * Each component has a place to store its own error. 85 * Once the request is handled by all components we can decide if the 86 * request overall is successful or not. 87 */ 88 int *hio_errors; 89 /* 90 * Structure used to comunicate with GEOM Gate class. 91 */ 92 struct g_gate_ctl_io hio_ggio; 93 TAILQ_ENTRY(hio) *hio_next; 94}; 95#define hio_free_next hio_next[0] 96#define hio_done_next hio_next[0] 97 98/* 99 * Free list holds unused structures. When free list is empty, we have to wait 100 * until some in-progress requests are freed. 101 */ 102static TAILQ_HEAD(, hio) hio_free_list; 103static pthread_mutex_t hio_free_list_lock; 104static pthread_cond_t hio_free_list_cond; 105/* 106 * There is one send list for every component. One requests is placed on all 107 * send lists - each component gets the same request, but each component is 108 * responsible for managing his own send list. 109 */ 110static TAILQ_HEAD(, hio) *hio_send_list; 111static pthread_mutex_t *hio_send_list_lock; 112static pthread_cond_t *hio_send_list_cond; 113/* 114 * There is one recv list for every component, although local components don't 115 * use recv lists as local requests are done synchronously. 116 */ 117static TAILQ_HEAD(, hio) *hio_recv_list; 118static pthread_mutex_t *hio_recv_list_lock; 119static pthread_cond_t *hio_recv_list_cond; 120/* 121 * Request is placed on done list by the slowest component (the one that 122 * decreased hio_countdown from 1 to 0). 123 */ 124static TAILQ_HEAD(, hio) hio_done_list; 125static pthread_mutex_t hio_done_list_lock; 126static pthread_cond_t hio_done_list_cond; 127/* 128 * Structure below are for interaction with sync thread. 129 */ 130static bool sync_inprogress; 131static pthread_mutex_t sync_lock; 132static pthread_cond_t sync_cond; 133/* 134 * The lock below allows to synchornize access to remote connections. 135 */ 136static pthread_rwlock_t *hio_remote_lock; 137 138/* 139 * Lock to synchronize metadata updates. Also synchronize access to 140 * hr_primary_localcnt and hr_primary_remotecnt fields. 141 */ 142static pthread_mutex_t metadata_lock; 143 144/* 145 * Maximum number of outstanding I/O requests. 146 */ 147#define HAST_HIO_MAX 256 148/* 149 * Number of components. At this point there are only two components: local 150 * and remote, but in the future it might be possible to use multiple local 151 * and remote components. 152 */ 153#define HAST_NCOMPONENTS 2 154/* 155 * Number of seconds to sleep between reconnect retries or keepalive packets. 156 */ 157#define RETRY_SLEEP 10 158 159#define ISCONNECTED(res, no) \ 160 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 161 162#define QUEUE_INSERT1(hio, name, ncomp) do { \ 163 bool _wakeup; \ 164 \ 165 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 166 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 167 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 168 hio_next[(ncomp)]); \ 169 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 170 if (_wakeup) \ 171 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 172} while (0) 173#define QUEUE_INSERT2(hio, name) do { \ 174 bool _wakeup; \ 175 \ 176 mtx_lock(&hio_##name##_list_lock); \ 177 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 178 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 179 mtx_unlock(&hio_##name##_list_lock); \ 180 if (_wakeup) \ 181 cv_signal(&hio_##name##_list_cond); \ 182} while (0) 183#define QUEUE_TAKE1(hio, name, ncomp) do { \ 184 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 185 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 186 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 187 &hio_##name##_list_lock[(ncomp)]); \ 188 } \ 189 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 190 hio_next[(ncomp)]); \ 191 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 192} while (0) 193#define QUEUE_TAKE2(hio, name) do { \ 194 mtx_lock(&hio_##name##_list_lock); \ 195 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 196 cv_wait(&hio_##name##_list_cond, \ 197 &hio_##name##_list_lock); \ 198 } \ 199 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 200 mtx_unlock(&hio_##name##_list_lock); \ 201} while (0) 202 203#define SYNCREQ(hio) do { \ 204 (hio)->hio_ggio.gctl_unit = -1; \ 205 (hio)->hio_ggio.gctl_seq = 1; \ 206} while (0) 207#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 208#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 209#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 210 211static struct hast_resource *gres; 212 213static pthread_mutex_t range_lock; 214static struct rangelocks *range_regular; 215static bool range_regular_wait; 216static pthread_cond_t range_regular_cond; 217static struct rangelocks *range_sync; 218static bool range_sync_wait; 219static pthread_cond_t range_sync_cond; 220 221static void *ggate_recv_thread(void *arg); 222static void *local_send_thread(void *arg); 223static void *remote_send_thread(void *arg); 224static void *remote_recv_thread(void *arg); 225static void *ggate_send_thread(void *arg); 226static void *sync_thread(void *arg); 227static void *guard_thread(void *arg); 228 229static void 230cleanup(struct hast_resource *res) 231{ 232 int rerrno; 233 234 /* Remember errno. */ 235 rerrno = errno; 236 237 /* Destroy ggate provider if we created one. */ 238 if (res->hr_ggateunit >= 0) { 239 struct g_gate_ctl_destroy ggiod; 240 241 ggiod.gctl_version = G_GATE_VERSION; 242 ggiod.gctl_unit = res->hr_ggateunit; 243 ggiod.gctl_force = 1; 244 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 245 pjdlog_warning("Unable to destroy hast/%s device", 246 res->hr_provname); 247 } 248 res->hr_ggateunit = -1; 249 } 250 251 /* Restore errno. */ 252 errno = rerrno; 253} 254 255static __dead2 void 256primary_exit(int exitcode, const char *fmt, ...) 257{ 258 va_list ap; 259 260 assert(exitcode != EX_OK); 261 va_start(ap, fmt); 262 pjdlogv_errno(LOG_ERR, fmt, ap); 263 va_end(ap); 264 cleanup(gres); 265 exit(exitcode); 266} 267 268static __dead2 void 269primary_exitx(int exitcode, const char *fmt, ...) 270{ 271 va_list ap; 272 273 va_start(ap, fmt); 274 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 275 va_end(ap); 276 cleanup(gres); 277 exit(exitcode); 278} 279 280static int 281hast_activemap_flush(struct hast_resource *res) 282{ 283 const unsigned char *buf; 284 size_t size; 285 286 buf = activemap_bitmap(res->hr_amp, &size); 287 assert(buf != NULL); 288 assert((size % res->hr_local_sectorsize) == 0); 289 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 290 (ssize_t)size) { 291 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 292 "Unable to flush activemap to disk")); 293 return (-1); 294 } 295 return (0); 296} 297 298static bool 299real_remote(const struct hast_resource *res) 300{ 301 302 return (strcmp(res->hr_remoteaddr, "none") != 0); 303} 304 305static void 306init_environment(struct hast_resource *res __unused) 307{ 308 struct hio *hio; 309 unsigned int ii, ncomps; 310 311 /* 312 * In the future it might be per-resource value. 313 */ 314 ncomps = HAST_NCOMPONENTS; 315 316 /* 317 * Allocate memory needed by lists. 318 */ 319 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 320 if (hio_send_list == NULL) { 321 primary_exitx(EX_TEMPFAIL, 322 "Unable to allocate %zu bytes of memory for send lists.", 323 sizeof(hio_send_list[0]) * ncomps); 324 } 325 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 326 if (hio_send_list_lock == NULL) { 327 primary_exitx(EX_TEMPFAIL, 328 "Unable to allocate %zu bytes of memory for send list locks.", 329 sizeof(hio_send_list_lock[0]) * ncomps); 330 } 331 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 332 if (hio_send_list_cond == NULL) { 333 primary_exitx(EX_TEMPFAIL, 334 "Unable to allocate %zu bytes of memory for send list condition variables.", 335 sizeof(hio_send_list_cond[0]) * ncomps); 336 } 337 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 338 if (hio_recv_list == NULL) { 339 primary_exitx(EX_TEMPFAIL, 340 "Unable to allocate %zu bytes of memory for recv lists.", 341 sizeof(hio_recv_list[0]) * ncomps); 342 } 343 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 344 if (hio_recv_list_lock == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for recv list locks.", 347 sizeof(hio_recv_list_lock[0]) * ncomps); 348 } 349 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 350 if (hio_recv_list_cond == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for recv list condition variables.", 353 sizeof(hio_recv_list_cond[0]) * ncomps); 354 } 355 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 356 if (hio_remote_lock == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for remote connections locks.", 359 sizeof(hio_remote_lock[0]) * ncomps); 360 } 361 362 /* 363 * Initialize lists, their locks and theirs condition variables. 364 */ 365 TAILQ_INIT(&hio_free_list); 366 mtx_init(&hio_free_list_lock); 367 cv_init(&hio_free_list_cond); 368 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 369 TAILQ_INIT(&hio_send_list[ii]); 370 mtx_init(&hio_send_list_lock[ii]); 371 cv_init(&hio_send_list_cond[ii]); 372 TAILQ_INIT(&hio_recv_list[ii]); 373 mtx_init(&hio_recv_list_lock[ii]); 374 cv_init(&hio_recv_list_cond[ii]); 375 rw_init(&hio_remote_lock[ii]); 376 } 377 TAILQ_INIT(&hio_done_list); 378 mtx_init(&hio_done_list_lock); 379 cv_init(&hio_done_list_cond); 380 mtx_init(&metadata_lock); 381 382 /* 383 * Allocate requests pool and initialize requests. 384 */ 385 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 386 hio = malloc(sizeof(*hio)); 387 if (hio == NULL) { 388 primary_exitx(EX_TEMPFAIL, 389 "Unable to allocate %zu bytes of memory for hio request.", 390 sizeof(*hio)); 391 } 392 hio->hio_countdown = 0; 393 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 394 if (hio->hio_errors == NULL) { 395 primary_exitx(EX_TEMPFAIL, 396 "Unable allocate %zu bytes of memory for hio errors.", 397 sizeof(hio->hio_errors[0]) * ncomps); 398 } 399 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 400 if (hio->hio_next == NULL) { 401 primary_exitx(EX_TEMPFAIL, 402 "Unable allocate %zu bytes of memory for hio_next field.", 403 sizeof(hio->hio_next[0]) * ncomps); 404 } 405 hio->hio_ggio.gctl_version = G_GATE_VERSION; 406 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 407 if (hio->hio_ggio.gctl_data == NULL) { 408 primary_exitx(EX_TEMPFAIL, 409 "Unable to allocate %zu bytes of memory for gctl_data.", 410 MAXPHYS); 411 } 412 hio->hio_ggio.gctl_length = MAXPHYS; 413 hio->hio_ggio.gctl_error = 0; 414 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 415 } 416} 417 418static void 419init_local(struct hast_resource *res) 420{ 421 unsigned char *buf; 422 size_t mapsize; 423 424 if (metadata_read(res, true) < 0) 425 exit(EX_NOINPUT); 426 mtx_init(&res->hr_amp_lock); 427 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 428 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 429 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 430 } 431 mtx_init(&range_lock); 432 cv_init(&range_regular_cond); 433 if (rangelock_init(&range_regular) < 0) 434 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 435 cv_init(&range_sync_cond); 436 if (rangelock_init(&range_sync) < 0) 437 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 438 mapsize = activemap_ondisk_size(res->hr_amp); 439 buf = calloc(1, mapsize); 440 if (buf == NULL) { 441 primary_exitx(EX_TEMPFAIL, 442 "Unable to allocate buffer for activemap."); 443 } 444 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 445 (ssize_t)mapsize) { 446 primary_exit(EX_NOINPUT, "Unable to read activemap"); 447 } 448 activemap_copyin(res->hr_amp, buf, mapsize); 449 free(buf); 450 if (res->hr_resuid != 0) 451 return; 452 /* 453 * We're using provider for the first time, so we have to generate 454 * resource unique identifier and initialize local and remote counts. 455 */ 456 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 457 res->hr_primary_localcnt = 1; 458 res->hr_primary_remotecnt = 0; 459 if (metadata_write(res) < 0) 460 exit(EX_NOINPUT); 461} 462 463static bool 464init_remote(struct hast_resource *res, struct proto_conn **inp, 465 struct proto_conn **outp) 466{ 467 struct proto_conn *in, *out; 468 struct nv *nvout, *nvin; 469 const unsigned char *token; 470 unsigned char *map; 471 const char *errmsg; 472 int32_t extentsize; 473 int64_t datasize; 474 uint32_t mapsize; 475 size_t size; 476 477 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 478 assert(real_remote(res)); 479 480 in = out = NULL; 481 errmsg = NULL; 482 483 /* Prepare outgoing connection with remote node. */ 484 if (proto_client(res->hr_remoteaddr, &out) < 0) { 485 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 486 res->hr_remoteaddr); 487 } 488 /* Try to connect, but accept failure. */ 489 if (proto_connect(out) < 0) { 490 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 491 res->hr_remoteaddr); 492 goto close; 493 } 494 /* Error in setting timeout is not critical, but why should it fail? */ 495 if (proto_timeout(out, res->hr_timeout) < 0) 496 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 497 /* 498 * First handshake step. 499 * Setup outgoing connection with remote node. 500 */ 501 nvout = nv_alloc(); 502 nv_add_string(nvout, res->hr_name, "resource"); 503 if (nv_error(nvout) != 0) { 504 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 505 "Unable to allocate header for connection with %s", 506 res->hr_remoteaddr); 507 nv_free(nvout); 508 goto close; 509 } 510 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 511 pjdlog_errno(LOG_WARNING, 512 "Unable to send handshake header to %s", 513 res->hr_remoteaddr); 514 nv_free(nvout); 515 goto close; 516 } 517 nv_free(nvout); 518 if (hast_proto_recv_hdr(out, &nvin) < 0) { 519 pjdlog_errno(LOG_WARNING, 520 "Unable to receive handshake header from %s", 521 res->hr_remoteaddr); 522 goto close; 523 } 524 errmsg = nv_get_string(nvin, "errmsg"); 525 if (errmsg != NULL) { 526 pjdlog_warning("%s", errmsg); 527 nv_free(nvin); 528 goto close; 529 } 530 token = nv_get_uint8_array(nvin, &size, "token"); 531 if (token == NULL) { 532 pjdlog_warning("Handshake header from %s has no 'token' field.", 533 res->hr_remoteaddr); 534 nv_free(nvin); 535 goto close; 536 } 537 if (size != sizeof(res->hr_token)) { 538 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 539 res->hr_remoteaddr, size, sizeof(res->hr_token)); 540 nv_free(nvin); 541 goto close; 542 } 543 bcopy(token, res->hr_token, sizeof(res->hr_token)); 544 nv_free(nvin); 545 546 /* 547 * Second handshake step. 548 * Setup incoming connection with remote node. 549 */ 550 if (proto_client(res->hr_remoteaddr, &in) < 0) { 551 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 552 res->hr_remoteaddr); 553 } 554 /* Try to connect, but accept failure. */ 555 if (proto_connect(in) < 0) { 556 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 557 res->hr_remoteaddr); 558 goto close; 559 } 560 /* Error in setting timeout is not critical, but why should it fail? */ 561 if (proto_timeout(in, res->hr_timeout) < 0) 562 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 563 nvout = nv_alloc(); 564 nv_add_string(nvout, res->hr_name, "resource"); 565 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 566 "token"); 567 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 568 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 569 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 570 if (nv_error(nvout) != 0) { 571 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 572 "Unable to allocate header for connection with %s", 573 res->hr_remoteaddr); 574 nv_free(nvout); 575 goto close; 576 } 577 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 578 pjdlog_errno(LOG_WARNING, 579 "Unable to send handshake header to %s", 580 res->hr_remoteaddr); 581 nv_free(nvout); 582 goto close; 583 } 584 nv_free(nvout); 585 if (hast_proto_recv_hdr(out, &nvin) < 0) { 586 pjdlog_errno(LOG_WARNING, 587 "Unable to receive handshake header from %s", 588 res->hr_remoteaddr); 589 goto close; 590 } 591 errmsg = nv_get_string(nvin, "errmsg"); 592 if (errmsg != NULL) { 593 pjdlog_warning("%s", errmsg); 594 nv_free(nvin); 595 goto close; 596 } 597 datasize = nv_get_int64(nvin, "datasize"); 598 if (datasize != res->hr_datasize) { 599 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 600 (intmax_t)res->hr_datasize, (intmax_t)datasize); 601 nv_free(nvin); 602 goto close; 603 } 604 extentsize = nv_get_int32(nvin, "extentsize"); 605 if (extentsize != res->hr_extentsize) { 606 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 607 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 608 nv_free(nvin); 609 goto close; 610 } 611 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 612 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 613 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 614 map = NULL; 615 mapsize = nv_get_uint32(nvin, "mapsize"); 616 if (mapsize > 0) { 617 map = malloc(mapsize); 618 if (map == NULL) { 619 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 620 (uintmax_t)mapsize); 621 nv_free(nvin); 622 goto close; 623 } 624 /* 625 * Remote node have some dirty extents on its own, lets 626 * download its activemap. 627 */ 628 if (hast_proto_recv_data(res, out, nvin, map, 629 mapsize) < 0) { 630 pjdlog_errno(LOG_ERR, 631 "Unable to receive remote activemap"); 632 nv_free(nvin); 633 free(map); 634 goto close; 635 } 636 /* 637 * Merge local and remote bitmaps. 638 */ 639 activemap_merge(res->hr_amp, map, mapsize); 640 free(map); 641 /* 642 * Now that we merged bitmaps from both nodes, flush it to the 643 * disk before we start to synchronize. 644 */ 645 (void)hast_activemap_flush(res); 646 } 647 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 648 if (inp != NULL && outp != NULL) { 649 *inp = in; 650 *outp = out; 651 } else { 652 res->hr_remotein = in; 653 res->hr_remoteout = out; 654 } 655 event_send(res, EVENT_CONNECT); 656 return (true); 657close: 658 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 659 event_send(res, EVENT_SPLITBRAIN); 660 proto_close(out); 661 if (in != NULL) 662 proto_close(in); 663 return (false); 664} 665 666static void 667sync_start(void) 668{ 669 670 mtx_lock(&sync_lock); 671 sync_inprogress = true; 672 mtx_unlock(&sync_lock); 673 cv_signal(&sync_cond); 674} 675 676static void 677sync_stop(void) 678{ 679 680 mtx_lock(&sync_lock); 681 if (sync_inprogress) 682 sync_inprogress = false; 683 mtx_unlock(&sync_lock); 684} 685 686static void 687init_ggate(struct hast_resource *res) 688{ 689 struct g_gate_ctl_create ggiocreate; 690 struct g_gate_ctl_cancel ggiocancel; 691 692 /* 693 * We communicate with ggate via /dev/ggctl. Open it. 694 */ 695 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 696 if (res->hr_ggatefd < 0) 697 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 698 /* 699 * Create provider before trying to connect, as connection failure 700 * is not critical, but may take some time. 701 */ 702 ggiocreate.gctl_version = G_GATE_VERSION; 703 ggiocreate.gctl_mediasize = res->hr_datasize; 704 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 705 ggiocreate.gctl_flags = 0; 706 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 707 ggiocreate.gctl_timeout = 0; 708 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 709 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 710 res->hr_provname); 711 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 712 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 713 pjdlog_info("Device hast/%s created.", res->hr_provname); 714 res->hr_ggateunit = ggiocreate.gctl_unit; 715 return; 716 } 717 if (errno != EEXIST) { 718 primary_exit(EX_OSERR, "Unable to create hast/%s device", 719 res->hr_provname); 720 } 721 pjdlog_debug(1, 722 "Device hast/%s already exists, we will try to take it over.", 723 res->hr_provname); 724 /* 725 * If we received EEXIST, we assume that the process who created the 726 * provider died and didn't clean up. In that case we will start from 727 * where he left of. 728 */ 729 ggiocancel.gctl_version = G_GATE_VERSION; 730 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 731 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 732 res->hr_provname); 733 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 734 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 735 res->hr_ggateunit = ggiocancel.gctl_unit; 736 return; 737 } 738 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 739 res->hr_provname); 740} 741 742void 743hastd_primary(struct hast_resource *res) 744{ 745 pthread_t td; 746 pid_t pid; 747 int error; 748 749 /* 750 * Create communication channel between parent and child. 751 */ 752 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 753 KEEP_ERRNO((void)pidfile_remove(pfh)); 754 pjdlog_exit(EX_OSERR, 755 "Unable to create control sockets between parent and child"); 756 } 757 /* 758 * Create communication channel between child and parent. 759 */ 760 if (proto_client("socketpair://", &res->hr_event) < 0) { 761 KEEP_ERRNO((void)pidfile_remove(pfh)); 762 pjdlog_exit(EX_OSERR, 763 "Unable to create event sockets between child and parent"); 764 } 765 766 pid = fork(); 767 if (pid < 0) { 768 KEEP_ERRNO((void)pidfile_remove(pfh)); 769 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 770 } 771 772 if (pid > 0) { 773 /* This is parent. */ 774 /* Declare that we are receiver. */ 775 proto_recv(res->hr_event, NULL, 0); 776 res->hr_workerpid = pid; 777 return; 778 } 779 780 gres = res; 781 782 (void)pidfile_close(pfh); 783 hook_fini(); 784 785 setproctitle("%s (primary)", res->hr_name); 786 787 /* Declare that we are sender. */ 788 proto_send(res->hr_event, NULL, 0); 789 790 init_local(res); 791 init_ggate(res); 792 init_environment(res); 793 /* 794 * Create the guard thread first, so we can handle signals from the 795 * very begining. 796 */ 797 error = pthread_create(&td, NULL, guard_thread, res); 798 assert(error == 0); 799 /* 800 * Create the control thread before sending any event to the parent, 801 * as we can deadlock when parent sends control request to worker, 802 * but worker has no control thread started yet, so parent waits. 803 * In the meantime worker sends an event to the parent, but parent 804 * is unable to handle the event, because it waits for control 805 * request response. 806 */ 807 error = pthread_create(&td, NULL, ctrl_thread, res); 808 assert(error == 0); 809 if (real_remote(res) && init_remote(res, NULL, NULL)) 810 sync_start(); 811 error = pthread_create(&td, NULL, ggate_recv_thread, res); 812 assert(error == 0); 813 error = pthread_create(&td, NULL, local_send_thread, res); 814 assert(error == 0); 815 error = pthread_create(&td, NULL, remote_send_thread, res); 816 assert(error == 0); 817 error = pthread_create(&td, NULL, remote_recv_thread, res); 818 assert(error == 0); 819 error = pthread_create(&td, NULL, ggate_send_thread, res); 820 assert(error == 0); 821 (void)sync_thread(res); 822} 823 824static void 825reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 826{ 827 char msg[1024]; 828 va_list ap; 829 int len; 830 831 va_start(ap, fmt); 832 len = vsnprintf(msg, sizeof(msg), fmt, ap); 833 va_end(ap); 834 if ((size_t)len < sizeof(msg)) { 835 switch (ggio->gctl_cmd) { 836 case BIO_READ: 837 (void)snprintf(msg + len, sizeof(msg) - len, 838 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 839 (uintmax_t)ggio->gctl_length); 840 break; 841 case BIO_DELETE: 842 (void)snprintf(msg + len, sizeof(msg) - len, 843 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 844 (uintmax_t)ggio->gctl_length); 845 break; 846 case BIO_FLUSH: 847 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 848 break; 849 case BIO_WRITE: 850 (void)snprintf(msg + len, sizeof(msg) - len, 851 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 852 (uintmax_t)ggio->gctl_length); 853 break; 854 default: 855 (void)snprintf(msg + len, sizeof(msg) - len, 856 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 857 break; 858 } 859 } 860 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 861} 862 863static void 864remote_close(struct hast_resource *res, int ncomp) 865{ 866 867 rw_wlock(&hio_remote_lock[ncomp]); 868 /* 869 * A race is possible between dropping rlock and acquiring wlock - 870 * another thread can close connection in-between. 871 */ 872 if (!ISCONNECTED(res, ncomp)) { 873 assert(res->hr_remotein == NULL); 874 assert(res->hr_remoteout == NULL); 875 rw_unlock(&hio_remote_lock[ncomp]); 876 return; 877 } 878 879 assert(res->hr_remotein != NULL); 880 assert(res->hr_remoteout != NULL); 881 882 pjdlog_debug(2, "Closing incoming connection to %s.", 883 res->hr_remoteaddr); 884 proto_close(res->hr_remotein); 885 res->hr_remotein = NULL; 886 pjdlog_debug(2, "Closing outgoing connection to %s.", 887 res->hr_remoteaddr); 888 proto_close(res->hr_remoteout); 889 res->hr_remoteout = NULL; 890 891 rw_unlock(&hio_remote_lock[ncomp]); 892 893 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 894 895 /* 896 * Stop synchronization if in-progress. 897 */ 898 sync_stop(); 899 900 event_send(res, EVENT_DISCONNECT); 901} 902 903/* 904 * Thread receives ggate I/O requests from the kernel and passes them to 905 * appropriate threads: 906 * WRITE - always goes to both local_send and remote_send threads 907 * READ (when the block is up-to-date on local component) - 908 * only local_send thread 909 * READ (when the block isn't up-to-date on local component) - 910 * only remote_send thread 911 * DELETE - always goes to both local_send and remote_send threads 912 * FLUSH - always goes to both local_send and remote_send threads 913 */ 914static void * 915ggate_recv_thread(void *arg) 916{ 917 struct hast_resource *res = arg; 918 struct g_gate_ctl_io *ggio; 919 struct hio *hio; 920 unsigned int ii, ncomp, ncomps; 921 int error; 922 923 ncomps = HAST_NCOMPONENTS; 924 925 for (;;) { 926 pjdlog_debug(2, "ggate_recv: Taking free request."); 927 QUEUE_TAKE2(hio, free); 928 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 929 ggio = &hio->hio_ggio; 930 ggio->gctl_unit = res->hr_ggateunit; 931 ggio->gctl_length = MAXPHYS; 932 ggio->gctl_error = 0; 933 pjdlog_debug(2, 934 "ggate_recv: (%p) Waiting for request from the kernel.", 935 hio); 936 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 937 if (sigexit_received) 938 pthread_exit(NULL); 939 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 940 } 941 error = ggio->gctl_error; 942 switch (error) { 943 case 0: 944 break; 945 case ECANCELED: 946 /* Exit gracefully. */ 947 if (!sigexit_received) { 948 pjdlog_debug(2, 949 "ggate_recv: (%p) Received cancel from the kernel.", 950 hio); 951 pjdlog_info("Received cancel from the kernel, exiting."); 952 } 953 pthread_exit(NULL); 954 case ENOMEM: 955 /* 956 * Buffer too small? Impossible, we allocate MAXPHYS 957 * bytes - request can't be bigger than that. 958 */ 959 /* FALLTHROUGH */ 960 case ENXIO: 961 default: 962 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 963 strerror(error)); 964 } 965 for (ii = 0; ii < ncomps; ii++) 966 hio->hio_errors[ii] = EINVAL; 967 reqlog(LOG_DEBUG, 2, ggio, 968 "ggate_recv: (%p) Request received from the kernel: ", 969 hio); 970 /* 971 * Inform all components about new write request. 972 * For read request prefer local component unless the given 973 * range is out-of-date, then use remote component. 974 */ 975 switch (ggio->gctl_cmd) { 976 case BIO_READ: 977 pjdlog_debug(2, 978 "ggate_recv: (%p) Moving request to the send queue.", 979 hio); 980 refcount_init(&hio->hio_countdown, 1); 981 mtx_lock(&metadata_lock); 982 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 983 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 984 /* 985 * This range is up-to-date on local component, 986 * so handle request locally. 987 */ 988 /* Local component is 0 for now. */ 989 ncomp = 0; 990 } else /* if (res->hr_syncsrc == 991 HAST_SYNCSRC_SECONDARY) */ { 992 assert(res->hr_syncsrc == 993 HAST_SYNCSRC_SECONDARY); 994 /* 995 * This range is out-of-date on local component, 996 * so send request to the remote node. 997 */ 998 /* Remote component is 1 for now. */ 999 ncomp = 1; 1000 } 1001 mtx_unlock(&metadata_lock); 1002 QUEUE_INSERT1(hio, send, ncomp); 1003 break; 1004 case BIO_WRITE: 1005 for (;;) { 1006 mtx_lock(&range_lock); 1007 if (rangelock_islocked(range_sync, 1008 ggio->gctl_offset, ggio->gctl_length)) { 1009 pjdlog_debug(2, 1010 "regular: Range offset=%jd length=%zu locked.", 1011 (intmax_t)ggio->gctl_offset, 1012 (size_t)ggio->gctl_length); 1013 range_regular_wait = true; 1014 cv_wait(&range_regular_cond, &range_lock); 1015 range_regular_wait = false; 1016 mtx_unlock(&range_lock); 1017 continue; 1018 } 1019 if (rangelock_add(range_regular, 1020 ggio->gctl_offset, ggio->gctl_length) < 0) { 1021 mtx_unlock(&range_lock); 1022 pjdlog_debug(2, 1023 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1024 (intmax_t)ggio->gctl_offset, 1025 (size_t)ggio->gctl_length); 1026 sleep(1); 1027 continue; 1028 } 1029 mtx_unlock(&range_lock); 1030 break; 1031 } 1032 mtx_lock(&res->hr_amp_lock); 1033 if (activemap_write_start(res->hr_amp, 1034 ggio->gctl_offset, ggio->gctl_length)) { 1035 (void)hast_activemap_flush(res); 1036 } 1037 mtx_unlock(&res->hr_amp_lock); 1038 /* FALLTHROUGH */ 1039 case BIO_DELETE: 1040 case BIO_FLUSH: 1041 pjdlog_debug(2, 1042 "ggate_recv: (%p) Moving request to the send queues.", 1043 hio); 1044 refcount_init(&hio->hio_countdown, ncomps); 1045 for (ii = 0; ii < ncomps; ii++) 1046 QUEUE_INSERT1(hio, send, ii); 1047 break; 1048 } 1049 } 1050 /* NOTREACHED */ 1051 return (NULL); 1052} 1053 1054/* 1055 * Thread reads from or writes to local component. 1056 * If local read fails, it redirects it to remote_send thread. 1057 */ 1058static void * 1059local_send_thread(void *arg) 1060{ 1061 struct hast_resource *res = arg; 1062 struct g_gate_ctl_io *ggio; 1063 struct hio *hio; 1064 unsigned int ncomp, rncomp; 1065 ssize_t ret; 1066 1067 /* Local component is 0 for now. */ 1068 ncomp = 0; 1069 /* Remote component is 1 for now. */ 1070 rncomp = 1; 1071 1072 for (;;) { 1073 pjdlog_debug(2, "local_send: Taking request."); 1074 QUEUE_TAKE1(hio, send, ncomp); 1075 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1076 ggio = &hio->hio_ggio; 1077 switch (ggio->gctl_cmd) { 1078 case BIO_READ: 1079 ret = pread(res->hr_localfd, ggio->gctl_data, 1080 ggio->gctl_length, 1081 ggio->gctl_offset + res->hr_localoff); 1082 if (ret == ggio->gctl_length) 1083 hio->hio_errors[ncomp] = 0; 1084 else { 1085 /* 1086 * If READ failed, try to read from remote node. 1087 */ 1088 QUEUE_INSERT1(hio, send, rncomp); 1089 continue; 1090 } 1091 break; 1092 case BIO_WRITE: 1093 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1094 ggio->gctl_length, 1095 ggio->gctl_offset + res->hr_localoff); 1096 if (ret < 0) 1097 hio->hio_errors[ncomp] = errno; 1098 else if (ret != ggio->gctl_length) 1099 hio->hio_errors[ncomp] = EIO; 1100 else 1101 hio->hio_errors[ncomp] = 0; 1102 break; 1103 case BIO_DELETE: 1104 ret = g_delete(res->hr_localfd, 1105 ggio->gctl_offset + res->hr_localoff, 1106 ggio->gctl_length); 1107 if (ret < 0) 1108 hio->hio_errors[ncomp] = errno; 1109 else 1110 hio->hio_errors[ncomp] = 0; 1111 break; 1112 case BIO_FLUSH: 1113 ret = g_flush(res->hr_localfd); 1114 if (ret < 0) 1115 hio->hio_errors[ncomp] = errno; 1116 else 1117 hio->hio_errors[ncomp] = 0; 1118 break; 1119 } 1120 if (refcount_release(&hio->hio_countdown)) { 1121 if (ISSYNCREQ(hio)) { 1122 mtx_lock(&sync_lock); 1123 SYNCREQDONE(hio); 1124 mtx_unlock(&sync_lock); 1125 cv_signal(&sync_cond); 1126 } else { 1127 pjdlog_debug(2, 1128 "local_send: (%p) Moving request to the done queue.", 1129 hio); 1130 QUEUE_INSERT2(hio, done); 1131 } 1132 } 1133 } 1134 /* NOTREACHED */ 1135 return (NULL); 1136} 1137 1138/* 1139 * Thread sends request to secondary node. 1140 */ 1141static void * 1142remote_send_thread(void *arg) 1143{ 1144 struct hast_resource *res = arg; 1145 struct g_gate_ctl_io *ggio; 1146 struct hio *hio; 1147 struct nv *nv; 1148 unsigned int ncomp; 1149 bool wakeup; 1150 uint64_t offset, length; 1151 uint8_t cmd; 1152 void *data; 1153 1154 /* Remote component is 1 for now. */ 1155 ncomp = 1; 1156 1157 for (;;) { 1158 pjdlog_debug(2, "remote_send: Taking request."); 1159 QUEUE_TAKE1(hio, send, ncomp); 1160 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1161 ggio = &hio->hio_ggio; 1162 switch (ggio->gctl_cmd) { 1163 case BIO_READ: 1164 cmd = HIO_READ; 1165 data = NULL; 1166 offset = ggio->gctl_offset; 1167 length = ggio->gctl_length; 1168 break; 1169 case BIO_WRITE: 1170 cmd = HIO_WRITE; 1171 data = ggio->gctl_data; 1172 offset = ggio->gctl_offset; 1173 length = ggio->gctl_length; 1174 break; 1175 case BIO_DELETE: 1176 cmd = HIO_DELETE; 1177 data = NULL; 1178 offset = ggio->gctl_offset; 1179 length = ggio->gctl_length; 1180 break; 1181 case BIO_FLUSH: 1182 cmd = HIO_FLUSH; 1183 data = NULL; 1184 offset = 0; 1185 length = 0; 1186 break; 1187 default: 1188 assert(!"invalid condition"); 1189 abort(); 1190 } 1191 nv = nv_alloc(); 1192 nv_add_uint8(nv, cmd, "cmd"); 1193 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1194 nv_add_uint64(nv, offset, "offset"); 1195 nv_add_uint64(nv, length, "length"); 1196 if (nv_error(nv) != 0) { 1197 hio->hio_errors[ncomp] = nv_error(nv); 1198 pjdlog_debug(2, 1199 "remote_send: (%p) Unable to prepare header to send.", 1200 hio); 1201 reqlog(LOG_ERR, 0, ggio, 1202 "Unable to prepare header to send (%s): ", 1203 strerror(nv_error(nv))); 1204 /* Move failed request immediately to the done queue. */ 1205 goto done_queue; 1206 } 1207 pjdlog_debug(2, 1208 "remote_send: (%p) Moving request to the recv queue.", 1209 hio); 1210 /* 1211 * Protect connection from disappearing. 1212 */ 1213 rw_rlock(&hio_remote_lock[ncomp]); 1214 if (!ISCONNECTED(res, ncomp)) { 1215 rw_unlock(&hio_remote_lock[ncomp]); 1216 hio->hio_errors[ncomp] = ENOTCONN; 1217 goto done_queue; 1218 } 1219 /* 1220 * Move the request to recv queue before sending it, because 1221 * in different order we can get reply before we move request 1222 * to recv queue. 1223 */ 1224 mtx_lock(&hio_recv_list_lock[ncomp]); 1225 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1226 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1227 mtx_unlock(&hio_recv_list_lock[ncomp]); 1228 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1229 data != NULL ? length : 0) < 0) { 1230 hio->hio_errors[ncomp] = errno; 1231 rw_unlock(&hio_remote_lock[ncomp]); 1232 pjdlog_debug(2, 1233 "remote_send: (%p) Unable to send request.", hio); 1234 reqlog(LOG_ERR, 0, ggio, 1235 "Unable to send request (%s): ", 1236 strerror(hio->hio_errors[ncomp])); 1237 remote_close(res, ncomp); 1238 /* 1239 * Take request back from the receive queue and move 1240 * it immediately to the done queue. 1241 */ 1242 mtx_lock(&hio_recv_list_lock[ncomp]); 1243 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1244 mtx_unlock(&hio_recv_list_lock[ncomp]); 1245 goto done_queue; 1246 } 1247 rw_unlock(&hio_remote_lock[ncomp]); 1248 nv_free(nv); 1249 if (wakeup) 1250 cv_signal(&hio_recv_list_cond[ncomp]); 1251 continue; 1252done_queue: 1253 nv_free(nv); 1254 if (ISSYNCREQ(hio)) { 1255 if (!refcount_release(&hio->hio_countdown)) 1256 continue; 1257 mtx_lock(&sync_lock); 1258 SYNCREQDONE(hio); 1259 mtx_unlock(&sync_lock); 1260 cv_signal(&sync_cond); 1261 continue; 1262 } 1263 if (ggio->gctl_cmd == BIO_WRITE) { 1264 mtx_lock(&res->hr_amp_lock); 1265 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1266 ggio->gctl_length)) { 1267 (void)hast_activemap_flush(res); 1268 } 1269 mtx_unlock(&res->hr_amp_lock); 1270 } 1271 if (!refcount_release(&hio->hio_countdown)) 1272 continue; 1273 pjdlog_debug(2, 1274 "remote_send: (%p) Moving request to the done queue.", 1275 hio); 1276 QUEUE_INSERT2(hio, done); 1277 } 1278 /* NOTREACHED */ 1279 return (NULL); 1280} 1281 1282/* 1283 * Thread receives answer from secondary node and passes it to ggate_send 1284 * thread. 1285 */ 1286static void * 1287remote_recv_thread(void *arg) 1288{ 1289 struct hast_resource *res = arg; 1290 struct g_gate_ctl_io *ggio; 1291 struct hio *hio; 1292 struct nv *nv; 1293 unsigned int ncomp; 1294 uint64_t seq; 1295 int error; 1296 1297 /* Remote component is 1 for now. */ 1298 ncomp = 1; 1299 1300 for (;;) { 1301 /* Wait until there is anything to receive. */ 1302 mtx_lock(&hio_recv_list_lock[ncomp]); 1303 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1304 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1305 cv_wait(&hio_recv_list_cond[ncomp], 1306 &hio_recv_list_lock[ncomp]); 1307 } 1308 mtx_unlock(&hio_recv_list_lock[ncomp]); 1309 rw_rlock(&hio_remote_lock[ncomp]); 1310 if (!ISCONNECTED(res, ncomp)) { 1311 rw_unlock(&hio_remote_lock[ncomp]); 1312 /* 1313 * Connection is dead, so move all pending requests to 1314 * the done queue (one-by-one). 1315 */ 1316 mtx_lock(&hio_recv_list_lock[ncomp]); 1317 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1318 assert(hio != NULL); 1319 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1320 hio_next[ncomp]); 1321 mtx_unlock(&hio_recv_list_lock[ncomp]); 1322 goto done_queue; 1323 } 1324 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1325 pjdlog_errno(LOG_ERR, 1326 "Unable to receive reply header"); 1327 rw_unlock(&hio_remote_lock[ncomp]); 1328 remote_close(res, ncomp); 1329 continue; 1330 } 1331 rw_unlock(&hio_remote_lock[ncomp]); 1332 seq = nv_get_uint64(nv, "seq"); 1333 if (seq == 0) { 1334 pjdlog_error("Header contains no 'seq' field."); 1335 nv_free(nv); 1336 continue; 1337 } 1338 mtx_lock(&hio_recv_list_lock[ncomp]); 1339 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1340 if (hio->hio_ggio.gctl_seq == seq) { 1341 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1342 hio_next[ncomp]); 1343 break; 1344 } 1345 } 1346 mtx_unlock(&hio_recv_list_lock[ncomp]); 1347 if (hio == NULL) { 1348 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1349 (uintmax_t)seq); 1350 nv_free(nv); 1351 continue; 1352 } 1353 error = nv_get_int16(nv, "error"); 1354 if (error != 0) { 1355 /* Request failed on remote side. */ 1356 hio->hio_errors[ncomp] = 0; 1357 nv_free(nv); 1358 goto done_queue; 1359 } 1360 ggio = &hio->hio_ggio; 1361 switch (ggio->gctl_cmd) { 1362 case BIO_READ: 1363 rw_rlock(&hio_remote_lock[ncomp]); 1364 if (!ISCONNECTED(res, ncomp)) { 1365 rw_unlock(&hio_remote_lock[ncomp]); 1366 nv_free(nv); 1367 goto done_queue; 1368 } 1369 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1370 ggio->gctl_data, ggio->gctl_length) < 0) { 1371 hio->hio_errors[ncomp] = errno; 1372 pjdlog_errno(LOG_ERR, 1373 "Unable to receive reply data"); 1374 rw_unlock(&hio_remote_lock[ncomp]); 1375 nv_free(nv); 1376 remote_close(res, ncomp); 1377 goto done_queue; 1378 } 1379 rw_unlock(&hio_remote_lock[ncomp]); 1380 break; 1381 case BIO_WRITE: 1382 case BIO_DELETE: 1383 case BIO_FLUSH: 1384 break; 1385 default: 1386 assert(!"invalid condition"); 1387 abort(); 1388 } 1389 hio->hio_errors[ncomp] = 0; 1390 nv_free(nv); 1391done_queue: 1392 if (refcount_release(&hio->hio_countdown)) { 1393 if (ISSYNCREQ(hio)) { 1394 mtx_lock(&sync_lock); 1395 SYNCREQDONE(hio); 1396 mtx_unlock(&sync_lock); 1397 cv_signal(&sync_cond); 1398 } else { 1399 pjdlog_debug(2, 1400 "remote_recv: (%p) Moving request to the done queue.", 1401 hio); 1402 QUEUE_INSERT2(hio, done); 1403 } 1404 } 1405 } 1406 /* NOTREACHED */ 1407 return (NULL); 1408} 1409 1410/* 1411 * Thread sends answer to the kernel. 1412 */ 1413static void * 1414ggate_send_thread(void *arg) 1415{ 1416 struct hast_resource *res = arg; 1417 struct g_gate_ctl_io *ggio; 1418 struct hio *hio; 1419 unsigned int ii, ncomp, ncomps; 1420 1421 ncomps = HAST_NCOMPONENTS; 1422 1423 for (;;) { 1424 pjdlog_debug(2, "ggate_send: Taking request."); 1425 QUEUE_TAKE2(hio, done); 1426 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1427 ggio = &hio->hio_ggio; 1428 for (ii = 0; ii < ncomps; ii++) { 1429 if (hio->hio_errors[ii] == 0) { 1430 /* 1431 * One successful request is enough to declare 1432 * success. 1433 */ 1434 ggio->gctl_error = 0; 1435 break; 1436 } 1437 } 1438 if (ii == ncomps) { 1439 /* 1440 * None of the requests were successful. 1441 * Use first error. 1442 */ 1443 ggio->gctl_error = hio->hio_errors[0]; 1444 } 1445 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1446 mtx_lock(&res->hr_amp_lock); 1447 activemap_write_complete(res->hr_amp, 1448 ggio->gctl_offset, ggio->gctl_length); 1449 mtx_unlock(&res->hr_amp_lock); 1450 } 1451 if (ggio->gctl_cmd == BIO_WRITE) { 1452 /* 1453 * Unlock range we locked. 1454 */ 1455 mtx_lock(&range_lock); 1456 rangelock_del(range_regular, ggio->gctl_offset, 1457 ggio->gctl_length); 1458 if (range_sync_wait) 1459 cv_signal(&range_sync_cond); 1460 mtx_unlock(&range_lock); 1461 /* 1462 * Bump local count if this is first write after 1463 * connection failure with remote node. 1464 */ 1465 ncomp = 1; 1466 rw_rlock(&hio_remote_lock[ncomp]); 1467 if (!ISCONNECTED(res, ncomp)) { 1468 mtx_lock(&metadata_lock); 1469 if (res->hr_primary_localcnt == 1470 res->hr_secondary_remotecnt) { 1471 res->hr_primary_localcnt++; 1472 pjdlog_debug(1, 1473 "Increasing localcnt to %ju.", 1474 (uintmax_t)res->hr_primary_localcnt); 1475 (void)metadata_write(res); 1476 } 1477 mtx_unlock(&metadata_lock); 1478 } 1479 rw_unlock(&hio_remote_lock[ncomp]); 1480 } 1481 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1482 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1483 pjdlog_debug(2, 1484 "ggate_send: (%p) Moving request to the free queue.", hio); 1485 QUEUE_INSERT2(hio, free); 1486 } 1487 /* NOTREACHED */ 1488 return (NULL); 1489} 1490 1491/* 1492 * Thread synchronize local and remote components. 1493 */ 1494static void * 1495sync_thread(void *arg __unused) 1496{ 1497 struct hast_resource *res = arg; 1498 struct hio *hio; 1499 struct g_gate_ctl_io *ggio; 1500 unsigned int ii, ncomp, ncomps; 1501 off_t offset, length, synced; 1502 bool dorewind; 1503 int syncext; 1504 1505 ncomps = HAST_NCOMPONENTS; 1506 dorewind = true; 1507 synced = 0; 1508 offset = -1; 1509 1510 for (;;) { 1511 mtx_lock(&sync_lock); 1512 if (offset >= 0 && !sync_inprogress) { 1513 pjdlog_info("Synchronization interrupted. " 1514 "%jd bytes synchronized so far.", 1515 (intmax_t)synced); 1516 event_send(res, EVENT_SYNCINTR); 1517 } 1518 while (!sync_inprogress) { 1519 dorewind = true; 1520 synced = 0; 1521 cv_wait(&sync_cond, &sync_lock); 1522 } 1523 mtx_unlock(&sync_lock); 1524 /* 1525 * Obtain offset at which we should synchronize. 1526 * Rewind synchronization if needed. 1527 */ 1528 mtx_lock(&res->hr_amp_lock); 1529 if (dorewind) 1530 activemap_sync_rewind(res->hr_amp); 1531 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1532 if (syncext != -1) { 1533 /* 1534 * We synchronized entire syncext extent, we can mark 1535 * it as clean now. 1536 */ 1537 if (activemap_extent_complete(res->hr_amp, syncext)) 1538 (void)hast_activemap_flush(res); 1539 } 1540 mtx_unlock(&res->hr_amp_lock); 1541 if (dorewind) { 1542 dorewind = false; 1543 if (offset < 0) 1544 pjdlog_info("Nodes are in sync."); 1545 else { 1546 pjdlog_info("Synchronization started. %ju bytes to go.", 1547 (uintmax_t)(res->hr_extentsize * 1548 activemap_ndirty(res->hr_amp))); 1549 event_send(res, EVENT_SYNCSTART); 1550 } 1551 } 1552 if (offset < 0) { 1553 sync_stop(); 1554 pjdlog_debug(1, "Nothing to synchronize."); 1555 /* 1556 * Synchronization complete, make both localcnt and 1557 * remotecnt equal. 1558 */ 1559 ncomp = 1; 1560 rw_rlock(&hio_remote_lock[ncomp]); 1561 if (ISCONNECTED(res, ncomp)) { 1562 if (synced > 0) { 1563 pjdlog_info("Synchronization complete. " 1564 "%jd bytes synchronized.", 1565 (intmax_t)synced); 1566 event_send(res, EVENT_SYNCDONE); 1567 } 1568 mtx_lock(&metadata_lock); 1569 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1570 res->hr_primary_localcnt = 1571 res->hr_secondary_localcnt; 1572 res->hr_primary_remotecnt = 1573 res->hr_secondary_remotecnt; 1574 pjdlog_debug(1, 1575 "Setting localcnt to %ju and remotecnt to %ju.", 1576 (uintmax_t)res->hr_primary_localcnt, 1577 (uintmax_t)res->hr_secondary_localcnt); 1578 (void)metadata_write(res); 1579 mtx_unlock(&metadata_lock); 1580 } 1581 rw_unlock(&hio_remote_lock[ncomp]); 1582 continue; 1583 } 1584 pjdlog_debug(2, "sync: Taking free request."); 1585 QUEUE_TAKE2(hio, free); 1586 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1587 /* 1588 * Lock the range we are going to synchronize. We don't want 1589 * race where someone writes between our read and write. 1590 */ 1591 for (;;) { 1592 mtx_lock(&range_lock); 1593 if (rangelock_islocked(range_regular, offset, length)) { 1594 pjdlog_debug(2, 1595 "sync: Range offset=%jd length=%jd locked.", 1596 (intmax_t)offset, (intmax_t)length); 1597 range_sync_wait = true; 1598 cv_wait(&range_sync_cond, &range_lock); 1599 range_sync_wait = false; 1600 mtx_unlock(&range_lock); 1601 continue; 1602 } 1603 if (rangelock_add(range_sync, offset, length) < 0) { 1604 mtx_unlock(&range_lock); 1605 pjdlog_debug(2, 1606 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1607 (intmax_t)offset, (intmax_t)length); 1608 sleep(1); 1609 continue; 1610 } 1611 mtx_unlock(&range_lock); 1612 break; 1613 } 1614 /* 1615 * First read the data from synchronization source. 1616 */ 1617 SYNCREQ(hio); 1618 ggio = &hio->hio_ggio; 1619 ggio->gctl_cmd = BIO_READ; 1620 ggio->gctl_offset = offset; 1621 ggio->gctl_length = length; 1622 ggio->gctl_error = 0; 1623 for (ii = 0; ii < ncomps; ii++) 1624 hio->hio_errors[ii] = EINVAL; 1625 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1626 hio); 1627 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1628 hio); 1629 mtx_lock(&metadata_lock); 1630 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1631 /* 1632 * This range is up-to-date on local component, 1633 * so handle request locally. 1634 */ 1635 /* Local component is 0 for now. */ 1636 ncomp = 0; 1637 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1638 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1639 /* 1640 * This range is out-of-date on local component, 1641 * so send request to the remote node. 1642 */ 1643 /* Remote component is 1 for now. */ 1644 ncomp = 1; 1645 } 1646 mtx_unlock(&metadata_lock); 1647 refcount_init(&hio->hio_countdown, 1); 1648 QUEUE_INSERT1(hio, send, ncomp); 1649 1650 /* 1651 * Let's wait for READ to finish. 1652 */ 1653 mtx_lock(&sync_lock); 1654 while (!ISSYNCREQDONE(hio)) 1655 cv_wait(&sync_cond, &sync_lock); 1656 mtx_unlock(&sync_lock); 1657 1658 if (hio->hio_errors[ncomp] != 0) { 1659 pjdlog_error("Unable to read synchronization data: %s.", 1660 strerror(hio->hio_errors[ncomp])); 1661 goto free_queue; 1662 } 1663 1664 /* 1665 * We read the data from synchronization source, now write it 1666 * to synchronization target. 1667 */ 1668 SYNCREQ(hio); 1669 ggio->gctl_cmd = BIO_WRITE; 1670 for (ii = 0; ii < ncomps; ii++) 1671 hio->hio_errors[ii] = EINVAL; 1672 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1673 hio); 1674 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1675 hio); 1676 mtx_lock(&metadata_lock); 1677 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1678 /* 1679 * This range is up-to-date on local component, 1680 * so we update remote component. 1681 */ 1682 /* Remote component is 1 for now. */ 1683 ncomp = 1; 1684 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1685 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1686 /* 1687 * This range is out-of-date on local component, 1688 * so we update it. 1689 */ 1690 /* Local component is 0 for now. */ 1691 ncomp = 0; 1692 } 1693 mtx_unlock(&metadata_lock); 1694 1695 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1696 hio); 1697 refcount_init(&hio->hio_countdown, 1); 1698 QUEUE_INSERT1(hio, send, ncomp); 1699 1700 /* 1701 * Let's wait for WRITE to finish. 1702 */ 1703 mtx_lock(&sync_lock); 1704 while (!ISSYNCREQDONE(hio)) 1705 cv_wait(&sync_cond, &sync_lock); 1706 mtx_unlock(&sync_lock); 1707 1708 if (hio->hio_errors[ncomp] != 0) { 1709 pjdlog_error("Unable to write synchronization data: %s.", 1710 strerror(hio->hio_errors[ncomp])); 1711 goto free_queue; 1712 } 1713 1714 synced += length; 1715free_queue: 1716 mtx_lock(&range_lock); 1717 rangelock_del(range_sync, offset, length); 1718 if (range_regular_wait) 1719 cv_signal(&range_regular_cond); 1720 mtx_unlock(&range_lock); 1721 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1722 hio); 1723 QUEUE_INSERT2(hio, free); 1724 } 1725 /* NOTREACHED */ 1726 return (NULL); 1727} 1728 1729static void 1730config_reload(void) 1731{ 1732 struct hastd_config *newcfg; 1733 struct hast_resource *res; 1734 unsigned int ii, ncomps; 1735 int modified; 1736 1737 pjdlog_info("Reloading configuration..."); 1738 1739 ncomps = HAST_NCOMPONENTS; 1740 1741 newcfg = yy_config_parse(cfgpath, false); 1742 if (newcfg == NULL) 1743 goto failed; 1744 1745 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1746 if (strcmp(res->hr_name, gres->hr_name) == 0) 1747 break; 1748 } 1749 /* 1750 * If resource was removed from the configuration file, resource 1751 * name, provider name or path to local component was modified we 1752 * shouldn't be here. This means that someone modified configuration 1753 * file and send SIGHUP to us instead of main hastd process. 1754 * Log advice and ignore the signal. 1755 */ 1756 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1757 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1758 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1759 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1760 (unsigned int)getppid()); 1761 goto failed; 1762 } 1763 1764#define MODIFIED_REMOTEADDR 0x1 1765#define MODIFIED_REPLICATION 0x2 1766#define MODIFIED_TIMEOUT 0x4 1767#define MODIFIED_EXEC 0x8 1768 modified = 0; 1769 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1770 /* 1771 * Don't copy res->hr_remoteaddr to gres just yet. 1772 * We want remote_close() to log disconnect from the old 1773 * addresses, not from the new ones. 1774 */ 1775 modified |= MODIFIED_REMOTEADDR; 1776 } 1777 if (gres->hr_replication != res->hr_replication) { 1778 gres->hr_replication = res->hr_replication; 1779 modified |= MODIFIED_REPLICATION; 1780 } 1781 if (gres->hr_timeout != res->hr_timeout) { 1782 gres->hr_timeout = res->hr_timeout; 1783 modified |= MODIFIED_TIMEOUT; 1784 } 1785 if (strcmp(gres->hr_exec, res->hr_exec) != 0) { 1786 strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec)); 1787 modified |= MODIFIED_EXEC; 1788 } 1789 /* 1790 * If only timeout was modified we only need to change it without 1791 * reconnecting. 1792 */ 1793 if (modified == MODIFIED_TIMEOUT) { 1794 for (ii = 0; ii < ncomps; ii++) { 1795 if (!ISREMOTE(ii)) 1796 continue; 1797 rw_rlock(&hio_remote_lock[ii]); 1798 if (!ISCONNECTED(gres, ii)) { 1799 rw_unlock(&hio_remote_lock[ii]); 1800 continue; 1801 } 1802 rw_unlock(&hio_remote_lock[ii]); 1803 if (proto_timeout(gres->hr_remotein, 1804 gres->hr_timeout) < 0) { 1805 pjdlog_errno(LOG_WARNING, 1806 "Unable to set connection timeout"); 1807 } 1808 if (proto_timeout(gres->hr_remoteout, 1809 gres->hr_timeout) < 0) { 1810 pjdlog_errno(LOG_WARNING, 1811 "Unable to set connection timeout"); 1812 } 1813 } 1814 } else if ((modified & 1815 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1816 for (ii = 0; ii < ncomps; ii++) { 1817 if (!ISREMOTE(ii)) 1818 continue; 1819 remote_close(gres, ii); 1820 } 1821 if (modified & MODIFIED_REMOTEADDR) { 1822 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1823 sizeof(gres->hr_remoteaddr)); 1824 } 1825 } 1826#undef MODIFIED_REMOTEADDR 1827#undef MODIFIED_REPLICATION 1828#undef MODIFIED_TIMEOUT 1829#undef MODIFIED_EXEC 1830 1831 pjdlog_info("Configuration reloaded successfully."); 1832 return; 1833failed: 1834 if (newcfg != NULL) { 1835 if (newcfg->hc_controlconn != NULL) 1836 proto_close(newcfg->hc_controlconn); 1837 if (newcfg->hc_listenconn != NULL) 1838 proto_close(newcfg->hc_listenconn); 1839 yy_config_free(newcfg); 1840 } 1841 pjdlog_warning("Configuration not reloaded."); 1842} 1843 1844static void 1845keepalive_send(struct hast_resource *res, unsigned int ncomp) 1846{ 1847 struct nv *nv; 1848 1849 nv = nv_alloc(); 1850 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1851 if (nv_error(nv) != 0) { 1852 nv_free(nv); 1853 pjdlog_debug(1, 1854 "keepalive_send: Unable to prepare header to send."); 1855 return; 1856 } 1857 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1858 pjdlog_common(LOG_DEBUG, 1, errno, 1859 "keepalive_send: Unable to send request"); 1860 nv_free(nv); 1861 rw_unlock(&hio_remote_lock[ncomp]); 1862 remote_close(res, ncomp); 1863 rw_rlock(&hio_remote_lock[ncomp]); 1864 return; 1865 } 1866 nv_free(nv); 1867 pjdlog_debug(2, "keepalive_send: Request sent."); 1868} 1869 1870static void 1871guard_one(struct hast_resource *res, unsigned int ncomp) 1872{ 1873 struct proto_conn *in, *out; 1874 1875 if (!ISREMOTE(ncomp)) 1876 return; 1877 1878 rw_rlock(&hio_remote_lock[ncomp]); 1879 1880 if (!real_remote(res)) { 1881 rw_unlock(&hio_remote_lock[ncomp]); 1882 return; 1883 } 1884 1885 if (ISCONNECTED(res, ncomp)) { 1886 assert(res->hr_remotein != NULL); 1887 assert(res->hr_remoteout != NULL); 1888 keepalive_send(res, ncomp); 1889 } 1890 1891 if (ISCONNECTED(res, ncomp)) { 1892 assert(res->hr_remotein != NULL); 1893 assert(res->hr_remoteout != NULL); 1894 rw_unlock(&hio_remote_lock[ncomp]); 1895 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 1896 res->hr_remoteaddr); 1897 return; 1898 } 1899 1900 assert(res->hr_remotein == NULL); 1901 assert(res->hr_remoteout == NULL); 1902 /* 1903 * Upgrade the lock. It doesn't have to be atomic as no other thread 1904 * can change connection status from disconnected to connected. 1905 */ 1906 rw_unlock(&hio_remote_lock[ncomp]); 1907 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 1908 res->hr_remoteaddr); 1909 in = out = NULL; 1910 if (init_remote(res, &in, &out)) { 1911 rw_wlock(&hio_remote_lock[ncomp]); 1912 assert(res->hr_remotein == NULL); 1913 assert(res->hr_remoteout == NULL); 1914 assert(in != NULL && out != NULL); 1915 res->hr_remotein = in; 1916 res->hr_remoteout = out; 1917 rw_unlock(&hio_remote_lock[ncomp]); 1918 pjdlog_info("Successfully reconnected to %s.", 1919 res->hr_remoteaddr); 1920 sync_start(); 1921 } else { 1922 /* Both connections should be NULL. */ 1923 assert(res->hr_remotein == NULL); 1924 assert(res->hr_remoteout == NULL); 1925 assert(in == NULL && out == NULL); 1926 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 1927 res->hr_remoteaddr); 1928 } 1929} 1930 1931/* 1932 * Thread guards remote connections and reconnects when needed, handles 1933 * signals, etc. 1934 */ 1935static void * 1936guard_thread(void *arg) 1937{ 1938 struct hast_resource *res = arg; 1939 unsigned int ii, ncomps; 1940 struct timespec timeout; 1941 time_t lastcheck, now; 1942 sigset_t mask; 1943 int signo; 1944 1945 ncomps = HAST_NCOMPONENTS; 1946 lastcheck = time(NULL); 1947 1948 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1949 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1950 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1951 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 1952 1953 timeout.tv_nsec = 0; 1954 signo = -1; 1955 1956 for (;;) { 1957 switch (signo) { 1958 case SIGHUP: 1959 config_reload(); 1960 break; 1961 case SIGINT: 1962 case SIGTERM: 1963 sigexit_received = true; 1964 primary_exitx(EX_OK, 1965 "Termination signal received, exiting."); 1966 break; 1967 default: 1968 break; 1969 } 1970 1971 pjdlog_debug(2, "remote_guard: Checking connections."); 1972 now = time(NULL); 1973 if (lastcheck + RETRY_SLEEP <= now) { 1974 for (ii = 0; ii < ncomps; ii++) 1975 guard_one(res, ii); 1976 lastcheck = now; 1977 } 1978 timeout.tv_sec = RETRY_SLEEP; 1979 signo = sigtimedwait(&mask, NULL, &timeout); 1980 } 1981 /* NOTREACHED */ 1982 return (NULL); 1983} 1984