primary.c revision 211882
1/*- 2 * Copyright (c) 2009 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 211882 2010-08-27 14:26:37Z pjd $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/refcount.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request as it travels between the kernel (via GEOM Gate),
 * the local disk component and the remote (secondary) component.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	unsigned int		 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int			*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io	 hio_ggio;
	/* Per-component queue linkage (array of ncomps entries). */
	TAILQ_ENTRY(hio)	*hio_next;
};
/* The free and done lists reuse the first component's linkage. */
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;
static pthread_mutex_t hio_guard_lock;
static pthread_cond_t hio_guard_cond;

/*
 * Lock to synchronize metadata updates. Also synchronize access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2
/*
 * Number of seconds to sleep between keepalive packets.
 */
#define	KEEPALIVE_SLEEP		10
/*
 * Number of seconds to sleep between reconnect retries.
 */
#define	RECONNECT_SLEEP		5

/*
 * NOTE(review): the 'no' argument is unused here - connectivity is tracked
 * for the single remote component only; confirm before adding components.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/* Append a request to the per-component queue 'name', waking one waiter. */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* Append a request to a global (non-per-component) queue 'name'. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_signal(&hio_##name##_list_cond);			\
} while (0)
/* Block until the per-component queue 'name' is non-empty, pop the head. */
#define	QUEUE_TAKE1(hio, name, ncomp)	do {				\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \
		cv_wait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)]);			\
	}								\
	TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Block until the global queue 'name' is non-empty, pop the head. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

/*
 * Synchronization requests are distinguished from kernel requests by a
 * gctl_unit of -1 (in flight) or -2 (completed).
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)
static struct hast_resource *gres;

/* Range locks separating regular (kernel) writes from sync-thread I/O. */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

static void sighandler(int sig);

/*
 * Best-effort cleanup before process exit: close the local provider
 * descriptor and destroy the ggate device if we created one.
 * Preserves errno so the caller can still report the original failure.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/*
	 * Close descriptor to /dev/hast/<name>
	 * to work-around race in the kernel.
	 */
	close(res->hr_localfd);

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) {
			pjdlog_warning("Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log an errno-decorated error, clean up and exit with a sysexits code. */
static void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	assert(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Like primary_exit(), but without errno decoding; EX_OK logs as info. */
static void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Write the in-memory activemap bitmap to the on-disk metadata area.
 * Returns 0 on success, -1 on write failure (errno is preserved for the
 * caller by KEEP_ERRNO).
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;

	buf = activemap_bitmap(res->hr_amp, &size);
	assert(buf != NULL);
	assert((size % res->hr_local_sectorsize) == 0);
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		KEEP_ERRNO(pjdlog_errno(LOG_ERR,
		    "Unable to flush activemap to disk"));
		return (-1);
	}
	return (0);
}

/* True unless the remote address is the special "none" placeholder. */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all per-component queues, locks and condition
 * variables, pre-allocate the pool of HAST_HIO_MAX hio requests and install
 * signal handlers.  Exits the process on any allocation failure.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their locks and their condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&hio_guard_lock);
	cv_init(&hio_guard_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		hio->hio_countdown = 0;
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		/* MAXPHYS is the largest request the kernel can send us. */
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
	}

	/*
	 * Turn on signals handling.
	 */
	signal(SIGINT, sighandler);
	signal(SIGTERM, sighandler);
	signal(SIGHUP, sighandler);
}

/*
 * Set up the local component: read metadata, create the activemap and the
 * regular/sync range locks, and load the on-disk activemap.  On first use
 * of the provider, generate the resource unique identifier and write it out.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) < 0)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) < 0)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	/* A non-zero resuid means the provider was already initialized. */
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time, so we have to generate
	 * resource unique identifier and initialize local and remote counts.
	 */
	arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
	res->hr_primary_localcnt = 1;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) < 0)
		exit(EX_NOINPUT);
}

/*
 * Perform the two-step handshake with the secondary node and, if the remote
 * has dirty extents, download and merge its activemap.  On success the two
 * connections are either returned via inp/outp (both non-NULL) or stored in
 * the resource.  Returns false if the remote is unreachable or incompatible.
 */
static bool
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	size_t size;

	/* Callers pass both output pointers or neither. */
	assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	assert(real_remote(res));

	in = out = NULL;

	/* Prepare outgoing connection with remote node. */
	if (proto_client(res->hr_remoteaddr, &out) < 0) {
		primary_exit(EX_TEMPFAIL, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(out) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(out, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	/*
	 * NOTE(review): if proto_client() fails here, 'in' stays NULL and we
	 * only log a warning before calling proto_connect(in) below - confirm
	 * proto_connect() tolerates a NULL connection.
	 */
	if (proto_client(res->hr_remoteaddr, &in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to create connection to %s",
		    res->hr_remoteaddr);
	}
	/* Try to connect, but accept failure. */
	if (proto_connect(in) < 0) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		goto close;
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(in, res->hr_timeout) < 0)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* The reply to the second step is received on the outgoing connection. */
	if (hast_proto_recv_hdr(out, &nvin) < 0) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents of its own, let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) < 0) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	/*
	 * NOTE(review): nvin does not appear to be freed on this success
	 * path - possible leak; confirm against nv(3) ownership rules.
	 */
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	return (true);
close:
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (false);
}

