primary.c revision 225783
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
29 */ 30 31#include <sys/cdefs.h> 32__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 225783 2011-09-27 07:59:10Z pjd $"); 33 34#include <sys/types.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/refcount.h> 39#include <sys/stat.h> 40 41#include <geom/gate/g_gate.h> 42 43#include <err.h> 44#include <errno.h> 45#include <fcntl.h> 46#include <libgeom.h> 47#include <pthread.h> 48#include <signal.h> 49#include <stdint.h> 50#include <stdio.h> 51#include <string.h> 52#include <sysexits.h> 53#include <unistd.h> 54 55#include <activemap.h> 56#include <nv.h> 57#include <rangelock.h> 58 59#include "control.h" 60#include "event.h" 61#include "hast.h" 62#include "hast_proto.h" 63#include "hastd.h" 64#include "hooks.h" 65#include "metadata.h" 66#include "proto.h" 67#include "pjdlog.h" 68#include "subr.h" 69#include "synch.h" 70 71/* The is only one remote component for now. */ 72#define ISREMOTE(no) ((no) == 1) 73 74struct hio { 75 /* 76 * Number of components we are still waiting for. 77 * When this field goes to 0, we can send the request back to the 78 * kernel. Each component has to decrease this counter by one 79 * even on failure. 80 */ 81 unsigned int hio_countdown; 82 /* 83 * Each component has a place to store its own error. 84 * Once the request is handled by all components we can decide if the 85 * request overall is successful or not. 86 */ 87 int *hio_errors; 88 /* 89 * Structure used to communicate with GEOM Gate class. 90 */ 91 struct g_gate_ctl_io hio_ggio; 92 TAILQ_ENTRY(hio) *hio_next; 93}; 94#define hio_free_next hio_next[0] 95#define hio_done_next hio_next[0] 96 97/* 98 * Free list holds unused structures. When free list is empty, we have to wait 99 * until some in-progress requests are freed. 100 */ 101static TAILQ_HEAD(, hio) hio_free_list; 102static pthread_mutex_t hio_free_list_lock; 103static pthread_cond_t hio_free_list_cond; 104/* 105 * There is one send list for every component. 
One requests is placed on all 106 * send lists - each component gets the same request, but each component is 107 * responsible for managing his own send list. 108 */ 109static TAILQ_HEAD(, hio) *hio_send_list; 110static pthread_mutex_t *hio_send_list_lock; 111static pthread_cond_t *hio_send_list_cond; 112/* 113 * There is one recv list for every component, although local components don't 114 * use recv lists as local requests are done synchronously. 115 */ 116static TAILQ_HEAD(, hio) *hio_recv_list; 117static pthread_mutex_t *hio_recv_list_lock; 118static pthread_cond_t *hio_recv_list_cond; 119/* 120 * Request is placed on done list by the slowest component (the one that 121 * decreased hio_countdown from 1 to 0). 122 */ 123static TAILQ_HEAD(, hio) hio_done_list; 124static pthread_mutex_t hio_done_list_lock; 125static pthread_cond_t hio_done_list_cond; 126/* 127 * Structure below are for interaction with sync thread. 128 */ 129static bool sync_inprogress; 130static pthread_mutex_t sync_lock; 131static pthread_cond_t sync_cond; 132/* 133 * The lock below allows to synchornize access to remote connections. 134 */ 135static pthread_rwlock_t *hio_remote_lock; 136 137/* 138 * Lock to synchronize metadata updates. Also synchronize access to 139 * hr_primary_localcnt and hr_primary_remotecnt fields. 140 */ 141static pthread_mutex_t metadata_lock; 142 143/* 144 * Maximum number of outstanding I/O requests. 145 */ 146#define HAST_HIO_MAX 256 147/* 148 * Number of components. At this point there are only two components: local 149 * and remote, but in the future it might be possible to use multiple local 150 * and remote components. 
151 */ 152#define HAST_NCOMPONENTS 2 153 154#define ISCONNECTED(res, no) \ 155 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156 157#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158 bool _wakeup; \ 159 \ 160 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163 hio_next[(ncomp)]); \ 164 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165 if (_wakeup) \ 166 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167} while (0) 168#define QUEUE_INSERT2(hio, name) do { \ 169 bool _wakeup; \ 170 \ 171 mtx_lock(&hio_##name##_list_lock); \ 172 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174 mtx_unlock(&hio_##name##_list_lock); \ 175 if (_wakeup) \ 176 cv_signal(&hio_##name##_list_cond); \ 177} while (0) 178#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 179 bool _last; \ 180 \ 181 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 182 _last = false; \ 183 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 184 cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 185 &hio_##name##_list_lock[(ncomp)], (timeout)); \ 186 if ((timeout) != 0) \ 187 _last = true; \ 188 } \ 189 if (hio != NULL) { \ 190 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 191 hio_next[(ncomp)]); \ 192 } \ 193 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 194} while (0) 195#define QUEUE_TAKE2(hio, name) do { \ 196 mtx_lock(&hio_##name##_list_lock); \ 197 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 198 cv_wait(&hio_##name##_list_cond, \ 199 &hio_##name##_list_lock); \ 200 } \ 201 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 202 mtx_unlock(&hio_##name##_list_lock); \ 203} while (0) 204 205#define SYNCREQ(hio) do { \ 206 (hio)->hio_ggio.gctl_unit = -1; \ 207 (hio)->hio_ggio.gctl_seq = 1; \ 208} while (0) 209#define ISSYNCREQ(hio) 
((hio)->hio_ggio.gctl_unit == -1) 210#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 211#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 212 213static struct hast_resource *gres; 214 215static pthread_mutex_t range_lock; 216static struct rangelocks *range_regular; 217static bool range_regular_wait; 218static pthread_cond_t range_regular_cond; 219static struct rangelocks *range_sync; 220static bool range_sync_wait; 221static pthread_cond_t range_sync_cond; 222static bool fullystarted; 223 224static void *ggate_recv_thread(void *arg); 225static void *local_send_thread(void *arg); 226static void *remote_send_thread(void *arg); 227static void *remote_recv_thread(void *arg); 228static void *ggate_send_thread(void *arg); 229static void *sync_thread(void *arg); 230static void *guard_thread(void *arg); 231 232static void 233cleanup(struct hast_resource *res) 234{ 235 int rerrno; 236 237 /* Remember errno. */ 238 rerrno = errno; 239 240 /* Destroy ggate provider if we created one. */ 241 if (res->hr_ggateunit >= 0) { 242 struct g_gate_ctl_destroy ggiod; 243 244 bzero(&ggiod, sizeof(ggiod)); 245 ggiod.gctl_version = G_GATE_VERSION; 246 ggiod.gctl_unit = res->hr_ggateunit; 247 ggiod.gctl_force = 1; 248 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 249 pjdlog_errno(LOG_WARNING, 250 "Unable to destroy hast/%s device", 251 res->hr_provname); 252 } 253 res->hr_ggateunit = -1; 254 } 255 256 /* Restore errno. */ 257 errno = rerrno; 258} 259 260static __dead2 void 261primary_exit(int exitcode, const char *fmt, ...) 262{ 263 va_list ap; 264 265 PJDLOG_ASSERT(exitcode != EX_OK); 266 va_start(ap, fmt); 267 pjdlogv_errno(LOG_ERR, fmt, ap); 268 va_end(ap); 269 cleanup(gres); 270 exit(exitcode); 271} 272 273static __dead2 void 274primary_exitx(int exitcode, const char *fmt, ...) 275{ 276 va_list ap; 277 278 va_start(ap, fmt); 279 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 280 va_end(ap); 281 cleanup(gres); 282 exit(exitcode); 283} 284 285static int 286hast_activemap_flush(struct hast_resource *res) 287{ 288 const unsigned char *buf; 289 size_t size; 290 291 buf = activemap_bitmap(res->hr_amp, &size); 292 PJDLOG_ASSERT(buf != NULL); 293 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 294 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 295 (ssize_t)size) { 296 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 297 "Unable to flush activemap to disk")); 298 return (-1); 299 } 300 return (0); 301} 302 303static bool 304real_remote(const struct hast_resource *res) 305{ 306 307 return (strcmp(res->hr_remoteaddr, "none") != 0); 308} 309 310static void 311init_environment(struct hast_resource *res __unused) 312{ 313 struct hio *hio; 314 unsigned int ii, ncomps; 315 316 /* 317 * In the future it might be per-resource value. 318 */ 319 ncomps = HAST_NCOMPONENTS; 320 321 /* 322 * Allocate memory needed by lists. 323 */ 324 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 325 if (hio_send_list == NULL) { 326 primary_exitx(EX_TEMPFAIL, 327 "Unable to allocate %zu bytes of memory for send lists.", 328 sizeof(hio_send_list[0]) * ncomps); 329 } 330 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 331 if (hio_send_list_lock == NULL) { 332 primary_exitx(EX_TEMPFAIL, 333 "Unable to allocate %zu bytes of memory for send list locks.", 334 sizeof(hio_send_list_lock[0]) * ncomps); 335 } 336 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 337 if (hio_send_list_cond == NULL) { 338 primary_exitx(EX_TEMPFAIL, 339 "Unable to allocate %zu bytes of memory for send list condition variables.", 340 sizeof(hio_send_list_cond[0]) * ncomps); 341 } 342 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 343 if (hio_recv_list == NULL) { 344 primary_exitx(EX_TEMPFAIL, 345 "Unable to allocate %zu bytes of memory for recv lists.", 346 sizeof(hio_recv_list[0]) * ncomps); 347 } 348 
hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 349 if (hio_recv_list_lock == NULL) { 350 primary_exitx(EX_TEMPFAIL, 351 "Unable to allocate %zu bytes of memory for recv list locks.", 352 sizeof(hio_recv_list_lock[0]) * ncomps); 353 } 354 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 355 if (hio_recv_list_cond == NULL) { 356 primary_exitx(EX_TEMPFAIL, 357 "Unable to allocate %zu bytes of memory for recv list condition variables.", 358 sizeof(hio_recv_list_cond[0]) * ncomps); 359 } 360 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 361 if (hio_remote_lock == NULL) { 362 primary_exitx(EX_TEMPFAIL, 363 "Unable to allocate %zu bytes of memory for remote connections locks.", 364 sizeof(hio_remote_lock[0]) * ncomps); 365 } 366 367 /* 368 * Initialize lists, their locks and theirs condition variables. 369 */ 370 TAILQ_INIT(&hio_free_list); 371 mtx_init(&hio_free_list_lock); 372 cv_init(&hio_free_list_cond); 373 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 374 TAILQ_INIT(&hio_send_list[ii]); 375 mtx_init(&hio_send_list_lock[ii]); 376 cv_init(&hio_send_list_cond[ii]); 377 TAILQ_INIT(&hio_recv_list[ii]); 378 mtx_init(&hio_recv_list_lock[ii]); 379 cv_init(&hio_recv_list_cond[ii]); 380 rw_init(&hio_remote_lock[ii]); 381 } 382 TAILQ_INIT(&hio_done_list); 383 mtx_init(&hio_done_list_lock); 384 cv_init(&hio_done_list_cond); 385 mtx_init(&metadata_lock); 386 387 /* 388 * Allocate requests pool and initialize requests. 
389 */ 390 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 391 hio = malloc(sizeof(*hio)); 392 if (hio == NULL) { 393 primary_exitx(EX_TEMPFAIL, 394 "Unable to allocate %zu bytes of memory for hio request.", 395 sizeof(*hio)); 396 } 397 hio->hio_countdown = 0; 398 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 399 if (hio->hio_errors == NULL) { 400 primary_exitx(EX_TEMPFAIL, 401 "Unable allocate %zu bytes of memory for hio errors.", 402 sizeof(hio->hio_errors[0]) * ncomps); 403 } 404 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 405 if (hio->hio_next == NULL) { 406 primary_exitx(EX_TEMPFAIL, 407 "Unable allocate %zu bytes of memory for hio_next field.", 408 sizeof(hio->hio_next[0]) * ncomps); 409 } 410 hio->hio_ggio.gctl_version = G_GATE_VERSION; 411 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 412 if (hio->hio_ggio.gctl_data == NULL) { 413 primary_exitx(EX_TEMPFAIL, 414 "Unable to allocate %zu bytes of memory for gctl_data.", 415 MAXPHYS); 416 } 417 hio->hio_ggio.gctl_length = MAXPHYS; 418 hio->hio_ggio.gctl_error = 0; 419 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 420 } 421} 422 423static bool 424init_resuid(struct hast_resource *res) 425{ 426 427 mtx_lock(&metadata_lock); 428 if (res->hr_resuid != 0) { 429 mtx_unlock(&metadata_lock); 430 return (false); 431 } else { 432 /* Initialize unique resource identifier. 
*/ 433 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 434 mtx_unlock(&metadata_lock); 435 if (metadata_write(res) < 0) 436 exit(EX_NOINPUT); 437 return (true); 438 } 439} 440 441static void 442init_local(struct hast_resource *res) 443{ 444 unsigned char *buf; 445 size_t mapsize; 446 447 if (metadata_read(res, true) < 0) 448 exit(EX_NOINPUT); 449 mtx_init(&res->hr_amp_lock); 450 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 451 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 452 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 453 } 454 mtx_init(&range_lock); 455 cv_init(&range_regular_cond); 456 if (rangelock_init(&range_regular) < 0) 457 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 458 cv_init(&range_sync_cond); 459 if (rangelock_init(&range_sync) < 0) 460 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 461 mapsize = activemap_ondisk_size(res->hr_amp); 462 buf = calloc(1, mapsize); 463 if (buf == NULL) { 464 primary_exitx(EX_TEMPFAIL, 465 "Unable to allocate buffer for activemap."); 466 } 467 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 468 (ssize_t)mapsize) { 469 primary_exit(EX_NOINPUT, "Unable to read activemap"); 470 } 471 activemap_copyin(res->hr_amp, buf, mapsize); 472 free(buf); 473 if (res->hr_resuid != 0) 474 return; 475 /* 476 * We're using provider for the first time. Initialize local and remote 477 * counters. We don't initialize resuid here, as we want to do it just 478 * in time. The reason for this is that we want to inform secondary 479 * that there were no writes yet, so there is no need to synchronize 480 * anything. 
481 */ 482 res->hr_primary_localcnt = 0; 483 res->hr_primary_remotecnt = 0; 484 if (metadata_write(res) < 0) 485 exit(EX_NOINPUT); 486} 487 488static int 489primary_connect(struct hast_resource *res, struct proto_conn **connp) 490{ 491 struct proto_conn *conn; 492 int16_t val; 493 494 val = 1; 495 if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 496 primary_exit(EX_TEMPFAIL, 497 "Unable to send connection request to parent"); 498 } 499 if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 500 primary_exit(EX_TEMPFAIL, 501 "Unable to receive reply to connection request from parent"); 502 } 503 if (val != 0) { 504 errno = val; 505 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 506 res->hr_remoteaddr); 507 return (-1); 508 } 509 if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 510 primary_exit(EX_TEMPFAIL, 511 "Unable to receive connection from parent"); 512 } 513 if (proto_connect_wait(conn, res->hr_timeout) < 0) { 514 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 515 res->hr_remoteaddr); 516 proto_close(conn); 517 return (-1); 518 } 519 /* Error in setting timeout is not critical, but why should it fail? */ 520 if (proto_timeout(conn, res->hr_timeout) < 0) 521 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 522 523 *connp = conn; 524 525 return (0); 526} 527 528static int 529init_remote(struct hast_resource *res, struct proto_conn **inp, 530 struct proto_conn **outp) 531{ 532 struct proto_conn *in, *out; 533 struct nv *nvout, *nvin; 534 const unsigned char *token; 535 unsigned char *map; 536 const char *errmsg; 537 int32_t extentsize; 538 int64_t datasize; 539 uint32_t mapsize; 540 size_t size; 541 int error; 542 543 PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 544 PJDLOG_ASSERT(real_remote(res)); 545 546 in = out = NULL; 547 errmsg = NULL; 548 549 if (primary_connect(res, &out) == -1) 550 return (ECONNREFUSED); 551 552 error = ECONNABORTED; 553 554 /* 555 * First handshake step. 
556 * Setup outgoing connection with remote node. 557 */ 558 nvout = nv_alloc(); 559 nv_add_string(nvout, res->hr_name, "resource"); 560 if (nv_error(nvout) != 0) { 561 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 562 "Unable to allocate header for connection with %s", 563 res->hr_remoteaddr); 564 nv_free(nvout); 565 goto close; 566 } 567 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 568 pjdlog_errno(LOG_WARNING, 569 "Unable to send handshake header to %s", 570 res->hr_remoteaddr); 571 nv_free(nvout); 572 goto close; 573 } 574 nv_free(nvout); 575 if (hast_proto_recv_hdr(out, &nvin) < 0) { 576 pjdlog_errno(LOG_WARNING, 577 "Unable to receive handshake header from %s", 578 res->hr_remoteaddr); 579 goto close; 580 } 581 errmsg = nv_get_string(nvin, "errmsg"); 582 if (errmsg != NULL) { 583 pjdlog_warning("%s", errmsg); 584 if (nv_exists(nvin, "wait")) 585 error = EBUSY; 586 nv_free(nvin); 587 goto close; 588 } 589 token = nv_get_uint8_array(nvin, &size, "token"); 590 if (token == NULL) { 591 pjdlog_warning("Handshake header from %s has no 'token' field.", 592 res->hr_remoteaddr); 593 nv_free(nvin); 594 goto close; 595 } 596 if (size != sizeof(res->hr_token)) { 597 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 598 res->hr_remoteaddr, size, sizeof(res->hr_token)); 599 nv_free(nvin); 600 goto close; 601 } 602 bcopy(token, res->hr_token, sizeof(res->hr_token)); 603 nv_free(nvin); 604 605 /* 606 * Second handshake step. 607 * Setup incoming connection with remote node. 608 */ 609 if (primary_connect(res, &in) == -1) 610 goto close; 611 612 nvout = nv_alloc(); 613 nv_add_string(nvout, res->hr_name, "resource"); 614 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 615 "token"); 616 if (res->hr_resuid == 0) { 617 /* 618 * The resuid field was not yet initialized. 
619 * Because we do synchronization inside init_resuid(), it is 620 * possible that someone already initialized it, the function 621 * will return false then, but if we successfully initialized 622 * it, we will get true. True means that there were no writes 623 * to this resource yet and we want to inform secondary that 624 * synchronization is not needed by sending "virgin" argument. 625 */ 626 if (init_resuid(res)) 627 nv_add_int8(nvout, 1, "virgin"); 628 } 629 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 630 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 631 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 632 if (nv_error(nvout) != 0) { 633 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 634 "Unable to allocate header for connection with %s", 635 res->hr_remoteaddr); 636 nv_free(nvout); 637 goto close; 638 } 639 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 640 pjdlog_errno(LOG_WARNING, 641 "Unable to send handshake header to %s", 642 res->hr_remoteaddr); 643 nv_free(nvout); 644 goto close; 645 } 646 nv_free(nvout); 647 if (hast_proto_recv_hdr(out, &nvin) < 0) { 648 pjdlog_errno(LOG_WARNING, 649 "Unable to receive handshake header from %s", 650 res->hr_remoteaddr); 651 goto close; 652 } 653 errmsg = nv_get_string(nvin, "errmsg"); 654 if (errmsg != NULL) { 655 pjdlog_warning("%s", errmsg); 656 nv_free(nvin); 657 goto close; 658 } 659 datasize = nv_get_int64(nvin, "datasize"); 660 if (datasize != res->hr_datasize) { 661 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 662 (intmax_t)res->hr_datasize, (intmax_t)datasize); 663 nv_free(nvin); 664 goto close; 665 } 666 extentsize = nv_get_int32(nvin, "extentsize"); 667 if (extentsize != res->hr_extentsize) { 668 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 669 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 670 nv_free(nvin); 671 goto close; 672 } 673 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 674 
res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 675 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 676 if (nv_exists(nvin, "virgin")) { 677 /* 678 * Secondary was reinitialized, bump localcnt if it is 0 as 679 * only we have the data. 680 */ 681 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); 682 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 683 684 if (res->hr_primary_localcnt == 0) { 685 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); 686 687 mtx_lock(&metadata_lock); 688 res->hr_primary_localcnt++; 689 pjdlog_debug(1, "Increasing localcnt to %ju.", 690 (uintmax_t)res->hr_primary_localcnt); 691 (void)metadata_write(res); 692 mtx_unlock(&metadata_lock); 693 } 694 } 695 map = NULL; 696 mapsize = nv_get_uint32(nvin, "mapsize"); 697 if (mapsize > 0) { 698 map = malloc(mapsize); 699 if (map == NULL) { 700 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 701 (uintmax_t)mapsize); 702 nv_free(nvin); 703 goto close; 704 } 705 /* 706 * Remote node have some dirty extents on its own, lets 707 * download its activemap. 708 */ 709 if (hast_proto_recv_data(res, out, nvin, map, 710 mapsize) < 0) { 711 pjdlog_errno(LOG_ERR, 712 "Unable to receive remote activemap"); 713 nv_free(nvin); 714 free(map); 715 goto close; 716 } 717 /* 718 * Merge local and remote bitmaps. 719 */ 720 activemap_merge(res->hr_amp, map, mapsize); 721 free(map); 722 /* 723 * Now that we merged bitmaps from both nodes, flush it to the 724 * disk before we start to synchronize. 725 */ 726 (void)hast_activemap_flush(res); 727 } 728 nv_free(nvin); 729#ifdef notyet 730 /* Setup directions. 
*/ 731 if (proto_send(out, NULL, 0) == -1) 732 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 733 if (proto_recv(in, NULL, 0) == -1) 734 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 735#endif 736 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 737 if (inp != NULL && outp != NULL) { 738 *inp = in; 739 *outp = out; 740 } else { 741 res->hr_remotein = in; 742 res->hr_remoteout = out; 743 } 744 event_send(res, EVENT_CONNECT); 745 return (0); 746close: 747 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 748 event_send(res, EVENT_SPLITBRAIN); 749 proto_close(out); 750 if (in != NULL) 751 proto_close(in); 752 return (error); 753} 754 755static void 756sync_start(void) 757{ 758 759 mtx_lock(&sync_lock); 760 sync_inprogress = true; 761 mtx_unlock(&sync_lock); 762 cv_signal(&sync_cond); 763} 764 765static void 766sync_stop(void) 767{ 768 769 mtx_lock(&sync_lock); 770 if (sync_inprogress) 771 sync_inprogress = false; 772 mtx_unlock(&sync_lock); 773} 774 775static void 776init_ggate(struct hast_resource *res) 777{ 778 struct g_gate_ctl_create ggiocreate; 779 struct g_gate_ctl_cancel ggiocancel; 780 781 /* 782 * We communicate with ggate via /dev/ggctl. Open it. 783 */ 784 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 785 if (res->hr_ggatefd < 0) 786 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 787 /* 788 * Create provider before trying to connect, as connection failure 789 * is not critical, but may take some time. 
790 */ 791 bzero(&ggiocreate, sizeof(ggiocreate)); 792 ggiocreate.gctl_version = G_GATE_VERSION; 793 ggiocreate.gctl_mediasize = res->hr_datasize; 794 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 795 ggiocreate.gctl_flags = 0; 796 ggiocreate.gctl_maxcount = 0; 797 ggiocreate.gctl_timeout = 0; 798 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 799 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 800 res->hr_provname); 801 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 802 pjdlog_info("Device hast/%s created.", res->hr_provname); 803 res->hr_ggateunit = ggiocreate.gctl_unit; 804 return; 805 } 806 if (errno != EEXIST) { 807 primary_exit(EX_OSERR, "Unable to create hast/%s device", 808 res->hr_provname); 809 } 810 pjdlog_debug(1, 811 "Device hast/%s already exists, we will try to take it over.", 812 res->hr_provname); 813 /* 814 * If we received EEXIST, we assume that the process who created the 815 * provider died and didn't clean up. In that case we will start from 816 * where he left of. 817 */ 818 bzero(&ggiocancel, sizeof(ggiocancel)); 819 ggiocancel.gctl_version = G_GATE_VERSION; 820 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 821 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 822 res->hr_provname); 823 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 824 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 825 res->hr_ggateunit = ggiocancel.gctl_unit; 826 return; 827 } 828 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 829 res->hr_provname); 830} 831 832void 833hastd_primary(struct hast_resource *res) 834{ 835 pthread_t td; 836 pid_t pid; 837 int error, mode, debuglevel; 838 839 /* 840 * Create communication channel for sending control commands from 841 * parent to child. 842 */ 843 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) < 0) { 844 /* TODO: There's no need for this to be fatal error. 
*/ 845 KEEP_ERRNO((void)pidfile_remove(pfh)); 846 pjdlog_exit(EX_OSERR, 847 "Unable to create control sockets between parent and child"); 848 } 849 /* 850 * Create communication channel for sending events from child to parent. 851 */ 852 if (proto_client(NULL, "socketpair://", &res->hr_event) < 0) { 853 /* TODO: There's no need for this to be fatal error. */ 854 KEEP_ERRNO((void)pidfile_remove(pfh)); 855 pjdlog_exit(EX_OSERR, 856 "Unable to create event sockets between child and parent"); 857 } 858 /* 859 * Create communication channel for sending connection requests from 860 * child to parent. 861 */ 862 if (proto_client(NULL, "socketpair://", &res->hr_conn) < 0) { 863 /* TODO: There's no need for this to be fatal error. */ 864 KEEP_ERRNO((void)pidfile_remove(pfh)); 865 pjdlog_exit(EX_OSERR, 866 "Unable to create connection sockets between child and parent"); 867 } 868 869 pid = fork(); 870 if (pid < 0) { 871 /* TODO: There's no need for this to be fatal error. */ 872 KEEP_ERRNO((void)pidfile_remove(pfh)); 873 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 874 } 875 876 if (pid > 0) { 877 /* This is parent. */ 878 /* Declare that we are receiver. */ 879 proto_recv(res->hr_event, NULL, 0); 880 proto_recv(res->hr_conn, NULL, 0); 881 /* Declare that we are sender. */ 882 proto_send(res->hr_ctrl, NULL, 0); 883 res->hr_workerpid = pid; 884 return; 885 } 886 887 gres = res; 888 mode = pjdlog_mode_get(); 889 debuglevel = pjdlog_debug_get(); 890 891 /* Declare that we are sender. */ 892 proto_send(res->hr_event, NULL, 0); 893 proto_send(res->hr_conn, NULL, 0); 894 /* Declare that we are receiver. 
*/ 895 proto_recv(res->hr_ctrl, NULL, 0); 896 descriptors_cleanup(res); 897 898 descriptors_assert(res, mode); 899 900 pjdlog_init(mode); 901 pjdlog_debug_set(debuglevel); 902 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 903 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 904 905 init_local(res); 906 init_ggate(res); 907 init_environment(res); 908 909 if (drop_privs(res) != 0) { 910 cleanup(res); 911 exit(EX_CONFIG); 912 } 913 pjdlog_info("Privileges successfully dropped."); 914 915 /* 916 * Create the guard thread first, so we can handle signals from the 917 * very begining. 918 */ 919 error = pthread_create(&td, NULL, guard_thread, res); 920 PJDLOG_ASSERT(error == 0); 921 /* 922 * Create the control thread before sending any event to the parent, 923 * as we can deadlock when parent sends control request to worker, 924 * but worker has no control thread started yet, so parent waits. 925 * In the meantime worker sends an event to the parent, but parent 926 * is unable to handle the event, because it waits for control 927 * request response. 
928 */ 929 error = pthread_create(&td, NULL, ctrl_thread, res); 930 PJDLOG_ASSERT(error == 0); 931 if (real_remote(res)) { 932 error = init_remote(res, NULL, NULL); 933 if (error == 0) { 934 sync_start(); 935 } else if (error == EBUSY) { 936 time_t start = time(NULL); 937 938 pjdlog_warning("Waiting for remote node to become %s for %ds.", 939 role2str(HAST_ROLE_SECONDARY), 940 res->hr_timeout); 941 for (;;) { 942 sleep(1); 943 error = init_remote(res, NULL, NULL); 944 if (error != EBUSY) 945 break; 946 if (time(NULL) > start + res->hr_timeout) 947 break; 948 } 949 if (error == EBUSY) { 950 pjdlog_warning("Remote node is still %s, starting anyway.", 951 role2str(HAST_ROLE_PRIMARY)); 952 } 953 } 954 } 955 error = pthread_create(&td, NULL, ggate_recv_thread, res); 956 PJDLOG_ASSERT(error == 0); 957 error = pthread_create(&td, NULL, local_send_thread, res); 958 PJDLOG_ASSERT(error == 0); 959 error = pthread_create(&td, NULL, remote_send_thread, res); 960 PJDLOG_ASSERT(error == 0); 961 error = pthread_create(&td, NULL, remote_recv_thread, res); 962 PJDLOG_ASSERT(error == 0); 963 error = pthread_create(&td, NULL, ggate_send_thread, res); 964 PJDLOG_ASSERT(error == 0); 965 fullystarted = true; 966 (void)sync_thread(res); 967} 968 969static void 970reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
{
	char msg[1024];
	va_list ap;
	int len;

	/*
	 * Format the caller-provided prefix first, then append a textual
	 * description of the GEOM Gate request if there is room left.
	 * Truncation of the prefix simply suppresses the suffix.
	 */
	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Tear down both connections to the remote node (under the component's
 * write lock), announce the disconnect and stop any in-progress
 * synchronization.  Safe to call when another thread already closed the
 * connection - it then returns without side effects.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein != NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	event_send(res, EVENT_DISCONNECT);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		/* Kernel requests are never larger than MAXPHYS. */
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/*
		 * Preset every per-component slot with EINVAL; each worker
		 * thread overwrites its own slot with the real result.
		 */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			res->hr_stat_read++;
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads go to exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
				PJDLOG_ASSERT(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			res->hr_stat_write++;
			if (res->hr_resuid == 0) {
				/*
				 * This is first write, initialize localcnt and
				 * resuid.
				 */
				res->hr_primary_localcnt = 1;
				(void)init_resuid(res);
			}
			/*
			 * Block until the range is not covered by an
			 * in-progress synchronization and we managed to
			 * register it in the regular-writes range lock.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			switch (ggio->gctl_cmd) {
			case BIO_DELETE:
				res->hr_stat_delete++;
				break;
			case BIO_FLUSH:
				res->hr_stat_flush++;
				break;
			}
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* These requests go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, 0);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else if (!ISSYNCREQ(hio)) {
				/*
				 * If READ failed, try to read from remote node.
				 * Note: the request is re-queued without
				 * touching hio_countdown - the remote side
				 * will account for it.
				 */
				if (ret < 0) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%s), trying remote node. ",
					    strerror(errno));
				} else if (ret != ggio->gctl_length) {
					reqlog(LOG_WARNING, 0, ggio,
					    "Local request failed (%zd != %jd), trying remote node. ",
					    ret, (intmax_t)ggio->gctl_length);
				}
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else if (ret != ggio->gctl_length) {
				/* Short write is reported as I/O error. */
				hio->hio_errors[ncomp] = EIO;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%zd != %jd): ",
				    ret, (intmax_t)ggio->gctl_length);
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0) {
				hio->hio_errors[ncomp] = errno;
				reqlog(LOG_WARNING, 0, ggio,
				    "Local request failed (%s): ",
				    strerror(errno));
			} else {
				hio->hio_errors[ncomp] = 0;
			}
			break;
		}
		/*
		 * Last component to finish completes the request: wake up
		 * sync_thread for sync requests, otherwise hand the request
		 * to ggate_send_thread via the done queue.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Send a keep-alive request to the remote node so a dead connection is
 * detected even when there is no regular I/O traffic.
 */
