/* primary.c revision 212038 */
190075Sobrien/*- 290075Sobrien * Copyright (c) 2009 The FreeBSD Foundation 390075Sobrien * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 490075Sobrien * All rights reserved. 590075Sobrien * 690075Sobrien * This software was developed by Pawel Jakub Dawidek under sponsorship from 790075Sobrien * the FreeBSD Foundation. 890075Sobrien * 990075Sobrien * Redistribution and use in source and binary forms, with or without 1090075Sobrien * modification, are permitted provided that the following conditions 1190075Sobrien * are met: 1290075Sobrien * 1. Redistributions of source code must retain the above copyright 1390075Sobrien * notice, this list of conditions and the following disclaimer. 1490075Sobrien * 2. Redistributions in binary form must reproduce the above copyright 1590075Sobrien * notice, this list of conditions and the following disclaimer in the 1690075Sobrien * documentation and/or other materials provided with the distribution. 1790075Sobrien * 1890075Sobrien * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 1990075Sobrien * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 2090075Sobrien * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 2190075Sobrien * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22132718Skan * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23132718Skan * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2490075Sobrien * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2590075Sobrien * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26132718Skan * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2790075Sobrien * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2890075Sobrien * SUCH DAMAGE. 
2990075Sobrien */ 3090075Sobrien 3190075Sobrien#include <sys/cdefs.h> 3290075Sobrien__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 212038 2010-08-30 23:26:10Z pjd $"); 33132718Skan 3490075Sobrien#include <sys/types.h> 35132718Skan#include <sys/time.h> 3690075Sobrien#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 41#include <geom/gate/g_gate.h> 42 43#include <assert.h> 44#include <err.h> 45#include <errno.h> 46#include <fcntl.h> 47#include <libgeom.h> 48#include <pthread.h> 49#include <signal.h> 50#include <stdint.h> 51#include <stdio.h> 52#include <string.h> 53#include <sysexits.h> 54#include <unistd.h> 55 56#include <activemap.h> 57#include <nv.h> 58#include <rangelock.h> 59 60#include "control.h" 61#include "event.h" 62#include "hast.h" 63#include "hast_proto.h" 64#include "hastd.h" 65#include "hooks.h" 66#include "metadata.h" 67#include "proto.h" 68#include "pjdlog.h" 69#include "subr.h" 70#include "synch.h" 71 72/* The is only one remote component for now. */ 73#define ISREMOTE(no) ((no) == 1) 74 75struct hio { 76 /* 77 * Number of components we are still waiting for. 78 * When this field goes to 0, we can send the request back to the 79 * kernel. Each component has to decrease this counter by one 80 * even on failure. 81 */ 82 unsigned int hio_countdown; 83 /* 84 * Each component has a place to store its own error. 85 * Once the request is handled by all components we can decide if the 86 * request overall is successful or not. 87 */ 88 int *hio_errors; 89 /* 90 * Structure used to comunicate with GEOM Gate class. 91 */ 92 struct g_gate_ctl_io hio_ggio; 93 TAILQ_ENTRY(hio) *hio_next; 94}; 95#define hio_free_next hio_next[0] 96#define hio_done_next hio_next[0] 97 98/* 99 * Free list holds unused structures. When free list is empty, we have to wait 100 * until some in-progress requests are freed. 
101 */ 102static TAILQ_HEAD(, hio) hio_free_list; 103static pthread_mutex_t hio_free_list_lock; 104static pthread_cond_t hio_free_list_cond; 105/* 106 * There is one send list for every component. One requests is placed on all 107 * send lists - each component gets the same request, but each component is 108 * responsible for managing his own send list. 109 */ 110static TAILQ_HEAD(, hio) *hio_send_list; 111static pthread_mutex_t *hio_send_list_lock; 112static pthread_cond_t *hio_send_list_cond; 113/* 114 * There is one recv list for every component, although local components don't 115 * use recv lists as local requests are done synchronously. 116 */ 117static TAILQ_HEAD(, hio) *hio_recv_list; 118static pthread_mutex_t *hio_recv_list_lock; 119static pthread_cond_t *hio_recv_list_cond; 120/* 121 * Request is placed on done list by the slowest component (the one that 122 * decreased hio_countdown from 1 to 0). 123 */ 124static TAILQ_HEAD(, hio) hio_done_list; 125static pthread_mutex_t hio_done_list_lock; 126static pthread_cond_t hio_done_list_cond; 127/* 128 * Structure below are for interaction with sync thread. 129 */ 130static bool sync_inprogress; 131static pthread_mutex_t sync_lock; 132static pthread_cond_t sync_cond; 133/* 134 * The lock below allows to synchornize access to remote connections. 135 */ 136static pthread_rwlock_t *hio_remote_lock; 137 138/* 139 * Lock to synchronize metadata updates. Also synchronize access to 140 * hr_primary_localcnt and hr_primary_remotecnt fields. 141 */ 142static pthread_mutex_t metadata_lock; 143 144/* 145 * Maximum number of outstanding I/O requests. 146 */ 147#define HAST_HIO_MAX 256 148/* 149 * Number of components. At this point there are only two components: local 150 * and remote, but in the future it might be possible to use multiple local 151 * and remote components. 152 */ 153#define HAST_NCOMPONENTS 2 154/* 155 * Number of seconds to sleep between reconnect retries or keepalive packets. 
156 */ 157#define RETRY_SLEEP 10 158 159#define ISCONNECTED(res, no) \ 160 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 161 162#define QUEUE_INSERT1(hio, name, ncomp) do { \ 163 bool _wakeup; \ 164 \ 165 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 166 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 167 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 168 hio_next[(ncomp)]); \ 169 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 170 if (_wakeup) \ 171 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 172} while (0) 173#define QUEUE_INSERT2(hio, name) do { \ 174 bool _wakeup; \ 175 \ 176 mtx_lock(&hio_##name##_list_lock); \ 177 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 178 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 179 mtx_unlock(&hio_##name##_list_lock); \ 180 if (_wakeup) \ 181 cv_signal(&hio_##name##_list_cond); \ 182} while (0) 183#define QUEUE_TAKE1(hio, name, ncomp) do { \ 184 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 185 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 186 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 187 &hio_##name##_list_lock[(ncomp)]); \ 188 } \ 189 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 190 hio_next[(ncomp)]); \ 191 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 192} while (0) 193#define QUEUE_TAKE2(hio, name) do { \ 194 mtx_lock(&hio_##name##_list_lock); \ 195 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 196 cv_wait(&hio_##name##_list_cond, \ 197 &hio_##name##_list_lock); \ 198 } \ 199 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 200 mtx_unlock(&hio_##name##_list_lock); \ 201} while (0) 202 203#define SYNCREQ(hio) do { \ 204 (hio)->hio_ggio.gctl_unit = -1; \ 205 (hio)->hio_ggio.gctl_seq = 1; \ 206} while (0) 207#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 208#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 209#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 210 211static 
struct hast_resource *gres; 212 213static pthread_mutex_t range_lock; 214static struct rangelocks *range_regular; 215static bool range_regular_wait; 216static pthread_cond_t range_regular_cond; 217static struct rangelocks *range_sync; 218static bool range_sync_wait; 219static pthread_cond_t range_sync_cond; 220 221static void *ggate_recv_thread(void *arg); 222static void *local_send_thread(void *arg); 223static void *remote_send_thread(void *arg); 224static void *remote_recv_thread(void *arg); 225static void *ggate_send_thread(void *arg); 226static void *sync_thread(void *arg); 227static void *guard_thread(void *arg); 228 229static void 230cleanup(struct hast_resource *res) 231{ 232 int rerrno; 233 234 /* Remember errno. */ 235 rerrno = errno; 236 237 /* 238 * Close descriptor to /dev/hast/<name> 239 * to work-around race in the kernel. 240 */ 241 close(res->hr_localfd); 242 243 /* Destroy ggate provider if we created one. */ 244 if (res->hr_ggateunit >= 0) { 245 struct g_gate_ctl_destroy ggiod; 246 247 ggiod.gctl_version = G_GATE_VERSION; 248 ggiod.gctl_unit = res->hr_ggateunit; 249 ggiod.gctl_force = 1; 250 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 251 pjdlog_warning("Unable to destroy hast/%s device", 252 res->hr_provname); 253 } 254 res->hr_ggateunit = -1; 255 } 256 257 /* Restore errno. */ 258 errno = rerrno; 259} 260 261static void 262primary_exit(int exitcode, const char *fmt, ...) 263{ 264 va_list ap; 265 266 assert(exitcode != EX_OK); 267 va_start(ap, fmt); 268 pjdlogv_errno(LOG_ERR, fmt, ap); 269 va_end(ap); 270 cleanup(gres); 271 exit(exitcode); 272} 273 274static void 275primary_exitx(int exitcode, const char *fmt, ...) 276{ 277 va_list ap; 278 279 va_start(ap, fmt); 280 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 281 va_end(ap); 282 cleanup(gres); 283 exit(exitcode); 284} 285 286static int 287hast_activemap_flush(struct hast_resource *res) 288{ 289 const unsigned char *buf; 290 size_t size; 291 292 buf = activemap_bitmap(res->hr_amp, &size); 293 assert(buf != NULL); 294 assert((size % res->hr_local_sectorsize) == 0); 295 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 296 (ssize_t)size) { 297 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 298 "Unable to flush activemap to disk")); 299 return (-1); 300 } 301 return (0); 302} 303 304static bool 305real_remote(const struct hast_resource *res) 306{ 307 308 return (strcmp(res->hr_remoteaddr, "none") != 0); 309} 310 311static void 312init_environment(struct hast_resource *res __unused) 313{ 314 struct hio *hio; 315 unsigned int ii, ncomps; 316 sigset_t mask; 317 318 /* 319 * In the future it might be per-resource value. 320 */ 321 ncomps = HAST_NCOMPONENTS; 322 323 /* 324 * Allocate memory needed by lists. 325 */ 326 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 327 if (hio_send_list == NULL) { 328 primary_exitx(EX_TEMPFAIL, 329 "Unable to allocate %zu bytes of memory for send lists.", 330 sizeof(hio_send_list[0]) * ncomps); 331 } 332 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 333 if (hio_send_list_lock == NULL) { 334 primary_exitx(EX_TEMPFAIL, 335 "Unable to allocate %zu bytes of memory for send list locks.", 336 sizeof(hio_send_list_lock[0]) * ncomps); 337 } 338 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 339 if (hio_send_list_cond == NULL) { 340 primary_exitx(EX_TEMPFAIL, 341 "Unable to allocate %zu bytes of memory for send list condition variables.", 342 sizeof(hio_send_list_cond[0]) * ncomps); 343 } 344 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 345 if (hio_recv_list == NULL) { 346 primary_exitx(EX_TEMPFAIL, 347 "Unable to allocate %zu bytes of memory for recv lists.", 348 sizeof(hio_recv_list[0]) * ncomps); 349 } 350 
hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 351 if (hio_recv_list_lock == NULL) { 352 primary_exitx(EX_TEMPFAIL, 353 "Unable to allocate %zu bytes of memory for recv list locks.", 354 sizeof(hio_recv_list_lock[0]) * ncomps); 355 } 356 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 357 if (hio_recv_list_cond == NULL) { 358 primary_exitx(EX_TEMPFAIL, 359 "Unable to allocate %zu bytes of memory for recv list condition variables.", 360 sizeof(hio_recv_list_cond[0]) * ncomps); 361 } 362 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 363 if (hio_remote_lock == NULL) { 364 primary_exitx(EX_TEMPFAIL, 365 "Unable to allocate %zu bytes of memory for remote connections locks.", 366 sizeof(hio_remote_lock[0]) * ncomps); 367 } 368 369 /* 370 * Initialize lists, their locks and theirs condition variables. 371 */ 372 TAILQ_INIT(&hio_free_list); 373 mtx_init(&hio_free_list_lock); 374 cv_init(&hio_free_list_cond); 375 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 376 TAILQ_INIT(&hio_send_list[ii]); 377 mtx_init(&hio_send_list_lock[ii]); 378 cv_init(&hio_send_list_cond[ii]); 379 TAILQ_INIT(&hio_recv_list[ii]); 380 mtx_init(&hio_recv_list_lock[ii]); 381 cv_init(&hio_recv_list_cond[ii]); 382 rw_init(&hio_remote_lock[ii]); 383 } 384 TAILQ_INIT(&hio_done_list); 385 mtx_init(&hio_done_list_lock); 386 cv_init(&hio_done_list_cond); 387 mtx_init(&metadata_lock); 388 389 /* 390 * Allocate requests pool and initialize requests. 
391 */ 392 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 393 hio = malloc(sizeof(*hio)); 394 if (hio == NULL) { 395 primary_exitx(EX_TEMPFAIL, 396 "Unable to allocate %zu bytes of memory for hio request.", 397 sizeof(*hio)); 398 } 399 hio->hio_countdown = 0; 400 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 401 if (hio->hio_errors == NULL) { 402 primary_exitx(EX_TEMPFAIL, 403 "Unable allocate %zu bytes of memory for hio errors.", 404 sizeof(hio->hio_errors[0]) * ncomps); 405 } 406 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 407 if (hio->hio_next == NULL) { 408 primary_exitx(EX_TEMPFAIL, 409 "Unable allocate %zu bytes of memory for hio_next field.", 410 sizeof(hio->hio_next[0]) * ncomps); 411 } 412 hio->hio_ggio.gctl_version = G_GATE_VERSION; 413 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 414 if (hio->hio_ggio.gctl_data == NULL) { 415 primary_exitx(EX_TEMPFAIL, 416 "Unable to allocate %zu bytes of memory for gctl_data.", 417 MAXPHYS); 418 } 419 hio->hio_ggio.gctl_length = MAXPHYS; 420 hio->hio_ggio.gctl_error = 0; 421 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 422 } 423 424 /* 425 * Turn on signals handling. 
426 */ 427 PJDLOG_VERIFY(sigfillset(&mask) == 0); 428 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 429} 430 431static void 432init_local(struct hast_resource *res) 433{ 434 unsigned char *buf; 435 size_t mapsize; 436 437 if (metadata_read(res, true) < 0) 438 exit(EX_NOINPUT); 439 mtx_init(&res->hr_amp_lock); 440 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 441 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 442 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 443 } 444 mtx_init(&range_lock); 445 cv_init(&range_regular_cond); 446 if (rangelock_init(&range_regular) < 0) 447 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 448 cv_init(&range_sync_cond); 449 if (rangelock_init(&range_sync) < 0) 450 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 451 mapsize = activemap_ondisk_size(res->hr_amp); 452 buf = calloc(1, mapsize); 453 if (buf == NULL) { 454 primary_exitx(EX_TEMPFAIL, 455 "Unable to allocate buffer for activemap."); 456 } 457 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 458 (ssize_t)mapsize) { 459 primary_exit(EX_NOINPUT, "Unable to read activemap"); 460 } 461 activemap_copyin(res->hr_amp, buf, mapsize); 462 free(buf); 463 if (res->hr_resuid != 0) 464 return; 465 /* 466 * We're using provider for the first time, so we have to generate 467 * resource unique identifier and initialize local and remote counts. 
468 */ 469 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 470 res->hr_primary_localcnt = 1; 471 res->hr_primary_remotecnt = 0; 472 if (metadata_write(res) < 0) 473 exit(EX_NOINPUT); 474} 475 476static bool 477init_remote(struct hast_resource *res, struct proto_conn **inp, 478 struct proto_conn **outp) 479{ 480 struct proto_conn *in, *out; 481 struct nv *nvout, *nvin; 482 const unsigned char *token; 483 unsigned char *map; 484 const char *errmsg; 485 int32_t extentsize; 486 int64_t datasize; 487 uint32_t mapsize; 488 size_t size; 489 490 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 491 assert(real_remote(res)); 492 493 in = out = NULL; 494 errmsg = NULL; 495 496 /* Prepare outgoing connection with remote node. */ 497 if (proto_client(res->hr_remoteaddr, &out) < 0) { 498 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 499 res->hr_remoteaddr); 500 } 501 /* Try to connect, but accept failure. */ 502 if (proto_connect(out) < 0) { 503 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 504 res->hr_remoteaddr); 505 goto close; 506 } 507 /* Error in setting timeout is not critical, but why should it fail? */ 508 if (proto_timeout(out, res->hr_timeout) < 0) 509 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 510 /* 511 * First handshake step. 512 * Setup outgoing connection with remote node. 
513 */ 514 nvout = nv_alloc(); 515 nv_add_string(nvout, res->hr_name, "resource"); 516 if (nv_error(nvout) != 0) { 517 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 518 "Unable to allocate header for connection with %s", 519 res->hr_remoteaddr); 520 nv_free(nvout); 521 goto close; 522 } 523 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 524 pjdlog_errno(LOG_WARNING, 525 "Unable to send handshake header to %s", 526 res->hr_remoteaddr); 527 nv_free(nvout); 528 goto close; 529 } 530 nv_free(nvout); 531 if (hast_proto_recv_hdr(out, &nvin) < 0) { 532 pjdlog_errno(LOG_WARNING, 533 "Unable to receive handshake header from %s", 534 res->hr_remoteaddr); 535 goto close; 536 } 537 errmsg = nv_get_string(nvin, "errmsg"); 538 if (errmsg != NULL) { 539 pjdlog_warning("%s", errmsg); 540 nv_free(nvin); 541 goto close; 542 } 543 token = nv_get_uint8_array(nvin, &size, "token"); 544 if (token == NULL) { 545 pjdlog_warning("Handshake header from %s has no 'token' field.", 546 res->hr_remoteaddr); 547 nv_free(nvin); 548 goto close; 549 } 550 if (size != sizeof(res->hr_token)) { 551 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 552 res->hr_remoteaddr, size, sizeof(res->hr_token)); 553 nv_free(nvin); 554 goto close; 555 } 556 bcopy(token, res->hr_token, sizeof(res->hr_token)); 557 nv_free(nvin); 558 559 /* 560 * Second handshake step. 561 * Setup incoming connection with remote node. 562 */ 563 if (proto_client(res->hr_remoteaddr, &in) < 0) { 564 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 565 res->hr_remoteaddr); 566 } 567 /* Try to connect, but accept failure. */ 568 if (proto_connect(in) < 0) { 569 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 570 res->hr_remoteaddr); 571 goto close; 572 } 573 /* Error in setting timeout is not critical, but why should it fail? 
*/ 574 if (proto_timeout(in, res->hr_timeout) < 0) 575 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 576 nvout = nv_alloc(); 577 nv_add_string(nvout, res->hr_name, "resource"); 578 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 579 "token"); 580 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 581 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 582 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 583 if (nv_error(nvout) != 0) { 584 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 585 "Unable to allocate header for connection with %s", 586 res->hr_remoteaddr); 587 nv_free(nvout); 588 goto close; 589 } 590 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 591 pjdlog_errno(LOG_WARNING, 592 "Unable to send handshake header to %s", 593 res->hr_remoteaddr); 594 nv_free(nvout); 595 goto close; 596 } 597 nv_free(nvout); 598 if (hast_proto_recv_hdr(out, &nvin) < 0) { 599 pjdlog_errno(LOG_WARNING, 600 "Unable to receive handshake header from %s", 601 res->hr_remoteaddr); 602 goto close; 603 } 604 errmsg = nv_get_string(nvin, "errmsg"); 605 if (errmsg != NULL) { 606 pjdlog_warning("%s", errmsg); 607 nv_free(nvin); 608 goto close; 609 } 610 datasize = nv_get_int64(nvin, "datasize"); 611 if (datasize != res->hr_datasize) { 612 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 613 (intmax_t)res->hr_datasize, (intmax_t)datasize); 614 nv_free(nvin); 615 goto close; 616 } 617 extentsize = nv_get_int32(nvin, "extentsize"); 618 if (extentsize != res->hr_extentsize) { 619 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 620 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 621 nv_free(nvin); 622 goto close; 623 } 624 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 625 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 626 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 627 map = NULL; 628 mapsize = nv_get_uint32(nvin, "mapsize"); 629 if (mapsize > 
0) { 630 map = malloc(mapsize); 631 if (map == NULL) { 632 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 633 (uintmax_t)mapsize); 634 nv_free(nvin); 635 goto close; 636 } 637 /* 638 * Remote node have some dirty extents on its own, lets 639 * download its activemap. 640 */ 641 if (hast_proto_recv_data(res, out, nvin, map, 642 mapsize) < 0) { 643 pjdlog_errno(LOG_ERR, 644 "Unable to receive remote activemap"); 645 nv_free(nvin); 646 free(map); 647 goto close; 648 } 649 /* 650 * Merge local and remote bitmaps. 651 */ 652 activemap_merge(res->hr_amp, map, mapsize); 653 free(map); 654 /* 655 * Now that we merged bitmaps from both nodes, flush it to the 656 * disk before we start to synchronize. 657 */ 658 (void)hast_activemap_flush(res); 659 } 660 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 661 if (inp != NULL && outp != NULL) { 662 *inp = in; 663 *outp = out; 664 } else { 665 res->hr_remotein = in; 666 res->hr_remoteout = out; 667 } 668 event_send(res, EVENT_CONNECT); 669 return (true); 670close: 671 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 672 event_send(res, EVENT_SPLITBRAIN); 673 proto_close(out); 674 if (in != NULL) 675 proto_close(in); 676 return (false); 677} 678 679static void 680sync_start(void) 681{ 682 683 mtx_lock(&sync_lock); 684 sync_inprogress = true; 685 mtx_unlock(&sync_lock); 686 cv_signal(&sync_cond); 687} 688 689static void 690sync_stop(void) 691{ 692 693 mtx_lock(&sync_lock); 694 if (sync_inprogress) 695 sync_inprogress = false; 696 mtx_unlock(&sync_lock); 697} 698 699static void 700init_ggate(struct hast_resource *res) 701{ 702 struct g_gate_ctl_create ggiocreate; 703 struct g_gate_ctl_cancel ggiocancel; 704 705 /* 706 * We communicate with ggate via /dev/ggctl. Open it. 
707 */ 708 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 709 if (res->hr_ggatefd < 0) 710 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 711 /* 712 * Create provider before trying to connect, as connection failure 713 * is not critical, but may take some time. 714 */ 715 ggiocreate.gctl_version = G_GATE_VERSION; 716 ggiocreate.gctl_mediasize = res->hr_datasize; 717 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 718 ggiocreate.gctl_flags = 0; 719 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 720 ggiocreate.gctl_timeout = 0; 721 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 722 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 723 res->hr_provname); 724 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 725 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 726 pjdlog_info("Device hast/%s created.", res->hr_provname); 727 res->hr_ggateunit = ggiocreate.gctl_unit; 728 return; 729 } 730 if (errno != EEXIST) { 731 primary_exit(EX_OSERR, "Unable to create hast/%s device", 732 res->hr_provname); 733 } 734 pjdlog_debug(1, 735 "Device hast/%s already exists, we will try to take it over.", 736 res->hr_provname); 737 /* 738 * If we received EEXIST, we assume that the process who created the 739 * provider died and didn't clean up. In that case we will start from 740 * where he left of. 
741 */ 742 ggiocancel.gctl_version = G_GATE_VERSION; 743 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 744 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 745 res->hr_provname); 746 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 747 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 748 res->hr_ggateunit = ggiocancel.gctl_unit; 749 return; 750 } 751 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 752 res->hr_provname); 753} 754 755void 756hastd_primary(struct hast_resource *res) 757{ 758 pthread_t td; 759 pid_t pid; 760 int error; 761 762 /* 763 * Create communication channel between parent and child. 764 */ 765 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 766 KEEP_ERRNO((void)pidfile_remove(pfh)); 767 pjdlog_exit(EX_OSERR, 768 "Unable to create control sockets between parent and child"); 769 } 770 /* 771 * Create communication channel between child and parent. 772 */ 773 if (proto_client("socketpair://", &res->hr_event) < 0) { 774 KEEP_ERRNO((void)pidfile_remove(pfh)); 775 pjdlog_exit(EX_OSERR, 776 "Unable to create event sockets between child and parent"); 777 } 778 779 pid = fork(); 780 if (pid < 0) { 781 KEEP_ERRNO((void)pidfile_remove(pfh)); 782 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 783 } 784 785 if (pid > 0) { 786 /* This is parent. */ 787 /* Declare that we are receiver. */ 788 proto_recv(res->hr_event, NULL, 0); 789 res->hr_workerpid = pid; 790 return; 791 } 792 793 gres = res; 794 795 (void)pidfile_close(pfh); 796 hook_fini(); 797 798 setproctitle("%s (primary)", res->hr_name); 799 800 signal(SIGHUP, SIG_DFL); 801 signal(SIGCHLD, SIG_DFL); 802 803 /* Declare that we are sender. 
*/ 804 proto_send(res->hr_event, NULL, 0); 805 806 init_local(res); 807 if (real_remote(res) && init_remote(res, NULL, NULL)) 808 sync_start(); 809 init_ggate(res); 810 init_environment(res); 811 error = pthread_create(&td, NULL, ggate_recv_thread, res); 812 assert(error == 0); 813 error = pthread_create(&td, NULL, local_send_thread, res); 814 assert(error == 0); 815 error = pthread_create(&td, NULL, remote_send_thread, res); 816 assert(error == 0); 817 error = pthread_create(&td, NULL, remote_recv_thread, res); 818 assert(error == 0); 819 error = pthread_create(&td, NULL, ggate_send_thread, res); 820 assert(error == 0); 821 error = pthread_create(&td, NULL, sync_thread, res); 822 assert(error == 0); 823 error = pthread_create(&td, NULL, ctrl_thread, res); 824 assert(error == 0); 825 (void)guard_thread(res); 826} 827 828static void 829reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 830{ 831 char msg[1024]; 832 va_list ap; 833 int len; 834 835 va_start(ap, fmt); 836 len = vsnprintf(msg, sizeof(msg), fmt, ap); 837 va_end(ap); 838 if ((size_t)len < sizeof(msg)) { 839 switch (ggio->gctl_cmd) { 840 case BIO_READ: 841 (void)snprintf(msg + len, sizeof(msg) - len, 842 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 843 (uintmax_t)ggio->gctl_length); 844 break; 845 case BIO_DELETE: 846 (void)snprintf(msg + len, sizeof(msg) - len, 847 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 848 (uintmax_t)ggio->gctl_length); 849 break; 850 case BIO_FLUSH: 851 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 852 break; 853 case BIO_WRITE: 854 (void)snprintf(msg + len, sizeof(msg) - len, 855 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 856 (uintmax_t)ggio->gctl_length); 857 break; 858 default: 859 (void)snprintf(msg + len, sizeof(msg) - len, 860 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 861 break; 862 } 863 } 864 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 865} 866 867static void 868remote_close(struct hast_resource 
*res, int ncomp) 869{ 870 871 rw_wlock(&hio_remote_lock[ncomp]); 872 /* 873 * A race is possible between dropping rlock and acquiring wlock - 874 * another thread can close connection in-between. 875 */ 876 if (!ISCONNECTED(res, ncomp)) { 877 assert(res->hr_remotein == NULL); 878 assert(res->hr_remoteout == NULL); 879 rw_unlock(&hio_remote_lock[ncomp]); 880 return; 881 } 882 883 assert(res->hr_remotein != NULL); 884 assert(res->hr_remoteout != NULL); 885 886 pjdlog_debug(2, "Closing incoming connection to %s.", 887 res->hr_remoteaddr); 888 proto_close(res->hr_remotein); 889 res->hr_remotein = NULL; 890 pjdlog_debug(2, "Closing outgoing connection to %s.", 891 res->hr_remoteaddr); 892 proto_close(res->hr_remoteout); 893 res->hr_remoteout = NULL; 894 895 rw_unlock(&hio_remote_lock[ncomp]); 896 897 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 898 899 /* 900 * Stop synchronization if in-progress. 901 */ 902 sync_stop(); 903 904 event_send(res, EVENT_DISCONNECT); 905} 906 907/* 908 * Thread receives ggate I/O requests from the kernel and passes them to 909 * appropriate threads: 910 * WRITE - always goes to both local_send and remote_send threads 911 * READ (when the block is up-to-date on local component) - 912 * only local_send thread 913 * READ (when the block isn't up-to-date on local component) - 914 * only remote_send thread 915 * DELETE - always goes to both local_send and remote_send threads 916 * FLUSH - always goes to both local_send and remote_send threads 917 */ 918static void * 919ggate_recv_thread(void *arg) 920{ 921 struct hast_resource *res = arg; 922 struct g_gate_ctl_io *ggio; 923 struct hio *hio; 924 unsigned int ii, ncomp, ncomps; 925 int error; 926 927 ncomps = HAST_NCOMPONENTS; 928 929 for (;;) { 930 pjdlog_debug(2, "ggate_recv: Taking free request."); 931 QUEUE_TAKE2(hio, free); 932 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 933 ggio = &hio->hio_ggio; 934 ggio->gctl_unit = res->hr_ggateunit; 935 ggio->gctl_length = 
MAXPHYS; 936 ggio->gctl_error = 0; 937 pjdlog_debug(2, 938 "ggate_recv: (%p) Waiting for request from the kernel.", 939 hio); 940 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 941 if (sigexit_received) 942 pthread_exit(NULL); 943 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 944 } 945 error = ggio->gctl_error; 946 switch (error) { 947 case 0: 948 break; 949 case ECANCELED: 950 /* Exit gracefully. */ 951 if (!sigexit_received) { 952 pjdlog_debug(2, 953 "ggate_recv: (%p) Received cancel from the kernel.", 954 hio); 955 pjdlog_info("Received cancel from the kernel, exiting."); 956 } 957 pthread_exit(NULL); 958 case ENOMEM: 959 /* 960 * Buffer too small? Impossible, we allocate MAXPHYS 961 * bytes - request can't be bigger than that. 962 */ 963 /* FALLTHROUGH */ 964 case ENXIO: 965 default: 966 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 967 strerror(error)); 968 } 969 for (ii = 0; ii < ncomps; ii++) 970 hio->hio_errors[ii] = EINVAL; 971 reqlog(LOG_DEBUG, 2, ggio, 972 "ggate_recv: (%p) Request received from the kernel: ", 973 hio); 974 /* 975 * Inform all components about new write request. 976 * For read request prefer local component unless the given 977 * range is out-of-date, then use remote component. 978 */ 979 switch (ggio->gctl_cmd) { 980 case BIO_READ: 981 pjdlog_debug(2, 982 "ggate_recv: (%p) Moving request to the send queue.", 983 hio); 984 refcount_init(&hio->hio_countdown, 1); 985 mtx_lock(&metadata_lock); 986 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 987 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 988 /* 989 * This range is up-to-date on local component, 990 * so handle request locally. 991 */ 992 /* Local component is 0 for now. */ 993 ncomp = 0; 994 } else /* if (res->hr_syncsrc == 995 HAST_SYNCSRC_SECONDARY) */ { 996 assert(res->hr_syncsrc == 997 HAST_SYNCSRC_SECONDARY); 998 /* 999 * This range is out-of-date on local component, 1000 * so send request to the remote node. 1001 */ 1002 /* Remote component is 1 for now. 
*/ 1003 ncomp = 1; 1004 } 1005 mtx_unlock(&metadata_lock); 1006 QUEUE_INSERT1(hio, send, ncomp); 1007 break; 1008 case BIO_WRITE: 1009 for (;;) { 1010 mtx_lock(&range_lock); 1011 if (rangelock_islocked(range_sync, 1012 ggio->gctl_offset, ggio->gctl_length)) { 1013 pjdlog_debug(2, 1014 "regular: Range offset=%jd length=%zu locked.", 1015 (intmax_t)ggio->gctl_offset, 1016 (size_t)ggio->gctl_length); 1017 range_regular_wait = true; 1018 cv_wait(&range_regular_cond, &range_lock); 1019 range_regular_wait = false; 1020 mtx_unlock(&range_lock); 1021 continue; 1022 } 1023 if (rangelock_add(range_regular, 1024 ggio->gctl_offset, ggio->gctl_length) < 0) { 1025 mtx_unlock(&range_lock); 1026 pjdlog_debug(2, 1027 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1028 (intmax_t)ggio->gctl_offset, 1029 (size_t)ggio->gctl_length); 1030 sleep(1); 1031 continue; 1032 } 1033 mtx_unlock(&range_lock); 1034 break; 1035 } 1036 mtx_lock(&res->hr_amp_lock); 1037 if (activemap_write_start(res->hr_amp, 1038 ggio->gctl_offset, ggio->gctl_length)) { 1039 (void)hast_activemap_flush(res); 1040 } 1041 mtx_unlock(&res->hr_amp_lock); 1042 /* FALLTHROUGH */ 1043 case BIO_DELETE: 1044 case BIO_FLUSH: 1045 pjdlog_debug(2, 1046 "ggate_recv: (%p) Moving request to the send queues.", 1047 hio); 1048 refcount_init(&hio->hio_countdown, ncomps); 1049 for (ii = 0; ii < ncomps; ii++) 1050 QUEUE_INSERT1(hio, send, ii); 1051 break; 1052 } 1053 } 1054 /* NOTREACHED */ 1055 return (NULL); 1056} 1057 1058/* 1059 * Thread reads from or writes to local component. 1060 * If local read fails, it redirects it to remote_send thread. 1061 */ 1062static void * 1063local_send_thread(void *arg) 1064{ 1065 struct hast_resource *res = arg; 1066 struct g_gate_ctl_io *ggio; 1067 struct hio *hio; 1068 unsigned int ncomp, rncomp; 1069 ssize_t ret; 1070 1071 /* Local component is 0 for now. */ 1072 ncomp = 0; 1073 /* Remote component is 1 for now. 
*/ 1074 rncomp = 1; 1075 1076 for (;;) { 1077 pjdlog_debug(2, "local_send: Taking request."); 1078 QUEUE_TAKE1(hio, send, ncomp); 1079 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1080 ggio = &hio->hio_ggio; 1081 switch (ggio->gctl_cmd) { 1082 case BIO_READ: 1083 ret = pread(res->hr_localfd, ggio->gctl_data, 1084 ggio->gctl_length, 1085 ggio->gctl_offset + res->hr_localoff); 1086 if (ret == ggio->gctl_length) 1087 hio->hio_errors[ncomp] = 0; 1088 else { 1089 /* 1090 * If READ failed, try to read from remote node. 1091 */ 1092 QUEUE_INSERT1(hio, send, rncomp); 1093 continue; 1094 } 1095 break; 1096 case BIO_WRITE: 1097 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1098 ggio->gctl_length, 1099 ggio->gctl_offset + res->hr_localoff); 1100 if (ret < 0) 1101 hio->hio_errors[ncomp] = errno; 1102 else if (ret != ggio->gctl_length) 1103 hio->hio_errors[ncomp] = EIO; 1104 else 1105 hio->hio_errors[ncomp] = 0; 1106 break; 1107 case BIO_DELETE: 1108 ret = g_delete(res->hr_localfd, 1109 ggio->gctl_offset + res->hr_localoff, 1110 ggio->gctl_length); 1111 if (ret < 0) 1112 hio->hio_errors[ncomp] = errno; 1113 else 1114 hio->hio_errors[ncomp] = 0; 1115 break; 1116 case BIO_FLUSH: 1117 ret = g_flush(res->hr_localfd); 1118 if (ret < 0) 1119 hio->hio_errors[ncomp] = errno; 1120 else 1121 hio->hio_errors[ncomp] = 0; 1122 break; 1123 } 1124 if (refcount_release(&hio->hio_countdown)) { 1125 if (ISSYNCREQ(hio)) { 1126 mtx_lock(&sync_lock); 1127 SYNCREQDONE(hio); 1128 mtx_unlock(&sync_lock); 1129 cv_signal(&sync_cond); 1130 } else { 1131 pjdlog_debug(2, 1132 "local_send: (%p) Moving request to the done queue.", 1133 hio); 1134 QUEUE_INSERT2(hio, done); 1135 } 1136 } 1137 } 1138 /* NOTREACHED */ 1139 return (NULL); 1140} 1141 1142/* 1143 * Thread sends request to secondary node. 
1144 */ 1145static void * 1146remote_send_thread(void *arg) 1147{ 1148 struct hast_resource *res = arg; 1149 struct g_gate_ctl_io *ggio; 1150 struct hio *hio; 1151 struct nv *nv; 1152 unsigned int ncomp; 1153 bool wakeup; 1154 uint64_t offset, length; 1155 uint8_t cmd; 1156 void *data; 1157 1158 /* Remote component is 1 for now. */ 1159 ncomp = 1; 1160 1161 for (;;) { 1162 pjdlog_debug(2, "remote_send: Taking request."); 1163 QUEUE_TAKE1(hio, send, ncomp); 1164 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1165 ggio = &hio->hio_ggio; 1166 switch (ggio->gctl_cmd) { 1167 case BIO_READ: 1168 cmd = HIO_READ; 1169 data = NULL; 1170 offset = ggio->gctl_offset; 1171 length = ggio->gctl_length; 1172 break; 1173 case BIO_WRITE: 1174 cmd = HIO_WRITE; 1175 data = ggio->gctl_data; 1176 offset = ggio->gctl_offset; 1177 length = ggio->gctl_length; 1178 break; 1179 case BIO_DELETE: 1180 cmd = HIO_DELETE; 1181 data = NULL; 1182 offset = ggio->gctl_offset; 1183 length = ggio->gctl_length; 1184 break; 1185 case BIO_FLUSH: 1186 cmd = HIO_FLUSH; 1187 data = NULL; 1188 offset = 0; 1189 length = 0; 1190 break; 1191 default: 1192 assert(!"invalid condition"); 1193 abort(); 1194 } 1195 nv = nv_alloc(); 1196 nv_add_uint8(nv, cmd, "cmd"); 1197 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1198 nv_add_uint64(nv, offset, "offset"); 1199 nv_add_uint64(nv, length, "length"); 1200 if (nv_error(nv) != 0) { 1201 hio->hio_errors[ncomp] = nv_error(nv); 1202 pjdlog_debug(2, 1203 "remote_send: (%p) Unable to prepare header to send.", 1204 hio); 1205 reqlog(LOG_ERR, 0, ggio, 1206 "Unable to prepare header to send (%s): ", 1207 strerror(nv_error(nv))); 1208 /* Move failed request immediately to the done queue. */ 1209 goto done_queue; 1210 } 1211 pjdlog_debug(2, 1212 "remote_send: (%p) Moving request to the recv queue.", 1213 hio); 1214 /* 1215 * Protect connection from disappearing. 
1216 */ 1217 rw_rlock(&hio_remote_lock[ncomp]); 1218 if (!ISCONNECTED(res, ncomp)) { 1219 rw_unlock(&hio_remote_lock[ncomp]); 1220 hio->hio_errors[ncomp] = ENOTCONN; 1221 goto done_queue; 1222 } 1223 /* 1224 * Move the request to recv queue before sending it, because 1225 * in different order we can get reply before we move request 1226 * to recv queue. 1227 */ 1228 mtx_lock(&hio_recv_list_lock[ncomp]); 1229 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1230 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1231 mtx_unlock(&hio_recv_list_lock[ncomp]); 1232 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1233 data != NULL ? length : 0) < 0) { 1234 hio->hio_errors[ncomp] = errno; 1235 rw_unlock(&hio_remote_lock[ncomp]); 1236 pjdlog_debug(2, 1237 "remote_send: (%p) Unable to send request.", hio); 1238 reqlog(LOG_ERR, 0, ggio, 1239 "Unable to send request (%s): ", 1240 strerror(hio->hio_errors[ncomp])); 1241 remote_close(res, ncomp); 1242 /* 1243 * Take request back from the receive queue and move 1244 * it immediately to the done queue. 
1245 */ 1246 mtx_lock(&hio_recv_list_lock[ncomp]); 1247 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1248 mtx_unlock(&hio_recv_list_lock[ncomp]); 1249 goto done_queue; 1250 } 1251 rw_unlock(&hio_remote_lock[ncomp]); 1252 nv_free(nv); 1253 if (wakeup) 1254 cv_signal(&hio_recv_list_cond[ncomp]); 1255 continue; 1256done_queue: 1257 nv_free(nv); 1258 if (ISSYNCREQ(hio)) { 1259 if (!refcount_release(&hio->hio_countdown)) 1260 continue; 1261 mtx_lock(&sync_lock); 1262 SYNCREQDONE(hio); 1263 mtx_unlock(&sync_lock); 1264 cv_signal(&sync_cond); 1265 continue; 1266 } 1267 if (ggio->gctl_cmd == BIO_WRITE) { 1268 mtx_lock(&res->hr_amp_lock); 1269 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1270 ggio->gctl_length)) { 1271 (void)hast_activemap_flush(res); 1272 } 1273 mtx_unlock(&res->hr_amp_lock); 1274 } 1275 if (!refcount_release(&hio->hio_countdown)) 1276 continue; 1277 pjdlog_debug(2, 1278 "remote_send: (%p) Moving request to the done queue.", 1279 hio); 1280 QUEUE_INSERT2(hio, done); 1281 } 1282 /* NOTREACHED */ 1283 return (NULL); 1284} 1285 1286/* 1287 * Thread receives answer from secondary node and passes it to ggate_send 1288 * thread. 1289 */ 1290static void * 1291remote_recv_thread(void *arg) 1292{ 1293 struct hast_resource *res = arg; 1294 struct g_gate_ctl_io *ggio; 1295 struct hio *hio; 1296 struct nv *nv; 1297 unsigned int ncomp; 1298 uint64_t seq; 1299 int error; 1300 1301 /* Remote component is 1 for now. */ 1302 ncomp = 1; 1303 1304 for (;;) { 1305 /* Wait until there is anything to receive. 
*/ 1306 mtx_lock(&hio_recv_list_lock[ncomp]); 1307 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1308 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1309 cv_wait(&hio_recv_list_cond[ncomp], 1310 &hio_recv_list_lock[ncomp]); 1311 } 1312 mtx_unlock(&hio_recv_list_lock[ncomp]); 1313 rw_rlock(&hio_remote_lock[ncomp]); 1314 if (!ISCONNECTED(res, ncomp)) { 1315 rw_unlock(&hio_remote_lock[ncomp]); 1316 /* 1317 * Connection is dead, so move all pending requests to 1318 * the done queue (one-by-one). 1319 */ 1320 mtx_lock(&hio_recv_list_lock[ncomp]); 1321 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1322 assert(hio != NULL); 1323 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1324 hio_next[ncomp]); 1325 mtx_unlock(&hio_recv_list_lock[ncomp]); 1326 goto done_queue; 1327 } 1328 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1329 pjdlog_errno(LOG_ERR, 1330 "Unable to receive reply header"); 1331 rw_unlock(&hio_remote_lock[ncomp]); 1332 remote_close(res, ncomp); 1333 continue; 1334 } 1335 rw_unlock(&hio_remote_lock[ncomp]); 1336 seq = nv_get_uint64(nv, "seq"); 1337 if (seq == 0) { 1338 pjdlog_error("Header contains no 'seq' field."); 1339 nv_free(nv); 1340 continue; 1341 } 1342 mtx_lock(&hio_recv_list_lock[ncomp]); 1343 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1344 if (hio->hio_ggio.gctl_seq == seq) { 1345 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1346 hio_next[ncomp]); 1347 break; 1348 } 1349 } 1350 mtx_unlock(&hio_recv_list_lock[ncomp]); 1351 if (hio == NULL) { 1352 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1353 (uintmax_t)seq); 1354 nv_free(nv); 1355 continue; 1356 } 1357 error = nv_get_int16(nv, "error"); 1358 if (error != 0) { 1359 /* Request failed on remote side. 
*/ 1360 hio->hio_errors[ncomp] = 0; 1361 nv_free(nv); 1362 goto done_queue; 1363 } 1364 ggio = &hio->hio_ggio; 1365 switch (ggio->gctl_cmd) { 1366 case BIO_READ: 1367 rw_rlock(&hio_remote_lock[ncomp]); 1368 if (!ISCONNECTED(res, ncomp)) { 1369 rw_unlock(&hio_remote_lock[ncomp]); 1370 nv_free(nv); 1371 goto done_queue; 1372 } 1373 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1374 ggio->gctl_data, ggio->gctl_length) < 0) { 1375 hio->hio_errors[ncomp] = errno; 1376 pjdlog_errno(LOG_ERR, 1377 "Unable to receive reply data"); 1378 rw_unlock(&hio_remote_lock[ncomp]); 1379 nv_free(nv); 1380 remote_close(res, ncomp); 1381 goto done_queue; 1382 } 1383 rw_unlock(&hio_remote_lock[ncomp]); 1384 break; 1385 case BIO_WRITE: 1386 case BIO_DELETE: 1387 case BIO_FLUSH: 1388 break; 1389 default: 1390 assert(!"invalid condition"); 1391 abort(); 1392 } 1393 hio->hio_errors[ncomp] = 0; 1394 nv_free(nv); 1395done_queue: 1396 if (refcount_release(&hio->hio_countdown)) { 1397 if (ISSYNCREQ(hio)) { 1398 mtx_lock(&sync_lock); 1399 SYNCREQDONE(hio); 1400 mtx_unlock(&sync_lock); 1401 cv_signal(&sync_cond); 1402 } else { 1403 pjdlog_debug(2, 1404 "remote_recv: (%p) Moving request to the done queue.", 1405 hio); 1406 QUEUE_INSERT2(hio, done); 1407 } 1408 } 1409 } 1410 /* NOTREACHED */ 1411 return (NULL); 1412} 1413 1414/* 1415 * Thread sends answer to the kernel. 1416 */ 1417static void * 1418ggate_send_thread(void *arg) 1419{ 1420 struct hast_resource *res = arg; 1421 struct g_gate_ctl_io *ggio; 1422 struct hio *hio; 1423 unsigned int ii, ncomp, ncomps; 1424 1425 ncomps = HAST_NCOMPONENTS; 1426 1427 for (;;) { 1428 pjdlog_debug(2, "ggate_send: Taking request."); 1429 QUEUE_TAKE2(hio, done); 1430 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1431 ggio = &hio->hio_ggio; 1432 for (ii = 0; ii < ncomps; ii++) { 1433 if (hio->hio_errors[ii] == 0) { 1434 /* 1435 * One successful request is enough to declare 1436 * success. 
1437 */ 1438 ggio->gctl_error = 0; 1439 break; 1440 } 1441 } 1442 if (ii == ncomps) { 1443 /* 1444 * None of the requests were successful. 1445 * Use first error. 1446 */ 1447 ggio->gctl_error = hio->hio_errors[0]; 1448 } 1449 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1450 mtx_lock(&res->hr_amp_lock); 1451 activemap_write_complete(res->hr_amp, 1452 ggio->gctl_offset, ggio->gctl_length); 1453 mtx_unlock(&res->hr_amp_lock); 1454 } 1455 if (ggio->gctl_cmd == BIO_WRITE) { 1456 /* 1457 * Unlock range we locked. 1458 */ 1459 mtx_lock(&range_lock); 1460 rangelock_del(range_regular, ggio->gctl_offset, 1461 ggio->gctl_length); 1462 if (range_sync_wait) 1463 cv_signal(&range_sync_cond); 1464 mtx_unlock(&range_lock); 1465 /* 1466 * Bump local count if this is first write after 1467 * connection failure with remote node. 1468 */ 1469 ncomp = 1; 1470 rw_rlock(&hio_remote_lock[ncomp]); 1471 if (!ISCONNECTED(res, ncomp)) { 1472 mtx_lock(&metadata_lock); 1473 if (res->hr_primary_localcnt == 1474 res->hr_secondary_remotecnt) { 1475 res->hr_primary_localcnt++; 1476 pjdlog_debug(1, 1477 "Increasing localcnt to %ju.", 1478 (uintmax_t)res->hr_primary_localcnt); 1479 (void)metadata_write(res); 1480 } 1481 mtx_unlock(&metadata_lock); 1482 } 1483 rw_unlock(&hio_remote_lock[ncomp]); 1484 } 1485 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1486 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1487 pjdlog_debug(2, 1488 "ggate_send: (%p) Moving request to the free queue.", hio); 1489 QUEUE_INSERT2(hio, free); 1490 } 1491 /* NOTREACHED */ 1492 return (NULL); 1493} 1494 1495/* 1496 * Thread synchronize local and remote components. 
1497 */ 1498static void * 1499sync_thread(void *arg __unused) 1500{ 1501 struct hast_resource *res = arg; 1502 struct hio *hio; 1503 struct g_gate_ctl_io *ggio; 1504 unsigned int ii, ncomp, ncomps; 1505 off_t offset, length, synced; 1506 bool dorewind; 1507 int syncext; 1508 1509 ncomps = HAST_NCOMPONENTS; 1510 dorewind = true; 1511 synced = 0; 1512 offset = -1; 1513 1514 for (;;) { 1515 mtx_lock(&sync_lock); 1516 if (offset >= 0 && !sync_inprogress) { 1517 pjdlog_info("Synchronization interrupted. " 1518 "%jd bytes synchronized so far.", 1519 (intmax_t)synced); 1520 event_send(res, EVENT_SYNCINTR); 1521 } 1522 while (!sync_inprogress) { 1523 dorewind = true; 1524 synced = 0; 1525 cv_wait(&sync_cond, &sync_lock); 1526 } 1527 mtx_unlock(&sync_lock); 1528 /* 1529 * Obtain offset at which we should synchronize. 1530 * Rewind synchronization if needed. 1531 */ 1532 mtx_lock(&res->hr_amp_lock); 1533 if (dorewind) 1534 activemap_sync_rewind(res->hr_amp); 1535 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1536 if (syncext != -1) { 1537 /* 1538 * We synchronized entire syncext extent, we can mark 1539 * it as clean now. 1540 */ 1541 if (activemap_extent_complete(res->hr_amp, syncext)) 1542 (void)hast_activemap_flush(res); 1543 } 1544 mtx_unlock(&res->hr_amp_lock); 1545 if (dorewind) { 1546 dorewind = false; 1547 if (offset < 0) 1548 pjdlog_info("Nodes are in sync."); 1549 else { 1550 pjdlog_info("Synchronization started. %ju bytes to go.", 1551 (uintmax_t)(res->hr_extentsize * 1552 activemap_ndirty(res->hr_amp))); 1553 event_send(res, EVENT_SYNCSTART); 1554 } 1555 } 1556 if (offset < 0) { 1557 sync_stop(); 1558 pjdlog_debug(1, "Nothing to synchronize."); 1559 /* 1560 * Synchronization complete, make both localcnt and 1561 * remotecnt equal. 1562 */ 1563 ncomp = 1; 1564 rw_rlock(&hio_remote_lock[ncomp]); 1565 if (ISCONNECTED(res, ncomp)) { 1566 if (synced > 0) { 1567 pjdlog_info("Synchronization complete. 
" 1568 "%jd bytes synchronized.", 1569 (intmax_t)synced); 1570 event_send(res, EVENT_SYNCDONE); 1571 } 1572 mtx_lock(&metadata_lock); 1573 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1574 res->hr_primary_localcnt = 1575 res->hr_secondary_localcnt; 1576 res->hr_primary_remotecnt = 1577 res->hr_secondary_remotecnt; 1578 pjdlog_debug(1, 1579 "Setting localcnt to %ju and remotecnt to %ju.", 1580 (uintmax_t)res->hr_primary_localcnt, 1581 (uintmax_t)res->hr_secondary_localcnt); 1582 (void)metadata_write(res); 1583 mtx_unlock(&metadata_lock); 1584 } 1585 rw_unlock(&hio_remote_lock[ncomp]); 1586 continue; 1587 } 1588 pjdlog_debug(2, "sync: Taking free request."); 1589 QUEUE_TAKE2(hio, free); 1590 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1591 /* 1592 * Lock the range we are going to synchronize. We don't want 1593 * race where someone writes between our read and write. 1594 */ 1595 for (;;) { 1596 mtx_lock(&range_lock); 1597 if (rangelock_islocked(range_regular, offset, length)) { 1598 pjdlog_debug(2, 1599 "sync: Range offset=%jd length=%jd locked.", 1600 (intmax_t)offset, (intmax_t)length); 1601 range_sync_wait = true; 1602 cv_wait(&range_sync_cond, &range_lock); 1603 range_sync_wait = false; 1604 mtx_unlock(&range_lock); 1605 continue; 1606 } 1607 if (rangelock_add(range_sync, offset, length) < 0) { 1608 mtx_unlock(&range_lock); 1609 pjdlog_debug(2, 1610 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1611 (intmax_t)offset, (intmax_t)length); 1612 sleep(1); 1613 continue; 1614 } 1615 mtx_unlock(&range_lock); 1616 break; 1617 } 1618 /* 1619 * First read the data from synchronization source. 
1620 */ 1621 SYNCREQ(hio); 1622 ggio = &hio->hio_ggio; 1623 ggio->gctl_cmd = BIO_READ; 1624 ggio->gctl_offset = offset; 1625 ggio->gctl_length = length; 1626 ggio->gctl_error = 0; 1627 for (ii = 0; ii < ncomps; ii++) 1628 hio->hio_errors[ii] = EINVAL; 1629 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1630 hio); 1631 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1632 hio); 1633 mtx_lock(&metadata_lock); 1634 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1635 /* 1636 * This range is up-to-date on local component, 1637 * so handle request locally. 1638 */ 1639 /* Local component is 0 for now. */ 1640 ncomp = 0; 1641 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1642 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1643 /* 1644 * This range is out-of-date on local component, 1645 * so send request to the remote node. 1646 */ 1647 /* Remote component is 1 for now. */ 1648 ncomp = 1; 1649 } 1650 mtx_unlock(&metadata_lock); 1651 refcount_init(&hio->hio_countdown, 1); 1652 QUEUE_INSERT1(hio, send, ncomp); 1653 1654 /* 1655 * Let's wait for READ to finish. 1656 */ 1657 mtx_lock(&sync_lock); 1658 while (!ISSYNCREQDONE(hio)) 1659 cv_wait(&sync_cond, &sync_lock); 1660 mtx_unlock(&sync_lock); 1661 1662 if (hio->hio_errors[ncomp] != 0) { 1663 pjdlog_error("Unable to read synchronization data: %s.", 1664 strerror(hio->hio_errors[ncomp])); 1665 goto free_queue; 1666 } 1667 1668 /* 1669 * We read the data from synchronization source, now write it 1670 * to synchronization target. 
1671 */ 1672 SYNCREQ(hio); 1673 ggio->gctl_cmd = BIO_WRITE; 1674 for (ii = 0; ii < ncomps; ii++) 1675 hio->hio_errors[ii] = EINVAL; 1676 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1677 hio); 1678 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1679 hio); 1680 mtx_lock(&metadata_lock); 1681 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1682 /* 1683 * This range is up-to-date on local component, 1684 * so we update remote component. 1685 */ 1686 /* Remote component is 1 for now. */ 1687 ncomp = 1; 1688 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1689 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1690 /* 1691 * This range is out-of-date on local component, 1692 * so we update it. 1693 */ 1694 /* Local component is 0 for now. */ 1695 ncomp = 0; 1696 } 1697 mtx_unlock(&metadata_lock); 1698 1699 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1700 hio); 1701 refcount_init(&hio->hio_countdown, 1); 1702 QUEUE_INSERT1(hio, send, ncomp); 1703 1704 /* 1705 * Let's wait for WRITE to finish. 
1706 */ 1707 mtx_lock(&sync_lock); 1708 while (!ISSYNCREQDONE(hio)) 1709 cv_wait(&sync_cond, &sync_lock); 1710 mtx_unlock(&sync_lock); 1711 1712 if (hio->hio_errors[ncomp] != 0) { 1713 pjdlog_error("Unable to write synchronization data: %s.", 1714 strerror(hio->hio_errors[ncomp])); 1715 goto free_queue; 1716 } 1717 1718 synced += length; 1719free_queue: 1720 mtx_lock(&range_lock); 1721 rangelock_del(range_sync, offset, length); 1722 if (range_regular_wait) 1723 cv_signal(&range_regular_cond); 1724 mtx_unlock(&range_lock); 1725 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1726 hio); 1727 QUEUE_INSERT2(hio, free); 1728 } 1729 /* NOTREACHED */ 1730 return (NULL); 1731} 1732 1733static void 1734config_reload(void) 1735{ 1736 struct hastd_config *newcfg; 1737 struct hast_resource *res; 1738 unsigned int ii, ncomps; 1739 int modified; 1740 1741 pjdlog_info("Reloading configuration..."); 1742 1743 ncomps = HAST_NCOMPONENTS; 1744 1745 newcfg = yy_config_parse(cfgpath, false); 1746 if (newcfg == NULL) 1747 goto failed; 1748 1749 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1750 if (strcmp(res->hr_name, gres->hr_name) == 0) 1751 break; 1752 } 1753 /* 1754 * If resource was removed from the configuration file, resource 1755 * name, provider name or path to local component was modified we 1756 * shouldn't be here. This means that someone modified configuration 1757 * file and send SIGHUP to us instead of main hastd process. 1758 * Log advice and ignore the signal. 
1759 */ 1760 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1761 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1762 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1763 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1764 (unsigned int)getppid()); 1765 goto failed; 1766 } 1767 1768#define MODIFIED_REMOTEADDR 0x1 1769#define MODIFIED_REPLICATION 0x2 1770#define MODIFIED_TIMEOUT 0x4 1771#define MODIFIED_EXEC 0x8 1772 modified = 0; 1773 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1774 /* 1775 * Don't copy res->hr_remoteaddr to gres just yet. 1776 * We want remote_close() to log disconnect from the old 1777 * addresses, not from the new ones. 1778 */ 1779 modified |= MODIFIED_REMOTEADDR; 1780 } 1781 if (gres->hr_replication != res->hr_replication) { 1782 gres->hr_replication = res->hr_replication; 1783 modified |= MODIFIED_REPLICATION; 1784 } 1785 if (gres->hr_timeout != res->hr_timeout) { 1786 gres->hr_timeout = res->hr_timeout; 1787 modified |= MODIFIED_TIMEOUT; 1788 } 1789 if (strcmp(gres->hr_exec, res->hr_exec) != 0) { 1790 strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec)); 1791 modified |= MODIFIED_EXEC; 1792 } 1793 /* 1794 * If only timeout was modified we only need to change it without 1795 * reconnecting. 
1796 */ 1797 if (modified == MODIFIED_TIMEOUT) { 1798 for (ii = 0; ii < ncomps; ii++) { 1799 if (!ISREMOTE(ii)) 1800 continue; 1801 rw_rlock(&hio_remote_lock[ii]); 1802 if (!ISCONNECTED(gres, ii)) { 1803 rw_unlock(&hio_remote_lock[ii]); 1804 continue; 1805 } 1806 rw_unlock(&hio_remote_lock[ii]); 1807 if (proto_timeout(gres->hr_remotein, 1808 gres->hr_timeout) < 0) { 1809 pjdlog_errno(LOG_WARNING, 1810 "Unable to set connection timeout"); 1811 } 1812 if (proto_timeout(gres->hr_remoteout, 1813 gres->hr_timeout) < 0) { 1814 pjdlog_errno(LOG_WARNING, 1815 "Unable to set connection timeout"); 1816 } 1817 } 1818 } else if ((modified & 1819 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1820 for (ii = 0; ii < ncomps; ii++) { 1821 if (!ISREMOTE(ii)) 1822 continue; 1823 remote_close(gres, ii); 1824 } 1825 if (modified & MODIFIED_REMOTEADDR) { 1826 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1827 sizeof(gres->hr_remoteaddr)); 1828 } 1829 } 1830#undef MODIFIED_REMOTEADDR 1831#undef MODIFIED_REPLICATION 1832#undef MODIFIED_TIMEOUT 1833#undef MODIFIED_EXEC 1834 1835 pjdlog_info("Configuration reloaded successfully."); 1836 return; 1837failed: 1838 if (newcfg != NULL) { 1839 if (newcfg->hc_controlconn != NULL) 1840 proto_close(newcfg->hc_controlconn); 1841 if (newcfg->hc_listenconn != NULL) 1842 proto_close(newcfg->hc_listenconn); 1843 yy_config_free(newcfg); 1844 } 1845 pjdlog_warning("Configuration not reloaded."); 1846} 1847 1848static void 1849keepalive_send(struct hast_resource *res, unsigned int ncomp) 1850{ 1851 struct nv *nv; 1852 1853 nv = nv_alloc(); 1854 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1855 if (nv_error(nv) != 0) { 1856 nv_free(nv); 1857 pjdlog_debug(1, 1858 "keepalive_send: Unable to prepare header to send."); 1859 return; 1860 } 1861 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1862 pjdlog_common(LOG_DEBUG, 1, errno, 1863 "keepalive_send: Unable to send request"); 1864 nv_free(nv); 1865 rw_unlock(&hio_remote_lock[ncomp]); 1866 
remote_close(res, ncomp); 1867 rw_rlock(&hio_remote_lock[ncomp]); 1868 return; 1869 } 1870 nv_free(nv); 1871 pjdlog_debug(2, "keepalive_send: Request sent."); 1872} 1873 1874static void 1875guard_one(struct hast_resource *res, unsigned int ncomp) 1876{ 1877 struct proto_conn *in, *out; 1878 1879 if (!ISREMOTE(ncomp)) 1880 return; 1881 1882 rw_rlock(&hio_remote_lock[ncomp]); 1883 1884 if (!real_remote(res)) { 1885 rw_unlock(&hio_remote_lock[ncomp]); 1886 return; 1887 } 1888 1889 if (ISCONNECTED(res, ncomp)) { 1890 assert(res->hr_remotein != NULL); 1891 assert(res->hr_remoteout != NULL); 1892 keepalive_send(res, ncomp); 1893 } 1894 1895 if (ISCONNECTED(res, ncomp)) { 1896 assert(res->hr_remotein != NULL); 1897 assert(res->hr_remoteout != NULL); 1898 rw_unlock(&hio_remote_lock[ncomp]); 1899 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 1900 res->hr_remoteaddr); 1901 return; 1902 } 1903 1904 assert(res->hr_remotein == NULL); 1905 assert(res->hr_remoteout == NULL); 1906 /* 1907 * Upgrade the lock. It doesn't have to be atomic as no other thread 1908 * can change connection status from disconnected to connected. 1909 */ 1910 rw_unlock(&hio_remote_lock[ncomp]); 1911 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 1912 res->hr_remoteaddr); 1913 in = out = NULL; 1914 if (init_remote(res, &in, &out)) { 1915 rw_wlock(&hio_remote_lock[ncomp]); 1916 assert(res->hr_remotein == NULL); 1917 assert(res->hr_remoteout == NULL); 1918 assert(in != NULL && out != NULL); 1919 res->hr_remotein = in; 1920 res->hr_remoteout = out; 1921 rw_unlock(&hio_remote_lock[ncomp]); 1922 pjdlog_info("Successfully reconnected to %s.", 1923 res->hr_remoteaddr); 1924 sync_start(); 1925 } else { 1926 /* Both connections should be NULL. 
*/ 1927 assert(res->hr_remotein == NULL); 1928 assert(res->hr_remoteout == NULL); 1929 assert(in == NULL && out == NULL); 1930 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 1931 res->hr_remoteaddr); 1932 } 1933} 1934 1935/* 1936 * Thread guards remote connections and reconnects when needed, handles 1937 * signals, etc. 1938 */ 1939static void * 1940guard_thread(void *arg) 1941{ 1942 struct hast_resource *res = arg; 1943 unsigned int ii, ncomps; 1944 struct timespec timeout; 1945 time_t lastcheck, now; 1946 sigset_t mask; 1947 int signo; 1948 1949 ncomps = HAST_NCOMPONENTS; 1950 lastcheck = time(NULL); 1951 1952 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1953 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1954 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1955 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 1956 1957 timeout.tv_nsec = 0; 1958 signo = -1; 1959 1960 for (;;) { 1961 switch (signo) { 1962 case SIGHUP: 1963 config_reload(); 1964 break; 1965 case SIGINT: 1966 case SIGTERM: 1967 sigexit_received = true; 1968 primary_exitx(EX_OK, 1969 "Termination signal received, exiting."); 1970 break; 1971 default: 1972 break; 1973 } 1974 1975 pjdlog_debug(2, "remote_guard: Checking connections."); 1976 now = time(NULL); 1977 if (lastcheck + RETRY_SLEEP <= now) { 1978 for (ii = 0; ii < ncomps; ii++) 1979 guard_one(res, ii); 1980 lastcheck = now; 1981 } 1982 timeout.tv_sec = RETRY_SLEEP; 1983 signo = sigtimedwait(&mask, NULL, &timeout); 1984 } 1985 /* NOTREACHED */ 1986 return (NULL); 1987} 1988