/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
29147377Sru */ 3036332Sdes 3179537Sru#include <sys/cdefs.h> 3236332Sdes__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 213009 2010-09-22 19:08:11Z pjd $"); 3336332Sdes 3436332Sdes#include <sys/types.h> 3536332Sdes#include <sys/time.h> 3658491Sru#include <sys/bio.h> 37243084Seadler#include <sys/disk.h> 3836332Sdes#include <sys/refcount.h> 3936332Sdes#include <sys/stat.h> 4058491Sru 4158491Sru#include <geom/gate/g_gate.h> 4258491Sru 4336332Sdes#include <assert.h> 4457673Ssheldonh#include <err.h> 4557673Ssheldonh#include <errno.h> 4636332Sdes#include <fcntl.h> 4736332Sdes#include <libgeom.h> 48147377Sru#include <pthread.h> 49147377Sru#include <signal.h> 50146641Sroberto#include <stdint.h> 51243084Seadler#include <stdio.h> 52243084Seadler#include <string.h> 53243084Seadler#include <sysexits.h> 54243084Seadler#include <unistd.h> 55243084Seadler 56243084Seadler#include <activemap.h> 5736502Sdes#include <nv.h> 5858530Sru#include <rangelock.h> 5936502Sdes 6058530Sru#include "control.h" 6152335Scharnier#include "event.h" 62140368Sru#include "hast.h" 6358491Sru#include "hast_proto.h" 6458491Sru#include "hastd.h" 6558530Sru#include "hooks.h" 6658530Sru#include "metadata.h" 6758530Sru#include "proto.h" 6858530Sru#include "pjdlog.h" 6958530Sru#include "subr.h" 7058530Sru#include "synch.h" 71140442Sru 72140442Sru/* The is only one remote component for now. */ 73140442Sru#define ISREMOTE(no) ((no) == 1) 74140442Sru 75140442Srustruct hio { 76140442Sru /* 77140442Sru * Number of components we are still waiting for. 78140442Sru * When this field goes to 0, we can send the request back to the 7936332Sdes * kernel. Each component has to decrease this counter by one 8036332Sdes * even on failure. 8136332Sdes */ 8258491Sru unsigned int hio_countdown; 8358491Sru /* 8436332Sdes * Each component has a place to store its own error. 85147377Sru * Once the request is handled by all components we can decide if the 8658491Sru * request overall is successful or not. 
8758491Sru */ 8858491Sru int *hio_errors; 89124963Sdes /* 90146641Sroberto * Structure used to comunicate with GEOM Gate class. 91146641Sroberto */ 9236332Sdes struct g_gate_ctl_io hio_ggio; 93146641Sroberto TAILQ_ENTRY(hio) *hio_next; 94}; 95#define hio_free_next hio_next[0] 96#define hio_done_next hio_next[0] 97 98/* 99 * Free list holds unused structures. When free list is empty, we have to wait 100 * until some in-progress requests are freed. 101 */ 102static TAILQ_HEAD(, hio) hio_free_list; 103static pthread_mutex_t hio_free_list_lock; 104static pthread_cond_t hio_free_list_cond; 105/* 106 * There is one send list for every component. One requests is placed on all 107 * send lists - each component gets the same request, but each component is 108 * responsible for managing his own send list. 109 */ 110static TAILQ_HEAD(, hio) *hio_send_list; 111static pthread_mutex_t *hio_send_list_lock; 112static pthread_cond_t *hio_send_list_cond; 113/* 114 * There is one recv list for every component, although local components don't 115 * use recv lists as local requests are done synchronously. 116 */ 117static TAILQ_HEAD(, hio) *hio_recv_list; 118static pthread_mutex_t *hio_recv_list_lock; 119static pthread_cond_t *hio_recv_list_cond; 120/* 121 * Request is placed on done list by the slowest component (the one that 122 * decreased hio_countdown from 1 to 0). 123 */ 124static TAILQ_HEAD(, hio) hio_done_list; 125static pthread_mutex_t hio_done_list_lock; 126static pthread_cond_t hio_done_list_cond; 127/* 128 * Structure below are for interaction with sync thread. 129 */ 130static bool sync_inprogress; 131static pthread_mutex_t sync_lock; 132static pthread_cond_t sync_cond; 133/* 134 * The lock below allows to synchornize access to remote connections. 135 */ 136static pthread_rwlock_t *hio_remote_lock; 137 138/* 139 * Lock to synchronize metadata updates. Also synchronize access to 140 * hr_primary_localcnt and hr_primary_remotecnt fields. 
141 */ 142static pthread_mutex_t metadata_lock; 143 144/* 145 * Maximum number of outstanding I/O requests. 146 */ 147#define HAST_HIO_MAX 256 148/* 149 * Number of components. At this point there are only two components: local 150 * and remote, but in the future it might be possible to use multiple local 151 * and remote components. 152 */ 153#define HAST_NCOMPONENTS 2 154/* 155 * Number of seconds to sleep between reconnect retries or keepalive packets. 156 */ 157#define RETRY_SLEEP 10 158 159#define ISCONNECTED(res, no) \ 160 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 161 162#define QUEUE_INSERT1(hio, name, ncomp) do { \ 163 bool _wakeup; \ 164 \ 165 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 166 _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 167 TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 168 hio_next[(ncomp)]); \ 169 mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 170 if (_wakeup) \ 171 cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 172} while (0) 173#define QUEUE_INSERT2(hio, name) do { \ 174 bool _wakeup; \ 175 \ 176 mtx_lock(&hio_##name##_list_lock); \ 177 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 178 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 179 mtx_unlock(&hio_##name##_list_lock); \ 180 if (_wakeup) \ 181 cv_signal(&hio_##name##_list_cond); \ 182} while (0) 183#define QUEUE_TAKE1(hio, name, ncomp) do { \ 184 mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 185 while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 186 cv_wait(&hio_##name##_list_cond[(ncomp)], \ 187 &hio_##name##_list_lock[(ncomp)]); \ 188 } \ 189 TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 190 hio_next[(ncomp)]); \ 191 mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 192} while (0) 193#define QUEUE_TAKE2(hio, name) do { \ 194 mtx_lock(&hio_##name##_list_lock); \ 195 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 196 cv_wait(&hio_##name##_list_cond, \ 197 &hio_##name##_list_lock); \ 198 } \ 
199 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 200 mtx_unlock(&hio_##name##_list_lock); \ 201} while (0) 202 203#define SYNCREQ(hio) do { \ 204 (hio)->hio_ggio.gctl_unit = -1; \ 205 (hio)->hio_ggio.gctl_seq = 1; \ 206} while (0) 207#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 208#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 209#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 210 211static struct hast_resource *gres; 212 213static pthread_mutex_t range_lock; 214static struct rangelocks *range_regular; 215static bool range_regular_wait; 216static pthread_cond_t range_regular_cond; 217static struct rangelocks *range_sync; 218static bool range_sync_wait; 219static pthread_cond_t range_sync_cond; 220 221static void *ggate_recv_thread(void *arg); 222static void *local_send_thread(void *arg); 223static void *remote_send_thread(void *arg); 224static void *remote_recv_thread(void *arg); 225static void *ggate_send_thread(void *arg); 226static void *sync_thread(void *arg); 227static void *guard_thread(void *arg); 228 229static void 230cleanup(struct hast_resource *res) 231{ 232 int rerrno; 233 234 /* Remember errno. */ 235 rerrno = errno; 236 237 /* 238 * Close descriptor to /dev/hast/<name> 239 * to work-around race in the kernel. 240 */ 241 close(res->hr_localfd); 242 243 /* Destroy ggate provider if we created one. */ 244 if (res->hr_ggateunit >= 0) { 245 struct g_gate_ctl_destroy ggiod; 246 247 ggiod.gctl_version = G_GATE_VERSION; 248 ggiod.gctl_unit = res->hr_ggateunit; 249 ggiod.gctl_force = 1; 250 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 251 pjdlog_warning("Unable to destroy hast/%s device", 252 res->hr_provname); 253 } 254 res->hr_ggateunit = -1; 255 } 256 257 /* Restore errno. */ 258 errno = rerrno; 259} 260 261static __dead2 void 262primary_exit(int exitcode, const char *fmt, ...) 
263{ 264 va_list ap; 265 266 assert(exitcode != EX_OK); 267 va_start(ap, fmt); 268 pjdlogv_errno(LOG_ERR, fmt, ap); 269 va_end(ap); 270 cleanup(gres); 271 exit(exitcode); 272} 273 274static __dead2 void 275primary_exitx(int exitcode, const char *fmt, ...) 276{ 277 va_list ap; 278 279 va_start(ap, fmt); 280 pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); 281 va_end(ap); 282 cleanup(gres); 283 exit(exitcode); 284} 285 286static int 287hast_activemap_flush(struct hast_resource *res) 288{ 289 const unsigned char *buf; 290 size_t size; 291 292 buf = activemap_bitmap(res->hr_amp, &size); 293 assert(buf != NULL); 294 assert((size % res->hr_local_sectorsize) == 0); 295 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 296 (ssize_t)size) { 297 KEEP_ERRNO(pjdlog_errno(LOG_ERR, 298 "Unable to flush activemap to disk")); 299 return (-1); 300 } 301 return (0); 302} 303 304static bool 305real_remote(const struct hast_resource *res) 306{ 307 308 return (strcmp(res->hr_remoteaddr, "none") != 0); 309} 310 311static void 312init_environment(struct hast_resource *res __unused) 313{ 314 struct hio *hio; 315 unsigned int ii, ncomps; 316 317 /* 318 * In the future it might be per-resource value. 319 */ 320 ncomps = HAST_NCOMPONENTS; 321 322 /* 323 * Allocate memory needed by lists. 
324 */ 325 hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 326 if (hio_send_list == NULL) { 327 primary_exitx(EX_TEMPFAIL, 328 "Unable to allocate %zu bytes of memory for send lists.", 329 sizeof(hio_send_list[0]) * ncomps); 330 } 331 hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 332 if (hio_send_list_lock == NULL) { 333 primary_exitx(EX_TEMPFAIL, 334 "Unable to allocate %zu bytes of memory for send list locks.", 335 sizeof(hio_send_list_lock[0]) * ncomps); 336 } 337 hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 338 if (hio_send_list_cond == NULL) { 339 primary_exitx(EX_TEMPFAIL, 340 "Unable to allocate %zu bytes of memory for send list condition variables.", 341 sizeof(hio_send_list_cond[0]) * ncomps); 342 } 343 hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 344 if (hio_recv_list == NULL) { 345 primary_exitx(EX_TEMPFAIL, 346 "Unable to allocate %zu bytes of memory for recv lists.", 347 sizeof(hio_recv_list[0]) * ncomps); 348 } 349 hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 350 if (hio_recv_list_lock == NULL) { 351 primary_exitx(EX_TEMPFAIL, 352 "Unable to allocate %zu bytes of memory for recv list locks.", 353 sizeof(hio_recv_list_lock[0]) * ncomps); 354 } 355 hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 356 if (hio_recv_list_cond == NULL) { 357 primary_exitx(EX_TEMPFAIL, 358 "Unable to allocate %zu bytes of memory for recv list condition variables.", 359 sizeof(hio_recv_list_cond[0]) * ncomps); 360 } 361 hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 362 if (hio_remote_lock == NULL) { 363 primary_exitx(EX_TEMPFAIL, 364 "Unable to allocate %zu bytes of memory for remote connections locks.", 365 sizeof(hio_remote_lock[0]) * ncomps); 366 } 367 368 /* 369 * Initialize lists, their locks and theirs condition variables. 
370 */ 371 TAILQ_INIT(&hio_free_list); 372 mtx_init(&hio_free_list_lock); 373 cv_init(&hio_free_list_cond); 374 for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 375 TAILQ_INIT(&hio_send_list[ii]); 376 mtx_init(&hio_send_list_lock[ii]); 377 cv_init(&hio_send_list_cond[ii]); 378 TAILQ_INIT(&hio_recv_list[ii]); 379 mtx_init(&hio_recv_list_lock[ii]); 380 cv_init(&hio_recv_list_cond[ii]); 381 rw_init(&hio_remote_lock[ii]); 382 } 383 TAILQ_INIT(&hio_done_list); 384 mtx_init(&hio_done_list_lock); 385 cv_init(&hio_done_list_cond); 386 mtx_init(&metadata_lock); 387 388 /* 389 * Allocate requests pool and initialize requests. 390 */ 391 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 392 hio = malloc(sizeof(*hio)); 393 if (hio == NULL) { 394 primary_exitx(EX_TEMPFAIL, 395 "Unable to allocate %zu bytes of memory for hio request.", 396 sizeof(*hio)); 397 } 398 hio->hio_countdown = 0; 399 hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 400 if (hio->hio_errors == NULL) { 401 primary_exitx(EX_TEMPFAIL, 402 "Unable allocate %zu bytes of memory for hio errors.", 403 sizeof(hio->hio_errors[0]) * ncomps); 404 } 405 hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 406 if (hio->hio_next == NULL) { 407 primary_exitx(EX_TEMPFAIL, 408 "Unable allocate %zu bytes of memory for hio_next field.", 409 sizeof(hio->hio_next[0]) * ncomps); 410 } 411 hio->hio_ggio.gctl_version = G_GATE_VERSION; 412 hio->hio_ggio.gctl_data = malloc(MAXPHYS); 413 if (hio->hio_ggio.gctl_data == NULL) { 414 primary_exitx(EX_TEMPFAIL, 415 "Unable to allocate %zu bytes of memory for gctl_data.", 416 MAXPHYS); 417 } 418 hio->hio_ggio.gctl_length = MAXPHYS; 419 hio->hio_ggio.gctl_error = 0; 420 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 421 } 422} 423 424static void 425init_local(struct hast_resource *res) 426{ 427 unsigned char *buf; 428 size_t mapsize; 429 430 if (metadata_read(res, true) < 0) 431 exit(EX_NOINPUT); 432 mtx_init(&res->hr_amp_lock); 433 if (activemap_init(&res->hr_amp, 
res->hr_datasize, res->hr_extentsize, 434 res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 435 primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 436 } 437 mtx_init(&range_lock); 438 cv_init(&range_regular_cond); 439 if (rangelock_init(&range_regular) < 0) 440 primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 441 cv_init(&range_sync_cond); 442 if (rangelock_init(&range_sync) < 0) 443 primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 444 mapsize = activemap_ondisk_size(res->hr_amp); 445 buf = calloc(1, mapsize); 446 if (buf == NULL) { 447 primary_exitx(EX_TEMPFAIL, 448 "Unable to allocate buffer for activemap."); 449 } 450 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 451 (ssize_t)mapsize) { 452 primary_exit(EX_NOINPUT, "Unable to read activemap"); 453 } 454 activemap_copyin(res->hr_amp, buf, mapsize); 455 free(buf); 456 if (res->hr_resuid != 0) 457 return; 458 /* 459 * We're using provider for the first time, so we have to generate 460 * resource unique identifier and initialize local and remote counts. 461 */ 462 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 463 res->hr_primary_localcnt = 1; 464 res->hr_primary_remotecnt = 0; 465 if (metadata_write(res) < 0) 466 exit(EX_NOINPUT); 467} 468 469static bool 470init_remote(struct hast_resource *res, struct proto_conn **inp, 471 struct proto_conn **outp) 472{ 473 struct proto_conn *in, *out; 474 struct nv *nvout, *nvin; 475 const unsigned char *token; 476 unsigned char *map; 477 const char *errmsg; 478 int32_t extentsize; 479 int64_t datasize; 480 uint32_t mapsize; 481 size_t size; 482 483 assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 484 assert(real_remote(res)); 485 486 in = out = NULL; 487 errmsg = NULL; 488 489 /* Prepare outgoing connection with remote node. 
*/ 490 if (proto_client(res->hr_remoteaddr, &out) < 0) { 491 primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 492 res->hr_remoteaddr); 493 } 494 /* Try to connect, but accept failure. */ 495 if (proto_connect(out) < 0) { 496 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 497 res->hr_remoteaddr); 498 goto close; 499 } 500 /* Error in setting timeout is not critical, but why should it fail? */ 501 if (proto_timeout(out, res->hr_timeout) < 0) 502 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 503 /* 504 * First handshake step. 505 * Setup outgoing connection with remote node. 506 */ 507 nvout = nv_alloc(); 508 nv_add_string(nvout, res->hr_name, "resource"); 509 if (nv_error(nvout) != 0) { 510 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 511 "Unable to allocate header for connection with %s", 512 res->hr_remoteaddr); 513 nv_free(nvout); 514 goto close; 515 } 516 if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 517 pjdlog_errno(LOG_WARNING, 518 "Unable to send handshake header to %s", 519 res->hr_remoteaddr); 520 nv_free(nvout); 521 goto close; 522 } 523 nv_free(nvout); 524 if (hast_proto_recv_hdr(out, &nvin) < 0) { 525 pjdlog_errno(LOG_WARNING, 526 "Unable to receive handshake header from %s", 527 res->hr_remoteaddr); 528 goto close; 529 } 530 errmsg = nv_get_string(nvin, "errmsg"); 531 if (errmsg != NULL) { 532 pjdlog_warning("%s", errmsg); 533 nv_free(nvin); 534 goto close; 535 } 536 token = nv_get_uint8_array(nvin, &size, "token"); 537 if (token == NULL) { 538 pjdlog_warning("Handshake header from %s has no 'token' field.", 539 res->hr_remoteaddr); 540 nv_free(nvin); 541 goto close; 542 } 543 if (size != sizeof(res->hr_token)) { 544 pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 545 res->hr_remoteaddr, size, sizeof(res->hr_token)); 546 nv_free(nvin); 547 goto close; 548 } 549 bcopy(token, res->hr_token, sizeof(res->hr_token)); 550 nv_free(nvin); 551 552 /* 553 * Second 
handshake step. 554 * Setup incoming connection with remote node. 555 */ 556 if (proto_client(res->hr_remoteaddr, &in) < 0) { 557 pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 558 res->hr_remoteaddr); 559 } 560 /* Try to connect, but accept failure. */ 561 if (proto_connect(in) < 0) { 562 pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 563 res->hr_remoteaddr); 564 goto close; 565 } 566 /* Error in setting timeout is not critical, but why should it fail? */ 567 if (proto_timeout(in, res->hr_timeout) < 0) 568 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 569 nvout = nv_alloc(); 570 nv_add_string(nvout, res->hr_name, "resource"); 571 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 572 "token"); 573 nv_add_uint64(nvout, res->hr_resuid, "resuid"); 574 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 575 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 576 if (nv_error(nvout) != 0) { 577 pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 578 "Unable to allocate header for connection with %s", 579 res->hr_remoteaddr); 580 nv_free(nvout); 581 goto close; 582 } 583 if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 584 pjdlog_errno(LOG_WARNING, 585 "Unable to send handshake header to %s", 586 res->hr_remoteaddr); 587 nv_free(nvout); 588 goto close; 589 } 590 nv_free(nvout); 591 if (hast_proto_recv_hdr(out, &nvin) < 0) { 592 pjdlog_errno(LOG_WARNING, 593 "Unable to receive handshake header from %s", 594 res->hr_remoteaddr); 595 goto close; 596 } 597 errmsg = nv_get_string(nvin, "errmsg"); 598 if (errmsg != NULL) { 599 pjdlog_warning("%s", errmsg); 600 nv_free(nvin); 601 goto close; 602 } 603 datasize = nv_get_int64(nvin, "datasize"); 604 if (datasize != res->hr_datasize) { 605 pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 606 (intmax_t)res->hr_datasize, (intmax_t)datasize); 607 nv_free(nvin); 608 goto close; 609 } 610 extentsize = nv_get_int32(nvin, "extentsize"); 611 if 
(extentsize != res->hr_extentsize) { 612 pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 613 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 614 nv_free(nvin); 615 goto close; 616 } 617 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 618 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 619 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 620 map = NULL; 621 mapsize = nv_get_uint32(nvin, "mapsize"); 622 if (mapsize > 0) { 623 map = malloc(mapsize); 624 if (map == NULL) { 625 pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 626 (uintmax_t)mapsize); 627 nv_free(nvin); 628 goto close; 629 } 630 /* 631 * Remote node have some dirty extents on its own, lets 632 * download its activemap. 633 */ 634 if (hast_proto_recv_data(res, out, nvin, map, 635 mapsize) < 0) { 636 pjdlog_errno(LOG_ERR, 637 "Unable to receive remote activemap"); 638 nv_free(nvin); 639 free(map); 640 goto close; 641 } 642 /* 643 * Merge local and remote bitmaps. 644 */ 645 activemap_merge(res->hr_amp, map, mapsize); 646 free(map); 647 /* 648 * Now that we merged bitmaps from both nodes, flush it to the 649 * disk before we start to synchronize. 
650 */ 651 (void)hast_activemap_flush(res); 652 } 653 pjdlog_info("Connected to %s.", res->hr_remoteaddr); 654 if (inp != NULL && outp != NULL) { 655 *inp = in; 656 *outp = out; 657 } else { 658 res->hr_remotein = in; 659 res->hr_remoteout = out; 660 } 661 event_send(res, EVENT_CONNECT); 662 return (true); 663close: 664 if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 665 event_send(res, EVENT_SPLITBRAIN); 666 proto_close(out); 667 if (in != NULL) 668 proto_close(in); 669 return (false); 670} 671 672static void 673sync_start(void) 674{ 675 676 mtx_lock(&sync_lock); 677 sync_inprogress = true; 678 mtx_unlock(&sync_lock); 679 cv_signal(&sync_cond); 680} 681 682static void 683sync_stop(void) 684{ 685 686 mtx_lock(&sync_lock); 687 if (sync_inprogress) 688 sync_inprogress = false; 689 mtx_unlock(&sync_lock); 690} 691 692static void 693init_ggate(struct hast_resource *res) 694{ 695 struct g_gate_ctl_create ggiocreate; 696 struct g_gate_ctl_cancel ggiocancel; 697 698 /* 699 * We communicate with ggate via /dev/ggctl. Open it. 700 */ 701 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 702 if (res->hr_ggatefd < 0) 703 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 704 /* 705 * Create provider before trying to connect, as connection failure 706 * is not critical, but may take some time. 
707 */ 708 ggiocreate.gctl_version = G_GATE_VERSION; 709 ggiocreate.gctl_mediasize = res->hr_datasize; 710 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 711 ggiocreate.gctl_flags = 0; 712 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 713 ggiocreate.gctl_timeout = 0; 714 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 715 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 716 res->hr_provname); 717 bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 718 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 719 pjdlog_info("Device hast/%s created.", res->hr_provname); 720 res->hr_ggateunit = ggiocreate.gctl_unit; 721 return; 722 } 723 if (errno != EEXIST) { 724 primary_exit(EX_OSERR, "Unable to create hast/%s device", 725 res->hr_provname); 726 } 727 pjdlog_debug(1, 728 "Device hast/%s already exists, we will try to take it over.", 729 res->hr_provname); 730 /* 731 * If we received EEXIST, we assume that the process who created the 732 * provider died and didn't clean up. In that case we will start from 733 * where he left of. 734 */ 735 ggiocancel.gctl_version = G_GATE_VERSION; 736 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 737 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 738 res->hr_provname); 739 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 740 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 741 res->hr_ggateunit = ggiocancel.gctl_unit; 742 return; 743 } 744 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 745 res->hr_provname); 746} 747 748void 749hastd_primary(struct hast_resource *res) 750{ 751 pthread_t td; 752 pid_t pid; 753 int error; 754 755 /* 756 * Create communication channel between parent and child. 
757 */ 758 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 759 KEEP_ERRNO((void)pidfile_remove(pfh)); 760 pjdlog_exit(EX_OSERR, 761 "Unable to create control sockets between parent and child"); 762 } 763 /* 764 * Create communication channel between child and parent. 765 */ 766 if (proto_client("socketpair://", &res->hr_event) < 0) { 767 KEEP_ERRNO((void)pidfile_remove(pfh)); 768 pjdlog_exit(EX_OSERR, 769 "Unable to create event sockets between child and parent"); 770 } 771 772 pid = fork(); 773 if (pid < 0) { 774 KEEP_ERRNO((void)pidfile_remove(pfh)); 775 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 776 } 777 778 if (pid > 0) { 779 /* This is parent. */ 780 /* Declare that we are receiver. */ 781 proto_recv(res->hr_event, NULL, 0); 782 res->hr_workerpid = pid; 783 return; 784 } 785 786 gres = res; 787 788 (void)pidfile_close(pfh); 789 hook_fini(); 790 791 setproctitle("%s (primary)", res->hr_name); 792 793 /* Declare that we are sender. */ 794 proto_send(res->hr_event, NULL, 0); 795 796 init_local(res); 797 init_ggate(res); 798 init_environment(res); 799 /* 800 * Create the control thread before sending any event to the parent, 801 * as we can deadlock when parent sends control request to worker, 802 * but worker has no control thread started yet, so parent waits. 803 * In the meantime worker sends an event to the parent, but parent 804 * is unable to handle the event, because it waits for control 805 * request response. 
806 */ 807 error = pthread_create(&td, NULL, ctrl_thread, res); 808 assert(error == 0); 809 if (real_remote(res) && init_remote(res, NULL, NULL)) 810 sync_start(); 811 error = pthread_create(&td, NULL, ggate_recv_thread, res); 812 assert(error == 0); 813 error = pthread_create(&td, NULL, local_send_thread, res); 814 assert(error == 0); 815 error = pthread_create(&td, NULL, remote_send_thread, res); 816 assert(error == 0); 817 error = pthread_create(&td, NULL, remote_recv_thread, res); 818 assert(error == 0); 819 error = pthread_create(&td, NULL, ggate_send_thread, res); 820 assert(error == 0); 821 error = pthread_create(&td, NULL, sync_thread, res); 822 assert(error == 0); 823 (void)guard_thread(res); 824} 825 826static void 827reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 828{ 829 char msg[1024]; 830 va_list ap; 831 int len; 832 833 va_start(ap, fmt); 834 len = vsnprintf(msg, sizeof(msg), fmt, ap); 835 va_end(ap); 836 if ((size_t)len < sizeof(msg)) { 837 switch (ggio->gctl_cmd) { 838 case BIO_READ: 839 (void)snprintf(msg + len, sizeof(msg) - len, 840 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 841 (uintmax_t)ggio->gctl_length); 842 break; 843 case BIO_DELETE: 844 (void)snprintf(msg + len, sizeof(msg) - len, 845 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 846 (uintmax_t)ggio->gctl_length); 847 break; 848 case BIO_FLUSH: 849 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 850 break; 851 case BIO_WRITE: 852 (void)snprintf(msg + len, sizeof(msg) - len, 853 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 854 (uintmax_t)ggio->gctl_length); 855 break; 856 default: 857 (void)snprintf(msg + len, sizeof(msg) - len, 858 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 859 break; 860 } 861 } 862 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 863} 864 865static void 866remote_close(struct hast_resource *res, int ncomp) 867{ 868 869 rw_wlock(&hio_remote_lock[ncomp]); 870 /* 871 * A race is possible between 
dropping rlock and acquiring wlock - 872 * another thread can close connection in-between. 873 */ 874 if (!ISCONNECTED(res, ncomp)) { 875 assert(res->hr_remotein == NULL); 876 assert(res->hr_remoteout == NULL); 877 rw_unlock(&hio_remote_lock[ncomp]); 878 return; 879 } 880 881 assert(res->hr_remotein != NULL); 882 assert(res->hr_remoteout != NULL); 883 884 pjdlog_debug(2, "Closing incoming connection to %s.", 885 res->hr_remoteaddr); 886 proto_close(res->hr_remotein); 887 res->hr_remotein = NULL; 888 pjdlog_debug(2, "Closing outgoing connection to %s.", 889 res->hr_remoteaddr); 890 proto_close(res->hr_remoteout); 891 res->hr_remoteout = NULL; 892 893 rw_unlock(&hio_remote_lock[ncomp]); 894 895 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 896 897 /* 898 * Stop synchronization if in-progress. 899 */ 900 sync_stop(); 901 902 event_send(res, EVENT_DISCONNECT); 903} 904 905/* 906 * Thread receives ggate I/O requests from the kernel and passes them to 907 * appropriate threads: 908 * WRITE - always goes to both local_send and remote_send threads 909 * READ (when the block is up-to-date on local component) - 910 * only local_send thread 911 * READ (when the block isn't up-to-date on local component) - 912 * only remote_send thread 913 * DELETE - always goes to both local_send and remote_send threads 914 * FLUSH - always goes to both local_send and remote_send threads 915 */ 916static void * 917ggate_recv_thread(void *arg) 918{ 919 struct hast_resource *res = arg; 920 struct g_gate_ctl_io *ggio; 921 struct hio *hio; 922 unsigned int ii, ncomp, ncomps; 923 int error; 924 925 ncomps = HAST_NCOMPONENTS; 926 927 for (;;) { 928 pjdlog_debug(2, "ggate_recv: Taking free request."); 929 QUEUE_TAKE2(hio, free); 930 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 931 ggio = &hio->hio_ggio; 932 ggio->gctl_unit = res->hr_ggateunit; 933 ggio->gctl_length = MAXPHYS; 934 ggio->gctl_error = 0; 935 pjdlog_debug(2, 936 "ggate_recv: (%p) Waiting for request from 
the kernel.", 937 hio); 938 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 939 if (sigexit_received) 940 pthread_exit(NULL); 941 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 942 } 943 error = ggio->gctl_error; 944 switch (error) { 945 case 0: 946 break; 947 case ECANCELED: 948 /* Exit gracefully. */ 949 if (!sigexit_received) { 950 pjdlog_debug(2, 951 "ggate_recv: (%p) Received cancel from the kernel.", 952 hio); 953 pjdlog_info("Received cancel from the kernel, exiting."); 954 } 955 pthread_exit(NULL); 956 case ENOMEM: 957 /* 958 * Buffer too small? Impossible, we allocate MAXPHYS 959 * bytes - request can't be bigger than that. 960 */ 961 /* FALLTHROUGH */ 962 case ENXIO: 963 default: 964 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 965 strerror(error)); 966 } 967 for (ii = 0; ii < ncomps; ii++) 968 hio->hio_errors[ii] = EINVAL; 969 reqlog(LOG_DEBUG, 2, ggio, 970 "ggate_recv: (%p) Request received from the kernel: ", 971 hio); 972 /* 973 * Inform all components about new write request. 974 * For read request prefer local component unless the given 975 * range is out-of-date, then use remote component. 976 */ 977 switch (ggio->gctl_cmd) { 978 case BIO_READ: 979 pjdlog_debug(2, 980 "ggate_recv: (%p) Moving request to the send queue.", 981 hio); 982 refcount_init(&hio->hio_countdown, 1); 983 mtx_lock(&metadata_lock); 984 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 985 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 986 /* 987 * This range is up-to-date on local component, 988 * so handle request locally. 989 */ 990 /* Local component is 0 for now. */ 991 ncomp = 0; 992 } else /* if (res->hr_syncsrc == 993 HAST_SYNCSRC_SECONDARY) */ { 994 assert(res->hr_syncsrc == 995 HAST_SYNCSRC_SECONDARY); 996 /* 997 * This range is out-of-date on local component, 998 * so send request to the remote node. 999 */ 1000 /* Remote component is 1 for now. 
*/ 1001 ncomp = 1; 1002 } 1003 mtx_unlock(&metadata_lock); 1004 QUEUE_INSERT1(hio, send, ncomp); 1005 break; 1006 case BIO_WRITE: 1007 for (;;) { 1008 mtx_lock(&range_lock); 1009 if (rangelock_islocked(range_sync, 1010 ggio->gctl_offset, ggio->gctl_length)) { 1011 pjdlog_debug(2, 1012 "regular: Range offset=%jd length=%zu locked.", 1013 (intmax_t)ggio->gctl_offset, 1014 (size_t)ggio->gctl_length); 1015 range_regular_wait = true; 1016 cv_wait(&range_regular_cond, &range_lock); 1017 range_regular_wait = false; 1018 mtx_unlock(&range_lock); 1019 continue; 1020 } 1021 if (rangelock_add(range_regular, 1022 ggio->gctl_offset, ggio->gctl_length) < 0) { 1023 mtx_unlock(&range_lock); 1024 pjdlog_debug(2, 1025 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1026 (intmax_t)ggio->gctl_offset, 1027 (size_t)ggio->gctl_length); 1028 sleep(1); 1029 continue; 1030 } 1031 mtx_unlock(&range_lock); 1032 break; 1033 } 1034 mtx_lock(&res->hr_amp_lock); 1035 if (activemap_write_start(res->hr_amp, 1036 ggio->gctl_offset, ggio->gctl_length)) { 1037 (void)hast_activemap_flush(res); 1038 } 1039 mtx_unlock(&res->hr_amp_lock); 1040 /* FALLTHROUGH */ 1041 case BIO_DELETE: 1042 case BIO_FLUSH: 1043 pjdlog_debug(2, 1044 "ggate_recv: (%p) Moving request to the send queues.", 1045 hio); 1046 refcount_init(&hio->hio_countdown, ncomps); 1047 for (ii = 0; ii < ncomps; ii++) 1048 QUEUE_INSERT1(hio, send, ii); 1049 break; 1050 } 1051 } 1052 /* NOTREACHED */ 1053 return (NULL); 1054} 1055 1056/* 1057 * Thread reads from or writes to local component. 1058 * If local read fails, it redirects it to remote_send thread. 1059 */ 1060static void * 1061local_send_thread(void *arg) 1062{ 1063 struct hast_resource *res = arg; 1064 struct g_gate_ctl_io *ggio; 1065 struct hio *hio; 1066 unsigned int ncomp, rncomp; 1067 ssize_t ret; 1068 1069 /* Local component is 0 for now. */ 1070 ncomp = 0; 1071 /* Remote component is 1 for now. 
*/ 1072 rncomp = 1; 1073 1074 for (;;) { 1075 pjdlog_debug(2, "local_send: Taking request."); 1076 QUEUE_TAKE1(hio, send, ncomp); 1077 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1078 ggio = &hio->hio_ggio; 1079 switch (ggio->gctl_cmd) { 1080 case BIO_READ: 1081 ret = pread(res->hr_localfd, ggio->gctl_data, 1082 ggio->gctl_length, 1083 ggio->gctl_offset + res->hr_localoff); 1084 if (ret == ggio->gctl_length) 1085 hio->hio_errors[ncomp] = 0; 1086 else { 1087 /* 1088 * If READ failed, try to read from remote node. 1089 */ 1090 QUEUE_INSERT1(hio, send, rncomp); 1091 continue; 1092 } 1093 break; 1094 case BIO_WRITE: 1095 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1096 ggio->gctl_length, 1097 ggio->gctl_offset + res->hr_localoff); 1098 if (ret < 0) 1099 hio->hio_errors[ncomp] = errno; 1100 else if (ret != ggio->gctl_length) 1101 hio->hio_errors[ncomp] = EIO; 1102 else 1103 hio->hio_errors[ncomp] = 0; 1104 break; 1105 case BIO_DELETE: 1106 ret = g_delete(res->hr_localfd, 1107 ggio->gctl_offset + res->hr_localoff, 1108 ggio->gctl_length); 1109 if (ret < 0) 1110 hio->hio_errors[ncomp] = errno; 1111 else 1112 hio->hio_errors[ncomp] = 0; 1113 break; 1114 case BIO_FLUSH: 1115 ret = g_flush(res->hr_localfd); 1116 if (ret < 0) 1117 hio->hio_errors[ncomp] = errno; 1118 else 1119 hio->hio_errors[ncomp] = 0; 1120 break; 1121 } 1122 if (refcount_release(&hio->hio_countdown)) { 1123 if (ISSYNCREQ(hio)) { 1124 mtx_lock(&sync_lock); 1125 SYNCREQDONE(hio); 1126 mtx_unlock(&sync_lock); 1127 cv_signal(&sync_cond); 1128 } else { 1129 pjdlog_debug(2, 1130 "local_send: (%p) Moving request to the done queue.", 1131 hio); 1132 QUEUE_INSERT2(hio, done); 1133 } 1134 } 1135 } 1136 /* NOTREACHED */ 1137 return (NULL); 1138} 1139 1140/* 1141 * Thread sends request to secondary node. 
1142 */ 1143static void * 1144remote_send_thread(void *arg) 1145{ 1146 struct hast_resource *res = arg; 1147 struct g_gate_ctl_io *ggio; 1148 struct hio *hio; 1149 struct nv *nv; 1150 unsigned int ncomp; 1151 bool wakeup; 1152 uint64_t offset, length; 1153 uint8_t cmd; 1154 void *data; 1155 1156 /* Remote component is 1 for now. */ 1157 ncomp = 1; 1158 1159 for (;;) { 1160 pjdlog_debug(2, "remote_send: Taking request."); 1161 QUEUE_TAKE1(hio, send, ncomp); 1162 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1163 ggio = &hio->hio_ggio; 1164 switch (ggio->gctl_cmd) { 1165 case BIO_READ: 1166 cmd = HIO_READ; 1167 data = NULL; 1168 offset = ggio->gctl_offset; 1169 length = ggio->gctl_length; 1170 break; 1171 case BIO_WRITE: 1172 cmd = HIO_WRITE; 1173 data = ggio->gctl_data; 1174 offset = ggio->gctl_offset; 1175 length = ggio->gctl_length; 1176 break; 1177 case BIO_DELETE: 1178 cmd = HIO_DELETE; 1179 data = NULL; 1180 offset = ggio->gctl_offset; 1181 length = ggio->gctl_length; 1182 break; 1183 case BIO_FLUSH: 1184 cmd = HIO_FLUSH; 1185 data = NULL; 1186 offset = 0; 1187 length = 0; 1188 break; 1189 default: 1190 assert(!"invalid condition"); 1191 abort(); 1192 } 1193 nv = nv_alloc(); 1194 nv_add_uint8(nv, cmd, "cmd"); 1195 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1196 nv_add_uint64(nv, offset, "offset"); 1197 nv_add_uint64(nv, length, "length"); 1198 if (nv_error(nv) != 0) { 1199 hio->hio_errors[ncomp] = nv_error(nv); 1200 pjdlog_debug(2, 1201 "remote_send: (%p) Unable to prepare header to send.", 1202 hio); 1203 reqlog(LOG_ERR, 0, ggio, 1204 "Unable to prepare header to send (%s): ", 1205 strerror(nv_error(nv))); 1206 /* Move failed request immediately to the done queue. */ 1207 goto done_queue; 1208 } 1209 pjdlog_debug(2, 1210 "remote_send: (%p) Moving request to the recv queue.", 1211 hio); 1212 /* 1213 * Protect connection from disappearing. 
1214 */ 1215 rw_rlock(&hio_remote_lock[ncomp]); 1216 if (!ISCONNECTED(res, ncomp)) { 1217 rw_unlock(&hio_remote_lock[ncomp]); 1218 hio->hio_errors[ncomp] = ENOTCONN; 1219 goto done_queue; 1220 } 1221 /* 1222 * Move the request to recv queue before sending it, because 1223 * in different order we can get reply before we move request 1224 * to recv queue. 1225 */ 1226 mtx_lock(&hio_recv_list_lock[ncomp]); 1227 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1228 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1229 mtx_unlock(&hio_recv_list_lock[ncomp]); 1230 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1231 data != NULL ? length : 0) < 0) { 1232 hio->hio_errors[ncomp] = errno; 1233 rw_unlock(&hio_remote_lock[ncomp]); 1234 pjdlog_debug(2, 1235 "remote_send: (%p) Unable to send request.", hio); 1236 reqlog(LOG_ERR, 0, ggio, 1237 "Unable to send request (%s): ", 1238 strerror(hio->hio_errors[ncomp])); 1239 remote_close(res, ncomp); 1240 /* 1241 * Take request back from the receive queue and move 1242 * it immediately to the done queue. 
1243 */ 1244 mtx_lock(&hio_recv_list_lock[ncomp]); 1245 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1246 mtx_unlock(&hio_recv_list_lock[ncomp]); 1247 goto done_queue; 1248 } 1249 rw_unlock(&hio_remote_lock[ncomp]); 1250 nv_free(nv); 1251 if (wakeup) 1252 cv_signal(&hio_recv_list_cond[ncomp]); 1253 continue; 1254done_queue: 1255 nv_free(nv); 1256 if (ISSYNCREQ(hio)) { 1257 if (!refcount_release(&hio->hio_countdown)) 1258 continue; 1259 mtx_lock(&sync_lock); 1260 SYNCREQDONE(hio); 1261 mtx_unlock(&sync_lock); 1262 cv_signal(&sync_cond); 1263 continue; 1264 } 1265 if (ggio->gctl_cmd == BIO_WRITE) { 1266 mtx_lock(&res->hr_amp_lock); 1267 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1268 ggio->gctl_length)) { 1269 (void)hast_activemap_flush(res); 1270 } 1271 mtx_unlock(&res->hr_amp_lock); 1272 } 1273 if (!refcount_release(&hio->hio_countdown)) 1274 continue; 1275 pjdlog_debug(2, 1276 "remote_send: (%p) Moving request to the done queue.", 1277 hio); 1278 QUEUE_INSERT2(hio, done); 1279 } 1280 /* NOTREACHED */ 1281 return (NULL); 1282} 1283 1284/* 1285 * Thread receives answer from secondary node and passes it to ggate_send 1286 * thread. 1287 */ 1288static void * 1289remote_recv_thread(void *arg) 1290{ 1291 struct hast_resource *res = arg; 1292 struct g_gate_ctl_io *ggio; 1293 struct hio *hio; 1294 struct nv *nv; 1295 unsigned int ncomp; 1296 uint64_t seq; 1297 int error; 1298 1299 /* Remote component is 1 for now. */ 1300 ncomp = 1; 1301 1302 for (;;) { 1303 /* Wait until there is anything to receive. 
*/ 1304 mtx_lock(&hio_recv_list_lock[ncomp]); 1305 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1306 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1307 cv_wait(&hio_recv_list_cond[ncomp], 1308 &hio_recv_list_lock[ncomp]); 1309 } 1310 mtx_unlock(&hio_recv_list_lock[ncomp]); 1311 rw_rlock(&hio_remote_lock[ncomp]); 1312 if (!ISCONNECTED(res, ncomp)) { 1313 rw_unlock(&hio_remote_lock[ncomp]); 1314 /* 1315 * Connection is dead, so move all pending requests to 1316 * the done queue (one-by-one). 1317 */ 1318 mtx_lock(&hio_recv_list_lock[ncomp]); 1319 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1320 assert(hio != NULL); 1321 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1322 hio_next[ncomp]); 1323 mtx_unlock(&hio_recv_list_lock[ncomp]); 1324 goto done_queue; 1325 } 1326 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1327 pjdlog_errno(LOG_ERR, 1328 "Unable to receive reply header"); 1329 rw_unlock(&hio_remote_lock[ncomp]); 1330 remote_close(res, ncomp); 1331 continue; 1332 } 1333 rw_unlock(&hio_remote_lock[ncomp]); 1334 seq = nv_get_uint64(nv, "seq"); 1335 if (seq == 0) { 1336 pjdlog_error("Header contains no 'seq' field."); 1337 nv_free(nv); 1338 continue; 1339 } 1340 mtx_lock(&hio_recv_list_lock[ncomp]); 1341 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1342 if (hio->hio_ggio.gctl_seq == seq) { 1343 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1344 hio_next[ncomp]); 1345 break; 1346 } 1347 } 1348 mtx_unlock(&hio_recv_list_lock[ncomp]); 1349 if (hio == NULL) { 1350 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1351 (uintmax_t)seq); 1352 nv_free(nv); 1353 continue; 1354 } 1355 error = nv_get_int16(nv, "error"); 1356 if (error != 0) { 1357 /* Request failed on remote side. 
*/ 1358 hio->hio_errors[ncomp] = 0; 1359 nv_free(nv); 1360 goto done_queue; 1361 } 1362 ggio = &hio->hio_ggio; 1363 switch (ggio->gctl_cmd) { 1364 case BIO_READ: 1365 rw_rlock(&hio_remote_lock[ncomp]); 1366 if (!ISCONNECTED(res, ncomp)) { 1367 rw_unlock(&hio_remote_lock[ncomp]); 1368 nv_free(nv); 1369 goto done_queue; 1370 } 1371 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1372 ggio->gctl_data, ggio->gctl_length) < 0) { 1373 hio->hio_errors[ncomp] = errno; 1374 pjdlog_errno(LOG_ERR, 1375 "Unable to receive reply data"); 1376 rw_unlock(&hio_remote_lock[ncomp]); 1377 nv_free(nv); 1378 remote_close(res, ncomp); 1379 goto done_queue; 1380 } 1381 rw_unlock(&hio_remote_lock[ncomp]); 1382 break; 1383 case BIO_WRITE: 1384 case BIO_DELETE: 1385 case BIO_FLUSH: 1386 break; 1387 default: 1388 assert(!"invalid condition"); 1389 abort(); 1390 } 1391 hio->hio_errors[ncomp] = 0; 1392 nv_free(nv); 1393done_queue: 1394 if (refcount_release(&hio->hio_countdown)) { 1395 if (ISSYNCREQ(hio)) { 1396 mtx_lock(&sync_lock); 1397 SYNCREQDONE(hio); 1398 mtx_unlock(&sync_lock); 1399 cv_signal(&sync_cond); 1400 } else { 1401 pjdlog_debug(2, 1402 "remote_recv: (%p) Moving request to the done queue.", 1403 hio); 1404 QUEUE_INSERT2(hio, done); 1405 } 1406 } 1407 } 1408 /* NOTREACHED */ 1409 return (NULL); 1410} 1411 1412/* 1413 * Thread sends answer to the kernel. 1414 */ 1415static void * 1416ggate_send_thread(void *arg) 1417{ 1418 struct hast_resource *res = arg; 1419 struct g_gate_ctl_io *ggio; 1420 struct hio *hio; 1421 unsigned int ii, ncomp, ncomps; 1422 1423 ncomps = HAST_NCOMPONENTS; 1424 1425 for (;;) { 1426 pjdlog_debug(2, "ggate_send: Taking request."); 1427 QUEUE_TAKE2(hio, done); 1428 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1429 ggio = &hio->hio_ggio; 1430 for (ii = 0; ii < ncomps; ii++) { 1431 if (hio->hio_errors[ii] == 0) { 1432 /* 1433 * One successful request is enough to declare 1434 * success. 
1435 */ 1436 ggio->gctl_error = 0; 1437 break; 1438 } 1439 } 1440 if (ii == ncomps) { 1441 /* 1442 * None of the requests were successful. 1443 * Use first error. 1444 */ 1445 ggio->gctl_error = hio->hio_errors[0]; 1446 } 1447 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1448 mtx_lock(&res->hr_amp_lock); 1449 activemap_write_complete(res->hr_amp, 1450 ggio->gctl_offset, ggio->gctl_length); 1451 mtx_unlock(&res->hr_amp_lock); 1452 } 1453 if (ggio->gctl_cmd == BIO_WRITE) { 1454 /* 1455 * Unlock range we locked. 1456 */ 1457 mtx_lock(&range_lock); 1458 rangelock_del(range_regular, ggio->gctl_offset, 1459 ggio->gctl_length); 1460 if (range_sync_wait) 1461 cv_signal(&range_sync_cond); 1462 mtx_unlock(&range_lock); 1463 /* 1464 * Bump local count if this is first write after 1465 * connection failure with remote node. 1466 */ 1467 ncomp = 1; 1468 rw_rlock(&hio_remote_lock[ncomp]); 1469 if (!ISCONNECTED(res, ncomp)) { 1470 mtx_lock(&metadata_lock); 1471 if (res->hr_primary_localcnt == 1472 res->hr_secondary_remotecnt) { 1473 res->hr_primary_localcnt++; 1474 pjdlog_debug(1, 1475 "Increasing localcnt to %ju.", 1476 (uintmax_t)res->hr_primary_localcnt); 1477 (void)metadata_write(res); 1478 } 1479 mtx_unlock(&metadata_lock); 1480 } 1481 rw_unlock(&hio_remote_lock[ncomp]); 1482 } 1483 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1484 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1485 pjdlog_debug(2, 1486 "ggate_send: (%p) Moving request to the free queue.", hio); 1487 QUEUE_INSERT2(hio, free); 1488 } 1489 /* NOTREACHED */ 1490 return (NULL); 1491} 1492 1493/* 1494 * Thread synchronize local and remote components. 
1495 */ 1496static void * 1497sync_thread(void *arg __unused) 1498{ 1499 struct hast_resource *res = arg; 1500 struct hio *hio; 1501 struct g_gate_ctl_io *ggio; 1502 unsigned int ii, ncomp, ncomps; 1503 off_t offset, length, synced; 1504 bool dorewind; 1505 int syncext; 1506 1507 ncomps = HAST_NCOMPONENTS; 1508 dorewind = true; 1509 synced = 0; 1510 offset = -1; 1511 1512 for (;;) { 1513 mtx_lock(&sync_lock); 1514 if (offset >= 0 && !sync_inprogress) { 1515 pjdlog_info("Synchronization interrupted. " 1516 "%jd bytes synchronized so far.", 1517 (intmax_t)synced); 1518 event_send(res, EVENT_SYNCINTR); 1519 } 1520 while (!sync_inprogress) { 1521 dorewind = true; 1522 synced = 0; 1523 cv_wait(&sync_cond, &sync_lock); 1524 } 1525 mtx_unlock(&sync_lock); 1526 /* 1527 * Obtain offset at which we should synchronize. 1528 * Rewind synchronization if needed. 1529 */ 1530 mtx_lock(&res->hr_amp_lock); 1531 if (dorewind) 1532 activemap_sync_rewind(res->hr_amp); 1533 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1534 if (syncext != -1) { 1535 /* 1536 * We synchronized entire syncext extent, we can mark 1537 * it as clean now. 1538 */ 1539 if (activemap_extent_complete(res->hr_amp, syncext)) 1540 (void)hast_activemap_flush(res); 1541 } 1542 mtx_unlock(&res->hr_amp_lock); 1543 if (dorewind) { 1544 dorewind = false; 1545 if (offset < 0) 1546 pjdlog_info("Nodes are in sync."); 1547 else { 1548 pjdlog_info("Synchronization started. %ju bytes to go.", 1549 (uintmax_t)(res->hr_extentsize * 1550 activemap_ndirty(res->hr_amp))); 1551 event_send(res, EVENT_SYNCSTART); 1552 } 1553 } 1554 if (offset < 0) { 1555 sync_stop(); 1556 pjdlog_debug(1, "Nothing to synchronize."); 1557 /* 1558 * Synchronization complete, make both localcnt and 1559 * remotecnt equal. 1560 */ 1561 ncomp = 1; 1562 rw_rlock(&hio_remote_lock[ncomp]); 1563 if (ISCONNECTED(res, ncomp)) { 1564 if (synced > 0) { 1565 pjdlog_info("Synchronization complete. 
" 1566 "%jd bytes synchronized.", 1567 (intmax_t)synced); 1568 event_send(res, EVENT_SYNCDONE); 1569 } 1570 mtx_lock(&metadata_lock); 1571 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1572 res->hr_primary_localcnt = 1573 res->hr_secondary_localcnt; 1574 res->hr_primary_remotecnt = 1575 res->hr_secondary_remotecnt; 1576 pjdlog_debug(1, 1577 "Setting localcnt to %ju and remotecnt to %ju.", 1578 (uintmax_t)res->hr_primary_localcnt, 1579 (uintmax_t)res->hr_secondary_localcnt); 1580 (void)metadata_write(res); 1581 mtx_unlock(&metadata_lock); 1582 } 1583 rw_unlock(&hio_remote_lock[ncomp]); 1584 continue; 1585 } 1586 pjdlog_debug(2, "sync: Taking free request."); 1587 QUEUE_TAKE2(hio, free); 1588 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1589 /* 1590 * Lock the range we are going to synchronize. We don't want 1591 * race where someone writes between our read and write. 1592 */ 1593 for (;;) { 1594 mtx_lock(&range_lock); 1595 if (rangelock_islocked(range_regular, offset, length)) { 1596 pjdlog_debug(2, 1597 "sync: Range offset=%jd length=%jd locked.", 1598 (intmax_t)offset, (intmax_t)length); 1599 range_sync_wait = true; 1600 cv_wait(&range_sync_cond, &range_lock); 1601 range_sync_wait = false; 1602 mtx_unlock(&range_lock); 1603 continue; 1604 } 1605 if (rangelock_add(range_sync, offset, length) < 0) { 1606 mtx_unlock(&range_lock); 1607 pjdlog_debug(2, 1608 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1609 (intmax_t)offset, (intmax_t)length); 1610 sleep(1); 1611 continue; 1612 } 1613 mtx_unlock(&range_lock); 1614 break; 1615 } 1616 /* 1617 * First read the data from synchronization source. 
1618 */ 1619 SYNCREQ(hio); 1620 ggio = &hio->hio_ggio; 1621 ggio->gctl_cmd = BIO_READ; 1622 ggio->gctl_offset = offset; 1623 ggio->gctl_length = length; 1624 ggio->gctl_error = 0; 1625 for (ii = 0; ii < ncomps; ii++) 1626 hio->hio_errors[ii] = EINVAL; 1627 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1628 hio); 1629 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1630 hio); 1631 mtx_lock(&metadata_lock); 1632 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1633 /* 1634 * This range is up-to-date on local component, 1635 * so handle request locally. 1636 */ 1637 /* Local component is 0 for now. */ 1638 ncomp = 0; 1639 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1640 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1641 /* 1642 * This range is out-of-date on local component, 1643 * so send request to the remote node. 1644 */ 1645 /* Remote component is 1 for now. */ 1646 ncomp = 1; 1647 } 1648 mtx_unlock(&metadata_lock); 1649 refcount_init(&hio->hio_countdown, 1); 1650 QUEUE_INSERT1(hio, send, ncomp); 1651 1652 /* 1653 * Let's wait for READ to finish. 1654 */ 1655 mtx_lock(&sync_lock); 1656 while (!ISSYNCREQDONE(hio)) 1657 cv_wait(&sync_cond, &sync_lock); 1658 mtx_unlock(&sync_lock); 1659 1660 if (hio->hio_errors[ncomp] != 0) { 1661 pjdlog_error("Unable to read synchronization data: %s.", 1662 strerror(hio->hio_errors[ncomp])); 1663 goto free_queue; 1664 } 1665 1666 /* 1667 * We read the data from synchronization source, now write it 1668 * to synchronization target. 
1669 */ 1670 SYNCREQ(hio); 1671 ggio->gctl_cmd = BIO_WRITE; 1672 for (ii = 0; ii < ncomps; ii++) 1673 hio->hio_errors[ii] = EINVAL; 1674 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1675 hio); 1676 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1677 hio); 1678 mtx_lock(&metadata_lock); 1679 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1680 /* 1681 * This range is up-to-date on local component, 1682 * so we update remote component. 1683 */ 1684 /* Remote component is 1 for now. */ 1685 ncomp = 1; 1686 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1687 assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1688 /* 1689 * This range is out-of-date on local component, 1690 * so we update it. 1691 */ 1692 /* Local component is 0 for now. */ 1693 ncomp = 0; 1694 } 1695 mtx_unlock(&metadata_lock); 1696 1697 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1698 hio); 1699 refcount_init(&hio->hio_countdown, 1); 1700 QUEUE_INSERT1(hio, send, ncomp); 1701 1702 /* 1703 * Let's wait for WRITE to finish. 
1704 */ 1705 mtx_lock(&sync_lock); 1706 while (!ISSYNCREQDONE(hio)) 1707 cv_wait(&sync_cond, &sync_lock); 1708 mtx_unlock(&sync_lock); 1709 1710 if (hio->hio_errors[ncomp] != 0) { 1711 pjdlog_error("Unable to write synchronization data: %s.", 1712 strerror(hio->hio_errors[ncomp])); 1713 goto free_queue; 1714 } 1715 1716 synced += length; 1717free_queue: 1718 mtx_lock(&range_lock); 1719 rangelock_del(range_sync, offset, length); 1720 if (range_regular_wait) 1721 cv_signal(&range_regular_cond); 1722 mtx_unlock(&range_lock); 1723 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1724 hio); 1725 QUEUE_INSERT2(hio, free); 1726 } 1727 /* NOTREACHED */ 1728 return (NULL); 1729} 1730 1731static void 1732config_reload(void) 1733{ 1734 struct hastd_config *newcfg; 1735 struct hast_resource *res; 1736 unsigned int ii, ncomps; 1737 int modified; 1738 1739 pjdlog_info("Reloading configuration..."); 1740 1741 ncomps = HAST_NCOMPONENTS; 1742 1743 newcfg = yy_config_parse(cfgpath, false); 1744 if (newcfg == NULL) 1745 goto failed; 1746 1747 TAILQ_FOREACH(res, &newcfg->hc_resources, hr_next) { 1748 if (strcmp(res->hr_name, gres->hr_name) == 0) 1749 break; 1750 } 1751 /* 1752 * If resource was removed from the configuration file, resource 1753 * name, provider name or path to local component was modified we 1754 * shouldn't be here. This means that someone modified configuration 1755 * file and send SIGHUP to us instead of main hastd process. 1756 * Log advice and ignore the signal. 
1757 */ 1758 if (res == NULL || strcmp(gres->hr_name, res->hr_name) != 0 || 1759 strcmp(gres->hr_provname, res->hr_provname) != 0 || 1760 strcmp(gres->hr_localpath, res->hr_localpath) != 0) { 1761 pjdlog_warning("To reload configuration send SIGHUP to the main hastd process (pid %u).", 1762 (unsigned int)getppid()); 1763 goto failed; 1764 } 1765 1766#define MODIFIED_REMOTEADDR 0x1 1767#define MODIFIED_REPLICATION 0x2 1768#define MODIFIED_TIMEOUT 0x4 1769#define MODIFIED_EXEC 0x8 1770 modified = 0; 1771 if (strcmp(gres->hr_remoteaddr, res->hr_remoteaddr) != 0) { 1772 /* 1773 * Don't copy res->hr_remoteaddr to gres just yet. 1774 * We want remote_close() to log disconnect from the old 1775 * addresses, not from the new ones. 1776 */ 1777 modified |= MODIFIED_REMOTEADDR; 1778 } 1779 if (gres->hr_replication != res->hr_replication) { 1780 gres->hr_replication = res->hr_replication; 1781 modified |= MODIFIED_REPLICATION; 1782 } 1783 if (gres->hr_timeout != res->hr_timeout) { 1784 gres->hr_timeout = res->hr_timeout; 1785 modified |= MODIFIED_TIMEOUT; 1786 } 1787 if (strcmp(gres->hr_exec, res->hr_exec) != 0) { 1788 strlcpy(gres->hr_exec, res->hr_exec, sizeof(gres->hr_exec)); 1789 modified |= MODIFIED_EXEC; 1790 } 1791 /* 1792 * If only timeout was modified we only need to change it without 1793 * reconnecting. 
1794 */ 1795 if (modified == MODIFIED_TIMEOUT) { 1796 for (ii = 0; ii < ncomps; ii++) { 1797 if (!ISREMOTE(ii)) 1798 continue; 1799 rw_rlock(&hio_remote_lock[ii]); 1800 if (!ISCONNECTED(gres, ii)) { 1801 rw_unlock(&hio_remote_lock[ii]); 1802 continue; 1803 } 1804 rw_unlock(&hio_remote_lock[ii]); 1805 if (proto_timeout(gres->hr_remotein, 1806 gres->hr_timeout) < 0) { 1807 pjdlog_errno(LOG_WARNING, 1808 "Unable to set connection timeout"); 1809 } 1810 if (proto_timeout(gres->hr_remoteout, 1811 gres->hr_timeout) < 0) { 1812 pjdlog_errno(LOG_WARNING, 1813 "Unable to set connection timeout"); 1814 } 1815 } 1816 } else if ((modified & 1817 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1818 for (ii = 0; ii < ncomps; ii++) { 1819 if (!ISREMOTE(ii)) 1820 continue; 1821 remote_close(gres, ii); 1822 } 1823 if (modified & MODIFIED_REMOTEADDR) { 1824 strlcpy(gres->hr_remoteaddr, res->hr_remoteaddr, 1825 sizeof(gres->hr_remoteaddr)); 1826 } 1827 } 1828#undef MODIFIED_REMOTEADDR 1829#undef MODIFIED_REPLICATION 1830#undef MODIFIED_TIMEOUT 1831#undef MODIFIED_EXEC 1832 1833 pjdlog_info("Configuration reloaded successfully."); 1834 return; 1835failed: 1836 if (newcfg != NULL) { 1837 if (newcfg->hc_controlconn != NULL) 1838 proto_close(newcfg->hc_controlconn); 1839 if (newcfg->hc_listenconn != NULL) 1840 proto_close(newcfg->hc_listenconn); 1841 yy_config_free(newcfg); 1842 } 1843 pjdlog_warning("Configuration not reloaded."); 1844} 1845 1846static void 1847keepalive_send(struct hast_resource *res, unsigned int ncomp) 1848{ 1849 struct nv *nv; 1850 1851 nv = nv_alloc(); 1852 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1853 if (nv_error(nv) != 0) { 1854 nv_free(nv); 1855 pjdlog_debug(1, 1856 "keepalive_send: Unable to prepare header to send."); 1857 return; 1858 } 1859 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1860 pjdlog_common(LOG_DEBUG, 1, errno, 1861 "keepalive_send: Unable to send request"); 1862 nv_free(nv); 1863 rw_unlock(&hio_remote_lock[ncomp]); 1864 
remote_close(res, ncomp); 1865 rw_rlock(&hio_remote_lock[ncomp]); 1866 return; 1867 } 1868 nv_free(nv); 1869 pjdlog_debug(2, "keepalive_send: Request sent."); 1870} 1871 1872static void 1873guard_one(struct hast_resource *res, unsigned int ncomp) 1874{ 1875 struct proto_conn *in, *out; 1876 1877 if (!ISREMOTE(ncomp)) 1878 return; 1879 1880 rw_rlock(&hio_remote_lock[ncomp]); 1881 1882 if (!real_remote(res)) { 1883 rw_unlock(&hio_remote_lock[ncomp]); 1884 return; 1885 } 1886 1887 if (ISCONNECTED(res, ncomp)) { 1888 assert(res->hr_remotein != NULL); 1889 assert(res->hr_remoteout != NULL); 1890 keepalive_send(res, ncomp); 1891 } 1892 1893 if (ISCONNECTED(res, ncomp)) { 1894 assert(res->hr_remotein != NULL); 1895 assert(res->hr_remoteout != NULL); 1896 rw_unlock(&hio_remote_lock[ncomp]); 1897 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 1898 res->hr_remoteaddr); 1899 return; 1900 } 1901 1902 assert(res->hr_remotein == NULL); 1903 assert(res->hr_remoteout == NULL); 1904 /* 1905 * Upgrade the lock. It doesn't have to be atomic as no other thread 1906 * can change connection status from disconnected to connected. 1907 */ 1908 rw_unlock(&hio_remote_lock[ncomp]); 1909 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 1910 res->hr_remoteaddr); 1911 in = out = NULL; 1912 if (init_remote(res, &in, &out)) { 1913 rw_wlock(&hio_remote_lock[ncomp]); 1914 assert(res->hr_remotein == NULL); 1915 assert(res->hr_remoteout == NULL); 1916 assert(in != NULL && out != NULL); 1917 res->hr_remotein = in; 1918 res->hr_remoteout = out; 1919 rw_unlock(&hio_remote_lock[ncomp]); 1920 pjdlog_info("Successfully reconnected to %s.", 1921 res->hr_remoteaddr); 1922 sync_start(); 1923 } else { 1924 /* Both connections should be NULL. 
*/ 1925 assert(res->hr_remotein == NULL); 1926 assert(res->hr_remoteout == NULL); 1927 assert(in == NULL && out == NULL); 1928 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 1929 res->hr_remoteaddr); 1930 } 1931} 1932 1933/* 1934 * Thread guards remote connections and reconnects when needed, handles 1935 * signals, etc. 1936 */ 1937static void * 1938guard_thread(void *arg) 1939{ 1940 struct hast_resource *res = arg; 1941 unsigned int ii, ncomps; 1942 struct timespec timeout; 1943 time_t lastcheck, now; 1944 sigset_t mask; 1945 int signo; 1946 1947 ncomps = HAST_NCOMPONENTS; 1948 lastcheck = time(NULL); 1949 1950 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 1951 PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); 1952 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 1953 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 1954 1955 timeout.tv_nsec = 0; 1956 signo = -1; 1957 1958 for (;;) { 1959 switch (signo) { 1960 case SIGHUP: 1961 config_reload(); 1962 break; 1963 case SIGINT: 1964 case SIGTERM: 1965 sigexit_received = true; 1966 primary_exitx(EX_OK, 1967 "Termination signal received, exiting."); 1968 break; 1969 default: 1970 break; 1971 } 1972 1973 pjdlog_debug(2, "remote_guard: Checking connections."); 1974 now = time(NULL); 1975 if (lastcheck + RETRY_SLEEP <= now) { 1976 for (ii = 0; ii < ncomps; ii++) 1977 guard_one(res, ii); 1978 lastcheck = now; 1979 } 1980 timeout.tv_sec = RETRY_SLEEP; 1981 signo = sigtimedwait(&mask, NULL, &timeout); 1982 } 1983 /* NOTREACHED */ 1984 return (NULL); 1985} 1986