primary.c revision 214284
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
29 */ 30 31#include <sys/cdefs.h> 32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 214284 2010-10-24 17:28:25Z pjd $"); 33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 41#include <geom/gate/g_gate.h> 42 43#include <assert.h> 44#include <err.h> 45#include <errno.h> 46#include <fcntl.h> 47#include <libgeom.h> 48#include <pthread.h> 49#include <signal.h> 50#include <stdint.h> 51#include <stdio.h> 52#include <string.h> 53#include <sysexits.h> 54#include <unistd.h> 55 56#include <activemap.h> 57#include <nv.h> 58#include <rangelock.h> 59 60#include "control.h" 61#include "event.h" 62#include "hast.h" 63#include "hast_proto.h" 64#include "hastd.h" 65#include "hooks.h" 66#include "metadata.h" 67#include "proto.h" 68#include "pjdlog.h" 69#include "subr.h" 70#include "synch.h" 71 72/* The is only one remote component for now. */ 73#define ISREMOTE(no) ((no) == 1) 74 75struct hio { 76 /* 77 * Number of components we are still waiting for. 78 * When this field goes to 0, we can send the request back to the 79 * kernel. Each component has to decrease this counter by one 80 * even on failure. 81 */ 82 unsigned int hio_countdown; 83 /* 84 * Each component has a place to store its own error. 85 * Once the request is handled by all components we can decide if the 86 * request overall is successful or not. 87 */ 88 int *hio_errors; 89 /* 90 * Structure used to comunicate with GEOM Gate class. 91 */ 92 struct g_gate_ctl_io hio_ggio; 93 TAILQ_ENTRY(hio) *hio_next; 94}; 95#define hio_free_next hio_next[0] 96#define hio_done_next hio_next[0] 97 98/* 99 * Free list holds unused structures. When free list is empty, we have to wait 100 * until some in-progress requests are freed. 101 */ 102static TAILQ_HEAD(, hio) hio_free_list; 103static pthread_mutex_t hio_free_list_lock; 104static pthread_cond_t hio_free_list_cond; 105/* 106 * There is one send list for every component. 
One requests is placed on all 107 * send lists - each component gets the same request, but each component is 108 * responsible for managing his own send list. 109 */ 110static TAILQ_HEAD(, hio) *hio_send_list; 111static pthread_mutex_t *hio_send_list_lock; 112static pthread_cond_t *hio_send_list_cond; 113/* 114 * There is one recv list for every component, although local components don't 115 * use recv lists as local requests are done synchronously. 116 */ 117static TAILQ_HEAD(, hio) *hio_recv_list; 118static pthread_mutex_t *hio_recv_list_lock; 119static pthread_cond_t *hio_recv_list_cond; 120/* 121 * Request is placed on done list by the slowest component (the one that 122 * decreased hio_countdown from 1 to 0). 123 */ 124static TAILQ_HEAD(, hio) hio_done_list; 125static pthread_mutex_t hio_done_list_lock; 126static pthread_cond_t hio_done_list_cond; 127/* 128 * Structure below are for interaction with sync thread. 129 */ 130static bool sync_inprogress; 131static pthread_mutex_t sync_lock; 132static pthread_cond_t sync_cond; 133/* 134 * The lock below allows to synchornize access to remote connections. 135 */ 136static pthread_rwlock_t *hio_remote_lock; 137 138/* 139 * Lock to synchronize metadata updates. Also synchronize access to 140 * hr_primary_localcnt and hr_primary_remotecnt fields. 141 */ 142static pthread_mutex_t metadata_lock; 143 144/* 145 * Maximum number of outstanding I/O requests. 146 */ 147#define HAST_HIO_MAX 256 148/* 149 * Number of components. At this point there are only two components: local 150 * and remote, but in the future it might be possible to use multiple local 151 * and remote components. 152 */ 153#define HAST_NCOMPONENTS 2 154/* 155 * Number of seconds to sleep between reconnect retries or keepalive packets. 
156 */ 157#define RETRY_SLEEP 10 158 159#define ISCONNECTED(res, no) \ 160 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 161 162#define QUEUE_INSERT1(hio, name, ncomp) do { \ 163 bool _wakeup; \ 164 \ 165 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 166 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 167 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 168 hio_next[(ncomp)]); \ 169 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 170 if (_wakeup) \ 171 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 172} while (0) 173#define QUEUE_INSERT2(hio, name) do { \ 174 bool _wakeup; \ 175 \ 176 mtx_lock(&hio_##name##_list_lock); \ 177 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 178 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 179 mtx_unlock(&hio_##name##_list_lock); \ 180 if (_wakeup) \ 181 cv_signal(&hio_##name##_list_cond); \ 182} while (0) 183#define QUEUE_TAKE1(hio, name, ncomp) do { \ 184 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 185 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 186 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 187 &hio_##name##_list_lock[(ncomp)]); \ 188 } \ 189 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 190 hio_next[(ncomp)]); \ 191 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 192} while (0) 193#define QUEUE_TAKE2(hio, name) do { \ 194 mtx_lock(&hio_##name##_list_lock); \ 195 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 196 cv_wait(&hio_##name##_list_cond, \ 197 &hio_##name##_list_lock); \ 198 } \ 199 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 200 mtx_unlock(&hio_##name##_list_lock); \ 201} while (0) 202 203#define SYNCREQ(hio) do { \ 204 (hio)->hio_ggio.gctl_unit = -1; \ 205 (hio)->hio_ggio.gctl_seq = 1; \ 206} while (0) 207#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 208#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 209#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 210 211static 
struct hast_resource *gres; 212 213static pthread_mutex_t range_lock; 214static struct rangelocks *range_regular; 215static bool range_regular_wait; 216static pthread_cond_t range_regular_cond; 217static struct rangelocks *range_sync; 218static bool range_sync_wait; 219static pthread_cond_t range_sync_cond; 220 221static void *ggate_recv_thread(void *arg); 222static void *local_send_thread(void *arg); 223static void *remote_send_thread(void *arg); 224static void *remote_recv_thread(void *arg); 225static void *ggate_send_thread(void *arg); 226static void *sync_thread(void *arg); 227static void *guard_thread(void *arg); 228 229static void 230cleanup(struct hast_resource *res) 231{ 232 int rerrno; 233 234 /* Remember errno. */ 235 rerrno = errno; 236 237 /* Destroy ggate provider if we created one. */ 238 if (res->hr_ggateunit >= 0) { 239 struct g_gate_ctl_destroy ggiod; 240 241 bzero(&ggiod, sizeof(ggiod)); 242 ggiod.gctl_version = G_GATE_VERSION; 243 ggiod.gctl_unit = res->hr_ggateunit; 244 ggiod.gctl_force = 1; 245 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 246 pjdlog_errno(LOG_WARNING, 247 "Unable to destroy hast/%s device", 248 res->hr_provname); 249 } 250 res->hr_ggateunit = -1; 251 } 252 253 /* Restore errno. */ 254 errno = rerrno; 255} 256 257static __dead2 void 258primary_exit(int exitcode, const char *fmt, ...) 259{ 260 va_list ap; 261 262 assert(exitcode != EX_OK); 263 va_start(ap, fmt); 264 pjdlogv_errno(LOG_ERR, fmt, ap); 265 va_end(ap); 266 cleanup(gres); 267 exit(exitcode); 268} 269 270static __dead2 void 271primary_exitx(int exitcode, const char *fmt, ...) 272{ 273 va_list ap; 274 275 va_start(ap, fmt); 276 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280} 281 282static int 283hast_activemap_flush(struct hast_resource *res) 284{ 285 const unsigned char *buf; 286 size_t size; 287 288 buf = activemap_bitmap(res->hr_amp, &size); 289 assert(buf != NULL); 290 assert((size % res->hr_local_sectorsize) == 0); 291 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 292 (ssize_t)size) { 293 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 294 "Unable to flush activemap to disk")); 295 return (-1); 296 } 297 return (0); 298} 299 300static bool 301real_remote(const struct hast_resource *res) 302{ 303 304 return (strcmp(res->hr_remoteaddr, "none") != 0); 305} 306 307static void 308init_environment(struct hast_resource *res __unused) 309{ 310 struct hio *hio; 311 unsigned int ii, ncomps; 312 313 /* 314 * In the future it might be per-resource value. 315 */ 316 ncomps = HAST_NCOMPONENTS; 317 318 /* 319 * Allocate memory needed by lists. 320 */ 321 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 322 if (hio_send_list == NULL) { 323 primary_exitx(EX_TEMPFAIL, 324 "Unable to allocate %zu bytes of memory for send lists.", 325 sizeof(hio_send_list[0]) * ncomps); 326 } 327 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 328 if (hio_send_list_lock == NULL) { 329 primary_exitx(EX_TEMPFAIL, 330 "Unable to allocate %zu bytes of memory for send list locks.", 331 sizeof(hio_send_list_lock[0]) * ncomps); 332 } 333 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 334 if (hio_send_list_cond == NULL) { 335 primary_exitx(EX_TEMPFAIL, 336 "Unable to allocate %zu bytes of memory for send list condition variables.", 337 sizeof(hio_send_list_cond[0]) * ncomps); 338 } 339 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 340 if (hio_recv_list == NULL) { 341 primary_exitx(EX_TEMPFAIL, 342 "Unable to allocate %zu bytes of memory for recv lists.", 343 sizeof(hio_recv_list[0]) * ncomps); 344 } 345 hio_recv_list_lock = 
malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 346 if (hio_recv_list_lock == NULL) { 347 primary_exitx(EX_TEMPFAIL, 348 "Unable to allocate %zu bytes of memory for recv list locks.", 349 sizeof(hio_recv_list_lock[0]) * ncomps); 350 } 351 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 352 if (hio_recv_list_cond == NULL) { 353 primary_exitx(EX_TEMPFAIL, 354 "Unable to allocate %zu bytes of memory for recv list condition variables.", 355 sizeof(hio_recv_list_cond[0]) * ncomps); 356 } 357 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 358 if (hio_remote_lock == NULL) { 359 primary_exitx(EX_TEMPFAIL, 360 "Unable to allocate %zu bytes of memory for remote connections locks.", 361 sizeof(hio_remote_lock[0]) * ncomps); 362 } 363 364 /* 365 * Initialize lists, their locks and theirs condition variables. 366 */ 367 TAILQ_INIT(&hio_free_list); 368 mtx_init(&hio_free_list_lock); 369 cv_init(&hio_free_list_cond); 370 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 371 TAILQ_INIT(&hio_send_list[ii]); 372 mtx_init(&hio_send_list_lock[ii]); 373 cv_init(&hio_send_list_cond[ii]); 374 TAILQ_INIT(&hio_recv_list[ii]); 375 mtx_init(&hio_recv_list_lock[ii]); 376 cv_init(&hio_recv_list_cond[ii]); 377 rw_init(&hio_remote_lock[ii]); 378 } 379 TAILQ_INIT(&hio_done_list); 380 mtx_init(&hio_done_list_lock); 381 cv_init(&hio_done_list_cond); 382 mtx_init(&metadata_lock); 383 384 /* 385 * Allocate requests pool and initialize requests. 
386 */ 387 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 388 hio = malloc(sizeof(*hio)); 389 if (hio == NULL) { 390 primary_exitx(EX_TEMPFAIL, 391 "Unable to allocate %zu bytes of memory for hio request.", 392 sizeof(*hio)); 393 } 394 hio->hio_countdown = 0; 395 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 396 if (hio->hio_errors == NULL) { 397 primary_exitx(EX_TEMPFAIL, 398 "Unable allocate %zu bytes of memory for hio errors.", 399 sizeof(hio->hio_errors[0]) * ncomps); 400 } 401 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 402 if (hio->hio_next == NULL) { 403 primary_exitx(EX_TEMPFAIL, 404 "Unable allocate %zu bytes of memory for hio_next field.", 405 sizeof(hio->hio_next[0]) * ncomps); 406 } 407 hio->hio_ggio.gctl_version = G_GATE_VERSION; 408 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 409 if (hio->hio_ggio.gctl_data == NULL) { 410 primary_exitx(EX_TEMPFAIL, 411 "Unable to allocate %zu bytes of memory for gctl_data.", 412 MAXPHYS); 413 } 414 hio->hio_ggio.gctl_length = MAXPHYS; 415 hio->hio_ggio.gctl_error = 0; 416 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 417 } 418} 419 420static bool 421init_resuid(struct hast_resource *res) 422{ 423 424 mtx_lock(&metadata_lock); 425 if (res->hr_resuid != 0) { 426 mtx_unlock(&metadata_lock); 427 return (false); 428 } else { 429 /* Initialize unique resource identifier. 
*/ 430 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 431 mtx_unlock(&metadata_lock); 432 if (metadata_write(res) < 0) 433 exit(EX_NOINPUT); 434 return (true); 435 } 436} 437 438static void 439init_local(struct hast_resource *res) 440{ 441 unsigned char *buf; 442 size_t mapsize; 443 444 if (metadata_read(res, true) < 0) 445 exit(EX_NOINPUT); 446 mtx_init(&res->hr_amp_lock); 447 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 448 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 449 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 450 } 451 mtx_init(&range_lock); 452 cv_init(&range_regular_cond); 453 if (rangelock_init(&range_regular) < 0) 454 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 455 cv_init(&range_sync_cond); 456 if (rangelock_init(&range_sync) < 0) 457 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 458 mapsize = activemap_ondisk_size(res->hr_amp); 459 buf = calloc(1, mapsize); 460 if (buf == NULL) { 461 primary_exitx(EX_TEMPFAIL, 462 "Unable to allocate buffer for activemap."); 463 } 464 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 465 (ssize_t)mapsize) { 466 primary_exit(EX_NOINPUT, "Unable to read activemap"); 467 } 468 activemap_copyin(res->hr_amp, buf, mapsize); 469 free(buf); 470 if (res->hr_resuid != 0) 471 return; 472 /* 473 * We're using provider for the first time. Initialize local and remote 474 * counters. We don't initialize resuid here, as we want to do it just 475 * in time. The reason for this is that we want to inform secondary 476 * that there were no writes yet, so there is no need to synchronize 477 * anything. 
478 */ 479 res->hr_primary_localcnt = 1; 480 res->hr_primary_remotecnt = 0; 481 if (metadata_write(res) < 0) 482 exit(EX_NOINPUT); 483} 484 485static bool 486init_remote(struct hast_resource *res, struct proto_conn **inp, 487 struct proto_conn **outp) 488{ 489 struct proto_conn *in, *out; 490 struct nv *nvout, *nvin; 491 const unsigned char *token; 492 unsigned char *map; 493 const char *errmsg; 494 int32_t extentsize; 495 int64_t datasize; 496 uint32_t mapsize; 497 size_t size; 498 499 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 500 assert(real_remote(res)); 501 502 in = out = NULL; 503 errmsg = NULL; 504 505 /* Prepare outgoing connection with remote node. */ 506 if (proto_client(res->hr_remoteaddr, &out) < 0) { 507 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 508 res->hr_remoteaddr); 509 } 510 /* Try to connect, but accept failure. */ 511 if (proto_connect(out) < 0) { 512 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 513 res->hr_remoteaddr); 514 goto close; 515 } 516 /* Error in setting timeout is not critical, but why should it fail? */ 517 if (proto_timeout(out, res->hr_timeout) < 0) 518 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 519 /* 520 * First handshake step. 521 * Setup outgoing connection with remote node. 
522 */ 523 nvout = nv_alloc(); 524 nv_add_string(nvout, res->hr_name, "resource"); 525 if (nv_error(nvout) != 0) { 526 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 527 "Unable to allocate header for connection with %s", 528 res->hr_remoteaddr); 529 nv_free(nvout); 530 goto close; 531 } 532 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 533 pjdlog_errno(LOG_WARNING, 534 "Unable to send handshake header to %s", 535 res->hr_remoteaddr); 536 nv_free(nvout); 537 goto close; 538 } 539 nv_free(nvout); 540 if (hast_proto_recv_hdr(out, &nvin) < 0) { 541 pjdlog_errno(LOG_WARNING, 542 "Unable to receive handshake header from %s", 543 res->hr_remoteaddr); 544 goto close; 545 } 546 errmsg = nv_get_string(nvin, "errmsg"); 547 if (errmsg != NULL) { 548 pjdlog_warning("%s", errmsg); 549 nv_free(nvin); 550 goto close; 551 } 552 token = nv_get_uint8_array(nvin, &size, "token"); 553 if (token == NULL) { 554 pjdlog_warning("Handshake header from %s has no 'token' field.", 555 res->hr_remoteaddr); 556 nv_free(nvin); 557 goto close; 558 } 559 if (size != sizeof(res->hr_token)) { 560 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 561 res->hr_remoteaddr, size, sizeof(res->hr_token)); 562 nv_free(nvin); 563 goto close; 564 } 565 bcopy(token, res->hr_token, sizeof(res->hr_token)); 566 nv_free(nvin); 567 568 /* 569 * Second handshake step. 570 * Setup incoming connection with remote node. 571 */ 572 if (proto_client(res->hr_remoteaddr, &in) < 0) { 573 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 574 res->hr_remoteaddr); 575 } 576 /* Try to connect, but accept failure. */ 577 if (proto_connect(in) < 0) { 578 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 579 res->hr_remoteaddr); 580 goto close; 581 } 582 /* Error in setting timeout is not critical, but why should it fail? 
*/ 583 if (proto_timeout(in, res->hr_timeout) < 0) 584 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 585 nvout = nv_alloc(); 586 nv_add_string(nvout, res->hr_name, "resource"); 587 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 588 "token"); 589 if (res->hr_resuid == 0) { 590 /* 591 * The resuid field was not yet initialized. 592 * Because we do synchronization inside init_resuid(), it is 593 * possible that someone already initialized it, the function 594 * will return false then, but if we successfully initialized 595 * it, we will get true. True means that there were no writes 596 * to this resource yet and we want to inform secondary that 597 * synchronization is not needed by sending "virgin" argument. 598 */ 599 if (init_resuid(res)) 600 nv_add_int8(nvout, 1, "virgin"); 601 } 602 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 603 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 604 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 605 if (nv_error(nvout) != 0) { 606 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 607 "Unable to allocate header for connection with %s", 608 res->hr_remoteaddr); 609 nv_free(nvout); 610 goto close; 611 } 612 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 613 pjdlog_errno(LOG_WARNING, 614 "Unable to send handshake header to %s", 615 res->hr_remoteaddr); 616 nv_free(nvout); 617 goto close; 618 } 619 nv_free(nvout); 620 if (hast_proto_recv_hdr(out, &nvin) < 0) { 621 pjdlog_errno(LOG_WARNING, 622 "Unable to receive handshake header from %s", 623 res->hr_remoteaddr); 624 goto close; 625 } 626 errmsg = nv_get_string(nvin, "errmsg"); 627 if (errmsg != NULL) { 628 pjdlog_warning("%s", errmsg); 629 nv_free(nvin); 630 goto close; 631 } 632 datasize = nv_get_int64(nvin, "datasize"); 633 if (datasize != res->hr_datasize) { 634 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 635 (intmax_t)res->hr_datasize, (intmax_t)datasize); 636 nv_free(nvin); 637 goto 
close; 638 } 639 extentsize = nv_get_int32(nvin, "extentsize"); 640 if (extentsize != res->hr_extentsize) { 641 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 642 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 643 nv_free(nvin); 644 goto close; 645 } 646 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 647 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 648 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 649 map = NULL; 650 mapsize = nv_get_uint32(nvin, "mapsize"); 651 if (mapsize > 0) { 652 map = malloc(mapsize); 653 if (map == NULL) { 654 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 655 (uintmax_t)mapsize); 656 nv_free(nvin); 657 goto close; 658 } 659 /* 660 * Remote node have some dirty extents on its own, lets 661 * download its activemap. 662 */ 663 if (hast_proto_recv_data(res, out, nvin, map, 664 mapsize) < 0) { 665 pjdlog_errno(LOG_ERR, 666 "Unable to receive remote activemap"); 667 nv_free(nvin); 668 free(map); 669 goto close; 670 } 671 /* 672 * Merge local and remote bitmaps. 673 */ 674 activemap_merge(res->hr_amp, map, mapsize); 675 free(map); 676 /* 677 * Now that we merged bitmaps from both nodes, flush it to the 678 * disk before we start to synchronize. 
679 */ 680 (void)hast_activemap_flush(res); 681 } 682 nv_free(nvin); 683 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 684 if (inp != NULL && outp != NULL) { 685 *inp = in; 686 *outp = out; 687 } else { 688 res->hr_remotein = in; 689 res->hr_remoteout = out; 690 } 691 event_send(res, EVENT_CONNECT); 692 return (true); 693close: 694 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 695 event_send(res, EVENT_SPLITBRAIN); 696 proto_close(out); 697 if (in != NULL) 698 proto_close(in); 699 return (false); 700} 701 702static void 703sync_start(void) 704{ 705 706 mtx_lock(&sync_lock); 707 sync_inprogress = true; 708 mtx_unlock(&sync_lock); 709 cv_signal(&sync_cond); 710} 711 712static void 713sync_stop(void) 714{ 715 716 mtx_lock(&sync_lock); 717 if (sync_inprogress) 718 sync_inprogress = false; 719 mtx_unlock(&sync_lock); 720} 721 722static void 723init_ggate(struct hast_resource *res) 724{ 725 struct g_gate_ctl_create ggiocreate; 726 struct g_gate_ctl_cancel ggiocancel; 727 728 /* 729 * We communicate with ggate via /dev/ggctl. Open it. 730 */ 731 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 732 if (res->hr_ggatefd < 0) 733 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 734 /* 735 * Create provider before trying to connect, as connection failure 736 * is not critical, but may take some time. 
737 */ 738 bzero(&ggiocreate, sizeof(ggiocreate)); 739 ggiocreate.gctl_version = G_GATE_VERSION; 740 ggiocreate.gctl_mediasize = res->hr_datasize; 741 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 742 ggiocreate.gctl_flags = 0; 743 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 744 ggiocreate.gctl_timeout = 0; 745 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 746 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 747 res->hr_provname); 748 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 749 pjdlog_info("Device hast/%s created.", res->hr_provname); 750 res->hr_ggateunit = ggiocreate.gctl_unit; 751 return; 752 } 753 if (errno != EEXIST) { 754 primary_exit(EX_OSERR, "Unable to create hast/%s device", 755 res->hr_provname); 756 } 757 pjdlog_debug(1, 758 "Device hast/%s already exists, we will try to take it over.", 759 res->hr_provname); 760 /* 761 * If we received EEXIST, we assume that the process who created the 762 * provider died and didn't clean up. In that case we will start from 763 * where he left of. 764 */ 765 bzero(&ggiocancel, sizeof(ggiocancel)); 766 ggiocancel.gctl_version = G_GATE_VERSION; 767 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 768 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 769 res->hr_provname); 770 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 771 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 772 res->hr_ggateunit = ggiocancel.gctl_unit; 773 return; 774 } 775 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 776 res->hr_provname); 777} 778 779void 780hastd_primary(struct hast_resource *res) 781{ 782 pthread_t td; 783 pid_t pid; 784 int error; 785 786 /* 787 * Create communication channel between parent and child. 
788 */ 789 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 790 KEEP_ERRNO((void)pidfile_remove(pfh)); 791 pjdlog_exit(EX_OSERR, 792 "Unable to create control sockets between parent and child"); 793 } 794 /* 795 * Create communication channel between child and parent. 796 */ 797 if (proto_client("socketpair://", &res->hr_event) < 0) { 798 KEEP_ERRNO((void)pidfile_remove(pfh)); 799 pjdlog_exit(EX_OSERR, 800 "Unable to create event sockets between child and parent"); 801 } 802 803 pid = fork(); 804 if (pid < 0) { 805 KEEP_ERRNO((void)pidfile_remove(pfh)); 806 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 807 } 808 809 if (pid > 0) { 810 /* This is parent. */ 811 /* Declare that we are receiver. */ 812 proto_recv(res->hr_event, NULL, 0); 813 res->hr_workerpid = pid; 814 return; 815 } 816 817 gres = res; 818 819 (void)pidfile_close(pfh); 820 hook_fini(); 821 822 setproctitle("%s (primary)", res->hr_name); 823 824 /* Declare that we are sender. */ 825 proto_send(res->hr_event, NULL, 0); 826 827 init_local(res); 828 init_ggate(res); 829 init_environment(res); 830 /* 831 * Create the guard thread first, so we can handle signals from the 832 * very begining. 833 */ 834 error = pthread_create(&td, NULL, guard_thread, res); 835 assert(error == 0); 836 /* 837 * Create the control thread before sending any event to the parent, 838 * as we can deadlock when parent sends control request to worker, 839 * but worker has no control thread started yet, so parent waits. 840 * In the meantime worker sends an event to the parent, but parent 841 * is unable to handle the event, because it waits for control 842 * request response. 
843 */ 844 error = pthread_create(&td, NULL, ctrl_thread, res); 845 assert(error == 0); 846 if (real_remote(res) && init_remote(res, NULL, NULL)) 847 sync_start(); 848 error = pthread_create(&td, NULL, ggate_recv_thread, res); 849 assert(error == 0); 850 error = pthread_create(&td, NULL, local_send_thread, res); 851 assert(error == 0); 852 error = pthread_create(&td, NULL, remote_send_thread, res); 853 assert(error == 0); 854 error = pthread_create(&td, NULL, remote_recv_thread, res); 855 assert(error == 0); 856 error = pthread_create(&td, NULL, ggate_send_thread, res); 857 assert(error == 0); 858 (void)sync_thread(res); 859} 860 861static void 862reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 863{ 864 char msg[1024]; 865 va_list ap; 866 int len; 867 868 va_start(ap, fmt); 869 len = vsnprintf(msg, sizeof(msg), fmt, ap); 870 va_end(ap); 871 if ((size_t)len < sizeof(msg)) { 872 switch (ggio->gctl_cmd) { 873 case BIO_READ: 874 (void)snprintf(msg + len, sizeof(msg) - len, 875 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 876 (uintmax_t)ggio->gctl_length); 877 break; 878 case BIO_DELETE: 879 (void)snprintf(msg + len, sizeof(msg) - len, 880 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 881 (uintmax_t)ggio->gctl_length); 882 break; 883 case BIO_FLUSH: 884 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 885 break; 886 case BIO_WRITE: 887 (void)snprintf(msg + len, sizeof(msg) - len, 888 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 889 (uintmax_t)ggio->gctl_length); 890 break; 891 default: 892 (void)snprintf(msg + len, sizeof(msg) - len, 893 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 894 break; 895 } 896 } 897 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 898} 899 900static void 901remote_close(struct hast_resource *res, int ncomp) 902{ 903 904 rw_wlock(&hio_remote_lock[ncomp]); 905 /* 906 * A race is possible between dropping rlock and acquiring wlock - 907 * another thread can close connection 
in-between. 908 */ 909 if (!ISCONNECTED(res, ncomp)) { 910 assert(res->hr_remotein == NULL); 911 assert(res->hr_remoteout == NULL); 912 rw_unlock(&hio_remote_lock[ncomp]); 913 return; 914 } 915 916 assert(res->hr_remotein != NULL); 917 assert(res->hr_remoteout != NULL); 918 919 pjdlog_debug(2, "Closing incoming connection to %s.", 920 res->hr_remoteaddr); 921 proto_close(res->hr_remotein); 922 res->hr_remotein = NULL; 923 pjdlog_debug(2, "Closing outgoing connection to %s.", 924 res->hr_remoteaddr); 925 proto_close(res->hr_remoteout); 926 res->hr_remoteout = NULL; 927 928 rw_unlock(&hio_remote_lock[ncomp]); 929 930 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 931 932 /* 933 * Stop synchronization if in-progress. 934 */ 935 sync_stop(); 936 937 event_send(res, EVENT_DISCONNECT); 938} 939 940/* 941 * Thread receives ggate I/O requests from the kernel and passes them to 942 * appropriate threads: 943 * WRITE - always goes to both local_send and remote_send threads 944 * READ (when the block is up-to-date on local component) - 945 * only local_send thread 946 * READ (when the block isn't up-to-date on local component) - 947 * only remote_send thread 948 * DELETE - always goes to both local_send and remote_send threads 949 * FLUSH - always goes to both local_send and remote_send threads 950 */ 951static void * 952ggate_recv_thread(void *arg) 953{ 954 struct hast_resource *res = arg; 955 struct g_gate_ctl_io *ggio; 956 struct hio *hio; 957 unsigned int ii, ncomp, ncomps; 958 int error; 959 960 ncomps = HAST_NCOMPONENTS; 961 962 for (;;) { 963 pjdlog_debug(2, "ggate_recv: Taking free request."); 964 QUEUE_TAKE2(hio, free); 965 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 966 ggio = &hio->hio_ggio; 967 ggio->gctl_unit = res->hr_ggateunit; 968 ggio->gctl_length = MAXPHYS; 969 ggio->gctl_error = 0; 970 pjdlog_debug(2, 971 "ggate_recv: (%p) Waiting for request from the kernel.", 972 hio); 973 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 
0) { 974 if (sigexit_received) 975 pthread_exit(NULL); 976 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 977 } 978 error = ggio->gctl_error; 979 switch (error) { 980 case 0: 981 break; 982 case ECANCELED: 983 /* Exit gracefully. */ 984 if (!sigexit_received) { 985 pjdlog_debug(2, 986 "ggate_recv: (%p) Received cancel from the kernel.", 987 hio); 988 pjdlog_info("Received cancel from the kernel, exiting."); 989 } 990 pthread_exit(NULL); 991 case ENOMEM: 992 /* 993 * Buffer too small? Impossible, we allocate MAXPHYS 994 * bytes - request can't be bigger than that. 995 */ 996 /* FALLTHROUGH */ 997 case ENXIO: 998 default: 999 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1000 strerror(error)); 1001 } 1002 for (ii = 0; ii < ncomps; ii++) 1003 hio->hio_errors[ii] = EINVAL; 1004 reqlog(LOG_DEBUG, 2, ggio, 1005 "ggate_recv: (%p) Request received from the kernel: ", 1006 hio); 1007 /* 1008 * Inform all components about new write request. 1009 * For read request prefer local component unless the given 1010 * range is out-of-date, then use remote component. 1011 */ 1012 switch (ggio->gctl_cmd) { 1013 case BIO_READ: 1014 pjdlog_debug(2, 1015 "ggate_recv: (%p) Moving request to the send queue.", 1016 hio); 1017 refcount_init(&hio->hio_countdown, 1); 1018 mtx_lock(&metadata_lock); 1019 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1020 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1021 /* 1022 * This range is up-to-date on local component, 1023 * so handle request locally. 1024 */ 1025 /* Local component is 0 for now. */ 1026 ncomp = 0; 1027 } else /* if (res->hr_syncsrc == 1028 HAST_SYNCSRC_SECONDARY) */ { 1029 assert(res->hr_syncsrc == 1030 HAST_SYNCSRC_SECONDARY); 1031 /* 1032 * This range is out-of-date on local component, 1033 * so send request to the remote node. 1034 */ 1035 /* Remote component is 1 for now. 
 */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			if (res->hr_resuid == 0) {
				/* This is first write, initialize resuid. */
				(void)init_resuid(res);
			}
			/*
			 * Wait until the range is not being synchronized and
			 * register it as a regular (application) write.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					/* sync_thread signals us when done. */
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark extent dirty; flush metadata if map changed. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* These go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 */
				/* Note: countdown not dropped here; the remote
				 * side will complete the request. */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/* Last component to finish passes the request on. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				/* Wake sync_thread waiting on sync_cond. */
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate request into a HAST protocol command. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			assert(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		/* Only signal remote_recv if the list was empty. */
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
1282 */ 1283 mtx_lock(&hio_recv_list_lock[ncomp]); 1284 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1285 mtx_unlock(&hio_recv_list_lock[ncomp]); 1286 goto done_queue; 1287 } 1288 rw_unlock(&hio_remote_lock[ncomp]); 1289 nv_free(nv); 1290 if (wakeup) 1291 cv_signal(&hio_recv_list_cond[ncomp]); 1292 continue; 1293done_queue: 1294 nv_free(nv); 1295 if (ISSYNCREQ(hio)) { 1296 if (!refcount_release(&hio->hio_countdown)) 1297 continue; 1298 mtx_lock(&sync_lock); 1299 SYNCREQDONE(hio); 1300 mtx_unlock(&sync_lock); 1301 cv_signal(&sync_cond); 1302 continue; 1303 } 1304 if (ggio->gctl_cmd == BIO_WRITE) { 1305 mtx_lock(&res->hr_amp_lock); 1306 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1307 ggio->gctl_length)) { 1308 (void)hast_activemap_flush(res); 1309 } 1310 mtx_unlock(&res->hr_amp_lock); 1311 } 1312 if (!refcount_release(&hio->hio_countdown)) 1313 continue; 1314 pjdlog_debug(2, 1315 "remote_send: (%p) Moving request to the done queue.", 1316 hio); 1317 QUEUE_INSERT2(hio, done); 1318 } 1319 /* NOTREACHED */ 1320 return (NULL); 1321} 1322 1323/* 1324 * Thread receives answer from secondary node and passes it to ggate_send 1325 * thread. 1326 */ 1327static void * 1328remote_recv_thread(void *arg) 1329{ 1330 struct hast_resource *res = arg; 1331 struct g_gate_ctl_io *ggio; 1332 struct hio *hio; 1333 struct nv *nv; 1334 unsigned int ncomp; 1335 uint64_t seq; 1336 int error; 1337 1338 /* Remote component is 1 for now. */ 1339 ncomp = 1; 1340 1341 for (;;) { 1342 /* Wait until there is anything to receive. 
*/ 1343 mtx_lock(&hio_recv_list_lock[ncomp]); 1344 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1345 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1346 cv_wait(&hio_recv_list_cond[ncomp], 1347 &hio_recv_list_lock[ncomp]); 1348 } 1349 mtx_unlock(&hio_recv_list_lock[ncomp]); 1350 rw_rlock(&hio_remote_lock[ncomp]); 1351 if (!ISCONNECTED(res, ncomp)) { 1352 rw_unlock(&hio_remote_lock[ncomp]); 1353 /* 1354 * Connection is dead, so move all pending requests to 1355 * the done queue (one-by-one). 1356 */ 1357 mtx_lock(&hio_recv_list_lock[ncomp]); 1358 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1359 assert(hio != NULL); 1360 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1361 hio_next[ncomp]); 1362 mtx_unlock(&hio_recv_list_lock[ncomp]); 1363 goto done_queue; 1364 } 1365 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1366 pjdlog_errno(LOG_ERR, 1367 "Unable to receive reply header"); 1368 rw_unlock(&hio_remote_lock[ncomp]); 1369 remote_close(res, ncomp); 1370 continue; 1371 } 1372 rw_unlock(&hio_remote_lock[ncomp]); 1373 seq = nv_get_uint64(nv, "seq"); 1374 if (seq == 0) { 1375 pjdlog_error("Header contains no 'seq' field."); 1376 nv_free(nv); 1377 continue; 1378 } 1379 mtx_lock(&hio_recv_list_lock[ncomp]); 1380 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1381 if (hio->hio_ggio.gctl_seq == seq) { 1382 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1383 hio_next[ncomp]); 1384 break; 1385 } 1386 } 1387 mtx_unlock(&hio_recv_list_lock[ncomp]); 1388 if (hio == NULL) { 1389 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1390 (uintmax_t)seq); 1391 nv_free(nv); 1392 continue; 1393 } 1394 error = nv_get_int16(nv, "error"); 1395 if (error != 0) { 1396 /* Request failed on remote side. 
*/ 1397 hio->hio_errors[ncomp] = 0; 1398 nv_free(nv); 1399 goto done_queue; 1400 } 1401 ggio = &hio->hio_ggio; 1402 switch (ggio->gctl_cmd) { 1403 case BIO_READ: 1404 rw_rlock(&hio_remote_lock[ncomp]); 1405 if (!ISCONNECTED(res, ncomp)) { 1406 rw_unlock(&hio_remote_lock[ncomp]); 1407 nv_free(nv); 1408 goto done_queue; 1409 } 1410 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1411 ggio->gctl_data, ggio->gctl_length) < 0) { 1412 hio->hio_errors[ncomp] = errno; 1413 pjdlog_errno(LOG_ERR, 1414 "Unable to receive reply data"); 1415 rw_unlock(&hio_remote_lock[ncomp]); 1416 nv_free(nv); 1417 remote_close(res, ncomp); 1418 goto done_queue; 1419 } 1420 rw_unlock(&hio_remote_lock[ncomp]); 1421 break; 1422 case BIO_WRITE: 1423 case BIO_DELETE: 1424 case BIO_FLUSH: 1425 break; 1426 default: 1427 assert(!"invalid condition"); 1428 abort(); 1429 } 1430 hio->hio_errors[ncomp] = 0; 1431 nv_free(nv); 1432done_queue: 1433 if (refcount_release(&hio->hio_countdown)) { 1434 if (ISSYNCREQ(hio)) { 1435 mtx_lock(&sync_lock); 1436 SYNCREQDONE(hio); 1437 mtx_unlock(&sync_lock); 1438 cv_signal(&sync_cond); 1439 } else { 1440 pjdlog_debug(2, 1441 "remote_recv: (%p) Moving request to the done queue.", 1442 hio); 1443 QUEUE_INSERT2(hio, done); 1444 } 1445 } 1446 } 1447 /* NOTREACHED */ 1448 return (NULL); 1449} 1450 1451/* 1452 * Thread sends answer to the kernel. 1453 */ 1454static void * 1455ggate_send_thread(void *arg) 1456{ 1457 struct hast_resource *res = arg; 1458 struct g_gate_ctl_io *ggio; 1459 struct hio *hio; 1460 unsigned int ii, ncomp, ncomps; 1461 1462 ncomps = HAST_NCOMPONENTS; 1463 1464 for (;;) { 1465 pjdlog_debug(2, "ggate_send: Taking request."); 1466 QUEUE_TAKE2(hio, done); 1467 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1468 ggio = &hio->hio_ggio; 1469 for (ii = 0; ii < ncomps; ii++) { 1470 if (hio->hio_errors[ii] == 0) { 1471 /* 1472 * One successful request is enough to declare 1473 * success. 
1474 */ 1475 ggio->gctl_error = 0; 1476 break; 1477 } 1478 } 1479 if (ii == ncomps) { 1480 /* 1481 * None of the requests were successful. 1482 * Use first error. 1483 */ 1484 ggio->gctl_error = hio->hio_errors[0]; 1485 } 1486 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1487 mtx_lock(&res->hr_amp_lock); 1488 activemap_write_complete(res->hr_amp, 1489 ggio->gctl_offset, ggio->gctl_length); 1490 mtx_unlock(&res->hr_amp_lock); 1491 } 1492 if (ggio->gctl_cmd == BIO_WRITE) { 1493 /* 1494 * Unlock range we locked. 1495 */ 1496 mtx_lock(&range_lock); 1497 rangelock_del(range_regular, ggio->gctl_offset, 1498 ggio->gctl_length); 1499 if (range_sync_wait) 1500 cv_signal(&range_sync_cond); 1501 mtx_unlock(&range_lock); 1502 /* 1503 * Bump local count if this is first write after 1504 * connection failure with remote node. 1505 */ 1506 ncomp = 1; 1507 rw_rlock(&hio_remote_lock[ncomp]); 1508 if (!ISCONNECTED(res, ncomp)) { 1509 mtx_lock(&metadata_lock); 1510 if (res->hr_primary_localcnt == 1511 res->hr_secondary_remotecnt) { 1512 res->hr_primary_localcnt++; 1513 pjdlog_debug(1, 1514 "Increasing localcnt to %ju.", 1515 (uintmax_t)res->hr_primary_localcnt); 1516 (void)metadata_write(res); 1517 } 1518 mtx_unlock(&metadata_lock); 1519 } 1520 rw_unlock(&hio_remote_lock[ncomp]); 1521 } 1522 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1523 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1524 pjdlog_debug(2, 1525 "ggate_send: (%p) Moving request to the free queue.", hio); 1526 QUEUE_INSERT2(hio, free); 1527 } 1528 /* NOTREACHED */ 1529 return (NULL); 1530} 1531 1532/* 1533 * Thread synchronize local and remote components. 
1534 */ 1535static void * 1536sync_thread(void *arg __unused) 1537{ 1538 struct hast_resource *res = arg; 1539 struct hio *hio; 1540 struct g_gate_ctl_io *ggio; 1541 unsigned int ii, ncomp, ncomps; 1542 off_t offset, length, synced; 1543 bool dorewind; 1544 int syncext; 1545 1546 ncomps = HAST_NCOMPONENTS; 1547 dorewind = true; 1548 synced = 0; 1549 offset = -1; 1550 1551 for (;;) { 1552 mtx_lock(&sync_lock); 1553 if (offset >= 0 && !sync_inprogress) { 1554 pjdlog_info("Synchronization interrupted. " 1555 "%jd bytes synchronized so far.", 1556 (intmax_t)synced); 1557 event_send(res, EVENT_SYNCINTR); 1558 } 1559 while (!sync_inprogress) { 1560 dorewind = true; 1561 synced = 0; 1562 cv_wait(&sync_cond, &sync_lock); 1563 } 1564 mtx_unlock(&sync_lock); 1565 /* 1566 * Obtain offset at which we should synchronize. 1567 * Rewind synchronization if needed. 1568 */ 1569 mtx_lock(&res->hr_amp_lock); 1570 if (dorewind) 1571 activemap_sync_rewind(res->hr_amp); 1572 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1573 if (syncext != -1) { 1574 /* 1575 * We synchronized entire syncext extent, we can mark 1576 * it as clean now. 1577 */ 1578 if (activemap_extent_complete(res->hr_amp, syncext)) 1579 (void)hast_activemap_flush(res); 1580 } 1581 mtx_unlock(&res->hr_amp_lock); 1582 if (dorewind) { 1583 dorewind = false; 1584 if (offset < 0) 1585 pjdlog_info("Nodes are in sync."); 1586 else { 1587 pjdlog_info("Synchronization started. %ju bytes to go.", 1588 (uintmax_t)(res->hr_extentsize * 1589 activemap_ndirty(res->hr_amp))); 1590 event_send(res, EVENT_SYNCSTART); 1591 } 1592 } 1593 if (offset < 0) { 1594 sync_stop(); 1595 pjdlog_debug(1, "Nothing to synchronize."); 1596 /* 1597 * Synchronization complete, make both localcnt and 1598 * remotecnt equal. 1599 */ 1600 ncomp = 1; 1601 rw_rlock(&hio_remote_lock[ncomp]); 1602 if (ISCONNECTED(res, ncomp)) { 1603 if (synced > 0) { 1604 pjdlog_info("Synchronization complete. 
" 1605 "%jd bytes synchronized.", 1606 (intmax_t)synced); 1607 event_send(res, EVENT_SYNCDONE); 1608 } 1609 mtx_lock(&metadata_lock); 1610 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1611 res->hr_primary_localcnt = 1612 res->hr_secondary_localcnt; 1613 res->hr_primary_remotecnt = 1614 res->hr_secondary_remotecnt; 1615 pjdlog_debug(1, 1616 "Setting localcnt to %ju and remotecnt to %ju.", 1617 (uintmax_t)res->hr_primary_localcnt, 1618 (uintmax_t)res->hr_secondary_localcnt); 1619 (void)metadata_write(res); 1620 mtx_unlock(&metadata_lock); 1621 } 1622 rw_unlock(&hio_remote_lock[ncomp]); 1623 continue; 1624 } 1625 pjdlog_debug(2, "sync: Taking free request."); 1626 QUEUE_TAKE2(hio, free); 1627 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1628 /* 1629 * Lock the range we are going to synchronize. We don't want 1630 * race where someone writes between our read and write. 1631 */ 1632 for (;;) { 1633 mtx_lock(&range_lock); 1634 if (rangelock_islocked(range_regular, offset, length)) { 1635 pjdlog_debug(2, 1636 "sync: Range offset=%jd length=%jd locked.", 1637 (intmax_t)offset, (intmax_t)length); 1638 range_sync_wait = true; 1639 cv_wait(&range_sync_cond, &range_lock); 1640 range_sync_wait = false; 1641 mtx_unlock(&range_lock); 1642 continue; 1643 } 1644 if (rangelock_add(range_sync, offset, length) < 0) { 1645 mtx_unlock(&range_lock); 1646 pjdlog_debug(2, 1647 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1648 (intmax_t)offset, (intmax_t)length); 1649 sleep(1); 1650 continue; 1651 } 1652 mtx_unlock(&range_lock); 1653 break; 1654 } 1655 /* 1656 * First read the data from synchronization source. 
1657 */ 1658 SYNCREQ(hio); 1659 ggio = &hio->hio_ggio; 1660 ggio->gctl_cmd = BIO_READ; 1661 ggio->gctl_offset = offset; 1662 ggio->gctl_length = length; 1663 ggio->gctl_error = 0; 1664 for (ii = 0; ii < ncomps; ii++) 1665 hio->hio_errors[ii] = EINVAL; 1666 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1667 hio); 1668 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1669 hio); 1670 mtx_lock(&metadata_lock); 1671 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1672 /* 1673 * This range is up-to-date on local component, 1674 * so handle request locally. 1675 */ 1676 /* Local component is 0 for now. */ 1677 ncomp = 0; 1678 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1679 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1680 /* 1681 * This range is out-of-date on local component, 1682 * so send request to the remote node. 1683 */ 1684 /* Remote component is 1 for now. */ 1685 ncomp = 1; 1686 } 1687 mtx_unlock(&metadata_lock); 1688 refcount_init(&hio->hio_countdown, 1); 1689 QUEUE_INSERT1(hio, send, ncomp); 1690 1691 /* 1692 * Let's wait for READ to finish. 1693 */ 1694 mtx_lock(&sync_lock); 1695 while (!ISSYNCREQDONE(hio)) 1696 cv_wait(&sync_cond, &sync_lock); 1697 mtx_unlock(&sync_lock); 1698 1699 if (hio->hio_errors[ncomp] != 0) { 1700 pjdlog_error("Unable to read synchronization data: %s.", 1701 strerror(hio->hio_errors[ncomp])); 1702 goto free_queue; 1703 } 1704 1705 /* 1706 * We read the data from synchronization source, now write it 1707 * to synchronization target. 
1708 */ 1709 SYNCREQ(hio); 1710 ggio->gctl_cmd = BIO_WRITE; 1711 for (ii = 0; ii < ncomps; ii++) 1712 hio->hio_errors[ii] = EINVAL; 1713 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1714 hio); 1715 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1716 hio); 1717 mtx_lock(&metadata_lock); 1718 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1719 /* 1720 * This range is up-to-date on local component, 1721 * so we update remote component. 1722 */ 1723 /* Remote component is 1 for now. */ 1724 ncomp = 1; 1725 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1726 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1727 /* 1728 * This range is out-of-date on local component, 1729 * so we update it. 1730 */ 1731 /* Local component is 0 for now. */ 1732 ncomp = 0; 1733 } 1734 mtx_unlock(&metadata_lock); 1735 1736 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1737 hio); 1738 refcount_init(&hio->hio_countdown, 1); 1739 QUEUE_INSERT1(hio, send, ncomp); 1740 1741 /* 1742 * Let's wait for WRITE to finish. 
1743 */ 1744 mtx_lock(&sync_lock); 1745 while (!ISSYNCREQDONE(hio)) 1746 cv_wait(&sync_cond, &sync_lock); 1747 mtx_unlock(&sync_lock); 1748 1749 if (hio->hio_errors[ncomp] != 0) { 1750 pjdlog_error("Unable to write synchronization data: %s.", 1751 strerror(hio->hio_errors[ncomp])); 1752 goto free_queue; 1753 } 1754 1755 synced += length; 1756free_queue: 1757 mtx_lock(&range_lock); 1758 rangelock_del(range_sync, offset, length); 1759 if (range_regular_wait) 1760 cv_signal(&range_regular_cond); 1761 mtx_unlock(&range_lock); 1762 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1763 hio); 1764 QUEUE_INSERT2(hio, free); 1765 } 1766 /* NOTREACHED */ 1767 return (NULL); 1768} 1769 1770static void 1771config_reload(void) 1772{ 1773 struct hastd_config *newcfg; 1774 struct hast_resource *res; 1775 unsigned int ii, ncomps; 1776 int modified; 1777 1778 pjdlog_info("Reloading configuration..."); 1779 1780 ncomps = HAST_NCOMPONENTS; 1781 1782 newcfg = yy_config_parse(cfgpath, false); 1783 if (newcfg == NULL) 1784 goto failed; 1785 1786 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1787 if (strcmp(res->hr_name, gres->hr_name) == 0) 1788 break; 1789 } 1790 /* 1791 * If resource was removed from the configuration file, resource 1792 * name, provider name or path to local component was modified we 1793 * shouldn't be here. This means that someone modified configuration 1794 * file and send SIGHUP to us instead of main hastd process. 1795 * Log advice and ignore the signal. 
1796 */ 1797 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1798 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1799 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1800 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1801 (unsigned int)getppid()); 1802 goto failed; 1803 } 1804 1805#define MODIFIED_REMOTEADDR 0x1 1806#define MODIFIED_REPLICATION 0x2 1807#define MODIFIED_TIMEOUT 0x4 1808#define MODIFIED_EXEC 0x8 1809 modified = 0; 1810 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1811 /* 1812 * Don't copy res->hr_remoteaddr to gres just yet. 1813 * We want remote_close() to log disconnect from the old 1814 * addresses, not from the new ones. 1815 */ 1816 modified |= MODIFIED_REMOTEADDR; 1817 } 1818 if (gres->hr_replication != res->hr_replication) { 1819 gres->hr_replication = res->hr_replication; 1820 modified |= MODIFIED_REPLICATION; 1821 } 1822 if (gres->hr_timeout != res->hr_timeout) { 1823 gres->hr_timeout = res->hr_timeout; 1824 modified |= MODIFIED_TIMEOUT; 1825 } 1826 if (strcmp(gres->hr_exec, res->hr_exec) != 0) { 1827 strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec)); 1828 modified |= MODIFIED_EXEC; 1829 } 1830 /* 1831 * If only timeout was modified we only need to change it without 1832 * reconnecting. 
1833 */ 1834 if (modified == MODIFIED_TIMEOUT) { 1835 for (ii = 0; ii < ncomps; ii++) { 1836 if (!ISREMOTE(ii)) 1837 continue; 1838 rw_rlock(&hio_remote_lock[ii]); 1839 if (!ISCONNECTED(gres, ii)) { 1840 rw_unlock(&hio_remote_lock[ii]); 1841 continue; 1842 } 1843 rw_unlock(&hio_remote_lock[ii]); 1844 if (proto_timeout(gres->hr_remotein, 1845 gres->hr_timeout) < 0) { 1846 pjdlog_errno(LOG_WARNING, 1847 "Unable to set connection timeout"); 1848 } 1849 if (proto_timeout(gres->hr_remoteout, 1850 gres->hr_timeout) < 0) { 1851 pjdlog_errno(LOG_WARNING, 1852 "Unable to set connection timeout"); 1853 } 1854 } 1855 } else if ((modified & 1856 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1857 for (ii = 0; ii < ncomps; ii++) { 1858 if (!ISREMOTE(ii)) 1859 continue; 1860 remote_close(gres, ii); 1861 } 1862 if (modified & MODIFIED_REMOTEADDR) { 1863 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1864 sizeof(gres->hr_remoteaddr)); 1865 } 1866 } 1867#undef MODIFIED_REMOTEADDR 1868#undef MODIFIED_REPLICATION 1869#undef MODIFIED_TIMEOUT 1870#undef MODIFIED_EXEC 1871 1872 pjdlog_info("Configuration reloaded successfully."); 1873 return; 1874failed: 1875 if (newcfg != NULL) { 1876 if (newcfg->hc_controlconn != NULL) 1877 proto_close(newcfg->hc_controlconn); 1878 if (newcfg->hc_listenconn != NULL) 1879 proto_close(newcfg->hc_listenconn); 1880 yy_config_free(newcfg); 1881 } 1882 pjdlog_warning("Configuration not reloaded."); 1883} 1884 1885static void 1886keepalive_send(struct hast_resource *res, unsigned int ncomp) 1887{ 1888 struct nv *nv; 1889 1890 nv = nv_alloc(); 1891 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1892 if (nv_error(nv) != 0) { 1893 nv_free(nv); 1894 pjdlog_debug(1, 1895 "keepalive_send: Unable to prepare header to send."); 1896 return; 1897 } 1898 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1899 pjdlog_common(LOG_DEBUG, 1, errno, 1900 "keepalive_send: Unable to send request"); 1901 nv_free(nv); 1902 rw_unlock(&hio_remote_lock[ncomp]); 1903 
		remote_close(res, ncomp);
		rw_rlock(&hio_remote_lock[ncomp]);
		return;
	}
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Check one component's connection health; send a keepalive and
 * reconnect if the connection is gone.  Only remote components are
 * guarded.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein != NULL);
		assert(res->hr_remoteout != NULL);
		/* May drop and re-acquire the lock internally on failure. */
		keepalive_send(res, ncomp);
	}

	/* Re-check: keepalive_send() may have closed the connection. */
	if (ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein != NULL);
		assert(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	assert(res->hr_remotein == NULL);
	assert(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out)) {
		rw_wlock(&hio_remote_lock[ncomp]);
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		assert(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		assert(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_nsec = 0;
	signo = -1;

	for (;;) {
		/* Handle the signal (if any) delivered by sigtimedwait(). */
		switch (signo) {
		case SIGHUP:
			config_reload();
			break;
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		pjdlog_debug(2, "remote_guard: Checking connections.");
		now = time(NULL);
		/* Rate-limit the checks to once per RETRY_SLEEP seconds. */
		if (lastcheck + RETRY_SLEEP <= now) {
			for (ii = 0; ii < ncomps; ii++)
				guard_one(res, ii);
			lastcheck = now;
		}
		timeout.tv_sec = RETRY_SLEEP;
		/* Sleep, but wake early on a pending signal. */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}