primary.c revision 247866
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
29 */ 30 31#include <sys/cdefs.h> 32__FBSDID("$FreeBSD: stable/9/sbin/hastd/primary.c 247866 2013-03-06 06:57:18Z trociny $"); 33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 41#include <geom/gate/g_gate.h> 42 43#include <err.h> 44#include <errno.h> 45#include <fcntl.h> 46#include <libgeom.h> 47#include <pthread.h> 48#include <signal.h> 49#include <stdint.h> 50#include <stdio.h> 51#include <string.h> 52#include <sysexits.h> 53#include <unistd.h> 54 55#include <activemap.h> 56#include <nv.h> 57#include <rangelock.h> 58 59#include "control.h" 60#include "event.h" 61#include "hast.h" 62#include "hast_proto.h" 63#include "hastd.h" 64#include "hooks.h" 65#include "metadata.h" 66#include "proto.h" 67#include "pjdlog.h" 68#include "subr.h" 69#include "synch.h" 70 71/* The is only one remote component for now. */ 72#define ISREMOTE(no) ((no) == 1) 73 74struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 /* 93 * Request was already confirmed to GEOM Gate. 94 */ 95 bool hio_done; 96 /* 97 * Remember replication from the time the request was initiated, 98 * so we won't get confused when replication changes on reload. 99 */ 100 int hio_replication; 101 TAILQ_ENTRY(hio) *hio_next; 102}; 103#define hio_free_next hio_next[0] 104#define hio_done_next hio_next[0] 105 106/* 107 * Free list holds unused structures. 
When free list is empty, we have to wait 108 * until some in-progress requests are freed. 109 */ 110static TAILQ_HEAD(, hio) hio_free_list; 111static pthread_mutex_t hio_free_list_lock; 112static pthread_cond_t hio_free_list_cond; 113/* 114 * There is one send list for every component. One requests is placed on all 115 * send lists - each component gets the same request, but each component is 116 * responsible for managing his own send list. 117 */ 118static TAILQ_HEAD(, hio) *hio_send_list; 119static pthread_mutex_t *hio_send_list_lock; 120static pthread_cond_t *hio_send_list_cond; 121/* 122 * There is one recv list for every component, although local components don't 123 * use recv lists as local requests are done synchronously. 124 */ 125static TAILQ_HEAD(, hio) *hio_recv_list; 126static pthread_mutex_t *hio_recv_list_lock; 127static pthread_cond_t *hio_recv_list_cond; 128/* 129 * Request is placed on done list by the slowest component (the one that 130 * decreased hio_countdown from 1 to 0). 131 */ 132static TAILQ_HEAD(, hio) hio_done_list; 133static pthread_mutex_t hio_done_list_lock; 134static pthread_cond_t hio_done_list_cond; 135/* 136 * Structure below are for interaction with sync thread. 137 */ 138static bool sync_inprogress; 139static pthread_mutex_t sync_lock; 140static pthread_cond_t sync_cond; 141/* 142 * The lock below allows to synchornize access to remote connections. 143 */ 144static pthread_rwlock_t *hio_remote_lock; 145 146/* 147 * Lock to synchronize metadata updates. Also synchronize access to 148 * hr_primary_localcnt and hr_primary_remotecnt fields. 149 */ 150static pthread_mutex_t metadata_lock; 151 152/* 153 * Maximum number of outstanding I/O requests. 154 */ 155#define HAST_HIO_MAX 256 156/* 157 * Number of components. At this point there are only two components: local 158 * and remote, but in the future it might be possible to use multiple local 159 * and remote components. 
160 */ 161#define HAST_NCOMPONENTS 2 162 163#define ISCONNECTED(res, no) \ 164 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 165 166#define QUEUE_INSERT1(hio, name, ncomp) do { \ 167 bool _wakeup; \ 168 \ 169 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 170 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 171 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 172 hio_next[(ncomp)]); \ 173 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 174 if (_wakeup) \ 175 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 176} while (0) 177#define QUEUE_INSERT2(hio, name) do { \ 178 bool _wakeup; \ 179 \ 180 mtx_lock(&hio_##name##_list_lock); \ 181 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 182 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 183 mtx_unlock(&hio_##name##_list_lock); \ 184 if (_wakeup) \ 185 cv_signal(&hio_##name##_list_cond); \ 186} while (0) 187#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 188 bool _last; \ 189 \ 190 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 191 _last = false; \ 192 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 193 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 194 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 195 if ((timeout) != 0) \ 196 _last = true; \ 197 } \ 198 if (hio != NULL) { \ 199 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 200 hio_next[(ncomp)]); \ 201 } \ 202 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 203} while (0) 204#define QUEUE_TAKE2(hio, name) do { \ 205 mtx_lock(&hio_##name##_list_lock); \ 206 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 207 cv_wait(&hio_##name##_list_cond, \ 208 &hio_##name##_list_lock); \ 209 } \ 210 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 211 mtx_unlock(&hio_##name##_list_lock); \ 212} while (0) 213 214#define SYNCREQ(hio) do { \ 215 (hio)->hio_ggio.gctl_unit = -1; \ 216 (hio)->hio_ggio.gctl_seq = 1; \ 217} while (0) 218#define ISSYNCREQ(hio) 
	((hio)->hio_ggio.gctl_unit == -1)
