/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 222228 2011-05-23 21:15:19Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request as it travels between the kernel (via GEOM Gate)
 * and the per-component worker threads.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* One list linkage per component (allocated in init_environment()). */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists are global (not per-component), so they simply
 * reuse the component-0 slot of hio_next.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2

/*
 * NOTE(review): the macro ignores its arguments and checks both connection
 * directions regardless of component number - confirm this is intentional.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Take a request off a per-component list, waiting if the list is empty.
 * NOTE: with a non-zero timeout only a single timed wait is attempted,
 * so (hio) may be NULL on return; callers must check for that.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if (hio != NULL) {						\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are distinguished from regular kernel requests
 * by a negative ggate unit number (no real unit is ever negative).
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

static struct hast_resource *gres;

static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
static bool fullystarted;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

/*
 * Best-effort teardown before exit: destroy the ggate provider if we
 * created one. Preserves errno for the caller.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-style error, clean up and exit with the given code. */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Log a plain message (no errno), clean up and exit with the given code. */
static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to its on-disk location (right
 * after the metadata). Returns 0 on success, -1 on error (errno preserved).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	PJDLOG_ASSERT(buf != NULL);
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/* True when a real remote address is configured (not the literal "none"). */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all global lists, locks, condition variables and
 * the pool of HAST_HIO_MAX preallocated I/O requests. Exits on failure.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and their condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}
}

/*
 * Initialize the resource's unique identifier exactly once.
 * Returns true if this call performed the initialization (meaning there
 * were no writes yet), false if someone already initialized it.
 */
static bool
init_resuid(struct hast_resource *res)
{

	mtx_lock(&metadata_lock);
	if (res->hr_resuid != 0) {
		mtx_unlock(&metadata_lock);
		return (false);
	} else {
		/* Initialize unique resource identifier. */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) < 0)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read metadata, create the activemap and range locks, and load the
 * on-disk activemap. Exits the process on any failure.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Ask the parent process for a connection to the remote node and wait for
 * it to be established. Returns 0 with *connp set on success, -1 on a
 * non-fatal connection failure; exits on parent-communication failure.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	if (val != 0) {
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) < 0) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, res->hr_timeout) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}

/*
 * Perform the two-step handshake with the secondary node: exchange the
 * token over the outgoing connection, then authenticate the incoming
 * connection, compare data/extent sizes, merge remote dirty extents and
 * store the connections. Returns 0 on success or an errno-style error
 * (ECONNREFUSED/ECONNABORTED/EBUSY) on failure.
 */
static int
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;
	int error;

	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	if (primary_connect(res, &out) == -1)
		return (ECONNREFUSED);

	error = ECONNABORTED;

	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		if (nv_exists(nvin, "wait"))
			error = EBUSY;
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (primary_connect(res, &in) == -1)
		goto close;

	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	if (nv_exists(nvin, "virgin")) {
		/*
		 * Secondary was reinitialized, bump localcnt if it is 0 as
		 * only we have the data.
		 */
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);

		if (res->hr_primary_localcnt == 0) {
			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);

			mtx_lock(&metadata_lock);
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
	}
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node have some dirty extents on its own, lets
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
	/* Setup directions. */
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

/* Mark synchronization as in progress and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop (it checks the flag itself). */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create (or take over) the GEOM Gate provider hast/<provname>.
 * Exits the process if the provider can neither be created nor recovered.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role. Sets up the parent/child control,
 * event and connection channels, forks the worker, and in the child
 * initializes everything and starts all worker threads. The parent
 * returns after recording the worker PID; the child never returns
 * (it becomes the sync thread).
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid < 0) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	(void)sync_thread(res);
}

