/* primary.c — FreeBSD head r259194 (line numbers below are extraction artifacts of the original dump) */
/*-
 * Copyright (c) 2009 The FreeBSD Foundation
 * Copyright (c) 2010-2011 Pawel Jakub Dawidek <pawel@dawidek.net>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 259194 2013-12-10 20:05:07Z trociny $");

#include <sys/types.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/disk.h>
#include <sys/stat.h>

#include <geom/gate/g_gate.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <libgeom.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>

#include <activemap.h>
#include <nv.h>
#include <rangelock.h>

#include "control.h"
#include "event.h"
#include "hast.h"
#include "hast_proto.h"
#include "hastd.h"
#include "hooks.h"
#include "metadata.h"
#include "proto.h"
#include "pjdlog.h"
#include "refcnt.h"
#include "subr.h"
#include "synch.h"

/* There is only one remote component for now. */
#define	ISREMOTE(no)	((no) == 1)

/*
 * A single I/O request as it flows between the GEOM Gate kernel provider
 * and the local/remote components.
 */
struct hio {
	/*
	 * Number of components we are still waiting for.
	 * When this field goes to 0, we can send the request back to the
	 * kernel. Each component has to decrease this counter by one
	 * even on failure.
	 */
	refcnt_t	 hio_countdown;
	/*
	 * Each component has a place to store its own error.
	 * Once the request is handled by all components we can decide if the
	 * request overall is successful or not.
	 */
	int		*hio_errors;
	/*
	 * Structure used to communicate with GEOM Gate class.
	 */
	struct g_gate_ctl_io hio_ggio;
	/*
	 * Request was already confirmed to GEOM Gate.
	 */
	bool		 hio_done;
	/*
	 * Number of components we are still waiting before sending write
	 * completion ack to GEOM Gate. Used for memsync.
	 */
	refcnt_t	 hio_writecount;
	/*
	 * Memsync request was acknowledged by remote.
	 */
	bool		 hio_memsyncacked;
	/*
	 * Remember replication from the time the request was initiated,
	 * so we won't get confused when replication changes on reload.
	 */
	int		 hio_replication;
	/*
	 * Per-component queue linkage (one entry per component).  Entry 0
	 * doubles as the linkage for the free and done lists, see the
	 * hio_free_next/hio_done_next aliases below.
	 */
	TAILQ_ENTRY(hio) *hio_next;
};
#define	hio_free_next	hio_next[0]
#define	hio_done_next	hio_next[0]

/*
 * Free list holds unused structures. When free list is empty, we have to wait
 * until some in-progress requests are freed.
 */
static TAILQ_HEAD(, hio) hio_free_list;
static size_t hio_free_list_size;
static pthread_mutex_t hio_free_list_lock;
static pthread_cond_t hio_free_list_cond;
/*
 * There is one send list for every component. One request is placed on all
 * send lists - each component gets the same request, but each component is
 * responsible for managing its own send list.
 */
static TAILQ_HEAD(, hio) *hio_send_list;
static size_t *hio_send_list_size;
static pthread_mutex_t *hio_send_list_lock;
static pthread_cond_t *hio_send_list_cond;
#define	hio_send_local_list_size	hio_send_list_size[0]
#define	hio_send_remote_list_size	hio_send_list_size[1]
/*
 * There is one recv list for every component, although local components don't
 * use recv lists as local requests are done synchronously.
 */
static TAILQ_HEAD(, hio) *hio_recv_list;
static size_t *hio_recv_list_size;
static pthread_mutex_t *hio_recv_list_lock;
static pthread_cond_t *hio_recv_list_cond;
#define	hio_recv_remote_list_size	hio_recv_list_size[1]
/*
 * Request is placed on done list by the slowest component (the one that
 * decreased hio_countdown from 1 to 0).
 */
static TAILQ_HEAD(, hio) hio_done_list;
static size_t hio_done_list_size;
static pthread_mutex_t hio_done_list_lock;
static pthread_cond_t hio_done_list_cond;
/*
 * Structures below are for interaction with sync thread.
 */
static bool sync_inprogress;
static pthread_mutex_t sync_lock;
static pthread_cond_t sync_cond;
/*
 * The lock below allows one to synchronize access to remote connections.
 */
static pthread_rwlock_t *hio_remote_lock;

/*
 * Lock to synchronize metadata updates. Also synchronizes access to
 * hr_primary_localcnt and hr_primary_remotecnt fields.
 */
static pthread_mutex_t metadata_lock;

/*
 * Maximum number of outstanding I/O requests.
 */
#define	HAST_HIO_MAX	256
/*
 * Number of components. At this point there are only two components: local
 * and remote, but in the future it might be possible to use multiple local
 * and remote components.
 */
#define	HAST_NCOMPONENTS	2

/*
 * NOTE: the 'no' argument is currently unused - with a single remote
 * component the connection state is kept directly in the resource.
 */
#define	ISCONNECTED(res, no)	\
	((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)

/*
 * Enqueue a request on the per-component queue 'name', waking up a waiter
 * only when the queue transitions from empty to non-empty.
 */
#define	QUEUE_INSERT1(hio, name, ncomp)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]);		\
	TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio),		\
	    hio_next[(ncomp)]);						\
	hio_##name##_list_size[(ncomp)]++;				\
	mtx_unlock(&hio_##name##_list_lock[ncomp]);			\
	if (_wakeup)							\
		cv_broadcast(&hio_##name##_list_cond[(ncomp)]);		\
} while (0)
/* Same as QUEUE_INSERT1, but for the global (non-per-component) queues. */
#define	QUEUE_INSERT2(hio, name)	do {				\
	bool _wakeup;							\
									\
	mtx_lock(&hio_##name##_list_lock);				\
	_wakeup = TAILQ_EMPTY(&hio_##name##_list);			\
	TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\
	hio_##name##_list_size++;					\
	mtx_unlock(&hio_##name##_list_lock);				\
	if (_wakeup)							\
		cv_broadcast(&hio_##name##_list_cond);			\
} while (0)
/*
 * Dequeue the first request from the per-component queue 'name'.  With a
 * non-zero 'timeout' the wait is bounded to a single cv_timedwait() pass
 * and (hio) may come back NULL; with a zero timeout it blocks until a
 * request shows up.
 */
#define	QUEUE_TAKE1(hio, name, ncomp, timeout)	do {			\
	bool _last;							\
									\
	mtx_lock(&hio_##name##_list_lock[(ncomp)]);			\
	_last = false;							\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \
		cv_timedwait(&hio_##name##_list_cond[(ncomp)],		\
		    &hio_##name##_list_lock[(ncomp)], (timeout));	\
		if ((timeout) != 0)					\
			_last = true;					\
	}								\
	if (hio != NULL) {						\
		PJDLOG_ASSERT(hio_##name##_list_size[(ncomp)] != 0);	\
		hio_##name##_list_size[(ncomp)]--;			\
		TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio),	\
		    hio_next[(ncomp)]);					\
	}								\
	mtx_unlock(&hio_##name##_list_lock[(ncomp)]);			\
} while (0)
/* Blocking dequeue from a global queue; never returns NULL. */
#define	QUEUE_TAKE2(hio, name)	do {					\
	mtx_lock(&hio_##name##_list_lock);				\
	while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) {	\
		cv_wait(&hio_##name##_list_cond,			\
		    &hio_##name##_list_lock);				\
	}								\
	PJDLOG_ASSERT(hio_##name##_list_size != 0);			\
	hio_##name##_list_size--;					\
	TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next);	\
	mtx_unlock(&hio_##name##_list_lock);				\
} while (0)

#define	ISFULLSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_FULLSYNC)
#define	ISMEMSYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_MEMSYNC)
#define	ISASYNC(hio)	((hio)->hio_replication == HAST_REPLICATION_ASYNC)

/*
 * Synchronization requests are distinguished from regular kernel requests
 * by magic gctl_unit values: -1 while in flight, -2 when completed.
 */
#define	SYNCREQ(hio)		do {					\
	(hio)->hio_ggio.gctl_unit = -1;					\
	(hio)->hio_ggio.gctl_seq = 1;					\
} while (0)
#define	ISSYNCREQ(hio)		((hio)->hio_ggio.gctl_unit == -1)
#define	SYNCREQDONE(hio)	do { (hio)->hio_ggio.gctl_unit = -2; } while (0)
#define	ISSYNCREQDONE(hio)	((hio)->hio_ggio.gctl_unit == -2)

#define	ISMEMSYNCWRITE(hio)	(ISMEMSYNC(hio) &&			\
	    (hio)->hio_ggio.gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio))

static struct hast_resource *gres;

/* Range locks separating regular I/O from synchronization I/O. */
static pthread_mutex_t range_lock;
static struct rangelocks *range_regular;
static bool range_regular_wait;
static pthread_cond_t range_regular_cond;
static struct rangelocks *range_sync;
static bool range_sync_wait;
static pthread_cond_t range_sync_cond;
static bool fullystarted;

static void *ggate_recv_thread(void *arg);
static void *local_send_thread(void *arg);
static void *remote_send_thread(void *arg);
static void *remote_recv_thread(void *arg);
static void *ggate_send_thread(void *arg);
static void *sync_thread(void *arg);
static void *guard_thread(void *arg);

/*
 * Export current queue depths for 'hastctl status'; hooked into the
 * resource as res->output_status_aux.
 */
static void
output_status_aux(struct nv *nvout)
{

	nv_add_uint64(nvout, (uint64_t)hio_free_list_size,
	    "idle_queue_size");
	nv_add_uint64(nvout, (uint64_t)hio_send_local_list_size,
	    "local_queue_size");
	nv_add_uint64(nvout, (uint64_t)hio_send_remote_list_size,
	    "send_queue_size");
	nv_add_uint64(nvout, (uint64_t)hio_recv_remote_list_size,
	    "recv_queue_size");
	nv_add_uint64(nvout, (uint64_t)hio_done_list_size,
	    "done_queue_size");
}

/*
 * Best-effort teardown on worker exit: destroy the GEOM Gate provider we
 * created.  Preserves errno for the caller.
 */
static void
cleanup(struct hast_resource *res)
{
	int rerrno;

	/* Remember errno. */
	rerrno = errno;

	/* Destroy ggate provider if we created one. */
	if (res->hr_ggateunit >= 0) {
		struct g_gate_ctl_destroy ggiod;

		bzero(&ggiod, sizeof(ggiod));
		ggiod.gctl_version = G_GATE_VERSION;
		ggiod.gctl_unit = res->hr_ggateunit;
		ggiod.gctl_force = 1;
		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) {
			pjdlog_errno(LOG_WARNING,
			    "Unable to destroy hast/%s device",
			    res->hr_provname);
		}
		res->hr_ggateunit = -1;
	}

	/* Restore errno. */
	errno = rerrno;
}

/* Log a message with errno appended, clean up and exit. */
static __dead2 void
primary_exit(int exitcode, const char *fmt, ...)
{
	va_list ap;

	PJDLOG_ASSERT(exitcode != EX_OK);
	va_start(ap, fmt);
	pjdlogv_errno(LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/* Like primary_exit(), but without errno; EX_OK logs at INFO level. */
static __dead2 void
primary_exitx(int exitcode, const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap);
	va_end(ap);
	cleanup(gres);
	exit(exitcode);
}

/*
 * Expects res->hr_amp locked, returns unlocked.
 */
static int
hast_activemap_flush(struct hast_resource *res)
{
	const unsigned char *buf;
	size_t size;
	int ret;

	/*
	 * Lock handoff: hr_amp_lock is held on entry and dropped as soon as
	 * the bitmap snapshot is taken; hr_amp_diskmap_lock serializes the
	 * on-disk update itself.
	 */
	mtx_lock(&res->hr_amp_diskmap_lock);
	buf = activemap_bitmap(res->hr_amp, &size);
	mtx_unlock(&res->hr_amp_lock);
	PJDLOG_ASSERT(buf != NULL);
	PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0);
	ret = 0;
	if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) !=
	    (ssize_t)size) {
		pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk");
		res->hr_stat_activemap_write_error++;
		ret = -1;
	}
	if (ret == 0 && res->hr_metaflush == 1 &&
	    g_flush(res->hr_localfd) == -1) {
		if (errno == EOPNOTSUPP) {
			/* Provider can't flush its cache; stop trying. */
			pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.",
			    res->hr_localpath);
			res->hr_metaflush = 0;
		} else {
			pjdlog_errno(LOG_ERR,
			    "Unable to flush disk cache on activemap update");
			res->hr_stat_activemap_flush_error++;
			ret = -1;
		}
	}
	mtx_unlock(&res->hr_amp_diskmap_lock);
	return (ret);
}