#define SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

/* The resource this worker process operates on (exactly one per worker). */
static struct hast_resource *gres;

/*
 * Range locks keep regular I/O and synchronization I/O from operating on
 * the same disk ranges at the same time.
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
/* Set to true once all worker threads have been started. */
static bool fullystarted;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

/*
 * Best-effort cleanup on worker exit: destroy the GEOM Gate provider if we
 * created one.  Preserves errno for the caller.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-style error message, clean up and terminate the worker. */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Log a plain message (no errno), clean up and terminate the worker. */
static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to the on-disk metadata area and,
 * when hr_metaflush is enabled, flush the provider's write cache as well.
 * Returns 0 on success and -1 on error (statistics counters are bumped on
 * failure).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	PJDLOG_ASSERT(buf != NULL);
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
		res->hr_stat_activemap_write_error++;
		return (-1);
	}
	if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) {
		if (errno == EOPNOTSUPP) {
			/* Cache flush unsupported - disable it permanently. */
			pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
			    res->hr_localpath);
			res->hr_metaflush = 0;
		} else {
			pjdlog_errno(LOG_ERR,
			    "Unable to flush disk cache on activemap update");
			res->hr_stat_activemap_flush_error++;
			return (-1);
		}
	}
	return (0);
}

/* True when a real remote address (i.e. not "none") is configured. */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all per-component queues, locks, condition
 * variables and the pool of HAST_HIO_MAX preallocated hio requests.
 * Any allocation failure is fatal for the worker.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and their condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}
}

/*
 * Lazily initialize the unique resource identifier under metadata_lock.
 * Returns true only for the caller that actually performed initialization;
 * false when it was already set (possibly by a racing caller).
 */
static bool
init_resuid(struct hast_resource *res)
{

	mtx_lock(&metadata_lock);
	if (res->hr_resuid != 0) {
		mtx_unlock(&metadata_lock);
		return (false);
	} else {
		/* Initialize unique resource identifier. */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) == -1)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read the on-disk metadata, create the activemap and range locks, and
 * load the on-disk activemap into memory.  On first use of the provider
 * (hr_resuid == 0) the local/remote counters are reset and written back.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) == -1)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) == -1)
		exit(EX_NOINPUT);
}

/*
 * Ask the parent process to establish one TCP connection to the remote
 * node and hand the connected descriptor back to us.  Returns 0 and sets
 * *connp on success, -1 on (non-fatal) connection failure; protocol errors
 * talking to the parent are fatal.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	if (val != 0) {
		/* Parent reports the connect(2) errno in-band. */
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, res->hr_timeout) == -1) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}

