secondary.c revision 214275
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
2981645Sjasone */ 3081645Sjasone 3181645Sjasone#include <sys/cdefs.h> 3281645Sjasone__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 214275 2010-10-24 15:42:16Z pjd $"); 3381645Sjasone 3481645Sjasone#include <sys/param.h> 3581645Sjasone#include <sys/time.h> 36116182Sobrien#include <sys/bio.h> 37116182Sobrien#include <sys/disk.h> 38116182Sobrien#include <sys/stat.h> 3981645Sjasone 4081645Sjasone#include <assert.h> 4181645Sjasone#include <err.h> 4281645Sjasone#include <errno.h> 4381645Sjasone#include <fcntl.h> 4481645Sjasone#include <libgeom.h> 4581645Sjasone#include <pthread.h> 4681645Sjasone#include <signal.h> 4781645Sjasone#include <stdint.h> 4881645Sjasone#include <stdio.h> 4981645Sjasone#include <string.h> 5081645Sjasone#include <sysexits.h> 5181645Sjasone#include <unistd.h> 5281645Sjasone 5381645Sjasone#include <activemap.h> 5493818Sjhb#include <nv.h> 5581645Sjasone#include <pjdlog.h> 5681645Sjasone 5781645Sjasone#include "control.h" 5881645Sjasone#include "event.h" 5987593Sobrien#include "hast.h" 6081645Sjasone#include "hast_proto.h" 6181645Sjasone#include "hastd.h" 6281645Sjasone#include "hooks.h" 6381645Sjasone#include "metadata.h" 6481645Sjasone#include "proto.h" 6581645Sjasone#include "subr.h" 6687593Sobrien#include "synch.h" 6781645Sjasone 6881645Sjasonestruct hio { 6981645Sjasone uint64_t hio_seq; 7081645Sjasone int hio_error; 7181645Sjasone struct nv *hio_nv; 7281645Sjasone void *hio_data; 7381645Sjasone uint8_t hio_cmd; 7481645Sjasone uint64_t hio_offset; 7581645Sjasone uint64_t hio_length; 7681645Sjasone TAILQ_ENTRY(hio) hio_next; 7781645Sjasone}; 7881645Sjasone 7981645Sjasonestatic struct hast_resource *gres; 8081645Sjasone 8181645Sjasone/* 8281645Sjasone * Free list holds unused structures. When free list is empty, we have to wait 8381645Sjasone * until some in-progress requests are freed. 
8487593Sobrien */ 8581645Sjasonestatic TAILQ_HEAD(, hio) hio_free_list; 8681645Sjasonestatic pthread_mutex_t hio_free_list_lock; 8781645Sjasonestatic pthread_cond_t hio_free_list_cond; 8881645Sjasone/* 8981645Sjasone * Disk thread (the one that do I/O requests) takes requests from this list. 9081645Sjasone */ 9181645Sjasonestatic TAILQ_HEAD(, hio) hio_disk_list; 9281645Sjasonestatic pthread_mutex_t hio_disk_list_lock; 9381645Sjasonestatic pthread_cond_t hio_disk_list_cond; 9481645Sjasone/* 9581645Sjasone * There is one recv list for every component, although local components don't 9681645Sjasone * use recv lists as local requests are done synchronously. 9781645Sjasone */ 9881645Sjasonestatic TAILQ_HEAD(, hio) hio_send_list; 9981645Sjasonestatic pthread_mutex_t hio_send_list_lock; 10081645Sjasonestatic pthread_cond_t hio_send_list_cond; 10181645Sjasone 10287593Sobrien/* 10381645Sjasone * Maximum number of outstanding I/O requests. 10481645Sjasone */ 10581645Sjasone#define HAST_HIO_MAX 256 10681645Sjasone 10781645Sjasonestatic void *recv_thread(void *arg); 10881645Sjasonestatic void *disk_thread(void *arg); 10981645Sjasonestatic void *send_thread(void *arg); 11081645Sjasone 111130481Sjdp#define QUEUE_INSERT(name, hio) do { \ 11281645Sjasone bool _wakeup; \ 11381645Sjasone \ 11481645Sjasone mtx_lock(&hio_##name##_list_lock); \ 11581645Sjasone _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 11681645Sjasone TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ 11781645Sjasone mtx_unlock(&hio_##name##_list_lock); \ 11881645Sjasone if (_wakeup) \ 11981645Sjasone cv_signal(&hio_##name##_list_cond); \ 12081645Sjasone} while (0) 121130481Sjdp#define QUEUE_TAKE(name, hio) do { \ 12281645Sjasone mtx_lock(&hio_##name##_list_lock); \ 123130481Sjdp while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 12481645Sjasone cv_wait(&hio_##name##_list_cond, \ 12581645Sjasone &hio_##name##_list_lock); \ 12681645Sjasone } \ 12781645Sjasone TAILQ_REMOVE(&hio_##name##_list, (hio), 
hio_next); \ 12881645Sjasone mtx_unlock(&hio_##name##_list_lock); \ 129130481Sjdp} while (0) 13081645Sjasone 13187593Sobrienstatic void 13281645Sjasoneinit_environment(void) 13381645Sjasone{ 13487593Sobrien struct hio *hio; 13581645Sjasone unsigned int ii; 13681645Sjasone 13781645Sjasone /* 13881645Sjasone * Initialize lists, their locks and theirs condition variables. 139130481Sjdp */ 14081645Sjasone TAILQ_INIT(&hio_free_list); 14181645Sjasone mtx_init(&hio_free_list_lock); 14281645Sjasone cv_init(&hio_free_list_cond); 14381645Sjasone TAILQ_INIT(&hio_disk_list); 14481645Sjasone mtx_init(&hio_disk_list_lock); 14581645Sjasone cv_init(&hio_disk_list_cond); 14681645Sjasone TAILQ_INIT(&hio_send_list); 14781645Sjasone mtx_init(&hio_send_list_lock); 14881645Sjasone cv_init(&hio_send_list_cond); 14981645Sjasone 15081645Sjasone /* 15181645Sjasone * Allocate requests pool and initialize requests. 15281645Sjasone */ 15381645Sjasone for (ii = 0; ii < HAST_HIO_MAX; ii++) { 15487593Sobrien hio = malloc(sizeof(*hio)); 15581645Sjasone if (hio == NULL) { 15681645Sjasone pjdlog_exitx(EX_TEMPFAIL, 15781645Sjasone "Unable to allocate memory (%zu bytes) for hio request.", 15881645Sjasone sizeof(*hio)); 15987593Sobrien } 16081645Sjasone hio->hio_error = 0; 16181645Sjasone hio->hio_data = malloc(MAXPHYS); 16281645Sjasone if (hio->hio_data == NULL) { 16381645Sjasone pjdlog_exitx(EX_TEMPFAIL, 16481645Sjasone "Unable to allocate memory (%zu bytes) for gctl_data.", 16581645Sjasone (size_t)MAXPHYS); 16681645Sjasone } 16781645Sjasone TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 16881645Sjasone } 16981645Sjasone} 17081645Sjasone 17181645Sjasonestatic void 17281645Sjasoneinit_local(struct hast_resource *res) 17381645Sjasone{ 17481645Sjasone 17581645Sjasone if (metadata_read(res, true) < 0) 17681645Sjasone exit(EX_NOINPUT); 177} 178 179static void 180init_remote(struct hast_resource *res, struct nv *nvin) 181{ 182 uint64_t resuid; 183 struct nv *nvout; 184 unsigned char *map; 185 size_t 
mapsize; 186 187 map = NULL; 188 mapsize = 0; 189 nvout = nv_alloc(); 190 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 191 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 192 resuid = nv_get_uint64(nvin, "resuid"); 193 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 194 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 195 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 196 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 197 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 198 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 199 map = malloc(mapsize); 200 if (map == NULL) { 201 pjdlog_exitx(EX_TEMPFAIL, 202 "Unable to allocate memory (%zu bytes) for activemap.", 203 mapsize); 204 } 205 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 206 /* 207 * When we work as primary and secondary is missing we will increase 208 * localcnt in our metadata. When secondary is connected and synced 209 * we make localcnt be equal to remotecnt, which means nodes are more 210 * or less in sync. 211 * Split-brain condition is when both nodes are not able to communicate 212 * and are both configured as primary nodes. In turn, they can both 213 * make incompatible changes to the data and we have to detect that. 214 * Under split-brain condition we will increase our localcnt on first 215 * write and remote node will increase its localcnt on first write. 216 * When we connect we can see that primary's localcnt is greater than 217 * our remotecnt (primary was modified while we weren't watching) and 218 * our localcnt is greater than primary's remotecnt (we were modified 219 * while primary wasn't watching). 220 * There are many possible combinations which are all gathered below. 221 * Don't pay too much attention to exact numbers, the more important 222 * is to compare them. We compare secondary's local with primary's 223 * remote and secondary's remote with primary's local. 
224 * Note that every case where primary's localcnt is smaller than 225 * secondary's remotecnt and where secondary's localcnt is smaller than 226 * primary's remotecnt should be impossible in practise. We will perform 227 * full synchronization then. Those cases are marked with an asterisk. 228 * Regular synchronization means that only extents marked as dirty are 229 * synchronized (regular synchronization). 230 * 231 * SECONDARY METADATA PRIMARY METADATA 232 * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. 233 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 234 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 235 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 236 * regular sync from secondary. 237 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 238 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 239 * local=3 remote=3 local=4 remote=2 Split-brain condition. 240 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 241 * regular sync from primary. 242 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 243 */ 244 if (res->hr_resuid == 0) { 245 /* 246 * Provider is used for the first time. Initialize everything. 247 */ 248 assert(res->hr_secondary_localcnt == 0); 249 res->hr_resuid = resuid; 250 if (metadata_write(res) < 0) 251 exit(EX_NOINPUT); 252 memset(map, 0xff, mapsize); 253 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 254 } else if ( 255 /* Is primary is out-of-date? */ 256 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 257 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 258 /* Node are more or less in sync? */ 259 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 260 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 261 /* Is secondary is out-of-date? 
*/ 262 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 263 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 264 /* 265 * Nodes are more or less in sync or one of the nodes is 266 * out-of-date. 267 * It doesn't matter at this point which one, we just have to 268 * send out local bitmap to the remote node. 269 */ 270 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 271 (ssize_t)mapsize) { 272 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 273 } 274 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 275 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 276 /* Primary is out-of-date, sync from secondary. */ 277 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 278 } else { 279 /* 280 * Secondary is out-of-date or counts match. 281 * Sync from primary. 282 */ 283 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 284 } 285 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 286 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 287 /* 288 * Not good, we have split-brain condition. 289 */ 290 pjdlog_error("Split-brain detected, exiting."); 291 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 292 free(map); 293 map = NULL; 294 mapsize = 0; 295 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 296 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 297 /* 298 * This should never happen in practise, but we will perform 299 * full synchronization. 300 */ 301 assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 302 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 303 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 304 METADATA_SIZE, res->hr_extentsize, 305 res->hr_local_sectorsize); 306 memset(map, 0xff, mapsize); 307 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 308 /* In this one of five cases sync from secondary. 
*/ 309 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 310 } else { 311 /* For the rest four cases sync from primary. */ 312 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 313 } 314 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 315 (uintmax_t)res->hr_primary_localcnt, 316 (uintmax_t)res->hr_primary_remotecnt, 317 (uintmax_t)res->hr_secondary_localcnt, 318 (uintmax_t)res->hr_secondary_remotecnt); 319 } 320 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 321 pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s", 322 res->hr_remoteaddr); 323 nv_free(nvout); 324 exit(EX_TEMPFAIL); 325 } 326 if (map != NULL) 327 free(map); 328 nv_free(nvout); 329 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 330 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 331 /* Exit on split-brain. */ 332 event_send(res, EVENT_SPLITBRAIN); 333 exit(EX_CONFIG); 334 } 335} 336 337void 338hastd_secondary(struct hast_resource *res, struct nv *nvin) 339{ 340 sigset_t mask; 341 pthread_t td; 342 pid_t pid; 343 int error; 344 345 /* 346 * Create communication channel between parent and child. 347 */ 348 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 349 KEEP_ERRNO((void)pidfile_remove(pfh)); 350 pjdlog_exit(EX_OSERR, 351 "Unable to create control sockets between parent and child"); 352 } 353 /* 354 * Create communication channel between child and parent. 355 */ 356 if (proto_client("socketpair://", &res->hr_event) < 0) { 357 KEEP_ERRNO((void)pidfile_remove(pfh)); 358 pjdlog_exit(EX_OSERR, 359 "Unable to create event sockets between child and parent"); 360 } 361 362 pid = fork(); 363 if (pid < 0) { 364 KEEP_ERRNO((void)pidfile_remove(pfh)); 365 pjdlog_exit(EX_OSERR, "Unable to fork"); 366 } 367 368 if (pid > 0) { 369 /* This is parent. 
*/ 370 proto_close(res->hr_remotein); 371 res->hr_remotein = NULL; 372 proto_close(res->hr_remoteout); 373 res->hr_remoteout = NULL; 374 /* Declare that we are receiver. */ 375 proto_recv(res->hr_event, NULL, 0); 376 res->hr_workerpid = pid; 377 return; 378 } 379 380 gres = res; 381 382 (void)pidfile_close(pfh); 383 hook_fini(); 384 385 setproctitle("%s (secondary)", res->hr_name); 386 387 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 388 PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); 389 390 /* Declare that we are sender. */ 391 proto_send(res->hr_event, NULL, 0); 392 393 /* Error in setting timeout is not critical, but why should it fail? */ 394 if (proto_timeout(res->hr_remotein, 0) < 0) 395 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 396 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 397 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 398 399 init_local(res); 400 init_environment(); 401 402 /* 403 * Create the control thread before sending any event to the parent, 404 * as we can deadlock when parent sends control request to worker, 405 * but worker has no control thread started yet, so parent waits. 406 * In the meantime worker sends an event to the parent, but parent 407 * is unable to handle the event, because it waits for control 408 * request response. 409 */ 410 error = pthread_create(&td, NULL, ctrl_thread, res); 411 assert(error == 0); 412 413 init_remote(res, nvin); 414 event_send(res, EVENT_CONNECT); 415 416 error = pthread_create(&td, NULL, recv_thread, res); 417 assert(error == 0); 418 error = pthread_create(&td, NULL, disk_thread, res); 419 assert(error == 0); 420 (void)send_thread(res); 421} 422 423static void 424reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 
425{ 426 char msg[1024]; 427 va_list ap; 428 int len; 429 430 va_start(ap, fmt); 431 len = vsnprintf(msg, sizeof(msg), fmt, ap); 432 va_end(ap); 433 if ((size_t)len < sizeof(msg)) { 434 switch (hio->hio_cmd) { 435 case HIO_READ: 436 (void)snprintf(msg + len, sizeof(msg) - len, 437 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 438 (uintmax_t)hio->hio_length); 439 break; 440 case HIO_DELETE: 441 (void)snprintf(msg + len, sizeof(msg) - len, 442 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 443 (uintmax_t)hio->hio_length); 444 break; 445 case HIO_FLUSH: 446 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 447 break; 448 case HIO_WRITE: 449 (void)snprintf(msg + len, sizeof(msg) - len, 450 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 451 (uintmax_t)hio->hio_length); 452 break; 453 case HIO_KEEPALIVE: 454 (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); 455 break; 456 default: 457 (void)snprintf(msg + len, sizeof(msg) - len, 458 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 459 break; 460 } 461 } 462 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 463} 464 465static int 466requnpack(struct hast_resource *res, struct hio *hio) 467{ 468 469 hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 470 if (hio->hio_cmd == 0) { 471 pjdlog_error("Header contains no 'cmd' field."); 472 hio->hio_error = EINVAL; 473 goto end; 474 } 475 switch (hio->hio_cmd) { 476 case HIO_KEEPALIVE: 477 break; 478 case HIO_READ: 479 case HIO_WRITE: 480 case HIO_DELETE: 481 hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 482 if (nv_error(hio->hio_nv) != 0) { 483 pjdlog_error("Header is missing 'offset' field."); 484 hio->hio_error = EINVAL; 485 goto end; 486 } 487 hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 488 if (nv_error(hio->hio_nv) != 0) { 489 pjdlog_error("Header is missing 'length' field."); 490 hio->hio_error = EINVAL; 491 goto end; 492 } 493 if (hio->hio_length == 0) { 494 pjdlog_error("Data length is zero."); 495 hio->hio_error = EINVAL; 496 goto 
end; 497 } 498 if (hio->hio_length > MAXPHYS) { 499 pjdlog_error("Data length is too large (%ju > %ju).", 500 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 501 hio->hio_error = EINVAL; 502 goto end; 503 } 504 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 505 pjdlog_error("Offset %ju is not multiple of sector size.", 506 (uintmax_t)hio->hio_offset); 507 hio->hio_error = EINVAL; 508 goto end; 509 } 510 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 511 pjdlog_error("Length %ju is not multiple of sector size.", 512 (uintmax_t)hio->hio_length); 513 hio->hio_error = EINVAL; 514 goto end; 515 } 516 if (hio->hio_offset + hio->hio_length > 517 (uint64_t)res->hr_datasize) { 518 pjdlog_error("Data offset is too large (%ju > %ju).", 519 (uintmax_t)(hio->hio_offset + hio->hio_length), 520 (uintmax_t)res->hr_datasize); 521 hio->hio_error = EINVAL; 522 goto end; 523 } 524 break; 525 default: 526 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 527 hio->hio_cmd); 528 hio->hio_error = EINVAL; 529 goto end; 530 } 531 hio->hio_error = 0; 532end: 533 return (hio->hio_error); 534} 535 536static __dead2 void 537secondary_exit(int exitcode, const char *fmt, ...) 538{ 539 va_list ap; 540 541 assert(exitcode != EX_OK); 542 va_start(ap, fmt); 543 pjdlogv_errno(LOG_ERR, fmt, ap); 544 va_end(ap); 545 event_send(gres, EVENT_DISCONNECT); 546 exit(exitcode); 547} 548 549/* 550 * Thread receives requests from the primary node. 
551 */ 552static void * 553recv_thread(void *arg) 554{ 555 struct hast_resource *res = arg; 556 struct hio *hio; 557 558 for (;;) { 559 pjdlog_debug(2, "recv: Taking free request."); 560 QUEUE_TAKE(free, hio); 561 pjdlog_debug(2, "recv: (%p) Got request.", hio); 562 if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 563 secondary_exit(EX_TEMPFAIL, 564 "Unable to receive request header"); 565 } 566 if (requnpack(res, hio) != 0) { 567 pjdlog_debug(2, 568 "recv: (%p) Moving request to the send queue.", 569 hio); 570 QUEUE_INSERT(send, hio); 571 continue; 572 } 573 reqlog(LOG_DEBUG, 2, -1, hio, 574 "recv: (%p) Got request header: ", hio); 575 if (hio->hio_cmd == HIO_KEEPALIVE) { 576 pjdlog_debug(2, 577 "recv: (%p) Moving request to the free queue.", 578 hio); 579 nv_free(hio->hio_nv); 580 QUEUE_INSERT(free, hio); 581 continue; 582 } else if (hio->hio_cmd == HIO_WRITE) { 583 if (hast_proto_recv_data(res, res->hr_remotein, 584 hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 585 secondary_exit(EX_TEMPFAIL, 586 "Unable to receive request data"); 587 } 588 } 589 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 590 hio); 591 QUEUE_INSERT(disk, hio); 592 } 593 /* NOTREACHED */ 594 return (NULL); 595} 596 597/* 598 * Thread reads from or writes to local component and also handles DELETE and 599 * FLUSH requests. 600 */ 601static void * 602disk_thread(void *arg) 603{ 604 struct hast_resource *res = arg; 605 struct hio *hio; 606 ssize_t ret; 607 bool clear_activemap; 608 609 clear_activemap = true; 610 611 for (;;) { 612 pjdlog_debug(2, "disk: Taking request."); 613 QUEUE_TAKE(disk, hio); 614 while (clear_activemap) { 615 unsigned char *map; 616 size_t mapsize; 617 618 /* 619 * When first request is received, it means that primary 620 * already received our activemap, merged it and stored 621 * locally. We can now safely clear our activemap. 
622 */ 623 mapsize = 624 activemap_calc_ondisk_size(res->hr_local_mediasize - 625 METADATA_SIZE, res->hr_extentsize, 626 res->hr_local_sectorsize); 627 map = calloc(1, mapsize); 628 if (map == NULL) { 629 pjdlog_warning("Unable to allocate memory to clear local activemap."); 630 break; 631 } 632 if (pwrite(res->hr_localfd, map, mapsize, 633 METADATA_SIZE) != (ssize_t)mapsize) { 634 pjdlog_errno(LOG_WARNING, 635 "Unable to store cleared activemap"); 636 free(map); 637 break; 638 } 639 free(map); 640 clear_activemap = false; 641 pjdlog_debug(1, "Local activemap cleared."); 642 } 643 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 644 /* Handle the actual request. */ 645 switch (hio->hio_cmd) { 646 case HIO_READ: 647 ret = pread(res->hr_localfd, hio->hio_data, 648 hio->hio_length, 649 hio->hio_offset + res->hr_localoff); 650 if (ret < 0) 651 hio->hio_error = errno; 652 else if (ret != (int64_t)hio->hio_length) 653 hio->hio_error = EIO; 654 else 655 hio->hio_error = 0; 656 break; 657 case HIO_WRITE: 658 ret = pwrite(res->hr_localfd, hio->hio_data, 659 hio->hio_length, 660 hio->hio_offset + res->hr_localoff); 661 if (ret < 0) 662 hio->hio_error = errno; 663 else if (ret != (int64_t)hio->hio_length) 664 hio->hio_error = EIO; 665 else 666 hio->hio_error = 0; 667 break; 668 case HIO_DELETE: 669 ret = g_delete(res->hr_localfd, 670 hio->hio_offset + res->hr_localoff, 671 hio->hio_length); 672 if (ret < 0) 673 hio->hio_error = errno; 674 else 675 hio->hio_error = 0; 676 break; 677 case HIO_FLUSH: 678 ret = g_flush(res->hr_localfd); 679 if (ret < 0) 680 hio->hio_error = errno; 681 else 682 hio->hio_error = 0; 683 break; 684 } 685 if (hio->hio_error != 0) { 686 reqlog(LOG_ERR, 0, hio->hio_error, hio, 687 "Request failed: "); 688 } 689 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 690 hio); 691 QUEUE_INSERT(send, hio); 692 } 693 /* NOTREACHED */ 694 return (NULL); 695} 696 697/* 698 * Thread sends requests back to primary node. 
699 */ 700static void * 701send_thread(void *arg) 702{ 703 struct hast_resource *res = arg; 704 struct nv *nvout; 705 struct hio *hio; 706 void *data; 707 size_t length; 708 709 for (;;) { 710 pjdlog_debug(2, "send: Taking request."); 711 QUEUE_TAKE(send, hio); 712 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 713 nvout = nv_alloc(); 714 /* Copy sequence number. */ 715 nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 716 switch (hio->hio_cmd) { 717 case HIO_READ: 718 if (hio->hio_error == 0) { 719 data = hio->hio_data; 720 length = hio->hio_length; 721 break; 722 } 723 /* 724 * We send no data in case of an error. 725 */ 726 /* FALLTHROUGH */ 727 case HIO_DELETE: 728 case HIO_FLUSH: 729 case HIO_WRITE: 730 data = NULL; 731 length = 0; 732 break; 733 default: 734 abort(); 735 break; 736 } 737 if (hio->hio_error != 0) 738 nv_add_int16(nvout, hio->hio_error, "error"); 739 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 740 length) < 0) { 741 secondary_exit(EX_TEMPFAIL, "Unable to send reply."); 742 } 743 nv_free(nvout); 744 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 745 hio); 746 nv_free(hio->hio_nv); 747 hio->hio_error = 0; 748 QUEUE_INSERT(free, hio); 749 } 750 /* NOTREACHED */ 751 return (NULL); 752} 753