/* True when a real remote address (not "none") is configured. */
static bool
real_remote(const struct hast_resource *res)
{

	return (strcmp(res->hr_remoteaddr, "none") != 0);
}

/*
 * Allocate and initialize all queues, locks, condition variables and the
 * pool of HAST_HIO_MAX preallocated hio requests.  Any allocation failure
 * is fatal.
 */
static void
init_environment(struct hast_resource *res __unused)
{
	struct hio *hio;
	unsigned int ii, ncomps;

	/*
	 * In the future it might be per-resource value.
	 */
	ncomps = HAST_NCOMPONENTS;

	/*
	 * Allocate memory needed by lists.
	 */
	hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps);
	if (hio_send_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send lists.",
		    sizeof(hio_send_list[0]) * ncomps);
	}
	hio_send_list_size = malloc(sizeof(hio_send_list_size[0]) * ncomps);
	if (hio_send_list_size == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list counters.",
		    sizeof(hio_send_list_size[0]) * ncomps);
	}
	hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps);
	if (hio_send_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list locks.",
		    sizeof(hio_send_list_lock[0]) * ncomps);
	}
	hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps);
	if (hio_send_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for send list condition variables.",
		    sizeof(hio_send_list_cond[0]) * ncomps);
	}
	hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps);
	if (hio_recv_list == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv lists.",
		    sizeof(hio_recv_list[0]) * ncomps);
	}
	hio_recv_list_size = malloc(sizeof(hio_recv_list_size[0]) * ncomps);
	if (hio_recv_list_size == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list counters.",
		    sizeof(hio_recv_list_size[0]) * ncomps);
	}
	hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps);
	if (hio_recv_list_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list locks.",
		    sizeof(hio_recv_list_lock[0]) * ncomps);
	}
	hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps);
	if (hio_recv_list_cond == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for recv list condition variables.",
		    sizeof(hio_recv_list_cond[0]) * ncomps);
	}
	hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps);
	if (hio_remote_lock == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate %zu bytes of memory for remote connections locks.",
		    sizeof(hio_remote_lock[0]) * ncomps);
	}

	/*
	 * Initialize lists, their counters, locks and condition variables.
	 */
	TAILQ_INIT(&hio_free_list);
	mtx_init(&hio_free_list_lock);
	cv_init(&hio_free_list_cond);
	for (ii = 0; ii < HAST_NCOMPONENTS; ii++) {
		TAILQ_INIT(&hio_send_list[ii]);
		hio_send_list_size[ii] = 0;
		mtx_init(&hio_send_list_lock[ii]);
		cv_init(&hio_send_list_cond[ii]);
		TAILQ_INIT(&hio_recv_list[ii]);
		hio_recv_list_size[ii] = 0;
		mtx_init(&hio_recv_list_lock[ii]);
		cv_init(&hio_recv_list_cond[ii]);
		rw_init(&hio_remote_lock[ii]);
	}
	TAILQ_INIT(&hio_done_list);
	mtx_init(&hio_done_list_lock);
	cv_init(&hio_done_list_cond);
	mtx_init(&metadata_lock);

	/*
	 * Allocate requests pool and initialize requests.
	 */
	for (ii = 0; ii < HAST_HIO_MAX; ii++) {
		hio = malloc(sizeof(*hio));
		if (hio == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for hio request.",
			    sizeof(*hio));
		}
		refcnt_init(&hio->hio_countdown, 0);
		hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps);
		if (hio->hio_errors == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio errors.",
			    sizeof(hio->hio_errors[0]) * ncomps);
		}
		hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps);
		if (hio->hio_next == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable allocate %zu bytes of memory for hio_next field.",
			    sizeof(hio->hio_next[0]) * ncomps);
		}
		hio->hio_ggio.gctl_version = G_GATE_VERSION;
		hio->hio_ggio.gctl_data = malloc(MAXPHYS);
		if (hio->hio_ggio.gctl_data == NULL) {
			primary_exitx(EX_TEMPFAIL,
			    "Unable to allocate %zu bytes of memory for gctl_data.",
			    MAXPHYS);
		}
		hio->hio_ggio.gctl_length = MAXPHYS;
		hio->hio_ggio.gctl_error = 0;
		TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next);
		hio_free_list_size++;
	}
}

/*
 * Initialize hr_resuid exactly once.  Returns true if we were the one to
 * initialize it (i.e. there were no writes to this resource yet), false if
 * it was already set.
 */
static bool
init_resuid(struct hast_resource *res)
{

	mtx_lock(&metadata_lock);
	if (res->hr_resuid != 0) {
		mtx_unlock(&metadata_lock);
		return (false);
	} else {
		/* Initialize unique resource identifier. */
		arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid));
		mtx_unlock(&metadata_lock);
		if (metadata_write(res) == -1)
			exit(EX_NOINPUT);
		return (true);
	}
}

/*
 * Read metadata, set up the activemap and range locks, and load the
 * on-disk activemap.  Fatal on any failure.
 */
static void
init_local(struct hast_resource *res)
{
	unsigned char *buf;
	size_t mapsize;

	if (metadata_read(res, true) == -1)
		exit(EX_NOINPUT);
	mtx_init(&res->hr_amp_lock);
	if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize,
	    res->hr_local_sectorsize, res->hr_keepdirty) == -1) {
		primary_exit(EX_TEMPFAIL, "Unable to create activemap");
	}
	mtx_init(&range_lock);
	cv_init(&range_regular_cond);
	if (rangelock_init(&range_regular) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create regular range lock");
	cv_init(&range_sync_cond);
	if (rangelock_init(&range_sync) == -1)
		primary_exit(EX_TEMPFAIL, "Unable to create sync range lock");
	mapsize = activemap_ondisk_size(res->hr_amp);
	buf = calloc(1, mapsize);
	if (buf == NULL) {
		primary_exitx(EX_TEMPFAIL,
		    "Unable to allocate buffer for activemap.");
	}
	if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) !=
	    (ssize_t)mapsize) {
		primary_exit(EX_NOINPUT, "Unable to read activemap");
	}
	activemap_copyin(res->hr_amp, buf, mapsize);
	free(buf);
	if (res->hr_resuid != 0)
		return;
	/*
	 * We're using provider for the first time. Initialize local and remote
	 * counters. We don't initialize resuid here, as we want to do it just
	 * in time. The reason for this is that we want to inform secondary
	 * that there were no writes yet, so there is no need to synchronize
	 * anything.
	 */
	res->hr_primary_localcnt = 0;
	res->hr_primary_remotecnt = 0;
	if (metadata_write(res) == -1)
		exit(EX_NOINPUT);
}

/*
 * Ask the parent process (which retains network privileges) to establish a
 * connection to the remote node.  On success the connected conn is stored
 * in *connp and 0 is returned; -1 is returned on connection failure.
 * Failures to talk to the parent itself are fatal.
 */
static int
primary_connect(struct hast_resource *res, struct proto_conn **connp)
{
	struct proto_conn *conn;
	int16_t val;

	val = 1;
	if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to send connection request to parent");
	}
	if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive reply to connection request from parent");
	}
	if (val != 0) {
		/* Parent reports the connect error via 'val'. */
		errno = val;
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		return (-1);
	}
	if (proto_connection_recv(res->hr_conn, true, &conn) == -1) {
		primary_exit(EX_TEMPFAIL,
		    "Unable to receive connection from parent");
	}
	if (proto_connect_wait(conn, res->hr_timeout) == -1) {
		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
		    res->hr_remoteaddr);
		proto_close(conn);
		return (-1);
	}
	/* Error in setting timeout is not critical, but why should it fail? */
	if (proto_timeout(conn, res->hr_timeout) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");

	*connp = conn;

	return (0);
}

/*
 * Function instructs GEOM_GATE to handle reads directly from within the kernel.
 */
