primary.c revision 246922
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 246922 2013-02-17 21:12:34Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "refcnt.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now: component number 1. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * State of a single I/O request.  A fixed pool of these is preallocated
 * and recycled through the free list below.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool			 hio_done;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int			 hio_replication;
	/*
	 * Queue linkage, one entry per component, so that the same request
	 * can sit on every component's send/recv list at the same time.
	 * The free and done lists share entry 0 (hio_free_next and
	 * hio_done_next right below).
	 */
	TAILQ_ENTRY(hio)	*hio_next;
};
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below synchronizes access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
160 */ 161#define HAST_NCOMPONENTS 2 162 163#define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166#define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176} while (0) 177#define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186} while (0) 187#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203} while (0) 204#define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212} while (0) 213 214#define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217} while (0) 218#define ISSYNCREQ(hio) 
((hio)->hio_ggio.gctl_unit == -1) 219#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 220#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 221 222static struct hast_resource *gres; 223 224static pthread_mutex_t range_lock; 225static struct rangelocks *range_regular; 226static bool range_regular_wait; 227static pthread_cond_t range_regular_cond; 228static struct rangelocks *range_sync; 229static bool range_sync_wait; 230static pthread_cond_t range_sync_cond; 231static bool fullystarted; 232 233static void *ggate_recv_thread(void *arg); 234static void *local_send_thread(void *arg); 235static void *remote_send_thread(void *arg); 236static void *remote_recv_thread(void *arg); 237static void *ggate_send_thread(void *arg); 238static void *sync_thread(void *arg); 239static void *guard_thread(void *arg); 240 241static void 242cleanup(struct hast_resource *res) 243{ 244 int rerrno; 245 246 /* Remember errno. */ 247 rerrno = errno; 248 249 /* Destroy ggate provider if we created one. */ 250 if (res->hr_ggateunit >= 0) { 251 struct g_gate_ctl_destroy ggiod; 252 253 bzero(&ggiod, sizeof(ggiod)); 254 ggiod.gctl_version = G_GATE_VERSION; 255 ggiod.gctl_unit = res->hr_ggateunit; 256 ggiod.gctl_force = 1; 257 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { 258 pjdlog_errno(LOG_WARNING, 259 "Unable to destroy hast/%s device", 260 res->hr_provname); 261 } 262 res->hr_ggateunit = -1; 263 } 264 265 /* Restore errno. */ 266 errno = rerrno; 267} 268 269static __dead2 void 270primary_exit(int exitcode, const char *fmt, ...) 271{ 272 va_list ap; 273 274 PJDLOG_ASSERT(exitcode != EX_OK); 275 va_start(ap, fmt); 276 pjdlogv_errno(LOG_ERR, fmt, ap); 277 va_end(ap); 278 cleanup(gres); 279 exit(exitcode); 280} 281 282static __dead2 void 283primary_exitx(int exitcode, const char *fmt, ...) 284{ 285 va_list ap; 286 287 va_start(ap, fmt); 288 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 289 va_end(ap); 290 cleanup(gres); 291 exit(exitcode); 292} 293 294static int 295hast_activemap_flush(struct hast_resource *res) 296{ 297 const unsigned char *buf; 298 size_t size; 299 300 buf = activemap_bitmap(res->hr_amp, &size); 301 PJDLOG_ASSERT(buf != NULL); 302 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 303 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 304 (ssize_t)size) { 305 pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); 306 return (-1); 307 } 308 if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { 309 if (errno == EOPNOTSUPP) { 310 pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", 311 res->hr_localpath); 312 res->hr_metaflush = 0; 313 } else { 314 pjdlog_errno(LOG_ERR, 315 "Unable to flush disk cache on activemap update"); 316 return (-1); 317 } 318 } 319 return (0); 320} 321 322static bool 323real_remote(const struct hast_resource *res) 324{ 325 326 return (strcmp(res->hr_remoteaddr, "none") != 0); 327} 328 329static void 330init_environment(struct hast_resource *res __unused) 331{ 332 struct hio *hio; 333 unsigned int ii, ncomps; 334 335 /* 336 * In the future it might be per-resource value. 337 */ 338 ncomps = HAST_NCOMPONENTS; 339 340 /* 341 * Allocate memory needed by lists. 
342 */ 343 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 344 if (hio_send_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for send lists.", 347 sizeof(hio_send_list[0]) * ncomps); 348 } 349 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 350 if (hio_send_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for send list locks.", 353 sizeof(hio_send_list_lock[0]) * ncomps); 354 } 355 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 356 if (hio_send_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for send list condition variables.", 359 sizeof(hio_send_list_cond[0]) * ncomps); 360 } 361 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 362 if (hio_recv_list == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for recv lists.", 365 sizeof(hio_recv_list[0]) * ncomps); 366 } 367 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 368 if (hio_recv_list_lock == NULL) { 369 primary_exitx(EX_TEMPFAIL, 370 "Unable to allocate %zu bytes of memory for recv list locks.", 371 sizeof(hio_recv_list_lock[0]) * ncomps); 372 } 373 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 374 if (hio_recv_list_cond == NULL) { 375 primary_exitx(EX_TEMPFAIL, 376 "Unable to allocate %zu bytes of memory for recv list condition variables.", 377 sizeof(hio_recv_list_cond[0]) * ncomps); 378 } 379 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 380 if (hio_remote_lock == NULL) { 381 primary_exitx(EX_TEMPFAIL, 382 "Unable to allocate %zu bytes of memory for remote connections locks.", 383 sizeof(hio_remote_lock[0]) * ncomps); 384 } 385 386 /* 387 * Initialize lists, their locks and theirs condition variables. 
388 */ 389 TAILQ_INIT(&hio_free_list); 390 mtx_init(&hio_free_list_lock); 391 cv_init(&hio_free_list_cond); 392 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 393 TAILQ_INIT(&hio_send_list[ii]); 394 mtx_init(&hio_send_list_lock[ii]); 395 cv_init(&hio_send_list_cond[ii]); 396 TAILQ_INIT(&hio_recv_list[ii]); 397 mtx_init(&hio_recv_list_lock[ii]); 398 cv_init(&hio_recv_list_cond[ii]); 399 rw_init(&hio_remote_lock[ii]); 400 } 401 TAILQ_INIT(&hio_done_list); 402 mtx_init(&hio_done_list_lock); 403 cv_init(&hio_done_list_cond); 404 mtx_init(&metadata_lock); 405 406 /* 407 * Allocate requests pool and initialize requests. 408 */ 409 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 410 hio = malloc(sizeof(*hio)); 411 if (hio == NULL) { 412 primary_exitx(EX_TEMPFAIL, 413 "Unable to allocate %zu bytes of memory for hio request.", 414 sizeof(*hio)); 415 } 416 hio->hio_countdown = 0; 417 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 418 if (hio->hio_errors == NULL) { 419 primary_exitx(EX_TEMPFAIL, 420 "Unable allocate %zu bytes of memory for hio errors.", 421 sizeof(hio->hio_errors[0]) * ncomps); 422 } 423 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 424 if (hio->hio_next == NULL) { 425 primary_exitx(EX_TEMPFAIL, 426 "Unable allocate %zu bytes of memory for hio_next field.", 427 sizeof(hio->hio_next[0]) * ncomps); 428 } 429 hio->hio_ggio.gctl_version = G_GATE_VERSION; 430 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 431 if (hio->hio_ggio.gctl_data == NULL) { 432 primary_exitx(EX_TEMPFAIL, 433 "Unable to allocate %zu bytes of memory for gctl_data.", 434 MAXPHYS); 435 } 436 hio->hio_ggio.gctl_length = MAXPHYS; 437 hio->hio_ggio.gctl_error = 0; 438 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 439 } 440} 441 442static bool 443init_resuid(struct hast_resource *res) 444{ 445 446 mtx_lock(&metadata_lock); 447 if (res->hr_resuid != 0) { 448 mtx_unlock(&metadata_lock); 449 return (false); 450 } else { 451 /* Initialize unique resource identifier. 
*/ 452 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 453 mtx_unlock(&metadata_lock); 454 if (metadata_write(res) == -1) 455 exit(EX_NOINPUT); 456 return (true); 457 } 458} 459 460static void 461init_local(struct hast_resource *res) 462{ 463 unsigned char *buf; 464 size_t mapsize; 465 466 if (metadata_read(res, true) == -1) 467 exit(EX_NOINPUT); 468 mtx_init(&res->hr_amp_lock); 469 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 470 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { 471 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 472 } 473 mtx_init(&range_lock); 474 cv_init(&range_regular_cond); 475 if (rangelock_init(&range_regular) == -1) 476 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 477 cv_init(&range_sync_cond); 478 if (rangelock_init(&range_sync) == -1) 479 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 480 mapsize = activemap_ondisk_size(res->hr_amp); 481 buf = calloc(1, mapsize); 482 if (buf == NULL) { 483 primary_exitx(EX_TEMPFAIL, 484 "Unable to allocate buffer for activemap."); 485 } 486 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 487 (ssize_t)mapsize) { 488 primary_exit(EX_NOINPUT, "Unable to read activemap"); 489 } 490 activemap_copyin(res->hr_amp, buf, mapsize); 491 free(buf); 492 if (res->hr_resuid != 0) 493 return; 494 /* 495 * We're using provider for the first time. Initialize local and remote 496 * counters. We don't initialize resuid here, as we want to do it just 497 * in time. The reason for this is that we want to inform secondary 498 * that there were no writes yet, so there is no need to synchronize 499 * anything. 
500 */ 501 res->hr_primary_localcnt = 0; 502 res->hr_primary_remotecnt = 0; 503 if (metadata_write(res) == -1) 504 exit(EX_NOINPUT); 505} 506 507static int 508primary_connect(struct hast_resource *res, struct proto_conn **connp) 509{ 510 struct proto_conn *conn; 511 int16_t val; 512 513 val = 1; 514 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { 515 primary_exit(EX_TEMPFAIL, 516 "Unable to send connection request to parent"); 517 } 518 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { 519 primary_exit(EX_TEMPFAIL, 520 "Unable to receive reply to connection request from parent"); 521 } 522 if (val != 0) { 523 errno = val; 524 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 525 res->hr_remoteaddr); 526 return (-1); 527 } 528 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { 529 primary_exit(EX_TEMPFAIL, 530 "Unable to receive connection from parent"); 531 } 532 if (proto_connect_wait(conn, res->hr_timeout) == -1) { 533 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 534 res->hr_remoteaddr); 535 proto_close(conn); 536 return (-1); 537 } 538 /* Error in setting timeout is not critical, but why should it fail? */ 539 if (proto_timeout(conn, res->hr_timeout) == -1) 540 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 541 542 *connp = conn; 543 544 return (0); 545} 546 547/* 548 * Function instructs GEOM_GATE to handle reads directly from within the kernel. 
549 */ 550static void 551enable_direct_reads(struct hast_resource *res) 552{ 553 struct g_gate_ctl_modify ggiomodify; 554 555 bzero(&ggiomodify, sizeof(ggiomodify)); 556 ggiomodify.gctl_version = G_GATE_VERSION; 557 ggiomodify.gctl_unit = res->hr_ggateunit; 558 ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; 559 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, 560 sizeof(ggiomodify.gctl_readprov)); 561 ggiomodify.gctl_readoffset = res->hr_localoff; 562 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) 563 pjdlog_debug(1, "Direct reads enabled."); 564 else 565 pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); 566} 567 568static int 569init_remote(struct hast_resource *res, struct proto_conn **inp, 570 struct proto_conn **outp) 571{ 572 struct proto_conn *in, *out; 573 struct nv *nvout, *nvin; 574 const unsigned char *token; 575 unsigned char *map; 576 const char *errmsg; 577 int32_t extentsize; 578 int64_t datasize; 579 uint32_t mapsize; 580 uint8_t version; 581 size_t size; 582 int error; 583 584 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 585 PJDLOG_ASSERT(real_remote(res)); 586 587 in = out = NULL; 588 errmsg = NULL; 589 590 if (primary_connect(res, &out) == -1) 591 return (ECONNREFUSED); 592 593 error = ECONNABORTED; 594 595 /* 596 * First handshake step. 597 * Setup outgoing connection with remote node. 
598 */ 599 nvout = nv_alloc(); 600 nv_add_string(nvout, res->hr_name, "resource"); 601 nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); 602 if (nv_error(nvout) != 0) { 603 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 604 "Unable to allocate header for connection with %s", 605 res->hr_remoteaddr); 606 nv_free(nvout); 607 goto close; 608 } 609 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { 610 pjdlog_errno(LOG_WARNING, 611 "Unable to send handshake header to %s", 612 res->hr_remoteaddr); 613 nv_free(nvout); 614 goto close; 615 } 616 nv_free(nvout); 617 if (hast_proto_recv_hdr(out, &nvin) == -1) { 618 pjdlog_errno(LOG_WARNING, 619 "Unable to receive handshake header from %s", 620 res->hr_remoteaddr); 621 goto close; 622 } 623 errmsg = nv_get_string(nvin, "errmsg"); 624 if (errmsg != NULL) { 625 pjdlog_warning("%s", errmsg); 626 if (nv_exists(nvin, "wait")) 627 error = EBUSY; 628 nv_free(nvin); 629 goto close; 630 } 631 version = nv_get_uint8(nvin, "version"); 632 if (version == 0) { 633 /* 634 * If no version is sent, it means this is protocol version 1. 635 */ 636 version = 1; 637 } 638 if (version > HAST_PROTO_VERSION) { 639 pjdlog_warning("Invalid version received (%hhu).", version); 640 nv_free(nvin); 641 goto close; 642 } 643 res->hr_version = version; 644 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); 645 token = nv_get_uint8_array(nvin, &size, "token"); 646 if (token == NULL) { 647 pjdlog_warning("Handshake header from %s has no 'token' field.", 648 res->hr_remoteaddr); 649 nv_free(nvin); 650 goto close; 651 } 652 if (size != sizeof(res->hr_token)) { 653 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 654 res->hr_remoteaddr, size, sizeof(res->hr_token)); 655 nv_free(nvin); 656 goto close; 657 } 658 bcopy(token, res->hr_token, sizeof(res->hr_token)); 659 nv_free(nvin); 660 661 /* 662 * Second handshake step. 663 * Setup incoming connection with remote node. 
664 */ 665 if (primary_connect(res, &in) == -1) 666 goto close; 667 668 nvout = nv_alloc(); 669 nv_add_string(nvout, res->hr_name, "resource"); 670 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 671 "token"); 672 if (res->hr_resuid == 0) { 673 /* 674 * The resuid field was not yet initialized. 675 * Because we do synchronization inside init_resuid(), it is 676 * possible that someone already initialized it, the function 677 * will return false then, but if we successfully initialized 678 * it, we will get true. True means that there were no writes 679 * to this resource yet and we want to inform secondary that 680 * synchronization is not needed by sending "virgin" argument. 681 */ 682 if (init_resuid(res)) 683 nv_add_int8(nvout, 1, "virgin"); 684 } 685 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 686 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 687 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 688 if (nv_error(nvout) != 0) { 689 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 690 "Unable to allocate header for connection with %s", 691 res->hr_remoteaddr); 692 nv_free(nvout); 693 goto close; 694 } 695 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { 696 pjdlog_errno(LOG_WARNING, 697 "Unable to send handshake header to %s", 698 res->hr_remoteaddr); 699 nv_free(nvout); 700 goto close; 701 } 702 nv_free(nvout); 703 if (hast_proto_recv_hdr(out, &nvin) == -1) { 704 pjdlog_errno(LOG_WARNING, 705 "Unable to receive handshake header from %s", 706 res->hr_remoteaddr); 707 goto close; 708 } 709 errmsg = nv_get_string(nvin, "errmsg"); 710 if (errmsg != NULL) { 711 pjdlog_warning("%s", errmsg); 712 nv_free(nvin); 713 goto close; 714 } 715 datasize = nv_get_int64(nvin, "datasize"); 716 if (datasize != res->hr_datasize) { 717 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 718 (intmax_t)res->hr_datasize, (intmax_t)datasize); 719 nv_free(nvin); 720 goto close; 721 } 722 extentsize = 
nv_get_int32(nvin, "extentsize"); 723 if (extentsize != res->hr_extentsize) { 724 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 725 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 726 nv_free(nvin); 727 goto close; 728 } 729 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 730 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 731 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 732 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) 733 enable_direct_reads(res); 734 if (nv_exists(nvin, "virgin")) { 735 /* 736 * Secondary was reinitialized, bump localcnt if it is 0 as 737 * only we have the data. 738 */ 739 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 740 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 741 742 if (res->hr_primary_localcnt == 0) { 743 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 744 745 mtx_lock(&metadata_lock); 746 res->hr_primary_localcnt++; 747 pjdlog_debug(1, "Increasing localcnt to %ju.", 748 (uintmax_t)res->hr_primary_localcnt); 749 (void)metadata_write(res); 750 mtx_unlock(&metadata_lock); 751 } 752 } 753 map = NULL; 754 mapsize = nv_get_uint32(nvin, "mapsize"); 755 if (mapsize > 0) { 756 map = malloc(mapsize); 757 if (map == NULL) { 758 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 759 (uintmax_t)mapsize); 760 nv_free(nvin); 761 goto close; 762 } 763 /* 764 * Remote node have some dirty extents on its own, lets 765 * download its activemap. 766 */ 767 if (hast_proto_recv_data(res, out, nvin, map, 768 mapsize) == -1) { 769 pjdlog_errno(LOG_ERR, 770 "Unable to receive remote activemap"); 771 nv_free(nvin); 772 free(map); 773 goto close; 774 } 775 /* 776 * Merge local and remote bitmaps. 777 */ 778 activemap_merge(res->hr_amp, map, mapsize); 779 free(map); 780 /* 781 * Now that we merged bitmaps from both nodes, flush it to the 782 * disk before we start to synchronize. 
783 */ 784 (void)hast_activemap_flush(res); 785 } 786 nv_free(nvin); 787#ifdef notyet 788 /* Setup directions. */ 789 if (proto_send(out, NULL, 0) == -1) 790 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 791 if (proto_recv(in, NULL, 0) == -1) 792 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 793#endif 794 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 795 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && 796 res->hr_version < 2) { 797 pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); 798 res->hr_replication = HAST_REPLICATION_FULLSYNC; 799 } else if (res->hr_replication != res->hr_original_replication) { 800 /* 801 * This is in case hastd disconnected and was upgraded. 802 */ 803 res->hr_replication = res->hr_original_replication; 804 } 805 if (inp != NULL && outp != NULL) { 806 *inp = in; 807 *outp = out; 808 } else { 809 res->hr_remotein = in; 810 res->hr_remoteout = out; 811 } 812 event_send(res, EVENT_CONNECT); 813 return (0); 814close: 815 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 816 event_send(res, EVENT_SPLITBRAIN); 817 proto_close(out); 818 if (in != NULL) 819 proto_close(in); 820 return (error); 821} 822 823static void 824sync_start(void) 825{ 826 827 mtx_lock(&sync_lock); 828 sync_inprogress = true; 829 mtx_unlock(&sync_lock); 830 cv_signal(&sync_cond); 831} 832 833static void 834sync_stop(void) 835{ 836 837 mtx_lock(&sync_lock); 838 if (sync_inprogress) 839 sync_inprogress = false; 840 mtx_unlock(&sync_lock); 841} 842 843static void 844init_ggate(struct hast_resource *res) 845{ 846 struct g_gate_ctl_create ggiocreate; 847 struct g_gate_ctl_cancel ggiocancel; 848 849 /* 850 * We communicate with ggate via /dev/ggctl. Open it. 
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd == -1)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	/*
	 * Presumably G_GATE_NAME_GIVEN makes the kernel use gctl_name
	 * ("hast/<provname>") instead of picking a unit - confirm in g_gate.h.
	 */
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		/* Remember the unit so cleanup() can destroy the device. */
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process that created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	/*
	 * On success the existing provider's unit is adopted as ours, so it is
	 * cleaned up just like a freshly created one.
	 */
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Start the primary worker for a resource.  The calling (parent) process
 * creates the control, event and connection socketpairs and forks.  The
 * parent records the worker's pid and returns.  The child initializes the
 * local component, the GEOM Gate device and the request queues, drops
 * privileges, starts its threads, connects to the remote node if one is
 * configured, and finally runs the synchronization loop in this thread.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	/* This is the worker (child) from here on. */
	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
