primary.c revision 226851
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 226851 2011-10-27 18:45:01Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request as it travels between the worker threads.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * List linkage, one entry per component. The array is allocated in
	 * init_environment(); entry 0 doubles as the linkage for the free
	 * and done lists (see hio_free_next/hio_done_next below).
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with the sync thread.
 * sync_inprogress is set by sync_start() and cleared by sync_stop(),
 * both under sync_lock; sync_start() also signals sync_cond.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 * It is an array with one rwlock per component.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * True when both the incoming and the outgoing remote connections are set.
 * The component number is accepted for symmetry with the other per-component
 * macros but is not used: there is only one remote component.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append a request to the per-component list hio_<name>_list[ncomp].
 * The condition variable is signalled only when the list was empty before
 * the insertion, and only after the list lock has been dropped.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/*
 * Append a request to the single (non per-component) list hio_<name>_list.
 * Same wake-up rule as QUEUE_INSERT1.
 */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Take the first request from the per-component list hio_<name>_list[ncomp].
 * With a non-zero timeout the list is waited on at most once, so (hio) may
 * be left NULL; with a zero timeout the loop repeats until a request shows
 * up.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if ((hio) != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/*
 * Take the first request from the single list hio_<name>_list, waiting
 * on its condition variable for as long as the list is empty.
 */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are marked by storing -1 in the ggate unit
 * field of the request.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
((hio)->hio_ggio.gctl_unit == -1) 210#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212 213static struct hast_resource *gres; 214 215static pthread_mutex_t range_lock; 216static struct rangelocks *range_regular; 217static bool range_regular_wait; 218static pthread_cond_t range_regular_cond; 219static struct rangelocks *range_sync; 220static bool range_sync_wait; 221static pthread_cond_t range_sync_cond; 222static bool fullystarted; 223 224static void *ggate_recv_thread(void *arg); 225static void *local_send_thread(void *arg); 226static void *remote_send_thread(void *arg); 227static void *remote_recv_thread(void *arg); 228static void *ggate_send_thread(void *arg); 229static void *sync_thread(void *arg); 230static void *guard_thread(void *arg); 231 232static void 233cleanup(struct hast_resource *res) 234{ 235 int rerrno; 236 237 /* Remember errno. */ 238 rerrno = errno; 239 240 /* Destroy ggate provider if we created one. */ 241 if (res->hr_ggateunit >= 0) { 242 struct g_gate_ctl_destroy ggiod; 243 244 bzero(&ggiod, sizeof(ggiod)); 245 ggiod.gctl_version = G_GATE_VERSION; 246 ggiod.gctl_unit = res->hr_ggateunit; 247 ggiod.gctl_force = 1; 248 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 249 pjdlog_errno(LOG_WARNING, 250 "Unable to destroy hast/%s device", 251 res->hr_provname); 252 } 253 res->hr_ggateunit = -1; 254 } 255 256 /* Restore errno. */ 257 errno = rerrno; 258} 259 260static __dead2 void 261primary_exit(int exitcode, const char *fmt, ...) 262{ 263 va_list ap; 264 265 PJDLOG_ASSERT(exitcode != EX_OK); 266 va_start(ap, fmt); 267 pjdlogv_errno(LOG_ERR, fmt, ap); 268 va_end(ap); 269 cleanup(gres); 270 exit(exitcode); 271} 272 273static __dead2 void 274primary_exitx(int exitcode, const char *fmt, ...) 275{ 276 va_list ap; 277 278 va_start(ap, fmt); 279 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 280 va_end(ap); 281 cleanup(gres); 282 exit(exitcode); 283} 284 285static int 286hast_activemap_flush(struct hast_resource *res) 287{ 288 const unsigned char *buf; 289 size_t size; 290 291 buf = activemap_bitmap(res->hr_amp, &size); 292 PJDLOG_ASSERT(buf != NULL); 293 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 294 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 295 (ssize_t)size) { 296 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 297 return (-1); 298 } 299 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 300 if (errno == EOPNOTSUPP) { 301 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 302 res->hr_localpath); 303 res->hr_metaflush = 0; 304 } else { 305 pjdlog_errno(LOG_ERR, 306 "Unable to flush disk cache on activemap update"); 307 return (-1); 308 } 309 } 310 return (0); 311} 312 313static bool 314real_remote(const struct hast_resource *res) 315{ 316 317 return (strcmp(res->hr_remoteaddr, "none") != 0); 318} 319 320static void 321init_environment(struct hast_resource *res __unused) 322{ 323 struct hio *hio; 324 unsigned int ii, ncomps; 325 326 /* 327 * In the future it might be per-resource value. 328 */ 329 ncomps = HAST_NCOMPONENTS; 330 331 /* 332 * Allocate memory needed by lists. 
333 */ 334 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 335 if (hio_send_list == NULL) { 336 primary_exitx(EX_TEMPFAIL, 337 "Unable to allocate %zu bytes of memory for send lists.", 338 sizeof(hio_send_list[0]) * ncomps); 339 } 340 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 341 if (hio_send_list_lock == NULL) { 342 primary_exitx(EX_TEMPFAIL, 343 "Unable to allocate %zu bytes of memory for send list locks.", 344 sizeof(hio_send_list_lock[0]) * ncomps); 345 } 346 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 347 if (hio_send_list_cond == NULL) { 348 primary_exitx(EX_TEMPFAIL, 349 "Unable to allocate %zu bytes of memory for send list condition variables.", 350 sizeof(hio_send_list_cond[0]) * ncomps); 351 } 352 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 353 if (hio_recv_list == NULL) { 354 primary_exitx(EX_TEMPFAIL, 355 "Unable to allocate %zu bytes of memory for recv lists.", 356 sizeof(hio_recv_list[0]) * ncomps); 357 } 358 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 359 if (hio_recv_list_lock == NULL) { 360 primary_exitx(EX_TEMPFAIL, 361 "Unable to allocate %zu bytes of memory for recv list locks.", 362 sizeof(hio_recv_list_lock[0]) * ncomps); 363 } 364 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 365 if (hio_recv_list_cond == NULL) { 366 primary_exitx(EX_TEMPFAIL, 367 "Unable to allocate %zu bytes of memory for recv list condition variables.", 368 sizeof(hio_recv_list_cond[0]) * ncomps); 369 } 370 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 371 if (hio_remote_lock == NULL) { 372 primary_exitx(EX_TEMPFAIL, 373 "Unable to allocate %zu bytes of memory for remote connections locks.", 374 sizeof(hio_remote_lock[0]) * ncomps); 375 } 376 377 /* 378 * Initialize lists, their locks and theirs condition variables. 
379 */ 380 TAILQ_INIT(&hio_free_list); 381 mtx_init(&hio_free_list_lock); 382 cv_init(&hio_free_list_cond); 383 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 384 TAILQ_INIT(&hio_send_list[ii]); 385 mtx_init(&hio_send_list_lock[ii]); 386 cv_init(&hio_send_list_cond[ii]); 387 TAILQ_INIT(&hio_recv_list[ii]); 388 mtx_init(&hio_recv_list_lock[ii]); 389 cv_init(&hio_recv_list_cond[ii]); 390 rw_init(&hio_remote_lock[ii]); 391 } 392 TAILQ_INIT(&hio_done_list); 393 mtx_init(&hio_done_list_lock); 394 cv_init(&hio_done_list_cond); 395 mtx_init(&metadata_lock); 396 397 /* 398 * Allocate requests pool and initialize requests. 399 */ 400 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 401 hio = malloc(sizeof(*hio)); 402 if (hio == NULL) { 403 primary_exitx(EX_TEMPFAIL, 404 "Unable to allocate %zu bytes of memory for hio request.", 405 sizeof(*hio)); 406 } 407 hio->hio_countdown = 0; 408 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 409 if (hio->hio_errors == NULL) { 410 primary_exitx(EX_TEMPFAIL, 411 "Unable allocate %zu bytes of memory for hio errors.", 412 sizeof(hio->hio_errors[0]) * ncomps); 413 } 414 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 415 if (hio->hio_next == NULL) { 416 primary_exitx(EX_TEMPFAIL, 417 "Unable allocate %zu bytes of memory for hio_next field.", 418 sizeof(hio->hio_next[0]) * ncomps); 419 } 420 hio->hio_ggio.gctl_version = G_GATE_VERSION; 421 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 422 if (hio->hio_ggio.gctl_data == NULL) { 423 primary_exitx(EX_TEMPFAIL, 424 "Unable to allocate %zu bytes of memory for gctl_data.", 425 MAXPHYS); 426 } 427 hio->hio_ggio.gctl_length = MAXPHYS; 428 hio->hio_ggio.gctl_error = 0; 429 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 430 } 431} 432 433static bool 434init_resuid(struct hast_resource *res) 435{ 436 437 mtx_lock(&metadata_lock); 438 if (res->hr_resuid != 0) { 439 mtx_unlock(&metadata_lock); 440 return (false); 441 } else { 442 /* Initialize unique resource identifier. 
*/ 443 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 444 mtx_unlock(&metadata_lock); 445 if (metadata_write(res) < 0) 446 exit(EX_NOINPUT); 447 return (true); 448 } 449} 450 451static void 452init_local(struct hast_resource *res) 453{ 454 unsigned char *buf; 455 size_t mapsize; 456 457 if (metadata_read(res, true) < 0) 458 exit(EX_NOINPUT); 459 mtx_init(&res->hr_amp_lock); 460 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 461 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 462 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 463 } 464 mtx_init(&range_lock); 465 cv_init(&range_regular_cond); 466 if (rangelock_init(&range_regular) < 0) 467 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 468 cv_init(&range_sync_cond); 469 if (rangelock_init(&range_sync) < 0) 470 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 471 mapsize = activemap_ondisk_size(res->hr_amp); 472 buf = calloc(1, mapsize); 473 if (buf == NULL) { 474 primary_exitx(EX_TEMPFAIL, 475 "Unable to allocate buffer for activemap."); 476 } 477 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 478 (ssize_t)mapsize) { 479 primary_exit(EX_NOINPUT, "Unable to read activemap"); 480 } 481 activemap_copyin(res->hr_amp, buf, mapsize); 482 free(buf); 483 if (res->hr_resuid != 0) 484 return; 485 /* 486 * We're using provider for the first time. Initialize local and remote 487 * counters. We don't initialize resuid here, as we want to do it just 488 * in time. The reason for this is that we want to inform secondary 489 * that there were no writes yet, so there is no need to synchronize 490 * anything. 
491 */ 492 res->hr_primary_localcnt = 0; 493 res->hr_primary_remotecnt = 0; 494 if (metadata_write(res) < 0) 495 exit(EX_NOINPUT); 496} 497 498static int 499primary_connect(struct hast_resource *res, struct proto_conn **connp) 500{ 501 struct proto_conn *conn; 502 int16_t val; 503 504 val = 1; 505 if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 506 primary_exit(EX_TEMPFAIL, 507 "Unable to send connection request to parent"); 508 } 509 if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 510 primary_exit(EX_TEMPFAIL, 511 "Unable to receive reply to connection request from parent"); 512 } 513 if (val != 0) { 514 errno = val; 515 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 516 res->hr_remoteaddr); 517 return (-1); 518 } 519 if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 520 primary_exit(EX_TEMPFAIL, 521 "Unable to receive connection from parent"); 522 } 523 if (proto_connect_wait(conn, res->hr_timeout) < 0) { 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 proto_close(conn); 527 return (-1); 528 } 529 /* Error in setting timeout is not critical, but why should it fail? */ 530 if (proto_timeout(conn, res->hr_timeout) < 0) 531 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 532 533 *connp = conn; 534 535 return (0); 536} 537 538static int 539init_remote(struct hast_resource *res, struct proto_conn **inp, 540 struct proto_conn **outp) 541{ 542 struct proto_conn *in, *out; 543 struct nv *nvout, *nvin; 544 const unsigned char *token; 545 unsigned char *map; 546 const char *errmsg; 547 int32_t extentsize; 548 int64_t datasize; 549 uint32_t mapsize; 550 size_t size; 551 int error; 552 553 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 554 PJDLOG_ASSERT(real_remote(res)); 555 556 in = out = NULL; 557 errmsg = NULL; 558 559 if (primary_connect(res, &out) == -1) 560 return (ECONNREFUSED); 561 562 error = ECONNABORTED; 563 564 /* 565 * First handshake step. 
566 * Setup outgoing connection with remote node. 567 */ 568 nvout = nv_alloc(); 569 nv_add_string(nvout, res->hr_name, "resource"); 570 if (nv_error(nvout) != 0) { 571 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 572 "Unable to allocate header for connection with %s", 573 res->hr_remoteaddr); 574 nv_free(nvout); 575 goto close; 576 } 577 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 578 pjdlog_errno(LOG_WARNING, 579 "Unable to send handshake header to %s", 580 res->hr_remoteaddr); 581 nv_free(nvout); 582 goto close; 583 } 584 nv_free(nvout); 585 if (hast_proto_recv_hdr(out, &nvin) < 0) { 586 pjdlog_errno(LOG_WARNING, 587 "Unable to receive handshake header from %s", 588 res->hr_remoteaddr); 589 goto close; 590 } 591 errmsg = nv_get_string(nvin, "errmsg"); 592 if (errmsg != NULL) { 593 pjdlog_warning("%s", errmsg); 594 if (nv_exists(nvin, "wait")) 595 error = EBUSY; 596 nv_free(nvin); 597 goto close; 598 } 599 token = nv_get_uint8_array(nvin, &size, "token"); 600 if (token == NULL) { 601 pjdlog_warning("Handshake header from %s has no 'token' field.", 602 res->hr_remoteaddr); 603 nv_free(nvin); 604 goto close; 605 } 606 if (size != sizeof(res->hr_token)) { 607 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 608 res->hr_remoteaddr, size, sizeof(res->hr_token)); 609 nv_free(nvin); 610 goto close; 611 } 612 bcopy(token, res->hr_token, sizeof(res->hr_token)); 613 nv_free(nvin); 614 615 /* 616 * Second handshake step. 617 * Setup incoming connection with remote node. 618 */ 619 if (primary_connect(res, &in) == -1) 620 goto close; 621 622 nvout = nv_alloc(); 623 nv_add_string(nvout, res->hr_name, "resource"); 624 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 625 "token"); 626 if (res->hr_resuid == 0) { 627 /* 628 * The resuid field was not yet initialized. 
629 * Because we do synchronization inside init_resuid(), it is 630 * possible that someone already initialized it, the function 631 * will return false then, but if we successfully initialized 632 * it, we will get true. True means that there were no writes 633 * to this resource yet and we want to inform secondary that 634 * synchronization is not needed by sending "virgin" argument. 635 */ 636 if (init_resuid(res)) 637 nv_add_int8(nvout, 1, "virgin"); 638 } 639 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 640 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 641 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 642 if (nv_error(nvout) != 0) { 643 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 644 "Unable to allocate header for connection with %s", 645 res->hr_remoteaddr); 646 nv_free(nvout); 647 goto close; 648 } 649 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 650 pjdlog_errno(LOG_WARNING, 651 "Unable to send handshake header to %s", 652 res->hr_remoteaddr); 653 nv_free(nvout); 654 goto close; 655 } 656 nv_free(nvout); 657 if (hast_proto_recv_hdr(out, &nvin) < 0) { 658 pjdlog_errno(LOG_WARNING, 659 "Unable to receive handshake header from %s", 660 res->hr_remoteaddr); 661 goto close; 662 } 663 errmsg = nv_get_string(nvin, "errmsg"); 664 if (errmsg != NULL) { 665 pjdlog_warning("%s", errmsg); 666 nv_free(nvin); 667 goto close; 668 } 669 datasize = nv_get_int64(nvin, "datasize"); 670 if (datasize != res->hr_datasize) { 671 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 672 (intmax_t)res->hr_datasize, (intmax_t)datasize); 673 nv_free(nvin); 674 goto close; 675 } 676 extentsize = nv_get_int32(nvin, "extentsize"); 677 if (extentsize != res->hr_extentsize) { 678 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 679 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 680 nv_free(nvin); 681 goto close; 682 } 683 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 684 
res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 685 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 686 if (nv_exists(nvin, "virgin")) { 687 /* 688 * Secondary was reinitialized, bump localcnt if it is 0 as 689 * only we have the data. 690 */ 691 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 692 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 693 694 if (res->hr_primary_localcnt == 0) { 695 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 696 697 mtx_lock(&metadata_lock); 698 res->hr_primary_localcnt++; 699 pjdlog_debug(1, "Increasing localcnt to %ju.", 700 (uintmax_t)res->hr_primary_localcnt); 701 (void)metadata_write(res); 702 mtx_unlock(&metadata_lock); 703 } 704 } 705 map = NULL; 706 mapsize = nv_get_uint32(nvin, "mapsize"); 707 if (mapsize > 0) { 708 map = malloc(mapsize); 709 if (map == NULL) { 710 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 711 (uintmax_t)mapsize); 712 nv_free(nvin); 713 goto close; 714 } 715 /* 716 * Remote node have some dirty extents on its own, lets 717 * download its activemap. 718 */ 719 if (hast_proto_recv_data(res, out, nvin, map, 720 mapsize) < 0) { 721 pjdlog_errno(LOG_ERR, 722 "Unable to receive remote activemap"); 723 nv_free(nvin); 724 free(map); 725 goto close; 726 } 727 /* 728 * Merge local and remote bitmaps. 729 */ 730 activemap_merge(res->hr_amp, map, mapsize); 731 free(map); 732 /* 733 * Now that we merged bitmaps from both nodes, flush it to the 734 * disk before we start to synchronize. 735 */ 736 (void)hast_activemap_flush(res); 737 } 738 nv_free(nvin); 739#ifdef notyet 740 /* Setup directions. 
*/ 741 if (proto_send(out, NULL, 0) == -1) 742 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 743 if (proto_recv(in, NULL, 0) == -1) 744 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 745#endif 746 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 747 if (inp != NULL && outp != NULL) { 748 *inp = in; 749 *outp = out; 750 } else { 751 res->hr_remotein = in; 752 res->hr_remoteout = out; 753 } 754 event_send(res, EVENT_CONNECT); 755 return (0); 756close: 757 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 758 event_send(res, EVENT_SPLITBRAIN); 759 proto_close(out); 760 if (in != NULL) 761 proto_close(in); 762 return (error); 763} 764 765static void 766sync_start(void) 767{ 768 769 mtx_lock(&sync_lock); 770 sync_inprogress = true; 771 mtx_unlock(&sync_lock); 772 cv_signal(&sync_cond); 773} 774 775static void 776sync_stop(void) 777{ 778 779 mtx_lock(&sync_lock); 780 if (sync_inprogress) 781 sync_inprogress = false; 782 mtx_unlock(&sync_lock); 783} 784 785static void 786init_ggate(struct hast_resource *res) 787{ 788 struct g_gate_ctl_create ggiocreate; 789 struct g_gate_ctl_cancel ggiocancel; 790 791 /* 792 * We communicate with ggate via /dev/ggctl. Open it. 793 */ 794 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 795 if (res->hr_ggatefd < 0) 796 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 797 /* 798 * Create provider before trying to connect, as connection failure 799 * is not critical, but may take some time. 
800 */ 801 bzero(&ggiocreate, sizeof(ggiocreate)); 802 ggiocreate.gctl_version = G_GATE_VERSION; 803 ggiocreate.gctl_mediasize = res->hr_datasize; 804 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 805 ggiocreate.gctl_flags = 0; 806 ggiocreate.gctl_maxcount = 0; 807 ggiocreate.gctl_timeout = 0; 808 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 809 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 810 res->hr_provname); 811 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 812 pjdlog_info("Device hast/%s created.", res->hr_provname); 813 res->hr_ggateunit = ggiocreate.gctl_unit; 814 return; 815 } 816 if (errno != EEXIST) { 817 primary_exit(EX_OSERR, "Unable to create hast/%s device", 818 res->hr_provname); 819 } 820 pjdlog_debug(1, 821 "Device hast/%s already exists, we will try to take it over.", 822 res->hr_provname); 823 /* 824 * If we received EEXIST, we assume that the process who created the 825 * provider died and didn't clean up. In that case we will start from 826 * where he left of. 827 */ 828 bzero(&ggiocancel, sizeof(ggiocancel)); 829 ggiocancel.gctl_version = G_GATE_VERSION; 830 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 831 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 832 res->hr_provname); 833 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 834 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 835 res->hr_ggateunit = ggiocancel.gctl_unit; 836 return; 837 } 838 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 839 res->hr_provname); 840} 841 842void 843hastd_primary(struct hast_resource *res) 844{ 845 pthread_t td; 846 pid_t pid; 847 int error, mode, debuglevel; 848 849 /* 850 * Create communication channel for sending control commands from 851 * parent to child. 852 */ 853 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 854 /* TODO: There's no need for this to be fatal error. 
*/ 855 KEEP_ERRNO((void)pidfile_remove(pfh)); 856 pjdlog_exit(EX_OSERR, 857 "Unable to create control sockets between parent and child"); 858 } 859 /* 860 * Create communication channel for sending events from child to parent. 861 */ 862 if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 863 /* TODO: There's no need for this to be fatal error. */ 864 KEEP_ERRNO((void)pidfile_remove(pfh)); 865 pjdlog_exit(EX_OSERR, 866 "Unable to create event sockets between child and parent"); 867 } 868 /* 869 * Create communication channel for sending connection requests from 870 * child to parent. 871 */ 872 if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) { 873 /* TODO: There's no need for this to be fatal error. */ 874 KEEP_ERRNO((void)pidfile_remove(pfh)); 875 pjdlog_exit(EX_OSERR, 876 "Unable to create connection sockets between child and parent"); 877 } 878 879 pid = fork(); 880 if (pid < 0) { 881 /* TODO: There's no need for this to be fatal error. */ 882 KEEP_ERRNO((void)pidfile_remove(pfh)); 883 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 884 } 885 886 if (pid > 0) { 887 /* This is parent. */ 888 /* Declare that we are receiver. */ 889 proto_recv(res->hr_event, NULL, 0); 890 proto_recv(res->hr_conn, NULL, 0); 891 /* Declare that we are sender. */ 892 proto_send(res->hr_ctrl, NULL, 0); 893 res->hr_workerpid = pid; 894 return; 895 } 896 897 gres = res; 898 mode = pjdlog_mode_get(); 899 debuglevel = pjdlog_debug_get(); 900 901 /* Declare that we are sender. */ 902 proto_send(res->hr_event, NULL, 0); 903 proto_send(res->hr_conn, NULL, 0); 904 /* Declare that we are receiver. 
*/ 905 proto_recv(res->hr_ctrl, NULL, 0); 906 descriptors_cleanup(res); 907 908 descriptors_assert(res, mode); 909 910 pjdlog_init(mode); 911 pjdlog_debug_set(debuglevel); 912 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 913 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 914 915 init_local(res); 916 init_ggate(res); 917 init_environment(res); 918 919 if (drop_privs(res) != 0) { 920 cleanup(res); 921 exit(EX_CONFIG); 922 } 923 pjdlog_info("Privileges successfully dropped."); 924 925 /* 926 * Create the guard thread first, so we can handle signals from the 927 * very begining. 928 */ 929 error = pthread_create(&td, NULL, guard_thread, res); 930 PJDLOG_ASSERT(error == 0); 931 /* 932 * Create the control thread before sending any event to the parent, 933 * as we can deadlock when parent sends control request to worker, 934 * but worker has no control thread started yet, so parent waits. 935 * In the meantime worker sends an event to the parent, but parent 936 * is unable to handle the event, because it waits for control 937 * request response. 
938 */ 939 error = pthread_create(&td, NULL, ctrl_thread, res); 940 PJDLOG_ASSERT(error == 0); 941 if (real_remote(res)) { 942 error = init_remote(res, NULL, NULL); 943 if (error == 0) { 944 sync_start(); 945 } else if (error == EBUSY) { 946 time_t start = time(NULL); 947 948 pjdlog_warning("Waiting for remote node to become %s for %ds.", 949 role2str(HAST_ROLE_SECONDARY), 950 res->hr_timeout); 951 for (;;) { 952 sleep(1); 953 error = init_remote(res, NULL, NULL); 954 if (error != EBUSY) 955 break; 956 if (time(NULL) > start + res->hr_timeout) 957 break; 958 } 959 if (error == EBUSY) { 960 pjdlog_warning("Remote node is still %s, starting anyway.", 961 role2str(HAST_ROLE_PRIMARY)); 962 } 963 } 964 } 965 error = pthread_create(&td, NULL, ggate_recv_thread, res); 966 PJDLOG_ASSERT(error == 0); 967 error = pthread_create(&td, NULL, local_send_thread, res); 968 PJDLOG_ASSERT(error == 0); 969 error = pthread_create(&td, NULL, remote_send_thread, res); 970 PJDLOG_ASSERT(error == 0); 971 error = pthread_create(&td, NULL, remote_recv_thread, res); 972 PJDLOG_ASSERT(error == 0); 973 error = pthread_create(&td, NULL, ggate_send_thread, res); 974 PJDLOG_ASSERT(error == 0); 975 fullystarted = true; 976 (void)sync_thread(res); 977} 978 979static void 980reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
/*
 * Close both connections to the remote node, if they are open.
 *
 * The connections are closed and their pointers cleared while holding the
 * component's remote lock for writing. After the lock is released,
 * synchronization is stopped and EVENT_DISCONNECT is sent. If the
 * connections are already closed, the function only releases the lock
 * and returns.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		/* Both pointers are expected to be cleared together. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}
1026 */ 1027 if (!ISCONNECTED(res, ncomp)) { 1028 PJDLOG_ASSERT(res->hr_remotein == NULL); 1029 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1030 rw_unlock(&hio_remote_lock[ncomp]); 1031 return; 1032 } 1033 1034 PJDLOG_ASSERT(res->hr_remotein != NULL); 1035 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1036 1037 pjdlog_debug(2, "Closing incoming connection to %s.", 1038 res->hr_remoteaddr); 1039 proto_close(res->hr_remotein); 1040 res->hr_remotein = NULL; 1041 pjdlog_debug(2, "Closing outgoing connection to %s.", 1042 res->hr_remoteaddr); 1043 proto_close(res->hr_remoteout); 1044 res->hr_remoteout = NULL; 1045 1046 rw_unlock(&hio_remote_lock[ncomp]); 1047 1048 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1049 1050 /* 1051 * Stop synchronization if in-progress. 1052 */ 1053 sync_stop(); 1054 1055 event_send(res, EVENT_DISCONNECT); 1056} 1057 1058/* 1059 * Thread receives ggate I/O requests from the kernel and passes them to 1060 * appropriate threads: 1061 * WRITE - always goes to both local_send and remote_send threads 1062 * READ (when the block is up-to-date on local component) - 1063 * only local_send thread 1064 * READ (when the block isn't up-to-date on local component) - 1065 * only remote_send thread 1066 * DELETE - always goes to both local_send and remote_send threads 1067 * FLUSH - always goes to both local_send and remote_send threads 1068 */ 1069static void * 1070ggate_recv_thread(void *arg) 1071{ 1072 struct hast_resource *res = arg; 1073 struct g_gate_ctl_io *ggio; 1074 struct hio *hio; 1075 unsigned int ii, ncomp, ncomps; 1076 int error; 1077 1078 ncomps = HAST_NCOMPONENTS; 1079 1080 for (;;) { 1081 pjdlog_debug(2, "ggate_recv: Taking free request."); 1082 QUEUE_TAKE2(hio, free); 1083 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1084 ggio = &hio->hio_ggio; 1085 ggio->gctl_unit = res->hr_ggateunit; 1086 ggio->gctl_length = MAXPHYS; 1087 ggio->gctl_error = 0; 1088 pjdlog_debug(2, 1089 "ggate_recv: (%p) Waiting for request from 
the kernel.", 1090 hio); 1091 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 1092 if (sigexit_received) 1093 pthread_exit(NULL); 1094 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1095 } 1096 error = ggio->gctl_error; 1097 switch (error) { 1098 case 0: 1099 break; 1100 case ECANCELED: 1101 /* Exit gracefully. */ 1102 if (!sigexit_received) { 1103 pjdlog_debug(2, 1104 "ggate_recv: (%p) Received cancel from the kernel.", 1105 hio); 1106 pjdlog_info("Received cancel from the kernel, exiting."); 1107 } 1108 pthread_exit(NULL); 1109 case ENOMEM: 1110 /* 1111 * Buffer too small? Impossible, we allocate MAXPHYS 1112 * bytes - request can't be bigger than that. 1113 */ 1114 /* FALLTHROUGH */ 1115 case ENXIO: 1116 default: 1117 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1118 strerror(error)); 1119 } 1120 for (ii = 0; ii < ncomps; ii++) 1121 hio->hio_errors[ii] = EINVAL; 1122 reqlog(LOG_DEBUG, 2, ggio, 1123 "ggate_recv: (%p) Request received from the kernel: ", 1124 hio); 1125 /* 1126 * Inform all components about new write request. 1127 * For read request prefer local component unless the given 1128 * range is out-of-date, then use remote component. 1129 */ 1130 switch (ggio->gctl_cmd) { 1131 case BIO_READ: 1132 res->hr_stat_read++; 1133 pjdlog_debug(2, 1134 "ggate_recv: (%p) Moving request to the send queue.", 1135 hio); 1136 refcount_init(&hio->hio_countdown, 1); 1137 mtx_lock(&metadata_lock); 1138 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1139 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1140 /* 1141 * This range is up-to-date on local component, 1142 * so handle request locally. 1143 */ 1144 /* Local component is 0 for now. */ 1145 ncomp = 0; 1146 } else /* if (res->hr_syncsrc == 1147 HAST_SYNCSRC_SECONDARY) */ { 1148 PJDLOG_ASSERT(res->hr_syncsrc == 1149 HAST_SYNCSRC_SECONDARY); 1150 /* 1151 * This range is out-of-date on local component, 1152 * so send request to the remote node. 1153 */ 1154 /* Remote component is 1 for now. 
*/ 1155 ncomp = 1; 1156 } 1157 mtx_unlock(&metadata_lock); 1158 QUEUE_INSERT1(hio, send, ncomp); 1159 break; 1160 case BIO_WRITE: 1161 res->hr_stat_write++; 1162 if (res->hr_resuid == 0 && 1163 res->hr_primary_localcnt == 0) { 1164 /* This is first write. */ 1165 res->hr_primary_localcnt = 1; 1166 } 1167 for (;;) { 1168 mtx_lock(&range_lock); 1169 if (rangelock_islocked(range_sync, 1170 ggio->gctl_offset, ggio->gctl_length)) { 1171 pjdlog_debug(2, 1172 "regular: Range offset=%jd length=%zu locked.", 1173 (intmax_t)ggio->gctl_offset, 1174 (size_t)ggio->gctl_length); 1175 range_regular_wait = true; 1176 cv_wait(&range_regular_cond, &range_lock); 1177 range_regular_wait = false; 1178 mtx_unlock(&range_lock); 1179 continue; 1180 } 1181 if (rangelock_add(range_regular, 1182 ggio->gctl_offset, ggio->gctl_length) < 0) { 1183 mtx_unlock(&range_lock); 1184 pjdlog_debug(2, 1185 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1186 (intmax_t)ggio->gctl_offset, 1187 (size_t)ggio->gctl_length); 1188 sleep(1); 1189 continue; 1190 } 1191 mtx_unlock(&range_lock); 1192 break; 1193 } 1194 mtx_lock(&res->hr_amp_lock); 1195 if (activemap_write_start(res->hr_amp, 1196 ggio->gctl_offset, ggio->gctl_length)) { 1197 res->hr_stat_activemap_update++; 1198 (void)hast_activemap_flush(res); 1199 } 1200 mtx_unlock(&res->hr_amp_lock); 1201 /* FALLTHROUGH */ 1202 case BIO_DELETE: 1203 case BIO_FLUSH: 1204 switch (ggio->gctl_cmd) { 1205 case BIO_DELETE: 1206 res->hr_stat_delete++; 1207 break; 1208 case BIO_FLUSH: 1209 res->hr_stat_flush++; 1210 break; 1211 } 1212 pjdlog_debug(2, 1213 "ggate_recv: (%p) Moving request to the send queue.", 1214 hio); 1215 refcount_init(&hio->hio_countdown, ncomps); 1216 for (ii = 0; ii < ncomps; ii++) 1217 QUEUE_INSERT1(hio, send, ii); 1218 break; 1219 } 1220 } 1221 /* NOTREACHED */ 1222 return (NULL); 1223} 1224 1225/* 1226 * Thread reads from or writes to local component. 1227 * If local read fails, it redirects it to remote_send thread. 
1228 */ 1229static void * 1230local_send_thread(void *arg) 1231{ 1232 struct hast_resource *res = arg; 1233 struct g_gate_ctl_io *ggio; 1234 struct hio *hio; 1235 unsigned int ncomp, rncomp; 1236 ssize_t ret; 1237 1238 /* Local component is 0 for now. */ 1239 ncomp = 0; 1240 /* Remote component is 1 for now. */ 1241 rncomp = 1; 1242 1243 for (;;) { 1244 pjdlog_debug(2, "local_send: Taking request."); 1245 QUEUE_TAKE1(hio, send, ncomp, 0); 1246 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1247 ggio = &hio->hio_ggio; 1248 switch (ggio->gctl_cmd) { 1249 case BIO_READ: 1250 ret = pread(res->hr_localfd, ggio->gctl_data, 1251 ggio->gctl_length, 1252 ggio->gctl_offset + res->hr_localoff); 1253 if (ret == ggio->gctl_length) 1254 hio->hio_errors[ncomp] = 0; 1255 else if (!ISSYNCREQ(hio)) { 1256 /* 1257 * If READ failed, try to read from remote node. 1258 */ 1259 if (ret < 0) { 1260 reqlog(LOG_WARNING, 0, ggio, 1261 "Local request failed (%s), trying remote node. ", 1262 strerror(errno)); 1263 } else if (ret != ggio->gctl_length) { 1264 reqlog(LOG_WARNING, 0, ggio, 1265 "Local request failed (%zd != %jd), trying remote node. 
", 1266 ret, (intmax_t)ggio->gctl_length); 1267 } 1268 QUEUE_INSERT1(hio, send, rncomp); 1269 continue; 1270 } 1271 break; 1272 case BIO_WRITE: 1273 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1274 ggio->gctl_length, 1275 ggio->gctl_offset + res->hr_localoff); 1276 if (ret < 0) { 1277 hio->hio_errors[ncomp] = errno; 1278 reqlog(LOG_WARNING, 0, ggio, 1279 "Local request failed (%s): ", 1280 strerror(errno)); 1281 } else if (ret != ggio->gctl_length) { 1282 hio->hio_errors[ncomp] = EIO; 1283 reqlog(LOG_WARNING, 0, ggio, 1284 "Local request failed (%zd != %jd): ", 1285 ret, (intmax_t)ggio->gctl_length); 1286 } else { 1287 hio->hio_errors[ncomp] = 0; 1288 } 1289 break; 1290 case BIO_DELETE: 1291 ret = g_delete(res->hr_localfd, 1292 ggio->gctl_offset + res->hr_localoff, 1293 ggio->gctl_length); 1294 if (ret < 0) { 1295 hio->hio_errors[ncomp] = errno; 1296 reqlog(LOG_WARNING, 0, ggio, 1297 "Local request failed (%s): ", 1298 strerror(errno)); 1299 } else { 1300 hio->hio_errors[ncomp] = 0; 1301 } 1302 break; 1303 case BIO_FLUSH: 1304 if (!res->hr_localflush) { 1305 ret = -1; 1306 errno = EOPNOTSUPP; 1307 break; 1308 } 1309 ret = g_flush(res->hr_localfd); 1310 if (ret < 0) { 1311 if (errno == EOPNOTSUPP) 1312 res->hr_localflush = false; 1313 hio->hio_errors[ncomp] = errno; 1314 reqlog(LOG_WARNING, 0, ggio, 1315 "Local request failed (%s): ", 1316 strerror(errno)); 1317 } else { 1318 hio->hio_errors[ncomp] = 0; 1319 } 1320 break; 1321 } 1322 if (refcount_release(&hio->hio_countdown)) { 1323 if (ISSYNCREQ(hio)) { 1324 mtx_lock(&sync_lock); 1325 SYNCREQDONE(hio); 1326 mtx_unlock(&sync_lock); 1327 cv_signal(&sync_cond); 1328 } else { 1329 pjdlog_debug(2, 1330 "local_send: (%p) Moving request to the done queue.", 1331 hio); 1332 QUEUE_INSERT2(hio, done); 1333 } 1334 } 1335 } 1336 /* NOTREACHED */ 1337 return (NULL); 1338} 1339 1340static void 1341keepalive_send(struct hast_resource *res, unsigned int ncomp) 1342{ 1343 struct nv *nv; 1344 1345 
rw_rlock(&hio_remote_lock[ncomp]); 1346 1347 if (!ISCONNECTED(res, ncomp)) { 1348 rw_unlock(&hio_remote_lock[ncomp]); 1349 return; 1350 } 1351 1352 PJDLOG_ASSERT(res->hr_remotein != NULL); 1353 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1354 1355 nv = nv_alloc(); 1356 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1357 if (nv_error(nv) != 0) { 1358 rw_unlock(&hio_remote_lock[ncomp]); 1359 nv_free(nv); 1360 pjdlog_debug(1, 1361 "keepalive_send: Unable to prepare header to send."); 1362 return; 1363 } 1364 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1365 rw_unlock(&hio_remote_lock[ncomp]); 1366 pjdlog_common(LOG_DEBUG, 1, errno, 1367 "keepalive_send: Unable to send request"); 1368 nv_free(nv); 1369 remote_close(res, ncomp); 1370 return; 1371 } 1372 1373 rw_unlock(&hio_remote_lock[ncomp]); 1374 nv_free(nv); 1375 pjdlog_debug(2, "keepalive_send: Request sent."); 1376} 1377 1378/* 1379 * Thread sends request to secondary node. 1380 */ 1381static void * 1382remote_send_thread(void *arg) 1383{ 1384 struct hast_resource *res = arg; 1385 struct g_gate_ctl_io *ggio; 1386 time_t lastcheck, now; 1387 struct hio *hio; 1388 struct nv *nv; 1389 unsigned int ncomp; 1390 bool wakeup; 1391 uint64_t offset, length; 1392 uint8_t cmd; 1393 void *data; 1394 1395 /* Remote component is 1 for now. 
*/ 1396 ncomp = 1; 1397 lastcheck = time(NULL); 1398 1399 for (;;) { 1400 pjdlog_debug(2, "remote_send: Taking request."); 1401 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1402 if (hio == NULL) { 1403 now = time(NULL); 1404 if (lastcheck + HAST_KEEPALIVE <= now) { 1405 keepalive_send(res, ncomp); 1406 lastcheck = now; 1407 } 1408 continue; 1409 } 1410 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1411 ggio = &hio->hio_ggio; 1412 switch (ggio->gctl_cmd) { 1413 case BIO_READ: 1414 cmd = HIO_READ; 1415 data = NULL; 1416 offset = ggio->gctl_offset; 1417 length = ggio->gctl_length; 1418 break; 1419 case BIO_WRITE: 1420 cmd = HIO_WRITE; 1421 data = ggio->gctl_data; 1422 offset = ggio->gctl_offset; 1423 length = ggio->gctl_length; 1424 break; 1425 case BIO_DELETE: 1426 cmd = HIO_DELETE; 1427 data = NULL; 1428 offset = ggio->gctl_offset; 1429 length = ggio->gctl_length; 1430 break; 1431 case BIO_FLUSH: 1432 cmd = HIO_FLUSH; 1433 data = NULL; 1434 offset = 0; 1435 length = 0; 1436 break; 1437 default: 1438 PJDLOG_ABORT("invalid condition"); 1439 } 1440 nv = nv_alloc(); 1441 nv_add_uint8(nv, cmd, "cmd"); 1442 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1443 nv_add_uint64(nv, offset, "offset"); 1444 nv_add_uint64(nv, length, "length"); 1445 if (nv_error(nv) != 0) { 1446 hio->hio_errors[ncomp] = nv_error(nv); 1447 pjdlog_debug(2, 1448 "remote_send: (%p) Unable to prepare header to send.", 1449 hio); 1450 reqlog(LOG_ERR, 0, ggio, 1451 "Unable to prepare header to send (%s): ", 1452 strerror(nv_error(nv))); 1453 /* Move failed request immediately to the done queue. */ 1454 goto done_queue; 1455 } 1456 pjdlog_debug(2, 1457 "remote_send: (%p) Moving request to the recv queue.", 1458 hio); 1459 /* 1460 * Protect connection from disappearing. 
1461 */ 1462 rw_rlock(&hio_remote_lock[ncomp]); 1463 if (!ISCONNECTED(res, ncomp)) { 1464 rw_unlock(&hio_remote_lock[ncomp]); 1465 hio->hio_errors[ncomp] = ENOTCONN; 1466 goto done_queue; 1467 } 1468 /* 1469 * Move the request to recv queue before sending it, because 1470 * in different order we can get reply before we move request 1471 * to recv queue. 1472 */ 1473 mtx_lock(&hio_recv_list_lock[ncomp]); 1474 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1475 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1476 mtx_unlock(&hio_recv_list_lock[ncomp]); 1477 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1478 data != NULL ? length : 0) < 0) { 1479 hio->hio_errors[ncomp] = errno; 1480 rw_unlock(&hio_remote_lock[ncomp]); 1481 pjdlog_debug(2, 1482 "remote_send: (%p) Unable to send request.", hio); 1483 reqlog(LOG_ERR, 0, ggio, 1484 "Unable to send request (%s): ", 1485 strerror(hio->hio_errors[ncomp])); 1486 remote_close(res, ncomp); 1487 /* 1488 * Take request back from the receive queue and move 1489 * it immediately to the done queue. 
1490 */ 1491 mtx_lock(&hio_recv_list_lock[ncomp]); 1492 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1493 mtx_unlock(&hio_recv_list_lock[ncomp]); 1494 goto done_queue; 1495 } 1496 rw_unlock(&hio_remote_lock[ncomp]); 1497 nv_free(nv); 1498 if (wakeup) 1499 cv_signal(&hio_recv_list_cond[ncomp]); 1500 continue; 1501done_queue: 1502 nv_free(nv); 1503 if (ISSYNCREQ(hio)) { 1504 if (!refcount_release(&hio->hio_countdown)) 1505 continue; 1506 mtx_lock(&sync_lock); 1507 SYNCREQDONE(hio); 1508 mtx_unlock(&sync_lock); 1509 cv_signal(&sync_cond); 1510 continue; 1511 } 1512 if (ggio->gctl_cmd == BIO_WRITE) { 1513 mtx_lock(&res->hr_amp_lock); 1514 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1515 ggio->gctl_length)) { 1516 (void)hast_activemap_flush(res); 1517 } 1518 mtx_unlock(&res->hr_amp_lock); 1519 } 1520 if (!refcount_release(&hio->hio_countdown)) 1521 continue; 1522 pjdlog_debug(2, 1523 "remote_send: (%p) Moving request to the done queue.", 1524 hio); 1525 QUEUE_INSERT2(hio, done); 1526 } 1527 /* NOTREACHED */ 1528 return (NULL); 1529} 1530 1531/* 1532 * Thread receives answer from secondary node and passes it to ggate_send 1533 * thread. 1534 */ 1535static void * 1536remote_recv_thread(void *arg) 1537{ 1538 struct hast_resource *res = arg; 1539 struct g_gate_ctl_io *ggio; 1540 struct hio *hio; 1541 struct nv *nv; 1542 unsigned int ncomp; 1543 uint64_t seq; 1544 int error; 1545 1546 /* Remote component is 1 for now. */ 1547 ncomp = 1; 1548 1549 for (;;) { 1550 /* Wait until there is anything to receive. 
*/ 1551 mtx_lock(&hio_recv_list_lock[ncomp]); 1552 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1553 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1554 cv_wait(&hio_recv_list_cond[ncomp], 1555 &hio_recv_list_lock[ncomp]); 1556 } 1557 mtx_unlock(&hio_recv_list_lock[ncomp]); 1558 rw_rlock(&hio_remote_lock[ncomp]); 1559 if (!ISCONNECTED(res, ncomp)) { 1560 rw_unlock(&hio_remote_lock[ncomp]); 1561 /* 1562 * Connection is dead, so move all pending requests to 1563 * the done queue (one-by-one). 1564 */ 1565 mtx_lock(&hio_recv_list_lock[ncomp]); 1566 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1567 PJDLOG_ASSERT(hio != NULL); 1568 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1569 hio_next[ncomp]); 1570 mtx_unlock(&hio_recv_list_lock[ncomp]); 1571 goto done_queue; 1572 } 1573 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1574 pjdlog_errno(LOG_ERR, 1575 "Unable to receive reply header"); 1576 rw_unlock(&hio_remote_lock[ncomp]); 1577 remote_close(res, ncomp); 1578 continue; 1579 } 1580 rw_unlock(&hio_remote_lock[ncomp]); 1581 seq = nv_get_uint64(nv, "seq"); 1582 if (seq == 0) { 1583 pjdlog_error("Header contains no 'seq' field."); 1584 nv_free(nv); 1585 continue; 1586 } 1587 mtx_lock(&hio_recv_list_lock[ncomp]); 1588 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1589 if (hio->hio_ggio.gctl_seq == seq) { 1590 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1591 hio_next[ncomp]); 1592 break; 1593 } 1594 } 1595 mtx_unlock(&hio_recv_list_lock[ncomp]); 1596 if (hio == NULL) { 1597 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1598 (uintmax_t)seq); 1599 nv_free(nv); 1600 continue; 1601 } 1602 error = nv_get_int16(nv, "error"); 1603 if (error != 0) { 1604 /* Request failed on remote side. 
*/ 1605 hio->hio_errors[ncomp] = error; 1606 reqlog(LOG_WARNING, 0, &hio->hio_ggio, 1607 "Remote request failed (%s): ", strerror(error)); 1608 nv_free(nv); 1609 goto done_queue; 1610 } 1611 ggio = &hio->hio_ggio; 1612 switch (ggio->gctl_cmd) { 1613 case BIO_READ: 1614 rw_rlock(&hio_remote_lock[ncomp]); 1615 if (!ISCONNECTED(res, ncomp)) { 1616 rw_unlock(&hio_remote_lock[ncomp]); 1617 nv_free(nv); 1618 goto done_queue; 1619 } 1620 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1621 ggio->gctl_data, ggio->gctl_length) < 0) { 1622 hio->hio_errors[ncomp] = errno; 1623 pjdlog_errno(LOG_ERR, 1624 "Unable to receive reply data"); 1625 rw_unlock(&hio_remote_lock[ncomp]); 1626 nv_free(nv); 1627 remote_close(res, ncomp); 1628 goto done_queue; 1629 } 1630 rw_unlock(&hio_remote_lock[ncomp]); 1631 break; 1632 case BIO_WRITE: 1633 case BIO_DELETE: 1634 case BIO_FLUSH: 1635 break; 1636 default: 1637 PJDLOG_ABORT("invalid condition"); 1638 } 1639 hio->hio_errors[ncomp] = 0; 1640 nv_free(nv); 1641done_queue: 1642 if (refcount_release(&hio->hio_countdown)) { 1643 if (ISSYNCREQ(hio)) { 1644 mtx_lock(&sync_lock); 1645 SYNCREQDONE(hio); 1646 mtx_unlock(&sync_lock); 1647 cv_signal(&sync_cond); 1648 } else { 1649 pjdlog_debug(2, 1650 "remote_recv: (%p) Moving request to the done queue.", 1651 hio); 1652 QUEUE_INSERT2(hio, done); 1653 } 1654 } 1655 } 1656 /* NOTREACHED */ 1657 return (NULL); 1658} 1659 1660/* 1661 * Thread sends answer to the kernel. 
1662 */ 1663static void * 1664ggate_send_thread(void *arg) 1665{ 1666 struct hast_resource *res = arg; 1667 struct g_gate_ctl_io *ggio; 1668 struct hio *hio; 1669 unsigned int ii, ncomp, ncomps; 1670 1671 ncomps = HAST_NCOMPONENTS; 1672 1673 for (;;) { 1674 pjdlog_debug(2, "ggate_send: Taking request."); 1675 QUEUE_TAKE2(hio, done); 1676 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1677 ggio = &hio->hio_ggio; 1678 for (ii = 0; ii < ncomps; ii++) { 1679 if (hio->hio_errors[ii] == 0) { 1680 /* 1681 * One successful request is enough to declare 1682 * success. 1683 */ 1684 ggio->gctl_error = 0; 1685 break; 1686 } 1687 } 1688 if (ii == ncomps) { 1689 /* 1690 * None of the requests were successful. 1691 * Use the error from local component except the 1692 * case when we did only remote request. 1693 */ 1694 if (ggio->gctl_cmd == BIO_READ && 1695 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1696 ggio->gctl_error = hio->hio_errors[1]; 1697 else 1698 ggio->gctl_error = hio->hio_errors[0]; 1699 } 1700 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1701 mtx_lock(&res->hr_amp_lock); 1702 if (activemap_write_complete(res->hr_amp, 1703 ggio->gctl_offset, ggio->gctl_length)) { 1704 res->hr_stat_activemap_update++; 1705 (void)hast_activemap_flush(res); 1706 } 1707 mtx_unlock(&res->hr_amp_lock); 1708 } 1709 if (ggio->gctl_cmd == BIO_WRITE) { 1710 /* 1711 * Unlock range we locked. 1712 */ 1713 mtx_lock(&range_lock); 1714 rangelock_del(range_regular, ggio->gctl_offset, 1715 ggio->gctl_length); 1716 if (range_sync_wait) 1717 cv_signal(&range_sync_cond); 1718 mtx_unlock(&range_lock); 1719 /* 1720 * Bump local count if this is first write after 1721 * connection failure with remote node. 
1722 */ 1723 ncomp = 1; 1724 rw_rlock(&hio_remote_lock[ncomp]); 1725 if (!ISCONNECTED(res, ncomp)) { 1726 mtx_lock(&metadata_lock); 1727 if (res->hr_primary_localcnt == 1728 res->hr_secondary_remotecnt) { 1729 res->hr_primary_localcnt++; 1730 pjdlog_debug(1, 1731 "Increasing localcnt to %ju.", 1732 (uintmax_t)res->hr_primary_localcnt); 1733 (void)metadata_write(res); 1734 } 1735 mtx_unlock(&metadata_lock); 1736 } 1737 rw_unlock(&hio_remote_lock[ncomp]); 1738 } 1739 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1740 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1741 pjdlog_debug(2, 1742 "ggate_send: (%p) Moving request to the free queue.", hio); 1743 QUEUE_INSERT2(hio, free); 1744 } 1745 /* NOTREACHED */ 1746 return (NULL); 1747} 1748 1749/* 1750 * Thread synchronize local and remote components. 1751 */ 1752static void * 1753sync_thread(void *arg __unused) 1754{ 1755 struct hast_resource *res = arg; 1756 struct hio *hio; 1757 struct g_gate_ctl_io *ggio; 1758 struct timeval tstart, tend, tdiff; 1759 unsigned int ii, ncomp, ncomps; 1760 off_t offset, length, synced; 1761 bool dorewind; 1762 int syncext; 1763 1764 ncomps = HAST_NCOMPONENTS; 1765 dorewind = true; 1766 synced = 0; 1767 offset = -1; 1768 1769 for (;;) { 1770 mtx_lock(&sync_lock); 1771 if (offset >= 0 && !sync_inprogress) { 1772 gettimeofday(&tend, NULL); 1773 timersub(&tend, &tstart, &tdiff); 1774 pjdlog_info("Synchronization interrupted after %#.0T. " 1775 "%NB synchronized so far.", &tdiff, 1776 (intmax_t)synced); 1777 event_send(res, EVENT_SYNCINTR); 1778 } 1779 while (!sync_inprogress) { 1780 dorewind = true; 1781 synced = 0; 1782 cv_wait(&sync_cond, &sync_lock); 1783 } 1784 mtx_unlock(&sync_lock); 1785 /* 1786 * Obtain offset at which we should synchronize. 1787 * Rewind synchronization if needed. 
1788 */ 1789 mtx_lock(&res->hr_amp_lock); 1790 if (dorewind) 1791 activemap_sync_rewind(res->hr_amp); 1792 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1793 if (syncext != -1) { 1794 /* 1795 * We synchronized entire syncext extent, we can mark 1796 * it as clean now. 1797 */ 1798 if (activemap_extent_complete(res->hr_amp, syncext)) 1799 (void)hast_activemap_flush(res); 1800 } 1801 mtx_unlock(&res->hr_amp_lock); 1802 if (dorewind) { 1803 dorewind = false; 1804 if (offset < 0) 1805 pjdlog_info("Nodes are in sync."); 1806 else { 1807 pjdlog_info("Synchronization started. %NB to go.", 1808 (intmax_t)(res->hr_extentsize * 1809 activemap_ndirty(res->hr_amp))); 1810 event_send(res, EVENT_SYNCSTART); 1811 gettimeofday(&tstart, NULL); 1812 } 1813 } 1814 if (offset < 0) { 1815 sync_stop(); 1816 pjdlog_debug(1, "Nothing to synchronize."); 1817 /* 1818 * Synchronization complete, make both localcnt and 1819 * remotecnt equal. 1820 */ 1821 ncomp = 1; 1822 rw_rlock(&hio_remote_lock[ncomp]); 1823 if (ISCONNECTED(res, ncomp)) { 1824 if (synced > 0) { 1825 int64_t bps; 1826 1827 gettimeofday(&tend, NULL); 1828 timersub(&tend, &tstart, &tdiff); 1829 bps = (int64_t)((double)synced / 1830 ((double)tdiff.tv_sec + 1831 (double)tdiff.tv_usec / 1000000)); 1832 pjdlog_info("Synchronization complete. 
" 1833 "%NB synchronized in %#.0lT (%NB/sec).", 1834 (intmax_t)synced, &tdiff, 1835 (intmax_t)bps); 1836 event_send(res, EVENT_SYNCDONE); 1837 } 1838 mtx_lock(&metadata_lock); 1839 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1840 res->hr_primary_localcnt = 1841 res->hr_secondary_remotecnt; 1842 res->hr_primary_remotecnt = 1843 res->hr_secondary_localcnt; 1844 pjdlog_debug(1, 1845 "Setting localcnt to %ju and remotecnt to %ju.", 1846 (uintmax_t)res->hr_primary_localcnt, 1847 (uintmax_t)res->hr_primary_remotecnt); 1848 (void)metadata_write(res); 1849 mtx_unlock(&metadata_lock); 1850 } 1851 rw_unlock(&hio_remote_lock[ncomp]); 1852 continue; 1853 } 1854 pjdlog_debug(2, "sync: Taking free request."); 1855 QUEUE_TAKE2(hio, free); 1856 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1857 /* 1858 * Lock the range we are going to synchronize. We don't want 1859 * race where someone writes between our read and write. 1860 */ 1861 for (;;) { 1862 mtx_lock(&range_lock); 1863 if (rangelock_islocked(range_regular, offset, length)) { 1864 pjdlog_debug(2, 1865 "sync: Range offset=%jd length=%jd locked.", 1866 (intmax_t)offset, (intmax_t)length); 1867 range_sync_wait = true; 1868 cv_wait(&range_sync_cond, &range_lock); 1869 range_sync_wait = false; 1870 mtx_unlock(&range_lock); 1871 continue; 1872 } 1873 if (rangelock_add(range_sync, offset, length) < 0) { 1874 mtx_unlock(&range_lock); 1875 pjdlog_debug(2, 1876 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1877 (intmax_t)offset, (intmax_t)length); 1878 sleep(1); 1879 continue; 1880 } 1881 mtx_unlock(&range_lock); 1882 break; 1883 } 1884 /* 1885 * First read the data from synchronization source. 
1886 */ 1887 SYNCREQ(hio); 1888 ggio = &hio->hio_ggio; 1889 ggio->gctl_cmd = BIO_READ; 1890 ggio->gctl_offset = offset; 1891 ggio->gctl_length = length; 1892 ggio->gctl_error = 0; 1893 for (ii = 0; ii < ncomps; ii++) 1894 hio->hio_errors[ii] = EINVAL; 1895 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1896 hio); 1897 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1898 hio); 1899 mtx_lock(&metadata_lock); 1900 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1901 /* 1902 * This range is up-to-date on local component, 1903 * so handle request locally. 1904 */ 1905 /* Local component is 0 for now. */ 1906 ncomp = 0; 1907 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1908 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1909 /* 1910 * This range is out-of-date on local component, 1911 * so send request to the remote node. 1912 */ 1913 /* Remote component is 1 for now. */ 1914 ncomp = 1; 1915 } 1916 mtx_unlock(&metadata_lock); 1917 refcount_init(&hio->hio_countdown, 1); 1918 QUEUE_INSERT1(hio, send, ncomp); 1919 1920 /* 1921 * Let's wait for READ to finish. 1922 */ 1923 mtx_lock(&sync_lock); 1924 while (!ISSYNCREQDONE(hio)) 1925 cv_wait(&sync_cond, &sync_lock); 1926 mtx_unlock(&sync_lock); 1927 1928 if (hio->hio_errors[ncomp] != 0) { 1929 pjdlog_error("Unable to read synchronization data: %s.", 1930 strerror(hio->hio_errors[ncomp])); 1931 goto free_queue; 1932 } 1933 1934 /* 1935 * We read the data from synchronization source, now write it 1936 * to synchronization target. 
1937 */ 1938 SYNCREQ(hio); 1939 ggio->gctl_cmd = BIO_WRITE; 1940 for (ii = 0; ii < ncomps; ii++) 1941 hio->hio_errors[ii] = EINVAL; 1942 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1943 hio); 1944 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1945 hio); 1946 mtx_lock(&metadata_lock); 1947 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1948 /* 1949 * This range is up-to-date on local component, 1950 * so we update remote component. 1951 */ 1952 /* Remote component is 1 for now. */ 1953 ncomp = 1; 1954 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1955 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1956 /* 1957 * This range is out-of-date on local component, 1958 * so we update it. 1959 */ 1960 /* Local component is 0 for now. */ 1961 ncomp = 0; 1962 } 1963 mtx_unlock(&metadata_lock); 1964 1965 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1966 hio); 1967 refcount_init(&hio->hio_countdown, 1); 1968 QUEUE_INSERT1(hio, send, ncomp); 1969 1970 /* 1971 * Let's wait for WRITE to finish. 
1972 */ 1973 mtx_lock(&sync_lock); 1974 while (!ISSYNCREQDONE(hio)) 1975 cv_wait(&sync_cond, &sync_lock); 1976 mtx_unlock(&sync_lock); 1977 1978 if (hio->hio_errors[ncomp] != 0) { 1979 pjdlog_error("Unable to write synchronization data: %s.", 1980 strerror(hio->hio_errors[ncomp])); 1981 goto free_queue; 1982 } 1983 1984 synced += length; 1985free_queue: 1986 mtx_lock(&range_lock); 1987 rangelock_del(range_sync, offset, length); 1988 if (range_regular_wait) 1989 cv_signal(&range_regular_cond); 1990 mtx_unlock(&range_lock); 1991 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1992 hio); 1993 QUEUE_INSERT2(hio, free); 1994 } 1995 /* NOTREACHED */ 1996 return (NULL); 1997} 1998 1999void 2000primary_config_reload(struct hast_resource *res, struct nv *nv) 2001{ 2002 unsigned int ii, ncomps; 2003 int modified, vint; 2004 const char *vstr; 2005 2006 pjdlog_info("Reloading configuration..."); 2007 2008 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2009 PJDLOG_ASSERT(gres == res); 2010 nv_assert(nv, "remoteaddr"); 2011 nv_assert(nv, "sourceaddr"); 2012 nv_assert(nv, "replication"); 2013 nv_assert(nv, "checksum"); 2014 nv_assert(nv, "compression"); 2015 nv_assert(nv, "timeout"); 2016 nv_assert(nv, "exec"); 2017 nv_assert(nv, "metaflush"); 2018 2019 ncomps = HAST_NCOMPONENTS; 2020 2021#define MODIFIED_REMOTEADDR 0x01 2022#define MODIFIED_SOURCEADDR 0x02 2023#define MODIFIED_REPLICATION 0x04 2024#define MODIFIED_CHECKSUM 0x08 2025#define MODIFIED_COMPRESSION 0x10 2026#define MODIFIED_TIMEOUT 0x20 2027#define MODIFIED_EXEC 0x40 2028#define MODIFIED_METAFLUSH 0x80 2029 modified = 0; 2030 2031 vstr = nv_get_string(nv, "remoteaddr"); 2032 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2033 /* 2034 * Don't copy res->hr_remoteaddr to gres just yet. 2035 * We want remote_close() to log disconnect from the old 2036 * addresses, not from the new ones. 
2037 */ 2038 modified |= MODIFIED_REMOTEADDR; 2039 } 2040 vstr = nv_get_string(nv, "sourceaddr"); 2041 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2042 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2043 modified |= MODIFIED_SOURCEADDR; 2044 } 2045 vint = nv_get_int32(nv, "replication"); 2046 if (gres->hr_replication != vint) { 2047 gres->hr_replication = vint; 2048 modified |= MODIFIED_REPLICATION; 2049 } 2050 vint = nv_get_int32(nv, "checksum"); 2051 if (gres->hr_checksum != vint) { 2052 gres->hr_checksum = vint; 2053 modified |= MODIFIED_CHECKSUM; 2054 } 2055 vint = nv_get_int32(nv, "compression"); 2056 if (gres->hr_compression != vint) { 2057 gres->hr_compression = vint; 2058 modified |= MODIFIED_COMPRESSION; 2059 } 2060 vint = nv_get_int32(nv, "timeout"); 2061 if (gres->hr_timeout != vint) { 2062 gres->hr_timeout = vint; 2063 modified |= MODIFIED_TIMEOUT; 2064 } 2065 vstr = nv_get_string(nv, "exec"); 2066 if (strcmp(gres->hr_exec, vstr) != 0) { 2067 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2068 modified |= MODIFIED_EXEC; 2069 } 2070 vint = nv_get_int32(nv, "metaflush"); 2071 if (gres->hr_metaflush != vint) { 2072 gres->hr_metaflush = vint; 2073 modified |= MODIFIED_METAFLUSH; 2074 } 2075 2076 /* 2077 * Change timeout for connected sockets. 2078 * Don't bother if we need to reconnect. 
2079 */ 2080 if ((modified & MODIFIED_TIMEOUT) != 0 && 2081 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2082 MODIFIED_REPLICATION)) == 0) { 2083 for (ii = 0; ii < ncomps; ii++) { 2084 if (!ISREMOTE(ii)) 2085 continue; 2086 rw_rlock(&hio_remote_lock[ii]); 2087 if (!ISCONNECTED(gres, ii)) { 2088 rw_unlock(&hio_remote_lock[ii]); 2089 continue; 2090 } 2091 rw_unlock(&hio_remote_lock[ii]); 2092 if (proto_timeout(gres->hr_remotein, 2093 gres->hr_timeout) < 0) { 2094 pjdlog_errno(LOG_WARNING, 2095 "Unable to set connection timeout"); 2096 } 2097 if (proto_timeout(gres->hr_remoteout, 2098 gres->hr_timeout) < 0) { 2099 pjdlog_errno(LOG_WARNING, 2100 "Unable to set connection timeout"); 2101 } 2102 } 2103 } 2104 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2105 MODIFIED_REPLICATION)) != 0) { 2106 for (ii = 0; ii < ncomps; ii++) { 2107 if (!ISREMOTE(ii)) 2108 continue; 2109 remote_close(gres, ii); 2110 } 2111 if (modified & MODIFIED_REMOTEADDR) { 2112 vstr = nv_get_string(nv, "remoteaddr"); 2113 strlcpy(gres->hr_remoteaddr, vstr, 2114 sizeof(gres->hr_remoteaddr)); 2115 } 2116 } 2117#undef MODIFIED_REMOTEADDR 2118#undef MODIFIED_SOURCEADDR 2119#undef MODIFIED_REPLICATION 2120#undef MODIFIED_CHECKSUM 2121#undef MODIFIED_COMPRESSION 2122#undef MODIFIED_TIMEOUT 2123#undef MODIFIED_EXEC 2124#undef MODIFIED_METAFLUSH 2125 2126 pjdlog_info("Configuration reloaded successfully."); 2127} 2128 2129static void 2130guard_one(struct hast_resource *res, unsigned int ncomp) 2131{ 2132 struct proto_conn *in, *out; 2133 2134 if (!ISREMOTE(ncomp)) 2135 return; 2136 2137 rw_rlock(&hio_remote_lock[ncomp]); 2138 2139 if (!real_remote(res)) { 2140 rw_unlock(&hio_remote_lock[ncomp]); 2141 return; 2142 } 2143 2144 if (ISCONNECTED(res, ncomp)) { 2145 PJDLOG_ASSERT(res->hr_remotein != NULL); 2146 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2147 rw_unlock(&hio_remote_lock[ncomp]); 2148 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2149 res->hr_remoteaddr); 2150 
return; 2151 } 2152 2153 PJDLOG_ASSERT(res->hr_remotein == NULL); 2154 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2155 /* 2156 * Upgrade the lock. It doesn't have to be atomic as no other thread 2157 * can change connection status from disconnected to connected. 2158 */ 2159 rw_unlock(&hio_remote_lock[ncomp]); 2160 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2161 res->hr_remoteaddr); 2162 in = out = NULL; 2163 if (init_remote(res, &in, &out) == 0) { 2164 rw_wlock(&hio_remote_lock[ncomp]); 2165 PJDLOG_ASSERT(res->hr_remotein == NULL); 2166 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2167 PJDLOG_ASSERT(in != NULL && out != NULL); 2168 res->hr_remotein = in; 2169 res->hr_remoteout = out; 2170 rw_unlock(&hio_remote_lock[ncomp]); 2171 pjdlog_info("Successfully reconnected to %s.", 2172 res->hr_remoteaddr); 2173 sync_start(); 2174 } else { 2175 /* Both connections should be NULL. */ 2176 PJDLOG_ASSERT(res->hr_remotein == NULL); 2177 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2178 PJDLOG_ASSERT(in == NULL && out == NULL); 2179 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2180 res->hr_remoteaddr); 2181 } 2182} 2183 2184/* 2185 * Thread guards remote connections and reconnects when needed, handles 2186 * signals, etc. 
2187 */ 2188static void * 2189guard_thread(void *arg) 2190{ 2191 struct hast_resource *res = arg; 2192 unsigned int ii, ncomps; 2193 struct timespec timeout; 2194 time_t lastcheck, now; 2195 sigset_t mask; 2196 int signo; 2197 2198 ncomps = HAST_NCOMPONENTS; 2199 lastcheck = time(NULL); 2200 2201 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2202 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2203 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2204 2205 timeout.tv_sec = HAST_KEEPALIVE; 2206 timeout.tv_nsec = 0; 2207 signo = -1; 2208 2209 for (;;) { 2210 switch (signo) { 2211 case SIGINT: 2212 case SIGTERM: 2213 sigexit_received = true; 2214 primary_exitx(EX_OK, 2215 "Termination signal received, exiting."); 2216 break; 2217 default: 2218 break; 2219 } 2220 2221 /* 2222 * Don't check connections until we fully started, 2223 * as we may still be looping, waiting for remote node 2224 * to switch from primary to secondary. 2225 */ 2226 if (fullystarted) { 2227 pjdlog_debug(2, "remote_guard: Checking connections."); 2228 now = time(NULL); 2229 if (lastcheck + HAST_KEEPALIVE <= now) { 2230 for (ii = 0; ii < ncomps; ii++) 2231 guard_one(res, ii); 2232 lastcheck = now; 2233 } 2234 } 2235 signo = sigtimedwait(&mask, NULL, &timeout); 2236 } 2237 /* NOTREACHED */ 2238 return (NULL); 2239} 2240