static void
enable_direct_reads(struct hast_resource *res)
{
	struct g_gate_ctl_modify ggiomodify;

	bzero(&ggiomodify, sizeof(ggiomodify));
	ggiomodify.gctl_version = G_GATE_VERSION;
	ggiomodify.gctl_unit = res->hr_ggateunit;
	ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET;
	strlcpy(ggiomodify.gctl_readprov, res->hr_localpath,
	    sizeof(ggiomodify.gctl_readprov));
	ggiomodify.gctl_readoffset = res->hr_localoff;
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0)
		pjdlog_debug(1, "Direct reads enabled.");
	else
		pjdlog_errno(LOG_WARNING, "Failed to enable direct reads");
}

/*
 * Perform the two-step handshake with the remote node: negotiate protocol
 * version and token on the outgoing connection, then exchange metadata
 * (resuid, counters, activemap) on the incoming one.  With inp/outp NULL
 * the connections are stored in the resource.  Returns 0 on success or an
 * errno-style error (ECONNREFUSED, ECONNABORTED, EBUSY).
 */
static int
init_remote(struct hast_resource *res, struct proto_conn **inp,
    struct proto_conn **outp)
{
	struct proto_conn *in, *out;
	struct nv *nvout, *nvin;
	const unsigned char *token;
	unsigned char *map;
	const char *errmsg;
	int32_t extentsize;
	int64_t datasize;
	uint32_t mapsize;
	uint8_t version;
	size_t size;
	int error;

	PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL));
	PJDLOG_ASSERT(real_remote(res));

	in = out = NULL;
	errmsg = NULL;

	if (primary_connect(res, &out) == -1)
		return (ECONNREFUSED);

	error = ECONNABORTED;

	/*
	 * First handshake step.
	 * Setup outgoing connection with remote node.
	 */
	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, out, nvout, NULL, 0) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	if (hast_proto_recv_hdr(out, &nvin) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		if (nv_exists(nvin, "wait"))
			error = EBUSY;
		nv_free(nvin);
		goto close;
	}
	version = nv_get_uint8(nvin, "version");
	if (version == 0) {
		/*
		 * If no version is sent, it means this is protocol version 1.
		 */
		version = 1;
	}
	if (version > HAST_PROTO_VERSION) {
		pjdlog_warning("Invalid version received (%hhu).", version);
		nv_free(nvin);
		goto close;
	}
	res->hr_version = version;
	pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
	token = nv_get_uint8_array(nvin, &size, "token");
	if (token == NULL) {
		pjdlog_warning("Handshake header from %s has no 'token' field.",
		    res->hr_remoteaddr);
		nv_free(nvin);
		goto close;
	}
	if (size != sizeof(res->hr_token)) {
		pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).",
		    res->hr_remoteaddr, size, sizeof(res->hr_token));
		nv_free(nvin);
		goto close;
	}
	bcopy(token, res->hr_token, sizeof(res->hr_token));
	nv_free(nvin);

	/*
	 * Second handshake step.
	 * Setup incoming connection with remote node.
	 */
	if (primary_connect(res, &in) == -1)
		goto close;

	nvout = nv_alloc();
	nv_add_string(nvout, res->hr_name, "resource");
	nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token),
	    "token");
	if (res->hr_resuid == 0) {
		/*
		 * The resuid field was not yet initialized.
		 * Because we do synchronization inside init_resuid(), it is
		 * possible that someone already initialized it, the function
		 * will return false then, but if we successfully initialized
		 * it, we will get true. True means that there were no writes
		 * to this resource yet and we want to inform secondary that
		 * synchronization is not needed by sending "virgin" argument.
		 */
		if (init_resuid(res))
			nv_add_int8(nvout, 1, "virgin");
	}
	nv_add_uint64(nvout, res->hr_resuid, "resuid");
	nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt");
	nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt");
	if (nv_error(nvout) != 0) {
		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
		    "Unable to allocate header for connection with %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	if (hast_proto_send(res, in, nvout, NULL, 0) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to send handshake header to %s",
		    res->hr_remoteaddr);
		nv_free(nvout);
		goto close;
	}
	nv_free(nvout);
	/* NOTE: the reply to the second step is still read from 'out'. */
	if (hast_proto_recv_hdr(out, &nvin) == -1) {
		pjdlog_errno(LOG_WARNING,
		    "Unable to receive handshake header from %s",
		    res->hr_remoteaddr);
		goto close;
	}
	errmsg = nv_get_string(nvin, "errmsg");
	if (errmsg != NULL) {
		pjdlog_warning("%s", errmsg);
		nv_free(nvin);
		goto close;
	}
	datasize = nv_get_int64(nvin, "datasize");
	if (datasize != res->hr_datasize) {
		pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).",
		    (intmax_t)res->hr_datasize, (intmax_t)datasize);
		nv_free(nvin);
		goto close;
	}
	extentsize = nv_get_int32(nvin, "extentsize");
	if (extentsize != res->hr_extentsize) {
		pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).",
		    (ssize_t)res->hr_extentsize, (ssize_t)extentsize);
		nv_free(nvin);
		goto close;
	}
	res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt");
	res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt");
	res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc");
	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY)
		enable_direct_reads(res);
	if (nv_exists(nvin, "virgin")) {
		/*
		 * Secondary was reinitialized, bump localcnt if it is 0 as
		 * only we have the data.
		 */
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY);
		PJDLOG_ASSERT(res->hr_secondary_localcnt == 0);

		if (res->hr_primary_localcnt == 0) {
			PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0);

			mtx_lock(&metadata_lock);
			res->hr_primary_localcnt++;
			pjdlog_debug(1, "Increasing localcnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
	}
	map = NULL;
	mapsize = nv_get_uint32(nvin, "mapsize");
	if (mapsize > 0) {
		map = malloc(mapsize);
		if (map == NULL) {
			pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).",
			    (uintmax_t)mapsize);
			nv_free(nvin);
			goto close;
		}
		/*
		 * Remote node has some dirty extents on its own, let's
		 * download its activemap.
		 */
		if (hast_proto_recv_data(res, out, nvin, map,
		    mapsize) == -1) {
			pjdlog_errno(LOG_ERR,
			    "Unable to receive remote activemap");
			nv_free(nvin);
			free(map);
			goto close;
		}
		mtx_lock(&res->hr_amp_lock);
		/*
		 * Merge local and remote bitmaps.
		 */
		activemap_merge(res->hr_amp, map, mapsize);
		free(map);
		/*
		 * Now that we merged bitmaps from both nodes, flush it to the
		 * disk before we start to synchronize.
		 */
		(void)hast_activemap_flush(res);
	}
	nv_free(nvin);
#ifdef notyet
	/* Setup directions. */
	if (proto_send(out, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
	if (proto_recv(in, NULL, 0) == -1)
		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
#endif
	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
	if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
	    res->hr_version < 2) {
		/* memsync needs protocol version >= 2 on both sides. */
		pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
		res->hr_replication = HAST_REPLICATION_FULLSYNC;
	} else if (res->hr_replication != res->hr_original_replication) {
		/*
		 * This is in case hastd disconnected and was upgraded.
		 */
		res->hr_replication = res->hr_original_replication;
	}
	if (inp != NULL && outp != NULL) {
		*inp = in;
		*outp = out;
	} else {
		res->hr_remotein = in;
		res->hr_remoteout = out;
	}
	event_send(res, EVENT_CONNECT);
	return (0);
close:
	if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0)
		event_send(res, EVENT_SPLITBRAIN);
	proto_close(out);
	if (in != NULL)
		proto_close(in);
	return (error);
}

/* Mark synchronization as in progress and wake the sync thread. */
static void
sync_start(void)
{

	mtx_lock(&sync_lock);
	sync_inprogress = true;
	mtx_unlock(&sync_lock);
	cv_signal(&sync_cond);
}

/* Request the sync thread to stop; it observes the cleared flag itself. */
static void
sync_stop(void)
{

	mtx_lock(&sync_lock);
	if (sync_inprogress)
		sync_inprogress = false;
	mtx_unlock(&sync_lock);
}

/*
 * Create (or take over) the hast/<name> GEOM Gate provider.  Fatal on
 * failure.
 */
static void
init_ggate(struct hast_resource *res)
{
	struct g_gate_ctl_create ggiocreate;
	struct g_gate_ctl_cancel ggiocancel;

	/*
	 * We communicate with ggate via /dev/ggctl. Open it.
	 */
	res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR);
	if (res->hr_ggatefd == -1)
		primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME);
	/*
	 * Create provider before trying to connect, as connection failure
	 * is not critical, but may take some time.
	 */
	bzero(&ggiocreate, sizeof(ggiocreate));
	ggiocreate.gctl_version = G_GATE_VERSION;
	ggiocreate.gctl_mediasize = res->hr_datasize;
	ggiocreate.gctl_sectorsize = res->hr_local_sectorsize;
	ggiocreate.gctl_flags = 0;
	ggiocreate.gctl_maxcount = 0;
	ggiocreate.gctl_timeout = 0;
	ggiocreate.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) {
		pjdlog_info("Device hast/%s created.", res->hr_provname);
		res->hr_ggateunit = ggiocreate.gctl_unit;
		return;
	}
	if (errno != EEXIST) {
		primary_exit(EX_OSERR, "Unable to create hast/%s device",
		    res->hr_provname);
	}
	pjdlog_debug(1,
	    "Device hast/%s already exists, we will try to take it over.",
	    res->hr_provname);
	/*
	 * If we received EEXIST, we assume that the process who created the
	 * provider died and didn't clean up. In that case we will start from
	 * where it left off.
	 */
	bzero(&ggiocancel, sizeof(ggiocancel));
	ggiocancel.gctl_version = G_GATE_VERSION;
	ggiocancel.gctl_unit = G_GATE_NAME_GIVEN;
	snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s",
	    res->hr_provname);
	if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) {
		pjdlog_info("Device hast/%s recovered.", res->hr_provname);
		res->hr_ggateunit = ggiocancel.gctl_unit;
		return;
	}
	primary_exit(EX_OSERR, "Unable to take over hast/%s device",
	    res->hr_provname);
}

/*
 * Entry point for the primary role: set up parent/child communication
 * channels, fork the worker, and (in the child) initialize everything and
 * spawn all worker threads.  The parent returns after recording the worker
 * pid; the child never returns (it ends up running sync_thread()).
 */
