primary.c revision 210881
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 210881 2010-08-05 19:01:57Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/*
 * A single I/O request as it travels between the GEOM Gate kernel device
 * and the local/remote components.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* Per-component list linkage (one entry per component). */
	TAILQ_ENTRY(hio)	*hio_next;
};
/*
 * The free and done lists reuse the component 0 linkage slot.
 * NOTE(review): presumably safe because a request is never on the free/done
 * list and a per-component send/recv list at the same time - confirm.
 */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;
static pthread_mutex_t hio_guard_lock;
static pthread_cond_t hio_guard_cond;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep before next reconnect try.
 */
#define	RECONNECT_SLEEP		5

/*
 * NOTE(review): the "no" argument is unused; both connections are checked
 * regardless of the component number - confirm this is intentional.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Append "hio" to the per-component list "name"; wake one waiter if the
 * list was empty.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/*
 * Append "hio" to the global list "name" (free/done); wake one waiter if
 * the list was empty.
 */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/*
 * Sleep until the per-component list "name" is non-empty, then remove its
 * head into "hio".
 */
#define	QUEUE_TAKE1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)]);			\
	}								\
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/*
 * Sleep until the global list "name" is non-empty, then remove its head
 * into "hio".
 */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests carry no real ggate unit: gctl_unit == -1 marks
 * an in-flight sync request, gctl_unit == -2 marks a completed one.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
/* Resource served by this primary worker; used by the cleanup/exit helpers. */
static struct hast_resource *gres;

/*
 * Range locks below keep regular and synchronization I/O from operating on
 * overlapping offsets at the same time.
 */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void sighandler(int sig);

