/* primary.c — FreeBSD head, sbin/hastd/primary.c, revision 218217 */
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 218217 2011-02-03 11:33:32Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * One I/O request, as passed between the ggate kernel interface and the
 * per-component worker threads.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * NOTE(review): both aliases map onto hio_next[0]; presumably a hio is never
 * on the free list and the done list at the same time — verify before reuse.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows one to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronizes access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep between reconnect retries or keepalive packets.
 */
#define	RETRY_SLEEP		10

#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append hio to the per-component queue "name"; wake one waiter only when
 * the queue transitions from empty to non-empty.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* Same as QUEUE_INSERT1, but for the single (non-per-component) queues. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Take the first hio off a per-component queue.  With timeout == 0 this
 * blocks until a request arrives; with a non-zero timeout it waits at most
 * one timed interval and may leave (hio) == NULL.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if (hio != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Blocking take from a single (non-per-component) queue. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/* Synchronization requests are tagged via a sentinel gctl_unit value. */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* Resource this worker process serves; set once in hastd_primary(). */
static struct hast_resource *gres;

/* Range locks separating regular I/O from synchronization I/O. */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

/*
 * Destroy the ggate provider if we created one.  errno is preserved so this
 * can be called on error paths without clobbering the original failure.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-style message, clean up and terminate the worker. */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Like primary_exit(), but without errno decoration; EX_OK logs as info. */
static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to its on-disk location (right after
 * the metadata).  Returns 0 on success, -1 on write failure (errno kept).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	PJDLOG_ASSERT(buf != NULL);
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/* True when a real remote address is configured (i.e. not "none"). */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all queues, locks, condition variables and the
 * pool of HAST_HIO_MAX preallocated hio request structures.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and their condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}
}

/*
 * Lazily initialize the resource's unique identifier.  Returns true only if
 * this call performed the initialization (i.e. there were no writes yet);
 * false if someone else already set it.
 */