/*
 * Log a request with a human-readable description of the ggate command
 * (READ/WRITE/DELETE/FLUSH with offset and length) appended to the
 * caller-supplied message.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both remote connections (under the component's write lock),
 * stop any in-progress synchronization and notify about the disconnect.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0) {
				/*
				 * This is first write, initialize localcnt and
				 * resuid.
				 */
				res->hr_primary_localcnt = 1;
				(void)init_resuid(res);
			}
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			switch (ggio->gctl_cmd) {
			case BIO_DELETE:
				res->hr_stat_delete++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush++;
				break;
			}
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
1218 * If local read fails, it redirects it to remote_send thread. 1219 */ 1220static void * 1221local_send_thread(void *arg) 1222{ 1223 struct hast_resource *res = arg; 1224 struct g_gate_ctl_io *ggio; 1225 struct hio *hio; 1226 unsigned int ncomp, rncomp; 1227 ssize_t ret; 1228 1229 /* Local component is 0 for now. */ 1230 ncomp = 0; 1231 /* Remote component is 1 for now. */ 1232 rncomp = 1; 1233 1234 for (;;) { 1235 pjdlog_debug(2, "local_send: Taking request."); 1236 QUEUE_TAKE1(hio, send, ncomp, 0); 1237 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1238 ggio = &hio->hio_ggio; 1239 switch (ggio->gctl_cmd) { 1240 case BIO_READ: 1241 ret = pread(res->hr_localfd, ggio->gctl_data, 1242 ggio->gctl_length, 1243 ggio->gctl_offset + res->hr_localoff); 1244 if (ret == ggio->gctl_length) 1245 hio->hio_errors[ncomp] = 0; 1246 else { 1247 /* 1248 * If READ failed, try to read from remote node. 1249 */ 1250 if (ret < 0) { 1251 reqlog(LOG_WARNING, 0, ggio, 1252 "Local request failed (%s), trying remote node. ", 1253 strerror(errno)); 1254 } else if (ret != ggio->gctl_length) { 1255 reqlog(LOG_WARNING, 0, ggio, 1256 "Local request failed (%zd != %jd), trying remote node. 
", 1257 ret, (intmax_t)ggio->gctl_length); 1258 } 1259 QUEUE_INSERT1(hio, send, rncomp); 1260 continue; 1261 } 1262 break; 1263 case BIO_WRITE: 1264 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1265 ggio->gctl_length, 1266 ggio->gctl_offset + res->hr_localoff); 1267 if (ret < 0) { 1268 hio->hio_errors[ncomp] = errno; 1269 reqlog(LOG_WARNING, 0, ggio, 1270 "Local request failed (%s): ", 1271 strerror(errno)); 1272 } else if (ret != ggio->gctl_length) { 1273 hio->hio_errors[ncomp] = EIO; 1274 reqlog(LOG_WARNING, 0, ggio, 1275 "Local request failed (%zd != %jd): ", 1276 ret, (intmax_t)ggio->gctl_length); 1277 } else { 1278 hio->hio_errors[ncomp] = 0; 1279 } 1280 break; 1281 case BIO_DELETE: 1282 ret = g_delete(res->hr_localfd, 1283 ggio->gctl_offset + res->hr_localoff, 1284 ggio->gctl_length); 1285 if (ret < 0) { 1286 hio->hio_errors[ncomp] = errno; 1287 reqlog(LOG_WARNING, 0, ggio, 1288 "Local request failed (%s): ", 1289 strerror(errno)); 1290 } else { 1291 hio->hio_errors[ncomp] = 0; 1292 } 1293 break; 1294 case BIO_FLUSH: 1295 ret = g_flush(res->hr_localfd); 1296 if (ret < 0) { 1297 hio->hio_errors[ncomp] = errno; 1298 reqlog(LOG_WARNING, 0, ggio, 1299 "Local request failed (%s): ", 1300 strerror(errno)); 1301 } else { 1302 hio->hio_errors[ncomp] = 0; 1303 } 1304 break; 1305 } 1306 if (refcount_release(&hio->hio_countdown)) { 1307 if (ISSYNCREQ(hio)) { 1308 mtx_lock(&sync_lock); 1309 SYNCREQDONE(hio); 1310 mtx_unlock(&sync_lock); 1311 cv_signal(&sync_cond); 1312 } else { 1313 pjdlog_debug(2, 1314 "local_send: (%p) Moving request to the done queue.", 1315 hio); 1316 QUEUE_INSERT2(hio, done); 1317 } 1318 } 1319 } 1320 /* NOTREACHED */ 1321 return (NULL); 1322} 1323 1324static void 1325keepalive_send(struct hast_resource *res, unsigned int ncomp) 1326{ 1327 struct nv *nv; 1328 1329 rw_rlock(&hio_remote_lock[ncomp]); 1330 1331 if (!ISCONNECTED(res, ncomp)) { 1332 rw_unlock(&hio_remote_lock[ncomp]); 1333 return; 1334 } 1335 1336 PJDLOG_ASSERT(res->hr_remotein != 
NULL); 1337 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1338 1339 nv = nv_alloc(); 1340 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1341 if (nv_error(nv) != 0) { 1342 rw_unlock(&hio_remote_lock[ncomp]); 1343 nv_free(nv); 1344 pjdlog_debug(1, 1345 "keepalive_send: Unable to prepare header to send."); 1346 return; 1347 } 1348 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1349 rw_unlock(&hio_remote_lock[ncomp]); 1350 pjdlog_common(LOG_DEBUG, 1, errno, 1351 "keepalive_send: Unable to send request"); 1352 nv_free(nv); 1353 remote_close(res, ncomp); 1354 return; 1355 } 1356 1357 rw_unlock(&hio_remote_lock[ncomp]); 1358 nv_free(nv); 1359 pjdlog_debug(2, "keepalive_send: Request sent."); 1360} 1361 1362/* 1363 * Thread sends request to secondary node. 1364 */ 1365static void * 1366remote_send_thread(void *arg) 1367{ 1368 struct hast_resource *res = arg; 1369 struct g_gate_ctl_io *ggio; 1370 time_t lastcheck, now; 1371 struct hio *hio; 1372 struct nv *nv; 1373 unsigned int ncomp; 1374 bool wakeup; 1375 uint64_t offset, length; 1376 uint8_t cmd; 1377 void *data; 1378 1379 /* Remote component is 1 for now. 
*/ 1380 ncomp = 1; 1381 lastcheck = time(NULL); 1382 1383 for (;;) { 1384 pjdlog_debug(2, "remote_send: Taking request."); 1385 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1386 if (hio == NULL) { 1387 now = time(NULL); 1388 if (lastcheck + HAST_KEEPALIVE <= now) { 1389 keepalive_send(res, ncomp); 1390 lastcheck = now; 1391 } 1392 continue; 1393 } 1394 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1395 ggio = &hio->hio_ggio; 1396 switch (ggio->gctl_cmd) { 1397 case BIO_READ: 1398 cmd = HIO_READ; 1399 data = NULL; 1400 offset = ggio->gctl_offset; 1401 length = ggio->gctl_length; 1402 break; 1403 case BIO_WRITE: 1404 cmd = HIO_WRITE; 1405 data = ggio->gctl_data; 1406 offset = ggio->gctl_offset; 1407 length = ggio->gctl_length; 1408 break; 1409 case BIO_DELETE: 1410 cmd = HIO_DELETE; 1411 data = NULL; 1412 offset = ggio->gctl_offset; 1413 length = ggio->gctl_length; 1414 break; 1415 case BIO_FLUSH: 1416 cmd = HIO_FLUSH; 1417 data = NULL; 1418 offset = 0; 1419 length = 0; 1420 break; 1421 default: 1422 PJDLOG_ASSERT(!"invalid condition"); 1423 abort(); 1424 } 1425 nv = nv_alloc(); 1426 nv_add_uint8(nv, cmd, "cmd"); 1427 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1428 nv_add_uint64(nv, offset, "offset"); 1429 nv_add_uint64(nv, length, "length"); 1430 if (nv_error(nv) != 0) { 1431 hio->hio_errors[ncomp] = nv_error(nv); 1432 pjdlog_debug(2, 1433 "remote_send: (%p) Unable to prepare header to send.", 1434 hio); 1435 reqlog(LOG_ERR, 0, ggio, 1436 "Unable to prepare header to send (%s): ", 1437 strerror(nv_error(nv))); 1438 /* Move failed request immediately to the done queue. */ 1439 goto done_queue; 1440 } 1441 pjdlog_debug(2, 1442 "remote_send: (%p) Moving request to the recv queue.", 1443 hio); 1444 /* 1445 * Protect connection from disappearing. 
1446 */ 1447 rw_rlock(&hio_remote_lock[ncomp]); 1448 if (!ISCONNECTED(res, ncomp)) { 1449 rw_unlock(&hio_remote_lock[ncomp]); 1450 hio->hio_errors[ncomp] = ENOTCONN; 1451 goto done_queue; 1452 } 1453 /* 1454 * Move the request to recv queue before sending it, because 1455 * in different order we can get reply before we move request 1456 * to recv queue. 1457 */ 1458 mtx_lock(&hio_recv_list_lock[ncomp]); 1459 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1460 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1461 mtx_unlock(&hio_recv_list_lock[ncomp]); 1462 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1463 data != NULL ? length : 0) < 0) { 1464 hio->hio_errors[ncomp] = errno; 1465 rw_unlock(&hio_remote_lock[ncomp]); 1466 pjdlog_debug(2, 1467 "remote_send: (%p) Unable to send request.", hio); 1468 reqlog(LOG_ERR, 0, ggio, 1469 "Unable to send request (%s): ", 1470 strerror(hio->hio_errors[ncomp])); 1471 remote_close(res, ncomp); 1472 /* 1473 * Take request back from the receive queue and move 1474 * it immediately to the done queue. 
1475 */ 1476 mtx_lock(&hio_recv_list_lock[ncomp]); 1477 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1478 mtx_unlock(&hio_recv_list_lock[ncomp]); 1479 goto done_queue; 1480 } 1481 rw_unlock(&hio_remote_lock[ncomp]); 1482 nv_free(nv); 1483 if (wakeup) 1484 cv_signal(&hio_recv_list_cond[ncomp]); 1485 continue; 1486done_queue: 1487 nv_free(nv); 1488 if (ISSYNCREQ(hio)) { 1489 if (!refcount_release(&hio->hio_countdown)) 1490 continue; 1491 mtx_lock(&sync_lock); 1492 SYNCREQDONE(hio); 1493 mtx_unlock(&sync_lock); 1494 cv_signal(&sync_cond); 1495 continue; 1496 } 1497 if (ggio->gctl_cmd == BIO_WRITE) { 1498 mtx_lock(&res->hr_amp_lock); 1499 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1500 ggio->gctl_length)) { 1501 (void)hast_activemap_flush(res); 1502 } 1503 mtx_unlock(&res->hr_amp_lock); 1504 } 1505 if (!refcount_release(&hio->hio_countdown)) 1506 continue; 1507 pjdlog_debug(2, 1508 "remote_send: (%p) Moving request to the done queue.", 1509 hio); 1510 QUEUE_INSERT2(hio, done); 1511 } 1512 /* NOTREACHED */ 1513 return (NULL); 1514} 1515 1516/* 1517 * Thread receives answer from secondary node and passes it to ggate_send 1518 * thread. 1519 */ 1520static void * 1521remote_recv_thread(void *arg) 1522{ 1523 struct hast_resource *res = arg; 1524 struct g_gate_ctl_io *ggio; 1525 struct hio *hio; 1526 struct nv *nv; 1527 unsigned int ncomp; 1528 uint64_t seq; 1529 int error; 1530 1531 /* Remote component is 1 for now. */ 1532 ncomp = 1; 1533 1534 for (;;) { 1535 /* Wait until there is anything to receive. 
*/ 1536 mtx_lock(&hio_recv_list_lock[ncomp]); 1537 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1538 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1539 cv_wait(&hio_recv_list_cond[ncomp], 1540 &hio_recv_list_lock[ncomp]); 1541 } 1542 mtx_unlock(&hio_recv_list_lock[ncomp]); 1543 rw_rlock(&hio_remote_lock[ncomp]); 1544 if (!ISCONNECTED(res, ncomp)) { 1545 rw_unlock(&hio_remote_lock[ncomp]); 1546 /* 1547 * Connection is dead, so move all pending requests to 1548 * the done queue (one-by-one). 1549 */ 1550 mtx_lock(&hio_recv_list_lock[ncomp]); 1551 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1552 PJDLOG_ASSERT(hio != NULL); 1553 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1554 hio_next[ncomp]); 1555 mtx_unlock(&hio_recv_list_lock[ncomp]); 1556 goto done_queue; 1557 } 1558 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1559 pjdlog_errno(LOG_ERR, 1560 "Unable to receive reply header"); 1561 rw_unlock(&hio_remote_lock[ncomp]); 1562 remote_close(res, ncomp); 1563 continue; 1564 } 1565 rw_unlock(&hio_remote_lock[ncomp]); 1566 seq = nv_get_uint64(nv, "seq"); 1567 if (seq == 0) { 1568 pjdlog_error("Header contains no 'seq' field."); 1569 nv_free(nv); 1570 continue; 1571 } 1572 mtx_lock(&hio_recv_list_lock[ncomp]); 1573 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1574 if (hio->hio_ggio.gctl_seq == seq) { 1575 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1576 hio_next[ncomp]); 1577 break; 1578 } 1579 } 1580 mtx_unlock(&hio_recv_list_lock[ncomp]); 1581 if (hio == NULL) { 1582 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1583 (uintmax_t)seq); 1584 nv_free(nv); 1585 continue; 1586 } 1587 error = nv_get_int16(nv, "error"); 1588 if (error != 0) { 1589 /* Request failed on remote side. 
*/ 1590 hio->hio_errors[ncomp] = error; 1591 reqlog(LOG_WARNING, 0, &hio->hio_ggio, 1592 "Remote request failed (%s): ", strerror(error)); 1593 nv_free(nv); 1594 goto done_queue; 1595 } 1596 ggio = &hio->hio_ggio; 1597 switch (ggio->gctl_cmd) { 1598 case BIO_READ: 1599 rw_rlock(&hio_remote_lock[ncomp]); 1600 if (!ISCONNECTED(res, ncomp)) { 1601 rw_unlock(&hio_remote_lock[ncomp]); 1602 nv_free(nv); 1603 goto done_queue; 1604 } 1605 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1606 ggio->gctl_data, ggio->gctl_length) < 0) { 1607 hio->hio_errors[ncomp] = errno; 1608 pjdlog_errno(LOG_ERR, 1609 "Unable to receive reply data"); 1610 rw_unlock(&hio_remote_lock[ncomp]); 1611 nv_free(nv); 1612 remote_close(res, ncomp); 1613 goto done_queue; 1614 } 1615 rw_unlock(&hio_remote_lock[ncomp]); 1616 break; 1617 case BIO_WRITE: 1618 case BIO_DELETE: 1619 case BIO_FLUSH: 1620 break; 1621 default: 1622 PJDLOG_ASSERT(!"invalid condition"); 1623 abort(); 1624 } 1625 hio->hio_errors[ncomp] = 0; 1626 nv_free(nv); 1627done_queue: 1628 if (refcount_release(&hio->hio_countdown)) { 1629 if (ISSYNCREQ(hio)) { 1630 mtx_lock(&sync_lock); 1631 SYNCREQDONE(hio); 1632 mtx_unlock(&sync_lock); 1633 cv_signal(&sync_cond); 1634 } else { 1635 pjdlog_debug(2, 1636 "remote_recv: (%p) Moving request to the done queue.", 1637 hio); 1638 QUEUE_INSERT2(hio, done); 1639 } 1640 } 1641 } 1642 /* NOTREACHED */ 1643 return (NULL); 1644} 1645 1646/* 1647 * Thread sends answer to the kernel. 
1648 */ 1649static void * 1650ggate_send_thread(void *arg) 1651{ 1652 struct hast_resource *res = arg; 1653 struct g_gate_ctl_io *ggio; 1654 struct hio *hio; 1655 unsigned int ii, ncomp, ncomps; 1656 1657 ncomps = HAST_NCOMPONENTS; 1658 1659 for (;;) { 1660 pjdlog_debug(2, "ggate_send: Taking request."); 1661 QUEUE_TAKE2(hio, done); 1662 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1663 ggio = &hio->hio_ggio; 1664 for (ii = 0; ii < ncomps; ii++) { 1665 if (hio->hio_errors[ii] == 0) { 1666 /* 1667 * One successful request is enough to declare 1668 * success. 1669 */ 1670 ggio->gctl_error = 0; 1671 break; 1672 } 1673 } 1674 if (ii == ncomps) { 1675 /* 1676 * None of the requests were successful. 1677 * Use the error from local component except the 1678 * case when we did only remote request. 1679 */ 1680 if (ggio->gctl_cmd == BIO_READ && 1681 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1682 ggio->gctl_error = hio->hio_errors[1]; 1683 else 1684 ggio->gctl_error = hio->hio_errors[0]; 1685 } 1686 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1687 mtx_lock(&res->hr_amp_lock); 1688 activemap_write_complete(res->hr_amp, 1689 ggio->gctl_offset, ggio->gctl_length); 1690 mtx_unlock(&res->hr_amp_lock); 1691 } 1692 if (ggio->gctl_cmd == BIO_WRITE) { 1693 /* 1694 * Unlock range we locked. 1695 */ 1696 mtx_lock(&range_lock); 1697 rangelock_del(range_regular, ggio->gctl_offset, 1698 ggio->gctl_length); 1699 if (range_sync_wait) 1700 cv_signal(&range_sync_cond); 1701 mtx_unlock(&range_lock); 1702 /* 1703 * Bump local count if this is first write after 1704 * connection failure with remote node. 
1705 */ 1706 ncomp = 1; 1707 rw_rlock(&hio_remote_lock[ncomp]); 1708 if (!ISCONNECTED(res, ncomp)) { 1709 mtx_lock(&metadata_lock); 1710 if (res->hr_primary_localcnt == 1711 res->hr_secondary_remotecnt) { 1712 res->hr_primary_localcnt++; 1713 pjdlog_debug(1, 1714 "Increasing localcnt to %ju.", 1715 (uintmax_t)res->hr_primary_localcnt); 1716 (void)metadata_write(res); 1717 } 1718 mtx_unlock(&metadata_lock); 1719 } 1720 rw_unlock(&hio_remote_lock[ncomp]); 1721 } 1722 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1723 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1724 pjdlog_debug(2, 1725 "ggate_send: (%p) Moving request to the free queue.", hio); 1726 QUEUE_INSERT2(hio, free); 1727 } 1728 /* NOTREACHED */ 1729 return (NULL); 1730} 1731 1732/* 1733 * Thread synchronize local and remote components. 1734 */ 1735static void * 1736sync_thread(void *arg __unused) 1737{ 1738 struct hast_resource *res = arg; 1739 struct hio *hio; 1740 struct g_gate_ctl_io *ggio; 1741 struct timeval tstart, tend, tdiff; 1742 unsigned int ii, ncomp, ncomps; 1743 off_t offset, length, synced; 1744 bool dorewind; 1745 int syncext; 1746 1747 ncomps = HAST_NCOMPONENTS; 1748 dorewind = true; 1749 synced = 0; 1750 offset = -1; 1751 1752 for (;;) { 1753 mtx_lock(&sync_lock); 1754 if (offset >= 0 && !sync_inprogress) { 1755 gettimeofday(&tend, NULL); 1756 timersub(&tend, &tstart, &tdiff); 1757 pjdlog_info("Synchronization interrupted after %#.0T. " 1758 "%NB synchronized so far.", &tdiff, 1759 (intmax_t)synced); 1760 event_send(res, EVENT_SYNCINTR); 1761 } 1762 while (!sync_inprogress) { 1763 dorewind = true; 1764 synced = 0; 1765 cv_wait(&sync_cond, &sync_lock); 1766 } 1767 mtx_unlock(&sync_lock); 1768 /* 1769 * Obtain offset at which we should synchronize. 1770 * Rewind synchronization if needed. 
1771 */ 1772 mtx_lock(&res->hr_amp_lock); 1773 if (dorewind) 1774 activemap_sync_rewind(res->hr_amp); 1775 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1776 if (syncext != -1) { 1777 /* 1778 * We synchronized entire syncext extent, we can mark 1779 * it as clean now. 1780 */ 1781 if (activemap_extent_complete(res->hr_amp, syncext)) 1782 (void)hast_activemap_flush(res); 1783 } 1784 mtx_unlock(&res->hr_amp_lock); 1785 if (dorewind) { 1786 dorewind = false; 1787 if (offset < 0) 1788 pjdlog_info("Nodes are in sync."); 1789 else { 1790 pjdlog_info("Synchronization started. %NB to go.", 1791 (intmax_t)(res->hr_extentsize * 1792 activemap_ndirty(res->hr_amp))); 1793 event_send(res, EVENT_SYNCSTART); 1794 gettimeofday(&tstart, NULL); 1795 } 1796 } 1797 if (offset < 0) { 1798 sync_stop(); 1799 pjdlog_debug(1, "Nothing to synchronize."); 1800 /* 1801 * Synchronization complete, make both localcnt and 1802 * remotecnt equal. 1803 */ 1804 ncomp = 1; 1805 rw_rlock(&hio_remote_lock[ncomp]); 1806 if (ISCONNECTED(res, ncomp)) { 1807 if (synced > 0) { 1808 int64_t bps; 1809 1810 gettimeofday(&tend, NULL); 1811 timersub(&tend, &tstart, &tdiff); 1812 bps = (int64_t)((double)synced / 1813 ((double)tdiff.tv_sec + 1814 (double)tdiff.tv_usec / 1000000)); 1815 pjdlog_info("Synchronization complete. 
" 1816 "%NB synchronized in %#.0lT (%NB/sec).", 1817 (intmax_t)synced, &tdiff, 1818 (intmax_t)bps); 1819 event_send(res, EVENT_SYNCDONE); 1820 } 1821 mtx_lock(&metadata_lock); 1822 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1823 res->hr_primary_localcnt = 1824 res->hr_secondary_remotecnt; 1825 res->hr_primary_remotecnt = 1826 res->hr_secondary_localcnt; 1827 pjdlog_debug(1, 1828 "Setting localcnt to %ju and remotecnt to %ju.", 1829 (uintmax_t)res->hr_primary_localcnt, 1830 (uintmax_t)res->hr_primary_remotecnt); 1831 (void)metadata_write(res); 1832 mtx_unlock(&metadata_lock); 1833 } 1834 rw_unlock(&hio_remote_lock[ncomp]); 1835 continue; 1836 } 1837 pjdlog_debug(2, "sync: Taking free request."); 1838 QUEUE_TAKE2(hio, free); 1839 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1840 /* 1841 * Lock the range we are going to synchronize. We don't want 1842 * race where someone writes between our read and write. 1843 */ 1844 for (;;) { 1845 mtx_lock(&range_lock); 1846 if (rangelock_islocked(range_regular, offset, length)) { 1847 pjdlog_debug(2, 1848 "sync: Range offset=%jd length=%jd locked.", 1849 (intmax_t)offset, (intmax_t)length); 1850 range_sync_wait = true; 1851 cv_wait(&range_sync_cond, &range_lock); 1852 range_sync_wait = false; 1853 mtx_unlock(&range_lock); 1854 continue; 1855 } 1856 if (rangelock_add(range_sync, offset, length) < 0) { 1857 mtx_unlock(&range_lock); 1858 pjdlog_debug(2, 1859 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1860 (intmax_t)offset, (intmax_t)length); 1861 sleep(1); 1862 continue; 1863 } 1864 mtx_unlock(&range_lock); 1865 break; 1866 } 1867 /* 1868 * First read the data from synchronization source. 
1869 */ 1870 SYNCREQ(hio); 1871 ggio = &hio->hio_ggio; 1872 ggio->gctl_cmd = BIO_READ; 1873 ggio->gctl_offset = offset; 1874 ggio->gctl_length = length; 1875 ggio->gctl_error = 0; 1876 for (ii = 0; ii < ncomps; ii++) 1877 hio->hio_errors[ii] = EINVAL; 1878 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1879 hio); 1880 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1881 hio); 1882 mtx_lock(&metadata_lock); 1883 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1884 /* 1885 * This range is up-to-date on local component, 1886 * so handle request locally. 1887 */ 1888 /* Local component is 0 for now. */ 1889 ncomp = 0; 1890 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1891 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1892 /* 1893 * This range is out-of-date on local component, 1894 * so send request to the remote node. 1895 */ 1896 /* Remote component is 1 for now. */ 1897 ncomp = 1; 1898 } 1899 mtx_unlock(&metadata_lock); 1900 refcount_init(&hio->hio_countdown, 1); 1901 QUEUE_INSERT1(hio, send, ncomp); 1902 1903 /* 1904 * Let's wait for READ to finish. 1905 */ 1906 mtx_lock(&sync_lock); 1907 while (!ISSYNCREQDONE(hio)) 1908 cv_wait(&sync_cond, &sync_lock); 1909 mtx_unlock(&sync_lock); 1910 1911 if (hio->hio_errors[ncomp] != 0) { 1912 pjdlog_error("Unable to read synchronization data: %s.", 1913 strerror(hio->hio_errors[ncomp])); 1914 goto free_queue; 1915 } 1916 1917 /* 1918 * We read the data from synchronization source, now write it 1919 * to synchronization target. 
1920 */ 1921 SYNCREQ(hio); 1922 ggio->gctl_cmd = BIO_WRITE; 1923 for (ii = 0; ii < ncomps; ii++) 1924 hio->hio_errors[ii] = EINVAL; 1925 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1926 hio); 1927 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1928 hio); 1929 mtx_lock(&metadata_lock); 1930 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1931 /* 1932 * This range is up-to-date on local component, 1933 * so we update remote component. 1934 */ 1935 /* Remote component is 1 for now. */ 1936 ncomp = 1; 1937 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1938 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1939 /* 1940 * This range is out-of-date on local component, 1941 * so we update it. 1942 */ 1943 /* Local component is 0 for now. */ 1944 ncomp = 0; 1945 } 1946 mtx_unlock(&metadata_lock); 1947 1948 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1949 hio); 1950 refcount_init(&hio->hio_countdown, 1); 1951 QUEUE_INSERT1(hio, send, ncomp); 1952 1953 /* 1954 * Let's wait for WRITE to finish. 
1955 */ 1956 mtx_lock(&sync_lock); 1957 while (!ISSYNCREQDONE(hio)) 1958 cv_wait(&sync_cond, &sync_lock); 1959 mtx_unlock(&sync_lock); 1960 1961 if (hio->hio_errors[ncomp] != 0) { 1962 pjdlog_error("Unable to write synchronization data: %s.", 1963 strerror(hio->hio_errors[ncomp])); 1964 goto free_queue; 1965 } 1966 1967 synced += length; 1968free_queue: 1969 mtx_lock(&range_lock); 1970 rangelock_del(range_sync, offset, length); 1971 if (range_regular_wait) 1972 cv_signal(&range_regular_cond); 1973 mtx_unlock(&range_lock); 1974 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1975 hio); 1976 QUEUE_INSERT2(hio, free); 1977 } 1978 /* NOTREACHED */ 1979 return (NULL); 1980} 1981 1982void 1983primary_config_reload(struct hast_resource *res, struct nv *nv) 1984{ 1985 unsigned int ii, ncomps; 1986 int modified, vint; 1987 const char *vstr; 1988 1989 pjdlog_info("Reloading configuration..."); 1990 1991 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 1992 PJDLOG_ASSERT(gres == res); 1993 nv_assert(nv, "remoteaddr"); 1994 nv_assert(nv, "sourceaddr"); 1995 nv_assert(nv, "replication"); 1996 nv_assert(nv, "checksum"); 1997 nv_assert(nv, "compression"); 1998 nv_assert(nv, "timeout"); 1999 nv_assert(nv, "exec"); 2000 2001 ncomps = HAST_NCOMPONENTS; 2002 2003#define MODIFIED_REMOTEADDR 0x01 2004#define MODIFIED_SOURCEADDR 0x02 2005#define MODIFIED_REPLICATION 0x04 2006#define MODIFIED_CHECKSUM 0x08 2007#define MODIFIED_COMPRESSION 0x10 2008#define MODIFIED_TIMEOUT 0x20 2009#define MODIFIED_EXEC 0x40 2010 modified = 0; 2011 2012 vstr = nv_get_string(nv, "remoteaddr"); 2013 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 2014 /* 2015 * Don't copy res->hr_remoteaddr to gres just yet. 2016 * We want remote_close() to log disconnect from the old 2017 * addresses, not from the new ones. 
2018 */ 2019 modified |= MODIFIED_REMOTEADDR; 2020 } 2021 vstr = nv_get_string(nv, "sourceaddr"); 2022 if (strcmp(gres->hr_sourceaddr, vstr) != 0) { 2023 strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); 2024 modified |= MODIFIED_SOURCEADDR; 2025 } 2026 vint = nv_get_int32(nv, "replication"); 2027 if (gres->hr_replication != vint) { 2028 gres->hr_replication = vint; 2029 modified |= MODIFIED_REPLICATION; 2030 } 2031 vint = nv_get_int32(nv, "checksum"); 2032 if (gres->hr_checksum != vint) { 2033 gres->hr_checksum = vint; 2034 modified |= MODIFIED_CHECKSUM; 2035 } 2036 vint = nv_get_int32(nv, "compression"); 2037 if (gres->hr_compression != vint) { 2038 gres->hr_compression = vint; 2039 modified |= MODIFIED_COMPRESSION; 2040 } 2041 vint = nv_get_int32(nv, "timeout"); 2042 if (gres->hr_timeout != vint) { 2043 gres->hr_timeout = vint; 2044 modified |= MODIFIED_TIMEOUT; 2045 } 2046 vstr = nv_get_string(nv, "exec"); 2047 if (strcmp(gres->hr_exec, vstr) != 0) { 2048 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 2049 modified |= MODIFIED_EXEC; 2050 } 2051 2052 /* 2053 * Change timeout for connected sockets. 2054 * Don't bother if we need to reconnect. 
2055 */ 2056 if ((modified & MODIFIED_TIMEOUT) != 0 && 2057 (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2058 MODIFIED_REPLICATION)) == 0) { 2059 for (ii = 0; ii < ncomps; ii++) { 2060 if (!ISREMOTE(ii)) 2061 continue; 2062 rw_rlock(&hio_remote_lock[ii]); 2063 if (!ISCONNECTED(gres, ii)) { 2064 rw_unlock(&hio_remote_lock[ii]); 2065 continue; 2066 } 2067 rw_unlock(&hio_remote_lock[ii]); 2068 if (proto_timeout(gres->hr_remotein, 2069 gres->hr_timeout) < 0) { 2070 pjdlog_errno(LOG_WARNING, 2071 "Unable to set connection timeout"); 2072 } 2073 if (proto_timeout(gres->hr_remoteout, 2074 gres->hr_timeout) < 0) { 2075 pjdlog_errno(LOG_WARNING, 2076 "Unable to set connection timeout"); 2077 } 2078 } 2079 } 2080 if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR | 2081 MODIFIED_REPLICATION)) != 0) { 2082 for (ii = 0; ii < ncomps; ii++) { 2083 if (!ISREMOTE(ii)) 2084 continue; 2085 remote_close(gres, ii); 2086 } 2087 if (modified & MODIFIED_REMOTEADDR) { 2088 vstr = nv_get_string(nv, "remoteaddr"); 2089 strlcpy(gres->hr_remoteaddr, vstr, 2090 sizeof(gres->hr_remoteaddr)); 2091 } 2092 } 2093#undef MODIFIED_REMOTEADDR 2094#undef MODIFIED_SOURCEADDR 2095#undef MODIFIED_REPLICATION 2096#undef MODIFIED_CHECKSUM 2097#undef MODIFIED_COMPRESSION 2098#undef MODIFIED_TIMEOUT 2099#undef MODIFIED_EXEC 2100 2101 pjdlog_info("Configuration reloaded successfully."); 2102} 2103 2104static void 2105guard_one(struct hast_resource *res, unsigned int ncomp) 2106{ 2107 struct proto_conn *in, *out; 2108 2109 if (!ISREMOTE(ncomp)) 2110 return; 2111 2112 rw_rlock(&hio_remote_lock[ncomp]); 2113 2114 if (!real_remote(res)) { 2115 rw_unlock(&hio_remote_lock[ncomp]); 2116 return; 2117 } 2118 2119 if (ISCONNECTED(res, ncomp)) { 2120 PJDLOG_ASSERT(res->hr_remotein != NULL); 2121 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2122 rw_unlock(&hio_remote_lock[ncomp]); 2123 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2124 res->hr_remoteaddr); 2125 return; 2126 } 2127 2128 
PJDLOG_ASSERT(res->hr_remotein == NULL); 2129 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2130 /* 2131 * Upgrade the lock. It doesn't have to be atomic as no other thread 2132 * can change connection status from disconnected to connected. 2133 */ 2134 rw_unlock(&hio_remote_lock[ncomp]); 2135 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2136 res->hr_remoteaddr); 2137 in = out = NULL; 2138 if (init_remote(res, &in, &out) == 0) { 2139 rw_wlock(&hio_remote_lock[ncomp]); 2140 PJDLOG_ASSERT(res->hr_remotein == NULL); 2141 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2142 PJDLOG_ASSERT(in != NULL && out != NULL); 2143 res->hr_remotein = in; 2144 res->hr_remoteout = out; 2145 rw_unlock(&hio_remote_lock[ncomp]); 2146 pjdlog_info("Successfully reconnected to %s.", 2147 res->hr_remoteaddr); 2148 sync_start(); 2149 } else { 2150 /* Both connections should be NULL. */ 2151 PJDLOG_ASSERT(res->hr_remotein == NULL); 2152 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2153 PJDLOG_ASSERT(in == NULL && out == NULL); 2154 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2155 res->hr_remoteaddr); 2156 } 2157} 2158 2159/* 2160 * Thread guards remote connections and reconnects when needed, handles 2161 * signals, etc. 
2162 */ 2163static void * 2164guard_thread(void *arg) 2165{ 2166 struct hast_resource *res = arg; 2167 unsigned int ii, ncomps; 2168 struct timespec timeout; 2169 time_t lastcheck, now; 2170 sigset_t mask; 2171 int signo; 2172 2173 ncomps = HAST_NCOMPONENTS; 2174 lastcheck = time(NULL); 2175 2176 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2177 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2178 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2179 2180 timeout.tv_sec = HAST_KEEPALIVE; 2181 timeout.tv_nsec = 0; 2182 signo = -1; 2183 2184 for (;;) { 2185 switch (signo) { 2186 case SIGINT: 2187 case SIGTERM: 2188 sigexit_received = true; 2189 primary_exitx(EX_OK, 2190 "Termination signal received, exiting."); 2191 break; 2192 default: 2193 break; 2194 } 2195 2196 /* 2197 * Don't check connections until we fully started, 2198 * as we may still be looping, waiting for remote node 2199 * to switch from primary to secondary. 2200 */ 2201 if (fullystarted) { 2202 pjdlog_debug(2, "remote_guard: Checking connections."); 2203 now = time(NULL); 2204 if (lastcheck + HAST_KEEPALIVE <= now) { 2205 for (ii = 0; ii < ncomps; ii++) 2206 guard_one(res, ii); 2207 lastcheck = now; 2208 } 2209 } 2210 signo = sigtimedwait(&mask, NULL, &timeout); 2211 } 2212 /* NOTREACHED */ 2213 return (NULL); 2214} 2215