/* primary.c revision 219482 */
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 219482 2011-03-11 12:12:35Z trociny $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single ggate I/O request as it travels through the worker threads.
 * One hio is shared by all components handling the same request.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* Per-component list linkage; array of HAST_NCOMPONENTS entries. */
	TAILQ_ENTRY(hio)	*hio_next;
};
/* The free and done lists are global (not per-component); reuse slot 0. */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component.
One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows synchronizing access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep between reconnect retries or keepalive packets.
 */
#define	RETRY_SLEEP		10

#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component queue 'name' for component 'ncomp'
 * and wake a waiter only if the queue was empty (wakeup is otherwise
 * redundant - a consumer is already running).
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {			\
	bool _wakeup;						\
								\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);		\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);	\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),	\
	    hio_next[(ncomp)]);					\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);		\
	if (_wakeup)						\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);	\
} while (0)
/* Same as QUEUE_INSERT1, but for the global (non-per-component) queues. */
#define	QUEUE_INSERT2(hio, name)	do {			\
	bool _wakeup;						\
								\
	mtx_lock(&hio_##name##_list_lock);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);			\
	if (_wakeup)						\
		cv_signal(&hio_##name##_list_cond);		\
} while (0)
/*
 * Take a request off a per-component queue.  With a non-zero timeout the
 * wait is bounded to a single cv_timedwait() pass and (hio) may be NULL on
 * return; with timeout 0 this blocks until a request is available.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {		\
	bool _last;						\
								\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);		\
	_last = false;						\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],	\
		    &hio_##name##_list_lock[(ncomp)], (timeout)); \
		if ((timeout) != 0)				\
			_last = true;				\
	}							\
	if (hio != NULL) {					\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \
		    hio_next[(ncomp)]);				\
	}							\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);		\
} while (0)
/* Blocking take from a global queue; always returns a request. */
#define	QUEUE_TAKE2(hio, name)	do {				\
	mtx_lock(&hio_##name##_list_lock);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \
		cv_wait(&hio_##name##_list_cond,		\
		    &hio_##name##_list_lock);			\
	}							\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \
	mtx_unlock(&hio_##name##_list_lock);			\
} while (0)

/*
 * Sync-thread requests are marked with magic gctl_unit values (-1 pending,
 * -2 done) so they can be told apart from kernel-originated requests.
 */
#define	SYNCREQ(hio)		do {				\
	(hio)->hio_ggio.gctl_unit = -1;				\
	(hio)->hio_ggio.gctl_seq = 1;				\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit ==
-1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* Resource served by this worker process; set in hastd_primary() child. */
static struct hast_resource *gres;

/*
 * Range locks keep regular (kernel) writes and sync-thread I/O from
 * touching the same extent concurrently.
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

/*
 * Best-effort teardown before exiting: destroy the ggate provider if this
 * process created one.  Preserves errno for the caller.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-style message, clean up and exit; never returns. */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Like primary_exit(), but without appending strerror(errno). */
static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ?
LOG_INFO : LOG_ERR, fmt, ap); 283 va_end(ap); 284 cleanup(gres); 285 exit(exitcode); 286} 287 288static int 289hast_activemap_flush(struct hast_resource *res) 290{ 291 const unsigned char *buf; 292 size_t size; 293 294 buf = activemap_bitmap(res->hr_amp, &size); 295 PJDLOG_ASSERT(buf != NULL); 296 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 297 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 298 (ssize_t)size) { 299 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 300 "Unable to flush activemap to disk")); 301 return (-1); 302 } 303 return (0); 304} 305 306static bool 307real_remote(const struct hast_resource *res) 308{ 309 310 return (strcmp(res->hr_remoteaddr, "none") != 0); 311} 312 313static void 314init_environment(struct hast_resource *res __unused) 315{ 316 struct hio *hio; 317 unsigned int ii, ncomps; 318 319 /* 320 * In the future it might be per-resource value. 321 */ 322 ncomps = HAST_NCOMPONENTS; 323 324 /* 325 * Allocate memory needed by lists. 326 */ 327 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 328 if (hio_send_list == NULL) { 329 primary_exitx(EX_TEMPFAIL, 330 "Unable to allocate %zu bytes of memory for send lists.", 331 sizeof(hio_send_list[0]) * ncomps); 332 } 333 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 334 if (hio_send_list_lock == NULL) { 335 primary_exitx(EX_TEMPFAIL, 336 "Unable to allocate %zu bytes of memory for send list locks.", 337 sizeof(hio_send_list_lock[0]) * ncomps); 338 } 339 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 340 if (hio_send_list_cond == NULL) { 341 primary_exitx(EX_TEMPFAIL, 342 "Unable to allocate %zu bytes of memory for send list condition variables.", 343 sizeof(hio_send_list_cond[0]) * ncomps); 344 } 345 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 346 if (hio_recv_list == NULL) { 347 primary_exitx(EX_TEMPFAIL, 348 "Unable to allocate %zu bytes of memory for recv lists.", 349 sizeof(hio_recv_list[0]) * ncomps); 350 } 351 
hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 352 if (hio_recv_list_lock == NULL) { 353 primary_exitx(EX_TEMPFAIL, 354 "Unable to allocate %zu bytes of memory for recv list locks.", 355 sizeof(hio_recv_list_lock[0]) * ncomps); 356 } 357 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 358 if (hio_recv_list_cond == NULL) { 359 primary_exitx(EX_TEMPFAIL, 360 "Unable to allocate %zu bytes of memory for recv list condition variables.", 361 sizeof(hio_recv_list_cond[0]) * ncomps); 362 } 363 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 364 if (hio_remote_lock == NULL) { 365 primary_exitx(EX_TEMPFAIL, 366 "Unable to allocate %zu bytes of memory for remote connections locks.", 367 sizeof(hio_remote_lock[0]) * ncomps); 368 } 369 370 /* 371 * Initialize lists, their locks and theirs condition variables. 372 */ 373 TAILQ_INIT(&hio_free_list); 374 mtx_init(&hio_free_list_lock); 375 cv_init(&hio_free_list_cond); 376 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 377 TAILQ_INIT(&hio_send_list[ii]); 378 mtx_init(&hio_send_list_lock[ii]); 379 cv_init(&hio_send_list_cond[ii]); 380 TAILQ_INIT(&hio_recv_list[ii]); 381 mtx_init(&hio_recv_list_lock[ii]); 382 cv_init(&hio_recv_list_cond[ii]); 383 rw_init(&hio_remote_lock[ii]); 384 } 385 TAILQ_INIT(&hio_done_list); 386 mtx_init(&hio_done_list_lock); 387 cv_init(&hio_done_list_cond); 388 mtx_init(&metadata_lock); 389 390 /* 391 * Allocate requests pool and initialize requests. 
392 */ 393 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 394 hio = malloc(sizeof(*hio)); 395 if (hio == NULL) { 396 primary_exitx(EX_TEMPFAIL, 397 "Unable to allocate %zu bytes of memory for hio request.", 398 sizeof(*hio)); 399 } 400 hio->hio_countdown = 0; 401 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 402 if (hio->hio_errors == NULL) { 403 primary_exitx(EX_TEMPFAIL, 404 "Unable allocate %zu bytes of memory for hio errors.", 405 sizeof(hio->hio_errors[0]) * ncomps); 406 } 407 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 408 if (hio->hio_next == NULL) { 409 primary_exitx(EX_TEMPFAIL, 410 "Unable allocate %zu bytes of memory for hio_next field.", 411 sizeof(hio->hio_next[0]) * ncomps); 412 } 413 hio->hio_ggio.gctl_version = G_GATE_VERSION; 414 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 415 if (hio->hio_ggio.gctl_data == NULL) { 416 primary_exitx(EX_TEMPFAIL, 417 "Unable to allocate %zu bytes of memory for gctl_data.", 418 MAXPHYS); 419 } 420 hio->hio_ggio.gctl_length = MAXPHYS; 421 hio->hio_ggio.gctl_error = 0; 422 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 423 } 424} 425 426static bool 427init_resuid(struct hast_resource *res) 428{ 429 430 mtx_lock(&metadata_lock); 431 if (res->hr_resuid != 0) { 432 mtx_unlock(&metadata_lock); 433 return (false); 434 } else { 435 /* Initialize unique resource identifier. 
 */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) < 0)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read metadata, set up the activemap and range locks, and load the on-disk
 * activemap into memory.  Fatal on any error.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Ask the (privileged) parent process to establish one connection to the
 * remote node and hand it over.  Returns 0 and stores the connection in
 * *connp on success, -1 on a non-fatal connection failure; exits on any
 * parent-communication failure.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	if (val != 0) {
		/* Parent reports the connect error as an errno value. */
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, HAST_TIMEOUT) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}

