/* secondary.c revision 209185 */
/*-
 * Copyright (c) 2009-2010 The FreeBSD Foundation
 * All rights reserved.
 *
 * This software was developed by Pawel Jakub Dawidek under sponsorship from
 * the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
28 */ 29 30#include <sys/cdefs.h> 31__FBSDID("$FreeBSD: head/sbin/hastd/secondary.c 209185 2010-06-14 21:46:48Z pjd $"); 32 33#include <sys/param.h> 34#include <sys/time.h> 35#include <sys/bio.h> 36#include <sys/disk.h> 37#include <sys/stat.h> 38 39#include <assert.h> 40#include <err.h> 41#include <errno.h> 42#include <fcntl.h> 43#include <libgeom.h> 44#include <pthread.h> 45#include <stdint.h> 46#include <stdio.h> 47#include <string.h> 48#include <sysexits.h> 49#include <unistd.h> 50 51#include <activemap.h> 52#include <nv.h> 53#include <pjdlog.h> 54 55#include "control.h" 56#include "hast.h" 57#include "hast_proto.h" 58#include "hastd.h" 59#include "metadata.h" 60#include "proto.h" 61#include "subr.h" 62#include "synch.h" 63 64struct hio { 65 uint64_t hio_seq; 66 int hio_error; 67 struct nv *hio_nv; 68 void *hio_data; 69 uint8_t hio_cmd; 70 uint64_t hio_offset; 71 uint64_t hio_length; 72 TAILQ_ENTRY(hio) hio_next; 73}; 74 75/* 76 * Free list holds unused structures. When free list is empty, we have to wait 77 * until some in-progress requests are freed. 78 */ 79static TAILQ_HEAD(, hio) hio_free_list; 80static pthread_mutex_t hio_free_list_lock; 81static pthread_cond_t hio_free_list_cond; 82/* 83 * Disk thread (the one that do I/O requests) takes requests from this list. 84 */ 85static TAILQ_HEAD(, hio) hio_disk_list; 86static pthread_mutex_t hio_disk_list_lock; 87static pthread_cond_t hio_disk_list_cond; 88/* 89 * There is one recv list for every component, although local components don't 90 * use recv lists as local requests are done synchronously. 91 */ 92static TAILQ_HEAD(, hio) hio_send_list; 93static pthread_mutex_t hio_send_list_lock; 94static pthread_cond_t hio_send_list_cond; 95 96/* 97 * Maximum number of outstanding I/O requests. 
98 */ 99#define HAST_HIO_MAX 256 100 101static void *recv_thread(void *arg); 102static void *disk_thread(void *arg); 103static void *send_thread(void *arg); 104 105static void 106init_environment(void) 107{ 108 struct hio *hio; 109 unsigned int ii; 110 111 /* 112 * Initialize lists, their locks and theirs condition variables. 113 */ 114 TAILQ_INIT(&hio_free_list); 115 mtx_init(&hio_free_list_lock); 116 cv_init(&hio_free_list_cond); 117 TAILQ_INIT(&hio_disk_list); 118 mtx_init(&hio_disk_list_lock); 119 cv_init(&hio_disk_list_cond); 120 TAILQ_INIT(&hio_send_list); 121 mtx_init(&hio_send_list_lock); 122 cv_init(&hio_send_list_cond); 123 124 /* 125 * Allocate requests pool and initialize requests. 126 */ 127 for (ii = 0; ii < HAST_HIO_MAX; ii++) { 128 hio = malloc(sizeof(*hio)); 129 if (hio == NULL) { 130 errx(EX_TEMPFAIL, "cannot allocate %zu bytes of memory " 131 "for hio request", sizeof(*hio)); 132 } 133 hio->hio_error = 0; 134 hio->hio_data = malloc(MAXPHYS); 135 if (hio->hio_data == NULL) { 136 errx(EX_TEMPFAIL, "cannot allocate %zu bytes of memory " 137 "for gctl_data", (size_t)MAXPHYS); 138 } 139 TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); 140 } 141} 142 143static void 144init_local(struct hast_resource *res) 145{ 146 147 if (metadata_read(res, true) < 0) 148 exit(EX_NOINPUT); 149} 150 151static void 152init_remote(struct hast_resource *res, struct nv *nvin) 153{ 154 uint64_t resuid; 155 struct nv *nvout; 156 unsigned char *map; 157 size_t mapsize; 158 159 map = NULL; 160 mapsize = 0; 161 nvout = nv_alloc(); 162 nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); 163 nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); 164 resuid = nv_get_uint64(nvin, "resuid"); 165 res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); 166 res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 167 nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); 168 nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); 169 mapsize = 
activemap_calc_ondisk_size(res->hr_local_mediasize - 170 METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); 171 map = malloc(mapsize); 172 if (map == NULL) { 173 pjdlog_exitx(EX_TEMPFAIL, 174 "Unable to allocate memory (%zu bytes) for activemap.", 175 mapsize); 176 } 177 nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); 178 /* 179 * When we work as primary and secondary is missing we will increase 180 * localcnt in our metadata. When secondary is connected and synced 181 * we make localcnt be equal to remotecnt, which means nodes are more 182 * or less in sync. 183 * Split-brain condition is when both nodes are not able to communicate 184 * and are both configured as primary nodes. In turn, they can both 185 * make incompatible changes to the data and we have to detect that. 186 * Under split-brain condition we will increase our localcnt on first 187 * write and remote node will increase its localcnt on first write. 188 * When we connect we can see that primary's localcnt is greater than 189 * our remotecnt (primary was modified while we weren't watching) and 190 * our localcnt is greater than primary's remotecnt (we were modified 191 * while primary wasn't watching). 192 * There are many possible combinations which are all gathered below. 193 * Don't pay too much attention to exact numbers, the more important 194 * is to compare them. We compare secondary's local with primary's 195 * remote and secondary's remote with primary's local. 196 * Note that every case where primary's localcnt is smaller than 197 * secondary's remotecnt and where secondary's localcnt is smaller than 198 * primary's remotecnt should be impossible in practise. We will perform 199 * full synchronization then. Those cases are marked with an asterisk. 200 * Regular synchronization means that only extents marked as dirty are 201 * synchronized (regular synchronization). 202 * 203 * SECONDARY METADATA PRIMARY METADATA 204 * local=3 remote=3 local=2 remote=2* ?! 
Full sync from secondary. 205 * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. 206 * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. 207 * local=3 remote=3 local=3 remote=2 Primary is out-of-date, 208 * regular sync from secondary. 209 * local=3 remote=3 local=3 remote=3 Regular sync just in case. 210 * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. 211 * local=3 remote=3 local=4 remote=2 Split-brain condition. 212 * local=3 remote=3 local=4 remote=3 Secondary out-of-date, 213 * regular sync from primary. 214 * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. 215 */ 216 if (res->hr_resuid == 0) { 217 /* 218 * Provider is used for the first time. Initialize everything. 219 */ 220 assert(res->hr_secondary_localcnt == 0); 221 res->hr_resuid = resuid; 222 if (metadata_write(res) < 0) 223 exit(EX_NOINPUT); 224 memset(map, 0xff, mapsize); 225 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 226 } else if ( 227 /* Is primary is out-of-date? */ 228 (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 229 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 230 /* Node are more or less in sync? */ 231 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 232 res->hr_secondary_remotecnt == res->hr_primary_localcnt) || 233 /* Is secondary is out-of-date? */ 234 (res->hr_secondary_localcnt == res->hr_primary_remotecnt && 235 res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { 236 /* 237 * Nodes are more or less in sync or one of the nodes is 238 * out-of-date. 239 * It doesn't matter at this point which one, we just have to 240 * send out local bitmap to the remote node. 
241 */ 242 if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != 243 (ssize_t)mapsize) { 244 pjdlog_exit(LOG_ERR, "Unable to read activemap"); 245 } 246 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 247 res->hr_secondary_remotecnt == res->hr_primary_localcnt) { 248 /* Primary is out-of-date, sync from secondary. */ 249 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 250 } else { 251 /* 252 * Secondary is out-of-date or counts match. 253 * Sync from primary. 254 */ 255 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 256 } 257 } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 258 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 259 /* 260 * Not good, we have split-brain condition. 261 */ 262 pjdlog_error("Split-brain detected, exiting."); 263 nv_add_string(nvout, "Split-brain condition!", "errmsg"); 264 free(map); 265 map = NULL; 266 mapsize = 0; 267 } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || 268 res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { 269 /* 270 * This should never happen in practise, but we will perform 271 * full synchronization. 272 */ 273 assert(res->hr_secondary_localcnt < res->hr_primary_remotecnt || 274 res->hr_primary_localcnt < res->hr_secondary_remotecnt); 275 mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - 276 METADATA_SIZE, res->hr_extentsize, 277 res->hr_local_sectorsize); 278 memset(map, 0xff, mapsize); 279 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { 280 /* In this one of five cases sync from secondary. */ 281 nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); 282 } else { 283 /* For the rest four cases sync from primary. 
*/ 284 nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); 285 } 286 pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", 287 (uintmax_t)res->hr_primary_localcnt, 288 (uintmax_t)res->hr_primary_remotecnt, 289 (uintmax_t)res->hr_secondary_localcnt, 290 (uintmax_t)res->hr_secondary_remotecnt); 291 } 292 if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) < 0) { 293 pjdlog_errno(LOG_WARNING, "Unable to send activemap to %s", 294 res->hr_remoteaddr); 295 nv_free(nvout); 296 exit(EX_TEMPFAIL); 297 } 298 nv_free(nvout); 299 if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && 300 res->hr_primary_localcnt > res->hr_secondary_remotecnt) { 301 /* Exit on split-brain. */ 302 exit(EX_CONFIG); 303 } 304} 305 306void 307hastd_secondary(struct hast_resource *res, struct nv *nvin) 308{ 309 pthread_t td; 310 pid_t pid; 311 int error; 312 313 /* 314 * Create communication channel between parent and child. 315 */ 316 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 317 KEEP_ERRNO((void)pidfile_remove(pfh)); 318 pjdlog_exit(EX_OSERR, 319 "Unable to create control sockets between parent and child"); 320 } 321 322 pid = fork(); 323 if (pid < 0) { 324 KEEP_ERRNO((void)pidfile_remove(pfh)); 325 pjdlog_exit(EX_OSERR, "Unable to fork"); 326 } 327 328 if (pid > 0) { 329 /* This is parent. */ 330 proto_close(res->hr_remotein); 331 res->hr_remotein = NULL; 332 proto_close(res->hr_remoteout); 333 res->hr_remoteout = NULL; 334 res->hr_workerpid = pid; 335 return; 336 } 337 (void)pidfile_close(pfh); 338 339 setproctitle("%s (secondary)", res->hr_name); 340 341 /* Error in setting timeout is not critical, but why should it fail? 
*/ 342 if (proto_timeout(res->hr_remotein, 0) < 0) 343 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 344 if (proto_timeout(res->hr_remoteout, res->hr_timeout) < 0) 345 pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 346 347 init_local(res); 348 init_remote(res, nvin); 349 init_environment(); 350 351 error = pthread_create(&td, NULL, recv_thread, res); 352 assert(error == 0); 353 error = pthread_create(&td, NULL, disk_thread, res); 354 assert(error == 0); 355 error = pthread_create(&td, NULL, send_thread, res); 356 assert(error == 0); 357 (void)ctrl_thread(res); 358} 359 360static void 361reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 362{ 363 char msg[1024]; 364 va_list ap; 365 int len; 366 367 va_start(ap, fmt); 368 len = vsnprintf(msg, sizeof(msg), fmt, ap); 369 va_end(ap); 370 if ((size_t)len < sizeof(msg)) { 371 switch (hio->hio_cmd) { 372 case HIO_READ: 373 (void)snprintf(msg + len, sizeof(msg) - len, 374 "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, 375 (uintmax_t)hio->hio_length); 376 break; 377 case HIO_DELETE: 378 (void)snprintf(msg + len, sizeof(msg) - len, 379 "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, 380 (uintmax_t)hio->hio_length); 381 break; 382 case HIO_FLUSH: 383 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 384 break; 385 case HIO_WRITE: 386 (void)snprintf(msg + len, sizeof(msg) - len, 387 "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, 388 (uintmax_t)hio->hio_length); 389 break; 390 default: 391 (void)snprintf(msg + len, sizeof(msg) - len, 392 "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); 393 break; 394 } 395 } 396 pjdlog_common(loglevel, debuglevel, error, "%s", msg); 397} 398 399static int 400requnpack(struct hast_resource *res, struct hio *hio) 401{ 402 403 hio->hio_cmd = nv_get_uint8(hio->hio_nv, "cmd"); 404 if (hio->hio_cmd == 0) { 405 pjdlog_error("Header contains no 'cmd' field."); 406 hio->hio_error = EINVAL; 407 goto end; 408 } 409 switch 
(hio->hio_cmd) { 410 case HIO_READ: 411 case HIO_WRITE: 412 case HIO_DELETE: 413 hio->hio_offset = nv_get_uint64(hio->hio_nv, "offset"); 414 if (nv_error(hio->hio_nv) != 0) { 415 pjdlog_error("Header is missing 'offset' field."); 416 hio->hio_error = EINVAL; 417 goto end; 418 } 419 hio->hio_length = nv_get_uint64(hio->hio_nv, "length"); 420 if (nv_error(hio->hio_nv) != 0) { 421 pjdlog_error("Header is missing 'length' field."); 422 hio->hio_error = EINVAL; 423 goto end; 424 } 425 if (hio->hio_length == 0) { 426 pjdlog_error("Data length is zero."); 427 hio->hio_error = EINVAL; 428 goto end; 429 } 430 if (hio->hio_length > MAXPHYS) { 431 pjdlog_error("Data length is too large (%ju > %ju).", 432 (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); 433 hio->hio_error = EINVAL; 434 goto end; 435 } 436 if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { 437 pjdlog_error("Offset %ju is not multiple of sector size.", 438 (uintmax_t)hio->hio_offset); 439 hio->hio_error = EINVAL; 440 goto end; 441 } 442 if ((hio->hio_length % res->hr_local_sectorsize) != 0) { 443 pjdlog_error("Length %ju is not multiple of sector size.", 444 (uintmax_t)hio->hio_length); 445 hio->hio_error = EINVAL; 446 goto end; 447 } 448 if (hio->hio_offset + hio->hio_length > 449 (uint64_t)res->hr_datasize) { 450 pjdlog_error("Data offset is too large (%ju > %ju).", 451 (uintmax_t)(hio->hio_offset + hio->hio_length), 452 (uintmax_t)res->hr_datasize); 453 hio->hio_error = EINVAL; 454 goto end; 455 } 456 break; 457 default: 458 pjdlog_error("Header contains invalid 'cmd' (%hhu).", 459 hio->hio_cmd); 460 hio->hio_error = EINVAL; 461 goto end; 462 } 463 hio->hio_error = 0; 464end: 465 return (hio->hio_error); 466} 467 468/* 469 * Thread receives requests from the primary node. 
470 */ 471static void * 472recv_thread(void *arg) 473{ 474 struct hast_resource *res = arg; 475 struct hio *hio; 476 bool wakeup; 477 478 for (;;) { 479 pjdlog_debug(2, "recv: Taking free request."); 480 mtx_lock(&hio_free_list_lock); 481 while ((hio = TAILQ_FIRST(&hio_free_list)) == NULL) { 482 pjdlog_debug(2, "recv: No free requests, waiting."); 483 cv_wait(&hio_free_list_cond, &hio_free_list_lock); 484 } 485 TAILQ_REMOVE(&hio_free_list, hio, hio_next); 486 mtx_unlock(&hio_free_list_lock); 487 pjdlog_debug(2, "recv: (%p) Got request.", hio); 488 if (hast_proto_recv_hdr(res->hr_remotein, &hio->hio_nv) < 0) { 489 pjdlog_exit(EX_TEMPFAIL, 490 "Unable to receive request header"); 491 } 492 if (requnpack(res, hio) != 0) 493 goto send_queue; 494 reqlog(LOG_DEBUG, 2, -1, hio, 495 "recv: (%p) Got request header: ", hio); 496 if (hio->hio_cmd == HIO_WRITE) { 497 if (hast_proto_recv_data(res, res->hr_remotein, 498 hio->hio_nv, hio->hio_data, MAXPHYS) < 0) { 499 pjdlog_exit(EX_TEMPFAIL, 500 "Unable to receive reply data"); 501 } 502 } 503 pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.", 504 hio); 505 mtx_lock(&hio_disk_list_lock); 506 wakeup = TAILQ_EMPTY(&hio_disk_list); 507 TAILQ_INSERT_TAIL(&hio_disk_list, hio, hio_next); 508 mtx_unlock(&hio_disk_list_lock); 509 if (wakeup) 510 cv_signal(&hio_disk_list_cond); 511 continue; 512send_queue: 513 pjdlog_debug(2, "recv: (%p) Moving request to the send queue.", 514 hio); 515 mtx_lock(&hio_send_list_lock); 516 wakeup = TAILQ_EMPTY(&hio_send_list); 517 TAILQ_INSERT_TAIL(&hio_send_list, hio, hio_next); 518 mtx_unlock(&hio_send_list_lock); 519 if (wakeup) 520 cv_signal(&hio_send_list_cond); 521 } 522 /* NOTREACHED */ 523 return (NULL); 524} 525 526/* 527 * Thread reads from or writes to local component and also handles DELETE and 528 * FLUSH requests. 
529 */ 530static void * 531disk_thread(void *arg) 532{ 533 struct hast_resource *res = arg; 534 struct hio *hio; 535 ssize_t ret; 536 bool clear_activemap, wakeup; 537 538 clear_activemap = true; 539 540 for (;;) { 541 pjdlog_debug(2, "disk: Taking request."); 542 mtx_lock(&hio_disk_list_lock); 543 while ((hio = TAILQ_FIRST(&hio_disk_list)) == NULL) { 544 pjdlog_debug(2, "disk: No requests, waiting."); 545 cv_wait(&hio_disk_list_cond, &hio_disk_list_lock); 546 } 547 TAILQ_REMOVE(&hio_disk_list, hio, hio_next); 548 mtx_unlock(&hio_disk_list_lock); 549 while (clear_activemap) { 550 unsigned char *map; 551 size_t mapsize; 552 553 /* 554 * When first request is received, it means that primary 555 * already received our activemap, merged it and stored 556 * locally. We can now safely clear our activemap. 557 */ 558 mapsize = 559 activemap_calc_ondisk_size(res->hr_local_mediasize - 560 METADATA_SIZE, res->hr_extentsize, 561 res->hr_local_sectorsize); 562 map = calloc(1, mapsize); 563 if (map == NULL) { 564 pjdlog_warning("Unable to allocate memory to clear local activemap."); 565 break; 566 } 567 if (pwrite(res->hr_localfd, map, mapsize, 568 METADATA_SIZE) != (ssize_t)mapsize) { 569 pjdlog_errno(LOG_WARNING, 570 "Unable to store cleared activemap"); 571 free(map); 572 break; 573 } 574 free(map); 575 clear_activemap = false; 576 pjdlog_debug(1, "Local activemap cleared."); 577 } 578 reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio); 579 /* Handle the actual request. 
*/ 580 switch (hio->hio_cmd) { 581 case HIO_READ: 582 ret = pread(res->hr_localfd, hio->hio_data, 583 hio->hio_length, 584 hio->hio_offset + res->hr_localoff); 585 if (ret < 0) 586 hio->hio_error = errno; 587 else if (ret != (int64_t)hio->hio_length) 588 hio->hio_error = EIO; 589 else 590 hio->hio_error = 0; 591 break; 592 case HIO_WRITE: 593 ret = pwrite(res->hr_localfd, hio->hio_data, 594 hio->hio_length, 595 hio->hio_offset + res->hr_localoff); 596 if (ret < 0) 597 hio->hio_error = errno; 598 else if (ret != (int64_t)hio->hio_length) 599 hio->hio_error = EIO; 600 else 601 hio->hio_error = 0; 602 break; 603 case HIO_DELETE: 604 ret = g_delete(res->hr_localfd, 605 hio->hio_offset + res->hr_localoff, 606 hio->hio_length); 607 if (ret < 0) 608 hio->hio_error = errno; 609 else 610 hio->hio_error = 0; 611 break; 612 case HIO_FLUSH: 613 ret = g_flush(res->hr_localfd); 614 if (ret < 0) 615 hio->hio_error = errno; 616 else 617 hio->hio_error = 0; 618 break; 619 } 620 if (hio->hio_error != 0) { 621 reqlog(LOG_ERR, 0, hio->hio_error, hio, 622 "Request failed: "); 623 } 624 pjdlog_debug(2, "disk: (%p) Moving request to the send queue.", 625 hio); 626 mtx_lock(&hio_send_list_lock); 627 wakeup = TAILQ_EMPTY(&hio_send_list); 628 TAILQ_INSERT_TAIL(&hio_send_list, hio, hio_next); 629 mtx_unlock(&hio_send_list_lock); 630 if (wakeup) 631 cv_signal(&hio_send_list_cond); 632 } 633 /* NOTREACHED */ 634 return (NULL); 635} 636 637/* 638 * Thread sends requests back to primary node. 
639 */ 640static void * 641send_thread(void *arg) 642{ 643 struct hast_resource *res = arg; 644 struct nv *nvout; 645 struct hio *hio; 646 void *data; 647 size_t length; 648 bool wakeup; 649 650 for (;;) { 651 pjdlog_debug(2, "send: Taking request."); 652 mtx_lock(&hio_send_list_lock); 653 while ((hio = TAILQ_FIRST(&hio_send_list)) == NULL) { 654 pjdlog_debug(2, "send: No requests, waiting."); 655 cv_wait(&hio_send_list_cond, &hio_send_list_lock); 656 } 657 TAILQ_REMOVE(&hio_send_list, hio, hio_next); 658 mtx_unlock(&hio_send_list_lock); 659 reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio); 660 nvout = nv_alloc(); 661 /* Copy sequence number. */ 662 nv_add_uint64(nvout, nv_get_uint64(hio->hio_nv, "seq"), "seq"); 663 switch (hio->hio_cmd) { 664 case HIO_READ: 665 if (hio->hio_error == 0) { 666 data = hio->hio_data; 667 length = hio->hio_length; 668 break; 669 } 670 /* 671 * We send no data in case of an error. 672 */ 673 /* FALLTHROUGH */ 674 case HIO_DELETE: 675 case HIO_FLUSH: 676 case HIO_WRITE: 677 data = NULL; 678 length = 0; 679 break; 680 default: 681 abort(); 682 break; 683 } 684 if (hio->hio_error != 0) 685 nv_add_int16(nvout, hio->hio_error, "error"); 686 if (hast_proto_send(res, res->hr_remoteout, nvout, data, 687 length) < 0) { 688 pjdlog_exit(EX_TEMPFAIL, "Unable to send reply."); 689 } 690 nv_free(nvout); 691 pjdlog_debug(2, "send: (%p) Moving request to the free queue.", 692 hio); 693 nv_free(hio->hio_nv); 694 hio->hio_error = 0; 695 mtx_lock(&hio_free_list_lock); 696 wakeup = TAILQ_EMPTY(&hio_free_list); 697 TAILQ_INSERT_TAIL(&hio_free_list, hio, hio_next); 698 mtx_unlock(&hio_free_list_lock); 699 if (wakeup) 700 cv_signal(&hio_free_list_cond); 701 } 702 /* NOTREACHED */ 703 return (NULL); 704} 705