996 */ 997 error = pthread_create(&td, NULL, ctrl_thread, res); 998 PJDLOG_ASSERT(error == 0); 999 if (real_remote(res)) { 1000 error = init_remote(res, NULL, NULL); 1001 if (error == 0) { 1002 sync_start(); 1003 } else if (error == EBUSY) { 1004 time_t start = time(NULL); 1005 1006 pjdlog_warning("Waiting for remote node to become %s for %ds.", 1007 role2str(HAST_ROLE_SECONDARY), 1008 res->hr_timeout); 1009 for (;;) { 1010 sleep(1); 1011 error = init_remote(res, NULL, NULL); 1012 if (error != EBUSY) 1013 break; 1014 if (time(NULL) > start + res->hr_timeout) 1015 break; 1016 } 1017 if (error == EBUSY) { 1018 pjdlog_warning("Remote node is still %s, starting anyway.", 1019 role2str(HAST_ROLE_PRIMARY)); 1020 } 1021 } 1022 } 1023 error = pthread_create(&td, NULL, ggate_recv_thread, res); 1024 PJDLOG_ASSERT(error == 0); 1025 error = pthread_create(&td, NULL, local_send_thread, res); 1026 PJDLOG_ASSERT(error == 0); 1027 error = pthread_create(&td, NULL, remote_send_thread, res); 1028 PJDLOG_ASSERT(error == 0); 1029 error = pthread_create(&td, NULL, remote_recv_thread, res); 1030 PJDLOG_ASSERT(error == 0); 1031 error = pthread_create(&td, NULL, ggate_send_thread, res); 1032 PJDLOG_ASSERT(error == 0); 1033 fullystarted = true; 1034 (void)sync_thread(res); 1035} 1036 1037static void 1038reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, 1039 const char *fmt, ...) 
1040{ 1041 char msg[1024]; 1042 va_list ap; 1043 1044 va_start(ap, fmt); 1045 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1046 va_end(ap); 1047 switch (ggio->gctl_cmd) { 1048 case BIO_READ: 1049 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1050 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1051 break; 1052 case BIO_DELETE: 1053 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1054 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1055 break; 1056 case BIO_FLUSH: 1057 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1058 break; 1059 case BIO_WRITE: 1060 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1061 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1062 break; 1063 default: 1064 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1065 (unsigned int)ggio->gctl_cmd); 1066 break; 1067 } 1068 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1069} 1070 1071static void 1072remote_close(struct hast_resource *res, int ncomp) 1073{ 1074 1075 rw_wlock(&hio_remote_lock[ncomp]); 1076 /* 1077 * Check for a race between dropping rlock and acquiring wlock - 1078 * another thread can close connection in-between. 1079 */ 1080 if (!ISCONNECTED(res, ncomp)) { 1081 PJDLOG_ASSERT(res->hr_remotein == NULL); 1082 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1083 rw_unlock(&hio_remote_lock[ncomp]); 1084 return; 1085 } 1086 1087 PJDLOG_ASSERT(res->hr_remotein != NULL); 1088 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1089 1090 pjdlog_debug(2, "Closing incoming connection to %s.", 1091 res->hr_remoteaddr); 1092 proto_close(res->hr_remotein); 1093 res->hr_remotein = NULL; 1094 pjdlog_debug(2, "Closing outgoing connection to %s.", 1095 res->hr_remoteaddr); 1096 proto_close(res->hr_remoteout); 1097 res->hr_remoteout = NULL; 1098 1099 rw_unlock(&hio_remote_lock[ncomp]); 1100 1101 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1102 1103 /* 1104 * Stop synchronization if in-progress. 
1105 */ 1106 sync_stop(); 1107 1108 event_send(res, EVENT_DISCONNECT); 1109} 1110 1111/* 1112 * Acknowledge write completion to the kernel, but don't update activemap yet. 1113 */ 1114static void 1115write_complete(struct hast_resource *res, struct hio *hio) 1116{ 1117 struct g_gate_ctl_io *ggio; 1118 unsigned int ncomp; 1119 1120 PJDLOG_ASSERT(!hio->hio_done); 1121 1122 ggio = &hio->hio_ggio; 1123 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1124 1125 /* 1126 * Bump local count if this is first write after 1127 * connection failure with remote node. 1128 */ 1129 ncomp = 1; 1130 rw_rlock(&hio_remote_lock[ncomp]); 1131 if (!ISCONNECTED(res, ncomp)) { 1132 mtx_lock(&metadata_lock); 1133 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1134 res->hr_primary_localcnt++; 1135 pjdlog_debug(1, "Increasing localcnt to %ju.", 1136 (uintmax_t)res->hr_primary_localcnt); 1137 (void)metadata_write(res); 1138 } 1139 mtx_unlock(&metadata_lock); 1140 } 1141 rw_unlock(&hio_remote_lock[ncomp]); 1142 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1143 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1144 hio->hio_done = true; 1145} 1146 1147/* 1148 * Thread receives ggate I/O requests from the kernel and passes them to 1149 * appropriate threads: 1150 * WRITE - always goes to both local_send and remote_send threads 1151 * READ (when the block is up-to-date on local component) - 1152 * only local_send thread 1153 * READ (when the block isn't up-to-date on local component) - 1154 * only remote_send thread 1155 * DELETE - always goes to both local_send and remote_send threads 1156 * FLUSH - always goes to both local_send and remote_send threads 1157 */ 1158static void * 1159ggate_recv_thread(void *arg) 1160{ 1161 struct hast_resource *res = arg; 1162 struct g_gate_ctl_io *ggio; 1163 struct hio *hio; 1164 unsigned int ii, ncomp, ncomps; 1165 int error; 1166 1167 for (;;) { 1168 pjdlog_debug(2, "ggate_recv: Taking free request."); 1169 QUEUE_TAKE2(hio, free); 1170 
pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1171 ggio = &hio->hio_ggio; 1172 ggio->gctl_unit = res->hr_ggateunit; 1173 ggio->gctl_length = MAXPHYS; 1174 ggio->gctl_error = 0; 1175 hio->hio_done = false; 1176 hio->hio_replication = res->hr_replication; 1177 pjdlog_debug(2, 1178 "ggate_recv: (%p) Waiting for request from the kernel.", 1179 hio); 1180 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1181 if (sigexit_received) 1182 pthread_exit(NULL); 1183 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1184 } 1185 error = ggio->gctl_error; 1186 switch (error) { 1187 case 0: 1188 break; 1189 case ECANCELED: 1190 /* Exit gracefully. */ 1191 if (!sigexit_received) { 1192 pjdlog_debug(2, 1193 "ggate_recv: (%p) Received cancel from the kernel.", 1194 hio); 1195 pjdlog_info("Received cancel from the kernel, exiting."); 1196 } 1197 pthread_exit(NULL); 1198 case ENOMEM: 1199 /* 1200 * Buffer too small? Impossible, we allocate MAXPHYS 1201 * bytes - request can't be bigger than that. 1202 */ 1203 /* FALLTHROUGH */ 1204 case ENXIO: 1205 default: 1206 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1207 strerror(error)); 1208 } 1209 1210 ncomp = 0; 1211 ncomps = HAST_NCOMPONENTS; 1212 1213 for (ii = 0; ii < ncomps; ii++) 1214 hio->hio_errors[ii] = EINVAL; 1215 reqlog(LOG_DEBUG, 2, ggio, 1216 "ggate_recv: (%p) Request received from the kernel: ", 1217 hio); 1218 1219 /* 1220 * Inform all components about new write request. 1221 * For read request prefer local component unless the given 1222 * range is out-of-date, then use remote component. 1223 */ 1224 switch (ggio->gctl_cmd) { 1225 case BIO_READ: 1226 res->hr_stat_read++; 1227 ncomps = 1; 1228 mtx_lock(&metadata_lock); 1229 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1230 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1231 /* 1232 * This range is up-to-date on local component, 1233 * so handle request locally. 1234 */ 1235 /* Local component is 0 for now. 
*/ 1236 ncomp = 0; 1237 } else /* if (res->hr_syncsrc == 1238 HAST_SYNCSRC_SECONDARY) */ { 1239 PJDLOG_ASSERT(res->hr_syncsrc == 1240 HAST_SYNCSRC_SECONDARY); 1241 /* 1242 * This range is out-of-date on local component, 1243 * so send request to the remote node. 1244 */ 1245 /* Remote component is 1 for now. */ 1246 ncomp = 1; 1247 } 1248 mtx_unlock(&metadata_lock); 1249 break; 1250 case BIO_WRITE: 1251 res->hr_stat_write++; 1252 if (res->hr_resuid == 0 && 1253 res->hr_primary_localcnt == 0) { 1254 /* This is first write. */ 1255 res->hr_primary_localcnt = 1; 1256 } 1257 for (;;) { 1258 mtx_lock(&range_lock); 1259 if (rangelock_islocked(range_sync, 1260 ggio->gctl_offset, ggio->gctl_length)) { 1261 pjdlog_debug(2, 1262 "regular: Range offset=%jd length=%zu locked.", 1263 (intmax_t)ggio->gctl_offset, 1264 (size_t)ggio->gctl_length); 1265 range_regular_wait = true; 1266 cv_wait(&range_regular_cond, &range_lock); 1267 range_regular_wait = false; 1268 mtx_unlock(&range_lock); 1269 continue; 1270 } 1271 if (rangelock_add(range_regular, 1272 ggio->gctl_offset, ggio->gctl_length) == -1) { 1273 mtx_unlock(&range_lock); 1274 pjdlog_debug(2, 1275 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1276 (intmax_t)ggio->gctl_offset, 1277 (size_t)ggio->gctl_length); 1278 sleep(1); 1279 continue; 1280 } 1281 mtx_unlock(&range_lock); 1282 break; 1283 } 1284 mtx_lock(&res->hr_amp_lock); 1285 if (activemap_write_start(res->hr_amp, 1286 ggio->gctl_offset, ggio->gctl_length)) { 1287 res->hr_stat_activemap_update++; 1288 (void)hast_activemap_flush(res); 1289 } 1290 mtx_unlock(&res->hr_amp_lock); 1291 break; 1292 case BIO_DELETE: 1293 res->hr_stat_delete++; 1294 break; 1295 case BIO_FLUSH: 1296 res->hr_stat_flush++; 1297 break; 1298 } 1299 pjdlog_debug(2, 1300 "ggate_recv: (%p) Moving request to the send queues.", hio); 1301 hio->hio_countdown = ncomps; 1302 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1303 ggio->gctl_cmd == BIO_WRITE) { 1304 /* Each remote 
request needs two responses in memsync. */ 1305 hio->hio_countdown++; 1306 } 1307 for (ii = ncomp; ii < ncomps; ii++) 1308 QUEUE_INSERT1(hio, send, ii); 1309 } 1310 /* NOTREACHED */ 1311 return (NULL); 1312} 1313 1314/* 1315 * Thread reads from or writes to local component. 1316 * If local read fails, it redirects it to remote_send thread. 1317 */ 1318static void * 1319local_send_thread(void *arg) 1320{ 1321 struct hast_resource *res = arg; 1322 struct g_gate_ctl_io *ggio; 1323 struct hio *hio; 1324 unsigned int ncomp, rncomp; 1325 ssize_t ret; 1326 1327 /* Local component is 0 for now. */ 1328 ncomp = 0; 1329 /* Remote component is 1 for now. */ 1330 rncomp = 1; 1331 1332 for (;;) { 1333 pjdlog_debug(2, "local_send: Taking request."); 1334 QUEUE_TAKE1(hio, send, ncomp, 0); 1335 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1336 ggio = &hio->hio_ggio; 1337 switch (ggio->gctl_cmd) { 1338 case BIO_READ: 1339 ret = pread(res->hr_localfd, ggio->gctl_data, 1340 ggio->gctl_length, 1341 ggio->gctl_offset + res->hr_localoff); 1342 if (ret == ggio->gctl_length) 1343 hio->hio_errors[ncomp] = 0; 1344 else if (!ISSYNCREQ(hio)) { 1345 /* 1346 * If READ failed, try to read from remote node. 1347 */ 1348 if (ret == -1) { 1349 reqlog(LOG_WARNING, 0, ggio, 1350 "Local request failed (%s), trying remote node. ", 1351 strerror(errno)); 1352 } else if (ret != ggio->gctl_length) { 1353 reqlog(LOG_WARNING, 0, ggio, 1354 "Local request failed (%zd != %jd), trying remote node. 
", 1355 ret, (intmax_t)ggio->gctl_length); 1356 } 1357 QUEUE_INSERT1(hio, send, rncomp); 1358 continue; 1359 } 1360 break; 1361 case BIO_WRITE: 1362 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1363 ggio->gctl_length, 1364 ggio->gctl_offset + res->hr_localoff); 1365 if (ret == -1) { 1366 hio->hio_errors[ncomp] = errno; 1367 reqlog(LOG_WARNING, 0, ggio, 1368 "Local request failed (%s): ", 1369 strerror(errno)); 1370 } else if (ret != ggio->gctl_length) { 1371 hio->hio_errors[ncomp] = EIO; 1372 reqlog(LOG_WARNING, 0, ggio, 1373 "Local request failed (%zd != %jd): ", 1374 ret, (intmax_t)ggio->gctl_length); 1375 } else { 1376 hio->hio_errors[ncomp] = 0; 1377 if (hio->hio_replication == 1378 HAST_REPLICATION_ASYNC) { 1379 ggio->gctl_error = 0; 1380 write_complete(res, hio); 1381 } 1382 } 1383 break; 1384 case BIO_DELETE: 1385 ret = g_delete(res->hr_localfd, 1386 ggio->gctl_offset + res->hr_localoff, 1387 ggio->gctl_length); 1388 if (ret == -1) { 1389 hio->hio_errors[ncomp] = errno; 1390 reqlog(LOG_WARNING, 0, ggio, 1391 "Local request failed (%s): ", 1392 strerror(errno)); 1393 } else { 1394 hio->hio_errors[ncomp] = 0; 1395 } 1396 break; 1397 case BIO_FLUSH: 1398 if (!res->hr_localflush) { 1399 ret = -1; 1400 errno = EOPNOTSUPP; 1401 break; 1402 } 1403 ret = g_flush(res->hr_localfd); 1404 if (ret == -1) { 1405 if (errno == EOPNOTSUPP) 1406 res->hr_localflush = false; 1407 hio->hio_errors[ncomp] = errno; 1408 reqlog(LOG_WARNING, 0, ggio, 1409 "Local request failed (%s): ", 1410 strerror(errno)); 1411 } else { 1412 hio->hio_errors[ncomp] = 0; 1413 } 1414 break; 1415 } 1416 1417 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1418 ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1419 if (refcnt_release(&hio->hio_countdown) > 0) 1420 continue; 1421 } else { 1422 /* 1423 * Depending on hio_countdown value, requests finished 1424 * in the following order: 1425 * 0: remote memsync, remote final, local write 1426 * 1: remote memsync, local write, (remote final) 1427 * 
2: local write, (remote memsync), (remote final) 1428 */ 1429 switch (refcnt_release(&hio->hio_countdown)) { 1430 case 0: 1431 /* 1432 * Local write finished as last. 1433 */ 1434 break; 1435 case 1: 1436 /* 1437 * Local write finished after remote memsync 1438 * reply arrvied. We can complete the write now. 1439 */ 1440 if (hio->hio_errors[0] == 0) 1441 write_complete(res, hio); 1442 continue; 1443 case 2: 1444 /* 1445 * Local write finished as first. 1446 */ 1447 continue; 1448 default: 1449 PJDLOG_ABORT("Invalid hio_countdown."); 1450 } 1451 } 1452 if (ISSYNCREQ(hio)) { 1453 mtx_lock(&sync_lock); 1454 SYNCREQDONE(hio); 1455 mtx_unlock(&sync_lock); 1456 cv_signal(&sync_cond); 1457 } else { 1458 pjdlog_debug(2, 1459 "local_send: (%p) Moving request to the done queue.", 1460 hio); 1461 QUEUE_INSERT2(hio, done); 1462 } 1463 } 1464 /* NOTREACHED */ 1465 return (NULL); 1466} 1467 1468static void 1469keepalive_send(struct hast_resource *res, unsigned int ncomp) 1470{ 1471 struct nv *nv; 1472 1473 rw_rlock(&hio_remote_lock[ncomp]); 1474 1475 if (!ISCONNECTED(res, ncomp)) { 1476 rw_unlock(&hio_remote_lock[ncomp]); 1477 return; 1478 } 1479 1480 PJDLOG_ASSERT(res->hr_remotein != NULL); 1481 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1482 1483 nv = nv_alloc(); 1484 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1485 if (nv_error(nv) != 0) { 1486 rw_unlock(&hio_remote_lock[ncomp]); 1487 nv_free(nv); 1488 pjdlog_debug(1, 1489 "keepalive_send: Unable to prepare header to send."); 1490 return; 1491 } 1492 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1493 rw_unlock(&hio_remote_lock[ncomp]); 1494 pjdlog_common(LOG_DEBUG, 1, errno, 1495 "keepalive_send: Unable to send request"); 1496 nv_free(nv); 1497 remote_close(res, ncomp); 1498 return; 1499 } 1500 1501 rw_unlock(&hio_remote_lock[ncomp]); 1502 nv_free(nv); 1503 pjdlog_debug(2, "keepalive_send: Request sent."); 1504} 1505 1506/* 1507 * Thread sends request to secondary node. 
1508 */ 1509static void * 1510remote_send_thread(void *arg) 1511{ 1512 struct hast_resource *res = arg; 1513 struct g_gate_ctl_io *ggio; 1514 time_t lastcheck, now; 1515 struct hio *hio; 1516 struct nv *nv; 1517 unsigned int ncomp; 1518 bool wakeup; 1519 uint64_t offset, length; 1520 uint8_t cmd; 1521 void *data; 1522 1523 /* Remote component is 1 for now. */ 1524 ncomp = 1; 1525 lastcheck = time(NULL); 1526 1527 for (;;) { 1528 pjdlog_debug(2, "remote_send: Taking request."); 1529 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1530 if (hio == NULL) { 1531 now = time(NULL); 1532 if (lastcheck + HAST_KEEPALIVE <= now) { 1533 keepalive_send(res, ncomp); 1534 lastcheck = now; 1535 } 1536 continue; 1537 } 1538 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1539 ggio = &hio->hio_ggio; 1540 switch (ggio->gctl_cmd) { 1541 case BIO_READ: 1542 cmd = HIO_READ; 1543 data = NULL; 1544 offset = ggio->gctl_offset; 1545 length = ggio->gctl_length; 1546 break; 1547 case BIO_WRITE: 1548 cmd = HIO_WRITE; 1549 data = ggio->gctl_data; 1550 offset = ggio->gctl_offset; 1551 length = ggio->gctl_length; 1552 break; 1553 case BIO_DELETE: 1554 cmd = HIO_DELETE; 1555 data = NULL; 1556 offset = ggio->gctl_offset; 1557 length = ggio->gctl_length; 1558 break; 1559 case BIO_FLUSH: 1560 cmd = HIO_FLUSH; 1561 data = NULL; 1562 offset = 0; 1563 length = 0; 1564 break; 1565 default: 1566 PJDLOG_ABORT("invalid condition"); 1567 } 1568 nv = nv_alloc(); 1569 nv_add_uint8(nv, cmd, "cmd"); 1570 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1571 nv_add_uint64(nv, offset, "offset"); 1572 nv_add_uint64(nv, length, "length"); 1573 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && 1574 ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { 1575 nv_add_uint8(nv, 1, "memsync"); 1576 } 1577 if (nv_error(nv) != 0) { 1578 hio->hio_errors[ncomp] = nv_error(nv); 1579 pjdlog_debug(2, 1580 "remote_send: (%p) Unable to prepare header to send.", 1581 hio); 1582 reqlog(LOG_ERR, 0, ggio, 1583 "Unable to 
prepare header to send (%s): ", 1584 strerror(nv_error(nv))); 1585 /* Move failed request immediately to the done queue. */ 1586 goto done_queue; 1587 } 1588 /* 1589 * Protect connection from disappearing. 1590 */ 1591 rw_rlock(&hio_remote_lock[ncomp]); 1592 if (!ISCONNECTED(res, ncomp)) { 1593 rw_unlock(&hio_remote_lock[ncomp]); 1594 hio->hio_errors[ncomp] = ENOTCONN; 1595 goto done_queue; 1596 } 1597 /* 1598 * Move the request to recv queue before sending it, because 1599 * in different order we can get reply before we move request 1600 * to recv queue. 1601 */ 1602 pjdlog_debug(2, 1603 "remote_send: (%p) Moving request to the recv queue.", 1604 hio); 1605 mtx_lock(&hio_recv_list_lock[ncomp]); 1606 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1607 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1608 mtx_unlock(&hio_recv_list_lock[ncomp]); 1609 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1610 data != NULL ? length : 0) == -1) { 1611 hio->hio_errors[ncomp] = errno; 1612 rw_unlock(&hio_remote_lock[ncomp]); 1613 pjdlog_debug(2, 1614 "remote_send: (%p) Unable to send request.", hio); 1615 reqlog(LOG_ERR, 0, ggio, 1616 "Unable to send request (%s): ", 1617 strerror(hio->hio_errors[ncomp])); 1618 remote_close(res, ncomp); 1619 /* 1620 * Take request back from the receive queue and move 1621 * it immediately to the done queue. 
1622 */ 1623 mtx_lock(&hio_recv_list_lock[ncomp]); 1624 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1625 hio_next[ncomp]); 1626 mtx_unlock(&hio_recv_list_lock[ncomp]); 1627 goto done_queue; 1628 } 1629 rw_unlock(&hio_remote_lock[ncomp]); 1630 nv_free(nv); 1631 if (wakeup) 1632 cv_signal(&hio_recv_list_cond[ncomp]); 1633 continue; 1634done_queue: 1635 nv_free(nv); 1636 if (ISSYNCREQ(hio)) { 1637 if (refcnt_release(&hio->hio_countdown) > 0) 1638 continue; 1639 mtx_lock(&sync_lock); 1640 SYNCREQDONE(hio); 1641 mtx_unlock(&sync_lock); 1642 cv_signal(&sync_cond); 1643 continue; 1644 } 1645 if (ggio->gctl_cmd == BIO_WRITE) { 1646 mtx_lock(&res->hr_amp_lock); 1647 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1648 ggio->gctl_length)) { 1649 (void)hast_activemap_flush(res); 1650 } 1651 mtx_unlock(&res->hr_amp_lock); 1652 if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) 1653 (void)refcnt_release(&hio->hio_countdown); 1654 } 1655 if (refcnt_release(&hio->hio_countdown) > 0) 1656 continue; 1657 pjdlog_debug(2, 1658 "remote_send: (%p) Moving request to the done queue.", 1659 hio); 1660 QUEUE_INSERT2(hio, done); 1661 } 1662 /* NOTREACHED */ 1663 return (NULL); 1664} 1665 1666/* 1667 * Thread receives answer from secondary node and passes it to ggate_send 1668 * thread. 1669 */ 1670static void * 1671remote_recv_thread(void *arg) 1672{ 1673 struct hast_resource *res = arg; 1674 struct g_gate_ctl_io *ggio; 1675 struct hio *hio; 1676 struct nv *nv; 1677 unsigned int ncomp; 1678 uint64_t seq; 1679 bool memsyncack; 1680 int error; 1681 1682 /* Remote component is 1 for now. */ 1683 ncomp = 1; 1684 1685 for (;;) { 1686 /* Wait until there is anything to receive. 
*/ 1687 mtx_lock(&hio_recv_list_lock[ncomp]); 1688 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1689 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1690 cv_wait(&hio_recv_list_cond[ncomp], 1691 &hio_recv_list_lock[ncomp]); 1692 } 1693 mtx_unlock(&hio_recv_list_lock[ncomp]); 1694 1695 memsyncack = false; 1696 1697 rw_rlock(&hio_remote_lock[ncomp]); 1698 if (!ISCONNECTED(res, ncomp)) { 1699 rw_unlock(&hio_remote_lock[ncomp]); 1700 /* 1701 * Connection is dead, so move all pending requests to 1702 * the done queue (one-by-one). 1703 */ 1704 mtx_lock(&hio_recv_list_lock[ncomp]); 1705 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1706 PJDLOG_ASSERT(hio != NULL); 1707 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1708 hio_next[ncomp]); 1709 mtx_unlock(&hio_recv_list_lock[ncomp]); 1710 goto done_queue; 1711 } 1712 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1713 pjdlog_errno(LOG_ERR, 1714 "Unable to receive reply header"); 1715 rw_unlock(&hio_remote_lock[ncomp]); 1716 remote_close(res, ncomp); 1717 continue; 1718 } 1719 rw_unlock(&hio_remote_lock[ncomp]); 1720 seq = nv_get_uint64(nv, "seq"); 1721 if (seq == 0) { 1722 pjdlog_error("Header contains no 'seq' field."); 1723 nv_free(nv); 1724 continue; 1725 } 1726 memsyncack = nv_exists(nv, "received"); 1727 mtx_lock(&hio_recv_list_lock[ncomp]); 1728 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1729 if (hio->hio_ggio.gctl_seq == seq) { 1730 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1731 hio_next[ncomp]); 1732 break; 1733 } 1734 } 1735 mtx_unlock(&hio_recv_list_lock[ncomp]); 1736 if (hio == NULL) { 1737 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1738 (uintmax_t)seq); 1739 nv_free(nv); 1740 continue; 1741 } 1742 ggio = &hio->hio_ggio; 1743 error = nv_get_int16(nv, "error"); 1744 if (error != 0) { 1745 /* Request failed on remote side. 
*/ 1746 hio->hio_errors[ncomp] = error; 1747 reqlog(LOG_WARNING, 0, ggio, 1748 "Remote request failed (%s): ", strerror(error)); 1749 nv_free(nv); 1750 goto done_queue; 1751 } 1752 switch (ggio->gctl_cmd) { 1753 case BIO_READ: 1754 rw_rlock(&hio_remote_lock[ncomp]); 1755 if (!ISCONNECTED(res, ncomp)) { 1756 rw_unlock(&hio_remote_lock[ncomp]); 1757 nv_free(nv); 1758 goto done_queue; 1759 } 1760 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1761 ggio->gctl_data, ggio->gctl_length) == -1) { 1762 hio->hio_errors[ncomp] = errno; 1763 pjdlog_errno(LOG_ERR, 1764 "Unable to receive reply data"); 1765 rw_unlock(&hio_remote_lock[ncomp]); 1766 nv_free(nv); 1767 remote_close(res, ncomp); 1768 goto done_queue; 1769 } 1770 rw_unlock(&hio_remote_lock[ncomp]); 1771 break; 1772 case BIO_WRITE: 1773 case BIO_DELETE: 1774 case BIO_FLUSH: 1775 break; 1776 default: 1777 PJDLOG_ABORT("invalid condition"); 1778 } 1779 hio->hio_errors[ncomp] = 0; 1780 nv_free(nv); 1781done_queue: 1782 if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || 1783 hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { 1784 if (refcnt_release(&hio->hio_countdown) > 0) 1785 continue; 1786 } else { 1787 /* 1788 * Depending on hio_countdown value, requests finished 1789 * in the following order: 1790 * 1791 * 0: local write, remote memsync, remote final 1792 * or 1793 * 0: remote memsync, local write, remote final 1794 * 1795 * 1: local write, remote memsync, (remote final) 1796 * or 1797 * 1: remote memsync, remote final, (local write) 1798 * 1799 * 2: remote memsync, (local write), (remote final) 1800 * or 1801 * 2: remote memsync, (remote final), (local write) 1802 */ 1803 switch (refcnt_release(&hio->hio_countdown)) { 1804 case 0: 1805 /* 1806 * Remote final reply arrived. 1807 */ 1808 PJDLOG_ASSERT(!memsyncack); 1809 break; 1810 case 1: 1811 if (memsyncack) { 1812 /* 1813 * Local request already finished, so we 1814 * can complete the write. 
1815 */ 1816 if (hio->hio_errors[0] == 0) 1817 write_complete(res, hio); 1818 /* 1819 * We still need to wait for final 1820 * remote reply. 1821 */ 1822 pjdlog_debug(2, 1823 "remote_recv: (%p) Moving request back to the recv queue.", 1824 hio); 1825 mtx_lock(&hio_recv_list_lock[ncomp]); 1826 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1827 hio, hio_next[ncomp]); 1828 mtx_unlock(&hio_recv_list_lock[ncomp]); 1829 } else { 1830 /* 1831 * Remote final reply arrived before 1832 * local write finished. 1833 * Nothing to do in such case. 1834 */ 1835 } 1836 continue; 1837 case 2: 1838 /* 1839 * We received remote memsync reply even before 1840 * local write finished. 1841 */ 1842 PJDLOG_ASSERT(memsyncack); 1843 1844 pjdlog_debug(2, 1845 "remote_recv: (%p) Moving request back to the recv queue.", 1846 hio); 1847 mtx_lock(&hio_recv_list_lock[ncomp]); 1848 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, 1849 hio_next[ncomp]); 1850 mtx_unlock(&hio_recv_list_lock[ncomp]); 1851 continue; 1852 default: 1853 PJDLOG_ABORT("Invalid hio_countdown."); 1854 } 1855 } 1856 if (ISSYNCREQ(hio)) { 1857 mtx_lock(&sync_lock); 1858 SYNCREQDONE(hio); 1859 mtx_unlock(&sync_lock); 1860 cv_signal(&sync_cond); 1861 } else { 1862 pjdlog_debug(2, 1863 "remote_recv: (%p) Moving request to the done queue.", 1864 hio); 1865 QUEUE_INSERT2(hio, done); 1866 } 1867 } 1868 /* NOTREACHED */ 1869 return (NULL); 1870} 1871 1872/* 1873 * Thread sends answer to the kernel. 
1874 */ 1875static void * 1876ggate_send_thread(void *arg) 1877{ 1878 struct hast_resource *res = arg; 1879 struct g_gate_ctl_io *ggio; 1880 struct hio *hio; 1881 unsigned int ii, ncomps; 1882 1883 ncomps = HAST_NCOMPONENTS; 1884 1885 for (;;) { 1886 pjdlog_debug(2, "ggate_send: Taking request."); 1887 QUEUE_TAKE2(hio, done); 1888 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1889 ggio = &hio->hio_ggio; 1890 for (ii = 0; ii < ncomps; ii++) { 1891 if (hio->hio_errors[ii] == 0) { 1892 /* 1893 * One successful request is enough to declare 1894 * success. 1895 */ 1896 ggio->gctl_error = 0; 1897 break; 1898 } 1899 } 1900 if (ii == ncomps) { 1901 /* 1902 * None of the requests were successful. 1903 * Use the error from local component except the 1904 * case when we did only remote request. 1905 */ 1906 if (ggio->gctl_cmd == BIO_READ && 1907 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1908 ggio->gctl_error = hio->hio_errors[1]; 1909 else 1910 ggio->gctl_error = hio->hio_errors[0]; 1911 } 1912 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1913 mtx_lock(&res->hr_amp_lock); 1914 if (activemap_write_complete(res->hr_amp, 1915 ggio->gctl_offset, ggio->gctl_length)) { 1916 res->hr_stat_activemap_update++; 1917 (void)hast_activemap_flush(res); 1918 } 1919 mtx_unlock(&res->hr_amp_lock); 1920 } 1921 if (ggio->gctl_cmd == BIO_WRITE) { 1922 /* 1923 * Unlock range we locked. 
1924 */ 1925 mtx_lock(&range_lock); 1926 rangelock_del(range_regular, ggio->gctl_offset, 1927 ggio->gctl_length); 1928 if (range_sync_wait) 1929 cv_signal(&range_sync_cond); 1930 mtx_unlock(&range_lock); 1931 if (!hio->hio_done) 1932 write_complete(res, hio); 1933 } else { 1934 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1935 primary_exit(EX_OSERR, 1936 "G_GATE_CMD_DONE failed"); 1937 } 1938 } 1939 pjdlog_debug(2, 1940 "ggate_send: (%p) Moving request to the free queue.", hio); 1941 QUEUE_INSERT2(hio, free); 1942 } 1943 /* NOTREACHED */ 1944 return (NULL); 1945} 1946 1947/* 1948 * Thread synchronize local and remote components. 1949 */ 1950static void * 1951sync_thread(void *arg __unused) 1952{ 1953 struct hast_resource *res = arg; 1954 struct hio *hio; 1955 struct g_gate_ctl_io *ggio; 1956 struct timeval tstart, tend, tdiff; 1957 unsigned int ii, ncomp, ncomps; 1958 off_t offset, length, synced; 1959 bool dorewind, directreads; 1960 int syncext; 1961 1962 ncomps = HAST_NCOMPONENTS; 1963 dorewind = true; 1964 synced = 0; 1965 offset = -1; 1966 directreads = false; 1967 1968 for (;;) { 1969 mtx_lock(&sync_lock); 1970 if (offset >= 0 && !sync_inprogress) { 1971 gettimeofday(&tend, NULL); 1972 timersub(&tend, &tstart, &tdiff); 1973 pjdlog_info("Synchronization interrupted after %#.0T. " 1974 "%NB synchronized so far.", &tdiff, 1975 (intmax_t)synced); 1976 event_send(res, EVENT_SYNCINTR); 1977 } 1978 while (!sync_inprogress) { 1979 dorewind = true; 1980 synced = 0; 1981 cv_wait(&sync_cond, &sync_lock); 1982 } 1983 mtx_unlock(&sync_lock); 1984 /* 1985 * Obtain offset at which we should synchronize. 1986 * Rewind synchronization if needed. 1987 */ 1988 mtx_lock(&res->hr_amp_lock); 1989 if (dorewind) 1990 activemap_sync_rewind(res->hr_amp); 1991 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1992 if (syncext != -1) { 1993 /* 1994 * We synchronized entire syncext extent, we can mark 1995 * it as clean now. 
1996 */ 1997 if (activemap_extent_complete(res->hr_amp, syncext)) 1998 (void)hast_activemap_flush(res); 1999 } 2000 mtx_unlock(&res->hr_amp_lock); 2001 if (dorewind) { 2002 dorewind = false; 2003 if (offset == -1) 2004 pjdlog_info("Nodes are in sync."); 2005 else { 2006 pjdlog_info("Synchronization started. %NB to go.", 2007 (intmax_t)(res->hr_extentsize * 2008 activemap_ndirty(res->hr_amp))); 2009 event_send(res, EVENT_SYNCSTART); 2010 gettimeofday(&tstart, NULL); 2011 } 2012 } 2013 if (offset == -1) { 2014 sync_stop(); 2015 pjdlog_debug(1, "Nothing to synchronize."); 2016 /* 2017 * Synchronization complete, make both localcnt and 2018 * remotecnt equal. 2019 */ 2020 ncomp = 1; 2021 rw_rlock(&hio_remote_lock[ncomp]); 2022 if (ISCONNECTED(res, ncomp)) { 2023 if (synced > 0) { 2024 int64_t bps; 2025 2026 gettimeofday(&tend, NULL); 2027 timersub(&tend, &tstart, &tdiff); 2028 bps = (int64_t)((double)synced / 2029 ((double)tdiff.tv_sec + 2030 (double)tdiff.tv_usec / 1000000)); 2031 pjdlog_info("Synchronization complete. 
" 2032 "%NB synchronized in %#.0lT (%NB/sec).", 2033 (intmax_t)synced, &tdiff, 2034 (intmax_t)bps); 2035 event_send(res, EVENT_SYNCDONE); 2036 } 2037 mtx_lock(&metadata_lock); 2038 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 2039 directreads = true; 2040 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 2041 res->hr_primary_localcnt = 2042 res->hr_secondary_remotecnt; 2043 res->hr_primary_remotecnt = 2044 res->hr_secondary_localcnt; 2045 pjdlog_debug(1, 2046 "Setting localcnt to %ju and remotecnt to %ju.", 2047 (uintmax_t)res->hr_primary_localcnt, 2048 (uintmax_t)res->hr_primary_remotecnt); 2049 (void)metadata_write(res); 2050 mtx_unlock(&metadata_lock); 2051 } 2052 rw_unlock(&hio_remote_lock[ncomp]); 2053 if (directreads) { 2054 directreads = false; 2055 enable_direct_reads(res); 2056 } 2057 continue; 2058 } 2059 pjdlog_debug(2, "sync: Taking free request."); 2060 QUEUE_TAKE2(hio, free); 2061 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 2062 /* 2063 * Lock the range we are going to synchronize. We don't want 2064 * race where someone writes between our read and write. 2065 */ 2066 for (;;) { 2067 mtx_lock(&range_lock); 2068 if (rangelock_islocked(range_regular, offset, length)) { 2069 pjdlog_debug(2, 2070 "sync: Range offset=%jd length=%jd locked.", 2071 (intmax_t)offset, (intmax_t)length); 2072 range_sync_wait = true; 2073 cv_wait(&range_sync_cond, &range_lock); 2074 range_sync_wait = false; 2075 mtx_unlock(&range_lock); 2076 continue; 2077 } 2078 if (rangelock_add(range_sync, offset, length) == -1) { 2079 mtx_unlock(&range_lock); 2080 pjdlog_debug(2, 2081 "sync: Range offset=%jd length=%jd is already locked, waiting.", 2082 (intmax_t)offset, (intmax_t)length); 2083 sleep(1); 2084 continue; 2085 } 2086 mtx_unlock(&range_lock); 2087 break; 2088 } 2089 /* 2090 * First read the data from synchronization source. 
2091 */ 2092 SYNCREQ(hio); 2093 ggio = &hio->hio_ggio; 2094 ggio->gctl_cmd = BIO_READ; 2095 ggio->gctl_offset = offset; 2096 ggio->gctl_length = length; 2097 ggio->gctl_error = 0; 2098 hio->hio_done = false; 2099 hio->hio_replication = res->hr_replication; 2100 for (ii = 0; ii < ncomps; ii++) 2101 hio->hio_errors[ii] = EINVAL; 2102 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2103 hio); 2104 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2105 hio); 2106 mtx_lock(&metadata_lock); 2107 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2108 /* 2109 * This range is up-to-date on local component, 2110 * so handle request locally. 2111 */ 2112 /* Local component is 0 for now. */ 2113 ncomp = 0; 2114 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2115 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2116 /* 2117 * This range is out-of-date on local component, 2118 * so send request to the remote node. 2119 */ 2120 /* Remote component is 1 for now. */ 2121 ncomp = 1; 2122 } 2123 mtx_unlock(&metadata_lock); 2124 hio->hio_countdown = 1; 2125 QUEUE_INSERT1(hio, send, ncomp); 2126 2127 /* 2128 * Let's wait for READ to finish. 2129 */ 2130 mtx_lock(&sync_lock); 2131 while (!ISSYNCREQDONE(hio)) 2132 cv_wait(&sync_cond, &sync_lock); 2133 mtx_unlock(&sync_lock); 2134 2135 if (hio->hio_errors[ncomp] != 0) { 2136 pjdlog_error("Unable to read synchronization data: %s.", 2137 strerror(hio->hio_errors[ncomp])); 2138 goto free_queue; 2139 } 2140 2141 /* 2142 * We read the data from synchronization source, now write it 2143 * to synchronization target. 
2144 */ 2145 SYNCREQ(hio); 2146 ggio->gctl_cmd = BIO_WRITE; 2147 for (ii = 0; ii < ncomps; ii++) 2148 hio->hio_errors[ii] = EINVAL; 2149 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 2150 hio); 2151 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2152 hio); 2153 mtx_lock(&metadata_lock); 2154 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 2155 /* 2156 * This range is up-to-date on local component, 2157 * so we update remote component. 2158 */ 2159 /* Remote component is 1 for now. */ 2160 ncomp = 1; 2161 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 2162 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 2163 /* 2164 * This range is out-of-date on local component, 2165 * so we update it. 2166 */ 2167 /* Local component is 0 for now. */ 2168 ncomp = 0; 2169 } 2170 mtx_unlock(&metadata_lock); 2171 2172 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 2173 hio); 2174 hio->hio_countdown = 1; 2175 QUEUE_INSERT1(hio, send, ncomp); 2176 2177 /* 2178 * Let's wait for WRITE to finish. 
2179 */ 2180 mtx_lock(&sync_lock); 2181 while (!ISSYNCREQDONE(hio)) 2182 cv_wait(&sync_cond, &sync_lock); 2183 mtx_unlock(&sync_lock); 2184 2185 if (hio->hio_errors[ncomp] != 0) { 2186 pjdlog_error("Unable to write synchronization data: %s.", 2187 strerror(hio->hio_errors[ncomp])); 2188 goto free_queue; 2189 } 2190 2191 synced += length; 2192free_queue: 2193 mtx_lock(&range_lock); 2194 rangelock_del(range_sync, offset, length); 2195 if (range_regular_wait) 2196 cv_signal(&range_regular_cond); 2197 mtx_unlock(&range_lock); 2198 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 2199 hio); 2200 QUEUE_INSERT2(hio, free); 2201 } 2202 /* NOTREACHED */ 2203 return (NULL); 2204} 2205 2206void 2207primary_config_reload(struct hast_resource *res, struct nv *nv) 2208{ 2209 unsigned int ii, ncomps; 2210 int modified, vint; 2211 const char *vstr; 2212 2213 pjdlog_info("Reloading configuration..."); 2214 2215 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 2216 PJDLOG_ASSERT(gres == res); 2217 nv_assert(nv, "remoteaddr"); 2218 nv_assert(nv, "sourceaddr"); 2219 nv_assert(nv, "replication"); 2220 nv_assert(nv, "checksum"); 2221 nv_assert(nv, "compression"); 2222 nv_assert(nv, "timeout"); 2223 nv_assert(nv, "exec"); 2224 nv_assert(nv, "metaflush"); 2225 2226 ncomps = HAST_NCOMPONENTS; 2227 2228#define MODIFIED_REMOTEADDR 0x01 2229#define MODIFIED_SOURCEADDR 0x02 2230#define MODIFIED_REPLICATION 0x04 2231#define MODIFIED_CHECKSUM 0x08 2232#define MODIFIED_COMPRESSION 0x10 2233#define MODIFIED_TIMEOUT 0x20 2234#define MODIFIED_EXEC 0x40 2235#define MODIFIED_METAFLUSH 0x80 2236 modified = 0; 2237 2238 vstr = nv_get_string(nv, "remoteaddr"); 2239 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2240 /* 2241 * Don't copy res->hr_remoteaddr to gres just yet. 2242 * We want remote_close() to log disconnect from the old 2243 * addresses, not from the new ones. 
2244 */ 2245 modified |= MODIFIED_REMOTEADDR; 2246 } 2247 vstr = nv_get_string(nv, "sourceaddr"); 2248 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2249 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2250 modified |= MODIFIED_SOURCEADDR; 2251 } 2252 vint = nv_get_int32(nv, "replication"); 2253 if (gres->hr_replication != vint) { 2254 gres->hr_replication = vint; 2255 modified |= MODIFIED_REPLICATION; 2256 } 2257 vint = nv_get_int32(nv, "checksum"); 2258 if (gres->hr_checksum != vint) { 2259 gres->hr_checksum = vint; 2260 modified |= MODIFIED_CHECKSUM; 2261 } 2262 vint = nv_get_int32(nv, "compression"); 2263 if (gres->hr_compression != vint) { 2264 gres->hr_compression = vint; 2265 modified |= MODIFIED_COMPRESSION; 2266 } 2267 vint = nv_get_int32(nv, "timeout"); 2268 if (gres->hr_timeout != vint) { 2269 gres->hr_timeout = vint; 2270 modified |= MODIFIED_TIMEOUT; 2271 } 2272 vstr = nv_get_string(nv, "exec"); 2273 if (strcmp(gres->hr_exec, vstr) != 0) { 2274 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2275 modified |= MODIFIED_EXEC; 2276 } 2277 vint = nv_get_int32(nv, "metaflush"); 2278 if (gres->hr_metaflush != vint) { 2279 gres->hr_metaflush = vint; 2280 modified |= MODIFIED_METAFLUSH; 2281 } 2282 2283 /* 2284 * Change timeout for connected sockets. 2285 * Don't bother if we need to reconnect. 
2286 */ 2287 if ((modified & MODIFIED_TIMEOUT) != 0 && 2288 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { 2289 for (ii = 0; ii < ncomps; ii++) { 2290 if (!ISREMOTE(ii)) 2291 continue; 2292 rw_rlock(&hio_remote_lock[ii]); 2293 if (!ISCONNECTED(gres, ii)) { 2294 rw_unlock(&hio_remote_lock[ii]); 2295 continue; 2296 } 2297 rw_unlock(&hio_remote_lock[ii]); 2298 if (proto_timeout(gres->hr_remotein, 2299 gres->hr_timeout) == -1) { 2300 pjdlog_errno(LOG_WARNING, 2301 "Unable to set connection timeout"); 2302 } 2303 if (proto_timeout(gres->hr_remoteout, 2304 gres->hr_timeout) == -1) { 2305 pjdlog_errno(LOG_WARNING, 2306 "Unable to set connection timeout"); 2307 } 2308 } 2309 } 2310 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { 2311 for (ii = 0; ii < ncomps; ii++) { 2312 if (!ISREMOTE(ii)) 2313 continue; 2314 remote_close(gres, ii); 2315 } 2316 if (modified & MODIFIED_REMOTEADDR) { 2317 vstr = nv_get_string(nv, "remoteaddr"); 2318 strlcpy(gres->hr_remoteaddr, vstr, 2319 sizeof(gres->hr_remoteaddr)); 2320 } 2321 } 2322#undef MODIFIED_REMOTEADDR 2323#undef MODIFIED_SOURCEADDR 2324#undef MODIFIED_REPLICATION 2325#undef MODIFIED_CHECKSUM 2326#undef MODIFIED_COMPRESSION 2327#undef MODIFIED_TIMEOUT 2328#undef MODIFIED_EXEC 2329#undef MODIFIED_METAFLUSH 2330 2331 pjdlog_info("Configuration reloaded successfully."); 2332} 2333 2334static void 2335guard_one(struct hast_resource *res, unsigned int ncomp) 2336{ 2337 struct proto_conn *in, *out; 2338 2339 if (!ISREMOTE(ncomp)) 2340 return; 2341 2342 rw_rlock(&hio_remote_lock[ncomp]); 2343 2344 if (!real_remote(res)) { 2345 rw_unlock(&hio_remote_lock[ncomp]); 2346 return; 2347 } 2348 2349 if (ISCONNECTED(res, ncomp)) { 2350 PJDLOG_ASSERT(res->hr_remotein != NULL); 2351 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2352 rw_unlock(&hio_remote_lock[ncomp]); 2353 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2354 res->hr_remoteaddr); 2355 return; 2356 } 2357 2358 PJDLOG_ASSERT(res->hr_remotein 
== NULL); 2359 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2360 /* 2361 * Upgrade the lock. It doesn't have to be atomic as no other thread 2362 * can change connection status from disconnected to connected. 2363 */ 2364 rw_unlock(&hio_remote_lock[ncomp]); 2365 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2366 res->hr_remoteaddr); 2367 in = out = NULL; 2368 if (init_remote(res, &in, &out) == 0) { 2369 rw_wlock(&hio_remote_lock[ncomp]); 2370 PJDLOG_ASSERT(res->hr_remotein == NULL); 2371 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2372 PJDLOG_ASSERT(in != NULL && out != NULL); 2373 res->hr_remotein = in; 2374 res->hr_remoteout = out; 2375 rw_unlock(&hio_remote_lock[ncomp]); 2376 pjdlog_info("Successfully reconnected to %s.", 2377 res->hr_remoteaddr); 2378 sync_start(); 2379 } else { 2380 /* Both connections should be NULL. */ 2381 PJDLOG_ASSERT(res->hr_remotein == NULL); 2382 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2383 PJDLOG_ASSERT(in == NULL && out == NULL); 2384 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2385 res->hr_remoteaddr); 2386 } 2387} 2388 2389/* 2390 * Thread guards remote connections and reconnects when needed, handles 2391 * signals, etc. 
2392 */ 2393static void * 2394guard_thread(void *arg) 2395{ 2396 struct hast_resource *res = arg; 2397 unsigned int ii, ncomps; 2398 struct timespec timeout; 2399 time_t lastcheck, now; 2400 sigset_t mask; 2401 int signo; 2402 2403 ncomps = HAST_NCOMPONENTS; 2404 lastcheck = time(NULL); 2405 2406 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2407 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2408 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2409 2410 timeout.tv_sec = HAST_KEEPALIVE; 2411 timeout.tv_nsec = 0; 2412 signo = -1; 2413 2414 for (;;) { 2415 switch (signo) { 2416 case SIGINT: 2417 case SIGTERM: 2418 sigexit_received = true; 2419 primary_exitx(EX_OK, 2420 "Termination signal received, exiting."); 2421 break; 2422 default: 2423 break; 2424 } 2425 2426 /* 2427 * Don't check connections until we fully started, 2428 * as we may still be looping, waiting for remote node 2429 * to switch from primary to secondary. 2430 */ 2431 if (fullystarted) { 2432 pjdlog_debug(2, "remote_guard: Checking connections."); 2433 now = time(NULL); 2434 if (lastcheck + HAST_KEEPALIVE <= now) { 2435 for (ii = 0; ii < ncomps; ii++) 2436 guard_one(res, ii); 2437 lastcheck = now; 2438 } 2439 } 2440 signo = sigtimedwait(&mask, NULL, &timeout); 2441 } 2442 /* NOTREACHED */ 2443 return (NULL); 2444} 2445