/*
 * Function instructs GEOM_GATE to handle reads directly from within the kernel.
 */
static void
enable_direct_reads(struct hast_resource *res)
{
	struct g_gate_ctl_modify ggiomodify;

	bzero(&ggiomodify, sizeof(ggiomodify));
	ggiomodify.gctl_version = G_GATE_VERSION;
	ggiomodify.gctl_unit = res->hr_ggateunit;
	ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
	strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
	    sizeof(ggiomodify.gctl_readprov));
	ggiomodify.gctl_readoffset = res->hr_localoff;
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
		pjdlog_debug(1, "Direct reads enabled.");
	else
		pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
}

/*
 * Perform the two-step handshake with the secondary node: establish the
 * outgoing connection and exchange the token, then establish the incoming
 * connection, exchange resource identity/counters, and merge the remote
 * activemap into ours.  On success either stores the connections through
 * inp/outp (when both are non-NULL) or directly into the resource.
 * Returns 0 on success, EBUSY when the remote asks us to wait, or
 * ECONNREFUSED/ECONNABORTED on failure.
 */
static int
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;
	int error;

	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	if (primary_connect(res, &out) == -1)
		return (ECONNREFUSED);

	error = ECONNABORTED;

	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		/* "wait" means the remote is still switching roles. */
		if (nv_exists(nvin, "wait"))
			error = EBUSY;
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (primary_connect(res, &in) == -1)
		goto close;

	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
		enable_direct_reads(res);
	if (nv_exists(nvin, "virgin")) {
		/*
		 * Secondary was reinitialized, bump localcnt if it is 0 as
		 * only we have the data.
		 */
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);

		if (res->hr_primary_localcnt == 0) {
			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);

			mtx_lock(&metadata_lock);
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
	}
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents of its own; let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) == -1) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
#ifdef notyet
	/* Setup directions. */
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

/* Mark synchronization as in progress and wake up the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop, if synchronization is in progress. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create (or take over) the hast/<name> GEOM Gate provider that exposes
 * the replicated storage to the kernel.  Failure to create or recover the
 * device is fatal.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd == -1)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: create the parent/child communication
 * channels, fork the worker, and (in the child) initialize everything and
 * start all worker threads.  The parent returns after recording the worker
 * pid; the child never returns (it becomes the sync thread).
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/*
			 * Remote node is still primary; retry once a second
			 * until it switches roles or the timeout expires.
			 */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	(void)sync_thread(res);
}

/*
 * Log one I/O request, formatting the BIO command, offset and length after
 * the caller-supplied message prefix.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
1015{ 1016 char msg[1024]; 1017 va_list ap; 1018 1019 va_start(ap, fmt); 1020 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1021 va_end(ap); 1022 switch (ggio->gctl_cmd) { 1023 case BIO_READ: 1024 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1025 (uintmax_t)ggio->gctl_offset, 1026 (uintmax_t)ggio->gctl_length); 1027 break; 1028 case BIO_DELETE: 1029 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1030 (uintmax_t)ggio->gctl_offset, 1031 (uintmax_t)ggio->gctl_length); 1032 break; 1033 case BIO_FLUSH: 1034 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1035 break; 1036 case BIO_WRITE: 1037 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1038 (uintmax_t)ggio->gctl_offset, 1039 (uintmax_t)ggio->gctl_length); 1040 break; 1041 default: 1042 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1043 (unsigned int)ggio->gctl_cmd); 1044 break; 1045 } 1046 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1047} 1048 1049static void 1050remote_close(struct hast_resource *res, int ncomp) 1051{ 1052 1053 rw_wlock(&hio_remote_lock[ncomp]); 1054 /* 1055 * Check for a race between dropping rlock and acquiring wlock - 1056 * another thread can close connection in-between. 1057 */ 1058 if (!ISCONNECTED(res, ncomp)) { 1059 PJDLOG_ASSERT(res->hr_remotein == NULL); 1060 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1061 rw_unlock(&hio_remote_lock[ncomp]); 1062 return; 1063 } 1064 1065 PJDLOG_ASSERT(res->hr_remotein != NULL); 1066 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1067 1068 pjdlog_debug(2, "Closing incoming connection to %s.", 1069 res->hr_remoteaddr); 1070 proto_close(res->hr_remotein); 1071 res->hr_remotein = NULL; 1072 pjdlog_debug(2, "Closing outgoing connection to %s.", 1073 res->hr_remoteaddr); 1074 proto_close(res->hr_remoteout); 1075 res->hr_remoteout = NULL; 1076 1077 rw_unlock(&hio_remote_lock[ncomp]); 1078 1079 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1080 1081 /* 1082 * Stop synchronization if in-progress. 
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Acknowledge write completion to the kernel, but don't update activemap yet.
 */
