primary.c revision 209181
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * All rights reserved. 4 * 5 * This software was developed by Pawel Jakub Dawidek under sponsorship from 6 * the FreeBSD Foundation. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27 * SUCH DAMAGE. 
28 */ 29 30#include <sys/cdefs.h> 31__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 209181 2010-06-14 21:37:25Z pjd $"); 32 33#include <sys/types.h> 34#include <sys/time.h> 35#include <sys/bio.h> 36#include <sys/disk.h> 37#include <sys/refcount.h> 38#include <sys/stat.h> 39 40#include <geom/gate/g_gate.h> 41 42#include <assert.h> 43#include <err.h> 44#include <errno.h> 45#include <fcntl.h> 46#include <libgeom.h> 47#include <pthread.h> 48#include <stdint.h> 49#include <stdio.h> 50#include <string.h> 51#include <sysexits.h> 52#include <unistd.h> 53 54#include <activemap.h> 55#include <nv.h> 56#include <rangelock.h> 57 58#include "control.h" 59#include "hast.h" 60#include "hast_proto.h" 61#include "hastd.h" 62#include "metadata.h" 63#include "proto.h" 64#include "pjdlog.h" 65#include "subr.h" 66#include "synch.h" 67 68struct hio { 69 /* 70 * Number of components we are still waiting for. 71 * When this field goes to 0, we can send the request back to the 72 * kernel. Each component has to decrease this counter by one 73 * even on failure. 74 */ 75 unsigned int hio_countdown; 76 /* 77 * Each component has a place to store its own error. 78 * Once the request is handled by all components we can decide if the 79 * request overall is successful or not. 80 */ 81 int *hio_errors; 82 /* 83 * Structure used to comunicate with GEOM Gate class. 84 */ 85 struct g_gate_ctl_io hio_ggio; 86 TAILQ_ENTRY(hio) *hio_next; 87}; 88#define hio_free_next hio_next[0] 89#define hio_done_next hio_next[0] 90 91/* 92 * Free list holds unused structures. When free list is empty, we have to wait 93 * until some in-progress requests are freed. 94 */ 95static TAILQ_HEAD(, hio) hio_free_list; 96static pthread_mutex_t hio_free_list_lock; 97static pthread_cond_t hio_free_list_cond; 98/* 99 * There is one send list for every component. One requests is placed on all 100 * send lists - each component gets the same request, but each component is 101 * responsible for managing his own send list. 
102 */ 103static TAILQ_HEAD(, hio) *hio_send_list; 104static pthread_mutex_t *hio_send_list_lock; 105static pthread_cond_t *hio_send_list_cond; 106/* 107 * There is one recv list for every component, although local components don't 108 * use recv lists as local requests are done synchronously. 109 */ 110static TAILQ_HEAD(, hio) *hio_recv_list; 111static pthread_mutex_t *hio_recv_list_lock; 112static pthread_cond_t *hio_recv_list_cond; 113/* 114 * Request is placed on done list by the slowest component (the one that 115 * decreased hio_countdown from 1 to 0). 116 */ 117static TAILQ_HEAD(, hio) hio_done_list; 118static pthread_mutex_t hio_done_list_lock; 119static pthread_cond_t hio_done_list_cond; 120/* 121 * Structure below are for interaction with sync thread. 122 */ 123static bool sync_inprogress; 124static pthread_mutex_t sync_lock; 125static pthread_cond_t sync_cond; 126/* 127 * The lock below allows to synchornize access to remote connections. 128 */ 129static pthread_rwlock_t *hio_remote_lock; 130static pthread_mutex_t hio_guard_lock; 131static pthread_cond_t hio_guard_cond; 132 133/* 134 * Lock to synchronize metadata updates. Also synchronize access to 135 * hr_primary_localcnt and hr_primary_remotecnt fields. 136 */ 137static pthread_mutex_t metadata_lock; 138 139/* 140 * Maximum number of outstanding I/O requests. 141 */ 142#define HAST_HIO_MAX 256 143/* 144 * Number of components. At this point there are only two components: local 145 * and remote, but in the future it might be possible to use multiple local 146 * and remote components. 147 */ 148#define HAST_NCOMPONENTS 2 149/* 150 * Number of seconds to sleep before next reconnect try. 
151 */ 152#define RECONNECT_SLEEP 5 153 154#define ISCONNECTED(res, no) \ 155 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156 157#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158 bool _wakeup; \ 159 \ 160 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163 hio_next[(ncomp)]); \ 164 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165 if (_wakeup) \ 166 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167} while (0) 168#define QUEUE_INSERT2(hio, name) do { \ 169 bool _wakeup; \ 170 \ 171 mtx_lock(&hio_##name##_list_lock); \ 172 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174 mtx_unlock(&hio_##name##_list_lock); \ 175 if (_wakeup) \ 176 cv_signal(&hio_##name##_list_cond); \ 177} while (0) 178#define QUEUE_TAKE1(hio, name, ncomp) do { \ 179 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 180 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 181 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 182 &hio_##name##_list_lock[(ncomp)]); \ 183 } \ 184 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 185 hio_next[(ncomp)]); \ 186 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 187} while (0) 188#define QUEUE_TAKE2(hio, name) do { \ 189 mtx_lock(&hio_##name##_list_lock); \ 190 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 191 cv_wait(&hio_##name##_list_cond, \ 192 &hio_##name##_list_lock); \ 193 } \ 194 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 195 mtx_unlock(&hio_##name##_list_lock); \ 196} while (0) 197 198#define SYNCREQ(hio) do { (hio)->hio_ggio.gctl_unit = -1; } while (0) 199#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 200#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 201#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 202 203static struct hast_resource *gres; 204 205static 
pthread_mutex_t range_lock; 206static struct rangelocks *range_regular; 207static bool range_regular_wait; 208static pthread_cond_t range_regular_cond; 209static struct rangelocks *range_sync; 210static bool range_sync_wait; 211static pthread_cond_t range_sync_cond; 212 213static void *ggate_recv_thread(void *arg); 214static void *local_send_thread(void *arg); 215static void *remote_send_thread(void *arg); 216static void *remote_recv_thread(void *arg); 217static void *ggate_send_thread(void *arg); 218static void *sync_thread(void *arg); 219static void *guard_thread(void *arg); 220 221static void sighandler(int sig); 222 223static void 224cleanup(struct hast_resource *res) 225{ 226 int rerrno; 227 228 /* Remember errno. */ 229 rerrno = errno; 230 231 /* 232 * Close descriptor to /dev/hast/<name> 233 * to work-around race in the kernel. 234 */ 235 close(res->hr_localfd); 236 237 /* Destroy ggate provider if we created one. */ 238 if (res->hr_ggateunit >= 0) { 239 struct g_gate_ctl_destroy ggiod; 240 241 ggiod.gctl_version = G_GATE_VERSION; 242 ggiod.gctl_unit = res->hr_ggateunit; 243 ggiod.gctl_force = 1; 244 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 245 pjdlog_warning("Unable to destroy hast/%s device", 246 res->hr_provname); 247 } 248 res->hr_ggateunit = -1; 249 } 250 251 /* Restore errno. */ 252 errno = rerrno; 253} 254 255static void 256primary_exit(int exitcode, const char *fmt, ...) 257{ 258 va_list ap; 259 260 assert(exitcode != EX_OK); 261 va_start(ap, fmt); 262 pjdlogv_errno(LOG_ERR, fmt, ap); 263 va_end(ap); 264 cleanup(gres); 265 exit(exitcode); 266} 267 268static void 269primary_exitx(int exitcode, const char *fmt, ...) 270{ 271 va_list ap; 272 273 va_start(ap, fmt); 274 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 275 va_end(ap); 276 cleanup(gres); 277 exit(exitcode); 278} 279 280static int 281hast_activemap_flush(struct hast_resource *res) 282{ 283 const unsigned char *buf; 284 size_t size; 285 286 buf = activemap_bitmap(res->hr_amp, &size); 287 assert(buf != NULL); 288 assert((size % res->hr_local_sectorsize) == 0); 289 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 290 (ssize_t)size) { 291 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 292 "Unable to flush activemap to disk")); 293 return (-1); 294 } 295 return (0); 296} 297 298static void 299init_environment(struct hast_resource *res __unused) 300{ 301 struct hio *hio; 302 unsigned int ii, ncomps; 303 304 /* 305 * In the future it might be per-resource value. 306 */ 307 ncomps = HAST_NCOMPONENTS; 308 309 /* 310 * Allocate memory needed by lists. 311 */ 312 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 313 if (hio_send_list == NULL) { 314 primary_exitx(EX_TEMPFAIL, 315 "Unable to allocate %zu bytes of memory for send lists.", 316 sizeof(hio_send_list[0]) * ncomps); 317 } 318 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 319 if (hio_send_list_lock == NULL) { 320 primary_exitx(EX_TEMPFAIL, 321 "Unable to allocate %zu bytes of memory for send list locks.", 322 sizeof(hio_send_list_lock[0]) * ncomps); 323 } 324 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 325 if (hio_send_list_cond == NULL) { 326 primary_exitx(EX_TEMPFAIL, 327 "Unable to allocate %zu bytes of memory for send list condition variables.", 328 sizeof(hio_send_list_cond[0]) * ncomps); 329 } 330 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 331 if (hio_recv_list == NULL) { 332 primary_exitx(EX_TEMPFAIL, 333 "Unable to allocate %zu bytes of memory for recv lists.", 334 sizeof(hio_recv_list[0]) * ncomps); 335 } 336 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 337 if (hio_recv_list_lock == NULL) { 338 primary_exitx(EX_TEMPFAIL, 339 "Unable to 
allocate %zu bytes of memory for recv list locks.", 340 sizeof(hio_recv_list_lock[0]) * ncomps); 341 } 342 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 343 if (hio_recv_list_cond == NULL) { 344 primary_exitx(EX_TEMPFAIL, 345 "Unable to allocate %zu bytes of memory for recv list condition variables.", 346 sizeof(hio_recv_list_cond[0]) * ncomps); 347 } 348 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 349 if (hio_remote_lock == NULL) { 350 primary_exitx(EX_TEMPFAIL, 351 "Unable to allocate %zu bytes of memory for remote connections locks.", 352 sizeof(hio_remote_lock[0]) * ncomps); 353 } 354 355 /* 356 * Initialize lists, their locks and theirs condition variables. 357 */ 358 TAILQ_INIT(&hio_free_list); 359 mtx_init(&hio_free_list_lock); 360 cv_init(&hio_free_list_cond); 361 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 362 TAILQ_INIT(&hio_send_list[ii]); 363 mtx_init(&hio_send_list_lock[ii]); 364 cv_init(&hio_send_list_cond[ii]); 365 TAILQ_INIT(&hio_recv_list[ii]); 366 mtx_init(&hio_recv_list_lock[ii]); 367 cv_init(&hio_recv_list_cond[ii]); 368 rw_init(&hio_remote_lock[ii]); 369 } 370 TAILQ_INIT(&hio_done_list); 371 mtx_init(&hio_done_list_lock); 372 cv_init(&hio_done_list_cond); 373 mtx_init(&hio_guard_lock); 374 cv_init(&hio_guard_cond); 375 mtx_init(&metadata_lock); 376 377 /* 378 * Allocate requests pool and initialize requests. 
379 */ 380 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 381 hio = malloc(sizeof(*hio)); 382 if (hio == NULL) { 383 primary_exitx(EX_TEMPFAIL, 384 "Unable to allocate %zu bytes of memory for hio request.", 385 sizeof(*hio)); 386 } 387 hio->hio_countdown = 0; 388 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 389 if (hio->hio_errors == NULL) { 390 primary_exitx(EX_TEMPFAIL, 391 "Unable allocate %zu bytes of memory for hio errors.", 392 sizeof(hio->hio_errors[0]) * ncomps); 393 } 394 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 395 if (hio->hio_next == NULL) { 396 primary_exitx(EX_TEMPFAIL, 397 "Unable allocate %zu bytes of memory for hio_next field.", 398 sizeof(hio->hio_next[0]) * ncomps); 399 } 400 hio->hio_ggio.gctl_version = G_GATE_VERSION; 401 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 402 if (hio->hio_ggio.gctl_data == NULL) { 403 primary_exitx(EX_TEMPFAIL, 404 "Unable to allocate %zu bytes of memory for gctl_data.", 405 MAXPHYS); 406 } 407 hio->hio_ggio.gctl_length = MAXPHYS; 408 hio->hio_ggio.gctl_error = 0; 409 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 410 } 411 412 /* 413 * Turn on signals handling. 
414 */ 415 signal(SIGINT, sighandler); 416 signal(SIGTERM, sighandler); 417} 418 419static void 420init_local(struct hast_resource *res) 421{ 422 unsigned char *buf; 423 size_t mapsize; 424 425 if (metadata_read(res, true) < 0) 426 exit(EX_NOINPUT); 427 mtx_init(&res->hr_amp_lock); 428 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 429 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 430 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 431 } 432 mtx_init(&range_lock); 433 cv_init(&range_regular_cond); 434 if (rangelock_init(&range_regular) < 0) 435 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 436 cv_init(&range_sync_cond); 437 if (rangelock_init(&range_sync) < 0) 438 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 439 mapsize = activemap_ondisk_size(res->hr_amp); 440 buf = calloc(1, mapsize); 441 if (buf == NULL) { 442 primary_exitx(EX_TEMPFAIL, 443 "Unable to allocate buffer for activemap."); 444 } 445 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 446 (ssize_t)mapsize) { 447 primary_exit(EX_NOINPUT, "Unable to read activemap"); 448 } 449 activemap_copyin(res->hr_amp, buf, mapsize); 450 free(buf); 451 if (res->hr_resuid != 0) 452 return; 453 /* 454 * We're using provider for the first time, so we have to generate 455 * resource unique identifier and initialize local and remote counts. 
456 */ 457 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 458 res->hr_primary_localcnt = 1; 459 res->hr_primary_remotecnt = 0; 460 if (metadata_write(res) < 0) 461 exit(EX_NOINPUT); 462} 463 464static bool 465init_remote(struct hast_resource *res, struct proto_conn **inp, 466 struct proto_conn **outp) 467{ 468 struct proto_conn *in, *out; 469 struct nv *nvout, *nvin; 470 const unsigned char *token; 471 unsigned char *map; 472 const char *errmsg; 473 int32_t extentsize; 474 int64_t datasize; 475 uint32_t mapsize; 476 size_t size; 477 478 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 479 480 in = out = NULL; 481 482 /* Prepare outgoing connection with remote node. */ 483 if (proto_client(res->hr_remoteaddr, &out) < 0) { 484 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 485 res->hr_remoteaddr); 486 } 487 /* Try to connect, but accept failure. */ 488 if (proto_connect(out) < 0) { 489 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 490 res->hr_remoteaddr); 491 goto close; 492 } 493 /* Error in setting timeout is not critical, but why should it fail? */ 494 if (proto_timeout(out, res->hr_timeout) < 0) 495 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 496 /* 497 * First handshake step. 498 * Setup outgoing connection with remote node. 
499 */ 500 nvout = nv_alloc(); 501 nv_add_string(nvout, res->hr_name, "resource"); 502 if (nv_error(nvout) != 0) { 503 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 504 "Unable to allocate header for connection with %s", 505 res->hr_remoteaddr); 506 nv_free(nvout); 507 goto close; 508 } 509 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 510 pjdlog_errno(LOG_WARNING, 511 "Unable to send handshake header to %s", 512 res->hr_remoteaddr); 513 nv_free(nvout); 514 goto close; 515 } 516 nv_free(nvout); 517 if (hast_proto_recv_hdr(out, &nvin) < 0) { 518 pjdlog_errno(LOG_WARNING, 519 "Unable to receive handshake header from %s", 520 res->hr_remoteaddr); 521 goto close; 522 } 523 errmsg = nv_get_string(nvin, "errmsg"); 524 if (errmsg != NULL) { 525 pjdlog_warning("%s", errmsg); 526 nv_free(nvin); 527 goto close; 528 } 529 token = nv_get_uint8_array(nvin, &size, "token"); 530 if (token == NULL) { 531 pjdlog_warning("Handshake header from %s has no 'token' field.", 532 res->hr_remoteaddr); 533 nv_free(nvin); 534 goto close; 535 } 536 if (size != sizeof(res->hr_token)) { 537 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 538 res->hr_remoteaddr, size, sizeof(res->hr_token)); 539 nv_free(nvin); 540 goto close; 541 } 542 bcopy(token, res->hr_token, sizeof(res->hr_token)); 543 nv_free(nvin); 544 545 /* 546 * Second handshake step. 547 * Setup incoming connection with remote node. 548 */ 549 if (proto_client(res->hr_remoteaddr, &in) < 0) { 550 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 551 res->hr_remoteaddr); 552 } 553 /* Try to connect, but accept failure. */ 554 if (proto_connect(in) < 0) { 555 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 556 res->hr_remoteaddr); 557 goto close; 558 } 559 /* Error in setting timeout is not critical, but why should it fail? 
*/ 560 if (proto_timeout(in, res->hr_timeout) < 0) 561 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 562 nvout = nv_alloc(); 563 nv_add_string(nvout, res->hr_name, "resource"); 564 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 565 "token"); 566 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 567 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 568 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 569 if (nv_error(nvout) != 0) { 570 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 571 "Unable to allocate header for connection with %s", 572 res->hr_remoteaddr); 573 nv_free(nvout); 574 goto close; 575 } 576 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 577 pjdlog_errno(LOG_WARNING, 578 "Unable to send handshake header to %s", 579 res->hr_remoteaddr); 580 nv_free(nvout); 581 goto close; 582 } 583 nv_free(nvout); 584 if (hast_proto_recv_hdr(out, &nvin) < 0) { 585 pjdlog_errno(LOG_WARNING, 586 "Unable to receive handshake header from %s", 587 res->hr_remoteaddr); 588 goto close; 589 } 590 errmsg = nv_get_string(nvin, "errmsg"); 591 if (errmsg != NULL) { 592 pjdlog_warning("%s", errmsg); 593 nv_free(nvin); 594 goto close; 595 } 596 datasize = nv_get_int64(nvin, "datasize"); 597 if (datasize != res->hr_datasize) { 598 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 599 (intmax_t)res->hr_datasize, (intmax_t)datasize); 600 nv_free(nvin); 601 goto close; 602 } 603 extentsize = nv_get_int32(nvin, "extentsize"); 604 if (extentsize != res->hr_extentsize) { 605 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 606 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 607 nv_free(nvin); 608 goto close; 609 } 610 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 611 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 612 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 613 map = NULL; 614 mapsize = nv_get_uint32(nvin, "mapsize"); 615 if (mapsize > 
0) { 616 map = malloc(mapsize); 617 if (map == NULL) { 618 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 619 (uintmax_t)mapsize); 620 nv_free(nvin); 621 goto close; 622 } 623 /* 624 * Remote node have some dirty extents on its own, lets 625 * download its activemap. 626 */ 627 if (hast_proto_recv_data(res, out, nvin, map, 628 mapsize) < 0) { 629 pjdlog_errno(LOG_ERR, 630 "Unable to receive remote activemap"); 631 nv_free(nvin); 632 free(map); 633 goto close; 634 } 635 /* 636 * Merge local and remote bitmaps. 637 */ 638 activemap_merge(res->hr_amp, map, mapsize); 639 free(map); 640 /* 641 * Now that we merged bitmaps from both nodes, flush it to the 642 * disk before we start to synchronize. 643 */ 644 (void)hast_activemap_flush(res); 645 } 646 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 647 if (inp != NULL && outp != NULL) { 648 *inp = in; 649 *outp = out; 650 } else { 651 res->hr_remotein = in; 652 res->hr_remoteout = out; 653 } 654 return (true); 655close: 656 proto_close(out); 657 if (in != NULL) 658 proto_close(in); 659 return (false); 660} 661 662static void 663sync_start(void) 664{ 665 666 mtx_lock(&sync_lock); 667 sync_inprogress = true; 668 mtx_unlock(&sync_lock); 669 cv_signal(&sync_cond); 670} 671 672static void 673init_ggate(struct hast_resource *res) 674{ 675 struct g_gate_ctl_create ggiocreate; 676 struct g_gate_ctl_cancel ggiocancel; 677 678 /* 679 * We communicate with ggate via /dev/ggctl. Open it. 680 */ 681 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 682 if (res->hr_ggatefd < 0) 683 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 684 /* 685 * Create provider before trying to connect, as connection failure 686 * is not critical, but may take some time. 
687 */ 688 ggiocreate.gctl_version = G_GATE_VERSION; 689 ggiocreate.gctl_mediasize = res->hr_datasize; 690 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 691 ggiocreate.gctl_flags = 0; 692 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 693 ggiocreate.gctl_timeout = 0; 694 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 695 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 696 res->hr_provname); 697 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 698 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 699 pjdlog_info("Device hast/%s created.", res->hr_provname); 700 res->hr_ggateunit = ggiocreate.gctl_unit; 701 return; 702 } 703 if (errno != EEXIST) { 704 primary_exit(EX_OSERR, "Unable to create hast/%s device", 705 res->hr_provname); 706 } 707 pjdlog_debug(1, 708 "Device hast/%s already exists, we will try to take it over.", 709 res->hr_provname); 710 /* 711 * If we received EEXIST, we assume that the process who created the 712 * provider died and didn't clean up. In that case we will start from 713 * where he left of. 714 */ 715 ggiocancel.gctl_version = G_GATE_VERSION; 716 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 717 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 718 res->hr_provname); 719 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 720 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 721 res->hr_ggateunit = ggiocancel.gctl_unit; 722 return; 723 } 724 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 725 res->hr_provname); 726} 727 728void 729hastd_primary(struct hast_resource *res) 730{ 731 pthread_t td; 732 pid_t pid; 733 int error; 734 735 gres = res; 736 737 /* 738 * Create communication channel between parent and child. 
739 */ 740 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 741 KEEP_ERRNO((void)pidfile_remove(pfh)); 742 primary_exit(EX_OSERR, 743 "Unable to create control sockets between parent and child"); 744 } 745 746 pid = fork(); 747 if (pid < 0) { 748 KEEP_ERRNO((void)pidfile_remove(pfh)); 749 primary_exit(EX_TEMPFAIL, "Unable to fork"); 750 } 751 752 if (pid > 0) { 753 /* This is parent. */ 754 res->hr_workerpid = pid; 755 return; 756 } 757 (void)pidfile_close(pfh); 758 759 setproctitle("%s (primary)", res->hr_name); 760 761 init_local(res); 762 if (init_remote(res, NULL, NULL)) 763 sync_start(); 764 init_ggate(res); 765 init_environment(res); 766 error = pthread_create(&td, NULL, ggate_recv_thread, res); 767 assert(error == 0); 768 error = pthread_create(&td, NULL, local_send_thread, res); 769 assert(error == 0); 770 error = pthread_create(&td, NULL, remote_send_thread, res); 771 assert(error == 0); 772 error = pthread_create(&td, NULL, remote_recv_thread, res); 773 assert(error == 0); 774 error = pthread_create(&td, NULL, ggate_send_thread, res); 775 assert(error == 0); 776 error = pthread_create(&td, NULL, sync_thread, res); 777 assert(error == 0); 778 error = pthread_create(&td, NULL, ctrl_thread, res); 779 assert(error == 0); 780 (void)guard_thread(res); 781} 782 783static void 784reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
785{ 786 char msg[1024]; 787 va_list ap; 788 int len; 789 790 va_start(ap, fmt); 791 len = vsnprintf(msg, sizeof(msg), fmt, ap); 792 va_end(ap); 793 if ((size_t)len < sizeof(msg)) { 794 switch (ggio->gctl_cmd) { 795 case BIO_READ: 796 (void)snprintf(msg + len, sizeof(msg) - len, 797 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 798 (uintmax_t)ggio->gctl_length); 799 break; 800 case BIO_DELETE: 801 (void)snprintf(msg + len, sizeof(msg) - len, 802 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 803 (uintmax_t)ggio->gctl_length); 804 break; 805 case BIO_FLUSH: 806 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 807 break; 808 case BIO_WRITE: 809 (void)snprintf(msg + len, sizeof(msg) - len, 810 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 811 (uintmax_t)ggio->gctl_length); 812 break; 813 default: 814 (void)snprintf(msg + len, sizeof(msg) - len, 815 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 816 break; 817 } 818 } 819 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 820} 821 822static void 823remote_close(struct hast_resource *res, int ncomp) 824{ 825 826 rw_wlock(&hio_remote_lock[ncomp]); 827 /* 828 * A race is possible between dropping rlock and acquiring wlock - 829 * another thread can close connection in-between. 830 */ 831 if (!ISCONNECTED(res, ncomp)) { 832 assert(res->hr_remotein == NULL); 833 assert(res->hr_remoteout == NULL); 834 rw_unlock(&hio_remote_lock[ncomp]); 835 return; 836 } 837 838 assert(res->hr_remotein != NULL); 839 assert(res->hr_remoteout != NULL); 840 841 pjdlog_debug(2, "Closing old incoming connection to %s.", 842 res->hr_remoteaddr); 843 proto_close(res->hr_remotein); 844 res->hr_remotein = NULL; 845 pjdlog_debug(2, "Closing old outgoing connection to %s.", 846 res->hr_remoteaddr); 847 proto_close(res->hr_remoteout); 848 res->hr_remoteout = NULL; 849 850 rw_unlock(&hio_remote_lock[ncomp]); 851 852 /* 853 * Stop synchronization if in-progress. 
854 */ 855 mtx_lock(&sync_lock); 856 if (sync_inprogress) 857 sync_inprogress = false; 858 mtx_unlock(&sync_lock); 859 860 /* 861 * Wake up guard thread, so it can immediately start reconnect. 862 */ 863 mtx_lock(&hio_guard_lock); 864 cv_signal(&hio_guard_cond); 865 mtx_unlock(&hio_guard_lock); 866} 867 868/* 869 * Thread receives ggate I/O requests from the kernel and passes them to 870 * appropriate threads: 871 * WRITE - always goes to both local_send and remote_send threads 872 * READ (when the block is up-to-date on local component) - 873 * only local_send thread 874 * READ (when the block isn't up-to-date on local component) - 875 * only remote_send thread 876 * DELETE - always goes to both local_send and remote_send threads 877 * FLUSH - always goes to both local_send and remote_send threads 878 */ 879static void * 880ggate_recv_thread(void *arg) 881{ 882 struct hast_resource *res = arg; 883 struct g_gate_ctl_io *ggio; 884 struct hio *hio; 885 unsigned int ii, ncomp, ncomps; 886 int error; 887 888 ncomps = HAST_NCOMPONENTS; 889 890 for (;;) { 891 pjdlog_debug(2, "ggate_recv: Taking free request."); 892 QUEUE_TAKE2(hio, free); 893 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 894 ggio = &hio->hio_ggio; 895 ggio->gctl_unit = res->hr_ggateunit; 896 ggio->gctl_length = MAXPHYS; 897 ggio->gctl_error = 0; 898 pjdlog_debug(2, 899 "ggate_recv: (%p) Waiting for request from the kernel.", 900 hio); 901 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 902 if (sigexit_received) 903 pthread_exit(NULL); 904 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 905 } 906 error = ggio->gctl_error; 907 switch (error) { 908 case 0: 909 break; 910 case ECANCELED: 911 /* Exit gracefully. */ 912 if (!sigexit_received) { 913 pjdlog_debug(2, 914 "ggate_recv: (%p) Received cancel from the kernel.", 915 hio); 916 pjdlog_info("Received cancel from the kernel, exiting."); 917 } 918 pthread_exit(NULL); 919 case ENOMEM: 920 /* 921 * Buffer too small? 
Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/*
		 * Preset every per-component slot to EINVAL; each component
		 * thread overwrites its own slot when it finishes the request.
		 */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* A read is serviced by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Do not touch a range that the sync thread currently
			 * holds; block until it releases it, then take the
			 * regular-range lock ourselves.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/*
			 * Mark the extent dirty in the activemap; flush the
			 * on-disk map only if this actually changed it.
			 */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* Writes/deletes/flushes go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 * Note: the countdown is not released here; the
				 * remote component will do it when it finishes.
				 */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				/* Short write with no errno: report as EIO. */
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/*
		 * Last component to finish completes the request: wake the
		 * sync thread for its private requests, otherwise hand the
		 * request to ggate_send via the done queue.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends request to secondary node.
1105 */ 1106static void * 1107remote_send_thread(void *arg) 1108{ 1109 struct hast_resource *res = arg; 1110 struct g_gate_ctl_io *ggio; 1111 struct hio *hio; 1112 struct nv *nv; 1113 unsigned int ncomp; 1114 bool wakeup; 1115 uint64_t offset, length; 1116 uint8_t cmd; 1117 void *data; 1118 1119 /* Remote component is 1 for now. */ 1120 ncomp = 1; 1121 1122 for (;;) { 1123 pjdlog_debug(2, "remote_send: Taking request."); 1124 QUEUE_TAKE1(hio, send, ncomp); 1125 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1126 ggio = &hio->hio_ggio; 1127 switch (ggio->gctl_cmd) { 1128 case BIO_READ: 1129 cmd = HIO_READ; 1130 data = NULL; 1131 offset = ggio->gctl_offset; 1132 length = ggio->gctl_length; 1133 break; 1134 case BIO_WRITE: 1135 cmd = HIO_WRITE; 1136 data = ggio->gctl_data; 1137 offset = ggio->gctl_offset; 1138 length = ggio->gctl_length; 1139 break; 1140 case BIO_DELETE: 1141 cmd = HIO_DELETE; 1142 data = NULL; 1143 offset = ggio->gctl_offset; 1144 length = ggio->gctl_length; 1145 break; 1146 case BIO_FLUSH: 1147 cmd = HIO_FLUSH; 1148 data = NULL; 1149 offset = 0; 1150 length = 0; 1151 break; 1152 default: 1153 assert(!"invalid condition"); 1154 abort(); 1155 } 1156 nv = nv_alloc(); 1157 nv_add_uint8(nv, cmd, "cmd"); 1158 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1159 nv_add_uint64(nv, offset, "offset"); 1160 nv_add_uint64(nv, length, "length"); 1161 if (nv_error(nv) != 0) { 1162 hio->hio_errors[ncomp] = nv_error(nv); 1163 pjdlog_debug(2, 1164 "remote_send: (%p) Unable to prepare header to send.", 1165 hio); 1166 reqlog(LOG_ERR, 0, ggio, 1167 "Unable to prepare header to send (%s): ", 1168 strerror(nv_error(nv))); 1169 /* Move failed request immediately to the done queue. */ 1170 goto done_queue; 1171 } 1172 pjdlog_debug(2, 1173 "remote_send: (%p) Moving request to the recv queue.", 1174 hio); 1175 /* 1176 * Protect connection from disappearing. 
1177 */ 1178 rw_rlock(&hio_remote_lock[ncomp]); 1179 if (!ISCONNECTED(res, ncomp)) { 1180 rw_unlock(&hio_remote_lock[ncomp]); 1181 hio->hio_errors[ncomp] = ENOTCONN; 1182 goto done_queue; 1183 } 1184 /* 1185 * Move the request to recv queue before sending it, because 1186 * in different order we can get reply before we move request 1187 * to recv queue. 1188 */ 1189 mtx_lock(&hio_recv_list_lock[ncomp]); 1190 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1191 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1192 mtx_unlock(&hio_recv_list_lock[ncomp]); 1193 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1194 data != NULL ? length : 0) < 0) { 1195 hio->hio_errors[ncomp] = errno; 1196 rw_unlock(&hio_remote_lock[ncomp]); 1197 remote_close(res, ncomp); 1198 pjdlog_debug(2, 1199 "remote_send: (%p) Unable to send request.", hio); 1200 reqlog(LOG_ERR, 0, ggio, 1201 "Unable to send request (%s): ", 1202 strerror(hio->hio_errors[ncomp])); 1203 /* 1204 * Take request back from the receive queue and move 1205 * it immediately to the done queue. 
1206 */ 1207 mtx_lock(&hio_recv_list_lock[ncomp]); 1208 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1209 mtx_unlock(&hio_recv_list_lock[ncomp]); 1210 goto done_queue; 1211 } 1212 rw_unlock(&hio_remote_lock[ncomp]); 1213 nv_free(nv); 1214 if (wakeup) 1215 cv_signal(&hio_recv_list_cond[ncomp]); 1216 continue; 1217done_queue: 1218 nv_free(nv); 1219 if (ISSYNCREQ(hio)) { 1220 if (!refcount_release(&hio->hio_countdown)) 1221 continue; 1222 mtx_lock(&sync_lock); 1223 SYNCREQDONE(hio); 1224 mtx_unlock(&sync_lock); 1225 cv_signal(&sync_cond); 1226 continue; 1227 } 1228 if (ggio->gctl_cmd == BIO_WRITE) { 1229 mtx_lock(&res->hr_amp_lock); 1230 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1231 ggio->gctl_length)) { 1232 (void)hast_activemap_flush(res); 1233 } 1234 mtx_unlock(&res->hr_amp_lock); 1235 } 1236 if (!refcount_release(&hio->hio_countdown)) 1237 continue; 1238 pjdlog_debug(2, 1239 "remote_send: (%p) Moving request to the done queue.", 1240 hio); 1241 QUEUE_INSERT2(hio, done); 1242 } 1243 /* NOTREACHED */ 1244 return (NULL); 1245} 1246 1247/* 1248 * Thread receives answer from secondary node and passes it to ggate_send 1249 * thread. 1250 */ 1251static void * 1252remote_recv_thread(void *arg) 1253{ 1254 struct hast_resource *res = arg; 1255 struct g_gate_ctl_io *ggio; 1256 struct hio *hio; 1257 struct nv *nv; 1258 unsigned int ncomp; 1259 uint64_t seq; 1260 int error; 1261 1262 /* Remote component is 1 for now. */ 1263 ncomp = 1; 1264 1265 for (;;) { 1266 /* Wait until there is anything to receive. 
*/ 1267 mtx_lock(&hio_recv_list_lock[ncomp]); 1268 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1269 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1270 cv_wait(&hio_recv_list_cond[ncomp], 1271 &hio_recv_list_lock[ncomp]); 1272 } 1273 mtx_unlock(&hio_recv_list_lock[ncomp]); 1274 rw_rlock(&hio_remote_lock[ncomp]); 1275 if (!ISCONNECTED(res, ncomp)) { 1276 rw_unlock(&hio_remote_lock[ncomp]); 1277 /* 1278 * Connection is dead, so move all pending requests to 1279 * the done queue (one-by-one). 1280 */ 1281 mtx_lock(&hio_recv_list_lock[ncomp]); 1282 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1283 assert(hio != NULL); 1284 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1285 hio_next[ncomp]); 1286 mtx_unlock(&hio_recv_list_lock[ncomp]); 1287 goto done_queue; 1288 } 1289 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1290 pjdlog_errno(LOG_ERR, 1291 "Unable to receive reply header"); 1292 rw_unlock(&hio_remote_lock[ncomp]); 1293 remote_close(res, ncomp); 1294 continue; 1295 } 1296 rw_unlock(&hio_remote_lock[ncomp]); 1297 seq = nv_get_uint64(nv, "seq"); 1298 if (seq == 0) { 1299 pjdlog_error("Header contains no 'seq' field."); 1300 nv_free(nv); 1301 continue; 1302 } 1303 mtx_lock(&hio_recv_list_lock[ncomp]); 1304 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1305 if (hio->hio_ggio.gctl_seq == seq) { 1306 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1307 hio_next[ncomp]); 1308 break; 1309 } 1310 } 1311 mtx_unlock(&hio_recv_list_lock[ncomp]); 1312 if (hio == NULL) { 1313 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1314 (uintmax_t)seq); 1315 nv_free(nv); 1316 continue; 1317 } 1318 error = nv_get_int16(nv, "error"); 1319 if (error != 0) { 1320 /* Request failed on remote side. 
*/ 1321 hio->hio_errors[ncomp] = 0; 1322 nv_free(nv); 1323 goto done_queue; 1324 } 1325 ggio = &hio->hio_ggio; 1326 switch (ggio->gctl_cmd) { 1327 case BIO_READ: 1328 rw_rlock(&hio_remote_lock[ncomp]); 1329 if (!ISCONNECTED(res, ncomp)) { 1330 rw_unlock(&hio_remote_lock[ncomp]); 1331 nv_free(nv); 1332 goto done_queue; 1333 } 1334 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1335 ggio->gctl_data, ggio->gctl_length) < 0) { 1336 hio->hio_errors[ncomp] = errno; 1337 pjdlog_errno(LOG_ERR, 1338 "Unable to receive reply data"); 1339 rw_unlock(&hio_remote_lock[ncomp]); 1340 nv_free(nv); 1341 remote_close(res, ncomp); 1342 goto done_queue; 1343 } 1344 rw_unlock(&hio_remote_lock[ncomp]); 1345 break; 1346 case BIO_WRITE: 1347 case BIO_DELETE: 1348 case BIO_FLUSH: 1349 break; 1350 default: 1351 assert(!"invalid condition"); 1352 abort(); 1353 } 1354 hio->hio_errors[ncomp] = 0; 1355 nv_free(nv); 1356done_queue: 1357 if (refcount_release(&hio->hio_countdown)) { 1358 if (ISSYNCREQ(hio)) { 1359 mtx_lock(&sync_lock); 1360 SYNCREQDONE(hio); 1361 mtx_unlock(&sync_lock); 1362 cv_signal(&sync_cond); 1363 } else { 1364 pjdlog_debug(2, 1365 "remote_recv: (%p) Moving request to the done queue.", 1366 hio); 1367 QUEUE_INSERT2(hio, done); 1368 } 1369 } 1370 } 1371 /* NOTREACHED */ 1372 return (NULL); 1373} 1374 1375/* 1376 * Thread sends answer to the kernel. 1377 */ 1378static void * 1379ggate_send_thread(void *arg) 1380{ 1381 struct hast_resource *res = arg; 1382 struct g_gate_ctl_io *ggio; 1383 struct hio *hio; 1384 unsigned int ii, ncomp, ncomps; 1385 1386 ncomps = HAST_NCOMPONENTS; 1387 1388 for (;;) { 1389 pjdlog_debug(2, "ggate_send: Taking request."); 1390 QUEUE_TAKE2(hio, done); 1391 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1392 ggio = &hio->hio_ggio; 1393 for (ii = 0; ii < ncomps; ii++) { 1394 if (hio->hio_errors[ii] == 0) { 1395 /* 1396 * One successful request is enough to declare 1397 * success. 
1398 */ 1399 ggio->gctl_error = 0; 1400 break; 1401 } 1402 } 1403 if (ii == ncomps) { 1404 /* 1405 * None of the requests were successful. 1406 * Use first error. 1407 */ 1408 ggio->gctl_error = hio->hio_errors[0]; 1409 } 1410 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1411 mtx_lock(&res->hr_amp_lock); 1412 activemap_write_complete(res->hr_amp, 1413 ggio->gctl_offset, ggio->gctl_length); 1414 mtx_unlock(&res->hr_amp_lock); 1415 } 1416 if (ggio->gctl_cmd == BIO_WRITE) { 1417 /* 1418 * Unlock range we locked. 1419 */ 1420 mtx_lock(&range_lock); 1421 rangelock_del(range_regular, ggio->gctl_offset, 1422 ggio->gctl_length); 1423 if (range_sync_wait) 1424 cv_signal(&range_sync_cond); 1425 mtx_unlock(&range_lock); 1426 /* 1427 * Bump local count if this is first write after 1428 * connection failure with remote node. 1429 */ 1430 ncomp = 1; 1431 rw_rlock(&hio_remote_lock[ncomp]); 1432 if (!ISCONNECTED(res, ncomp)) { 1433 mtx_lock(&metadata_lock); 1434 if (res->hr_primary_localcnt == 1435 res->hr_secondary_remotecnt) { 1436 res->hr_primary_localcnt++; 1437 pjdlog_debug(1, 1438 "Increasing localcnt to %ju.", 1439 (uintmax_t)res->hr_primary_localcnt); 1440 (void)metadata_write(res); 1441 } 1442 mtx_unlock(&metadata_lock); 1443 } 1444 rw_unlock(&hio_remote_lock[ncomp]); 1445 } 1446 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1447 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1448 pjdlog_debug(2, 1449 "ggate_send: (%p) Moving request to the free queue.", hio); 1450 QUEUE_INSERT2(hio, free); 1451 } 1452 /* NOTREACHED */ 1453 return (NULL); 1454} 1455 1456/* 1457 * Thread synchronize local and remote components. 
1458 */ 1459static void * 1460sync_thread(void *arg __unused) 1461{ 1462 struct hast_resource *res = arg; 1463 struct hio *hio; 1464 struct g_gate_ctl_io *ggio; 1465 unsigned int ii, ncomp, ncomps; 1466 off_t offset, length, synced; 1467 bool dorewind; 1468 int syncext; 1469 1470 ncomps = HAST_NCOMPONENTS; 1471 dorewind = true; 1472 synced = 0; 1473 1474 for (;;) { 1475 mtx_lock(&sync_lock); 1476 while (!sync_inprogress) { 1477 dorewind = true; 1478 synced = 0; 1479 cv_wait(&sync_cond, &sync_lock); 1480 } 1481 mtx_unlock(&sync_lock); 1482 /* 1483 * Obtain offset at which we should synchronize. 1484 * Rewind synchronization if needed. 1485 */ 1486 mtx_lock(&res->hr_amp_lock); 1487 if (dorewind) 1488 activemap_sync_rewind(res->hr_amp); 1489 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1490 if (syncext != -1) { 1491 /* 1492 * We synchronized entire syncext extent, we can mark 1493 * it as clean now. 1494 */ 1495 if (activemap_extent_complete(res->hr_amp, syncext)) 1496 (void)hast_activemap_flush(res); 1497 } 1498 mtx_unlock(&res->hr_amp_lock); 1499 if (dorewind) { 1500 dorewind = false; 1501 if (offset < 0) 1502 pjdlog_info("Nodes are in sync."); 1503 else { 1504 pjdlog_info("Synchronization started. %ju bytes to go.", 1505 (uintmax_t)(res->hr_extentsize * 1506 activemap_ndirty(res->hr_amp))); 1507 } 1508 } 1509 if (offset < 0) { 1510 mtx_lock(&sync_lock); 1511 sync_inprogress = false; 1512 mtx_unlock(&sync_lock); 1513 pjdlog_debug(1, "Nothing to synchronize."); 1514 /* 1515 * Synchronization complete, make both localcnt and 1516 * remotecnt equal. 1517 */ 1518 ncomp = 1; 1519 rw_rlock(&hio_remote_lock[ncomp]); 1520 if (ISCONNECTED(res, ncomp)) { 1521 if (synced > 0) { 1522 pjdlog_info("Synchronization complete. 
" 1523 "%jd bytes synchronized.", 1524 (intmax_t)synced); 1525 } 1526 mtx_lock(&metadata_lock); 1527 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1528 res->hr_primary_localcnt = 1529 res->hr_secondary_localcnt; 1530 res->hr_primary_remotecnt = 1531 res->hr_secondary_remotecnt; 1532 pjdlog_debug(1, 1533 "Setting localcnt to %ju and remotecnt to %ju.", 1534 (uintmax_t)res->hr_primary_localcnt, 1535 (uintmax_t)res->hr_secondary_localcnt); 1536 (void)metadata_write(res); 1537 mtx_unlock(&metadata_lock); 1538 } else if (synced > 0) { 1539 pjdlog_info("Synchronization interrupted. " 1540 "%jd bytes synchronized so far.", 1541 (intmax_t)synced); 1542 } 1543 rw_unlock(&hio_remote_lock[ncomp]); 1544 continue; 1545 } 1546 pjdlog_debug(2, "sync: Taking free request."); 1547 QUEUE_TAKE2(hio, free); 1548 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1549 /* 1550 * Lock the range we are going to synchronize. We don't want 1551 * race where someone writes between our read and write. 1552 */ 1553 for (;;) { 1554 mtx_lock(&range_lock); 1555 if (rangelock_islocked(range_regular, offset, length)) { 1556 pjdlog_debug(2, 1557 "sync: Range offset=%jd length=%jd locked.", 1558 (intmax_t)offset, (intmax_t)length); 1559 range_sync_wait = true; 1560 cv_wait(&range_sync_cond, &range_lock); 1561 range_sync_wait = false; 1562 mtx_unlock(&range_lock); 1563 continue; 1564 } 1565 if (rangelock_add(range_sync, offset, length) < 0) { 1566 mtx_unlock(&range_lock); 1567 pjdlog_debug(2, 1568 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1569 (intmax_t)offset, (intmax_t)length); 1570 sleep(1); 1571 continue; 1572 } 1573 mtx_unlock(&range_lock); 1574 break; 1575 } 1576 /* 1577 * First read the data from synchronization source. 
1578 */ 1579 SYNCREQ(hio); 1580 ggio = &hio->hio_ggio; 1581 ggio->gctl_cmd = BIO_READ; 1582 ggio->gctl_offset = offset; 1583 ggio->gctl_length = length; 1584 ggio->gctl_error = 0; 1585 for (ii = 0; ii < ncomps; ii++) 1586 hio->hio_errors[ii] = EINVAL; 1587 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1588 hio); 1589 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1590 hio); 1591 mtx_lock(&metadata_lock); 1592 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1593 /* 1594 * This range is up-to-date on local component, 1595 * so handle request locally. 1596 */ 1597 /* Local component is 0 for now. */ 1598 ncomp = 0; 1599 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1600 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1601 /* 1602 * This range is out-of-date on local component, 1603 * so send request to the remote node. 1604 */ 1605 /* Remote component is 1 for now. */ 1606 ncomp = 1; 1607 } 1608 mtx_unlock(&metadata_lock); 1609 refcount_init(&hio->hio_countdown, 1); 1610 QUEUE_INSERT1(hio, send, ncomp); 1611 1612 /* 1613 * Let's wait for READ to finish. 1614 */ 1615 mtx_lock(&sync_lock); 1616 while (!ISSYNCREQDONE(hio)) 1617 cv_wait(&sync_cond, &sync_lock); 1618 mtx_unlock(&sync_lock); 1619 1620 if (hio->hio_errors[ncomp] != 0) { 1621 pjdlog_error("Unable to read synchronization data: %s.", 1622 strerror(hio->hio_errors[ncomp])); 1623 goto free_queue; 1624 } 1625 1626 /* 1627 * We read the data from synchronization source, now write it 1628 * to synchronization target. 
1629 */ 1630 SYNCREQ(hio); 1631 ggio->gctl_cmd = BIO_WRITE; 1632 for (ii = 0; ii < ncomps; ii++) 1633 hio->hio_errors[ii] = EINVAL; 1634 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1635 hio); 1636 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1637 hio); 1638 mtx_lock(&metadata_lock); 1639 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1640 /* 1641 * This range is up-to-date on local component, 1642 * so we update remote component. 1643 */ 1644 /* Remote component is 1 for now. */ 1645 ncomp = 1; 1646 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1647 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1648 /* 1649 * This range is out-of-date on local component, 1650 * so we update it. 1651 */ 1652 /* Local component is 0 for now. */ 1653 ncomp = 0; 1654 } 1655 mtx_unlock(&metadata_lock); 1656 1657 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1658 hio); 1659 refcount_init(&hio->hio_countdown, 1); 1660 QUEUE_INSERT1(hio, send, ncomp); 1661 1662 /* 1663 * Let's wait for WRITE to finish. 
1664 */ 1665 mtx_lock(&sync_lock); 1666 while (!ISSYNCREQDONE(hio)) 1667 cv_wait(&sync_cond, &sync_lock); 1668 mtx_unlock(&sync_lock); 1669 1670 if (hio->hio_errors[ncomp] != 0) { 1671 pjdlog_error("Unable to write synchronization data: %s.", 1672 strerror(hio->hio_errors[ncomp])); 1673 goto free_queue; 1674 } 1675free_queue: 1676 mtx_lock(&range_lock); 1677 rangelock_del(range_sync, offset, length); 1678 if (range_regular_wait) 1679 cv_signal(&range_regular_cond); 1680 mtx_unlock(&range_lock); 1681 1682 synced += length; 1683 1684 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1685 hio); 1686 QUEUE_INSERT2(hio, free); 1687 } 1688 /* NOTREACHED */ 1689 return (NULL); 1690} 1691 1692static void 1693sighandler(int sig) 1694{ 1695 bool unlock; 1696 1697 switch (sig) { 1698 case SIGINT: 1699 case SIGTERM: 1700 sigexit_received = true; 1701 break; 1702 default: 1703 assert(!"invalid condition"); 1704 } 1705 /* 1706 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1707 * want to risk deadlock. 1708 */ 1709 unlock = mtx_trylock(&hio_guard_lock); 1710 cv_signal(&hio_guard_cond); 1711 if (unlock) 1712 mtx_unlock(&hio_guard_lock); 1713} 1714 1715/* 1716 * Thread guards remote connections and reconnects when needed, handles 1717 * signals, etc. 1718 */ 1719static void * 1720guard_thread(void *arg) 1721{ 1722 struct hast_resource *res = arg; 1723 struct proto_conn *in, *out; 1724 unsigned int ii, ncomps; 1725 int timeout; 1726 1727 ncomps = HAST_NCOMPONENTS; 1728 /* The is only one remote component for now. */ 1729#define ISREMOTE(no) ((no) == 1) 1730 1731 for (;;) { 1732 if (sigexit_received) { 1733 primary_exitx(EX_OK, 1734 "Termination signal received, exiting."); 1735 } 1736 /* 1737 * If all the connection will be fine, we will sleep until 1738 * someone wakes us up. 1739 * If any of the connections will be broken and we won't be 1740 * able to connect, we will sleep only for RECONNECT_SLEEP 1741 * seconds so we can retry soon. 
1742 */ 1743 timeout = 0; 1744 pjdlog_debug(2, "remote_guard: Checking connections."); 1745 mtx_lock(&hio_guard_lock); 1746 for (ii = 0; ii < ncomps; ii++) { 1747 if (!ISREMOTE(ii)) 1748 continue; 1749 rw_rlock(&hio_remote_lock[ii]); 1750 if (ISCONNECTED(res, ii)) { 1751 assert(res->hr_remotein != NULL); 1752 assert(res->hr_remoteout != NULL); 1753 rw_unlock(&hio_remote_lock[ii]); 1754 pjdlog_debug(2, 1755 "remote_guard: Connection to %s is ok.", 1756 res->hr_remoteaddr); 1757 } else { 1758 assert(res->hr_remotein == NULL); 1759 assert(res->hr_remoteout == NULL); 1760 /* 1761 * Upgrade the lock. It doesn't have to be 1762 * atomic as no other thread can change 1763 * connection status from disconnected to 1764 * connected. 1765 */ 1766 rw_unlock(&hio_remote_lock[ii]); 1767 pjdlog_debug(2, 1768 "remote_guard: Reconnecting to %s.", 1769 res->hr_remoteaddr); 1770 in = out = NULL; 1771 if (init_remote(res, &in, &out)) { 1772 rw_wlock(&hio_remote_lock[ii]); 1773 assert(res->hr_remotein == NULL); 1774 assert(res->hr_remoteout == NULL); 1775 assert(in != NULL && out != NULL); 1776 res->hr_remotein = in; 1777 res->hr_remoteout = out; 1778 rw_unlock(&hio_remote_lock[ii]); 1779 pjdlog_info("Successfully reconnected to %s.", 1780 res->hr_remoteaddr); 1781 sync_start(); 1782 } else { 1783 /* Both connections should be NULL. */ 1784 assert(res->hr_remotein == NULL); 1785 assert(res->hr_remoteout == NULL); 1786 assert(in == NULL && out == NULL); 1787 pjdlog_debug(2, 1788 "remote_guard: Reconnect to %s failed.", 1789 res->hr_remoteaddr); 1790 timeout = RECONNECT_SLEEP; 1791 } 1792 } 1793 } 1794 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1795 mtx_unlock(&hio_guard_lock); 1796 } 1797#undef ISREMOTE 1798 /* NOTREACHED */ 1799 return (NULL); 1800} 1801