static bool
init_resuid(struct hast_resource *res)
{

	mtx_lock(&metadata_lock);
	if (res->hr_resuid != 0) {
		mtx_unlock(&metadata_lock);
		return (false);
	} else {
		/* Initialize unique resource identifier. */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) < 0)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read metadata, set up the activemap and range locks, and load the on-disk
 * activemap into memory.  On first use of the provider, initialize the
 * local/remote generation counters.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Perform the two-step handshake with the secondary node: establish the
 * outgoing and incoming connections, exchange token/resuid/counters, verify
 * data and extent sizes and merge the remote activemap into ours.
 * Connections are stored via inp/outp when both are non-NULL, otherwise
 * directly into the resource.  Returns true when fully connected.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to create outgoing connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out, HAST_TIMEOUT) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(out, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (proto_client(res->hr_remoteaddr, &in) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to create incoming connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(in, HAST_TIMEOUT) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(in, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* NOTE: the reply to the second step arrives on the "out" connection. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * The remote node has some dirty extents of its own; let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (true);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/* Mark synchronization as in progress and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop; no-op when no sync is in progress. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create the hast/<name> ggate provider, or take over an existing one left
 * behind by a previous (crashed) worker.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: fork a worker child, set up control and
 * event channels with the parent, initialize local state and the ggate
 * device, drop privileges and start all worker threads.  The parent returns;
 * the child never does (it runs sync_thread()).
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel between child and parent.
	 */
	if (proto_client("socketpair://", &res->hr_event) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	mode = pjdlog_mode_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (primary)", res->hr_name);

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs() != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res) && init_remote(res, NULL, NULL))
		sync_start();
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	(void)sync_thread(res);
}

/*
 * Log a message about the given ggate request, appending a human-readable
 * description of the BIO command, offset and length.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both remote connections (under the component's write lock) and stop
 * any in-progress synchronization.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			if (res->hr_resuid == 0) {
				/* This is first write, initialize resuid. */
				(void)init_resuid(res);
			}
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
1122 * If local read fails, it redirects it to remote_send thread. 1123 */ 1124static void * 1125local_send_thread(void *arg) 1126{ 1127 struct hast_resource *res = arg; 1128 struct g_gate_ctl_io *ggio; 1129 struct hio *hio; 1130 unsigned int ncomp, rncomp; 1131 ssize_t ret; 1132 1133 /* Local component is 0 for now. */ 1134 ncomp = 0; 1135 /* Remote component is 1 for now. */ 1136 rncomp = 1; 1137 1138 for (;;) { 1139 pjdlog_debug(2, "local_send: Taking request."); 1140 QUEUE_TAKE1(hio, send, ncomp, 0); 1141 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1142 ggio = &hio->hio_ggio; 1143 switch (ggio->gctl_cmd) { 1144 case BIO_READ: 1145 ret = pread(res->hr_localfd, ggio->gctl_data, 1146 ggio->gctl_length, 1147 ggio->gctl_offset + res->hr_localoff); 1148 if (ret == ggio->gctl_length) 1149 hio->hio_errors[ncomp] = 0; 1150 else { 1151 /* 1152 * If READ failed, try to read from remote node. 1153 */ 1154 if (ret < 0) { 1155 reqlog(LOG_WARNING, 0, ggio, 1156 "Local request failed (%s), trying remote node. ", 1157 strerror(errno)); 1158 } else if (ret != ggio->gctl_length) { 1159 reqlog(LOG_WARNING, 0, ggio, 1160 "Local request failed (%zd != %jd), trying remote node. 
", 1161 ret, (intmax_t)ggio->gctl_length); 1162 } 1163 QUEUE_INSERT1(hio, send, rncomp); 1164 continue; 1165 } 1166 break; 1167 case BIO_WRITE: 1168 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1169 ggio->gctl_length, 1170 ggio->gctl_offset + res->hr_localoff); 1171 if (ret < 0) { 1172 hio->hio_errors[ncomp] = errno; 1173 reqlog(LOG_WARNING, 0, ggio, 1174 "Local request failed (%s): ", 1175 strerror(errno)); 1176 } else if (ret != ggio->gctl_length) { 1177 hio->hio_errors[ncomp] = EIO; 1178 reqlog(LOG_WARNING, 0, ggio, 1179 "Local request failed (%zd != %jd): ", 1180 ret, (intmax_t)ggio->gctl_length); 1181 } else { 1182 hio->hio_errors[ncomp] = 0; 1183 } 1184 break; 1185 case BIO_DELETE: 1186 ret = g_delete(res->hr_localfd, 1187 ggio->gctl_offset + res->hr_localoff, 1188 ggio->gctl_length); 1189 if (ret < 0) { 1190 hio->hio_errors[ncomp] = errno; 1191 reqlog(LOG_WARNING, 0, ggio, 1192 "Local request failed (%s): ", 1193 strerror(errno)); 1194 } else { 1195 hio->hio_errors[ncomp] = 0; 1196 } 1197 break; 1198 case BIO_FLUSH: 1199 ret = g_flush(res->hr_localfd); 1200 if (ret < 0) { 1201 hio->hio_errors[ncomp] = errno; 1202 reqlog(LOG_WARNING, 0, ggio, 1203 "Local request failed (%s): ", 1204 strerror(errno)); 1205 } else { 1206 hio->hio_errors[ncomp] = 0; 1207 } 1208 break; 1209 } 1210 if (refcount_release(&hio->hio_countdown)) { 1211 if (ISSYNCREQ(hio)) { 1212 mtx_lock(&sync_lock); 1213 SYNCREQDONE(hio); 1214 mtx_unlock(&sync_lock); 1215 cv_signal(&sync_cond); 1216 } else { 1217 pjdlog_debug(2, 1218 "local_send: (%p) Moving request to the done queue.", 1219 hio); 1220 QUEUE_INSERT2(hio, done); 1221 } 1222 } 1223 } 1224 /* NOTREACHED */ 1225 return (NULL); 1226} 1227 1228static void 1229keepalive_send(struct hast_resource *res, unsigned int ncomp) 1230{ 1231 struct nv *nv; 1232 1233 rw_rlock(&hio_remote_lock[ncomp]); 1234 1235 if (!ISCONNECTED(res, ncomp)) { 1236 rw_unlock(&hio_remote_lock[ncomp]); 1237 return; 1238 } 1239 1240 PJDLOG_ASSERT(res->hr_remotein != 
NULL); 1241 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1242 1243 nv = nv_alloc(); 1244 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1245 if (nv_error(nv) != 0) { 1246 rw_unlock(&hio_remote_lock[ncomp]); 1247 nv_free(nv); 1248 pjdlog_debug(1, 1249 "keepalive_send: Unable to prepare header to send."); 1250 return; 1251 } 1252 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1253 rw_unlock(&hio_remote_lock[ncomp]); 1254 pjdlog_common(LOG_DEBUG, 1, errno, 1255 "keepalive_send: Unable to send request"); 1256 nv_free(nv); 1257 remote_close(res, ncomp); 1258 return; 1259 } 1260 1261 rw_unlock(&hio_remote_lock[ncomp]); 1262 nv_free(nv); 1263 pjdlog_debug(2, "keepalive_send: Request sent."); 1264} 1265 1266/* 1267 * Thread sends request to secondary node. 1268 */ 1269static void * 1270remote_send_thread(void *arg) 1271{ 1272 struct hast_resource *res = arg; 1273 struct g_gate_ctl_io *ggio; 1274 time_t lastcheck, now; 1275 struct hio *hio; 1276 struct nv *nv; 1277 unsigned int ncomp; 1278 bool wakeup; 1279 uint64_t offset, length; 1280 uint8_t cmd; 1281 void *data; 1282 1283 /* Remote component is 1 for now. 
*/ 1284 ncomp = 1; 1285 lastcheck = time(NULL); 1286 1287 for (;;) { 1288 pjdlog_debug(2, "remote_send: Taking request."); 1289 QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP); 1290 if (hio == NULL) { 1291 now = time(NULL); 1292 if (lastcheck + RETRY_SLEEP <= now) { 1293 keepalive_send(res, ncomp); 1294 lastcheck = now; 1295 } 1296 continue; 1297 } 1298 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1299 ggio = &hio->hio_ggio; 1300 switch (ggio->gctl_cmd) { 1301 case BIO_READ: 1302 cmd = HIO_READ; 1303 data = NULL; 1304 offset = ggio->gctl_offset; 1305 length = ggio->gctl_length; 1306 break; 1307 case BIO_WRITE: 1308 cmd = HIO_WRITE; 1309 data = ggio->gctl_data; 1310 offset = ggio->gctl_offset; 1311 length = ggio->gctl_length; 1312 break; 1313 case BIO_DELETE: 1314 cmd = HIO_DELETE; 1315 data = NULL; 1316 offset = ggio->gctl_offset; 1317 length = ggio->gctl_length; 1318 break; 1319 case BIO_FLUSH: 1320 cmd = HIO_FLUSH; 1321 data = NULL; 1322 offset = 0; 1323 length = 0; 1324 break; 1325 default: 1326 PJDLOG_ASSERT(!"invalid condition"); 1327 abort(); 1328 } 1329 nv = nv_alloc(); 1330 nv_add_uint8(nv, cmd, "cmd"); 1331 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1332 nv_add_uint64(nv, offset, "offset"); 1333 nv_add_uint64(nv, length, "length"); 1334 if (nv_error(nv) != 0) { 1335 hio->hio_errors[ncomp] = nv_error(nv); 1336 pjdlog_debug(2, 1337 "remote_send: (%p) Unable to prepare header to send.", 1338 hio); 1339 reqlog(LOG_ERR, 0, ggio, 1340 "Unable to prepare header to send (%s): ", 1341 strerror(nv_error(nv))); 1342 /* Move failed request immediately to the done queue. */ 1343 goto done_queue; 1344 } 1345 pjdlog_debug(2, 1346 "remote_send: (%p) Moving request to the recv queue.", 1347 hio); 1348 /* 1349 * Protect connection from disappearing. 
1350 */ 1351 rw_rlock(&hio_remote_lock[ncomp]); 1352 if (!ISCONNECTED(res, ncomp)) { 1353 rw_unlock(&hio_remote_lock[ncomp]); 1354 hio->hio_errors[ncomp] = ENOTCONN; 1355 goto done_queue; 1356 } 1357 /* 1358 * Move the request to recv queue before sending it, because 1359 * in different order we can get reply before we move request 1360 * to recv queue. 1361 */ 1362 mtx_lock(&hio_recv_list_lock[ncomp]); 1363 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1364 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1365 mtx_unlock(&hio_recv_list_lock[ncomp]); 1366 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1367 data != NULL ? length : 0) < 0) { 1368 hio->hio_errors[ncomp] = errno; 1369 rw_unlock(&hio_remote_lock[ncomp]); 1370 pjdlog_debug(2, 1371 "remote_send: (%p) Unable to send request.", hio); 1372 reqlog(LOG_ERR, 0, ggio, 1373 "Unable to send request (%s): ", 1374 strerror(hio->hio_errors[ncomp])); 1375 remote_close(res, ncomp); 1376 /* 1377 * Take request back from the receive queue and move 1378 * it immediately to the done queue. 
1379 */ 1380 mtx_lock(&hio_recv_list_lock[ncomp]); 1381 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1382 mtx_unlock(&hio_recv_list_lock[ncomp]); 1383 goto done_queue; 1384 } 1385 rw_unlock(&hio_remote_lock[ncomp]); 1386 nv_free(nv); 1387 if (wakeup) 1388 cv_signal(&hio_recv_list_cond[ncomp]); 1389 continue; 1390done_queue: 1391 nv_free(nv); 1392 if (ISSYNCREQ(hio)) { 1393 if (!refcount_release(&hio->hio_countdown)) 1394 continue; 1395 mtx_lock(&sync_lock); 1396 SYNCREQDONE(hio); 1397 mtx_unlock(&sync_lock); 1398 cv_signal(&sync_cond); 1399 continue; 1400 } 1401 if (ggio->gctl_cmd == BIO_WRITE) { 1402 mtx_lock(&res->hr_amp_lock); 1403 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1404 ggio->gctl_length)) { 1405 (void)hast_activemap_flush(res); 1406 } 1407 mtx_unlock(&res->hr_amp_lock); 1408 } 1409 if (!refcount_release(&hio->hio_countdown)) 1410 continue; 1411 pjdlog_debug(2, 1412 "remote_send: (%p) Moving request to the done queue.", 1413 hio); 1414 QUEUE_INSERT2(hio, done); 1415 } 1416 /* NOTREACHED */ 1417 return (NULL); 1418} 1419 1420/* 1421 * Thread receives answer from secondary node and passes it to ggate_send 1422 * thread. 1423 */ 1424static void * 1425remote_recv_thread(void *arg) 1426{ 1427 struct hast_resource *res = arg; 1428 struct g_gate_ctl_io *ggio; 1429 struct hio *hio; 1430 struct nv *nv; 1431 unsigned int ncomp; 1432 uint64_t seq; 1433 int error; 1434 1435 /* Remote component is 1 for now. */ 1436 ncomp = 1; 1437 1438 for (;;) { 1439 /* Wait until there is anything to receive. 
*/ 1440 mtx_lock(&hio_recv_list_lock[ncomp]); 1441 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1442 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1443 cv_wait(&hio_recv_list_cond[ncomp], 1444 &hio_recv_list_lock[ncomp]); 1445 } 1446 mtx_unlock(&hio_recv_list_lock[ncomp]); 1447 rw_rlock(&hio_remote_lock[ncomp]); 1448 if (!ISCONNECTED(res, ncomp)) { 1449 rw_unlock(&hio_remote_lock[ncomp]); 1450 /* 1451 * Connection is dead, so move all pending requests to 1452 * the done queue (one-by-one). 1453 */ 1454 mtx_lock(&hio_recv_list_lock[ncomp]); 1455 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1456 PJDLOG_ASSERT(hio != NULL); 1457 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1458 hio_next[ncomp]); 1459 mtx_unlock(&hio_recv_list_lock[ncomp]); 1460 goto done_queue; 1461 } 1462 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1463 pjdlog_errno(LOG_ERR, 1464 "Unable to receive reply header"); 1465 rw_unlock(&hio_remote_lock[ncomp]); 1466 remote_close(res, ncomp); 1467 continue; 1468 } 1469 rw_unlock(&hio_remote_lock[ncomp]); 1470 seq = nv_get_uint64(nv, "seq"); 1471 if (seq == 0) { 1472 pjdlog_error("Header contains no 'seq' field."); 1473 nv_free(nv); 1474 continue; 1475 } 1476 mtx_lock(&hio_recv_list_lock[ncomp]); 1477 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1478 if (hio->hio_ggio.gctl_seq == seq) { 1479 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1480 hio_next[ncomp]); 1481 break; 1482 } 1483 } 1484 mtx_unlock(&hio_recv_list_lock[ncomp]); 1485 if (hio == NULL) { 1486 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1487 (uintmax_t)seq); 1488 nv_free(nv); 1489 continue; 1490 } 1491 error = nv_get_int16(nv, "error"); 1492 if (error != 0) { 1493 /* Request failed on remote side. 
*/ 1494 hio->hio_errors[ncomp] = error; 1495 reqlog(LOG_WARNING, 0, &hio->hio_ggio, 1496 "Remote request failed (%s): ", strerror(error)); 1497 nv_free(nv); 1498 goto done_queue; 1499 } 1500 ggio = &hio->hio_ggio; 1501 switch (ggio->gctl_cmd) { 1502 case BIO_READ: 1503 rw_rlock(&hio_remote_lock[ncomp]); 1504 if (!ISCONNECTED(res, ncomp)) { 1505 rw_unlock(&hio_remote_lock[ncomp]); 1506 nv_free(nv); 1507 goto done_queue; 1508 } 1509 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1510 ggio->gctl_data, ggio->gctl_length) < 0) { 1511 hio->hio_errors[ncomp] = errno; 1512 pjdlog_errno(LOG_ERR, 1513 "Unable to receive reply data"); 1514 rw_unlock(&hio_remote_lock[ncomp]); 1515 nv_free(nv); 1516 remote_close(res, ncomp); 1517 goto done_queue; 1518 } 1519 rw_unlock(&hio_remote_lock[ncomp]); 1520 break; 1521 case BIO_WRITE: 1522 case BIO_DELETE: 1523 case BIO_FLUSH: 1524 break; 1525 default: 1526 PJDLOG_ASSERT(!"invalid condition"); 1527 abort(); 1528 } 1529 hio->hio_errors[ncomp] = 0; 1530 nv_free(nv); 1531done_queue: 1532 if (refcount_release(&hio->hio_countdown)) { 1533 if (ISSYNCREQ(hio)) { 1534 mtx_lock(&sync_lock); 1535 SYNCREQDONE(hio); 1536 mtx_unlock(&sync_lock); 1537 cv_signal(&sync_cond); 1538 } else { 1539 pjdlog_debug(2, 1540 "remote_recv: (%p) Moving request to the done queue.", 1541 hio); 1542 QUEUE_INSERT2(hio, done); 1543 } 1544 } 1545 } 1546 /* NOTREACHED */ 1547 return (NULL); 1548} 1549 1550/* 1551 * Thread sends answer to the kernel. 
1552 */ 1553static void * 1554ggate_send_thread(void *arg) 1555{ 1556 struct hast_resource *res = arg; 1557 struct g_gate_ctl_io *ggio; 1558 struct hio *hio; 1559 unsigned int ii, ncomp, ncomps; 1560 1561 ncomps = HAST_NCOMPONENTS; 1562 1563 for (;;) { 1564 pjdlog_debug(2, "ggate_send: Taking request."); 1565 QUEUE_TAKE2(hio, done); 1566 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1567 ggio = &hio->hio_ggio; 1568 for (ii = 0; ii < ncomps; ii++) { 1569 if (hio->hio_errors[ii] == 0) { 1570 /* 1571 * One successful request is enough to declare 1572 * success. 1573 */ 1574 ggio->gctl_error = 0; 1575 break; 1576 } 1577 } 1578 if (ii == ncomps) { 1579 /* 1580 * None of the requests were successful. 1581 * Use first error. 1582 */ 1583 ggio->gctl_error = hio->hio_errors[0]; 1584 } 1585 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1586 mtx_lock(&res->hr_amp_lock); 1587 activemap_write_complete(res->hr_amp, 1588 ggio->gctl_offset, ggio->gctl_length); 1589 mtx_unlock(&res->hr_amp_lock); 1590 } 1591 if (ggio->gctl_cmd == BIO_WRITE) { 1592 /* 1593 * Unlock range we locked. 1594 */ 1595 mtx_lock(&range_lock); 1596 rangelock_del(range_regular, ggio->gctl_offset, 1597 ggio->gctl_length); 1598 if (range_sync_wait) 1599 cv_signal(&range_sync_cond); 1600 mtx_unlock(&range_lock); 1601 /* 1602 * Bump local count if this is first write after 1603 * connection failure with remote node. 
1604 */ 1605 ncomp = 1; 1606 rw_rlock(&hio_remote_lock[ncomp]); 1607 if (!ISCONNECTED(res, ncomp)) { 1608 mtx_lock(&metadata_lock); 1609 if (res->hr_primary_localcnt == 1610 res->hr_secondary_remotecnt) { 1611 res->hr_primary_localcnt++; 1612 pjdlog_debug(1, 1613 "Increasing localcnt to %ju.", 1614 (uintmax_t)res->hr_primary_localcnt); 1615 (void)metadata_write(res); 1616 } 1617 mtx_unlock(&metadata_lock); 1618 } 1619 rw_unlock(&hio_remote_lock[ncomp]); 1620 } 1621 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1622 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1623 pjdlog_debug(2, 1624 "ggate_send: (%p) Moving request to the free queue.", hio); 1625 QUEUE_INSERT2(hio, free); 1626 } 1627 /* NOTREACHED */ 1628 return (NULL); 1629} 1630 1631/* 1632 * Thread synchronize local and remote components. 1633 */ 1634static void * 1635sync_thread(void *arg __unused) 1636{ 1637 struct hast_resource *res = arg; 1638 struct hio *hio; 1639 struct g_gate_ctl_io *ggio; 1640 unsigned int ii, ncomp, ncomps; 1641 off_t offset, length, synced; 1642 bool dorewind; 1643 int syncext; 1644 1645 ncomps = HAST_NCOMPONENTS; 1646 dorewind = true; 1647 synced = 0; 1648 offset = -1; 1649 1650 for (;;) { 1651 mtx_lock(&sync_lock); 1652 if (offset >= 0 && !sync_inprogress) { 1653 pjdlog_info("Synchronization interrupted. " 1654 "%jd bytes synchronized so far.", 1655 (intmax_t)synced); 1656 event_send(res, EVENT_SYNCINTR); 1657 } 1658 while (!sync_inprogress) { 1659 dorewind = true; 1660 synced = 0; 1661 cv_wait(&sync_cond, &sync_lock); 1662 } 1663 mtx_unlock(&sync_lock); 1664 /* 1665 * Obtain offset at which we should synchronize. 1666 * Rewind synchronization if needed. 1667 */ 1668 mtx_lock(&res->hr_amp_lock); 1669 if (dorewind) 1670 activemap_sync_rewind(res->hr_amp); 1671 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1672 if (syncext != -1) { 1673 /* 1674 * We synchronized entire syncext extent, we can mark 1675 * it as clean now. 
1676 */ 1677 if (activemap_extent_complete(res->hr_amp, syncext)) 1678 (void)hast_activemap_flush(res); 1679 } 1680 mtx_unlock(&res->hr_amp_lock); 1681 if (dorewind) { 1682 dorewind = false; 1683 if (offset < 0) 1684 pjdlog_info("Nodes are in sync."); 1685 else { 1686 pjdlog_info("Synchronization started. %ju bytes to go.", 1687 (uintmax_t)(res->hr_extentsize * 1688 activemap_ndirty(res->hr_amp))); 1689 event_send(res, EVENT_SYNCSTART); 1690 } 1691 } 1692 if (offset < 0) { 1693 sync_stop(); 1694 pjdlog_debug(1, "Nothing to synchronize."); 1695 /* 1696 * Synchronization complete, make both localcnt and 1697 * remotecnt equal. 1698 */ 1699 ncomp = 1; 1700 rw_rlock(&hio_remote_lock[ncomp]); 1701 if (ISCONNECTED(res, ncomp)) { 1702 if (synced > 0) { 1703 pjdlog_info("Synchronization complete. " 1704 "%jd bytes synchronized.", 1705 (intmax_t)synced); 1706 event_send(res, EVENT_SYNCDONE); 1707 } 1708 mtx_lock(&metadata_lock); 1709 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1710 res->hr_primary_localcnt = 1711 res->hr_secondary_localcnt; 1712 res->hr_primary_remotecnt = 1713 res->hr_secondary_remotecnt; 1714 pjdlog_debug(1, 1715 "Setting localcnt to %ju and remotecnt to %ju.", 1716 (uintmax_t)res->hr_primary_localcnt, 1717 (uintmax_t)res->hr_secondary_localcnt); 1718 (void)metadata_write(res); 1719 mtx_unlock(&metadata_lock); 1720 } 1721 rw_unlock(&hio_remote_lock[ncomp]); 1722 continue; 1723 } 1724 pjdlog_debug(2, "sync: Taking free request."); 1725 QUEUE_TAKE2(hio, free); 1726 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1727 /* 1728 * Lock the range we are going to synchronize. We don't want 1729 * race where someone writes between our read and write. 
1730 */ 1731 for (;;) { 1732 mtx_lock(&range_lock); 1733 if (rangelock_islocked(range_regular, offset, length)) { 1734 pjdlog_debug(2, 1735 "sync: Range offset=%jd length=%jd locked.", 1736 (intmax_t)offset, (intmax_t)length); 1737 range_sync_wait = true; 1738 cv_wait(&range_sync_cond, &range_lock); 1739 range_sync_wait = false; 1740 mtx_unlock(&range_lock); 1741 continue; 1742 } 1743 if (rangelock_add(range_sync, offset, length) < 0) { 1744 mtx_unlock(&range_lock); 1745 pjdlog_debug(2, 1746 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1747 (intmax_t)offset, (intmax_t)length); 1748 sleep(1); 1749 continue; 1750 } 1751 mtx_unlock(&range_lock); 1752 break; 1753 } 1754 /* 1755 * First read the data from synchronization source. 1756 */ 1757 SYNCREQ(hio); 1758 ggio = &hio->hio_ggio; 1759 ggio->gctl_cmd = BIO_READ; 1760 ggio->gctl_offset = offset; 1761 ggio->gctl_length = length; 1762 ggio->gctl_error = 0; 1763 for (ii = 0; ii < ncomps; ii++) 1764 hio->hio_errors[ii] = EINVAL; 1765 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1766 hio); 1767 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1768 hio); 1769 mtx_lock(&metadata_lock); 1770 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1771 /* 1772 * This range is up-to-date on local component, 1773 * so handle request locally. 1774 */ 1775 /* Local component is 0 for now. */ 1776 ncomp = 0; 1777 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1778 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1779 /* 1780 * This range is out-of-date on local component, 1781 * so send request to the remote node. 1782 */ 1783 /* Remote component is 1 for now. */ 1784 ncomp = 1; 1785 } 1786 mtx_unlock(&metadata_lock); 1787 refcount_init(&hio->hio_countdown, 1); 1788 QUEUE_INSERT1(hio, send, ncomp); 1789 1790 /* 1791 * Let's wait for READ to finish. 
1792 */ 1793 mtx_lock(&sync_lock); 1794 while (!ISSYNCREQDONE(hio)) 1795 cv_wait(&sync_cond, &sync_lock); 1796 mtx_unlock(&sync_lock); 1797 1798 if (hio->hio_errors[ncomp] != 0) { 1799 pjdlog_error("Unable to read synchronization data: %s.", 1800 strerror(hio->hio_errors[ncomp])); 1801 goto free_queue; 1802 } 1803 1804 /* 1805 * We read the data from synchronization source, now write it 1806 * to synchronization target. 1807 */ 1808 SYNCREQ(hio); 1809 ggio->gctl_cmd = BIO_WRITE; 1810 for (ii = 0; ii < ncomps; ii++) 1811 hio->hio_errors[ii] = EINVAL; 1812 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1813 hio); 1814 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1815 hio); 1816 mtx_lock(&metadata_lock); 1817 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1818 /* 1819 * This range is up-to-date on local component, 1820 * so we update remote component. 1821 */ 1822 /* Remote component is 1 for now. */ 1823 ncomp = 1; 1824 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1825 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1826 /* 1827 * This range is out-of-date on local component, 1828 * so we update it. 1829 */ 1830 /* Local component is 0 for now. */ 1831 ncomp = 0; 1832 } 1833 mtx_unlock(&metadata_lock); 1834 1835 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1836 hio); 1837 refcount_init(&hio->hio_countdown, 1); 1838 QUEUE_INSERT1(hio, send, ncomp); 1839 1840 /* 1841 * Let's wait for WRITE to finish. 
1842 */ 1843 mtx_lock(&sync_lock); 1844 while (!ISSYNCREQDONE(hio)) 1845 cv_wait(&sync_cond, &sync_lock); 1846 mtx_unlock(&sync_lock); 1847 1848 if (hio->hio_errors[ncomp] != 0) { 1849 pjdlog_error("Unable to write synchronization data: %s.", 1850 strerror(hio->hio_errors[ncomp])); 1851 goto free_queue; 1852 } 1853 1854 synced += length; 1855free_queue: 1856 mtx_lock(&range_lock); 1857 rangelock_del(range_sync, offset, length); 1858 if (range_regular_wait) 1859 cv_signal(&range_regular_cond); 1860 mtx_unlock(&range_lock); 1861 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1862 hio); 1863 QUEUE_INSERT2(hio, free); 1864 } 1865 /* NOTREACHED */ 1866 return (NULL); 1867} 1868 1869void 1870primary_config_reload(struct hast_resource *res, struct nv *nv) 1871{ 1872 unsigned int ii, ncomps; 1873 int modified, vint; 1874 const char *vstr; 1875 1876 pjdlog_info("Reloading configuration..."); 1877 1878 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 1879 PJDLOG_ASSERT(gres == res); 1880 nv_assert(nv, "remoteaddr"); 1881 nv_assert(nv, "replication"); 1882 nv_assert(nv, "timeout"); 1883 nv_assert(nv, "exec"); 1884 1885 ncomps = HAST_NCOMPONENTS; 1886 1887#define MODIFIED_REMOTEADDR 0x1 1888#define MODIFIED_REPLICATION 0x2 1889#define MODIFIED_TIMEOUT 0x4 1890#define MODIFIED_EXEC 0x8 1891 modified = 0; 1892 1893 vstr = nv_get_string(nv, "remoteaddr"); 1894 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 1895 /* 1896 * Don't copy res->hr_remoteaddr to gres just yet. 1897 * We want remote_close() to log disconnect from the old 1898 * addresses, not from the new ones. 
1899 */ 1900 modified |= MODIFIED_REMOTEADDR; 1901 } 1902 vint = nv_get_int32(nv, "replication"); 1903 if (gres->hr_replication != vint) { 1904 gres->hr_replication = vint; 1905 modified |= MODIFIED_REPLICATION; 1906 } 1907 vint = nv_get_int32(nv, "timeout"); 1908 if (gres->hr_timeout != vint) { 1909 gres->hr_timeout = vint; 1910 modified |= MODIFIED_TIMEOUT; 1911 } 1912 vstr = nv_get_string(nv, "exec"); 1913 if (strcmp(gres->hr_exec, vstr) != 0) { 1914 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 1915 modified |= MODIFIED_EXEC; 1916 } 1917 1918 /* 1919 * If only timeout was modified we only need to change it without 1920 * reconnecting. 1921 */ 1922 if (modified == MODIFIED_TIMEOUT) { 1923 for (ii = 0; ii < ncomps; ii++) { 1924 if (!ISREMOTE(ii)) 1925 continue; 1926 rw_rlock(&hio_remote_lock[ii]); 1927 if (!ISCONNECTED(gres, ii)) { 1928 rw_unlock(&hio_remote_lock[ii]); 1929 continue; 1930 } 1931 rw_unlock(&hio_remote_lock[ii]); 1932 if (proto_timeout(gres->hr_remotein, 1933 gres->hr_timeout) < 0) { 1934 pjdlog_errno(LOG_WARNING, 1935 "Unable to set connection timeout"); 1936 } 1937 if (proto_timeout(gres->hr_remoteout, 1938 gres->hr_timeout) < 0) { 1939 pjdlog_errno(LOG_WARNING, 1940 "Unable to set connection timeout"); 1941 } 1942 } 1943 } else if ((modified & 1944 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1945 for (ii = 0; ii < ncomps; ii++) { 1946 if (!ISREMOTE(ii)) 1947 continue; 1948 remote_close(gres, ii); 1949 } 1950 if (modified & MODIFIED_REMOTEADDR) { 1951 vstr = nv_get_string(nv, "remoteaddr"); 1952 strlcpy(gres->hr_remoteaddr, vstr, 1953 sizeof(gres->hr_remoteaddr)); 1954 } 1955 } 1956#undef MODIFIED_REMOTEADDR 1957#undef MODIFIED_REPLICATION 1958#undef MODIFIED_TIMEOUT 1959#undef MODIFIED_EXEC 1960 1961 pjdlog_info("Configuration reloaded successfully."); 1962} 1963 1964static void 1965guard_one(struct hast_resource *res, unsigned int ncomp) 1966{ 1967 struct proto_conn *in, *out; 1968 1969 if (!ISREMOTE(ncomp)) 1970 return; 1971 
1972 rw_rlock(&hio_remote_lock[ncomp]); 1973 1974 if (!real_remote(res)) { 1975 rw_unlock(&hio_remote_lock[ncomp]); 1976 return; 1977 } 1978 1979 if (ISCONNECTED(res, ncomp)) { 1980 PJDLOG_ASSERT(res->hr_remotein != NULL); 1981 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1982 rw_unlock(&hio_remote_lock[ncomp]); 1983 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 1984 res->hr_remoteaddr); 1985 return; 1986 } 1987 1988 PJDLOG_ASSERT(res->hr_remotein == NULL); 1989 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1990 /* 1991 * Upgrade the lock. It doesn't have to be atomic as no other thread 1992 * can change connection status from disconnected to connected. 1993 */ 1994 rw_unlock(&hio_remote_lock[ncomp]); 1995 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 1996 res->hr_remoteaddr); 1997 in = out = NULL; 1998 if (init_remote(res, &in, &out)) { 1999 rw_wlock(&hio_remote_lock[ncomp]); 2000 PJDLOG_ASSERT(res->hr_remotein == NULL); 2001 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2002 PJDLOG_ASSERT(in != NULL && out != NULL); 2003 res->hr_remotein = in; 2004 res->hr_remoteout = out; 2005 rw_unlock(&hio_remote_lock[ncomp]); 2006 pjdlog_info("Successfully reconnected to %s.", 2007 res->hr_remoteaddr); 2008 sync_start(); 2009 } else { 2010 /* Both connections should be NULL. */ 2011 PJDLOG_ASSERT(res->hr_remotein == NULL); 2012 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2013 PJDLOG_ASSERT(in == NULL && out == NULL); 2014 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2015 res->hr_remoteaddr); 2016 } 2017} 2018 2019/* 2020 * Thread guards remote connections and reconnects when needed, handles 2021 * signals, etc. 
2022 */ 2023static void * 2024guard_thread(void *arg) 2025{ 2026 struct hast_resource *res = arg; 2027 unsigned int ii, ncomps; 2028 struct timespec timeout; 2029 time_t lastcheck, now; 2030 sigset_t mask; 2031 int signo; 2032 2033 ncomps = HAST_NCOMPONENTS; 2034 lastcheck = time(NULL); 2035 2036 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2037 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2038 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2039 2040 timeout.tv_sec = RETRY_SLEEP; 2041 timeout.tv_nsec = 0; 2042 signo = -1; 2043 2044 for (;;) { 2045 switch (signo) { 2046 case SIGINT: 2047 case SIGTERM: 2048 sigexit_received = true; 2049 primary_exitx(EX_OK, 2050 "Termination signal received, exiting."); 2051 break; 2052 default: 2053 break; 2054 } 2055 2056 pjdlog_debug(2, "remote_guard: Checking connections."); 2057 now = time(NULL); 2058 if (lastcheck + RETRY_SLEEP <= now) { 2059 for (ii = 0; ii < ncomps; ii++) 2060 guard_one(res, ii); 2061 lastcheck = now; 2062 } 2063 signo = sigtimedwait(&mask, NULL, &timeout); 2064 } 2065 /* NOTREACHED */ 2066 return (NULL); 2067} 2068