static void
keepalive_send(struct hast_resource *res, unsigned int ncomp)
{
	struct nv *nv;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!ISCONNECTED(res, ncomp)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein !=
	    NULL);
	PJDLOG_ASSERT(res->hr_remoteout != NULL);

	nv = nv_alloc();
	nv_add_uint8(nv, HIO_KEEPALIVE, "cmd");
	if (nv_error(nv) != 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		pjdlog_debug(1,
		    "keepalive_send: Unable to prepare header to send.");
		return;
	}
	if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) {
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_common(LOG_DEBUG, 1, errno,
		    "keepalive_send: Unable to send request");
		nv_free(nv);
		/* Send failure means the connection is dead - close it. */
		remote_close(res, ncomp);
		return;
	}

	rw_unlock(&hio_remote_lock[ncomp]);
	nv_free(nv);
	pjdlog_debug(2, "keepalive_send: Request sent.");
}

/*
 * Thread sends request to secondary node.
 */
static void *
remote_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	time_t lastcheck, now;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	bool wakeup;
	uint64_t offset, length;
	uint8_t cmd;
	void *data;

	/* Remote component is 1 for now. */
	ncomp = 1;
	lastcheck = time(NULL);

	for (;;) {
		pjdlog_debug(2, "remote_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE);
		if (hio == NULL) {
			/*
			 * Queue take timed out - use the idle period to send
			 * a keep-alive if one is due.
			 */
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				keepalive_send(res, ncomp);
				lastcheck = now;
			}
			continue;
		}
		pjdlog_debug(2, "remote_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		/* Translate the BIO command into the HAST wire protocol. */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			cmd = HIO_READ;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_WRITE:
			cmd = HIO_WRITE;
			data = ggio->gctl_data;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_DELETE:
			cmd = HIO_DELETE;
			data = NULL;
			offset = ggio->gctl_offset;
			length = ggio->gctl_length;
			break;
		case BIO_FLUSH:
			cmd = HIO_FLUSH;
			data = NULL;
			offset = 0;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		nv = nv_alloc();
		nv_add_uint8(nv, cmd, "cmd");
		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
		nv_add_uint64(nv, offset, "offset");
		nv_add_uint64(nv, length, "length");
		if (nv_error(nv) != 0) {
			hio->hio_errors[ncomp] = nv_error(nv);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to prepare header to send.",
			    hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to prepare header to send (%s): ",
			    strerror(nv_error(nv)));
			/* Move failed request immediately to the done queue. */
			goto done_queue;
		}
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the recv queue.",
		    hio);
		/*
		 * Protect connection from disappearing.
		 */
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			hio->hio_errors[ncomp] = ENOTCONN;
			goto done_queue;
		}
		/*
		 * Move the request to recv queue before sending it, because
		 * in different order we can get reply before we move request
		 * to recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]);
		TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]);
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hast_proto_send(res, res->hr_remoteout, nv, data,
		    data != NULL ? length : 0) < 0) {
			hio->hio_errors[ncomp] = errno;
			rw_unlock(&hio_remote_lock[ncomp]);
			pjdlog_debug(2,
			    "remote_send: (%p) Unable to send request.", hio);
			reqlog(LOG_ERR, 0, ggio,
			    "Unable to send request (%s): ",
			    strerror(hio->hio_errors[ncomp]));
			remote_close(res, ncomp);
			/*
			 * Take request back from the receive queue and move
			 * it immediately to the done queue.
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		nv_free(nv);
		if (wakeup)
			cv_signal(&hio_recv_list_cond[ncomp]);
		continue;
done_queue:
		nv_free(nv);
		if (ISSYNCREQ(hio)) {
			if (!refcount_release(&hio->hio_countdown))
				continue;
			mtx_lock(&sync_lock);
			SYNCREQDONE(hio);
			mtx_unlock(&sync_lock);
			cv_signal(&sync_cond);
			continue;
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Remote write failed - remember that this extent
			 * needs synchronization once the connection is back.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_need_sync(res->hr_amp, ggio->gctl_offset,
			    ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (!refcount_release(&hio->hio_countdown))
			continue;
		pjdlog_debug(2,
		    "remote_send: (%p) Moving request to the done queue.",
		    hio);
		QUEUE_INSERT2(hio, done);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread receives answer from secondary node and passes it to ggate_send
 * thread.
 */
