secondary.c revision 214275
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
29238384Sjkim */ 30238384Sjkim 31238384Sjkim#include <sys/cdefs.h> 32238384Sjkim__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 214275 2010-10-24 15:42:16Z pjd $"); 33238384Sjkim 34238384Sjkim#include <sys/param.h> 35238384Sjkim#include <sys/time.h> 36238384Sjkim#include <sys/bio.h> 37238384Sjkim#include <sys/disk.h> 38238384Sjkim#include <sys/stat.h> 39238384Sjkim 40238384Sjkim#include <assert.h> 41238384Sjkim#include <err.h> 42238384Sjkim#include <errno.h> 43238384Sjkim#include <fcntl.h> 44238384Sjkim#include <libgeom.h> 45238384Sjkim#include <pthread.h> 46238384Sjkim#include <signal.h> 47238384Sjkim#include <stdint.h> 48238384Sjkim#include <stdio.h> 49238384Sjkim#include <string.h> 50238384Sjkim#include <sysexits.h> 51238384Sjkim#include <unistd.h> 52238384Sjkim 53238384Sjkim#include <activemap.h> 54238384Sjkim#include <nv.h> 55238384Sjkim#include <pjdlog.h> 56238384Sjkim 57238384Sjkim#include "control.h" 58238384Sjkim#include "event.h" 59238384Sjkim#include "hast.h" 60238384Sjkim#include "hast_proto.h" 61238384Sjkim#include "hastd.h" 62238384Sjkim#include "hooks.h" 63238384Sjkim#include "metadata.h" 64238384Sjkim#include "proto.h" 65238384Sjkim#include "subr.h" 66238384Sjkim#include "synch.h" 67238384Sjkim 68238384Sjkimstruct hio { 69238384Sjkim uint64_t hio_seq; 70280304Sjkim int hio_error; 71280304Sjkim struct nv *hio_nv; 72280304Sjkim void *hio_data; 73280304Sjkim uint8_t hio_cmd; 74280304Sjkim uint64_t hio_offset; 75280304Sjkim uint64_t hio_length; 76280304Sjkim TAILQ_ENTRY(hio) hio_next; 77280304Sjkim}; 78280304Sjkim 79280304Sjkimstatic struct hast_resource *gres; 80238384Sjkim 81238384Sjkim/* 82280304Sjkim * Free list holds unused structures. When free list is empty, we have to wait 83280304Sjkim * until some in-progress requests are freed. 
84280304Sjkim */ 85280304Sjkimstatic TAILQ_HEAD(, hio) hio_free_list; 86280304Sjkimstatic pthread_mutex_t hio_free_list_lock; 87280304Sjkimstatic pthread_cond_t hio_free_list_cond; 88280304Sjkim/* 89280304Sjkim * Disk thread (the one that do I/O requests) takes requests from this list. 90280304Sjkim */ 91238384Sjkimstatic TAILQ_HEAD(, hio) hio_disk_list; 92280304Sjkimstatic pthread_mutex_t hio_disk_list_lock; 93280304Sjkimstatic pthread_cond_t hio_disk_list_cond; 94280304Sjkim/* 95238384Sjkim * There is one recv list for every component, although local components don't 96280304Sjkim * use recv lists as local requests are done synchronously. 97280304Sjkim */ 98280304Sjkimstatic TAILQ_HEAD(, hio) hio_send_list; 99238384Sjkimstatic pthread_mutex_t hio_send_list_lock; 100280304Sjkimstatic pthread_cond_t hio_send_list_cond; 101280304Sjkim 102280304Sjkim/* 103280304Sjkim * Maximum number of outstanding I/O requests. 104280304Sjkim */ 105280304Sjkim#define HAST_HIO_MAX 256 106280304Sjkim 107280304Sjkimstatic void *recv_thread(void *arg); 108280304Sjkimstatic void *disk_thread(void *arg); 109280304Sjkimstatic void *send_thread(void *arg); 110280304Sjkim 111280304Sjkim#define QUEUE_INSERT(name, hio) do { \ 112238384Sjkim bool _wakeup; \ 113238384Sjkim \ 114280304Sjkim mtx_lock(&hio_##name##_list_lock); \ 115280304Sjkim _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 116280304Sjkim TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 117280304Sjkim mtx_unlock(&hio_##name##_list_lock); \ 118280304Sjkim if (_wakeup) \ 119238384Sjkim cv_signal(&hio_##name##_list_cond); \ 120280304Sjkim} while (0) 121280304Sjkim#define QUEUE_TAKE(name, hio) do { \ 122280304Sjkim mtx_lock(&hio_##name##_list_lock); \ 123280304Sjkim while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 124280304Sjkim cv_wait(&hio_##name##_list_cond, \ 125280304Sjkim &hio_##name##_list_lock); \ 126280304Sjkim } \ 127280304Sjkim TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ 128238384Sjkim 
mtx_unlock(&hio_##name##_list_lock); \ 129280304Sjkim} while (0) 130280304Sjkim 131280304Sjkimstatic void 132280304Sjkiminit_environment(void) 133238384Sjkim{ 134280304Sjkim struct hio *hio; 135238384Sjkim unsigned int ii; 136280304Sjkim 137280304Sjkim /* 138280304Sjkim * Initialize lists, their locks and theirs condition variables. 139280304Sjkim */ 140280304Sjkim TAILQ_INIT(&hio_free_list); 141238384Sjkim mtx_init(&hio_free_list_lock); 142238384Sjkim cv_init(&hio_free_list_cond); 143280304Sjkim TAILQ_INIT(&hio_disk_list); 144280304Sjkim mtx_init(&hio_disk_list_lock); 145280304Sjkim cv_init(&hio_disk_list_cond); 146280304Sjkim TAILQ_INIT(&hio_send_list); 147280304Sjkim mtx_init(&hio_send_list_lock); 148280304Sjkim cv_init(&hio_send_list_cond); 149238384Sjkim 150280304Sjkim /* 151280304Sjkim * Allocate requests pool and initialize requests. 152280304Sjkim */ 153280304Sjkim for (ii = 0; ii < HAST_HIO_MAX; ii++) { 154238384Sjkim hio = malloc(sizeof(*hio)); 155280304Sjkim if (hio == NULL) { 156238384Sjkim pjdlog_exitx(EX_TEMPFAIL, 157280304Sjkim "Unable to allocate memory (%zu bytes) for hio request.", 158280304Sjkim sizeof(*hio)); 159238384Sjkim } 160238384Sjkim hio->hio_error = 0; 161280304Sjkim hio->hio_data = malloc(MAXPHYS); 162280304Sjkim if (hio->hio_data == NULL) { 163280304Sjkim pjdlog_exitx(EX_TEMPFAIL, 164280304Sjkim "Unable to allocate memory (%zu bytes) for gctl_data.", 165280304Sjkim (size_t)MAXPHYS); 166280304Sjkim } 167280304Sjkim TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 168280304Sjkim } 169238384Sjkim} 170280304Sjkim 171280304Sjkimstatic void 172280304Sjkiminit_local(struct hast_resource *res) 173280304Sjkim{ 174280304Sjkim 175238384Sjkim if (metadata_read(res, true) < 0) 176280304Sjkim exit(EX_NOINPUT); 177280304Sjkim} 178280304Sjkim 179280304Sjkimstatic void 180280304Sjkiminit_remote(struct hast_resource *res, struct nv *nvin) 181280304Sjkim{ 182280304Sjkim uint64_t resuid; 183280304Sjkim struct nv *nvout; 184280304Sjkim unsigned char *map; 
185238384Sjkim size_t mapsize; 186280304Sjkim 187280304Sjkim map = NULL; 188280304Sjkim mapsize = 0; 189280304Sjkim nvout = nv_alloc(); 190280304Sjkim nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 191280304Sjkim nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 192280304Sjkim resuid = nv_get_uint64(nvin, "resuid"); 193280304Sjkim res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 194280304Sjkim res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 195280304Sjkim nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 196280304Sjkim nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 197280304Sjkim mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 198280304Sjkim METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 199238384Sjkim map = malloc(mapsize); 200280304Sjkim if (map == NULL) { 201280304Sjkim pjdlog_exitx(EX_TEMPFAIL, 202280304Sjkim "Unable to allocate memory (%zu bytes) for activemap.", 203280304Sjkim mapsize); 204238384Sjkim } 205280304Sjkim nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 206280304Sjkim /* 207280304Sjkim * When we work as primary and secondary is missing we will increase 208280304Sjkim * localcnt in our metadata. When secondary is connected and synced 209280304Sjkim * we make localcnt be equal to remotecnt, which means nodes are more 210280304Sjkim * or less in sync. 211280304Sjkim * Split-brain condition is when both nodes are not able to communicate 212280304Sjkim * and are both configured as primary nodes. In turn, they can both 213280304Sjkim * make incompatible changes to the data and we have to detect that. 214280304Sjkim * Under split-brain condition we will increase our localcnt on first 215238384Sjkim * write and remote node will increase its localcnt on first write. 
216280304Sjkim * When we connect we can see that primary's localcnt is greater than 217280304Sjkim * our remotecnt (primary was modified while we weren't watching) and 218280304Sjkim * our localcnt is greater than primary's remotecnt (we were modified 219280304Sjkim * while primary wasn't watching). 220280304Sjkim * There are many possible combinations which are all gathered below. 221280304Sjkim * Don't pay too much attention to exact numbers, the more important 222280304Sjkim * is to compare them. We compare secondary's local with primary's 223280304Sjkim * remote and secondary's remote with primary's local. 224280304Sjkim * Note that every case where primary's localcnt is smaller than 225280304Sjkim * secondary's remotecnt and where secondary's localcnt is smaller than 226280304Sjkim * primary's remotecnt should be impossible in practise. We will perform 227280304Sjkim * full synchronization then. Those cases are marked with an asterisk. 228280304Sjkim * Regular synchronization means that only extents marked as dirty are 229280304Sjkim * synchronized (regular synchronization). 230280304Sjkim * 231280304Sjkim * SECONDARY METADATA PRIMARY METADATA 232280304Sjkim * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 233280304Sjkim * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 234280304Sjkim * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 235280304Sjkim * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 236238384Sjkim * regular sync from secondary. 237238384Sjkim * local=3 remote=3 local=3 remote=3 Regular sync just in case. 238280304Sjkim * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 239280304Sjkim * local=3 remote=3 local=4 remote=2 Split-brain condition. 240280304Sjkim * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 241280304Sjkim * regular sync from primary. 242280304Sjkim * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 
243280304Sjkim */ 244280304Sjkim if (res->hr_resuid == 0) { 245280304Sjkim /* 246280304Sjkim * Provider is used for the first time. Initialize everything. 247280304Sjkim */ 248280304Sjkim assert(res->hr_secondary_localcnt == 0); 249280304Sjkim res->hr_resuid = resuid; 250280304Sjkim if (metadata_write(res) < 0) 251280304Sjkim exit(EX_NOINPUT); 252280304Sjkim memset(map, 0xff, mapsize); 253280304Sjkim nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 254280304Sjkim } else if ( 255280304Sjkim /* Is primary is out-of-date? */ 256280304Sjkim (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 257280304Sjkim res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 258280304Sjkim /* Node are more or less in sync? */ 259238384Sjkim (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 260238384Sjkim res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 261280304Sjkim /* Is secondary is out-of-date? */ 262280304Sjkim (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 263280304Sjkim res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 264280304Sjkim /* 265280304Sjkim * Nodes are more or less in sync or one of the nodes is 266280304Sjkim * out-of-date. 267280304Sjkim * It doesn't matter at this point which one, we just have to 268280304Sjkim * send out local bitmap to the remote node. 269280304Sjkim */ 270280304Sjkim if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 271280304Sjkim (ssize_t)mapsize) { 272280304Sjkim pjdlog_exit(LOG_ERR, "Unable to read activemap"); 273280304Sjkim } 274280304Sjkim if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 275280304Sjkim res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 276238384Sjkim /* Primary is out-of-date, sync from secondary. */ 277280304Sjkim nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 278280304Sjkim } else { 279280304Sjkim /* 280280304Sjkim * Secondary is out-of-date or counts match. 281280304Sjkim * Sync from primary. 
282280304Sjkim */ 283238384Sjkim nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 284280304Sjkim } 285280304Sjkim } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 286238384Sjkim res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 287280304Sjkim /* 288280304Sjkim * Not good, we have split-brain condition. 289238384Sjkim */ 290280304Sjkim pjdlog_error("Split-brain detected, exiting."); 291280304Sjkim nv_add_string(nvout, "Split-brain condition!", "errmsg"); 292238384Sjkim free(map); 293280304Sjkim map = NULL; 294280304Sjkim mapsize = 0; 295238384Sjkim } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 296280304Sjkim res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 297238384Sjkim /* 298280304Sjkim * This should never happen in practise, but we will perform 299238384Sjkim * full synchronization. 300280304Sjkim */ 301238384Sjkim assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 302280304Sjkim res->hr_primary_localcnt < res->hr_secondary_remotecnt); 303238384Sjkim mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 304280304Sjkim METADATA_SIZE, res->hr_extentsize, 305238384Sjkim res->hr_local_sectorsize); 306280304Sjkim memset(map, 0xff, mapsize); 307280304Sjkim if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 308280304Sjkim /* In this one of five cases sync from secondary. */ 309 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 310 } else { 311 /* For the rest four cases sync from primary. 
*/ 312 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 313 } 314 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 315 (uintmax_t)res->hr_primary_localcnt, 316 (uintmax_t)res->hr_primary_remotecnt, 317 (uintmax_t)res->hr_secondary_localcnt, 318 (uintmax_t)res->hr_secondary_remotecnt); 319 } 320 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 321 pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s", 322 res->hr_remoteaddr); 323 nv_free(nvout); 324 exit(EX_TEMPFAIL); 325 } 326 if (map != NULL) 327 free(map); 328 nv_free(nvout); 329 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 330 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 331 /* Exit on split-brain. */ 332 event_send(res, EVENT_SPLITBRAIN); 333 exit(EX_CONFIG); 334 } 335} 336 337void 338hastd_secondary(struct hast_resource *res, struct nv *nvin) 339{ 340 sigset_t mask; 341 pthread_t td; 342 pid_t pid; 343 int error; 344 345 /* 346 * Create communication channel between parent and child. 347 */ 348 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 349 KEEP_ERRNO((void)pidfile_remove(pfh)); 350 pjdlog_exit(EX_OSERR, 351 "Unable to create control sockets between parent and child"); 352 } 353 /* 354 * Create communication channel between child and parent. 355 */ 356 if (proto_client("socketpair://", &res->hr_event) < 0) { 357 KEEP_ERRNO((void)pidfile_remove(pfh)); 358 pjdlog_exit(EX_OSERR, 359 "Unable to create event sockets between child and parent"); 360 } 361 362 pid = fork(); 363 if (pid < 0) { 364 KEEP_ERRNO((void)pidfile_remove(pfh)); 365 pjdlog_exit(EX_OSERR, "Unable to fork"); 366 } 367 368 if (pid > 0) { 369 /* This is parent. */ 370 proto_close(res->hr_remotein); 371 res->hr_remotein = NULL; 372 proto_close(res->hr_remoteout); 373 res->hr_remoteout = NULL; 374 /* Declare that we are receiver. 
*/ 375 proto_recv(res->hr_event, NULL, 0); 376 res->hr_workerpid = pid; 377 return; 378 } 379 380 gres = res; 381 382 (void)pidfile_close(pfh); 383 hook_fini(); 384 385 setproctitle("%s (secondary)", res->hr_name); 386 387 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 388 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 389 390 /* Declare that we are sender. */ 391 proto_send(res->hr_event, NULL, 0); 392 393 /* Error in setting timeout is not critical, but why should it fail? */ 394 if (proto_timeout(res->hr_remotein, 0) < 0) 395 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 396 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 397 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 398 399 init_local(res); 400 init_environment(); 401 402 /* 403 * Create the control thread before sending any event to the parent, 404 * as we can deadlock when parent sends control request to worker, 405 * but worker has no control thread started yet, so parent waits. 406 * In the meantime worker sends an event to the parent, but parent 407 * is unable to handle the event, because it waits for control 408 * request response. 409 */ 410 error = pthread_create(&td, NULL, ctrl_thread, res); 411 assert(error == 0); 412 413 init_remote(res, nvin); 414 event_send(res, EVENT_CONNECT); 415 416 error = pthread_create(&td, NULL, recv_thread, res); 417 assert(error == 0); 418 error = pthread_create(&td, NULL, disk_thread, res); 419 assert(error == 0); 420 (void)send_thread(res); 421} 422 423static void 424reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 
425{ 426 char msg[1024]; 427 va_list ap; 428 int len; 429 430 va_start(ap, fmt); 431 len = vsnprintf(msg, sizeof(msg), fmt, ap); 432 va_end(ap); 433 if ((size_t)len < sizeof(msg)) { 434 switch (hio->hio_cmd) { 435 case HIO_READ: 436 (void)snprintf(msg + len, sizeof(msg) - len, 437 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 438 (uintmax_t)hio->hio_length); 439 break; 440 case HIO_DELETE: 441 (void)snprintf(msg + len, sizeof(msg) - len, 442 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 443 (uintmax_t)hio->hio_length); 444 break; 445 case HIO_FLUSH: 446 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 447 break; 448 case HIO_WRITE: 449 (void)snprintf(msg + len, sizeof(msg) - len, 450 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 451 (uintmax_t)hio->hio_length); 452 break; 453 case HIO_KEEPALIVE: 454 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 455 break; 456 default: 457 (void)snprintf(msg + len, sizeof(msg) - len, 458 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 459 break; 460 } 461 } 462 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 463} 464 465static int 466requnpack(struct hast_resource *res, struct hio *hio) 467{ 468 469 hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 470 if (hio->hio_cmd == 0) { 471 pjdlog_error("Header contains no 'cmd' field."); 472 hio->hio_error = EINVAL; 473 goto end; 474 } 475 switch (hio->hio_cmd) { 476 case HIO_KEEPALIVE: 477 break; 478 case HIO_READ: 479 case HIO_WRITE: 480 case HIO_DELETE: 481 hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 482 if (nv_error(hio->hio_nv) != 0) { 483 pjdlog_error("Header is missing 'offset' field."); 484 hio->hio_error = EINVAL; 485 goto end; 486 } 487 hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 488 if (nv_error(hio->hio_nv) != 0) { 489 pjdlog_error("Header is missing 'length' field."); 490 hio->hio_error = EINVAL; 491 goto end; 492 } 493 if (hio->hio_length == 0) { 494 pjdlog_error("Data length is zero."); 495 hio->hio_error = EINVAL; 496 goto 
end; 497 } 498 if (hio->hio_length > MAXPHYS) { 499 pjdlog_error("Data length is too large (%ju > %ju).", 500 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 501 hio->hio_error = EINVAL; 502 goto end; 503 } 504 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 505 pjdlog_error("Offset %ju is not multiple of sector size.", 506 (uintmax_t)hio->hio_offset); 507 hio->hio_error = EINVAL; 508 goto end; 509 } 510 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 511 pjdlog_error("Length %ju is not multiple of sector size.", 512 (uintmax_t)hio->hio_length); 513 hio->hio_error = EINVAL; 514 goto end; 515 } 516 if (hio->hio_offset + hio->hio_length > 517 (uint64_t)res->hr_datasize) { 518 pjdlog_error("Data offset is too large (%ju > %ju).", 519 (uintmax_t)(hio->hio_offset + hio->hio_length), 520 (uintmax_t)res->hr_datasize); 521 hio->hio_error = EINVAL; 522 goto end; 523 } 524 break; 525 default: 526 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 527 hio->hio_cmd); 528 hio->hio_error = EINVAL; 529 goto end; 530 } 531 hio->hio_error = 0; 532end: 533 return (hio->hio_error); 534} 535 536static __dead2 void 537secondary_exit(int exitcode, const char *fmt, ...) 538{ 539 va_list ap; 540 541 assert(exitcode != EX_OK); 542 va_start(ap, fmt); 543 pjdlogv_errno(LOG_ERR, fmt, ap); 544 va_end(ap); 545 event_send(gres, EVENT_DISCONNECT); 546 exit(exitcode); 547} 548 549/* 550 * Thread receives requests from the primary node. 
551 */ 552static void * 553recv_thread(void *arg) 554{ 555 struct hast_resource *res = arg; 556 struct hio *hio; 557 558 for (;;) { 559 pjdlog_debug(2, "recv: Taking free request."); 560 QUEUE_TAKE(free, hio); 561 pjdlog_debug(2, "recv: (%p) Got request.", hio); 562 if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 563 secondary_exit(EX_TEMPFAIL, 564 "Unable to receive request header"); 565 } 566 if (requnpack(res, hio) != 0) { 567 pjdlog_debug(2, 568 "recv: (%p) Moving request to the send queue.", 569 hio); 570 QUEUE_INSERT(send, hio); 571 continue; 572 } 573 reqlog(LOG_DEBUG, 2, -1, hio, 574 "recv: (%p) Got request header: ", hio); 575 if (hio->hio_cmd == HIO_KEEPALIVE) { 576 pjdlog_debug(2, 577 "recv: (%p) Moving request to the free queue.", 578 hio); 579 nv_free(hio->hio_nv); 580 QUEUE_INSERT(free, hio); 581 continue; 582 } else if (hio->hio_cmd == HIO_WRITE) { 583 if (hast_proto_recv_data(res, res->hr_remotein, 584 hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 585 secondary_exit(EX_TEMPFAIL, 586 "Unable to receive request data"); 587 } 588 } 589 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 590 hio); 591 QUEUE_INSERT(disk, hio); 592 } 593 /* NOTREACHED */ 594 return (NULL); 595} 596 597/* 598 * Thread reads from or writes to local component and also handles DELETE and 599 * FLUSH requests. 600 */ 601static void * 602disk_thread(void *arg) 603{ 604 struct hast_resource *res = arg; 605 struct hio *hio; 606 ssize_t ret; 607 bool clear_activemap; 608 609 clear_activemap = true; 610 611 for (;;) { 612 pjdlog_debug(2, "disk: Taking request."); 613 QUEUE_TAKE(disk, hio); 614 while (clear_activemap) { 615 unsigned char *map; 616 size_t mapsize; 617 618 /* 619 * When first request is received, it means that primary 620 * already received our activemap, merged it and stored 621 * locally. We can now safely clear our activemap. 
622 */ 623 mapsize = 624 activemap_calc_ondisk_size(res->hr_local_mediasize - 625 METADATA_SIZE, res->hr_extentsize, 626 res->hr_local_sectorsize); 627 map = calloc(1, mapsize); 628 if (map == NULL) { 629 pjdlog_warning("Unable to allocate memory to clear local activemap."); 630 break; 631 } 632 if (pwrite(res->hr_localfd, map, mapsize, 633 METADATA_SIZE) != (ssize_t)mapsize) { 634 pjdlog_errno(LOG_WARNING, 635 "Unable to store cleared activemap"); 636 free(map); 637 break; 638 } 639 free(map); 640 clear_activemap = false; 641 pjdlog_debug(1, "Local activemap cleared."); 642 } 643 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 644 /* Handle the actual request. */ 645 switch (hio->hio_cmd) { 646 case HIO_READ: 647 ret = pread(res->hr_localfd, hio->hio_data, 648 hio->hio_length, 649 hio->hio_offset + res->hr_localoff); 650 if (ret < 0) 651 hio->hio_error = errno; 652 else if (ret != (int64_t)hio->hio_length) 653 hio->hio_error = EIO; 654 else 655 hio->hio_error = 0; 656 break; 657 case HIO_WRITE: 658 ret = pwrite(res->hr_localfd, hio->hio_data, 659 hio->hio_length, 660 hio->hio_offset + res->hr_localoff); 661 if (ret < 0) 662 hio->hio_error = errno; 663 else if (ret != (int64_t)hio->hio_length) 664 hio->hio_error = EIO; 665 else 666 hio->hio_error = 0; 667 break; 668 case HIO_DELETE: 669 ret = g_delete(res->hr_localfd, 670 hio->hio_offset + res->hr_localoff, 671 hio->hio_length); 672 if (ret < 0) 673 hio->hio_error = errno; 674 else 675 hio->hio_error = 0; 676 break; 677 case HIO_FLUSH: 678 ret = g_flush(res->hr_localfd); 679 if (ret < 0) 680 hio->hio_error = errno; 681 else 682 hio->hio_error = 0; 683 break; 684 } 685 if (hio->hio_error != 0) { 686 reqlog(LOG_ERR, 0, hio->hio_error, hio, 687 "Request failed: "); 688 } 689 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 690 hio); 691 QUEUE_INSERT(send, hio); 692 } 693 /* NOTREACHED */ 694 return (NULL); 695} 696 697/* 698 * Thread sends requests back to primary node. 
699 */ 700static void * 701send_thread(void *arg) 702{ 703 struct hast_resource *res = arg; 704 struct nv *nvout; 705 struct hio *hio; 706 void *data; 707 size_t length; 708 709 for (;;) { 710 pjdlog_debug(2, "send: Taking request."); 711 QUEUE_TAKE(send, hio); 712 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 713 nvout = nv_alloc(); 714 /* Copy sequence number. */ 715 nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 716 switch (hio->hio_cmd) { 717 case HIO_READ: 718 if (hio->hio_error == 0) { 719 data = hio->hio_data; 720 length = hio->hio_length; 721 break; 722 } 723 /* 724 * We send no data in case of an error. 725 */ 726 /* FALLTHROUGH */ 727 case HIO_DELETE: 728 case HIO_FLUSH: 729 case HIO_WRITE: 730 data = NULL; 731 length = 0; 732 break; 733 default: 734 abort(); 735 break; 736 } 737 if (hio->hio_error != 0) 738 nv_add_int16(nvout, hio->hio_error, "error"); 739 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 740 length) < 0) { 741 secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 742 } 743 nv_free(nvout); 744 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 745 hio); 746 nv_free(hio->hio_nv); 747 hio->hio_error = 0; 748 QUEUE_INSERT(free, hio); 749 } 750 /* NOTREACHED */ 751 return (NULL); 752} 753