/* primary.c revision 210880 */
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
28 */ 29 30#include <sys/cdefs.h> 31__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 210880 2010-08-05 18:58:00Z pjd $"); 32 33#include <sys/types.h> 34#include <sys/time.h> 35#include <sys/bio.h> 36#include <sys/disk.h> 37#include <sys/refcount.h> 38#include <sys/stat.h> 39 40#include <geom/gate/g_gate.h> 41 42#include <assert.h> 43#include <err.h> 44#include <errno.h> 45#include <fcntl.h> 46#include <libgeom.h> 47#include <pthread.h> 48#include <stdint.h> 49#include <stdio.h> 50#include <string.h> 51#include <sysexits.h> 52#include <unistd.h> 53 54#include <activemap.h> 55#include <nv.h> 56#include <rangelock.h> 57 58#include "control.h" 59#include "hast.h" 60#include "hast_proto.h" 61#include "hastd.h" 62#include "metadata.h" 63#include "proto.h" 64#include "pjdlog.h" 65#include "subr.h" 66#include "synch.h" 67 68struct hio { 69 /* 70 * Number of components we are still waiting for. 71 * When this field goes to 0, we can send the request back to the 72 * kernel. Each component has to decrease this counter by one 73 * even on failure. 74 */ 75 unsigned int hio_countdown; 76 /* 77 * Each component has a place to store its own error. 78 * Once the request is handled by all components we can decide if the 79 * request overall is successful or not. 80 */ 81 int *hio_errors; 82 /* 83 * Structure used to comunicate with GEOM Gate class. 84 */ 85 struct g_gate_ctl_io hio_ggio; 86 TAILQ_ENTRY(hio) *hio_next; 87}; 88#define hio_free_next hio_next[0] 89#define hio_done_next hio_next[0] 90 91/* 92 * Free list holds unused structures. When free list is empty, we have to wait 93 * until some in-progress requests are freed. 94 */ 95static TAILQ_HEAD(, hio) hio_free_list; 96static pthread_mutex_t hio_free_list_lock; 97static pthread_cond_t hio_free_list_cond; 98/* 99 * There is one send list for every component. One requests is placed on all 100 * send lists - each component gets the same request, but each component is 101 * responsible for managing his own send list. 
102 */ 103static TAILQ_HEAD(, hio) *hio_send_list; 104static pthread_mutex_t *hio_send_list_lock; 105static pthread_cond_t *hio_send_list_cond; 106/* 107 * There is one recv list for every component, although local components don't 108 * use recv lists as local requests are done synchronously. 109 */ 110static TAILQ_HEAD(, hio) *hio_recv_list; 111static pthread_mutex_t *hio_recv_list_lock; 112static pthread_cond_t *hio_recv_list_cond; 113/* 114 * Request is placed on done list by the slowest component (the one that 115 * decreased hio_countdown from 1 to 0). 116 */ 117static TAILQ_HEAD(, hio) hio_done_list; 118static pthread_mutex_t hio_done_list_lock; 119static pthread_cond_t hio_done_list_cond; 120/* 121 * Structure below are for interaction with sync thread. 122 */ 123static bool sync_inprogress; 124static pthread_mutex_t sync_lock; 125static pthread_cond_t sync_cond; 126/* 127 * The lock below allows to synchornize access to remote connections. 128 */ 129static pthread_rwlock_t *hio_remote_lock; 130static pthread_mutex_t hio_guard_lock; 131static pthread_cond_t hio_guard_cond; 132 133/* 134 * Lock to synchronize metadata updates. Also synchronize access to 135 * hr_primary_localcnt and hr_primary_remotecnt fields. 136 */ 137static pthread_mutex_t metadata_lock; 138 139/* 140 * Maximum number of outstanding I/O requests. 141 */ 142#define HAST_HIO_MAX 256 143/* 144 * Number of components. At this point there are only two components: local 145 * and remote, but in the future it might be possible to use multiple local 146 * and remote components. 147 */ 148#define HAST_NCOMPONENTS 2 149/* 150 * Number of seconds to sleep before next reconnect try. 
151 */ 152#define RECONNECT_SLEEP 5 153 154#define ISCONNECTED(res, no) \ 155 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156 157#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158 bool _wakeup; \ 159 \ 160 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163 hio_next[(ncomp)]); \ 164 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165 if (_wakeup) \ 166 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167} while (0) 168#define QUEUE_INSERT2(hio, name) do { \ 169 bool _wakeup; \ 170 \ 171 mtx_lock(&hio_##name##_list_lock); \ 172 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174 mtx_unlock(&hio_##name##_list_lock); \ 175 if (_wakeup) \ 176 cv_signal(&hio_##name##_list_cond); \ 177} while (0) 178#define QUEUE_TAKE1(hio, name, ncomp) do { \ 179 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 180 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 181 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 182 &hio_##name##_list_lock[(ncomp)]); \ 183 } \ 184 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 185 hio_next[(ncomp)]); \ 186 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 187} while (0) 188#define QUEUE_TAKE2(hio, name) do { \ 189 mtx_lock(&hio_##name##_list_lock); \ 190 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 191 cv_wait(&hio_##name##_list_cond, \ 192 &hio_##name##_list_lock); \ 193 } \ 194 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 195 mtx_unlock(&hio_##name##_list_lock); \ 196} while (0) 197 198#define SYNCREQ(hio) do { \ 199 (hio)->hio_ggio.gctl_unit = -1; \ 200 (hio)->hio_ggio.gctl_seq = 1; \ 201} while (0) 202#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 203#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 204#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 205 
206static struct hast_resource *gres; 207 208static pthread_mutex_t range_lock; 209static struct rangelocks *range_regular; 210static bool range_regular_wait; 211static pthread_cond_t range_regular_cond; 212static struct rangelocks *range_sync; 213static bool range_sync_wait; 214static pthread_cond_t range_sync_cond; 215 216static void *ggate_recv_thread(void *arg); 217static void *local_send_thread(void *arg); 218static void *remote_send_thread(void *arg); 219static void *remote_recv_thread(void *arg); 220static void *ggate_send_thread(void *arg); 221static void *sync_thread(void *arg); 222static void *guard_thread(void *arg); 223 224static void sighandler(int sig); 225 226static void 227cleanup(struct hast_resource *res) 228{ 229 int rerrno; 230 231 /* Remember errno. */ 232 rerrno = errno; 233 234 /* 235 * Close descriptor to /dev/hast/<name> 236 * to work-around race in the kernel. 237 */ 238 close(res->hr_localfd); 239 240 /* Destroy ggate provider if we created one. */ 241 if (res->hr_ggateunit >= 0) { 242 struct g_gate_ctl_destroy ggiod; 243 244 ggiod.gctl_version = G_GATE_VERSION; 245 ggiod.gctl_unit = res->hr_ggateunit; 246 ggiod.gctl_force = 1; 247 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 248 pjdlog_warning("Unable to destroy hast/%s device", 249 res->hr_provname); 250 } 251 res->hr_ggateunit = -1; 252 } 253 254 /* Restore errno. */ 255 errno = rerrno; 256} 257 258static void 259primary_exit(int exitcode, const char *fmt, ...) 260{ 261 va_list ap; 262 263 assert(exitcode != EX_OK); 264 va_start(ap, fmt); 265 pjdlogv_errno(LOG_ERR, fmt, ap); 266 va_end(ap); 267 cleanup(gres); 268 exit(exitcode); 269} 270 271static void 272primary_exitx(int exitcode, const char *fmt, ...) 273{ 274 va_list ap; 275 276 va_start(ap, fmt); 277 pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 278 va_end(ap); 279 cleanup(gres); 280 exit(exitcode); 281} 282 283static int 284hast_activemap_flush(struct hast_resource *res) 285{ 286 const unsigned char *buf; 287 size_t size; 288 289 buf = activemap_bitmap(res->hr_amp, &size); 290 assert(buf != NULL); 291 assert((size % res->hr_local_sectorsize) == 0); 292 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 293 (ssize_t)size) { 294 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 295 "Unable to flush activemap to disk")); 296 return (-1); 297 } 298 return (0); 299} 300 301static void 302init_environment(struct hast_resource *res __unused) 303{ 304 struct hio *hio; 305 unsigned int ii, ncomps; 306 307 /* 308 * In the future it might be per-resource value. 309 */ 310 ncomps = HAST_NCOMPONENTS; 311 312 /* 313 * Allocate memory needed by lists. 314 */ 315 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 316 if (hio_send_list == NULL) { 317 primary_exitx(EX_TEMPFAIL, 318 "Unable to allocate %zu bytes of memory for send lists.", 319 sizeof(hio_send_list[0]) * ncomps); 320 } 321 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 322 if (hio_send_list_lock == NULL) { 323 primary_exitx(EX_TEMPFAIL, 324 "Unable to allocate %zu bytes of memory for send list locks.", 325 sizeof(hio_send_list_lock[0]) * ncomps); 326 } 327 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 328 if (hio_send_list_cond == NULL) { 329 primary_exitx(EX_TEMPFAIL, 330 "Unable to allocate %zu bytes of memory for send list condition variables.", 331 sizeof(hio_send_list_cond[0]) * ncomps); 332 } 333 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 334 if (hio_recv_list == NULL) { 335 primary_exitx(EX_TEMPFAIL, 336 "Unable to allocate %zu bytes of memory for recv lists.", 337 sizeof(hio_recv_list[0]) * ncomps); 338 } 339 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 340 if (hio_recv_list_lock == NULL) { 341 primary_exitx(EX_TEMPFAIL, 342 "Unable to 
allocate %zu bytes of memory for recv list locks.", 343 sizeof(hio_recv_list_lock[0]) * ncomps); 344 } 345 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 346 if (hio_recv_list_cond == NULL) { 347 primary_exitx(EX_TEMPFAIL, 348 "Unable to allocate %zu bytes of memory for recv list condition variables.", 349 sizeof(hio_recv_list_cond[0]) * ncomps); 350 } 351 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 352 if (hio_remote_lock == NULL) { 353 primary_exitx(EX_TEMPFAIL, 354 "Unable to allocate %zu bytes of memory for remote connections locks.", 355 sizeof(hio_remote_lock[0]) * ncomps); 356 } 357 358 /* 359 * Initialize lists, their locks and theirs condition variables. 360 */ 361 TAILQ_INIT(&hio_free_list); 362 mtx_init(&hio_free_list_lock); 363 cv_init(&hio_free_list_cond); 364 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 365 TAILQ_INIT(&hio_send_list[ii]); 366 mtx_init(&hio_send_list_lock[ii]); 367 cv_init(&hio_send_list_cond[ii]); 368 TAILQ_INIT(&hio_recv_list[ii]); 369 mtx_init(&hio_recv_list_lock[ii]); 370 cv_init(&hio_recv_list_cond[ii]); 371 rw_init(&hio_remote_lock[ii]); 372 } 373 TAILQ_INIT(&hio_done_list); 374 mtx_init(&hio_done_list_lock); 375 cv_init(&hio_done_list_cond); 376 mtx_init(&hio_guard_lock); 377 cv_init(&hio_guard_cond); 378 mtx_init(&metadata_lock); 379 380 /* 381 * Allocate requests pool and initialize requests. 
382 */ 383 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 384 hio = malloc(sizeof(*hio)); 385 if (hio == NULL) { 386 primary_exitx(EX_TEMPFAIL, 387 "Unable to allocate %zu bytes of memory for hio request.", 388 sizeof(*hio)); 389 } 390 hio->hio_countdown = 0; 391 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 392 if (hio->hio_errors == NULL) { 393 primary_exitx(EX_TEMPFAIL, 394 "Unable allocate %zu bytes of memory for hio errors.", 395 sizeof(hio->hio_errors[0]) * ncomps); 396 } 397 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 398 if (hio->hio_next == NULL) { 399 primary_exitx(EX_TEMPFAIL, 400 "Unable allocate %zu bytes of memory for hio_next field.", 401 sizeof(hio->hio_next[0]) * ncomps); 402 } 403 hio->hio_ggio.gctl_version = G_GATE_VERSION; 404 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 405 if (hio->hio_ggio.gctl_data == NULL) { 406 primary_exitx(EX_TEMPFAIL, 407 "Unable to allocate %zu bytes of memory for gctl_data.", 408 MAXPHYS); 409 } 410 hio->hio_ggio.gctl_length = MAXPHYS; 411 hio->hio_ggio.gctl_error = 0; 412 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 413 } 414 415 /* 416 * Turn on signals handling. 
417 */ 418 signal(SIGINT, sighandler); 419 signal(SIGTERM, sighandler); 420} 421 422static void 423init_local(struct hast_resource *res) 424{ 425 unsigned char *buf; 426 size_t mapsize; 427 428 if (metadata_read(res, true) < 0) 429 exit(EX_NOINPUT); 430 mtx_init(&res->hr_amp_lock); 431 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 432 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 433 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 434 } 435 mtx_init(&range_lock); 436 cv_init(&range_regular_cond); 437 if (rangelock_init(&range_regular) < 0) 438 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 439 cv_init(&range_sync_cond); 440 if (rangelock_init(&range_sync) < 0) 441 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 442 mapsize = activemap_ondisk_size(res->hr_amp); 443 buf = calloc(1, mapsize); 444 if (buf == NULL) { 445 primary_exitx(EX_TEMPFAIL, 446 "Unable to allocate buffer for activemap."); 447 } 448 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 449 (ssize_t)mapsize) { 450 primary_exit(EX_NOINPUT, "Unable to read activemap"); 451 } 452 activemap_copyin(res->hr_amp, buf, mapsize); 453 free(buf); 454 if (res->hr_resuid != 0) 455 return; 456 /* 457 * We're using provider for the first time, so we have to generate 458 * resource unique identifier and initialize local and remote counts. 
459 */ 460 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 461 res->hr_primary_localcnt = 1; 462 res->hr_primary_remotecnt = 0; 463 if (metadata_write(res) < 0) 464 exit(EX_NOINPUT); 465} 466 467static bool 468init_remote(struct hast_resource *res, struct proto_conn **inp, 469 struct proto_conn **outp) 470{ 471 struct proto_conn *in, *out; 472 struct nv *nvout, *nvin; 473 const unsigned char *token; 474 unsigned char *map; 475 const char *errmsg; 476 int32_t extentsize; 477 int64_t datasize; 478 uint32_t mapsize; 479 size_t size; 480 481 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 482 483 in = out = NULL; 484 485 /* Prepare outgoing connection with remote node. */ 486 if (proto_client(res->hr_remoteaddr, &out) < 0) { 487 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 488 res->hr_remoteaddr); 489 } 490 /* Try to connect, but accept failure. */ 491 if (proto_connect(out) < 0) { 492 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 493 res->hr_remoteaddr); 494 goto close; 495 } 496 /* Error in setting timeout is not critical, but why should it fail? */ 497 if (proto_timeout(out, res->hr_timeout) < 0) 498 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 499 /* 500 * First handshake step. 501 * Setup outgoing connection with remote node. 
502 */ 503 nvout = nv_alloc(); 504 nv_add_string(nvout, res->hr_name, "resource"); 505 if (nv_error(nvout) != 0) { 506 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 507 "Unable to allocate header for connection with %s", 508 res->hr_remoteaddr); 509 nv_free(nvout); 510 goto close; 511 } 512 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 513 pjdlog_errno(LOG_WARNING, 514 "Unable to send handshake header to %s", 515 res->hr_remoteaddr); 516 nv_free(nvout); 517 goto close; 518 } 519 nv_free(nvout); 520 if (hast_proto_recv_hdr(out, &nvin) < 0) { 521 pjdlog_errno(LOG_WARNING, 522 "Unable to receive handshake header from %s", 523 res->hr_remoteaddr); 524 goto close; 525 } 526 errmsg = nv_get_string(nvin, "errmsg"); 527 if (errmsg != NULL) { 528 pjdlog_warning("%s", errmsg); 529 nv_free(nvin); 530 goto close; 531 } 532 token = nv_get_uint8_array(nvin, &size, "token"); 533 if (token == NULL) { 534 pjdlog_warning("Handshake header from %s has no 'token' field.", 535 res->hr_remoteaddr); 536 nv_free(nvin); 537 goto close; 538 } 539 if (size != sizeof(res->hr_token)) { 540 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 541 res->hr_remoteaddr, size, sizeof(res->hr_token)); 542 nv_free(nvin); 543 goto close; 544 } 545 bcopy(token, res->hr_token, sizeof(res->hr_token)); 546 nv_free(nvin); 547 548 /* 549 * Second handshake step. 550 * Setup incoming connection with remote node. 551 */ 552 if (proto_client(res->hr_remoteaddr, &in) < 0) { 553 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 554 res->hr_remoteaddr); 555 } 556 /* Try to connect, but accept failure. */ 557 if (proto_connect(in) < 0) { 558 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 559 res->hr_remoteaddr); 560 goto close; 561 } 562 /* Error in setting timeout is not critical, but why should it fail? 
*/ 563 if (proto_timeout(in, res->hr_timeout) < 0) 564 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 565 nvout = nv_alloc(); 566 nv_add_string(nvout, res->hr_name, "resource"); 567 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 568 "token"); 569 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 570 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 571 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 572 if (nv_error(nvout) != 0) { 573 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 574 "Unable to allocate header for connection with %s", 575 res->hr_remoteaddr); 576 nv_free(nvout); 577 goto close; 578 } 579 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 580 pjdlog_errno(LOG_WARNING, 581 "Unable to send handshake header to %s", 582 res->hr_remoteaddr); 583 nv_free(nvout); 584 goto close; 585 } 586 nv_free(nvout); 587 if (hast_proto_recv_hdr(out, &nvin) < 0) { 588 pjdlog_errno(LOG_WARNING, 589 "Unable to receive handshake header from %s", 590 res->hr_remoteaddr); 591 goto close; 592 } 593 errmsg = nv_get_string(nvin, "errmsg"); 594 if (errmsg != NULL) { 595 pjdlog_warning("%s", errmsg); 596 nv_free(nvin); 597 goto close; 598 } 599 datasize = nv_get_int64(nvin, "datasize"); 600 if (datasize != res->hr_datasize) { 601 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 602 (intmax_t)res->hr_datasize, (intmax_t)datasize); 603 nv_free(nvin); 604 goto close; 605 } 606 extentsize = nv_get_int32(nvin, "extentsize"); 607 if (extentsize != res->hr_extentsize) { 608 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 609 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 610 nv_free(nvin); 611 goto close; 612 } 613 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 614 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 615 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 616 map = NULL; 617 mapsize = nv_get_uint32(nvin, "mapsize"); 618 if (mapsize > 
0) { 619 map = malloc(mapsize); 620 if (map == NULL) { 621 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 622 (uintmax_t)mapsize); 623 nv_free(nvin); 624 goto close; 625 } 626 /* 627 * Remote node have some dirty extents on its own, lets 628 * download its activemap. 629 */ 630 if (hast_proto_recv_data(res, out, nvin, map, 631 mapsize) < 0) { 632 pjdlog_errno(LOG_ERR, 633 "Unable to receive remote activemap"); 634 nv_free(nvin); 635 free(map); 636 goto close; 637 } 638 /* 639 * Merge local and remote bitmaps. 640 */ 641 activemap_merge(res->hr_amp, map, mapsize); 642 free(map); 643 /* 644 * Now that we merged bitmaps from both nodes, flush it to the 645 * disk before we start to synchronize. 646 */ 647 (void)hast_activemap_flush(res); 648 } 649 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 650 if (inp != NULL && outp != NULL) { 651 *inp = in; 652 *outp = out; 653 } else { 654 res->hr_remotein = in; 655 res->hr_remoteout = out; 656 } 657 return (true); 658close: 659 proto_close(out); 660 if (in != NULL) 661 proto_close(in); 662 return (false); 663} 664 665static void 666sync_start(void) 667{ 668 669 mtx_lock(&sync_lock); 670 sync_inprogress = true; 671 mtx_unlock(&sync_lock); 672 cv_signal(&sync_cond); 673} 674 675static void 676init_ggate(struct hast_resource *res) 677{ 678 struct g_gate_ctl_create ggiocreate; 679 struct g_gate_ctl_cancel ggiocancel; 680 681 /* 682 * We communicate with ggate via /dev/ggctl. Open it. 683 */ 684 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 685 if (res->hr_ggatefd < 0) 686 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 687 /* 688 * Create provider before trying to connect, as connection failure 689 * is not critical, but may take some time. 
690 */ 691 ggiocreate.gctl_version = G_GATE_VERSION; 692 ggiocreate.gctl_mediasize = res->hr_datasize; 693 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 694 ggiocreate.gctl_flags = 0; 695 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 696 ggiocreate.gctl_timeout = 0; 697 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 698 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 699 res->hr_provname); 700 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 701 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 702 pjdlog_info("Device hast/%s created.", res->hr_provname); 703 res->hr_ggateunit = ggiocreate.gctl_unit; 704 return; 705 } 706 if (errno != EEXIST) { 707 primary_exit(EX_OSERR, "Unable to create hast/%s device", 708 res->hr_provname); 709 } 710 pjdlog_debug(1, 711 "Device hast/%s already exists, we will try to take it over.", 712 res->hr_provname); 713 /* 714 * If we received EEXIST, we assume that the process who created the 715 * provider died and didn't clean up. In that case we will start from 716 * where he left of. 717 */ 718 ggiocancel.gctl_version = G_GATE_VERSION; 719 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 720 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 721 res->hr_provname); 722 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 723 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 724 res->hr_ggateunit = ggiocancel.gctl_unit; 725 return; 726 } 727 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 728 res->hr_provname); 729} 730 731void 732hastd_primary(struct hast_resource *res) 733{ 734 pthread_t td; 735 pid_t pid; 736 int error; 737 738 gres = res; 739 740 /* 741 * Create communication channel between parent and child. 
742 */ 743 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 744 KEEP_ERRNO((void)pidfile_remove(pfh)); 745 primary_exit(EX_OSERR, 746 "Unable to create control sockets between parent and child"); 747 } 748 749 pid = fork(); 750 if (pid < 0) { 751 KEEP_ERRNO((void)pidfile_remove(pfh)); 752 primary_exit(EX_TEMPFAIL, "Unable to fork"); 753 } 754 755 if (pid > 0) { 756 /* This is parent. */ 757 res->hr_workerpid = pid; 758 return; 759 } 760 (void)pidfile_close(pfh); 761 762 setproctitle("%s (primary)", res->hr_name); 763 764 signal(SIGHUP, SIG_DFL); 765 signal(SIGCHLD, SIG_DFL); 766 767 init_local(res); 768 if (init_remote(res, NULL, NULL)) 769 sync_start(); 770 init_ggate(res); 771 init_environment(res); 772 error = pthread_create(&td, NULL, ggate_recv_thread, res); 773 assert(error == 0); 774 error = pthread_create(&td, NULL, local_send_thread, res); 775 assert(error == 0); 776 error = pthread_create(&td, NULL, remote_send_thread, res); 777 assert(error == 0); 778 error = pthread_create(&td, NULL, remote_recv_thread, res); 779 assert(error == 0); 780 error = pthread_create(&td, NULL, ggate_send_thread, res); 781 assert(error == 0); 782 error = pthread_create(&td, NULL, sync_thread, res); 783 assert(error == 0); 784 error = pthread_create(&td, NULL, ctrl_thread, res); 785 assert(error == 0); 786 (void)guard_thread(res); 787} 788 789static void 790reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
791{ 792 char msg[1024]; 793 va_list ap; 794 int len; 795 796 va_start(ap, fmt); 797 len = vsnprintf(msg, sizeof(msg), fmt, ap); 798 va_end(ap); 799 if ((size_t)len < sizeof(msg)) { 800 switch (ggio->gctl_cmd) { 801 case BIO_READ: 802 (void)snprintf(msg + len, sizeof(msg) - len, 803 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 804 (uintmax_t)ggio->gctl_length); 805 break; 806 case BIO_DELETE: 807 (void)snprintf(msg + len, sizeof(msg) - len, 808 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 809 (uintmax_t)ggio->gctl_length); 810 break; 811 case BIO_FLUSH: 812 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 813 break; 814 case BIO_WRITE: 815 (void)snprintf(msg + len, sizeof(msg) - len, 816 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 817 (uintmax_t)ggio->gctl_length); 818 break; 819 default: 820 (void)snprintf(msg + len, sizeof(msg) - len, 821 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 822 break; 823 } 824 } 825 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 826} 827 828static void 829remote_close(struct hast_resource *res, int ncomp) 830{ 831 832 rw_wlock(&hio_remote_lock[ncomp]); 833 /* 834 * A race is possible between dropping rlock and acquiring wlock - 835 * another thread can close connection in-between. 836 */ 837 if (!ISCONNECTED(res, ncomp)) { 838 assert(res->hr_remotein == NULL); 839 assert(res->hr_remoteout == NULL); 840 rw_unlock(&hio_remote_lock[ncomp]); 841 return; 842 } 843 844 assert(res->hr_remotein != NULL); 845 assert(res->hr_remoteout != NULL); 846 847 pjdlog_debug(2, "Closing old incoming connection to %s.", 848 res->hr_remoteaddr); 849 proto_close(res->hr_remotein); 850 res->hr_remotein = NULL; 851 pjdlog_debug(2, "Closing old outgoing connection to %s.", 852 res->hr_remoteaddr); 853 proto_close(res->hr_remoteout); 854 res->hr_remoteout = NULL; 855 856 rw_unlock(&hio_remote_lock[ncomp]); 857 858 /* 859 * Stop synchronization if in-progress. 
860 */ 861 mtx_lock(&sync_lock); 862 if (sync_inprogress) 863 sync_inprogress = false; 864 mtx_unlock(&sync_lock); 865 866 /* 867 * Wake up guard thread, so it can immediately start reconnect. 868 */ 869 mtx_lock(&hio_guard_lock); 870 cv_signal(&hio_guard_cond); 871 mtx_unlock(&hio_guard_lock); 872} 873 874/* 875 * Thread receives ggate I/O requests from the kernel and passes them to 876 * appropriate threads: 877 * WRITE - always goes to both local_send and remote_send threads 878 * READ (when the block is up-to-date on local component) - 879 * only local_send thread 880 * READ (when the block isn't up-to-date on local component) - 881 * only remote_send thread 882 * DELETE - always goes to both local_send and remote_send threads 883 * FLUSH - always goes to both local_send and remote_send threads 884 */ 885static void * 886ggate_recv_thread(void *arg) 887{ 888 struct hast_resource *res = arg; 889 struct g_gate_ctl_io *ggio; 890 struct hio *hio; 891 unsigned int ii, ncomp, ncomps; 892 int error; 893 894 ncomps = HAST_NCOMPONENTS; 895 896 for (;;) { 897 pjdlog_debug(2, "ggate_recv: Taking free request."); 898 QUEUE_TAKE2(hio, free); 899 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 900 ggio = &hio->hio_ggio; 901 ggio->gctl_unit = res->hr_ggateunit; 902 ggio->gctl_length = MAXPHYS; 903 ggio->gctl_error = 0; 904 pjdlog_debug(2, 905 "ggate_recv: (%p) Waiting for request from the kernel.", 906 hio); 907 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 908 if (sigexit_received) 909 pthread_exit(NULL); 910 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 911 } 912 error = ggio->gctl_error; 913 switch (error) { 914 case 0: 915 break; 916 case ECANCELED: 917 /* Exit gracefully. */ 918 if (!sigexit_received) { 919 pjdlog_debug(2, 920 "ggate_recv: (%p) Received cancel from the kernel.", 921 hio); 922 pjdlog_info("Received cancel from the kernel, exiting."); 923 } 924 pthread_exit(NULL); 925 case ENOMEM: 926 /* 927 * Buffer too small? 
Impossible, we allocate MAXPHYS 928 * bytes - request can't be bigger than that. 929 */ 930 /* FALLTHROUGH */ 931 case ENXIO: 932 default: 933 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 934 strerror(error)); 935 } 936 for (ii = 0; ii < ncomps; ii++) 937 hio->hio_errors[ii] = EINVAL; 938 reqlog(LOG_DEBUG, 2, ggio, 939 "ggate_recv: (%p) Request received from the kernel: ", 940 hio); 941 /* 942 * Inform all components about new write request. 943 * For read request prefer local component unless the given 944 * range is out-of-date, then use remote component. 945 */ 946 switch (ggio->gctl_cmd) { 947 case BIO_READ: 948 pjdlog_debug(2, 949 "ggate_recv: (%p) Moving request to the send queue.", 950 hio); 951 refcount_init(&hio->hio_countdown, 1); 952 mtx_lock(&metadata_lock); 953 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 954 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 955 /* 956 * This range is up-to-date on local component, 957 * so handle request locally. 958 */ 959 /* Local component is 0 for now. */ 960 ncomp = 0; 961 } else /* if (res->hr_syncsrc == 962 HAST_SYNCSRC_SECONDARY) */ { 963 assert(res->hr_syncsrc == 964 HAST_SYNCSRC_SECONDARY); 965 /* 966 * This range is out-of-date on local component, 967 * so send request to the remote node. 968 */ 969 /* Remote component is 1 for now. 
*/ 970 ncomp = 1; 971 } 972 mtx_unlock(&metadata_lock); 973 QUEUE_INSERT1(hio, send, ncomp); 974 break; 975 case BIO_WRITE: 976 for (;;) { 977 mtx_lock(&range_lock); 978 if (rangelock_islocked(range_sync, 979 ggio->gctl_offset, ggio->gctl_length)) { 980 pjdlog_debug(2, 981 "regular: Range offset=%jd length=%zu locked.", 982 (intmax_t)ggio->gctl_offset, 983 (size_t)ggio->gctl_length); 984 range_regular_wait = true; 985 cv_wait(&range_regular_cond, &range_lock); 986 range_regular_wait = false; 987 mtx_unlock(&range_lock); 988 continue; 989 } 990 if (rangelock_add(range_regular, 991 ggio->gctl_offset, ggio->gctl_length) < 0) { 992 mtx_unlock(&range_lock); 993 pjdlog_debug(2, 994 "regular: Range offset=%jd length=%zu is already locked, waiting.", 995 (intmax_t)ggio->gctl_offset, 996 (size_t)ggio->gctl_length); 997 sleep(1); 998 continue; 999 } 1000 mtx_unlock(&range_lock); 1001 break; 1002 } 1003 mtx_lock(&res->hr_amp_lock); 1004 if (activemap_write_start(res->hr_amp, 1005 ggio->gctl_offset, ggio->gctl_length)) { 1006 (void)hast_activemap_flush(res); 1007 } 1008 mtx_unlock(&res->hr_amp_lock); 1009 /* FALLTHROUGH */ 1010 case BIO_DELETE: 1011 case BIO_FLUSH: 1012 pjdlog_debug(2, 1013 "ggate_recv: (%p) Moving request to the send queues.", 1014 hio); 1015 refcount_init(&hio->hio_countdown, ncomps); 1016 for (ii = 0; ii < ncomps; ii++) 1017 QUEUE_INSERT1(hio, send, ii); 1018 break; 1019 } 1020 } 1021 /* NOTREACHED */ 1022 return (NULL); 1023} 1024 1025/* 1026 * Thread reads from or writes to local component. 1027 * If local read fails, it redirects it to remote_send thread. 1028 */ 1029static void * 1030local_send_thread(void *arg) 1031{ 1032 struct hast_resource *res = arg; 1033 struct g_gate_ctl_io *ggio; 1034 struct hio *hio; 1035 unsigned int ncomp, rncomp; 1036 ssize_t ret; 1037 1038 /* Local component is 0 for now. */ 1039 ncomp = 0; 1040 /* Remote component is 1 for now. 
*/ 1041 rncomp = 1; 1042 1043 for (;;) { 1044 pjdlog_debug(2, "local_send: Taking request."); 1045 QUEUE_TAKE1(hio, send, ncomp); 1046 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1047 ggio = &hio->hio_ggio; 1048 switch (ggio->gctl_cmd) { 1049 case BIO_READ: 1050 ret = pread(res->hr_localfd, ggio->gctl_data, 1051 ggio->gctl_length, 1052 ggio->gctl_offset + res->hr_localoff); 1053 if (ret == ggio->gctl_length) 1054 hio->hio_errors[ncomp] = 0; 1055 else { 1056 /* 1057 * If READ failed, try to read from remote node. 1058 */ 1059 QUEUE_INSERT1(hio, send, rncomp); 1060 continue; 1061 } 1062 break; 1063 case BIO_WRITE: 1064 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1065 ggio->gctl_length, 1066 ggio->gctl_offset + res->hr_localoff); 1067 if (ret < 0) 1068 hio->hio_errors[ncomp] = errno; 1069 else if (ret != ggio->gctl_length) 1070 hio->hio_errors[ncomp] = EIO; 1071 else 1072 hio->hio_errors[ncomp] = 0; 1073 break; 1074 case BIO_DELETE: 1075 ret = g_delete(res->hr_localfd, 1076 ggio->gctl_offset + res->hr_localoff, 1077 ggio->gctl_length); 1078 if (ret < 0) 1079 hio->hio_errors[ncomp] = errno; 1080 else 1081 hio->hio_errors[ncomp] = 0; 1082 break; 1083 case BIO_FLUSH: 1084 ret = g_flush(res->hr_localfd); 1085 if (ret < 0) 1086 hio->hio_errors[ncomp] = errno; 1087 else 1088 hio->hio_errors[ncomp] = 0; 1089 break; 1090 } 1091 if (refcount_release(&hio->hio_countdown)) { 1092 if (ISSYNCREQ(hio)) { 1093 mtx_lock(&sync_lock); 1094 SYNCREQDONE(hio); 1095 mtx_unlock(&sync_lock); 1096 cv_signal(&sync_cond); 1097 } else { 1098 pjdlog_debug(2, 1099 "local_send: (%p) Moving request to the done queue.", 1100 hio); 1101 QUEUE_INSERT2(hio, done); 1102 } 1103 } 1104 } 1105 /* NOTREACHED */ 1106 return (NULL); 1107} 1108 1109/* 1110 * Thread sends request to secondary node. 
1111 */ 1112static void * 1113remote_send_thread(void *arg) 1114{ 1115 struct hast_resource *res = arg; 1116 struct g_gate_ctl_io *ggio; 1117 struct hio *hio; 1118 struct nv *nv; 1119 unsigned int ncomp; 1120 bool wakeup; 1121 uint64_t offset, length; 1122 uint8_t cmd; 1123 void *data; 1124 1125 /* Remote component is 1 for now. */ 1126 ncomp = 1; 1127 1128 for (;;) { 1129 pjdlog_debug(2, "remote_send: Taking request."); 1130 QUEUE_TAKE1(hio, send, ncomp); 1131 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1132 ggio = &hio->hio_ggio; 1133 switch (ggio->gctl_cmd) { 1134 case BIO_READ: 1135 cmd = HIO_READ; 1136 data = NULL; 1137 offset = ggio->gctl_offset; 1138 length = ggio->gctl_length; 1139 break; 1140 case BIO_WRITE: 1141 cmd = HIO_WRITE; 1142 data = ggio->gctl_data; 1143 offset = ggio->gctl_offset; 1144 length = ggio->gctl_length; 1145 break; 1146 case BIO_DELETE: 1147 cmd = HIO_DELETE; 1148 data = NULL; 1149 offset = ggio->gctl_offset; 1150 length = ggio->gctl_length; 1151 break; 1152 case BIO_FLUSH: 1153 cmd = HIO_FLUSH; 1154 data = NULL; 1155 offset = 0; 1156 length = 0; 1157 break; 1158 default: 1159 assert(!"invalid condition"); 1160 abort(); 1161 } 1162 nv = nv_alloc(); 1163 nv_add_uint8(nv, cmd, "cmd"); 1164 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1165 nv_add_uint64(nv, offset, "offset"); 1166 nv_add_uint64(nv, length, "length"); 1167 if (nv_error(nv) != 0) { 1168 hio->hio_errors[ncomp] = nv_error(nv); 1169 pjdlog_debug(2, 1170 "remote_send: (%p) Unable to prepare header to send.", 1171 hio); 1172 reqlog(LOG_ERR, 0, ggio, 1173 "Unable to prepare header to send (%s): ", 1174 strerror(nv_error(nv))); 1175 /* Move failed request immediately to the done queue. */ 1176 goto done_queue; 1177 } 1178 pjdlog_debug(2, 1179 "remote_send: (%p) Moving request to the recv queue.", 1180 hio); 1181 /* 1182 * Protect connection from disappearing. 
1183 */ 1184 rw_rlock(&hio_remote_lock[ncomp]); 1185 if (!ISCONNECTED(res, ncomp)) { 1186 rw_unlock(&hio_remote_lock[ncomp]); 1187 hio->hio_errors[ncomp] = ENOTCONN; 1188 goto done_queue; 1189 } 1190 /* 1191 * Move the request to recv queue before sending it, because 1192 * in different order we can get reply before we move request 1193 * to recv queue. 1194 */ 1195 mtx_lock(&hio_recv_list_lock[ncomp]); 1196 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1197 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1198 mtx_unlock(&hio_recv_list_lock[ncomp]); 1199 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1200 data != NULL ? length : 0) < 0) { 1201 hio->hio_errors[ncomp] = errno; 1202 rw_unlock(&hio_remote_lock[ncomp]); 1203 remote_close(res, ncomp); 1204 pjdlog_debug(2, 1205 "remote_send: (%p) Unable to send request.", hio); 1206 reqlog(LOG_ERR, 0, ggio, 1207 "Unable to send request (%s): ", 1208 strerror(hio->hio_errors[ncomp])); 1209 /* 1210 * Take request back from the receive queue and move 1211 * it immediately to the done queue. 
1212 */ 1213 mtx_lock(&hio_recv_list_lock[ncomp]); 1214 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1215 mtx_unlock(&hio_recv_list_lock[ncomp]); 1216 goto done_queue; 1217 } 1218 rw_unlock(&hio_remote_lock[ncomp]); 1219 nv_free(nv); 1220 if (wakeup) 1221 cv_signal(&hio_recv_list_cond[ncomp]); 1222 continue; 1223done_queue: 1224 nv_free(nv); 1225 if (ISSYNCREQ(hio)) { 1226 if (!refcount_release(&hio->hio_countdown)) 1227 continue; 1228 mtx_lock(&sync_lock); 1229 SYNCREQDONE(hio); 1230 mtx_unlock(&sync_lock); 1231 cv_signal(&sync_cond); 1232 continue; 1233 } 1234 if (ggio->gctl_cmd == BIO_WRITE) { 1235 mtx_lock(&res->hr_amp_lock); 1236 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1237 ggio->gctl_length)) { 1238 (void)hast_activemap_flush(res); 1239 } 1240 mtx_unlock(&res->hr_amp_lock); 1241 } 1242 if (!refcount_release(&hio->hio_countdown)) 1243 continue; 1244 pjdlog_debug(2, 1245 "remote_send: (%p) Moving request to the done queue.", 1246 hio); 1247 QUEUE_INSERT2(hio, done); 1248 } 1249 /* NOTREACHED */ 1250 return (NULL); 1251} 1252 1253/* 1254 * Thread receives answer from secondary node and passes it to ggate_send 1255 * thread. 1256 */ 1257static void * 1258remote_recv_thread(void *arg) 1259{ 1260 struct hast_resource *res = arg; 1261 struct g_gate_ctl_io *ggio; 1262 struct hio *hio; 1263 struct nv *nv; 1264 unsigned int ncomp; 1265 uint64_t seq; 1266 int error; 1267 1268 /* Remote component is 1 for now. */ 1269 ncomp = 1; 1270 1271 for (;;) { 1272 /* Wait until there is anything to receive. 
*/ 1273 mtx_lock(&hio_recv_list_lock[ncomp]); 1274 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1275 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1276 cv_wait(&hio_recv_list_cond[ncomp], 1277 &hio_recv_list_lock[ncomp]); 1278 } 1279 mtx_unlock(&hio_recv_list_lock[ncomp]); 1280 rw_rlock(&hio_remote_lock[ncomp]); 1281 if (!ISCONNECTED(res, ncomp)) { 1282 rw_unlock(&hio_remote_lock[ncomp]); 1283 /* 1284 * Connection is dead, so move all pending requests to 1285 * the done queue (one-by-one). 1286 */ 1287 mtx_lock(&hio_recv_list_lock[ncomp]); 1288 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1289 assert(hio != NULL); 1290 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1291 hio_next[ncomp]); 1292 mtx_unlock(&hio_recv_list_lock[ncomp]); 1293 goto done_queue; 1294 } 1295 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1296 pjdlog_errno(LOG_ERR, 1297 "Unable to receive reply header"); 1298 rw_unlock(&hio_remote_lock[ncomp]); 1299 remote_close(res, ncomp); 1300 continue; 1301 } 1302 rw_unlock(&hio_remote_lock[ncomp]); 1303 seq = nv_get_uint64(nv, "seq"); 1304 if (seq == 0) { 1305 pjdlog_error("Header contains no 'seq' field."); 1306 nv_free(nv); 1307 continue; 1308 } 1309 mtx_lock(&hio_recv_list_lock[ncomp]); 1310 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1311 if (hio->hio_ggio.gctl_seq == seq) { 1312 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1313 hio_next[ncomp]); 1314 break; 1315 } 1316 } 1317 mtx_unlock(&hio_recv_list_lock[ncomp]); 1318 if (hio == NULL) { 1319 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1320 (uintmax_t)seq); 1321 nv_free(nv); 1322 continue; 1323 } 1324 error = nv_get_int16(nv, "error"); 1325 if (error != 0) { 1326 /* Request failed on remote side. 
*/ 1327 hio->hio_errors[ncomp] = 0; 1328 nv_free(nv); 1329 goto done_queue; 1330 } 1331 ggio = &hio->hio_ggio; 1332 switch (ggio->gctl_cmd) { 1333 case BIO_READ: 1334 rw_rlock(&hio_remote_lock[ncomp]); 1335 if (!ISCONNECTED(res, ncomp)) { 1336 rw_unlock(&hio_remote_lock[ncomp]); 1337 nv_free(nv); 1338 goto done_queue; 1339 } 1340 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1341 ggio->gctl_data, ggio->gctl_length) < 0) { 1342 hio->hio_errors[ncomp] = errno; 1343 pjdlog_errno(LOG_ERR, 1344 "Unable to receive reply data"); 1345 rw_unlock(&hio_remote_lock[ncomp]); 1346 nv_free(nv); 1347 remote_close(res, ncomp); 1348 goto done_queue; 1349 } 1350 rw_unlock(&hio_remote_lock[ncomp]); 1351 break; 1352 case BIO_WRITE: 1353 case BIO_DELETE: 1354 case BIO_FLUSH: 1355 break; 1356 default: 1357 assert(!"invalid condition"); 1358 abort(); 1359 } 1360 hio->hio_errors[ncomp] = 0; 1361 nv_free(nv); 1362done_queue: 1363 if (refcount_release(&hio->hio_countdown)) { 1364 if (ISSYNCREQ(hio)) { 1365 mtx_lock(&sync_lock); 1366 SYNCREQDONE(hio); 1367 mtx_unlock(&sync_lock); 1368 cv_signal(&sync_cond); 1369 } else { 1370 pjdlog_debug(2, 1371 "remote_recv: (%p) Moving request to the done queue.", 1372 hio); 1373 QUEUE_INSERT2(hio, done); 1374 } 1375 } 1376 } 1377 /* NOTREACHED */ 1378 return (NULL); 1379} 1380 1381/* 1382 * Thread sends answer to the kernel. 1383 */ 1384static void * 1385ggate_send_thread(void *arg) 1386{ 1387 struct hast_resource *res = arg; 1388 struct g_gate_ctl_io *ggio; 1389 struct hio *hio; 1390 unsigned int ii, ncomp, ncomps; 1391 1392 ncomps = HAST_NCOMPONENTS; 1393 1394 for (;;) { 1395 pjdlog_debug(2, "ggate_send: Taking request."); 1396 QUEUE_TAKE2(hio, done); 1397 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1398 ggio = &hio->hio_ggio; 1399 for (ii = 0; ii < ncomps; ii++) { 1400 if (hio->hio_errors[ii] == 0) { 1401 /* 1402 * One successful request is enough to declare 1403 * success. 
1404 */ 1405 ggio->gctl_error = 0; 1406 break; 1407 } 1408 } 1409 if (ii == ncomps) { 1410 /* 1411 * None of the requests were successful. 1412 * Use first error. 1413 */ 1414 ggio->gctl_error = hio->hio_errors[0]; 1415 } 1416 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1417 mtx_lock(&res->hr_amp_lock); 1418 activemap_write_complete(res->hr_amp, 1419 ggio->gctl_offset, ggio->gctl_length); 1420 mtx_unlock(&res->hr_amp_lock); 1421 } 1422 if (ggio->gctl_cmd == BIO_WRITE) { 1423 /* 1424 * Unlock range we locked. 1425 */ 1426 mtx_lock(&range_lock); 1427 rangelock_del(range_regular, ggio->gctl_offset, 1428 ggio->gctl_length); 1429 if (range_sync_wait) 1430 cv_signal(&range_sync_cond); 1431 mtx_unlock(&range_lock); 1432 /* 1433 * Bump local count if this is first write after 1434 * connection failure with remote node. 1435 */ 1436 ncomp = 1; 1437 rw_rlock(&hio_remote_lock[ncomp]); 1438 if (!ISCONNECTED(res, ncomp)) { 1439 mtx_lock(&metadata_lock); 1440 if (res->hr_primary_localcnt == 1441 res->hr_secondary_remotecnt) { 1442 res->hr_primary_localcnt++; 1443 pjdlog_debug(1, 1444 "Increasing localcnt to %ju.", 1445 (uintmax_t)res->hr_primary_localcnt); 1446 (void)metadata_write(res); 1447 } 1448 mtx_unlock(&metadata_lock); 1449 } 1450 rw_unlock(&hio_remote_lock[ncomp]); 1451 } 1452 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1453 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1454 pjdlog_debug(2, 1455 "ggate_send: (%p) Moving request to the free queue.", hio); 1456 QUEUE_INSERT2(hio, free); 1457 } 1458 /* NOTREACHED */ 1459 return (NULL); 1460} 1461 1462/* 1463 * Thread synchronize local and remote components. 
1464 */ 1465static void * 1466sync_thread(void *arg __unused) 1467{ 1468 struct hast_resource *res = arg; 1469 struct hio *hio; 1470 struct g_gate_ctl_io *ggio; 1471 unsigned int ii, ncomp, ncomps; 1472 off_t offset, length, synced; 1473 bool dorewind; 1474 int syncext; 1475 1476 ncomps = HAST_NCOMPONENTS; 1477 dorewind = true; 1478 synced = 0; 1479 1480 for (;;) { 1481 mtx_lock(&sync_lock); 1482 while (!sync_inprogress) { 1483 dorewind = true; 1484 synced = 0; 1485 cv_wait(&sync_cond, &sync_lock); 1486 } 1487 mtx_unlock(&sync_lock); 1488 /* 1489 * Obtain offset at which we should synchronize. 1490 * Rewind synchronization if needed. 1491 */ 1492 mtx_lock(&res->hr_amp_lock); 1493 if (dorewind) 1494 activemap_sync_rewind(res->hr_amp); 1495 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1496 if (syncext != -1) { 1497 /* 1498 * We synchronized entire syncext extent, we can mark 1499 * it as clean now. 1500 */ 1501 if (activemap_extent_complete(res->hr_amp, syncext)) 1502 (void)hast_activemap_flush(res); 1503 } 1504 mtx_unlock(&res->hr_amp_lock); 1505 if (dorewind) { 1506 dorewind = false; 1507 if (offset < 0) 1508 pjdlog_info("Nodes are in sync."); 1509 else { 1510 pjdlog_info("Synchronization started. %ju bytes to go.", 1511 (uintmax_t)(res->hr_extentsize * 1512 activemap_ndirty(res->hr_amp))); 1513 } 1514 } 1515 if (offset < 0) { 1516 mtx_lock(&sync_lock); 1517 sync_inprogress = false; 1518 mtx_unlock(&sync_lock); 1519 pjdlog_debug(1, "Nothing to synchronize."); 1520 /* 1521 * Synchronization complete, make both localcnt and 1522 * remotecnt equal. 1523 */ 1524 ncomp = 1; 1525 rw_rlock(&hio_remote_lock[ncomp]); 1526 if (ISCONNECTED(res, ncomp)) { 1527 if (synced > 0) { 1528 pjdlog_info("Synchronization complete. 
" 1529 "%jd bytes synchronized.", 1530 (intmax_t)synced); 1531 } 1532 mtx_lock(&metadata_lock); 1533 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1534 res->hr_primary_localcnt = 1535 res->hr_secondary_localcnt; 1536 res->hr_primary_remotecnt = 1537 res->hr_secondary_remotecnt; 1538 pjdlog_debug(1, 1539 "Setting localcnt to %ju and remotecnt to %ju.", 1540 (uintmax_t)res->hr_primary_localcnt, 1541 (uintmax_t)res->hr_secondary_localcnt); 1542 (void)metadata_write(res); 1543 mtx_unlock(&metadata_lock); 1544 } else if (synced > 0) { 1545 pjdlog_info("Synchronization interrupted. " 1546 "%jd bytes synchronized so far.", 1547 (intmax_t)synced); 1548 } 1549 rw_unlock(&hio_remote_lock[ncomp]); 1550 continue; 1551 } 1552 pjdlog_debug(2, "sync: Taking free request."); 1553 QUEUE_TAKE2(hio, free); 1554 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1555 /* 1556 * Lock the range we are going to synchronize. We don't want 1557 * race where someone writes between our read and write. 1558 */ 1559 for (;;) { 1560 mtx_lock(&range_lock); 1561 if (rangelock_islocked(range_regular, offset, length)) { 1562 pjdlog_debug(2, 1563 "sync: Range offset=%jd length=%jd locked.", 1564 (intmax_t)offset, (intmax_t)length); 1565 range_sync_wait = true; 1566 cv_wait(&range_sync_cond, &range_lock); 1567 range_sync_wait = false; 1568 mtx_unlock(&range_lock); 1569 continue; 1570 } 1571 if (rangelock_add(range_sync, offset, length) < 0) { 1572 mtx_unlock(&range_lock); 1573 pjdlog_debug(2, 1574 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1575 (intmax_t)offset, (intmax_t)length); 1576 sleep(1); 1577 continue; 1578 } 1579 mtx_unlock(&range_lock); 1580 break; 1581 } 1582 /* 1583 * First read the data from synchronization source. 
1584 */ 1585 SYNCREQ(hio); 1586 ggio = &hio->hio_ggio; 1587 ggio->gctl_cmd = BIO_READ; 1588 ggio->gctl_offset = offset; 1589 ggio->gctl_length = length; 1590 ggio->gctl_error = 0; 1591 for (ii = 0; ii < ncomps; ii++) 1592 hio->hio_errors[ii] = EINVAL; 1593 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1594 hio); 1595 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1596 hio); 1597 mtx_lock(&metadata_lock); 1598 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1599 /* 1600 * This range is up-to-date on local component, 1601 * so handle request locally. 1602 */ 1603 /* Local component is 0 for now. */ 1604 ncomp = 0; 1605 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1606 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1607 /* 1608 * This range is out-of-date on local component, 1609 * so send request to the remote node. 1610 */ 1611 /* Remote component is 1 for now. */ 1612 ncomp = 1; 1613 } 1614 mtx_unlock(&metadata_lock); 1615 refcount_init(&hio->hio_countdown, 1); 1616 QUEUE_INSERT1(hio, send, ncomp); 1617 1618 /* 1619 * Let's wait for READ to finish. 1620 */ 1621 mtx_lock(&sync_lock); 1622 while (!ISSYNCREQDONE(hio)) 1623 cv_wait(&sync_cond, &sync_lock); 1624 mtx_unlock(&sync_lock); 1625 1626 if (hio->hio_errors[ncomp] != 0) { 1627 pjdlog_error("Unable to read synchronization data: %s.", 1628 strerror(hio->hio_errors[ncomp])); 1629 goto free_queue; 1630 } 1631 1632 /* 1633 * We read the data from synchronization source, now write it 1634 * to synchronization target. 
1635 */ 1636 SYNCREQ(hio); 1637 ggio->gctl_cmd = BIO_WRITE; 1638 for (ii = 0; ii < ncomps; ii++) 1639 hio->hio_errors[ii] = EINVAL; 1640 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1641 hio); 1642 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1643 hio); 1644 mtx_lock(&metadata_lock); 1645 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1646 /* 1647 * This range is up-to-date on local component, 1648 * so we update remote component. 1649 */ 1650 /* Remote component is 1 for now. */ 1651 ncomp = 1; 1652 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1653 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1654 /* 1655 * This range is out-of-date on local component, 1656 * so we update it. 1657 */ 1658 /* Local component is 0 for now. */ 1659 ncomp = 0; 1660 } 1661 mtx_unlock(&metadata_lock); 1662 1663 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1664 hio); 1665 refcount_init(&hio->hio_countdown, 1); 1666 QUEUE_INSERT1(hio, send, ncomp); 1667 1668 /* 1669 * Let's wait for WRITE to finish. 
1670 */ 1671 mtx_lock(&sync_lock); 1672 while (!ISSYNCREQDONE(hio)) 1673 cv_wait(&sync_cond, &sync_lock); 1674 mtx_unlock(&sync_lock); 1675 1676 if (hio->hio_errors[ncomp] != 0) { 1677 pjdlog_error("Unable to write synchronization data: %s.", 1678 strerror(hio->hio_errors[ncomp])); 1679 goto free_queue; 1680 } 1681free_queue: 1682 mtx_lock(&range_lock); 1683 rangelock_del(range_sync, offset, length); 1684 if (range_regular_wait) 1685 cv_signal(&range_regular_cond); 1686 mtx_unlock(&range_lock); 1687 1688 synced += length; 1689 1690 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1691 hio); 1692 QUEUE_INSERT2(hio, free); 1693 } 1694 /* NOTREACHED */ 1695 return (NULL); 1696} 1697 1698static void 1699sighandler(int sig) 1700{ 1701 bool unlock; 1702 1703 switch (sig) { 1704 case SIGINT: 1705 case SIGTERM: 1706 sigexit_received = true; 1707 break; 1708 default: 1709 assert(!"invalid condition"); 1710 } 1711 /* 1712 * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1713 * want to risk deadlock. 1714 */ 1715 unlock = mtx_trylock(&hio_guard_lock); 1716 cv_signal(&hio_guard_cond); 1717 if (unlock) 1718 mtx_unlock(&hio_guard_lock); 1719} 1720 1721/* 1722 * Thread guards remote connections and reconnects when needed, handles 1723 * signals, etc. 1724 */ 1725static void * 1726guard_thread(void *arg) 1727{ 1728 struct hast_resource *res = arg; 1729 struct proto_conn *in, *out; 1730 unsigned int ii, ncomps; 1731 int timeout; 1732 1733 ncomps = HAST_NCOMPONENTS; 1734 /* The is only one remote component for now. */ 1735#define ISREMOTE(no) ((no) == 1) 1736 1737 for (;;) { 1738 if (sigexit_received) { 1739 primary_exitx(EX_OK, 1740 "Termination signal received, exiting."); 1741 } 1742 /* 1743 * If all the connection will be fine, we will sleep until 1744 * someone wakes us up. 1745 * If any of the connections will be broken and we won't be 1746 * able to connect, we will sleep only for RECONNECT_SLEEP 1747 * seconds so we can retry soon. 
1748 */ 1749 timeout = 0; 1750 pjdlog_debug(2, "remote_guard: Checking connections."); 1751 mtx_lock(&hio_guard_lock); 1752 for (ii = 0; ii < ncomps; ii++) { 1753 if (!ISREMOTE(ii)) 1754 continue; 1755 rw_rlock(&hio_remote_lock[ii]); 1756 if (ISCONNECTED(res, ii)) { 1757 assert(res->hr_remotein != NULL); 1758 assert(res->hr_remoteout != NULL); 1759 rw_unlock(&hio_remote_lock[ii]); 1760 pjdlog_debug(2, 1761 "remote_guard: Connection to %s is ok.", 1762 res->hr_remoteaddr); 1763 } else { 1764 assert(res->hr_remotein == NULL); 1765 assert(res->hr_remoteout == NULL); 1766 /* 1767 * Upgrade the lock. It doesn't have to be 1768 * atomic as no other thread can change 1769 * connection status from disconnected to 1770 * connected. 1771 */ 1772 rw_unlock(&hio_remote_lock[ii]); 1773 pjdlog_debug(2, 1774 "remote_guard: Reconnecting to %s.", 1775 res->hr_remoteaddr); 1776 in = out = NULL; 1777 if (init_remote(res, &in, &out)) { 1778 rw_wlock(&hio_remote_lock[ii]); 1779 assert(res->hr_remotein == NULL); 1780 assert(res->hr_remoteout == NULL); 1781 assert(in != NULL && out != NULL); 1782 res->hr_remotein = in; 1783 res->hr_remoteout = out; 1784 rw_unlock(&hio_remote_lock[ii]); 1785 pjdlog_info("Successfully reconnected to %s.", 1786 res->hr_remoteaddr); 1787 sync_start(); 1788 } else { 1789 /* Both connections should be NULL. */ 1790 assert(res->hr_remotein == NULL); 1791 assert(res->hr_remoteout == NULL); 1792 assert(in == NULL && out == NULL); 1793 pjdlog_debug(2, 1794 "remote_guard: Reconnect to %s failed.", 1795 res->hr_remoteaddr); 1796 timeout = RECONNECT_SLEEP; 1797 } 1798 } 1799 } 1800 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1801 mtx_unlock(&hio_guard_lock); 1802 } 1803#undef ISREMOTE 1804 /* NOTREACHED */ 1805 return (NULL); 1806} 1807