void
hastd_primary(struct hast_resource *res)
{
	pthread_t td;
	pid_t pid;
	int error, mode, debuglevel;

	/*
	 * Create communication channel for sending control commands from
	 * parent to child.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create control sockets between parent and child");
	}
	/*
	 * Create communication channel for sending events from child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create event sockets between child and parent");
	}
	/*
	 * Create communication channel for sending connection requests from
	 * child to parent.
	 */
	if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_OSERR,
		    "Unable to create connection sockets between child and parent");
	}

	pid = fork();
	if (pid == -1) {
		/* TODO: There's no need for this to be fatal error. */
		KEEP_ERRNO((void)pidfile_remove(pfh));
		pjdlog_exit(EX_TEMPFAIL, "Unable to fork");
	}

	if (pid > 0) {
		/* This is parent. */
		/* Declare that we are receiver. */
		proto_recv(res->hr_event, NULL, 0);
		proto_recv(res->hr_conn, NULL, 0);
		/* Declare that we are sender. */
		proto_send(res->hr_ctrl, NULL, 0);
		res->hr_workerpid = pid;
		return;
	}

	gres = res;
	res->output_status_aux = output_status_aux;
	mode = pjdlog_mode_get();
	debuglevel = pjdlog_debug_get();

	/* Declare that we are sender. */
	proto_send(res->hr_event, NULL, 0);
	proto_send(res->hr_conn, NULL, 0);
	/* Declare that we are receiver. */
	proto_recv(res->hr_ctrl, NULL, 0);
	descriptors_cleanup(res);

	descriptors_assert(res, mode);

	pjdlog_init(mode);
	pjdlog_debug_set(debuglevel);
	pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role));
	setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role));

	init_local(res);
	init_ggate(res);
	init_environment(res);

	if (drop_privs(res) != 0) {
		cleanup(res);
		exit(EX_CONFIG);
	}
	pjdlog_info("Privileges successfully dropped.");

	/*
	 * Create the guard thread first, so we can handle signals from the
	 * very beginning.
	 */
	error = pthread_create(&td, NULL, guard_thread, res);
	PJDLOG_ASSERT(error == 0);
	/*
	 * Create the control thread before sending any event to the parent,
	 * as we can deadlock when parent sends control request to worker,
	 * but worker has no control thread started yet, so parent waits.
	 * In the meantime worker sends an event to the parent, but parent
	 * is unable to handle the event, because it waits for control
	 * request response.
	 */
	error = pthread_create(&td, NULL, ctrl_thread, res);
	PJDLOG_ASSERT(error == 0);
	if (real_remote(res)) {
		error = init_remote(res, NULL, NULL);
		if (error == 0) {
			sync_start();
		} else if (error == EBUSY) {
			/* Remote is still primary; retry for hr_timeout s. */
			time_t start = time(NULL);

			pjdlog_warning("Waiting for remote node to become %s for %ds.",
			    role2str(HAST_ROLE_SECONDARY),
			    res->hr_timeout);
			for (;;) {
				sleep(1);
				error = init_remote(res, NULL, NULL);
				if (error != EBUSY)
					break;
				if (time(NULL) > start + res->hr_timeout)
					break;
			}
			if (error == EBUSY) {
				pjdlog_warning("Remote node is still %s, starting anyway.",
				    role2str(HAST_ROLE_PRIMARY));
			}
		}
	}
	error = pthread_create(&td, NULL, ggate_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, local_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, remote_recv_thread, res);
	PJDLOG_ASSERT(error == 0);
	error = pthread_create(&td, NULL, ggate_send_thread, res);
	PJDLOG_ASSERT(error == 0);
	fullystarted = true;
	(void)sync_thread(res);
}

static void
reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
    const char *fmt, ...)