static void
write_complete(struct hast_resource *res, struct hio *hio)
{
	struct g_gate_ctl_io *ggio;
	unsigned int ncomp;

	/* Each write must be acknowledged to the kernel exactly once. */
	PJDLOG_ASSERT(!hio->hio_done);

	ggio = &hio->hio_ggio;
	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);

	/*
	 * Bump local count if this is first write after
	 * connection failure with remote node.
	 */
	/* Remote component is hard-coded as 1 (see ISREMOTE). */
	ncomp = 1;
	rw_rlock(&hio_remote_lock[ncomp]);
	if (!ISCONNECTED(res, ncomp)) {
		mtx_lock(&metadata_lock);
		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			/* Persist the new counter before acking the write. */
			(void)metadata_write(res);
		}
		mtx_unlock(&metadata_lock);
	}
	rw_unlock(&hio_remote_lock[ncomp]);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1)
		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
	hio->hio_done = true;
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		/* Buffer is always MAXPHYS; the kernel never sends more. */
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		/*
		 * Latch the replication mode now so a config reload cannot
		 * change it mid-request (see struct hio).
		 */
		hio->hio_replication = res->hr_replication;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}

		ncomp = 0;
		ncomps = HAST_NCOMPONENTS;

		/* Preset errors; each component overwrites its own slot. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);

		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			/* READ goes to exactly one component. */
			ncomps = 1;
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0 &&
			    res->hr_primary_localcnt == 0) {
				/* This is first write. */
				res->hr_primary_localcnt = 1;
			}
			/*
			 * Block until the range neither overlaps an active
			 * sync request nor an already-locked regular write.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) == -1) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark extents dirty on disk before issuing the write. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			break;
		case BIO_DELETE:
			res->hr_stat_delete++;
			break;
		case BIO_FLUSH:
			res->hr_stat_flush++;
			break;
		}
		pjdlog_debug(2,
		    "ggate_recv: (%p) Moving request to the send queues.", hio);
		/*
		 * hio_countdown = number of components that must finish
		 * before the request may be completed (see struct hio).
		 */
		refcount_init(&hio->hio_countdown, ncomps);
		for (ii = ncomp; ii < ncomp + ncomps; ii++)
			QUEUE_INSERT1(hio, send, ii);
	}
	/*
NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 * Note: the request is re-queued without
				 * releasing hio_countdown - the remote
				 * component will account for it.
				 */
				if (ret == -1) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				/* Short write - treat as I/O error. */
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
				/*
				 * In async replication the kernel is acked as
				 * soon as the local write succeeded; the
				 * remote copy proceeds in the background.
				 */
				if (hio->hio_replication ==
				    HAST_REPLICATION_ASYNC &&
				    !ISSYNCREQ(hio)) {
					ggio->gctl_error = 0;
					write_complete(res, hio);
				}
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret == -1) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			if (!res->hr_localflush) {
				/* Local provider doesn't support BIO_FLUSH. */
				ret = -1;
				errno = EOPNOTSUPP;
				break;
			}
			ret = g_flush(res->hr_localfd);
			if (ret == -1) {
				/* Remember that flush is unsupported. */
				if (errno == EOPNOTSUPP)
					res->hr_localflush = false;
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}
		/* Last component to finish completes the request. */
		if (!refcount_release(&hio->hio_countdown))
			continue;
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "local_send: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return
(NULL);
}

