/* secondary.c revision 211977 */
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
291590Srgrimes */ 301590Srgrimes 311590Srgrimes#include <sys/cdefs.h> 321590Srgrimes__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 211977 2010-08-29 21:41:53Z pjd $"); 331590Srgrimes 341590Srgrimes#include <sys/param.h> 351590Srgrimes#include <sys/time.h> 361590Srgrimes#include <sys/bio.h> 3794609Sdwmalone#include <sys/disk.h> 3894609Sdwmalone#include <sys/stat.h> 3994609Sdwmalone 4094609Sdwmalone#include <assert.h> 4194609Sdwmalone#include <err.h> 4294609Sdwmalone#include <errno.h> 4387712Smarkm#include <fcntl.h> 4487712Smarkm#include <libgeom.h> 4587712Smarkm#include <pthread.h> 461590Srgrimes#include <stdint.h> 471590Srgrimes#include <stdio.h> 481590Srgrimes#include <string.h> 491590Srgrimes#include <sysexits.h> 5087712Smarkm#include <unistd.h> 5187712Smarkm 521590Srgrimes#include <activemap.h> 531590Srgrimes#include <nv.h> 541590Srgrimes#include <pjdlog.h> 551590Srgrimes 5687712Smarkm#include "control.h" 5787712Smarkm#include "hast.h" 581590Srgrimes#include "hast_proto.h" 591590Srgrimes#include "hastd.h" 6092922Simp#include "hooks.h" 6192922Simp#include "metadata.h" 621590Srgrimes#include "proto.h" 631590Srgrimes#include "subr.h" 641590Srgrimes#include "synch.h" 651590Srgrimes 661590Srgrimesstruct hio { 671590Srgrimes uint64_t hio_seq; 681590Srgrimes int hio_error; 691590Srgrimes struct nv *hio_nv; 701590Srgrimes void *hio_data; 711590Srgrimes uint8_t hio_cmd; 721590Srgrimes uint64_t hio_offset; 731590Srgrimes uint64_t hio_length; 741590Srgrimes TAILQ_ENTRY(hio) hio_next; 751590Srgrimes}; 761590Srgrimes 771590Srgrimes/* 781590Srgrimes * Free list holds unused structures. When free list is empty, we have to wait 791590Srgrimes * until some in-progress requests are freed. 801590Srgrimes */ 811590Srgrimesstatic TAILQ_HEAD(, hio) hio_free_list; 821590Srgrimesstatic pthread_mutex_t hio_free_list_lock; 831590Srgrimesstatic pthread_cond_t hio_free_list_cond; 841590Srgrimes/* 8582762Sache * Disk thread (the one that do I/O requests) takes requests from this list. 
861590Srgrimes */ 871590Srgrimesstatic TAILQ_HEAD(, hio) hio_disk_list; 881590Srgrimesstatic pthread_mutex_t hio_disk_list_lock; 891590Srgrimesstatic pthread_cond_t hio_disk_list_cond; 901590Srgrimes/* 911590Srgrimes * There is one recv list for every component, although local components don't 921590Srgrimes * use recv lists as local requests are done synchronously. 931590Srgrimes */ 941590Srgrimesstatic TAILQ_HEAD(, hio) hio_send_list; 951590Srgrimesstatic pthread_mutex_t hio_send_list_lock; 961590Srgrimesstatic pthread_cond_t hio_send_list_cond; 971590Srgrimes 981590Srgrimes/* 991590Srgrimes * Maximum number of outstanding I/O requests. 1001590Srgrimes */ 1011590Srgrimes#define HAST_HIO_MAX 256 1021590Srgrimes 1031590Srgrimesstatic void *recv_thread(void *arg); 1041590Srgrimesstatic void *disk_thread(void *arg); 1051590Srgrimesstatic void *send_thread(void *arg); 10687712Smarkm 10794178Smurray#define QUEUE_INSERT(name, hio) do { \ 1081590Srgrimes bool _wakeup; \ 1091590Srgrimes \ 1101590Srgrimes mtx_lock(&hio_##name##_list_lock); \ 1111590Srgrimes _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 1121590Srgrimes TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 1131590Srgrimes mtx_unlock(&hio_##name##_list_lock); \ 1141590Srgrimes if (_wakeup) \ 1151590Srgrimes cv_signal(&hio_##name##_list_cond); \ 1161590Srgrimes} while (0) 11769552Sasmodai#define QUEUE_TAKE(name, hio) do { \ 11882762Sache mtx_lock(&hio_##name##_list_lock); \ 1191590Srgrimes while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 1201590Srgrimes cv_wait(&hio_##name##_list_cond, \ 12174876Sdwmalone &hio_##name##_list_lock); \ 12274876Sdwmalone } \ 12374876Sdwmalone TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 1241590Srgrimes mtx_unlock(&hio_##name##_list_lock); \ 1251590Srgrimes} while (0) 1261590Srgrimes 1271590Srgrimesstatic void 12874876Sdwmaloneinit_environment(void) 12974876Sdwmalone{ 13074876Sdwmalone struct hio *hio; 1311590Srgrimes unsigned int ii; 13274876Sdwmalone 
13374876Sdwmalone /* 13474876Sdwmalone * Initialize lists, their locks and theirs condition variables. 13574876Sdwmalone */ 13674876Sdwmalone TAILQ_INIT(&hio_free_list); 13774876Sdwmalone mtx_init(&hio_free_list_lock); 13874876Sdwmalone cv_init(&hio_free_list_cond); 13974876Sdwmalone TAILQ_INIT(&hio_disk_list); 14074876Sdwmalone mtx_init(&hio_disk_list_lock); 14174876Sdwmalone cv_init(&hio_disk_list_cond); 14274876Sdwmalone TAILQ_INIT(&hio_send_list); 14374876Sdwmalone mtx_init(&hio_send_list_lock); 14474876Sdwmalone cv_init(&hio_send_list_cond); 14574876Sdwmalone 14674876Sdwmalone /* 14774876Sdwmalone * Allocate requests pool and initialize requests. 14874876Sdwmalone */ 14974876Sdwmalone for (ii = 0; ii < HAST_HIO_MAX; ii++) { 15074876Sdwmalone hio = malloc(sizeof(*hio)); 15174876Sdwmalone if (hio == NULL) { 15274876Sdwmalone pjdlog_exitx(EX_TEMPFAIL, 15374876Sdwmalone "Unable to allocate memory (%zu bytes) for hio request.", 15474876Sdwmalone sizeof(*hio)); 1551590Srgrimes } 15674876Sdwmalone hio->hio_error = 0; 15774876Sdwmalone hio->hio_data = malloc(MAXPHYS); 15874876Sdwmalone if (hio->hio_data == NULL) { 15974876Sdwmalone pjdlog_exitx(EX_TEMPFAIL, 16074876Sdwmalone "Unable to allocate memory (%zu bytes) for gctl_data.", 16174876Sdwmalone (size_t)MAXPHYS); 16274876Sdwmalone } 1631590Srgrimes TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 16474876Sdwmalone } 16574876Sdwmalone} 16674876Sdwmalone 16774876Sdwmalonestatic void 16874876Sdwmaloneinit_local(struct hast_resource *res) 16974876Sdwmalone{ 17074876Sdwmalone 1711590Srgrimes if (metadata_read(res, true) < 0) 17274876Sdwmalone exit(EX_NOINPUT); 17374876Sdwmalone} 17417833Sadam 17574876Sdwmalonestatic void 17674876Sdwmaloneinit_remote(struct hast_resource *res, struct nv *nvin) 17774876Sdwmalone{ 17874876Sdwmalone uint64_t resuid; 1791590Srgrimes struct nv *nvout; 1801590Srgrimes unsigned char *map; 1811590Srgrimes size_t mapsize; 1821590Srgrimes 1831590Srgrimes map = NULL; 1841590Srgrimes mapsize = 0; 
1851590Srgrimes nvout = nv_alloc(); 1861590Srgrimes nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 1871590Srgrimes nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 1881590Srgrimes resuid = nv_get_uint64(nvin, "resuid"); 1891590Srgrimes res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 1901590Srgrimes res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 1911590Srgrimes nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 1921590Srgrimes nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 1931590Srgrimes mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 1941590Srgrimes METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 1951590Srgrimes map = malloc(mapsize); 1961590Srgrimes if (map == NULL) { 1971590Srgrimes pjdlog_exitx(EX_TEMPFAIL, 1981590Srgrimes "Unable to allocate memory (%zu bytes) for activemap.", 1991590Srgrimes mapsize); 2001590Srgrimes } 2011590Srgrimes nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 20269552Sasmodai /* 20369552Sasmodai * When we work as primary and secondary is missing we will increase 20469552Sasmodai * localcnt in our metadata. When secondary is connected and synced 2051590Srgrimes * we make localcnt be equal to remotecnt, which means nodes are more 2061590Srgrimes * or less in sync. 2071590Srgrimes * Split-brain condition is when both nodes are not able to communicate 2081590Srgrimes * and are both configured as primary nodes. In turn, they can both 2091590Srgrimes * make incompatible changes to the data and we have to detect that. 2101590Srgrimes * Under split-brain condition we will increase our localcnt on first 2111590Srgrimes * write and remote node will increase its localcnt on first write. 
2121590Srgrimes * When we connect we can see that primary's localcnt is greater than 2131590Srgrimes * our remotecnt (primary was modified while we weren't watching) and 2141590Srgrimes * our localcnt is greater than primary's remotecnt (we were modified 2151590Srgrimes * while primary wasn't watching). 2161590Srgrimes * There are many possible combinations which are all gathered below. 21717825Speter * Don't pay too much attention to exact numbers, the more important 2181590Srgrimes * is to compare them. We compare secondary's local with primary's 2191590Srgrimes * remote and secondary's remote with primary's local. 2201590Srgrimes * Note that every case where primary's localcnt is smaller than 2211590Srgrimes * secondary's remotecnt and where secondary's localcnt is smaller than 2221590Srgrimes * primary's remotecnt should be impossible in practise. We will perform 2231590Srgrimes * full synchronization then. Those cases are marked with an asterisk. 2241590Srgrimes * Regular synchronization means that only extents marked as dirty are 22594609Sdwmalone * synchronized (regular synchronization). 22694609Sdwmalone * 22794609Sdwmalone * SECONDARY METADATA PRIMARY METADATA 22894609Sdwmalone * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 2291590Srgrimes * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 2301590Srgrimes * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 2311590Srgrimes * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 2321590Srgrimes * regular sync from secondary. 2331590Srgrimes * local=3 remote=3 local=3 remote=3 Regular sync just in case. 2341590Srgrimes * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 23517339Sadam * local=3 remote=3 local=4 remote=2 Split-brain condition. 23617339Sadam * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 23717339Sadam * regular sync from primary. 23817339Sadam * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 
23917339Sadam */ 2401590Srgrimes if (res->hr_resuid == 0) { 2411590Srgrimes /* 2421590Srgrimes * Provider is used for the first time. Initialize everything. 2431590Srgrimes */ 2441590Srgrimes assert(res->hr_secondary_localcnt == 0); 2451590Srgrimes res->hr_resuid = resuid; 2461590Srgrimes if (metadata_write(res) < 0) 2471590Srgrimes exit(EX_NOINPUT); 2481590Srgrimes memset(map, 0xff, mapsize); 2491590Srgrimes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 2501590Srgrimes } else if ( 2511590Srgrimes /* Is primary is out-of-date? */ 2521590Srgrimes (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 2531590Srgrimes res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 2541590Srgrimes /* Node are more or less in sync? */ 2551590Srgrimes (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 2561590Srgrimes res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 25737453Sbde /* Is secondary is out-of-date? */ 2581590Srgrimes (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 2591590Srgrimes res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 2601590Srgrimes /* 2611590Srgrimes * Nodes are more or less in sync or one of the nodes is 2621590Srgrimes * out-of-date. 2631590Srgrimes * It doesn't matter at this point which one, we just have to 2641590Srgrimes * send out local bitmap to the remote node. 2651590Srgrimes */ 2661590Srgrimes if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 2671590Srgrimes (ssize_t)mapsize) { 2681590Srgrimes pjdlog_exit(LOG_ERR, "Unable to read activemap"); 2691590Srgrimes } 2701590Srgrimes if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 2711590Srgrimes res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 2721590Srgrimes /* Primary is out-of-date, sync from secondary. */ 2731590Srgrimes nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 2741590Srgrimes } else { 2751590Srgrimes /* 2761590Srgrimes * Secondary is out-of-date or counts match. 
2771590Srgrimes * Sync from primary. 2781590Srgrimes */ 2791590Srgrimes nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 2801590Srgrimes } 2811590Srgrimes } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 2821590Srgrimes res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 2831590Srgrimes /* 2841590Srgrimes * Not good, we have split-brain condition. 2851590Srgrimes */ 2861590Srgrimes pjdlog_error("Split-brain detected, exiting."); 2871590Srgrimes nv_add_string(nvout, "Split-brain condition!", "errmsg"); 2881590Srgrimes free(map); 2891590Srgrimes map = NULL; 2901590Srgrimes mapsize = 0; 2911590Srgrimes } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 2921590Srgrimes res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 2931590Srgrimes /* 2941590Srgrimes * This should never happen in practise, but we will perform 2951590Srgrimes * full synchronization. 296 */ 297 assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 298 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 299 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 300 METADATA_SIZE, res->hr_extentsize, 301 res->hr_local_sectorsize); 302 memset(map, 0xff, mapsize); 303 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 304 /* In this one of five cases sync from secondary. */ 305 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 306 } else { 307 /* For the rest four cases sync from primary. 
*/ 308 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 309 } 310 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 311 (uintmax_t)res->hr_primary_localcnt, 312 (uintmax_t)res->hr_primary_remotecnt, 313 (uintmax_t)res->hr_secondary_localcnt, 314 (uintmax_t)res->hr_secondary_remotecnt); 315 } 316 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 317 pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s", 318 res->hr_remoteaddr); 319 nv_free(nvout); 320 exit(EX_TEMPFAIL); 321 } 322 nv_free(nvout); 323 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 324 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 325 /* Exit on split-brain. */ 326 exit(EX_CONFIG); 327 } 328} 329 330void 331hastd_secondary(struct hast_resource *res, struct nv *nvin) 332{ 333 pthread_t td; 334 pid_t pid; 335 int error; 336 337 /* 338 * Create communication channel between parent and child. 339 */ 340 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 341 KEEP_ERRNO((void)pidfile_remove(pfh)); 342 pjdlog_exit(EX_OSERR, 343 "Unable to create control sockets between parent and child"); 344 } 345 346 pid = fork(); 347 if (pid < 0) { 348 KEEP_ERRNO((void)pidfile_remove(pfh)); 349 pjdlog_exit(EX_OSERR, "Unable to fork"); 350 } 351 352 if (pid > 0) { 353 /* This is parent. */ 354 proto_close(res->hr_remotein); 355 res->hr_remotein = NULL; 356 proto_close(res->hr_remoteout); 357 res->hr_remoteout = NULL; 358 res->hr_workerpid = pid; 359 return; 360 } 361 362 (void)pidfile_close(pfh); 363 hook_fini(); 364 365 setproctitle("%s (secondary)", res->hr_name); 366 367 signal(SIGHUP, SIG_DFL); 368 signal(SIGCHLD, SIG_DFL); 369 370 /* Error in setting timeout is not critical, but why should it fail? 
*/ 371 if (proto_timeout(res->hr_remotein, 0) < 0) 372 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 373 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 374 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 375 376 init_local(res); 377 init_remote(res, nvin); 378 init_environment(); 379 380 error = pthread_create(&td, NULL, recv_thread, res); 381 assert(error == 0); 382 error = pthread_create(&td, NULL, disk_thread, res); 383 assert(error == 0); 384 error = pthread_create(&td, NULL, send_thread, res); 385 assert(error == 0); 386 (void)ctrl_thread(res); 387} 388 389static void 390reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 391{ 392 char msg[1024]; 393 va_list ap; 394 int len; 395 396 va_start(ap, fmt); 397 len = vsnprintf(msg, sizeof(msg), fmt, ap); 398 va_end(ap); 399 if ((size_t)len < sizeof(msg)) { 400 switch (hio->hio_cmd) { 401 case HIO_READ: 402 (void)snprintf(msg + len, sizeof(msg) - len, 403 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 404 (uintmax_t)hio->hio_length); 405 break; 406 case HIO_DELETE: 407 (void)snprintf(msg + len, sizeof(msg) - len, 408 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 409 (uintmax_t)hio->hio_length); 410 break; 411 case HIO_FLUSH: 412 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 413 break; 414 case HIO_WRITE: 415 (void)snprintf(msg + len, sizeof(msg) - len, 416 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 417 (uintmax_t)hio->hio_length); 418 break; 419 case HIO_KEEPALIVE: 420 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 421 break; 422 default: 423 (void)snprintf(msg + len, sizeof(msg) - len, 424 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 425 break; 426 } 427 } 428 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 429} 430 431static int 432requnpack(struct hast_resource *res, struct hio *hio) 433{ 434 435 hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 436 if (hio->hio_cmd == 0) { 437 pjdlog_error("Header 
contains no 'cmd' field."); 438 hio->hio_error = EINVAL; 439 goto end; 440 } 441 switch (hio->hio_cmd) { 442 case HIO_KEEPALIVE: 443 break; 444 case HIO_READ: 445 case HIO_WRITE: 446 case HIO_DELETE: 447 hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 448 if (nv_error(hio->hio_nv) != 0) { 449 pjdlog_error("Header is missing 'offset' field."); 450 hio->hio_error = EINVAL; 451 goto end; 452 } 453 hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 454 if (nv_error(hio->hio_nv) != 0) { 455 pjdlog_error("Header is missing 'length' field."); 456 hio->hio_error = EINVAL; 457 goto end; 458 } 459 if (hio->hio_length == 0) { 460 pjdlog_error("Data length is zero."); 461 hio->hio_error = EINVAL; 462 goto end; 463 } 464 if (hio->hio_length > MAXPHYS) { 465 pjdlog_error("Data length is too large (%ju > %ju).", 466 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 467 hio->hio_error = EINVAL; 468 goto end; 469 } 470 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 471 pjdlog_error("Offset %ju is not multiple of sector size.", 472 (uintmax_t)hio->hio_offset); 473 hio->hio_error = EINVAL; 474 goto end; 475 } 476 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 477 pjdlog_error("Length %ju is not multiple of sector size.", 478 (uintmax_t)hio->hio_length); 479 hio->hio_error = EINVAL; 480 goto end; 481 } 482 if (hio->hio_offset + hio->hio_length > 483 (uint64_t)res->hr_datasize) { 484 pjdlog_error("Data offset is too large (%ju > %ju).", 485 (uintmax_t)(hio->hio_offset + hio->hio_length), 486 (uintmax_t)res->hr_datasize); 487 hio->hio_error = EINVAL; 488 goto end; 489 } 490 break; 491 default: 492 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 493 hio->hio_cmd); 494 hio->hio_error = EINVAL; 495 goto end; 496 } 497 hio->hio_error = 0; 498end: 499 return (hio->hio_error); 500} 501 502/* 503 * Thread receives requests from the primary node. 
504 */ 505static void * 506recv_thread(void *arg) 507{ 508 struct hast_resource *res = arg; 509 struct hio *hio; 510 511 for (;;) { 512 pjdlog_debug(2, "recv: Taking free request."); 513 QUEUE_TAKE(free, hio); 514 pjdlog_debug(2, "recv: (%p) Got request.", hio); 515 if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 516 pjdlog_exit(EX_TEMPFAIL, 517 "Unable to receive request header"); 518 } 519 if (requnpack(res, hio) != 0) { 520 pjdlog_debug(2, 521 "recv: (%p) Moving request to the send queue.", 522 hio); 523 QUEUE_INSERT(send, hio); 524 continue; 525 } 526 reqlog(LOG_DEBUG, 2, -1, hio, 527 "recv: (%p) Got request header: ", hio); 528 if (hio->hio_cmd == HIO_KEEPALIVE) { 529 pjdlog_debug(2, 530 "recv: (%p) Moving request to the free queue.", 531 hio); 532 nv_free(hio->hio_nv); 533 QUEUE_INSERT(free, hio); 534 continue; 535 } else if (hio->hio_cmd == HIO_WRITE) { 536 if (hast_proto_recv_data(res, res->hr_remotein, 537 hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 538 pjdlog_exit(EX_TEMPFAIL, 539 "Unable to receive reply data"); 540 } 541 } 542 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 543 hio); 544 QUEUE_INSERT(disk, hio); 545 } 546 /* NOTREACHED */ 547 return (NULL); 548} 549 550/* 551 * Thread reads from or writes to local component and also handles DELETE and 552 * FLUSH requests. 553 */ 554static void * 555disk_thread(void *arg) 556{ 557 struct hast_resource *res = arg; 558 struct hio *hio; 559 ssize_t ret; 560 bool clear_activemap; 561 562 clear_activemap = true; 563 564 for (;;) { 565 pjdlog_debug(2, "disk: Taking request."); 566 QUEUE_TAKE(disk, hio); 567 while (clear_activemap) { 568 unsigned char *map; 569 size_t mapsize; 570 571 /* 572 * When first request is received, it means that primary 573 * already received our activemap, merged it and stored 574 * locally. We can now safely clear our activemap. 
575 */ 576 mapsize = 577 activemap_calc_ondisk_size(res->hr_local_mediasize - 578 METADATA_SIZE, res->hr_extentsize, 579 res->hr_local_sectorsize); 580 map = calloc(1, mapsize); 581 if (map == NULL) { 582 pjdlog_warning("Unable to allocate memory to clear local activemap."); 583 break; 584 } 585 if (pwrite(res->hr_localfd, map, mapsize, 586 METADATA_SIZE) != (ssize_t)mapsize) { 587 pjdlog_errno(LOG_WARNING, 588 "Unable to store cleared activemap"); 589 free(map); 590 break; 591 } 592 free(map); 593 clear_activemap = false; 594 pjdlog_debug(1, "Local activemap cleared."); 595 } 596 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 597 /* Handle the actual request. */ 598 switch (hio->hio_cmd) { 599 case HIO_READ: 600 ret = pread(res->hr_localfd, hio->hio_data, 601 hio->hio_length, 602 hio->hio_offset + res->hr_localoff); 603 if (ret < 0) 604 hio->hio_error = errno; 605 else if (ret != (int64_t)hio->hio_length) 606 hio->hio_error = EIO; 607 else 608 hio->hio_error = 0; 609 break; 610 case HIO_WRITE: 611 ret = pwrite(res->hr_localfd, hio->hio_data, 612 hio->hio_length, 613 hio->hio_offset + res->hr_localoff); 614 if (ret < 0) 615 hio->hio_error = errno; 616 else if (ret != (int64_t)hio->hio_length) 617 hio->hio_error = EIO; 618 else 619 hio->hio_error = 0; 620 break; 621 case HIO_DELETE: 622 ret = g_delete(res->hr_localfd, 623 hio->hio_offset + res->hr_localoff, 624 hio->hio_length); 625 if (ret < 0) 626 hio->hio_error = errno; 627 else 628 hio->hio_error = 0; 629 break; 630 case HIO_FLUSH: 631 ret = g_flush(res->hr_localfd); 632 if (ret < 0) 633 hio->hio_error = errno; 634 else 635 hio->hio_error = 0; 636 break; 637 } 638 if (hio->hio_error != 0) { 639 reqlog(LOG_ERR, 0, hio->hio_error, hio, 640 "Request failed: "); 641 } 642 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 643 hio); 644 QUEUE_INSERT(send, hio); 645 } 646 /* NOTREACHED */ 647 return (NULL); 648} 649 650/* 651 * Thread sends requests back to primary node. 
652 */ 653static void * 654send_thread(void *arg) 655{ 656 struct hast_resource *res = arg; 657 struct nv *nvout; 658 struct hio *hio; 659 void *data; 660 size_t length; 661 662 for (;;) { 663 pjdlog_debug(2, "send: Taking request."); 664 QUEUE_TAKE(send, hio); 665 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 666 nvout = nv_alloc(); 667 /* Copy sequence number. */ 668 nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 669 switch (hio->hio_cmd) { 670 case HIO_READ: 671 if (hio->hio_error == 0) { 672 data = hio->hio_data; 673 length = hio->hio_length; 674 break; 675 } 676 /* 677 * We send no data in case of an error. 678 */ 679 /* FALLTHROUGH */ 680 case HIO_DELETE: 681 case HIO_FLUSH: 682 case HIO_WRITE: 683 data = NULL; 684 length = 0; 685 break; 686 default: 687 abort(); 688 break; 689 } 690 if (hio->hio_error != 0) 691 nv_add_int16(nvout, hio->hio_error, "error"); 692 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 693 length) < 0) { 694 pjdlog_exit(EX_TEMPFAIL, "Unable to send reply."); 695 } 696 nv_free(nvout); 697 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 698 hio); 699 nv_free(hio->hio_nv); 700 hio->hio_error = 0; 701 QUEUE_INSERT(free, hio); 702 } 703 /* NOTREACHED */ 704 return (NULL); 705} 706