1111{ 1112 char msg[1024]; 1113 va_list ap; 1114 1115 va_start(ap, fmt); 1116 (void)vsnprintf(msg, sizeof(msg), fmt, ap); 1117 va_end(ap); 1118 switch (ggio->gctl_cmd) { 1119 case BIO_READ: 1120 (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", 1121 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1122 break; 1123 case BIO_DELETE: 1124 (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", 1125 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1126 break; 1127 case BIO_FLUSH: 1128 (void)snprlcat(msg, sizeof(msg), "FLUSH."); 1129 break; 1130 case BIO_WRITE: 1131 (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", 1132 (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); 1133 break; 1134 default: 1135 (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", 1136 (unsigned int)ggio->gctl_cmd); 1137 break; 1138 } 1139 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 1140} 1141 1142static void 1143remote_close(struct hast_resource *res, int ncomp) 1144{ 1145 1146 rw_wlock(&hio_remote_lock[ncomp]); 1147 /* 1148 * Check for a race between dropping rlock and acquiring wlock - 1149 * another thread can close connection in-between. 1150 */ 1151 if (!ISCONNECTED(res, ncomp)) { 1152 PJDLOG_ASSERT(res->hr_remotein == NULL); 1153 PJDLOG_ASSERT(res->hr_remoteout == NULL); 1154 rw_unlock(&hio_remote_lock[ncomp]); 1155 return; 1156 } 1157 1158 PJDLOG_ASSERT(res->hr_remotein != NULL); 1159 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1160 1161 pjdlog_debug(2, "Closing incoming connection to %s.", 1162 res->hr_remoteaddr); 1163 proto_close(res->hr_remotein); 1164 res->hr_remotein = NULL; 1165 pjdlog_debug(2, "Closing outgoing connection to %s.", 1166 res->hr_remoteaddr); 1167 proto_close(res->hr_remoteout); 1168 res->hr_remoteout = NULL; 1169 1170 rw_unlock(&hio_remote_lock[ncomp]); 1171 1172 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 1173 1174 /* 1175 * Stop synchronization if in-progress. 
1176 */ 1177 sync_stop(); 1178 1179 event_send(res, EVENT_DISCONNECT); 1180} 1181 1182/* 1183 * Acknowledge write completion to the kernel, but don't update activemap yet. 1184 */ 1185static void 1186write_complete(struct hast_resource *res, struct hio *hio) 1187{ 1188 struct g_gate_ctl_io *ggio; 1189 unsigned int ncomp; 1190 1191 PJDLOG_ASSERT(!hio->hio_done); 1192 1193 ggio = &hio->hio_ggio; 1194 PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); 1195 1196 /* 1197 * Bump local count if this is first write after 1198 * connection failure with remote node. 1199 */ 1200 ncomp = 1; 1201 rw_rlock(&hio_remote_lock[ncomp]); 1202 if (!ISCONNECTED(res, ncomp)) { 1203 mtx_lock(&metadata_lock); 1204 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { 1205 res->hr_primary_localcnt++; 1206 pjdlog_debug(1, "Increasing localcnt to %ju.", 1207 (uintmax_t)res->hr_primary_localcnt); 1208 (void)metadata_write(res); 1209 } 1210 mtx_unlock(&metadata_lock); 1211 } 1212 rw_unlock(&hio_remote_lock[ncomp]); 1213 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) 1214 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1215 hio->hio_done = true; 1216} 1217 1218/* 1219 * Thread receives ggate I/O requests from the kernel and passes them to 1220 * appropriate threads: 1221 * WRITE - always goes to both local_send and remote_send threads 1222 * READ (when the block is up-to-date on local component) - 1223 * only local_send thread 1224 * READ (when the block isn't up-to-date on local component) - 1225 * only remote_send thread 1226 * DELETE - always goes to both local_send and remote_send threads 1227 * FLUSH - always goes to both local_send and remote_send threads 1228 */ 1229static void * 1230ggate_recv_thread(void *arg) 1231{ 1232 struct hast_resource *res = arg; 1233 struct g_gate_ctl_io *ggio; 1234 struct hio *hio; 1235 unsigned int ii, ncomp, ncomps; 1236 int error; 1237 1238 for (;;) { 1239 pjdlog_debug(2, "ggate_recv: Taking free request."); 1240 QUEUE_TAKE2(hio, free); 1241 
pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1242 ggio = &hio->hio_ggio; 1243 ggio->gctl_unit = res->hr_ggateunit; 1244 ggio->gctl_length = MAXPHYS; 1245 ggio->gctl_error = 0; 1246 hio->hio_done = false; 1247 hio->hio_replication = res->hr_replication; 1248 pjdlog_debug(2, 1249 "ggate_recv: (%p) Waiting for request from the kernel.", 1250 hio); 1251 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { 1252 if (sigexit_received) 1253 pthread_exit(NULL); 1254 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1255 } 1256 error = ggio->gctl_error; 1257 switch (error) { 1258 case 0: 1259 break; 1260 case ECANCELED: 1261 /* Exit gracefully. */ 1262 if (!sigexit_received) { 1263 pjdlog_debug(2, 1264 "ggate_recv: (%p) Received cancel from the kernel.", 1265 hio); 1266 pjdlog_info("Received cancel from the kernel, exiting."); 1267 } 1268 pthread_exit(NULL); 1269 case ENOMEM: 1270 /* 1271 * Buffer too small? Impossible, we allocate MAXPHYS 1272 * bytes - request can't be bigger than that. 1273 */ 1274 /* FALLTHROUGH */ 1275 case ENXIO: 1276 default: 1277 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1278 strerror(error)); 1279 } 1280 1281 ncomp = 0; 1282 ncomps = HAST_NCOMPONENTS; 1283 1284 for (ii = 0; ii < ncomps; ii++) 1285 hio->hio_errors[ii] = EINVAL; 1286 reqlog(LOG_DEBUG, 2, ggio, 1287 "ggate_recv: (%p) Request received from the kernel: ", 1288 hio); 1289 1290 /* 1291 * Inform all components about new write request. 1292 * For read request prefer local component unless the given 1293 * range is out-of-date, then use remote component. 1294 */ 1295 switch (ggio->gctl_cmd) { 1296 case BIO_READ: 1297 res->hr_stat_read++; 1298 ncomps = 1; 1299 mtx_lock(&metadata_lock); 1300 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1301 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1302 /* 1303 * This range is up-to-date on local component, 1304 * so handle request locally. 1305 */ 1306 /* Local component is 0 for now. 
*/ 1307 ncomp = 0; 1308 } else /* if (res->hr_syncsrc == 1309 HAST_SYNCSRC_SECONDARY) */ { 1310 PJDLOG_ASSERT(res->hr_syncsrc == 1311 HAST_SYNCSRC_SECONDARY); 1312 /* 1313 * This range is out-of-date on local component, 1314 * so send request to the remote node. 1315 */ 1316 /* Remote component is 1 for now. */ 1317 ncomp = 1; 1318 } 1319 mtx_unlock(&metadata_lock); 1320 break; 1321 case BIO_WRITE: 1322 res->hr_stat_write++; 1323 if (res->hr_resuid == 0 && 1324 res->hr_primary_localcnt == 0) { 1325 /* This is first write. */ 1326 res->hr_primary_localcnt = 1; 1327 } 1328 for (;;) { 1329 mtx_lock(&range_lock); 1330 if (rangelock_islocked(range_sync, 1331 ggio->gctl_offset, ggio->gctl_length)) { 1332 pjdlog_debug(2, 1333 "regular: Range offset=%jd length=%zu locked.", 1334 (intmax_t)ggio->gctl_offset, 1335 (size_t)ggio->gctl_length); 1336 range_regular_wait = true; 1337 cv_wait(&range_regular_cond, &range_lock); 1338 range_regular_wait = false; 1339 mtx_unlock(&range_lock); 1340 continue; 1341 } 1342 if (rangelock_add(range_regular, 1343 ggio->gctl_offset, ggio->gctl_length) == -1) { 1344 mtx_unlock(&range_lock); 1345 pjdlog_debug(2, 1346 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1347 (intmax_t)ggio->gctl_offset, 1348 (size_t)ggio->gctl_length); 1349 sleep(1); 1350 continue; 1351 } 1352 mtx_unlock(&range_lock); 1353 break; 1354 } 1355 mtx_lock(&res->hr_amp_lock); 1356 if (activemap_write_start(res->hr_amp, 1357 ggio->gctl_offset, ggio->gctl_length)) { 1358 res->hr_stat_activemap_update++; 1359 (void)hast_activemap_flush(res); 1360 } else { 1361 mtx_unlock(&res->hr_amp_lock); 1362 } 1363 if (ISMEMSYNC(hio)) { 1364 hio->hio_memsyncacked = false; 1365 refcnt_init(&hio->hio_writecount, ncomps); 1366 } 1367 break; 1368 case BIO_DELETE: 1369 res->hr_stat_delete++; 1370 break; 1371 case BIO_FLUSH: 1372 res->hr_stat_flush++; 1373 break; 1374 } 1375 pjdlog_debug(2, 1376 "ggate_recv: (%p) Moving request to the send queues.", hio); 1377 
refcnt_init(&hio->hio_countdown, ncomps); 1378 for (ii = ncomp; ii < ncomps; ii++) 1379 QUEUE_INSERT1(hio, send, ii); 1380 } 1381 /* NOTREACHED */ 1382 return (NULL); 1383} 1384 1385/* 1386 * Thread reads from or writes to local component. 1387 * If local read fails, it redirects it to remote_send thread. 1388 */ 1389static void * 1390local_send_thread(void *arg) 1391{ 1392 struct hast_resource *res = arg; 1393 struct g_gate_ctl_io *ggio; 1394 struct hio *hio; 1395 unsigned int ncomp, rncomp; 1396 ssize_t ret; 1397 1398 /* Local component is 0 for now. */ 1399 ncomp = 0; 1400 /* Remote component is 1 for now. */ 1401 rncomp = 1; 1402 1403 for (;;) { 1404 pjdlog_debug(2, "local_send: Taking request."); 1405 QUEUE_TAKE1(hio, send, ncomp, 0); 1406 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1407 ggio = &hio->hio_ggio; 1408 switch (ggio->gctl_cmd) { 1409 case BIO_READ: 1410 ret = pread(res->hr_localfd, ggio->gctl_data, 1411 ggio->gctl_length, 1412 ggio->gctl_offset + res->hr_localoff); 1413 if (ret == ggio->gctl_length) 1414 hio->hio_errors[ncomp] = 0; 1415 else if (!ISSYNCREQ(hio)) { 1416 /* 1417 * If READ failed, try to read from remote node. 1418 */ 1419 if (ret == -1) { 1420 reqlog(LOG_WARNING, 0, ggio, 1421 "Local request failed (%s), trying remote node. ", 1422 strerror(errno)); 1423 } else if (ret != ggio->gctl_length) { 1424 reqlog(LOG_WARNING, 0, ggio, 1425 "Local request failed (%zd != %jd), trying remote node. 
", 1426 ret, (intmax_t)ggio->gctl_length); 1427 } 1428 QUEUE_INSERT1(hio, send, rncomp); 1429 continue; 1430 } 1431 break; 1432 case BIO_WRITE: 1433 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1434 ggio->gctl_length, 1435 ggio->gctl_offset + res->hr_localoff); 1436 if (ret == -1) { 1437 hio->hio_errors[ncomp] = errno; 1438 reqlog(LOG_WARNING, 0, ggio, 1439 "Local request failed (%s): ", 1440 strerror(errno)); 1441 } else if (ret != ggio->gctl_length) { 1442 hio->hio_errors[ncomp] = EIO; 1443 reqlog(LOG_WARNING, 0, ggio, 1444 "Local request failed (%zd != %jd): ", 1445 ret, (intmax_t)ggio->gctl_length); 1446 } else { 1447 hio->hio_errors[ncomp] = 0; 1448 if (ISASYNC(hio)) { 1449 ggio->gctl_error = 0; 1450 write_complete(res, hio); 1451 } 1452 } 1453 break; 1454 case BIO_DELETE: 1455 ret = g_delete(res->hr_localfd, 1456 ggio->gctl_offset + res->hr_localoff, 1457 ggio->gctl_length); 1458 if (ret == -1) { 1459 hio->hio_errors[ncomp] = errno; 1460 reqlog(LOG_WARNING, 0, ggio, 1461 "Local request failed (%s): ", 1462 strerror(errno)); 1463 } else { 1464 hio->hio_errors[ncomp] = 0; 1465 } 1466 break; 1467 case BIO_FLUSH: 1468 if (!res->hr_localflush) { 1469 ret = -1; 1470 errno = EOPNOTSUPP; 1471 break; 1472 } 1473 ret = g_flush(res->hr_localfd); 1474 if (ret == -1) { 1475 if (errno == EOPNOTSUPP) 1476 res->hr_localflush = false; 1477 hio->hio_errors[ncomp] = errno; 1478 reqlog(LOG_WARNING, 0, ggio, 1479 "Local request failed (%s): ", 1480 strerror(errno)); 1481 } else { 1482 hio->hio_errors[ncomp] = 0; 1483 } 1484 break; 1485 } 1486 if (ISMEMSYNCWRITE(hio)) { 1487 if (refcnt_release(&hio->hio_writecount) == 0) { 1488 write_complete(res, hio); 1489 } 1490 } 1491 if (refcnt_release(&hio->hio_countdown) > 0) 1492 continue; 1493 if (ISSYNCREQ(hio)) { 1494 mtx_lock(&sync_lock); 1495 SYNCREQDONE(hio); 1496 mtx_unlock(&sync_lock); 1497 cv_signal(&sync_cond); 1498 } else { 1499 pjdlog_debug(2, 1500 "local_send: (%p) Moving request to the done queue.", 1501 hio); 1502 
QUEUE_INSERT2(hio, done); 1503 } 1504 } 1505 /* NOTREACHED */ 1506 return (NULL); 1507} 1508 1509static void 1510keepalive_send(struct hast_resource *res, unsigned int ncomp) 1511{ 1512 struct nv *nv; 1513 1514 rw_rlock(&hio_remote_lock[ncomp]); 1515 1516 if (!ISCONNECTED(res, ncomp)) { 1517 rw_unlock(&hio_remote_lock[ncomp]); 1518 return; 1519 } 1520 1521 PJDLOG_ASSERT(res->hr_remotein != NULL); 1522 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1523 1524 nv = nv_alloc(); 1525 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1526 if (nv_error(nv) != 0) { 1527 rw_unlock(&hio_remote_lock[ncomp]); 1528 nv_free(nv); 1529 pjdlog_debug(1, 1530 "keepalive_send: Unable to prepare header to send."); 1531 return; 1532 } 1533 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { 1534 rw_unlock(&hio_remote_lock[ncomp]); 1535 pjdlog_common(LOG_DEBUG, 1, errno, 1536 "keepalive_send: Unable to send request"); 1537 nv_free(nv); 1538 remote_close(res, ncomp); 1539 return; 1540 } 1541 1542 rw_unlock(&hio_remote_lock[ncomp]); 1543 nv_free(nv); 1544 pjdlog_debug(2, "keepalive_send: Request sent."); 1545} 1546 1547/* 1548 * Thread sends request to secondary node. 1549 */ 1550static void * 1551remote_send_thread(void *arg) 1552{ 1553 struct hast_resource *res = arg; 1554 struct g_gate_ctl_io *ggio; 1555 time_t lastcheck, now; 1556 struct hio *hio; 1557 struct nv *nv; 1558 unsigned int ncomp; 1559 bool wakeup; 1560 uint64_t offset, length; 1561 uint8_t cmd; 1562 void *data; 1563 1564 /* Remote component is 1 for now. 
*/ 1565 ncomp = 1; 1566 lastcheck = time(NULL); 1567 1568 for (;;) { 1569 pjdlog_debug(2, "remote_send: Taking request."); 1570 QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); 1571 if (hio == NULL) { 1572 now = time(NULL); 1573 if (lastcheck + HAST_KEEPALIVE <= now) { 1574 keepalive_send(res, ncomp); 1575 lastcheck = now; 1576 } 1577 continue; 1578 } 1579 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1580 ggio = &hio->hio_ggio; 1581 switch (ggio->gctl_cmd) { 1582 case BIO_READ: 1583 cmd = HIO_READ; 1584 data = NULL; 1585 offset = ggio->gctl_offset; 1586 length = ggio->gctl_length; 1587 break; 1588 case BIO_WRITE: 1589 cmd = HIO_WRITE; 1590 data = ggio->gctl_data; 1591 offset = ggio->gctl_offset; 1592 length = ggio->gctl_length; 1593 break; 1594 case BIO_DELETE: 1595 cmd = HIO_DELETE; 1596 data = NULL; 1597 offset = ggio->gctl_offset; 1598 length = ggio->gctl_length; 1599 break; 1600 case BIO_FLUSH: 1601 cmd = HIO_FLUSH; 1602 data = NULL; 1603 offset = 0; 1604 length = 0; 1605 break; 1606 default: 1607 PJDLOG_ABORT("invalid condition"); 1608 } 1609 nv = nv_alloc(); 1610 nv_add_uint8(nv, cmd, "cmd"); 1611 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1612 nv_add_uint64(nv, offset, "offset"); 1613 nv_add_uint64(nv, length, "length"); 1614 if (ISMEMSYNCWRITE(hio)) 1615 nv_add_uint8(nv, 1, "memsync"); 1616 if (nv_error(nv) != 0) { 1617 hio->hio_errors[ncomp] = nv_error(nv); 1618 pjdlog_debug(2, 1619 "remote_send: (%p) Unable to prepare header to send.", 1620 hio); 1621 reqlog(LOG_ERR, 0, ggio, 1622 "Unable to prepare header to send (%s): ", 1623 strerror(nv_error(nv))); 1624 /* Move failed request immediately to the done queue. */ 1625 goto done_queue; 1626 } 1627 /* 1628 * Protect connection from disappearing. 
1629 */ 1630 rw_rlock(&hio_remote_lock[ncomp]); 1631 if (!ISCONNECTED(res, ncomp)) { 1632 rw_unlock(&hio_remote_lock[ncomp]); 1633 hio->hio_errors[ncomp] = ENOTCONN; 1634 goto done_queue; 1635 } 1636 /* 1637 * Move the request to recv queue before sending it, because 1638 * in different order we can get reply before we move request 1639 * to recv queue. 1640 */ 1641 pjdlog_debug(2, 1642 "remote_send: (%p) Moving request to the recv queue.", 1643 hio); 1644 mtx_lock(&hio_recv_list_lock[ncomp]); 1645 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1646 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1647 hio_recv_list_size[ncomp]++; 1648 mtx_unlock(&hio_recv_list_lock[ncomp]); 1649 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1650 data != NULL ? length : 0) == -1) { 1651 hio->hio_errors[ncomp] = errno; 1652 rw_unlock(&hio_remote_lock[ncomp]); 1653 pjdlog_debug(2, 1654 "remote_send: (%p) Unable to send request.", hio); 1655 reqlog(LOG_ERR, 0, ggio, 1656 "Unable to send request (%s): ", 1657 strerror(hio->hio_errors[ncomp])); 1658 remote_close(res, ncomp); 1659 } else { 1660 rw_unlock(&hio_remote_lock[ncomp]); 1661 } 1662 nv_free(nv); 1663 if (wakeup) 1664 cv_signal(&hio_recv_list_cond[ncomp]); 1665 continue; 1666done_queue: 1667 nv_free(nv); 1668 if (ISSYNCREQ(hio)) { 1669 if (refcnt_release(&hio->hio_countdown) > 0) 1670 continue; 1671 mtx_lock(&sync_lock); 1672 SYNCREQDONE(hio); 1673 mtx_unlock(&sync_lock); 1674 cv_signal(&sync_cond); 1675 continue; 1676 } 1677 if (ggio->gctl_cmd == BIO_WRITE) { 1678 mtx_lock(&res->hr_amp_lock); 1679 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1680 ggio->gctl_length)) { 1681 (void)hast_activemap_flush(res); 1682 } else { 1683 mtx_unlock(&res->hr_amp_lock); 1684 } 1685 if (ISMEMSYNCWRITE(hio)) { 1686 if (refcnt_release(&hio->hio_writecount) == 0) { 1687 if (hio->hio_errors[0] == 0) 1688 write_complete(res, hio); 1689 } 1690 } 1691 } 1692 if (refcnt_release(&hio->hio_countdown) > 0) 1693 continue; 
1694 pjdlog_debug(2, 1695 "remote_send: (%p) Moving request to the done queue.", 1696 hio); 1697 QUEUE_INSERT2(hio, done); 1698 } 1699 /* NOTREACHED */ 1700 return (NULL); 1701} 1702 1703/* 1704 * Thread receives answer from secondary node and passes it to ggate_send 1705 * thread. 1706 */ 1707static void * 1708remote_recv_thread(void *arg) 1709{ 1710 struct hast_resource *res = arg; 1711 struct g_gate_ctl_io *ggio; 1712 struct hio *hio; 1713 struct nv *nv; 1714 unsigned int ncomp; 1715 uint64_t seq; 1716 bool memsyncack; 1717 int error; 1718 1719 /* Remote component is 1 for now. */ 1720 ncomp = 1; 1721 1722 for (;;) { 1723 /* Wait until there is anything to receive. */ 1724 mtx_lock(&hio_recv_list_lock[ncomp]); 1725 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1726 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1727 cv_wait(&hio_recv_list_cond[ncomp], 1728 &hio_recv_list_lock[ncomp]); 1729 } 1730 mtx_unlock(&hio_recv_list_lock[ncomp]); 1731 1732 memsyncack = false; 1733 1734 rw_rlock(&hio_remote_lock[ncomp]); 1735 if (!ISCONNECTED(res, ncomp)) { 1736 rw_unlock(&hio_remote_lock[ncomp]); 1737 /* 1738 * Connection is dead, so move all pending requests to 1739 * the done queue (one-by-one). 
1740 */ 1741 mtx_lock(&hio_recv_list_lock[ncomp]); 1742 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1743 PJDLOG_ASSERT(hio != NULL); 1744 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1745 hio_next[ncomp]); 1746 hio_recv_list_size[ncomp]--; 1747 mtx_unlock(&hio_recv_list_lock[ncomp]); 1748 hio->hio_errors[ncomp] = ENOTCONN; 1749 goto done_queue; 1750 } 1751 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 1752 pjdlog_errno(LOG_ERR, 1753 "Unable to receive reply header"); 1754 rw_unlock(&hio_remote_lock[ncomp]); 1755 remote_close(res, ncomp); 1756 continue; 1757 } 1758 rw_unlock(&hio_remote_lock[ncomp]); 1759 seq = nv_get_uint64(nv, "seq"); 1760 if (seq == 0) { 1761 pjdlog_error("Header contains no 'seq' field."); 1762 nv_free(nv); 1763 continue; 1764 } 1765 memsyncack = nv_exists(nv, "received"); 1766 mtx_lock(&hio_recv_list_lock[ncomp]); 1767 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1768 if (hio->hio_ggio.gctl_seq == seq) { 1769 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1770 hio_next[ncomp]); 1771 hio_recv_list_size[ncomp]--; 1772 break; 1773 } 1774 } 1775 mtx_unlock(&hio_recv_list_lock[ncomp]); 1776 if (hio == NULL) { 1777 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1778 (uintmax_t)seq); 1779 nv_free(nv); 1780 continue; 1781 } 1782 ggio = &hio->hio_ggio; 1783 error = nv_get_int16(nv, "error"); 1784 if (error != 0) { 1785 /* Request failed on remote side. 
*/ 1786 hio->hio_errors[ncomp] = error; 1787 reqlog(LOG_WARNING, 0, ggio, 1788 "Remote request failed (%s): ", strerror(error)); 1789 nv_free(nv); 1790 goto done_queue; 1791 } 1792 switch (ggio->gctl_cmd) { 1793 case BIO_READ: 1794 rw_rlock(&hio_remote_lock[ncomp]); 1795 if (!ISCONNECTED(res, ncomp)) { 1796 rw_unlock(&hio_remote_lock[ncomp]); 1797 nv_free(nv); 1798 goto done_queue; 1799 } 1800 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1801 ggio->gctl_data, ggio->gctl_length) == -1) { 1802 hio->hio_errors[ncomp] = errno; 1803 pjdlog_errno(LOG_ERR, 1804 "Unable to receive reply data"); 1805 rw_unlock(&hio_remote_lock[ncomp]); 1806 nv_free(nv); 1807 remote_close(res, ncomp); 1808 goto done_queue; 1809 } 1810 rw_unlock(&hio_remote_lock[ncomp]); 1811 break; 1812 case BIO_WRITE: 1813 case BIO_DELETE: 1814 case BIO_FLUSH: 1815 break; 1816 default: 1817 PJDLOG_ABORT("invalid condition"); 1818 } 1819 hio->hio_errors[ncomp] = 0; 1820 nv_free(nv); 1821done_queue: 1822 if (ISMEMSYNCWRITE(hio)) { 1823 if (!hio->hio_memsyncacked) { 1824 PJDLOG_ASSERT(memsyncack || 1825 hio->hio_errors[ncomp] != 0); 1826 /* Remote ack arrived. */ 1827 if (refcnt_release(&hio->hio_writecount) == 0) { 1828 if (hio->hio_errors[0] == 0) 1829 write_complete(res, hio); 1830 } 1831 hio->hio_memsyncacked = true; 1832 if (hio->hio_errors[ncomp] == 0) { 1833 pjdlog_debug(2, 1834 "remote_recv: (%p) Moving request " 1835 "back to the recv queue.", hio); 1836 mtx_lock(&hio_recv_list_lock[ncomp]); 1837 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], 1838 hio, hio_next[ncomp]); 1839 hio_recv_list_size[ncomp]++; 1840 mtx_unlock(&hio_recv_list_lock[ncomp]); 1841 continue; 1842 } 1843 } else { 1844 PJDLOG_ASSERT(!memsyncack); 1845 /* Remote final reply arrived. 
*/ 1846 } 1847 } 1848 if (refcnt_release(&hio->hio_countdown) > 0) 1849 continue; 1850 if (ISSYNCREQ(hio)) { 1851 mtx_lock(&sync_lock); 1852 SYNCREQDONE(hio); 1853 mtx_unlock(&sync_lock); 1854 cv_signal(&sync_cond); 1855 } else { 1856 pjdlog_debug(2, 1857 "remote_recv: (%p) Moving request to the done queue.", 1858 hio); 1859 QUEUE_INSERT2(hio, done); 1860 } 1861 } 1862 /* NOTREACHED */ 1863 return (NULL); 1864} 1865 1866/* 1867 * Thread sends answer to the kernel. 1868 */ 1869static void * 1870ggate_send_thread(void *arg) 1871{ 1872 struct hast_resource *res = arg; 1873 struct g_gate_ctl_io *ggio; 1874 struct hio *hio; 1875 unsigned int ii, ncomps; 1876 1877 ncomps = HAST_NCOMPONENTS; 1878 1879 for (;;) { 1880 pjdlog_debug(2, "ggate_send: Taking request."); 1881 QUEUE_TAKE2(hio, done); 1882 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1883 ggio = &hio->hio_ggio; 1884 for (ii = 0; ii < ncomps; ii++) { 1885 if (hio->hio_errors[ii] == 0) { 1886 /* 1887 * One successful request is enough to declare 1888 * success. 1889 */ 1890 ggio->gctl_error = 0; 1891 break; 1892 } 1893 } 1894 if (ii == ncomps) { 1895 /* 1896 * None of the requests were successful. 1897 * Use the error from local component except the 1898 * case when we did only remote request. 1899 */ 1900 if (ggio->gctl_cmd == BIO_READ && 1901 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) 1902 ggio->gctl_error = hio->hio_errors[1]; 1903 else 1904 ggio->gctl_error = hio->hio_errors[0]; 1905 } 1906 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1907 mtx_lock(&res->hr_amp_lock); 1908 if (activemap_write_complete(res->hr_amp, 1909 ggio->gctl_offset, ggio->gctl_length)) { 1910 res->hr_stat_activemap_update++; 1911 (void)hast_activemap_flush(res); 1912 } else { 1913 mtx_unlock(&res->hr_amp_lock); 1914 } 1915 } 1916 if (ggio->gctl_cmd == BIO_WRITE) { 1917 /* 1918 * Unlock range we locked. 
1919 */ 1920 mtx_lock(&range_lock); 1921 rangelock_del(range_regular, ggio->gctl_offset, 1922 ggio->gctl_length); 1923 if (range_sync_wait) 1924 cv_signal(&range_sync_cond); 1925 mtx_unlock(&range_lock); 1926 if (!hio->hio_done) 1927 write_complete(res, hio); 1928 } else { 1929 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { 1930 primary_exit(EX_OSERR, 1931 "G_GATE_CMD_DONE failed"); 1932 } 1933 } 1934 if (hio->hio_errors[0]) { 1935 switch (ggio->gctl_cmd) { 1936 case BIO_READ: 1937 res->hr_stat_read_error++; 1938 break; 1939 case BIO_WRITE: 1940 res->hr_stat_write_error++; 1941 break; 1942 case BIO_DELETE: 1943 res->hr_stat_delete_error++; 1944 break; 1945 case BIO_FLUSH: 1946 res->hr_stat_flush_error++; 1947 break; 1948 } 1949 } 1950 pjdlog_debug(2, 1951 "ggate_send: (%p) Moving request to the free queue.", hio); 1952 QUEUE_INSERT2(hio, free); 1953 } 1954 /* NOTREACHED */ 1955 return (NULL); 1956} 1957 1958/* 1959 * Thread synchronize local and remote components. 1960 */ 1961static void * 1962sync_thread(void *arg __unused) 1963{ 1964 struct hast_resource *res = arg; 1965 struct hio *hio; 1966 struct g_gate_ctl_io *ggio; 1967 struct timeval tstart, tend, tdiff; 1968 unsigned int ii, ncomp, ncomps; 1969 off_t offset, length, synced; 1970 bool dorewind, directreads; 1971 int syncext; 1972 1973 ncomps = HAST_NCOMPONENTS; 1974 dorewind = true; 1975 synced = 0; 1976 offset = -1; 1977 directreads = false; 1978 1979 for (;;) { 1980 mtx_lock(&sync_lock); 1981 if (offset >= 0 && !sync_inprogress) { 1982 gettimeofday(&tend, NULL); 1983 timersub(&tend, &tstart, &tdiff); 1984 pjdlog_info("Synchronization interrupted after %#.0T. " 1985 "%NB synchronized so far.", &tdiff, 1986 (intmax_t)synced); 1987 event_send(res, EVENT_SYNCINTR); 1988 } 1989 while (!sync_inprogress) { 1990 dorewind = true; 1991 synced = 0; 1992 cv_wait(&sync_cond, &sync_lock); 1993 } 1994 mtx_unlock(&sync_lock); 1995 /* 1996 * Obtain offset at which we should synchronize. 
1997 * Rewind synchronization if needed. 1998 */ 1999 mtx_lock(&res->hr_amp_lock); 2000 if (dorewind) 2001 activemap_sync_rewind(res->hr_amp); 2002 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 2003 if (syncext != -1) { 2004 /* 2005 * We synchronized entire syncext extent, we can mark 2006 * it as clean now. 2007 */ 2008 if (activemap_extent_complete(res->hr_amp, syncext)) 2009 (void)hast_activemap_flush(res); 2010 else 2011 mtx_unlock(&res->hr_amp_lock); 2012 } else { 2013 mtx_unlock(&res->hr_amp_lock); 2014 } 2015 if (dorewind) { 2016 dorewind = false; 2017 if (offset == -1) 2018 pjdlog_info("Nodes are in sync."); 2019 else { 2020 pjdlog_info("Synchronization started. %NB to go.", 2021 (intmax_t)(res->hr_extentsize * 2022 activemap_ndirty(res->hr_amp))); 2023 event_send(res, EVENT_SYNCSTART); 2024 gettimeofday(&tstart, NULL); 2025 } 2026 } 2027 if (offset == -1) { 2028 sync_stop(); 2029 pjdlog_debug(1, "Nothing to synchronize."); 2030 /* 2031 * Synchronization complete, make both localcnt and 2032 * remotecnt equal. 2033 */ 2034 ncomp = 1; 2035 rw_rlock(&hio_remote_lock[ncomp]); 2036 if (ISCONNECTED(res, ncomp)) { 2037 if (synced > 0) { 2038 int64_t bps; 2039 2040 gettimeofday(&tend, NULL); 2041 timersub(&tend, &tstart, &tdiff); 2042 bps = (int64_t)((double)synced / 2043 ((double)tdiff.tv_sec + 2044 (double)tdiff.tv_usec / 1000000)); 2045 pjdlog_info("Synchronization complete. 
				    "%NB synchronized in %#.0lT (%NB/sec).",
				    (intmax_t)synced, &tdiff,
				    (intmax_t)bps);
				event_send(res, EVENT_SYNCDONE);
			}
			mtx_lock(&metadata_lock);
			/*
			 * Synchronization is complete: make local and remote
			 * generation counters agree and persist them.  If we
			 * were syncing from the secondary, reads may go
			 * straight to the local component again.
			 */
			if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY)
				directreads = true;
			res->hr_syncsrc = HAST_SYNCSRC_UNDEF;
			res->hr_primary_localcnt =
			    res->hr_secondary_remotecnt;
			res->hr_primary_remotecnt =
			    res->hr_secondary_localcnt;
			pjdlog_debug(1,
			    "Setting localcnt to %ju and remotecnt to %ju.",
			    (uintmax_t)res->hr_primary_localcnt,
			    (uintmax_t)res->hr_primary_remotecnt);
			(void)metadata_write(res);
			mtx_unlock(&metadata_lock);
		}
		rw_unlock(&hio_remote_lock[ncomp]);
		/*
		 * NOTE(review): enable_direct_reads() is deliberately called
		 * after hio_remote_lock is dropped - presumably to avoid
		 * lock-order issues; confirm before reordering.
		 */
		if (directreads) {
			directreads = false;
			enable_direct_reads(res);
		}
		continue;
	}
	pjdlog_debug(2, "sync: Taking free request.");
	QUEUE_TAKE2(hio, free);
	pjdlog_debug(2, "sync: (%p) Got free request.", hio);
	/*
	 * Lock the range we are going to synchronize. We don't want
	 * race where someone writes between our read and write.
	 */
	for (;;) {
		mtx_lock(&range_lock);
		if (rangelock_islocked(range_regular, offset, length)) {
			pjdlog_debug(2,
			    "sync: Range offset=%jd length=%jd locked.",
			    (intmax_t)offset, (intmax_t)length);
			/*
			 * A regular (application) request holds this range;
			 * sleep until it finishes, then re-check.
			 */
			range_sync_wait = true;
			cv_wait(&range_sync_cond, &range_lock);
			range_sync_wait = false;
			mtx_unlock(&range_lock);
			continue;
		}
		if (rangelock_add(range_sync, offset, length) == -1) {
			mtx_unlock(&range_lock);
			pjdlog_debug(2,
			    "sync: Range offset=%jd length=%jd is already locked, waiting.",
			    (intmax_t)offset, (intmax_t)length);
			/* Poll once a second until the range frees up. */
			sleep(1);
			continue;
		}
		mtx_unlock(&range_lock);
		break;
	}
	/*
	 * First read the data from synchronization source.
	 */
	SYNCREQ(hio);
	ggio = &hio->hio_ggio;
	ggio->gctl_cmd = BIO_READ;
	ggio->gctl_offset = offset;
	ggio->gctl_length = length;
	ggio->gctl_error = 0;
	hio->hio_done = false;
	hio->hio_replication = res->hr_replication;
	/*
	 * Preset every per-component error to EINVAL so an untouched slot
	 * cannot be mistaken for success when checked below.
	 */
	for (ii = 0; ii < ncomps; ii++)
		hio->hio_errors[ii] = EINVAL;
	reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
	    hio);
	pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
	    hio);
	mtx_lock(&metadata_lock);
	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
		/*
		 * This range is up-to-date on local component,
		 * so handle request locally.
		 */
		/* Local component is 0 for now. */
		ncomp = 0;
	} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
		/*
		 * This range is out-of-date on local component,
		 * so send request to the remote node.
		 */
		/* Remote component is 1 for now. */
		ncomp = 1;
	}
	mtx_unlock(&metadata_lock);
	/* A sync request is handled by exactly one component. */
	refcnt_init(&hio->hio_countdown, 1);
	QUEUE_INSERT1(hio, send, ncomp);

	/*
	 * Let's wait for READ to finish.
	 */
	mtx_lock(&sync_lock);
	while (!ISSYNCREQDONE(hio))
		cv_wait(&sync_cond, &sync_lock);
	mtx_unlock(&sync_lock);

	if (hio->hio_errors[ncomp] != 0) {
		pjdlog_error("Unable to read synchronization data: %s.",
		    strerror(hio->hio_errors[ncomp]));
		goto free_queue;
	}

	/*
	 * We read the data from synchronization source, now write it
	 * to synchronization target.
	 */
	SYNCREQ(hio);
	ggio->gctl_cmd = BIO_WRITE;
	/* Reset per-component errors for the WRITE leg as well. */
	for (ii = 0; ii < ncomps; ii++)
		hio->hio_errors[ii] = EINVAL;
	reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
	    hio);
	pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
	    hio);
	mtx_lock(&metadata_lock);
	if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
		/*
		 * This range is up-to-date on local component,
		 * so we update remote component.
		 */
		/* Remote component is 1 for now. */
		ncomp = 1;
	} else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ {
		PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY);
		/*
		 * This range is out-of-date on local component,
		 * so we update it.
		 */
		/* Local component is 0 for now. */
		ncomp = 0;
	}
	mtx_unlock(&metadata_lock);

	pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
	    hio);
	refcnt_init(&hio->hio_countdown, 1);
	QUEUE_INSERT1(hio, send, ncomp);

	/*
	 * Let's wait for WRITE to finish.
	 */
	mtx_lock(&sync_lock);
	while (!ISSYNCREQDONE(hio))
		cv_wait(&sync_cond, &sync_lock);
	mtx_unlock(&sync_lock);

	if (hio->hio_errors[ncomp] != 0) {
		pjdlog_error("Unable to write synchronization data: %s.",
		    strerror(hio->hio_errors[ncomp]));
		goto free_queue;
	}

	synced += length;