/*
 * Perform the two-step handshake with the secondary: one outgoing and one
 * incoming connection, exchanging tokens, counters and the remote activemap.
 * With inp/outp NULL the connections are stored in the resource itself.
 * Returns true when fully connected.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	/* Both or neither of inp/outp must be provided. */
	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	if (primary_connect(res, &out) == -1)
		return (false);

	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	/* The secondary sends back a token identifying this session. */
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (primary_connect(res, &in) == -1)
		goto close;

	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* NOTE: the reply arrives on the outgoing connection. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	/* Both nodes must agree on provider geometry. */
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents on its own, let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (true);
close:
	/* Magic errmsg string is how the secondary reports split-brain. */
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/* Mark synchronization as running and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop; it checks the flag on its next pass. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create (or take over) the hast/<name> GEOM Gate provider for this
 * resource.  Fatal on failure.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where he left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: set up parent/child channels, fork the
 * worker, and (in the child) initialize everything and start the worker
 * threads.  In the parent this returns after recording the worker pid; in
 * the child it never returns (ends up in sync_thread()).
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client("socketpair://", &res->hr_event) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client("socketpair://", &res->hr_conn) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (primary)", res->hr_name);

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs() != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res) && init_remote(res, NULL, NULL))
		sync_start();
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	/* The main thread itself becomes the sync thread. */
	(void)sync_thread(res);
}