/* Mark synchronization as in progress and wake up the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Clear the in-progress flag so the sync thread stops synchronizing. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create the GEOM Gate provider hast/<name> via /dev/ggctl, or take over
 * an existing one left behind by a dead worker.  Exits the process on
 * unrecoverable failure.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd < 0)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info));
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: fork a worker process for the given
 * resource.  The parent records the worker pid and returns; the child
 * initializes all components and runs the worker threads, never returning.
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error;

	gres = res;

	/*
	 * Create communication channel between parent and child.
	 */
	if (proto_client("socketpair://", &res->hr_ctrl) < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}

	pid = fork();
	if (pid < 0) {
		KEEP_ERRNO((void)pidfile_remove(pfh));
		primary_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		res->hr_workerpid = pid;
		return;
	}
	(void)pidfile_close(pfh);

	setproctitle("%s (primary)", res->hr_name);

	/* The worker handles signals via sighandler(); reset inherited ones. */
	signal(SIGHUP, SIG_DFL);
	signal(SIGCHLD, SIG_DFL);

	init_local(res);
	if (real_remote(res) && init_remote(res, NULL, NULL))
		sync_start();
	init_ggate(res);
	init_environment(res);
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, sync_thread, res);
	assert(error == 0);
	error = pthread_create(&td, NULL, ctrl_thread, res);
	assert(error == 0);
	/* Run the guard loop in this thread instead of creating another. */
	(void)guard_thread(res);
}

/*
 * Format a log message and append a human-readable description of the
 * given ggate I/O request, then hand it to pjdlog.
 */
static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
{
	char msg[1024];
	va_list ap;
	int len;

	va_start(ap, fmt);
	len = vsnprintf(msg, sizeof(msg), fmt, ap);
	va_end(ap);
	/* Only append the request description if the prefix wasn't truncated. */
	if ((size_t)len < sizeof(msg)) {
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_DELETE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		case BIO_FLUSH:
			(void)snprintf(msg + len, sizeof(msg) - len, "FLUSH.");
			break;
		case BIO_WRITE:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset,
			    (uintmax_t)ggio->gctl_length);
			break;
		default:
			(void)snprintf(msg + len, sizeof(msg) - len,
			    "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd);
			break;
		}
	}
	pjdlog_common(loglevel, debuglevel, -1, "%s", msg);
}

/*
 * Close both connections to the remote node (if still open), stop an
 * in-progress synchronization and poke the guard thread so it can start
 * reconnecting immediately.
 */
static void
remote_close(struct hast_resource *res, int ncomp)
{

	rw_wlock(&hio_remote_lock[ncomp]);
	/*
	 * A race is possible between dropping rlock and acquiring wlock -
	 * another thread can close connection in-between.
	 */
	if (!ISCONNECTED(res, ncomp)) {
		assert(res->hr_remotein == NULL);
		assert(res->hr_remoteout == NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	assert(res->hr_remotein != NULL);
	assert(res->hr_remoteout != NULL);

	pjdlog_debug(2, "Closing incoming connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remotein);
	res->hr_remotein = NULL;
	pjdlog_debug(2, "Closing outgoing connection to %s.",
	    res->hr_remoteaddr);
	proto_close(res->hr_remoteout);
	res->hr_remoteout = NULL;

	rw_unlock(&hio_remote_lock[ncomp]);

	pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr);

	/*
	 * Stop synchronization if in-progress.
	 */
	sync_stop();

	/*
	 * Wake up guard thread (if we are not called from within guard thread),
	 * so it can immediately start reconnect.
	 */
	if (!mtx_owned(&hio_guard_lock)) {
		mtx_lock(&hio_guard_lock);
		cv_signal(&hio_guard_cond);
		mtx_unlock(&hio_guard_lock);
	}
}

/*
 * Thread receives ggate I/O requests from the kernel and passes them to
 * appropriate threads:
 * WRITE - always goes to both local_send and remote_send threads
 * READ (when the block is up-to-date on local component) -
 *	only local_send thread
 * READ (when the block isn't up-to-date on local component) -
 *	only remote_send thread
 * DELETE - always goes to both local_send and remote_send threads
 * FLUSH - always goes to both local_send and remote_send threads
 */
static void *
ggate_recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ii, ncomp, ncomps;
	int error;

	ncomps = HAST_NCOMPONENTS;

	for (;;) {
		pjdlog_debug(2, "ggate_recv: Taking free request.");
		QUEUE_TAKE2(hio, free);
		pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio);
		ggio = &hio->hio_ggio;
		ggio->gctl_unit = res->hr_ggateunit;
		ggio->gctl_length = MAXPHYS;
		ggio->gctl_error = 0;
		pjdlog_debug(2,
		    "ggate_recv: (%p) Waiting for request from the kernel.",
		    hio);
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) {
			if (sigexit_received)
				pthread_exit(NULL);
			primary_exit(EX_OSERR, "G_GATE_CMD_START failed");
		}
		error = ggio->gctl_error;
		switch (error) {
		case 0:
			break;
		case ECANCELED:
			/* Exit gracefully. */
			if (!sigexit_received) {
				pjdlog_debug(2,
				    "ggate_recv: (%p) Received cancel from the kernel.",
				    hio);
				pjdlog_info("Received cancel from the kernel, exiting.");
			}
			pthread_exit(NULL);
		case ENOMEM:
			/*
			 * Buffer too small? Impossible, we allocate MAXPHYS
			 * bytes - request can't be bigger than that.
			 */
			/* FALLTHROUGH */
		case ENXIO:
		default:
			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
			    strerror(error));
		}
		/* Pre-mark all components as failed until they report back. */
		for (ii = 0; ii < ncomps; ii++)
			hio->hio_errors[ii] = EINVAL;
		reqlog(LOG_DEBUG, 2, ggio,
		    "ggate_recv: (%p) Request received from the kernel: ",
		    hio);
		/*
		 * Inform all components about new write request.
		 * For read request prefer local component unless the given
		 * range is out-of-date, then use remote component.
		 */
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queue.",
			    hio);
			/* Reads are served by exactly one component. */
			refcount_init(&hio->hio_countdown, 1);
			mtx_lock(&metadata_lock);
			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
				/*
				 * This range is up-to-date on local component,
				 * so handle request locally.
				 */
				/* Local component is 0 for now. */
				ncomp = 0;
			} else /* if (res->hr_syncsrc ==
			    HAST_SYNCSRC_SECONDARY) */ {
				assert(res->hr_syncsrc ==
				    HAST_SYNCSRC_SECONDARY);
				/*
				 * This range is out-of-date on local component,
				 * so send request to the remote node.
				 */
				/* Remote component is 1 for now. */
				ncomp = 1;
			}
			mtx_unlock(&metadata_lock);
			QUEUE_INSERT1(hio, send, ncomp);
			break;
		case BIO_WRITE:
			/*
			 * Take a regular-range lock for the request, retrying
			 * while the sync thread holds an overlapping range.
			 */
			for (;;) {
				mtx_lock(&range_lock);
				if (rangelock_islocked(range_sync,
				    ggio->gctl_offset, ggio->gctl_length)) {
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu locked.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					range_regular_wait = true;
					cv_wait(&range_regular_cond, &range_lock);
					range_regular_wait = false;
					mtx_unlock(&range_lock);
					continue;
				}
				if (rangelock_add(range_regular,
				    ggio->gctl_offset, ggio->gctl_length) < 0) {
					mtx_unlock(&range_lock);
					pjdlog_debug(2,
					    "regular: Range offset=%jd length=%zu is already locked, waiting.",
					    (intmax_t)ggio->gctl_offset,
					    (size_t)ggio->gctl_length);
					sleep(1);
					continue;
				}
				mtx_unlock(&range_lock);
				break;
			}
			/* Mark the extent dirty; flush activemap if it changed. */
			mtx_lock(&res->hr_amp_lock);
			if (activemap_write_start(res->hr_amp,
			    ggio->gctl_offset, ggio->gctl_length)) {
				(void)hast_activemap_flush(res);
			}
			mtx_unlock(&res->hr_amp_lock);
			/* FALLTHROUGH */
		case BIO_DELETE:
		case BIO_FLUSH:
			pjdlog_debug(2,
			    "ggate_recv: (%p) Moving request to the send queues.",
			    hio);
			/* Writes/deletes/flushes go to every component. */
			refcount_init(&hio->hio_countdown, ncomps);
			for (ii = 0; ii < ncomps; ii++)
				QUEUE_INSERT1(hio, send, ii);
			break;
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component.
 * If local read fails, it redirects it to remote_send thread.
 */