free_queue:
	/*
	 * Drop the sync range lock and wake any regular request that was
	 * waiting for this range, then recycle the hio.
	 */
	mtx_lock(&range_lock);
	rangelock_del(range_sync, offset, length);
	if (range_regular_wait)
		cv_signal(&range_regular_cond);
	mtx_unlock(&range_lock);
	pjdlog_debug(2, "sync: (%p) Moving request to the free queue.",
	    hio);
	QUEUE_INSERT2(hio, free);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Apply new configuration values from nv to the running primary
 * resource (gres).  Reconnects to the remote node when the address
 * settings changed.
 */
void
primary_config_reload(struct hast_resource *res, struct nv *nv)
{
	unsigned int ii, ncomps;
	int modified, vint;
	const char *vstr;

	pjdlog_info("Reloading configuration...");

	PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY);
	PJDLOG_ASSERT(gres == res);
	/* The caller must have provided every reloadable field. */
	nv_assert(nv, "remoteaddr");
	nv_assert(nv, "sourceaddr");
	nv_assert(nv, "replication");
	nv_assert(nv, "checksum");
	nv_assert(nv, "compression");
	nv_assert(nv, "timeout");
	nv_assert(nv, "exec");
	nv_assert(nv, "metaflush");

	ncomps = HAST_NCOMPONENTS;

	/* Bitmask of settings that actually changed. */