static void *
remote_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	struct nv *nv;
	unsigned int ncomp;
	uint64_t seq;
	int error;

	/* Remote component is 1 for now. */
	ncomp = 1;

	for (;;) {
		/* Wait until there is anything to receive.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		while (TAILQ_EMPTY(&hio_recv_list[ncomp])) {
			pjdlog_debug(2, "remote_recv: No requests, waiting.");
			cv_wait(&hio_recv_list_cond[ncomp],
			    &hio_recv_list_lock[ncomp]);
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		rw_rlock(&hio_remote_lock[ncomp]);
		if (!ISCONNECTED(res, ncomp)) {
			rw_unlock(&hio_remote_lock[ncomp]);
			/*
			 * Connection is dead, so move all pending requests to
			 * the done queue (one-by-one).
			 */
			mtx_lock(&hio_recv_list_lock[ncomp]);
			hio = TAILQ_FIRST(&hio_recv_list[ncomp]);
			PJDLOG_ASSERT(hio != NULL);
			TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
			    hio_next[ncomp]);
			mtx_unlock(&hio_recv_list_lock[ncomp]);
			goto done_queue;
		}
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive reply header");
			rw_unlock(&hio_remote_lock[ncomp]);
			remote_close(res, ncomp);
			continue;
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		seq = nv_get_uint64(nv, "seq");
		if (seq == 0) {
			pjdlog_error("Header contains no 'seq' field.");
			nv_free(nv);
			continue;
		}
		/*
		 * Match the reply with the outstanding request by sequence
		 * number and take it off the recv queue.
		 */
		mtx_lock(&hio_recv_list_lock[ncomp]);
		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
			if (hio->hio_ggio.gctl_seq == seq) {
				TAILQ_REMOVE(&hio_recv_list[ncomp], hio,
				    hio_next[ncomp]);
				break;
			}
		}
		mtx_unlock(&hio_recv_list_lock[ncomp]);
		if (hio == NULL) {
			pjdlog_error("Found no request matching received 'seq' field (%ju).",
			    (uintmax_t)seq);
			nv_free(nv);
			continue;
		}
		error = nv_get_int16(nv, "error");
		if (error != 0) {
			/* Request failed on remote side. */
			hio->hio_errors[ncomp] = error;
			reqlog(LOG_WARNING, 0, &hio->hio_ggio,
			    "Remote request failed (%s): ", strerror(error));
			nv_free(nv);
			goto done_queue;
		}
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			/* Only READ replies carry a data payload. */
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				goto done_queue;
			}
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    ggio->gctl_data, ggio->gctl_length) < 0) {
				hio->hio_errors[ncomp] = errno;
				pjdlog_errno(LOG_ERR,
				    "Unable to receive reply data");
				rw_unlock(&hio_remote_lock[ncomp]);
				nv_free(nv);
				remote_close(res, ncomp);
				goto done_queue;
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			break;
		case BIO_WRITE:
		case BIO_DELETE:
		case BIO_FLUSH:
			break;
		default:
			PJDLOG_ABORT("invalid condition");
		}
		hio->hio_errors[ncomp] = 0;
		nv_free(nv);