static void *
local_send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct g_gate_ctl_io *ggio;
	struct hio *hio;
	unsigned int ncomp, rncomp;
	ssize_t ret;

	/* Local component is 0 for now. */
	ncomp = 0;
	/* Remote component is 1 for now. */
	rncomp = 1;

	for (;;) {
		pjdlog_debug(2, "local_send: Taking request.");
		QUEUE_TAKE1(hio, send, ncomp);
		pjdlog_debug(2, "local_send: (%p) Got request.", hio);
		ggio = &hio->hio_ggio;
		switch (ggio->gctl_cmd) {
		case BIO_READ:
			ret = pread(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret == ggio->gctl_length)
				hio->hio_errors[ncomp] = 0;
			else {
				/*
				 * If READ failed, try to read from remote node.
				 */
				QUEUE_INSERT1(hio, send, rncomp);
				continue;
			}
			break;
		case BIO_WRITE:
			ret = pwrite(res->hr_localfd, ggio->gctl_data,
			    ggio->gctl_length,
			    ggio->gctl_offset + res->hr_localoff);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else if (ret != ggio->gctl_length)
				hio->hio_errors[ncomp] = EIO;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    ggio->gctl_offset + res->hr_localoff,
			    ggio->gctl_length);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		case BIO_FLUSH:
			ret = g_flush(res->hr_localfd);
			if (ret < 0)
				hio->hio_errors[ncomp] = errno;
			else
				hio->hio_errors[ncomp] = 0;
			break;
		}
		/*
		 * The slowest component (the one dropping the countdown to
		 * zero) is responsible for completing the request.
		 */
		if (refcount_release(&hio->hio_countdown)) {
			if (ISSYNCREQ(hio)) {
				mtx_lock(&sync_lock);
				SYNCREQDONE(hio);
				mtx_unlock(&sync_lock);
				cv_signal(&sync_cond);
			} else {
				pjdlog_debug(2,
				    "local_send: (%p) Moving request to the done queue.",
				    hio);
				QUEUE_INSERT2(hio, done);
			}
		}
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends request to secondary node.
1140 */ 1141static void * 1142remote_send_thread(void *arg) 1143{ 1144 struct hast_resource *res = arg; 1145 struct g_gate_ctl_io *ggio; 1146 struct hio *hio; 1147 struct nv *nv; 1148 unsigned int ncomp; 1149 bool wakeup; 1150 uint64_t offset, length; 1151 uint8_t cmd; 1152 void *data; 1153 1154 /* Remote component is 1 for now. */ 1155 ncomp = 1; 1156 1157 for (;;) { 1158 pjdlog_debug(2, "remote_send: Taking request."); 1159 QUEUE_TAKE1(hio, send, ncomp); 1160 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1161 ggio = &hio->hio_ggio; 1162 switch (ggio->gctl_cmd) { 1163 case BIO_READ: 1164 cmd = HIO_READ; 1165 data = NULL; 1166 offset = ggio->gctl_offset; 1167 length = ggio->gctl_length; 1168 break; 1169 case BIO_WRITE: 1170 cmd = HIO_WRITE; 1171 data = ggio->gctl_data; 1172 offset = ggio->gctl_offset; 1173 length = ggio->gctl_length; 1174 break; 1175 case BIO_DELETE: 1176 cmd = HIO_DELETE; 1177 data = NULL; 1178 offset = ggio->gctl_offset; 1179 length = ggio->gctl_length; 1180 break; 1181 case BIO_FLUSH: 1182 cmd = HIO_FLUSH; 1183 data = NULL; 1184 offset = 0; 1185 length = 0; 1186 break; 1187 default: 1188 assert(!"invalid condition"); 1189 abort(); 1190 } 1191 nv = nv_alloc(); 1192 nv_add_uint8(nv, cmd, "cmd"); 1193 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1194 nv_add_uint64(nv, offset, "offset"); 1195 nv_add_uint64(nv, length, "length"); 1196 if (nv_error(nv) != 0) { 1197 hio->hio_errors[ncomp] = nv_error(nv); 1198 pjdlog_debug(2, 1199 "remote_send: (%p) Unable to prepare header to send.", 1200 hio); 1201 reqlog(LOG_ERR, 0, ggio, 1202 "Unable to prepare header to send (%s): ", 1203 strerror(nv_error(nv))); 1204 /* Move failed request immediately to the done queue. */ 1205 goto done_queue; 1206 } 1207 pjdlog_debug(2, 1208 "remote_send: (%p) Moving request to the recv queue.", 1209 hio); 1210 /* 1211 * Protect connection from disappearing. 
1212 */ 1213 rw_rlock(&hio_remote_lock[ncomp]); 1214 if (!ISCONNECTED(res, ncomp)) { 1215 rw_unlock(&hio_remote_lock[ncomp]); 1216 hio->hio_errors[ncomp] = ENOTCONN; 1217 goto done_queue; 1218 } 1219 /* 1220 * Move the request to recv queue before sending it, because 1221 * in different order we can get reply before we move request 1222 * to recv queue. 1223 */ 1224 mtx_lock(&hio_recv_list_lock[ncomp]); 1225 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1226 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1227 mtx_unlock(&hio_recv_list_lock[ncomp]); 1228 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1229 data != NULL ? length : 0) < 0) { 1230 hio->hio_errors[ncomp] = errno; 1231 rw_unlock(&hio_remote_lock[ncomp]); 1232 remote_close(res, ncomp); 1233 pjdlog_debug(2, 1234 "remote_send: (%p) Unable to send request.", hio); 1235 reqlog(LOG_ERR, 0, ggio, 1236 "Unable to send request (%s): ", 1237 strerror(hio->hio_errors[ncomp])); 1238 /* 1239 * Take request back from the receive queue and move 1240 * it immediately to the done queue. 
1241 */ 1242 mtx_lock(&hio_recv_list_lock[ncomp]); 1243 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1244 mtx_unlock(&hio_recv_list_lock[ncomp]); 1245 goto done_queue; 1246 } 1247 rw_unlock(&hio_remote_lock[ncomp]); 1248 nv_free(nv); 1249 if (wakeup) 1250 cv_signal(&hio_recv_list_cond[ncomp]); 1251 continue; 1252done_queue: 1253 nv_free(nv); 1254 if (ISSYNCREQ(hio)) { 1255 if (!refcount_release(&hio->hio_countdown)) 1256 continue; 1257 mtx_lock(&sync_lock); 1258 SYNCREQDONE(hio); 1259 mtx_unlock(&sync_lock); 1260 cv_signal(&sync_cond); 1261 continue; 1262 } 1263 if (ggio->gctl_cmd == BIO_WRITE) { 1264 mtx_lock(&res->hr_amp_lock); 1265 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1266 ggio->gctl_length)) { 1267 (void)hast_activemap_flush(res); 1268 } 1269 mtx_unlock(&res->hr_amp_lock); 1270 } 1271 if (!refcount_release(&hio->hio_countdown)) 1272 continue; 1273 pjdlog_debug(2, 1274 "remote_send: (%p) Moving request to the done queue.", 1275 hio); 1276 QUEUE_INSERT2(hio, done); 1277 } 1278 /* NOTREACHED */ 1279 return (NULL); 1280} 1281 1282/* 1283 * Thread receives answer from secondary node and passes it to ggate_send 1284 * thread. 1285 */ 1286static void * 1287remote_recv_thread(void *arg) 1288{ 1289 struct hast_resource *res = arg; 1290 struct g_gate_ctl_io *ggio; 1291 struct hio *hio; 1292 struct nv *nv; 1293 unsigned int ncomp; 1294 uint64_t seq; 1295 int error; 1296 1297 /* Remote component is 1 for now. */ 1298 ncomp = 1; 1299 1300 for (;;) { 1301 /* Wait until there is anything to receive. 
*/ 1302 mtx_lock(&hio_recv_list_lock[ncomp]); 1303 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1304 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1305 cv_wait(&hio_recv_list_cond[ncomp], 1306 &hio_recv_list_lock[ncomp]); 1307 } 1308 mtx_unlock(&hio_recv_list_lock[ncomp]); 1309 rw_rlock(&hio_remote_lock[ncomp]); 1310 if (!ISCONNECTED(res, ncomp)) { 1311 rw_unlock(&hio_remote_lock[ncomp]); 1312 /* 1313 * Connection is dead, so move all pending requests to 1314 * the done queue (one-by-one). 1315 */ 1316 mtx_lock(&hio_recv_list_lock[ncomp]); 1317 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1318 assert(hio != NULL); 1319 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1320 hio_next[ncomp]); 1321 mtx_unlock(&hio_recv_list_lock[ncomp]); 1322 goto done_queue; 1323 } 1324 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1325 pjdlog_errno(LOG_ERR, 1326 "Unable to receive reply header"); 1327 rw_unlock(&hio_remote_lock[ncomp]); 1328 remote_close(res, ncomp); 1329 continue; 1330 } 1331 rw_unlock(&hio_remote_lock[ncomp]); 1332 seq = nv_get_uint64(nv, "seq"); 1333 if (seq == 0) { 1334 pjdlog_error("Header contains no 'seq' field."); 1335 nv_free(nv); 1336 continue; 1337 } 1338 mtx_lock(&hio_recv_list_lock[ncomp]); 1339 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1340 if (hio->hio_ggio.gctl_seq == seq) { 1341 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1342 hio_next[ncomp]); 1343 break; 1344 } 1345 } 1346 mtx_unlock(&hio_recv_list_lock[ncomp]); 1347 if (hio == NULL) { 1348 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1349 (uintmax_t)seq); 1350 nv_free(nv); 1351 continue; 1352 } 1353 error = nv_get_int16(nv, "error"); 1354 if (error != 0) { 1355 /* Request failed on remote side. 
*/ 1356 hio->hio_errors[ncomp] = 0; 1357 nv_free(nv); 1358 goto done_queue; 1359 } 1360 ggio = &hio->hio_ggio; 1361 switch (ggio->gctl_cmd) { 1362 case BIO_READ: 1363 rw_rlock(&hio_remote_lock[ncomp]); 1364 if (!ISCONNECTED(res, ncomp)) { 1365 rw_unlock(&hio_remote_lock[ncomp]); 1366 nv_free(nv); 1367 goto done_queue; 1368 } 1369 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1370 ggio->gctl_data, ggio->gctl_length) < 0) { 1371 hio->hio_errors[ncomp] = errno; 1372 pjdlog_errno(LOG_ERR, 1373 "Unable to receive reply data"); 1374 rw_unlock(&hio_remote_lock[ncomp]); 1375 nv_free(nv); 1376 remote_close(res, ncomp); 1377 goto done_queue; 1378 } 1379 rw_unlock(&hio_remote_lock[ncomp]); 1380 break; 1381 case BIO_WRITE: 1382 case BIO_DELETE: 1383 case BIO_FLUSH: 1384 break; 1385 default: 1386 assert(!"invalid condition"); 1387 abort(); 1388 } 1389 hio->hio_errors[ncomp] = 0; 1390 nv_free(nv); 1391done_queue: 1392 if (refcount_release(&hio->hio_countdown)) { 1393 if (ISSYNCREQ(hio)) { 1394 mtx_lock(&sync_lock); 1395 SYNCREQDONE(hio); 1396 mtx_unlock(&sync_lock); 1397 cv_signal(&sync_cond); 1398 } else { 1399 pjdlog_debug(2, 1400 "remote_recv: (%p) Moving request to the done queue.", 1401 hio); 1402 QUEUE_INSERT2(hio, done); 1403 } 1404 } 1405 } 1406 /* NOTREACHED */ 1407 return (NULL); 1408} 1409 1410/* 1411 * Thread sends answer to the kernel. 1412 */ 1413static void * 1414ggate_send_thread(void *arg) 1415{ 1416 struct hast_resource *res = arg; 1417 struct g_gate_ctl_io *ggio; 1418 struct hio *hio; 1419 unsigned int ii, ncomp, ncomps; 1420 1421 ncomps = HAST_NCOMPONENTS; 1422 1423 for (;;) { 1424 pjdlog_debug(2, "ggate_send: Taking request."); 1425 QUEUE_TAKE2(hio, done); 1426 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1427 ggio = &hio->hio_ggio; 1428 for (ii = 0; ii < ncomps; ii++) { 1429 if (hio->hio_errors[ii] == 0) { 1430 /* 1431 * One successful request is enough to declare 1432 * success. 
1433 */ 1434 ggio->gctl_error = 0; 1435 break; 1436 } 1437 } 1438 if (ii == ncomps) { 1439 /* 1440 * None of the requests were successful. 1441 * Use first error. 1442 */ 1443 ggio->gctl_error = hio->hio_errors[0]; 1444 } 1445 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1446 mtx_lock(&res->hr_amp_lock); 1447 activemap_write_complete(res->hr_amp, 1448 ggio->gctl_offset, ggio->gctl_length); 1449 mtx_unlock(&res->hr_amp_lock); 1450 } 1451 if (ggio->gctl_cmd == BIO_WRITE) { 1452 /* 1453 * Unlock range we locked. 1454 */ 1455 mtx_lock(&range_lock); 1456 rangelock_del(range_regular, ggio->gctl_offset, 1457 ggio->gctl_length); 1458 if (range_sync_wait) 1459 cv_signal(&range_sync_cond); 1460 mtx_unlock(&range_lock); 1461 /* 1462 * Bump local count if this is first write after 1463 * connection failure with remote node. 1464 */ 1465 ncomp = 1; 1466 rw_rlock(&hio_remote_lock[ncomp]); 1467 if (!ISCONNECTED(res, ncomp)) { 1468 mtx_lock(&metadata_lock); 1469 if (res->hr_primary_localcnt == 1470 res->hr_secondary_remotecnt) { 1471 res->hr_primary_localcnt++; 1472 pjdlog_debug(1, 1473 "Increasing localcnt to %ju.", 1474 (uintmax_t)res->hr_primary_localcnt); 1475 (void)metadata_write(res); 1476 } 1477 mtx_unlock(&metadata_lock); 1478 } 1479 rw_unlock(&hio_remote_lock[ncomp]); 1480 } 1481 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1482 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1483 pjdlog_debug(2, 1484 "ggate_send: (%p) Moving request to the free queue.", hio); 1485 QUEUE_INSERT2(hio, free); 1486 } 1487 /* NOTREACHED */ 1488 return (NULL); 1489} 1490 1491/* 1492 * Thread synchronize local and remote components. 
1493 */ 1494static void * 1495sync_thread(void *arg __unused) 1496{ 1497 struct hast_resource *res = arg; 1498 struct hio *hio; 1499 struct g_gate_ctl_io *ggio; 1500 unsigned int ii, ncomp, ncomps; 1501 off_t offset, length, synced; 1502 bool dorewind; 1503 int syncext; 1504 1505 ncomps = HAST_NCOMPONENTS; 1506 dorewind = true; 1507 synced = -1; 1508 1509 for (;;) { 1510 mtx_lock(&sync_lock); 1511 if (synced == -1) 1512 synced = 0; 1513 else if (!sync_inprogress) { 1514 pjdlog_info("Synchronization interrupted. " 1515 "%jd bytes synchronized so far.", 1516 (intmax_t)synced); 1517 } 1518 while (!sync_inprogress) { 1519 dorewind = true; 1520 synced = 0; 1521 cv_wait(&sync_cond, &sync_lock); 1522 } 1523 mtx_unlock(&sync_lock); 1524 /* 1525 * Obtain offset at which we should synchronize. 1526 * Rewind synchronization if needed. 1527 */ 1528 mtx_lock(&res->hr_amp_lock); 1529 if (dorewind) 1530 activemap_sync_rewind(res->hr_amp); 1531 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1532 if (syncext != -1) { 1533 /* 1534 * We synchronized entire syncext extent, we can mark 1535 * it as clean now. 1536 */ 1537 if (activemap_extent_complete(res->hr_amp, syncext)) 1538 (void)hast_activemap_flush(res); 1539 } 1540 mtx_unlock(&res->hr_amp_lock); 1541 if (dorewind) { 1542 dorewind = false; 1543 if (offset < 0) 1544 pjdlog_info("Nodes are in sync."); 1545 else { 1546 pjdlog_info("Synchronization started. %ju bytes to go.", 1547 (uintmax_t)(res->hr_extentsize * 1548 activemap_ndirty(res->hr_amp))); 1549 } 1550 } 1551 if (offset < 0) { 1552 sync_stop(); 1553 pjdlog_debug(1, "Nothing to synchronize."); 1554 /* 1555 * Synchronization complete, make both localcnt and 1556 * remotecnt equal. 1557 */ 1558 ncomp = 1; 1559 rw_rlock(&hio_remote_lock[ncomp]); 1560 if (ISCONNECTED(res, ncomp)) { 1561 if (synced > 0) { 1562 pjdlog_info("Synchronization complete. 
" 1563 "%jd bytes synchronized.", 1564 (intmax_t)synced); 1565 } 1566 mtx_lock(&metadata_lock); 1567 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1568 res->hr_primary_localcnt = 1569 res->hr_secondary_localcnt; 1570 res->hr_primary_remotecnt = 1571 res->hr_secondary_remotecnt; 1572 pjdlog_debug(1, 1573 "Setting localcnt to %ju and remotecnt to %ju.", 1574 (uintmax_t)res->hr_primary_localcnt, 1575 (uintmax_t)res->hr_secondary_localcnt); 1576 (void)metadata_write(res); 1577 mtx_unlock(&metadata_lock); 1578 } 1579 rw_unlock(&hio_remote_lock[ncomp]); 1580 continue; 1581 } 1582 pjdlog_debug(2, "sync: Taking free request."); 1583 QUEUE_TAKE2(hio, free); 1584 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1585 /* 1586 * Lock the range we are going to synchronize. We don't want 1587 * race where someone writes between our read and write. 1588 */ 1589 for (;;) { 1590 mtx_lock(&range_lock); 1591 if (rangelock_islocked(range_regular, offset, length)) { 1592 pjdlog_debug(2, 1593 "sync: Range offset=%jd length=%jd locked.", 1594 (intmax_t)offset, (intmax_t)length); 1595 range_sync_wait = true; 1596 cv_wait(&range_sync_cond, &range_lock); 1597 range_sync_wait = false; 1598 mtx_unlock(&range_lock); 1599 continue; 1600 } 1601 if (rangelock_add(range_sync, offset, length) < 0) { 1602 mtx_unlock(&range_lock); 1603 pjdlog_debug(2, 1604 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1605 (intmax_t)offset, (intmax_t)length); 1606 sleep(1); 1607 continue; 1608 } 1609 mtx_unlock(&range_lock); 1610 break; 1611 } 1612 /* 1613 * First read the data from synchronization source. 
1614 */ 1615 SYNCREQ(hio); 1616 ggio = &hio->hio_ggio; 1617 ggio->gctl_cmd = BIO_READ; 1618 ggio->gctl_offset = offset; 1619 ggio->gctl_length = length; 1620 ggio->gctl_error = 0; 1621 for (ii = 0; ii < ncomps; ii++) 1622 hio->hio_errors[ii] = EINVAL; 1623 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1624 hio); 1625 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1626 hio); 1627 mtx_lock(&metadata_lock); 1628 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1629 /* 1630 * This range is up-to-date on local component, 1631 * so handle request locally. 1632 */ 1633 /* Local component is 0 for now. */ 1634 ncomp = 0; 1635 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1636 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1637 /* 1638 * This range is out-of-date on local component, 1639 * so send request to the remote node. 1640 */ 1641 /* Remote component is 1 for now. */ 1642 ncomp = 1; 1643 } 1644 mtx_unlock(&metadata_lock); 1645 refcount_init(&hio->hio_countdown, 1); 1646 QUEUE_INSERT1(hio, send, ncomp); 1647 1648 /* 1649 * Let's wait for READ to finish. 1650 */ 1651 mtx_lock(&sync_lock); 1652 while (!ISSYNCREQDONE(hio)) 1653 cv_wait(&sync_cond, &sync_lock); 1654 mtx_unlock(&sync_lock); 1655 1656 if (hio->hio_errors[ncomp] != 0) { 1657 pjdlog_error("Unable to read synchronization data: %s.", 1658 strerror(hio->hio_errors[ncomp])); 1659 goto free_queue; 1660 } 1661 1662 /* 1663 * We read the data from synchronization source, now write it 1664 * to synchronization target. 
1665 */ 1666 SYNCREQ(hio); 1667 ggio->gctl_cmd = BIO_WRITE; 1668 for (ii = 0; ii < ncomps; ii++) 1669 hio->hio_errors[ii] = EINVAL; 1670 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1671 hio); 1672 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1673 hio); 1674 mtx_lock(&metadata_lock); 1675 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1676 /* 1677 * This range is up-to-date on local component, 1678 * so we update remote component. 1679 */ 1680 /* Remote component is 1 for now. */ 1681 ncomp = 1; 1682 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1683 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1684 /* 1685 * This range is out-of-date on local component, 1686 * so we update it. 1687 */ 1688 /* Local component is 0 for now. */ 1689 ncomp = 0; 1690 } 1691 mtx_unlock(&metadata_lock); 1692 1693 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1694 hio); 1695 refcount_init(&hio->hio_countdown, 1); 1696 QUEUE_INSERT1(hio, send, ncomp); 1697 1698 /* 1699 * Let's wait for WRITE to finish. 
1700 */ 1701 mtx_lock(&sync_lock); 1702 while (!ISSYNCREQDONE(hio)) 1703 cv_wait(&sync_cond, &sync_lock); 1704 mtx_unlock(&sync_lock); 1705 1706 if (hio->hio_errors[ncomp] != 0) { 1707 pjdlog_error("Unable to write synchronization data: %s.", 1708 strerror(hio->hio_errors[ncomp])); 1709 goto free_queue; 1710 } 1711 1712 synced += length; 1713free_queue: 1714 mtx_lock(&range_lock); 1715 rangelock_del(range_sync, offset, length); 1716 if (range_regular_wait) 1717 cv_signal(&range_regular_cond); 1718 mtx_unlock(&range_lock); 1719 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1720 hio); 1721 QUEUE_INSERT2(hio, free); 1722 } 1723 /* NOTREACHED */ 1724 return (NULL); 1725} 1726 1727static void 1728sighandler(int sig) 1729{ 1730 bool unlock; 1731 1732 switch (sig) { 1733 case SIGINT: 1734 case SIGTERM: 1735 sigexit_received = true; 1736 break; 1737 case SIGHUP: 1738 sighup_received = true; 1739 break; 1740 default: 1741 assert(!"invalid condition"); 1742 } 1743 /* 1744 * Racy, but if we cannot obtain hio_guard_lock here, we don't 1745 * want to risk deadlock. 1746 */ 1747 unlock = mtx_trylock(&hio_guard_lock); 1748 cv_signal(&hio_guard_cond); 1749 if (unlock) 1750 mtx_unlock(&hio_guard_lock); 1751} 1752 1753static void 1754config_reload(void) 1755{ 1756 struct hastd_config *newcfg; 1757 struct hast_resource *res; 1758 unsigned int ii, ncomps; 1759 int modified; 1760 1761 pjdlog_info("Reloading configuration..."); 1762 1763 ncomps = HAST_NCOMPONENTS; 1764 1765 newcfg = yy_config_parse(cfgpath, false); 1766 if (newcfg == NULL) 1767 goto failed; 1768 1769 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1770 if (strcmp(res->hr_name, gres->hr_name) == 0) 1771 break; 1772 } 1773 /* 1774 * If resource was removed from the configuration file, resource 1775 * name, provider name or path to local component was modified we 1776 * shouldn't be here. This means that someone modified configuration 1777 * file and send SIGHUP to us instead of main hastd process. 
1778 * Log advice and ignore the signal. 1779 */ 1780 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1781 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1782 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1783 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1784 (unsigned int)getppid()); 1785 goto failed; 1786 } 1787 1788#define MODIFIED_REMOTEADDR 0x1 1789#define MODIFIED_REPLICATION 0x2 1790#define MODIFIED_TIMEOUT 0x4 1791 modified = 0; 1792 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1793 /* 1794 * Don't copy res->hr_remoteaddr to gres just yet. 1795 * We want remote_close() to log disconnect from the old 1796 * addresses, not from the new ones. 1797 */ 1798 modified |= MODIFIED_REMOTEADDR; 1799 } 1800 if (gres->hr_replication != res->hr_replication) { 1801 gres->hr_replication = res->hr_replication; 1802 modified |= MODIFIED_REPLICATION; 1803 } 1804 if (gres->hr_timeout != res->hr_timeout) { 1805 gres->hr_timeout = res->hr_timeout; 1806 modified |= MODIFIED_TIMEOUT; 1807 } 1808 /* 1809 * If only timeout was modified we only need to change it without 1810 * reconnecting. 
1811 */ 1812 if (modified == MODIFIED_TIMEOUT) { 1813 for (ii = 0; ii < ncomps; ii++) { 1814 if (!ISREMOTE(ii)) 1815 continue; 1816 rw_rlock(&hio_remote_lock[ii]); 1817 if (!ISCONNECTED(gres, ii)) { 1818 rw_unlock(&hio_remote_lock[ii]); 1819 continue; 1820 } 1821 rw_unlock(&hio_remote_lock[ii]); 1822 if (proto_timeout(gres->hr_remotein, 1823 gres->hr_timeout) < 0) { 1824 pjdlog_errno(LOG_WARNING, 1825 "Unable to set connection timeout"); 1826 } 1827 if (proto_timeout(gres->hr_remoteout, 1828 gres->hr_timeout) < 0) { 1829 pjdlog_errno(LOG_WARNING, 1830 "Unable to set connection timeout"); 1831 } 1832 } 1833 } else { 1834 for (ii = 0; ii < ncomps; ii++) { 1835 if (!ISREMOTE(ii)) 1836 continue; 1837 remote_close(gres, ii); 1838 } 1839 if (modified & MODIFIED_REMOTEADDR) { 1840 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1841 sizeof(gres->hr_remoteaddr)); 1842 } 1843 } 1844#undef MODIFIED_REMOTEADDR 1845#undef MODIFIED_REPLICATION 1846#undef MODIFIED_TIMEOUT 1847 1848 pjdlog_info("Configuration reloaded successfully."); 1849 return; 1850failed: 1851 if (newcfg != NULL) { 1852 if (newcfg->hc_controlconn != NULL) 1853 proto_close(newcfg->hc_controlconn); 1854 if (newcfg->hc_listenconn != NULL) 1855 proto_close(newcfg->hc_listenconn); 1856 yy_config_free(newcfg); 1857 } 1858 pjdlog_warning("Configuration not reloaded."); 1859} 1860 1861static void 1862keepalive_send(struct hast_resource *res, unsigned int ncomp) 1863{ 1864 struct nv *nv; 1865 1866 nv = nv_alloc(); 1867 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1868 if (nv_error(nv) != 0) { 1869 nv_free(nv); 1870 pjdlog_debug(1, 1871 "keepalive_send: Unable to prepare header to send."); 1872 return; 1873 } 1874 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1875 pjdlog_common(LOG_DEBUG, 1, errno, 1876 "keepalive_send: Unable to send request"); 1877 nv_free(nv); 1878 rw_unlock(&hio_remote_lock[ncomp]); 1879 remote_close(res, ncomp); 1880 rw_rlock(&hio_remote_lock[ncomp]); 1881 return; 1882 } 1883 
nv_free(nv); 1884 pjdlog_debug(2, "keepalive_send: Request sent."); 1885} 1886 1887/* 1888 * Thread guards remote connections and reconnects when needed, handles 1889 * signals, etc. 1890 */ 1891static void * 1892guard_thread(void *arg) 1893{ 1894 struct hast_resource *res = arg; 1895 struct proto_conn *in, *out; 1896 unsigned int ii, ncomps; 1897 int timeout; 1898 1899 ncomps = HAST_NCOMPONENTS; 1900 1901 for (;;) { 1902 if (sigexit_received) { 1903 primary_exitx(EX_OK, 1904 "Termination signal received, exiting."); 1905 } 1906 if (sighup_received) { 1907 sighup_received = false; 1908 config_reload(); 1909 } 1910 1911 timeout = KEEPALIVE_SLEEP; 1912 pjdlog_debug(2, "remote_guard: Checking connections."); 1913 mtx_lock(&hio_guard_lock); 1914 for (ii = 0; ii < ncomps; ii++) { 1915 if (!ISREMOTE(ii)) 1916 continue; 1917 rw_rlock(&hio_remote_lock[ii]); 1918 if (ISCONNECTED(res, ii)) { 1919 assert(res->hr_remotein != NULL); 1920 assert(res->hr_remoteout != NULL); 1921 keepalive_send(res, ii); 1922 } 1923 if (ISCONNECTED(res, ii)) { 1924 assert(res->hr_remotein != NULL); 1925 assert(res->hr_remoteout != NULL); 1926 rw_unlock(&hio_remote_lock[ii]); 1927 pjdlog_debug(2, 1928 "remote_guard: Connection to %s is ok.", 1929 res->hr_remoteaddr); 1930 } else if (real_remote(res)) { 1931 assert(res->hr_remotein == NULL); 1932 assert(res->hr_remoteout == NULL); 1933 /* 1934 * Upgrade the lock. It doesn't have to be 1935 * atomic as no other thread can change 1936 * connection status from disconnected to 1937 * connected. 
1938 */ 1939 rw_unlock(&hio_remote_lock[ii]); 1940 pjdlog_debug(2, 1941 "remote_guard: Reconnecting to %s.", 1942 res->hr_remoteaddr); 1943 in = out = NULL; 1944 if (init_remote(res, &in, &out)) { 1945 rw_wlock(&hio_remote_lock[ii]); 1946 assert(res->hr_remotein == NULL); 1947 assert(res->hr_remoteout == NULL); 1948 assert(in != NULL && out != NULL); 1949 res->hr_remotein = in; 1950 res->hr_remoteout = out; 1951 rw_unlock(&hio_remote_lock[ii]); 1952 pjdlog_info("Successfully reconnected to %s.", 1953 res->hr_remoteaddr); 1954 sync_start(); 1955 } else { 1956 /* Both connections should be NULL. */ 1957 assert(res->hr_remotein == NULL); 1958 assert(res->hr_remoteout == NULL); 1959 assert(in == NULL && out == NULL); 1960 pjdlog_debug(2, 1961 "remote_guard: Reconnect to %s failed.", 1962 res->hr_remoteaddr); 1963 timeout = RECONNECT_SLEEP; 1964 } 1965 } else { 1966 rw_unlock(&hio_remote_lock[ii]); 1967 } 1968 } 1969 (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1970 mtx_unlock(&hio_guard_lock); 1971 } 1972 /* NOTREACHED */ 1973 return (NULL); 1974} 1975