#define MODIFIED_REMOTEADDR	0x01
#define MODIFIED_SOURCEADDR	0x02
#define MODIFIED_REPLICATION	0x04
#define MODIFIED_CHECKSUM	0x08
#define MODIFIED_COMPRESSION	0x10
#define MODIFIED_TIMEOUT	0x20
#define MODIFIED_EXEC		0x40
#define MODIFIED_METAFLUSH	0x80
	modified = 0;

	vstr = nv_get_string(nv, "remoteaddr");
	if (strcmp(gres->hr_remoteaddr, vstr) != 0) {
		/*
		 * Don't copy res->hr_remoteaddr to gres just yet.
		 * We want remote_close() to log disconnect from the old
		 * addresses, not from the new ones.
		 */
		modified |= MODIFIED_REMOTEADDR;
	}
	vstr = nv_get_string(nv, "sourceaddr");
	if (strcmp(gres->hr_sourceaddr, vstr) != 0) {
		strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr));
		modified |= MODIFIED_SOURCEADDR;
	}
	vint = nv_get_int32(nv, "replication");
	if (gres->hr_replication != vint) {
		gres->hr_replication = vint;
		modified |= MODIFIED_REPLICATION;
	}
	vint = nv_get_int32(nv, "checksum");
	if (gres->hr_checksum != vint) {
		gres->hr_checksum = vint;
		modified |= MODIFIED_CHECKSUM;
	}
	vint = nv_get_int32(nv, "compression");
	if (gres->hr_compression != vint) {
		gres->hr_compression = vint;
		modified |= MODIFIED_COMPRESSION;
	}
	vint = nv_get_int32(nv, "timeout");
	if (gres->hr_timeout != vint) {
		gres->hr_timeout = vint;
		modified |= MODIFIED_TIMEOUT;
	}
	vstr = nv_get_string(nv, "exec");
	if (strcmp(gres->hr_exec, vstr) != 0) {
		strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec));
		modified |= MODIFIED_EXEC;
	}
	vint = nv_get_int32(nv, "metaflush");
	if (gres->hr_metaflush != vint) {
		gres->hr_metaflush = vint;
		modified |= MODIFIED_METAFLUSH;
	}

	/*
	 * Change timeout for connected sockets.
	 * Don't bother if we need to reconnect.
	 */
	if ((modified & MODIFIED_TIMEOUT) != 0 &&
	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			rw_rlock(&hio_remote_lock[ii]);
			if (!ISCONNECTED(gres, ii)) {
				rw_unlock(&hio_remote_lock[ii]);
				continue;
			}
			rw_unlock(&hio_remote_lock[ii]);
			/* Apply the new timeout to both directions. */
			if (proto_timeout(gres->hr_remotein,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
			if (proto_timeout(gres->hr_remoteout,
			    gres->hr_timeout) == -1) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to set connection timeout");
			}
		}
	}
	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
		/*
		 * Address changed: drop remote connections now so the
		 * disconnect is logged against the old address, and only
		 * then install the new remote address.
		 */
		for (ii = 0; ii < ncomps; ii++) {
			if (!ISREMOTE(ii))
				continue;
			remote_close(gres, ii);
		}
		if (modified & MODIFIED_REMOTEADDR) {
			vstr = nv_get_string(nv, "remoteaddr");
			strlcpy(gres->hr_remoteaddr, vstr,
			    sizeof(gres->hr_remoteaddr));
		}
	}