done_queue:
		/* Last component to finish completes the request. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "remote_recv: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends answer to the kernel.
 */
static void *
ggate_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_send: Taking request.");
		QUEUE_TAKE2(hio, done);
		pjdlog_debug(2, "ggate_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		for (ii = 0; ii < ncomps; ii++) {
			if (hio->hio_errors[ii] == 0) {
				/*
				 * One successful request is enough to declare
				 * success.
				 */
				ggio->gctl_error = 0;
				break;
			}
		}
		if (ii == ncomps) {
			/*
			 * None of the requests were successful.
			 * Use the error from local component except the
			 * case when we did only remote request.
			 */
			if (ggio->gctl_cmd == BIO_READ &&
			    res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				ggio->gctl_error = hio->hio_errors[1];
			else
				ggio->gctl_error = hio->hio_errors[0];
		}
		if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) {
			/* Write succeeded - mark the extent clean. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_complete(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				res->hr_stat_activemap_update++;
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
		}
		if (ggio->gctl_cmd == BIO_WRITE) {
			/*
			 * Unlock range we locked.
			 */
			mtx_lock(&range_lock);
			rangelock_del(range_regular, ggio->gctl_offset,
			    ggio->gctl_length);
			if (range_sync_wait)
				cv_signal(&range_sync_cond);
			mtx_unlock(&range_lock);
			/*
			 * Bump local count if this is first write after
			 * connection failure with remote node.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (!ISCONNECTED(res, ncomp)) {
				mtx_lock(&metadata_lock);
				if (res->hr_primary_localcnt ==
				    res->hr_secondary_remotecnt) {
					res->hr_primary_localcnt++;
					pjdlog_debug(1,
					    "Increasing localcnt to %ju.",
					    (uintmax_t)res->hr_primary_localcnt);
					(void)metadata_write(res);
				}
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
		}
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
		pjdlog_debug(2,
		    "ggate_send: (%p) Moving request to the free queue.", hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread synchronize local and remote components.
 */
static void *
sync_thread(void *arg __unused)
{
	struct hast_resource *res = arg;
	struct hio *hio;
	struct g_gate_ctl_io *ggio;
	struct timeval tstart, tend, tdiff;
	unsigned int ii, ncomp, ncomps;
	off_t offset, length, synced;
	bool dorewind;
	int syncext;

	ncomps = HAST_NCOMPONENTS;
	dorewind = true;
	synced = 0;
	/* offset < 0 means "no synchronization in progress yet". */
	offset = -1;

	for (;;) {
		mtx_lock(&sync_lock);
		if (offset >= 0 && !sync_inprogress) {
			/* We were synchronizing and got interrupted. */
			gettimeofday(&tend, NULL);
			timersub(&tend, &tstart, &tdiff);
			pjdlog_info("Synchronization interrupted after %#.0T. "
			    "%NB synchronized so far.", &tdiff,
			    (intmax_t)synced);
			event_send(res, EVENT_SYNCINTR);
		}
		while (!sync_inprogress) {
			dorewind = true;
			synced = 0;
			cv_wait(&sync_cond, &sync_lock);
		}
		mtx_unlock(&sync_lock);
		/*
		 * Obtain offset at which we should synchronize.
		 * Rewind synchronization if needed.
		 */
		mtx_lock(&res->hr_amp_lock);
		if (dorewind)
			activemap_sync_rewind(res->hr_amp);
		offset = activemap_sync_offset(res->hr_amp, &length, &syncext);
		if (syncext != -1) {
			/*
			 * We synchronized entire syncext extent, we can mark
			 * it as clean now.
			 */
			if (activemap_extent_complete(res->hr_amp, syncext))
				(void)hast_activemap_flush(res);
		}
		mtx_unlock(&res->hr_amp_lock);
		if (dorewind) {
			dorewind = false;
			if (offset < 0)
				pjdlog_info("Nodes are in sync.");
			else {
				pjdlog_info("Synchronization started. %NB to go.",
				    (intmax_t)(res->hr_extentsize *
				    activemap_ndirty(res->hr_amp)));
				event_send(res, EVENT_SYNCSTART);
				gettimeofday(&tstart, NULL);
			}
		}
		if (offset < 0) {
			/* Nothing left to synchronize. */
			sync_stop();
			pjdlog_debug(1, "Nothing to synchronize.");
			/*
			 * Synchronization complete, make both localcnt and
			 * remotecnt equal.
			 */
			ncomp = 1;
			rw_rlock(&hio_remote_lock[ncomp]);
			if (ISCONNECTED(res, ncomp)) {
				if (synced > 0) {
					int64_t bps;

					gettimeofday(&tend, NULL);
					timersub(&tend, &tstart, &tdiff);
					bps = (int64_t)((double)synced /
					    ((double)tdiff.tv_sec +
					    (double)tdiff.tv_usec / 1000000));
					pjdlog_info("Synchronization complete. "
					    "%NB synchronized in %#.0lT (%NB/sec).",
					    (intmax_t)synced, &tdiff,
					    (intmax_t)bps);
					event_send(res, EVENT_SYNCDONE);
				}
				mtx_lock(&metadata_lock);
				res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
				res->hr_primary_localcnt =
				    res->hr_secondary_remotecnt;
				res->hr_primary_remotecnt =
				    res->hr_secondary_localcnt;
				pjdlog_debug(1,
				    "Setting localcnt to %ju and remotecnt to %ju.",
				    (uintmax_t)res->hr_primary_localcnt,
				    (uintmax_t)res->hr_primary_remotecnt);
				(void)metadata_write(res);
				mtx_unlock(&metadata_lock);
			}
			rw_unlock(&hio_remote_lock[ncomp]);
			continue;
		}
		pjdlog_debug(2, "sync: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "sync: (%p) Got free request.", hio);
		/*
		 * Lock the range we are going to synchronize. We don't want
		 * race where someone writes between our read and write.
		 */
		for (;;) {
			mtx_lock(&range_lock);
			if (rangelock_islocked(range_regular, offset, length)) {
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd locked.",
				    (intmax_t)offset, (intmax_t)length);
				range_sync_wait = true;
				cv_wait(&range_sync_cond, &range_lock);
				range_sync_wait = false;
				mtx_unlock(&range_lock);
				continue;
			}
			if (rangelock_add(range_sync, offset, length) < 0) {
				mtx_unlock(&range_lock);
				pjdlog_debug(2,
				    "sync: Range offset=%jd length=%jd is already locked, waiting.",
				    (intmax_t)offset, (intmax_t)length);
				sleep(1);
				continue;
			}
			mtx_unlock(&range_lock);
			break;
		}
		/*
		 * First read the data from synchronization source.
		 */
		SYNCREQ(hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_cmd = BIO_READ;
		ggio->gctl_offset = offset;
		ggio->gctl_length = length;
		ggio->gctl_error = 0;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so handle request locally.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so send request to the remote node.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		}
		mtx_unlock(&metadata_lock);
		/* Sync requests always target a single component. */
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for READ to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to read synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		/*
		 * We read the data from synchronization source, now write it
		 * to synchronization target.
		 */
		SYNCREQ(hio);
		ggio->gctl_cmd = BIO_WRITE;
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
		    hio);
		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
		    hio);
		mtx_lock(&metadata_lock);
		if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
			/*
			 * This range is up-to-date on local component,
			 * so we update remote component.
			 */
			/* Remote component is 1 for now. */
			ncomp = 1;
		} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
			PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
			/*
			 * This range is out-of-date on local component,
			 * so we update it.
			 */
			/* Local component is 0 for now. */
			ncomp = 0;
		}
		mtx_unlock(&metadata_lock);

		pjdlog_debug(2, "sync: (%p) Moving request to the send queues.",
		    hio);
		refcount_init(&hio->hio_countdown, 1);
		QUEUE_INSERT1(hio, send, ncomp);

		/*
		 * Let's wait for WRITE to finish.
		 */
		mtx_lock(&sync_lock);
		while (!ISSYNCREQDONE(hio))
			cv_wait(&sync_cond, &sync_lock);
		mtx_unlock(&sync_lock);

		if (hio->hio_errors[ncomp] != 0) {
			pjdlog_error("Unable to write synchronization data: %s.",
			    strerror(hio->hio_errors[ncomp]));
			goto free_queue;
		}

		synced += length;