/*
 * Log a ggate request: caller-supplied prefix followed by a textual
 * description of the BIO command, offset and length.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	/* Append request details only if the prefix wasn't truncated. */
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both connections to the remote node (if still open), stop any
 * in-progress synchronization and notify interested parties.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 *
 another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *     only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *     only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Every component starts as "failed" until it reports back. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* READ goes to exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			if (res->hr_resuid == 0) {
				/* This is first write, initialize resuid. */
				(void)init_resuid(res);
			}
			/*
			 * Wait until the range is not being synchronized,
			 * then take the regular-I/O range lock for it.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before touching the data. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* WRITE/DELETE/FLUSH go to all components. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now.
 */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 * Note: the request is redirected without
				 * touching hio_countdown.
				 */
				if (ret < 0) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				/* Short write is reported as I/O error. */
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}
		/*
		 * Last component to finish either wakes up the sync thread
		 * (for synchronization requests) or completes the request by
		 * moving it to the done queue.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Send a keepalive message to the secondary node; on send failure the
 * remote connection is closed.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now.
 */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP);
		if (hio == NULL) {
			/* Queue was idle; keep the connection alive. */
			now = time(NULL);
			if (lastcheck + RETRY_SLEEP <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the BIO command into the HAST wire protocol. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Remote write failed; make sure the extent is
			 * recorded as needing synchronization.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/* Match the reply to the request by sequence number. */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* READ replies carry data; receive it. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ASSERT(!"invalid condition");
			abort();
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/*
		 * Last component to finish either wakes up the sync thread
		 * (for synchronization requests) or completes the request.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use first error.
			 */
			ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Write completed; the extent may become clean. */
			mtx_lock(&res->hr_amp_lock);
			activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length);
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Bump local count if this is first write after
			 * connection failure with remote node.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				mtx_lock(&metadata_lock);
				if (res->hr_primary_localcnt ==
				    res->hr_secondary_remotecnt) {
					res->hr_primary_localcnt++;
					pjdlog_debug(1,
					    "Increasing localcnt to %ju.",
					    (uintmax_t)res->hr_primary_localcnt);
					(void)metadata_write(res);
				}
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
		}
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronizes local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	/*
	 * NOTE(review): arg is marked __unused but is in fact used below;
	 * the annotation looks stale - confirm against callers.
	 */
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	/* offset < 0 means "no synchronization pass in progress". */
	offset = -1;

	for (;;) {
		mtx_lock(&sync_lock);
		if (offset >= 0 && !sync_inprogress) {
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We synchronized entire syncext extent, we can mark
			 * it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
		}
		mtx_unlock(&res->hr_amp_lock);
		if (dorewind) {
			dorewind = false;
			if (offset < 0)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %NB to go.",
				    (intmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
				event_send(res, EVENT_SYNCSTART);
				gettimeofday(&tstart, NULL);
			}
		}
		if (offset < 0) {
			sync_stop();
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization complete, make both localcnt and
			 * remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					int64_t bps;

					gettimeofday(&tend, NULL);
					timersub(&tend, &tstart, &tdiff);
					bps = (int64_t)((double)synced /
					    ((double)tdiff.tv_sec +
					    (double)tdiff.tv_usec / 1000000));
					pjdlog_info("Synchronization complete. "
					    "%NB synchronized in %#.0lT (%NB/sec).",
					    (intmax_t)synced, &tdiff,
					    (intmax_t)bps);
					event_send(res, EVENT_SYNCDONE);
				}
				mtx_lock(&metadata_lock);
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_localcnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_remotecnt;
				/*
				 * NOTE(review): the message below prints
				 * hr_secondary_localcnt as "remotecnt";
				 * hr_primary_remotecnt was just set from
				 * hr_secondary_remotecnt - confirm which
				 * value was intended here.
				 */
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_secondary_localcnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) < 0) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
		/*
		 * First read the data from synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so handle request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so send request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		/*
		 * We read the data from synchronization source, now write it
		 * to synchronization target.
		 */
		SYNCREQ(hio);
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so we update remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so we update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
		    hio);
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for WRITE to finish.
 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		/* Drop the sync range lock and recycle the request. */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");

	ncomps = HAST_NCOMPONENTS;

#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_REPLICATION	0x02
#define MODIFIED_CHECKSUM	0x04
#define MODIFIED_COMPRESSION	0x08
#define MODIFIED_TIMEOUT	0x10
#define MODIFIED_EXEC		0x20
	modified = 0;

	/* Collect which settings differ from the running configuration. */
	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	/* Address or replication mode changed: force a reconnect. */
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the state of one remote component and attempt to reconnect it if
 * the connection is down.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out)) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_sec = RETRY_SLEEP;
	timeout.tv_nsec = 0;
	signo = -1;

	for (;;) {
		/* Handle the signal (if any) caught in the previous pass. */
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		pjdlog_debug(2, "remote_guard: Checking connections.");
		now = time(NULL);
		if (lastcheck + RETRY_SLEEP <= now) {
			for (ii = 0; ii < ncomps; ii++)
				guard_one(res, ii);
			lastcheck = now;
		}
		/* Sleep until a signal arrives or the timeout expires. */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}