/*
 * Send a HIO_KEEPALIVE header to the secondary over the outgoing
 * connection.  On send failure the remote connection is closed via
 * remote_close().  A no-op when the component is not connected.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) {
		/* Drop the rlock before taking the wlock in remote_close(). */
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now.
 */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		/* Times out after HAST_KEEPALIVE seconds (hio == NULL then). */
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the ggate request into a HAST protocol command. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) == -1) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		/* Wake remote_recv only if the queue was empty before us. */
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Remote write failed - make sure the extent stays
			 * marked as needing synchronization on disk.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);

		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/*
		 * Match the reply to its request by sequence number;
		 * hio is NULL after the loop if no request matched.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		ggio = &hio->hio_ggio;
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* READ replies carry a data payload - receive it. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) == -1) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			/* Header-only replies. */
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Last component to finish completes the request. */
		if (!refcount_release(&hio->hio_countdown))
			continue;
		if (ISSYNCREQ(hio)) {
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
		} else {
			pjdlog_debug(2,
			    "remote_recv: (%p) Moving request to the done queue.",
			    hio);
			QUEUE_INSERT2(hio, done);
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Write done everywhere - extent may become clean. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Async writes were already acknowledged by
			 * local_send_thread (hio_done set there).
			 */
			if (!hio->hio_done)
				write_complete(res, hio);
		} else {
			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) {
				primary_exit(EX_OSERR,
				    "G_GATE_CMD_DONE failed");
			}
		}
		/* Error statistics are counted against the local component. */
		if (hio->hio_errors[0]) {
			switch (ggio->gctl_cmd) {
			case BIO_READ:
				res->hr_stat_read_error++;
				break;
			case BIO_WRITE:
				res->hr_stat_write_error++;
				break;
			case BIO_DELETE:
				res->hr_stat_delete_error++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush_error++;
				break;
			}
		}
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronize local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	/*
	 * NOTE(review): arg is marked __unused but is in fact used below;
	 * the annotation looks stale - confirm before relying on it.
	 */
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind, directreads;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	/* offset == -1 means "no synchronization in progress". */
	offset = -1;
	directreads = false;

	for (;;) {
		mtx_lock(&sync_lock);
		if (offset >= 0 && !sync_inprogress) {
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		/* Sleep until sync_start() wakes us up. */
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
		 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We synchronized entire syncext extent, we can mark
			 * it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
		}
		mtx_unlock(&res->hr_amp_lock);
		if (dorewind) {
			dorewind = false;
			if (offset == -1)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %NB to go.",
				    (intmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
				event_send(res, EVENT_SYNCSTART);
				gettimeofday(&tstart, NULL);
			}
		}
		if (offset == -1) {
			sync_stop();
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization complete, make both localcnt and
			 * remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					int64_t bps;

					gettimeofday(&tend, NULL);
					timersub(&tend, &tstart, &tdiff);
					bps = (int64_t)((double)synced /
					    ((double)tdiff.tv_sec +
					    (double)tdiff.tv_usec / 1000000));
					pjdlog_info("Synchronization complete. "
					    "%NB synchronized in %#.0lT (%NB/sec).",
					    (intmax_t)synced, &tdiff,
					    (intmax_t)bps);
					event_send(res, EVENT_SYNCDONE);
				}
				mtx_lock(&metadata_lock);
				/* Local data now valid - reads may go local. */
				if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
					directreads = true;
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_remotecnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_localcnt;
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_primary_remotecnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			if (directreads) {
				directreads = false;
				enable_direct_reads(res);
			}
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) == -1) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
		/*
		 * First read the data from synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		hio->hio_done = false;
		hio->hio_replication = res->hr_replication;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so handle request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so send request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		/* Sync requests go to exactly one component at a time. */
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		/*
		 * We read the data from synchronization source, now write it
		 * to synchronization target.
		 */
		SYNCREQ(hio);
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so we update remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so we update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for WRITE to finish.
 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		/* Release the sync range lock and wake blocked writers. */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Apply a new configuration (received from the parent as an nv list) to the
 * running primary worker.  Only the fields listed in the nv_assert() calls
 * below are considered.  Changing remote or source address forces a
 * disconnect; the guard thread reconnects with the new addresses.
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

	/* Bitmask of settings that differ from the running configuration. */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		/* Address changed - drop connections; guard will reconnect. */
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			/* Now safe to install the new address (see above). */
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef MODIFIED_REMOTEADDR
#undef MODIFIED_SOURCEADDR
#undef MODIFIED_REPLICATION
#undef MODIFIED_CHECKSUM
#undef MODIFIED_COMPRESSION
#undef MODIFIED_TIMEOUT
#undef MODIFIED_EXEC
#undef MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check the health of one remote component; called from the guard thread.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein
== NULL); 2233 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2234 /* 2235 * Upgrade the lock. It doesn't have to be atomic as no other thread 2236 * can change connection status from disconnected to connected. 2237 */ 2238 rw_unlock(&hio_remote_lock[ncomp]); 2239 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2240 res->hr_remoteaddr); 2241 in = out = NULL; 2242 if (init_remote(res, &in, &out) == 0) { 2243 rw_wlock(&hio_remote_lock[ncomp]); 2244 PJDLOG_ASSERT(res->hr_remotein == NULL); 2245 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2246 PJDLOG_ASSERT(in != NULL && out != NULL); 2247 res->hr_remotein = in; 2248 res->hr_remoteout = out; 2249 rw_unlock(&hio_remote_lock[ncomp]); 2250 pjdlog_info("Successfully reconnected to %s.", 2251 res->hr_remoteaddr); 2252 sync_start(); 2253 } else { 2254 /* Both connections should be NULL. */ 2255 PJDLOG_ASSERT(res->hr_remotein == NULL); 2256 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2257 PJDLOG_ASSERT(in == NULL && out == NULL); 2258 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2259 res->hr_remoteaddr); 2260 } 2261} 2262 2263/* 2264 * Thread guards remote connections and reconnects when needed, handles 2265 * signals, etc. 
2266 */ 2267static void * 2268guard_thread(void *arg) 2269{ 2270 struct hast_resource *res = arg; 2271 unsigned int ii, ncomps; 2272 struct timespec timeout; 2273 time_t lastcheck, now; 2274 sigset_t mask; 2275 int signo; 2276 2277 ncomps = HAST_NCOMPONENTS; 2278 lastcheck = time(NULL); 2279 2280 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2281 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2282 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2283 2284 timeout.tv_sec = HAST_KEEPALIVE; 2285 timeout.tv_nsec = 0; 2286 signo = -1; 2287 2288 for (;;) { 2289 switch (signo) { 2290 case SIGINT: 2291 case SIGTERM: 2292 sigexit_received = true; 2293 primary_exitx(EX_OK, 2294 "Termination signal received, exiting."); 2295 break; 2296 default: 2297 break; 2298 } 2299 2300 /* 2301 * Don't check connections until we fully started, 2302 * as we may still be looping, waiting for remote node 2303 * to switch from primary to secondary. 2304 */ 2305 if (fullystarted) { 2306 pjdlog_debug(2, "remote_guard: Checking connections."); 2307 now = time(NULL); 2308 if (lastcheck + HAST_KEEPALIVE <= now) { 2309 for (ii = 0; ii < ncomps; ii++) 2310 guard_one(res, ii); 2311 lastcheck = now; 2312 } 2313 } 2314 signo = sigtimedwait(&mask, NULL, &timeout); 2315 } 2316 /* NOTREACHED */ 2317 return (NULL); 2318} 2319