free_queue:
		/* Drop the sync range lock and wake up blocked writers. */
		mtx_lock(&range_lock);
		rangelock_del(range_sync, offset, length);
		if (range_regular_wait)
			cv_signal(&range_regular_cond);
		mtx_unlock(&range_lock);
		pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
		    hio);
		QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Apply a reloaded configuration (carried in nv) to the running primary.
 * Settings that only affect new connections are updated in place; a change
 * of address or replication mode forces a disconnect so the guard thread
 * reconnects with the new settings.
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");

	ncomps = HAST_NCOMPONENTS;

#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			/*
			 * NOTE(review): the lock is dropped before calling
			 * proto_timeout() on hr_remotein/hr_remoteout, so a
			 * concurrent remote_close() could in principle free
			 * these connections in-between - confirm whether
			 * this window is acceptable here.
			 */
			rw_unlock(&hio_remote_lock[ii]);
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) < 0) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
	    MODIFIED_REPLICATION)) != 0) {
		/*
		 * Force a disconnect; the guard thread will reconnect using
		 * the new settings.
		 */
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef	MODIFIED_REMOTEADDR
#undef	MODIFIED_SOURCEADDR
#undef	MODIFIED_REPLICATION
#undef	MODIFIED_CHECKSUM
#undef	MODIFIED_COMPRESSION
#undef	MODIFIED_TIMEOUT
#undef	MODIFIED_EXEC

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check one remote component and reconnect it if it is down.
 * No-op for the local component and for resources without a real remote.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
 */
static void *
guard_thread(void *arg)
{
	struct hast_resource *res = arg;
	unsigned int ii, ncomps;
	struct timespec timeout;
	time_t lastcheck, now;
	sigset_t mask;
	int signo;

	ncomps = HAST_NCOMPONENTS;
	lastcheck = time(NULL);

	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);

	timeout.tv_sec = HAST_KEEPALIVE;
	timeout.tv_nsec = 0;
	signo = -1;

	for (;;) {
		switch (signo) {
		case SIGINT:
		case SIGTERM:
			sigexit_received = true;
			primary_exitx(EX_OK,
			    "Termination signal received, exiting.");
			break;
		default:
			break;
		}

		/*
		 * Don't check connections until we fully started,
		 * as we may still be looping, waiting for remote node
		 * to switch from primary to secondary.
		 */
		if (fullystarted) {
			pjdlog_debug(2, "remote_guard: Checking connections.");
			now = time(NULL);
			if (lastcheck + HAST_KEEPALIVE <= now) {
				for (ii = 0; ii < ncomps; ii++)
					guard_one(res, ii);
				lastcheck = now;
			}
		}
		/*
		 * sigtimedwait() doubles as both the signal handler and the
		 * periodic sleep: it returns either a caught signal or -1
		 * after the HAST_KEEPALIVE timeout expires.
		 */
		signo = sigtimedwait(&mask, NULL, &timeout);
	}
	/* NOTREACHED */
	return (NULL);
}