secondary.c revision 247866
1/*- 2 * Copyright (c) 2009-2010 The FreeBSD Foundation 3 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4 * All rights reserved. 5 * 6 * This software was developed by Pawel Jakub Dawidek under sponsorship from 7 * the FreeBSD Foundation. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 */ 30 31#include <sys/cdefs.h> 32__FBSDID("$FreeBSD: stable/9/sbin/hastd/secondary.c 247866 2013-03-06 06:57:18Z trociny $"); 33 34#include <sys/param.h> 35#include <sys/time.h> 36#include <sys/bio.h> 37#include <sys/disk.h> 38#include <sys/stat.h> 39 40#include <err.h> 41#include <errno.h> 42#include <fcntl.h> 43#include <libgeom.h> 44#include <pthread.h> 45#include <signal.h> 46#include <stdint.h> 47#include <stdio.h> 48#include <string.h> 49#include <sysexits.h> 50#include <unistd.h> 51 52#include <activemap.h> 53#include <nv.h> 54#include <pjdlog.h> 55 56#include "control.h" 57#include "event.h" 58#include "hast.h" 59#include "hast_proto.h" 60#include "hastd.h" 61#include "hooks.h" 62#include "metadata.h" 63#include "proto.h" 64#include "subr.h" 65#include "synch.h" 66 67struct hio { 68 uint64_t hio_seq; 69 int hio_error; 70 void *hio_data; 71 uint8_t hio_cmd; 72 uint64_t hio_offset; 73 uint64_t hio_length; 74 TAILQ_ENTRY(hio) hio_next; 75}; 76 77static struct hast_resource *gres; 78 79/* 80 * Free list holds unused structures. When free list is empty, we have to wait 81 * until some in-progress requests are freed. 82 */ 83static TAILQ_HEAD(, hio) hio_free_list; 84static pthread_mutex_t hio_free_list_lock; 85static pthread_cond_t hio_free_list_cond; 86/* 87 * Disk thread (the one that do I/O requests) takes requests from this list. 88 */ 89static TAILQ_HEAD(, hio) hio_disk_list; 90static pthread_mutex_t hio_disk_list_lock; 91static pthread_cond_t hio_disk_list_cond; 92/* 93 * There is one recv list for every component, although local components don't 94 * use recv lists as local requests are done synchronously. 95 */ 96static TAILQ_HEAD(, hio) hio_send_list; 97static pthread_mutex_t hio_send_list_lock; 98static pthread_cond_t hio_send_list_cond; 99 100/* 101 * Maximum number of outstanding I/O requests. 102 */ 103#define HAST_HIO_MAX 256 104 105static void *recv_thread(void *arg); 106static void *disk_thread(void *arg); 107static void *send_thread(void *arg); 108 109#define QUEUE_INSERT(name, hio) do { \ 110 bool _wakeup; \ 111 \ 112 mtx_lock(&hio_##name##_list_lock); \ 113 _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 114 TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 115 mtx_unlock(&hio_##name##_list_lock); \ 116 if (_wakeup) \ 117 cv_signal(&hio_##name##_list_cond); \ 118} while (0) 119#define QUEUE_TAKE(name, hio) do { \ 120 mtx_lock(&hio_##name##_list_lock); \ 121 while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 122 cv_wait(&hio_##name##_list_cond, \ 123 &hio_##name##_list_lock); \ 124 } \ 125 TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 126 mtx_unlock(&hio_##name##_list_lock); \ 127} while (0) 128 129static void 130hio_clear(struct hio *hio) 131{ 132 133 hio->hio_seq = 0; 134 hio->hio_error = 0; 135 hio->hio_cmd = HIO_UNDEF; 136 hio->hio_offset = 0; 137 hio->hio_length = 0; 138} 139 140static void 141init_environment(void) 142{ 143 struct hio *hio; 144 unsigned int ii; 145 146 /* 147 * Initialize lists, their locks and theirs condition variables. 148 */ 149 TAILQ_INIT(&hio_free_list); 150 mtx_init(&hio_free_list_lock); 151 cv_init(&hio_free_list_cond); 152 TAILQ_INIT(&hio_disk_list); 153 mtx_init(&hio_disk_list_lock); 154 cv_init(&hio_disk_list_cond); 155 TAILQ_INIT(&hio_send_list); 156 mtx_init(&hio_send_list_lock); 157 cv_init(&hio_send_list_cond); 158 159 /* 160 * Allocate requests pool and initialize requests. 161 */ 162 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 163 hio = malloc(sizeof(*hio)); 164 if (hio == NULL) { 165 pjdlog_exitx(EX_TEMPFAIL, 166 "Unable to allocate memory (%zu bytes) for hio request.", 167 sizeof(*hio)); 168 } 169 hio->hio_data = malloc(MAXPHYS); 170 if (hio->hio_data == NULL) { 171 pjdlog_exitx(EX_TEMPFAIL, 172 "Unable to allocate memory (%zu bytes) for gctl_data.", 173 (size_t)MAXPHYS); 174 } 175 hio_clear(hio); 176 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 177 } 178} 179 180static void 181init_local(struct hast_resource *res) 182{ 183 184 if (metadata_read(res, true) == -1) 185 exit(EX_NOINPUT); 186} 187 188static void 189init_remote(struct hast_resource *res, struct nv *nvin) 190{ 191 uint64_t resuid; 192 struct nv *nvout; 193 unsigned char *map; 194 size_t mapsize; 195 196#ifdef notyet 197 /* Setup direction. */ 198 if (proto_send(res->hr_remoteout, NULL, 0) == -1) 199 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 200#endif 201 202 nvout = nv_alloc(); 203 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 204 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 205 resuid = nv_get_uint64(nvin, "resuid"); 206 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 207 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 208 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 209 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 210 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 211 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 212 map = malloc(mapsize); 213 if (map == NULL) { 214 pjdlog_exitx(EX_TEMPFAIL, 215 "Unable to allocate memory (%zu bytes) for activemap.", 216 mapsize); 217 } 218 /* 219 * When we work as primary and secondary is missing we will increase 220 * localcnt in our metadata. When secondary is connected and synced 221 * we make localcnt be equal to remotecnt, which means nodes are more 222 * or less in sync. 223 * Split-brain condition is when both nodes are not able to communicate 224 * and are both configured as primary nodes. In turn, they can both 225 * make incompatible changes to the data and we have to detect that. 226 * Under split-brain condition we will increase our localcnt on first 227 * write and remote node will increase its localcnt on first write. 228 * When we connect we can see that primary's localcnt is greater than 229 * our remotecnt (primary was modified while we weren't watching) and 230 * our localcnt is greater than primary's remotecnt (we were modified 231 * while primary wasn't watching). 232 * There are many possible combinations which are all gathered below. 233 * Don't pay too much attention to exact numbers, the more important 234 * is to compare them. We compare secondary's local with primary's 235 * remote and secondary's remote with primary's local. 236 * Note that every case where primary's localcnt is smaller than 237 * secondary's remotecnt and where secondary's localcnt is smaller than 238 * primary's remotecnt should be impossible in practise. We will perform 239 * full synchronization then. Those cases are marked with an asterisk. 240 * Regular synchronization means that only extents marked as dirty are 241 * synchronized (regular synchronization). 242 * 243 * SECONDARY METADATA PRIMARY METADATA 244 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 245 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 246 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 247 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 248 * regular sync from secondary. 249 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 250 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 251 * local=3 remote=3 local=4 remote=2 Split-brain condition. 252 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 253 * regular sync from primary. 254 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 255 */ 256 if (res->hr_resuid == 0) { 257 /* 258 * Provider is used for the first time. If primary node done no 259 * writes yet as well (we will find "virgin" argument) then 260 * there is no need to synchronize anything. If primary node 261 * done any writes already we have to synchronize everything. 262 */ 263 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); 264 res->hr_resuid = resuid; 265 if (metadata_write(res) == -1) 266 exit(EX_NOINPUT); 267 if (nv_exists(nvin, "virgin")) { 268 free(map); 269 map = NULL; 270 mapsize = 0; 271 } else { 272 memset(map, 0xff, mapsize); 273 } 274 nv_add_int8(nvout, 1, "virgin"); 275 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 276 } else if (res->hr_resuid != resuid) { 277 char errmsg[256]; 278 279 free(map); 280 (void)snprintf(errmsg, sizeof(errmsg), 281 "Resource unique ID mismatch (primary=%ju, secondary=%ju).", 282 (uintmax_t)resuid, (uintmax_t)res->hr_resuid); 283 pjdlog_error("%s", errmsg); 284 nv_add_string(nvout, errmsg, "errmsg"); 285 if (hast_proto_send(res, res->hr_remotein, nvout, 286 NULL, 0) == -1) { 287 pjdlog_exit(EX_TEMPFAIL, 288 "Unable to send response to %s", 289 res->hr_remoteaddr); 290 } 291 nv_free(nvout); 292 exit(EX_CONFIG); 293 } else if ( 294 /* Is primary out-of-date? */ 295 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 296 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 297 /* Are the nodes more or less in sync? */ 298 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 299 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 300 /* Is secondary out-of-date? */ 301 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 302 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 303 /* 304 * Nodes are more or less in sync or one of the nodes is 305 * out-of-date. 306 * It doesn't matter at this point which one, we just have to 307 * send out local bitmap to the remote node. 308 */ 309 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 310 (ssize_t)mapsize) { 311 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 312 } 313 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 314 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 315 /* Primary is out-of-date, sync from secondary. */ 316 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 317 } else { 318 /* 319 * Secondary is out-of-date or counts match. 320 * Sync from primary. 321 */ 322 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 323 } 324 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 325 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 326 /* 327 * Not good, we have split-brain condition. 328 */ 329 free(map); 330 pjdlog_error("Split-brain detected, exiting."); 331 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 332 if (hast_proto_send(res, res->hr_remotein, nvout, 333 NULL, 0) == -1) { 334 pjdlog_exit(EX_TEMPFAIL, 335 "Unable to send response to %s", 336 res->hr_remoteaddr); 337 } 338 nv_free(nvout); 339 /* Exit on split-brain. */ 340 event_send(res, EVENT_SPLITBRAIN); 341 exit(EX_CONFIG); 342 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 343 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 344 /* 345 * This should never happen in practise, but we will perform 346 * full synchronization. 347 */ 348 PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 349 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 350 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 351 METADATA_SIZE, res->hr_extentsize, 352 res->hr_local_sectorsize); 353 memset(map, 0xff, mapsize); 354 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 355 /* In this one of five cases sync from secondary. */ 356 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 357 } else { 358 /* For the rest four cases sync from primary. */ 359 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 360 } 361 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 362 (uintmax_t)res->hr_primary_localcnt, 363 (uintmax_t)res->hr_primary_remotecnt, 364 (uintmax_t)res->hr_secondary_localcnt, 365 (uintmax_t)res->hr_secondary_remotecnt); 366 } 367 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 368 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { 369 pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", 370 res->hr_remoteaddr); 371 } 372 if (map != NULL) 373 free(map); 374 nv_free(nvout); 375#ifdef notyet 376 /* Setup direction. */ 377 if (proto_recv(res->hr_remotein, NULL, 0) == -1) 378 pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); 379#endif 380} 381 382void 383hastd_secondary(struct hast_resource *res, struct nv *nvin) 384{ 385 sigset_t mask; 386 pthread_t td; 387 pid_t pid; 388 int error, mode, debuglevel; 389 390 /* 391 * Create communication channel between parent and child. 392 */ 393 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { 394 KEEP_ERRNO((void)pidfile_remove(pfh)); 395 pjdlog_exit(EX_OSERR, 396 "Unable to create control sockets between parent and child"); 397 } 398 /* 399 * Create communication channel between child and parent. 400 */ 401 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { 402 KEEP_ERRNO((void)pidfile_remove(pfh)); 403 pjdlog_exit(EX_OSERR, 404 "Unable to create event sockets between child and parent"); 405 } 406 407 pid = fork(); 408 if (pid == -1) { 409 KEEP_ERRNO((void)pidfile_remove(pfh)); 410 pjdlog_exit(EX_OSERR, "Unable to fork"); 411 } 412 413 if (pid > 0) { 414 /* This is parent. */ 415 proto_close(res->hr_remotein); 416 res->hr_remotein = NULL; 417 proto_close(res->hr_remoteout); 418 res->hr_remoteout = NULL; 419 /* Declare that we are receiver. */ 420 proto_recv(res->hr_event, NULL, 0); 421 /* Declare that we are sender. */ 422 proto_send(res->hr_ctrl, NULL, 0); 423 res->hr_workerpid = pid; 424 return; 425 } 426 427 gres = res; 428 mode = pjdlog_mode_get(); 429 debuglevel = pjdlog_debug_get(); 430 431 /* Declare that we are sender. */ 432 proto_send(res->hr_event, NULL, 0); 433 /* Declare that we are receiver. */ 434 proto_recv(res->hr_ctrl, NULL, 0); 435 descriptors_cleanup(res); 436 437 descriptors_assert(res, mode); 438 439 pjdlog_init(mode); 440 pjdlog_debug_set(debuglevel); 441 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 442 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); 443 444 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 445 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 446 447 /* Error in setting timeout is not critical, but why should it fail? */ 448 if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) 449 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 450 if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) 451 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 452 453 init_local(res); 454 init_environment(); 455 456 if (drop_privs(res) != 0) 457 exit(EX_CONFIG); 458 pjdlog_info("Privileges successfully dropped."); 459 460 /* 461 * Create the control thread before sending any event to the parent, 462 * as we can deadlock when parent sends control request to worker, 463 * but worker has no control thread started yet, so parent waits. 464 * In the meantime worker sends an event to the parent, but parent 465 * is unable to handle the event, because it waits for control 466 * request response. 467 */ 468 error = pthread_create(&td, NULL, ctrl_thread, res); 469 PJDLOG_ASSERT(error == 0); 470 471 init_remote(res, nvin); 472 event_send(res, EVENT_CONNECT); 473 474 error = pthread_create(&td, NULL, recv_thread, res); 475 PJDLOG_ASSERT(error == 0); 476 error = pthread_create(&td, NULL, disk_thread, res); 477 PJDLOG_ASSERT(error == 0); 478 (void)send_thread(res); 479} 480 481static void 482reqlog(int loglevel, int debuglevel, int error, struct hio *hio, 483 const char *fmt, ...) 484{ 485 char msg[1024]; 486 va_list ap; 487 int len; 488 489 va_start(ap, fmt); 490 len = vsnprintf(msg, sizeof(msg), fmt, ap); 491 va_end(ap); 492 if ((size_t)len < sizeof(msg)) { 493 switch (hio->hio_cmd) { 494 case HIO_READ: 495 (void)snprintf(msg + len, sizeof(msg) - len, 496 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 497 (uintmax_t)hio->hio_length); 498 break; 499 case HIO_DELETE: 500 (void)snprintf(msg + len, sizeof(msg) - len, 501 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 502 (uintmax_t)hio->hio_length); 503 break; 504 case HIO_FLUSH: 505 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 506 break; 507 case HIO_WRITE: 508 (void)snprintf(msg + len, sizeof(msg) - len, 509 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 510 (uintmax_t)hio->hio_length); 511 break; 512 case HIO_KEEPALIVE: 513 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 514 break; 515 default: 516 (void)snprintf(msg + len, sizeof(msg) - len, 517 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 518 break; 519 } 520 } 521 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 522} 523 524static int 525requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) 526{ 527 528 hio->hio_cmd = nv_get_uint8(nv, "cmd"); 529 if (hio->hio_cmd == 0) { 530 pjdlog_error("Header contains no 'cmd' field."); 531 hio->hio_error = EINVAL; 532 goto end; 533 } 534 if (hio->hio_cmd != HIO_KEEPALIVE) { 535 hio->hio_seq = nv_get_uint64(nv, "seq"); 536 if (hio->hio_seq == 0) { 537 pjdlog_error("Header contains no 'seq' field."); 538 hio->hio_error = EINVAL; 539 goto end; 540 } 541 } 542 switch (hio->hio_cmd) { 543 case HIO_FLUSH: 544 case HIO_KEEPALIVE: 545 break; 546 case HIO_READ: 547 case HIO_WRITE: 548 case HIO_DELETE: 549 hio->hio_offset = nv_get_uint64(nv, "offset"); 550 if (nv_error(nv) != 0) { 551 pjdlog_error("Header is missing 'offset' field."); 552 hio->hio_error = EINVAL; 553 goto end; 554 } 555 hio->hio_length = nv_get_uint64(nv, "length"); 556 if (nv_error(nv) != 0) { 557 pjdlog_error("Header is missing 'length' field."); 558 hio->hio_error = EINVAL; 559 goto end; 560 } 561 if (hio->hio_length == 0) { 562 pjdlog_error("Data length is zero."); 563 hio->hio_error = EINVAL; 564 goto end; 565 } 566 if (hio->hio_length > MAXPHYS) { 567 pjdlog_error("Data length is too large (%ju > %ju).", 568 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 569 hio->hio_error = EINVAL; 570 goto end; 571 } 572 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 573 pjdlog_error("Offset %ju is not multiple of sector size.", 574 (uintmax_t)hio->hio_offset); 575 hio->hio_error = EINVAL; 576 goto end; 577 } 578 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 579 pjdlog_error("Length %ju is not multiple of sector size.", 580 (uintmax_t)hio->hio_length); 581 hio->hio_error = EINVAL; 582 goto end; 583 } 584 if (hio->hio_offset + hio->hio_length > 585 (uint64_t)res->hr_datasize) { 586 pjdlog_error("Data offset is too large (%ju > %ju).", 587 (uintmax_t)(hio->hio_offset + hio->hio_length), 588 (uintmax_t)res->hr_datasize); 589 hio->hio_error = EINVAL; 590 goto end; 591 } 592 break; 593 default: 594 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 595 hio->hio_cmd); 596 hio->hio_error = EINVAL; 597 goto end; 598 } 599 hio->hio_error = 0; 600end: 601 return (hio->hio_error); 602} 603 604static __dead2 void 605secondary_exit(int exitcode, const char *fmt, ...) 606{ 607 va_list ap; 608 609 PJDLOG_ASSERT(exitcode != EX_OK); 610 va_start(ap, fmt); 611 pjdlogv_errno(LOG_ERR, fmt, ap); 612 va_end(ap); 613 event_send(gres, EVENT_DISCONNECT); 614 exit(exitcode); 615} 616 617/* 618 * Thread receives requests from the primary node. 619 */ 620static void * 621recv_thread(void *arg) 622{ 623 struct hast_resource *res = arg; 624 struct hio *hio; 625 struct nv *nv; 626 627 for (;;) { 628 pjdlog_debug(2, "recv: Taking free request."); 629 QUEUE_TAKE(free, hio); 630 pjdlog_debug(2, "recv: (%p) Got request.", hio); 631 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { 632 secondary_exit(EX_TEMPFAIL, 633 "Unable to receive request header"); 634 } 635 if (requnpack(res, hio, nv) != 0) { 636 nv_free(nv); 637 pjdlog_debug(2, 638 "recv: (%p) Moving request to the send queue.", 639 hio); 640 QUEUE_INSERT(send, hio); 641 continue; 642 } 643 switch (hio->hio_cmd) { 644 case HIO_READ: 645 res->hr_stat_read++; 646 break; 647 case HIO_WRITE: 648 res->hr_stat_write++; 649 break; 650 case HIO_DELETE: 651 res->hr_stat_delete++; 652 break; 653 case HIO_FLUSH: 654 res->hr_stat_flush++; 655 break; 656 case HIO_KEEPALIVE: 657 break; 658 default: 659 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 660 hio->hio_cmd); 661 } 662 reqlog(LOG_DEBUG, 2, -1, hio, 663 "recv: (%p) Got request header: ", hio); 664 if (hio->hio_cmd == HIO_KEEPALIVE) { 665 nv_free(nv); 666 pjdlog_debug(2, 667 "recv: (%p) Moving request to the free queue.", 668 hio); 669 hio_clear(hio); 670 QUEUE_INSERT(free, hio); 671 continue; 672 } else if (hio->hio_cmd == HIO_WRITE) { 673 if (hast_proto_recv_data(res, res->hr_remotein, nv, 674 hio->hio_data, MAXPHYS) == -1) { 675 secondary_exit(EX_TEMPFAIL, 676 "Unable to receive request data"); 677 } 678 } 679 nv_free(nv); 680 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 681 hio); 682 QUEUE_INSERT(disk, hio); 683 } 684 /* NOTREACHED */ 685 return (NULL); 686} 687 688/* 689 * Thread reads from or writes to local component and also handles DELETE and 690 * FLUSH requests. 691 */ 692static void * 693disk_thread(void *arg) 694{ 695 struct hast_resource *res = arg; 696 struct hio *hio; 697 ssize_t ret; 698 bool clear_activemap, logerror; 699 700 clear_activemap = true; 701 702 for (;;) { 703 pjdlog_debug(2, "disk: Taking request."); 704 QUEUE_TAKE(disk, hio); 705 while (clear_activemap) { 706 unsigned char *map; 707 size_t mapsize; 708 709 /* 710 * When first request is received, it means that primary 711 * already received our activemap, merged it and stored 712 * locally. We can now safely clear our activemap. 713 */ 714 mapsize = 715 activemap_calc_ondisk_size(res->hr_local_mediasize - 716 METADATA_SIZE, res->hr_extentsize, 717 res->hr_local_sectorsize); 718 map = calloc(1, mapsize); 719 if (map == NULL) { 720 pjdlog_warning("Unable to allocate memory to clear local activemap."); 721 break; 722 } 723 if (pwrite(res->hr_localfd, map, mapsize, 724 METADATA_SIZE) != (ssize_t)mapsize) { 725 pjdlog_errno(LOG_WARNING, 726 "Unable to store cleared activemap"); 727 free(map); 728 res->hr_stat_activemap_write_error++; 729 break; 730 } 731 free(map); 732 clear_activemap = false; 733 pjdlog_debug(1, "Local activemap cleared."); 734 break; 735 } 736 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 737 logerror = true; 738 /* Handle the actual request. */ 739 switch (hio->hio_cmd) { 740 case HIO_READ: 741 ret = pread(res->hr_localfd, hio->hio_data, 742 hio->hio_length, 743 hio->hio_offset + res->hr_localoff); 744 if (ret == -1) 745 hio->hio_error = errno; 746 else if (ret != (int64_t)hio->hio_length) 747 hio->hio_error = EIO; 748 else 749 hio->hio_error = 0; 750 break; 751 case HIO_WRITE: 752 ret = pwrite(res->hr_localfd, hio->hio_data, 753 hio->hio_length, 754 hio->hio_offset + res->hr_localoff); 755 if (ret == -1) 756 hio->hio_error = errno; 757 else if (ret != (int64_t)hio->hio_length) 758 hio->hio_error = EIO; 759 else 760 hio->hio_error = 0; 761 break; 762 case HIO_DELETE: 763 ret = g_delete(res->hr_localfd, 764 hio->hio_offset + res->hr_localoff, 765 hio->hio_length); 766 if (ret == -1) 767 hio->hio_error = errno; 768 else 769 hio->hio_error = 0; 770 break; 771 case HIO_FLUSH: 772 if (!res->hr_localflush) { 773 ret = -1; 774 hio->hio_error = EOPNOTSUPP; 775 logerror = false; 776 break; 777 } 778 ret = g_flush(res->hr_localfd); 779 if (ret == -1) { 780 if (errno == EOPNOTSUPP) 781 res->hr_localflush = false; 782 hio->hio_error = errno; 783 } else { 784 hio->hio_error = 0; 785 } 786 break; 787 default: 788 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 789 hio->hio_cmd); 790 } 791 if (logerror && hio->hio_error != 0) { 792 reqlog(LOG_ERR, 0, hio->hio_error, hio, 793 "Request failed: "); 794 } 795 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 796 hio); 797 QUEUE_INSERT(send, hio); 798 } 799 /* NOTREACHED */ 800 return (NULL); 801} 802 803/* 804 * Thread sends requests back to primary node. 805 */ 806static void * 807send_thread(void *arg) 808{ 809 struct hast_resource *res = arg; 810 struct nv *nvout; 811 struct hio *hio; 812 void *data; 813 size_t length; 814 815 for (;;) { 816 pjdlog_debug(2, "send: Taking request."); 817 QUEUE_TAKE(send, hio); 818 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 819 nvout = nv_alloc(); 820 /* Copy sequence number. */ 821 nv_add_uint64(nvout, hio->hio_seq, "seq"); 822 switch (hio->hio_cmd) { 823 case HIO_READ: 824 if (hio->hio_error == 0) { 825 data = hio->hio_data; 826 length = hio->hio_length; 827 break; 828 } 829 /* 830 * We send no data in case of an error. 831 */ 832 /* FALLTHROUGH */ 833 case HIO_DELETE: 834 case HIO_FLUSH: 835 case HIO_WRITE: 836 data = NULL; 837 length = 0; 838 break; 839 default: 840 PJDLOG_ABORT("Unexpected command (cmd=%hhu).", 841 hio->hio_cmd); 842 } 843 if (hio->hio_error != 0) { 844 switch (hio->hio_cmd) { 845 case HIO_READ: 846 res->hr_stat_read_error++; 847 break; 848 case HIO_WRITE: 849 res->hr_stat_write_error++; 850 break; 851 case HIO_DELETE: 852 res->hr_stat_delete_error++; 853 break; 854 case HIO_FLUSH: 855 res->hr_stat_flush_error++; 856 break; 857 } 858 nv_add_int16(nvout, hio->hio_error, "error"); 859 } 860 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 861 length) == -1) { 862 secondary_exit(EX_TEMPFAIL, "Unable to send reply"); 863 } 864 nv_free(nvout); 865 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 866 hio); 867 hio_clear(hio); 868 QUEUE_INSERT(free, hio); 869 } 870 /* NOTREACHED */ 871 return (NULL); 872} 873