/*
 * Release resources held by the worker: close the local provider descriptor
 * and destroy the ggate device if we created one. Preserves errno.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/*
	 * Close descriptor to /dev/hast/<name>
	 * to work-around race in the kernel.
	 */
	close(res->hr_localfd);

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_warning("Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-style error, clean up and exit with the given exit code. */
static void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	assert(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Log a plain message (no errno), clean up and exit with the given exit code. */
static void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to the on-disk metadata area.
 * Returns 0 on success, -1 on write failure (errno preserved via KEEP_ERRNO).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	assert(buf != NULL);
	assert((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/* True when a real remote address (anything but "none") is configured. */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all queues, locks, condition variables and the
 * pool of HAST_HIO_MAX request structures, then install signal handlers.
 * Exits the process on allocation failure.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and their condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&hio_guard_lock);
	cv_init(&hio_guard_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		/* One MAXPHYS-sized buffer per request; requests can't be bigger. */
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}

	/*
	 * Turn on signals handling.
	 */
	signal(SIGINT, sighandler);
	signal(SIGTERM, sighandler);
}

/*
 * Read the on-disk metadata and activemap for the local component and set up
 * the range locks. On first use, generate the resource unique identifier and
 * initialize the local/remote counters. Exits the process on failure.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Perform the two-step handshake with the secondary node and establish the
 * incoming and outgoing connections. On success, the connections are either
 * returned via inp/outp (when both are non-NULL) or stored in the resource.
 * Returns true on success, false if the remote node is unreachable or the
 * handshake fails.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	assert(real_remote(res));

	in = out = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(out, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	/*
	 * NOTE(review): on proto_client() failure only a warning is logged
	 * and "in" stays NULL, yet proto_connect(in) is still called below -
	 * confirm this cannot dereference NULL.
	 */
	if (proto_client(res->hr_remoteaddr, &in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(in, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* The reply to the second step is received on the "out" connection. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents of its own, let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	/* NOTE(review): nvin does not appear to be freed on this success path - confirm. */
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	return (true);
close:
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/* Mark synchronization as in progress and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/*
 * Create (or take over) the GEOM Gate provider hast/<provname>.
 * Exits the process if the device can be neither created nor recovered.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where he left off.
	 */
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role. The parent records the worker PID and
 * returns; the forked child initializes local/remote state, the ggate
 * device and the request environment, then starts all worker threads and
 * finishes in guard_thread().
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error;

	gres = res;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}

	pid = fork();
	if (pid < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		res->hr_workerpid = pid;
		return;
	}
	(void)pidfile_close(pfh);

	setproctitle("%s (primary)", res->hr_name);

	signal(SIGHUP, SIG_DFL);
	signal(SIGCHLD, SIG_DFL);

	init_local(res);
	if (real_remote(res) && init_remote(res, NULL, NULL))
		sync_start();
	init_ggate(res);
	init_environment(res);
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, sync_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ctrl_thread, res);
	assert(error == 0);
	(void)guard_thread(res);
}

/*
 * Format "fmt" followed by a textual description of the ggate request and
 * log the result at the given level/debug level.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both remote connections (if still open), stop synchronization and
 * wake the guard thread so it can start reconnecting immediately.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	assert(res->hr_remotein != NULL);
	assert(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing old incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing old outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	/*
	 * Stop synchronization if in-progress.
	 */
	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);

	/*
	 * Wake up guard thread, so it can immediately start reconnect.
	 */
	mtx_lock(&hio_guard_lock);
	cv_signal(&hio_guard_cond);
	mtx_unlock(&hio_guard_lock);
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* READ goes to exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Take the regular range lock; retry while the range
			 * is being synchronized or already locked.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty before the write is issued. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* WRITE/DELETE/FLUSH go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/* The last component to finish moves the request onward. */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends request to secondary node.
1119 */ 1120static void * 1121remote_send_thread(void *arg) 1122{ 1123 struct hast_resource *res = arg; 1124 struct g_gate_ctl_io *ggio; 1125 struct hio *hio; 1126 struct nv *nv; 1127 unsigned int ncomp; 1128 bool wakeup; 1129 uint64_t offset, length; 1130 uint8_t cmd; 1131 void *data; 1132 1133 /* Remote component is 1 for now. */ 1134 ncomp = 1; 1135 1136 for (;;) { 1137 pjdlog_debug(2, "remote_send: Taking request."); 1138 QUEUE_TAKE1(hio, send, ncomp); 1139 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1140 ggio = &hio->hio_ggio; 1141 switch (ggio->gctl_cmd) { 1142 case BIO_READ: 1143 cmd = HIO_READ; 1144 data = NULL; 1145 offset = ggio->gctl_offset; 1146 length = ggio->gctl_length; 1147 break; 1148 case BIO_WRITE: 1149 cmd = HIO_WRITE; 1150 data = ggio->gctl_data; 1151 offset = ggio->gctl_offset; 1152 length = ggio->gctl_length; 1153 break; 1154 case BIO_DELETE: 1155 cmd = HIO_DELETE; 1156 data = NULL; 1157 offset = ggio->gctl_offset; 1158 length = ggio->gctl_length; 1159 break; 1160 case BIO_FLUSH: 1161 cmd = HIO_FLUSH; 1162 data = NULL; 1163 offset = 0; 1164 length = 0; 1165 break; 1166 default: 1167 assert(!"invalid condition"); 1168 abort(); 1169 } 1170 nv = nv_alloc(); 1171 nv_add_uint8(nv, cmd, "cmd"); 1172 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1173 nv_add_uint64(nv, offset, "offset"); 1174 nv_add_uint64(nv, length, "length"); 1175 if (nv_error(nv) != 0) { 1176 hio->hio_errors[ncomp] = nv_error(nv); 1177 pjdlog_debug(2, 1178 "remote_send: (%p) Unable to prepare header to send.", 1179 hio); 1180 reqlog(LOG_ERR, 0, ggio, 1181 "Unable to prepare header to send (%s): ", 1182 strerror(nv_error(nv))); 1183 /* Move failed request immediately to the done queue. */ 1184 goto done_queue; 1185 } 1186 pjdlog_debug(2, 1187 "remote_send: (%p) Moving request to the recv queue.", 1188 hio); 1189 /* 1190 * Protect connection from disappearing. 
1191 */ 1192 rw_rlock(&hio_remote_lock[ncomp]); 1193 if (!ISCONNECTED(res, ncomp)) { 1194 rw_unlock(&hio_remote_lock[ncomp]); 1195 hio->hio_errors[ncomp] = ENOTCONN; 1196 goto done_queue; 1197 } 1198 /* 1199 * Move the request to recv queue before sending it, because 1200 * in different order we can get reply before we move request 1201 * to recv queue. 1202 */ 1203 mtx_lock(&hio_recv_list_lock[ncomp]); 1204 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1205 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1206 mtx_unlock(&hio_recv_list_lock[ncomp]); 1207 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1208 data != NULL ? length : 0) < 0) { 1209 hio->hio_errors[ncomp] = errno; 1210 rw_unlock(&hio_remote_lock[ncomp]); 1211 remote_close(res, ncomp); 1212 pjdlog_debug(2, 1213 "remote_send: (%p) Unable to send request.", hio); 1214 reqlog(LOG_ERR, 0, ggio, 1215 "Unable to send request (%s): ", 1216 strerror(hio->hio_errors[ncomp])); 1217 /* 1218 * Take request back from the receive queue and move 1219 * it immediately to the done queue. 
1220 */ 1221 mtx_lock(&hio_recv_list_lock[ncomp]); 1222 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1223 mtx_unlock(&hio_recv_list_lock[ncomp]); 1224 goto done_queue; 1225 } 1226 rw_unlock(&hio_remote_lock[ncomp]); 1227 nv_free(nv); 1228 if (wakeup) 1229 cv_signal(&hio_recv_list_cond[ncomp]); 1230 continue; 1231done_queue: 1232 nv_free(nv); 1233 if (ISSYNCREQ(hio)) { 1234 if (!refcount_release(&hio->hio_countdown)) 1235 continue; 1236 mtx_lock(&sync_lock); 1237 SYNCREQDONE(hio); 1238 mtx_unlock(&sync_lock); 1239 cv_signal(&sync_cond); 1240 continue; 1241 } 1242 if (ggio->gctl_cmd == BIO_WRITE) { 1243 mtx_lock(&res->hr_amp_lock); 1244 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1245 ggio->gctl_length)) { 1246 (void)hast_activemap_flush(res); 1247 } 1248 mtx_unlock(&res->hr_amp_lock); 1249 } 1250 if (!refcount_release(&hio->hio_countdown)) 1251 continue; 1252 pjdlog_debug(2, 1253 "remote_send: (%p) Moving request to the done queue.", 1254 hio); 1255 QUEUE_INSERT2(hio, done); 1256 } 1257 /* NOTREACHED */ 1258 return (NULL); 1259} 1260 1261/* 1262 * Thread receives answer from secondary node and passes it to ggate_send 1263 * thread. 1264 */ 1265static void * 1266remote_recv_thread(void *arg) 1267{ 1268 struct hast_resource *res = arg; 1269 struct g_gate_ctl_io *ggio; 1270 struct hio *hio; 1271 struct nv *nv; 1272 unsigned int ncomp; 1273 uint64_t seq; 1274 int error; 1275 1276 /* Remote component is 1 for now. */ 1277 ncomp = 1; 1278 1279 for (;;) { 1280 /* Wait until there is anything to receive. 
*/ 1281 mtx_lock(&hio_recv_list_lock[ncomp]); 1282 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1283 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1284 cv_wait(&hio_recv_list_cond[ncomp], 1285 &hio_recv_list_lock[ncomp]); 1286 } 1287 mtx_unlock(&hio_recv_list_lock[ncomp]); 1288 rw_rlock(&hio_remote_lock[ncomp]); 1289 if (!ISCONNECTED(res, ncomp)) { 1290 rw_unlock(&hio_remote_lock[ncomp]); 1291 /* 1292 * Connection is dead, so move all pending requests to 1293 * the done queue (one-by-one). 1294 */ 1295 mtx_lock(&hio_recv_list_lock[ncomp]); 1296 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1297 assert(hio != NULL); 1298 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1299 hio_next[ncomp]); 1300 mtx_unlock(&hio_recv_list_lock[ncomp]); 1301 goto done_queue; 1302 } 1303 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1304 pjdlog_errno(LOG_ERR, 1305 "Unable to receive reply header"); 1306 rw_unlock(&hio_remote_lock[ncomp]); 1307 remote_close(res, ncomp); 1308 continue; 1309 } 1310 rw_unlock(&hio_remote_lock[ncomp]); 1311 seq = nv_get_uint64(nv, "seq"); 1312 if (seq == 0) { 1313 pjdlog_error("Header contains no 'seq' field."); 1314 nv_free(nv); 1315 continue; 1316 } 1317 mtx_lock(&hio_recv_list_lock[ncomp]); 1318 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1319 if (hio->hio_ggio.gctl_seq == seq) { 1320 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1321 hio_next[ncomp]); 1322 break; 1323 } 1324 } 1325 mtx_unlock(&hio_recv_list_lock[ncomp]); 1326 if (hio == NULL) { 1327 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1328 (uintmax_t)seq); 1329 nv_free(nv); 1330 continue; 1331 } 1332 error = nv_get_int16(nv, "error"); 1333 if (error != 0) { 1334 /* Request failed on remote side. 
*/ 1335 hio->hio_errors[ncomp] = 0; 1336 nv_free(nv); 1337 goto done_queue; 1338 } 1339 ggio = &hio->hio_ggio; 1340 switch (ggio->gctl_cmd) { 1341 case BIO_READ: 1342 rw_rlock(&hio_remote_lock[ncomp]); 1343 if (!ISCONNECTED(res, ncomp)) { 1344 rw_unlock(&hio_remote_lock[ncomp]); 1345 nv_free(nv); 1346 goto done_queue; 1347 } 1348 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1349 ggio->gctl_data, ggio->gctl_length) < 0) { 1350 hio->hio_errors[ncomp] = errno; 1351 pjdlog_errno(LOG_ERR, 1352 "Unable to receive reply data"); 1353 rw_unlock(&hio_remote_lock[ncomp]); 1354 nv_free(nv); 1355 remote_close(res, ncomp); 1356 goto done_queue; 1357 } 1358 rw_unlock(&hio_remote_lock[ncomp]); 1359 break; 1360 case BIO_WRITE: 1361 case BIO_DELETE: 1362 case BIO_FLUSH: 1363 break; 1364 default: 1365 assert(!"invalid condition"); 1366 abort(); 1367 } 1368 hio->hio_errors[ncomp] = 0; 1369 nv_free(nv); 1370done_queue: 1371 if (refcount_release(&hio->hio_countdown)) { 1372 if (ISSYNCREQ(hio)) { 1373 mtx_lock(&sync_lock); 1374 SYNCREQDONE(hio); 1375 mtx_unlock(&sync_lock); 1376 cv_signal(&sync_cond); 1377 } else { 1378 pjdlog_debug(2, 1379 "remote_recv: (%p) Moving request to the done queue.", 1380 hio); 1381 QUEUE_INSERT2(hio, done); 1382 } 1383 } 1384 } 1385 /* NOTREACHED */ 1386 return (NULL); 1387} 1388 1389/* 1390 * Thread sends answer to the kernel. 1391 */ 1392static void * 1393ggate_send_thread(void *arg) 1394{ 1395 struct hast_resource *res = arg; 1396 struct g_gate_ctl_io *ggio; 1397 struct hio *hio; 1398 unsigned int ii, ncomp, ncomps; 1399 1400 ncomps = HAST_NCOMPONENTS; 1401 1402 for (;;) { 1403 pjdlog_debug(2, "ggate_send: Taking request."); 1404 QUEUE_TAKE2(hio, done); 1405 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1406 ggio = &hio->hio_ggio; 1407 for (ii = 0; ii < ncomps; ii++) { 1408 if (hio->hio_errors[ii] == 0) { 1409 /* 1410 * One successful request is enough to declare 1411 * success. 
1412 */ 1413 ggio->gctl_error = 0; 1414 break; 1415 } 1416 } 1417 if (ii == ncomps) { 1418 /* 1419 * None of the requests were successful. 1420 * Use first error. 1421 */ 1422 ggio->gctl_error = hio->hio_errors[0]; 1423 } 1424 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1425 mtx_lock(&res->hr_amp_lock); 1426 activemap_write_complete(res->hr_amp, 1427 ggio->gctl_offset, ggio->gctl_length); 1428 mtx_unlock(&res->hr_amp_lock); 1429 } 1430 if (ggio->gctl_cmd == BIO_WRITE) { 1431 /* 1432 * Unlock range we locked. 1433 */ 1434 mtx_lock(&range_lock); 1435 rangelock_del(range_regular, ggio->gctl_offset, 1436 ggio->gctl_length); 1437 if (range_sync_wait) 1438 cv_signal(&range_sync_cond); 1439 mtx_unlock(&range_lock); 1440 /* 1441 * Bump local count if this is first write after 1442 * connection failure with remote node. 1443 */ 1444 ncomp = 1; 1445 rw_rlock(&hio_remote_lock[ncomp]); 1446 if (!ISCONNECTED(res, ncomp)) { 1447 mtx_lock(&metadata_lock); 1448 if (res->hr_primary_localcnt == 1449 res->hr_secondary_remotecnt) { 1450 res->hr_primary_localcnt++; 1451 pjdlog_debug(1, 1452 "Increasing localcnt to %ju.", 1453 (uintmax_t)res->hr_primary_localcnt); 1454 (void)metadata_write(res); 1455 } 1456 mtx_unlock(&metadata_lock); 1457 } 1458 rw_unlock(&hio_remote_lock[ncomp]); 1459 } 1460 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1461 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1462 pjdlog_debug(2, 1463 "ggate_send: (%p) Moving request to the free queue.", hio); 1464 QUEUE_INSERT2(hio, free); 1465 } 1466 /* NOTREACHED */ 1467 return (NULL); 1468} 1469 1470/* 1471 * Thread synchronize local and remote components. 
1472 */ 1473static void * 1474sync_thread(void *arg __unused) 1475{ 1476 struct hast_resource *res = arg; 1477 struct hio *hio; 1478 struct g_gate_ctl_io *ggio; 1479 unsigned int ii, ncomp, ncomps; 1480 off_t offset, length, synced; 1481 bool dorewind; 1482 int syncext; 1483 1484 ncomps = HAST_NCOMPONENTS; 1485 dorewind = true; 1486 synced = 0; 1487 1488 for (;;) { 1489 mtx_lock(&sync_lock); 1490 while (!sync_inprogress) { 1491 dorewind = true; 1492 synced = 0; 1493 cv_wait(&sync_cond, &sync_lock); 1494 } 1495 mtx_unlock(&sync_lock); 1496 /* 1497 * Obtain offset at which we should synchronize. 1498 * Rewind synchronization if needed. 1499 */ 1500 mtx_lock(&res->hr_amp_lock); 1501 if (dorewind) 1502 activemap_sync_rewind(res->hr_amp); 1503 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1504 if (syncext != -1) { 1505 /* 1506 * We synchronized entire syncext extent, we can mark 1507 * it as clean now. 1508 */ 1509 if (activemap_extent_complete(res->hr_amp, syncext)) 1510 (void)hast_activemap_flush(res); 1511 } 1512 mtx_unlock(&res->hr_amp_lock); 1513 if (dorewind) { 1514 dorewind = false; 1515 if (offset < 0) 1516 pjdlog_info("Nodes are in sync."); 1517 else { 1518 pjdlog_info("Synchronization started. %ju bytes to go.", 1519 (uintmax_t)(res->hr_extentsize * 1520 activemap_ndirty(res->hr_amp))); 1521 } 1522 } 1523 if (offset < 0) { 1524 mtx_lock(&sync_lock); 1525 sync_inprogress = false; 1526 mtx_unlock(&sync_lock); 1527 pjdlog_debug(1, "Nothing to synchronize."); 1528 /* 1529 * Synchronization complete, make both localcnt and 1530 * remotecnt equal. 1531 */ 1532 ncomp = 1; 1533 rw_rlock(&hio_remote_lock[ncomp]); 1534 if (ISCONNECTED(res, ncomp)) { 1535 if (synced > 0) { 1536 pjdlog_info("Synchronization complete. 
" 1537 "%jd bytes synchronized.", 1538 (intmax_t)synced); 1539 } 1540 mtx_lock(&metadata_lock); 1541 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1542 res->hr_primary_localcnt = 1543 res->hr_secondary_localcnt; 1544 res->hr_primary_remotecnt = 1545 res->hr_secondary_remotecnt; 1546 pjdlog_debug(1, 1547 "Setting localcnt to %ju and remotecnt to %ju.", 1548 (uintmax_t)res->hr_primary_localcnt, 1549 (uintmax_t)res->hr_secondary_localcnt); 1550 (void)metadata_write(res); 1551 mtx_unlock(&metadata_lock); 1552 } else if (synced > 0) { 1553 pjdlog_info("Synchronization interrupted. " 1554 "%jd bytes synchronized so far.", 1555 (intmax_t)synced); 1556 } 1557 rw_unlock(&hio_remote_lock[ncomp]); 1558 continue; 1559 } 1560 pjdlog_debug(2, "sync: Taking free request."); 1561 QUEUE_TAKE2(hio, free); 1562 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1563 /* 1564 * Lock the range we are going to synchronize. We don't want 1565 * race where someone writes between our read and write. 1566 */ 1567 for (;;) { 1568 mtx_lock(&range_lock); 1569 if (rangelock_islocked(range_regular, offset, length)) { 1570 pjdlog_debug(2, 1571 "sync: Range offset=%jd length=%jd locked.", 1572 (intmax_t)offset, (intmax_t)length); 1573 range_sync_wait = true; 1574 cv_wait(&range_sync_cond, &range_lock); 1575 range_sync_wait = false; 1576 mtx_unlock(&range_lock); 1577 continue; 1578 } 1579 if (rangelock_add(range_sync, offset, length) < 0) { 1580 mtx_unlock(&range_lock); 1581 pjdlog_debug(2, 1582 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1583 (intmax_t)offset, (intmax_t)length); 1584 sleep(1); 1585 continue; 1586 } 1587 mtx_unlock(&range_lock); 1588 break; 1589 } 1590 /* 1591 * First read the data from synchronization source. 
1592 */ 1593 SYNCREQ(hio); 1594 ggio = &hio->hio_ggio; 1595 ggio->gctl_cmd = BIO_READ; 1596 ggio->gctl_offset = offset; 1597 ggio->gctl_length = length; 1598 ggio->gctl_error = 0; 1599 for (ii = 0; ii < ncomps; ii++) 1600 hio->hio_errors[ii] = EINVAL; 1601 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1602 hio); 1603 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1604 hio); 1605 mtx_lock(&metadata_lock); 1606 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1607 /* 1608 * This range is up-to-date on local component, 1609 * so handle request locally. 1610 */ 1611 /* Local component is 0 for now. */ 1612 ncomp = 0; 1613 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1614 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1615 /* 1616 * This range is out-of-date on local component, 1617 * so send request to the remote node. 1618 */ 1619 /* Remote component is 1 for now. */ 1620 ncomp = 1; 1621 } 1622 mtx_unlock(&metadata_lock); 1623 refcount_init(&hio->hio_countdown, 1); 1624 QUEUE_INSERT1(hio, send, ncomp); 1625 1626 /* 1627 * Let's wait for READ to finish. 1628 */ 1629 mtx_lock(&sync_lock); 1630 while (!ISSYNCREQDONE(hio)) 1631 cv_wait(&sync_cond, &sync_lock); 1632 mtx_unlock(&sync_lock); 1633 1634 if (hio->hio_errors[ncomp] != 0) { 1635 pjdlog_error("Unable to read synchronization data: %s.", 1636 strerror(hio->hio_errors[ncomp])); 1637 goto free_queue; 1638 } 1639 1640 /* 1641 * We read the data from synchronization source, now write it 1642 * to synchronization target. 
1643 */ 1644 SYNCREQ(hio); 1645 ggio->gctl_cmd = BIO_WRITE; 1646 for (ii = 0; ii < ncomps; ii++) 1647 hio->hio_errors[ii] = EINVAL; 1648 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1649 hio); 1650 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1651 hio); 1652 mtx_lock(&metadata_lock); 1653 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1654 /* 1655 * This range is up-to-date on local component, 1656 * so we update remote component. 1657 */ 1658 /* Remote component is 1 for now. */ 1659 ncomp = 1; 1660 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1661 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1662 /* 1663 * This range is out-of-date on local component, 1664 * so we update it. 1665 */ 1666 /* Local component is 0 for now. */ 1667 ncomp = 0; 1668 } 1669 mtx_unlock(&metadata_lock); 1670 1671 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1672 hio); 1673 refcount_init(&hio->hio_countdown, 1); 1674 QUEUE_INSERT1(hio, send, ncomp); 1675 1676 /* 1677 * Let's wait for WRITE to finish. 
1678 */ 1679 mtx_lock(&sync_lock); 1680 while (!ISSYNCREQDONE(hio)) 1681 cv_wait(&sync_cond, &sync_lock); 1682 mtx_unlock(&sync_lock); 1683 1684 if (hio->hio_errors[ncomp] != 0) { 1685 pjdlog_error("Unable to write synchronization data: %s.", 1686 strerror(hio->hio_errors[ncomp])); 1687 goto free_queue; 1688 } 1689free_queue: 1690 mtx_lock(&range_lock); 1691 rangelock_del(range_sync, offset, length); 1692 if (range_regular_wait) 1693 cv_signal(&range_regular_cond); 1694 mtx_unlock(&range_lock); 1695 1696 synced += length; 1697 1698 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1699 hio); 1700 QUEUE_INSERT2(hio, free); 1701 } 1702 /* NOTREACHED */ 1703 return (NULL); 1704} 1705 1706static void 1707sighandler(int sig) 1708{ 1709 bool unlock; 1710 1711 switch (sig) { 1712 case SIGINT: 1713 case SIGTERM: 1714 sigexit_received = true; 1715 break; 1716 default: 1717 assert(!"invalid condition"); 1718 } 1719 /* 1720 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1721 * want to risk deadlock. 1722 */ 1723 unlock = mtx_trylock(&hio_guard_lock); 1724 cv_signal(&hio_guard_cond); 1725 if (unlock) 1726 mtx_unlock(&hio_guard_lock); 1727} 1728 1729/* 1730 * Thread guards remote connections and reconnects when needed, handles 1731 * signals, etc. 1732 */ 1733static void * 1734guard_thread(void *arg) 1735{ 1736 struct hast_resource *res = arg; 1737 struct proto_conn *in, *out; 1738 unsigned int ii, ncomps; 1739 int timeout; 1740 1741 ncomps = HAST_NCOMPONENTS; 1742 /* The is only one remote component for now. */ 1743#define ISREMOTE(no) ((no) == 1) 1744 1745 for (;;) { 1746 if (sigexit_received) { 1747 primary_exitx(EX_OK, 1748 "Termination signal received, exiting."); 1749 } 1750 /* 1751 * If all the connection will be fine, we will sleep until 1752 * someone wakes us up. 1753 * If any of the connections will be broken and we won't be 1754 * able to connect, we will sleep only for RECONNECT_SLEEP 1755 * seconds so we can retry soon. 
1756 */ 1757 timeout = 0; 1758 pjdlog_debug(2, "remote_guard: Checking connections."); 1759 mtx_lock(&hio_guard_lock); 1760 for (ii = 0; ii < ncomps; ii++) { 1761 if (!ISREMOTE(ii)) 1762 continue; 1763 rw_rlock(&hio_remote_lock[ii]); 1764 if (ISCONNECTED(res, ii)) { 1765 assert(res->hr_remotein != NULL); 1766 assert(res->hr_remoteout != NULL); 1767 rw_unlock(&hio_remote_lock[ii]); 1768 pjdlog_debug(2, 1769 "remote_guard: Connection to %s is ok.", 1770 res->hr_remoteaddr); 1771 } else if (real_remote(res)) { 1772 assert(res->hr_remotein == NULL); 1773 assert(res->hr_remoteout == NULL); 1774 /* 1775 * Upgrade the lock. It doesn't have to be 1776 * atomic as no other thread can change 1777 * connection status from disconnected to 1778 * connected. 1779 */ 1780 rw_unlock(&hio_remote_lock[ii]); 1781 pjdlog_debug(2, 1782 "remote_guard: Reconnecting to %s.", 1783 res->hr_remoteaddr); 1784 in = out = NULL; 1785 if (init_remote(res, &in, &out)) { 1786 rw_wlock(&hio_remote_lock[ii]); 1787 assert(res->hr_remotein == NULL); 1788 assert(res->hr_remoteout == NULL); 1789 assert(in != NULL && out != NULL); 1790 res->hr_remotein = in; 1791 res->hr_remoteout = out; 1792 rw_unlock(&hio_remote_lock[ii]); 1793 pjdlog_info("Successfully reconnected to %s.", 1794 res->hr_remoteaddr); 1795 sync_start(); 1796 } else { 1797 /* Both connections should be NULL. */ 1798 assert(res->hr_remotein == NULL); 1799 assert(res->hr_remoteout == NULL); 1800 assert(in == NULL && out == NULL); 1801 pjdlog_debug(2, 1802 "remote_guard: Reconnect to %s failed.", 1803 res->hr_remoteaddr); 1804 timeout = RECONNECT_SLEEP; 1805 } 1806 } else { 1807 rw_unlock(&hio_remote_lock[ii]); 1808 } 1809 } 1810 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1811 mtx_unlock(&hio_guard_lock); 1812 } 1813#undef ISREMOTE 1814 /* NOTREACHED */ 1815 return (NULL); 1816} 1817