#undef MODIFIED_REMOTEADDR
#undef MODIFIED_SOURCEADDR
#undef MODIFIED_REPLICATION
#undef MODIFIED_CHECKSUM
#undef MODIFIED_COMPRESSION
#undef MODIFIED_TIMEOUT
#undef MODIFIED_EXEC
#undef MODIFIED_METAFLUSH

	pjdlog_info("Configuration reloaded successfully.");
}

/*
 * Check a single component's remote connection and try to reconnect
 * when it is down.  No-op for the local component.
 */
static void
guard_one(struct hast_resource *res, unsigned int ncomp)
{
	struct proto_conn *in, *out;

	if (!ISREMOTE(ncomp))
		return;

	rw_rlock(&hio_remote_lock[ncomp]);

	if (!real_remote(res)) {
		rw_unlock(&hio_remote_lock[ncomp]);
		return;
	}

	if (ISCONNECTED(res, ncomp)) {
		/* Connection is healthy, nothing to do. */
		PJDLOG_ASSERT(res->hr_remotein != NULL);
		PJDLOG_ASSERT(res->hr_remoteout != NULL);
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_debug(2, "remote_guard: Connection to %s is ok.",
		    res->hr_remoteaddr);
		return;
	}

	PJDLOG_ASSERT(res->hr_remotein
	    == NULL);
	PJDLOG_ASSERT(res->hr_remoteout == NULL);
	/*
	 * Upgrade the lock. It doesn't have to be atomic as no other thread
	 * can change connection status from disconnected to connected.
	 */
	rw_unlock(&hio_remote_lock[ncomp]);
	pjdlog_debug(2, "remote_guard: Reconnecting to %s.",
	    res->hr_remoteaddr);
	in = out = NULL;
	if (init_remote(res, &in, &out) == 0) {
		rw_wlock(&hio_remote_lock[ncomp]);
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in != NULL && out != NULL);
		/* Publish both connections under the write lock. */
		res->hr_remotein = in;
		res->hr_remoteout = out;
		rw_unlock(&hio_remote_lock[ncomp]);
		pjdlog_info("Successfully reconnected to %s.",
		    res->hr_remoteaddr);
		/* Resynchronize after the reconnect. */
		sync_start();
	} else {
		/* Both connections should be NULL. */
		PJDLOG_ASSERT(res->hr_remotein == NULL);
		PJDLOG_ASSERT(res->hr_remoteout == NULL);
		PJDLOG_ASSERT(in == NULL && out == NULL);
		pjdlog_debug(2, "remote_guard: Reconnect to %s failed.",
		    res->hr_remoteaddr);
	}
}

/*
 * Thread guards remote connections and reconnects when needed, handles
 * signals, etc.
2406 */ 2407static void * 2408guard_thread(void *arg) 2409{ 2410 struct hast_resource *res = arg; 2411 unsigned int ii, ncomps; 2412 struct timespec timeout; 2413 time_t lastcheck, now; 2414 sigset_t mask; 2415 int signo; 2416 2417 ncomps = HAST_NCOMPONENTS; 2418 lastcheck = time(NULL); 2419 2420 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2421 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2422 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2423 2424 timeout.tv_sec = HAST_KEEPALIVE; 2425 timeout.tv_nsec = 0; 2426 signo = -1; 2427 2428 for (;;) { 2429 switch (signo) { 2430 case SIGINT: 2431 case SIGTERM: 2432 sigexit_received = true; 2433 primary_exitx(EX_OK, 2434 "Termination signal received, exiting."); 2435 break; 2436 default: 2437 break; 2438 } 2439 2440 /* 2441 * Don't check connections until we fully started, 2442 * as we may still be looping, waiting for remote node 2443 * to switch from primary to secondary. 2444 */ 2445 if (fullystarted) { 2446 pjdlog_debug(2, "remote_guard: Checking connections."); 2447 now = time(NULL); 2448 if (lastcheck + HAST_KEEPALIVE <= now) { 2449 for (ii = 0; ii < ncomps; ii++) 2450 guard_one(res, ii); 2451 lastcheck = now; 2452 } 2453 } 2454 signo = sigtimedwait(&mask, NULL, &timeout); 2455 } 2456 /* NOTREACHED */ 2457 return (NULL); 2458} 2459