/* sender.c revision 1.9 */
1/* $Id: sender.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */ 2/* 3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17#include <sys/mman.h> 18#include <sys/queue.h> 19#include <sys/stat.h> 20 21#include <assert.h> 22#include <fcntl.h> 23#include <inttypes.h> 24#include <poll.h> 25#include <stdlib.h> 26#include <string.h> 27#include <unistd.h> 28 29#include <openssl/md4.h> 30 31#include "extern.h" 32 33/* 34 * A request from the receiver to download updated file data. 35 */ 36struct send_dl { 37 int32_t idx; /* index in our file list */ 38 struct blkset *blks; /* the sender's block information */ 39 TAILQ_ENTRY(send_dl) entries; 40}; 41 42/* 43 * The current file being "updated": sent from sender to receiver. 44 * If there is no file being uploaded, "cur" is NULL. 45 */ 46struct send_up { 47 struct send_dl *cur; /* file being updated or NULL */ 48 struct blkstat stat; /* status of file being updated */ 49 int primed; /* blk_recv_ack() was called */ 50}; 51 52TAILQ_HEAD(send_dlq, send_dl); 53 54/* 55 * We have finished updating the receiver's file with sender data. 56 * Deallocate and wipe clean all resources required for that. 
57 */ 58static void 59send_up_reset(struct send_up *p) 60{ 61 62 assert(NULL != p); 63 64 /* Free the download request, if applicable. */ 65 66 if (p->cur != NULL) { 67 free(p->cur->blks); 68 free(p->cur); 69 p->cur = NULL; 70 } 71 72 /* If we mapped a file for scanning, unmap it and close. */ 73 74 if (p->stat.map != MAP_FAILED) 75 munmap(p->stat.map, p->stat.mapsz); 76 77 p->stat.map = MAP_FAILED; 78 p->stat.mapsz = 0; 79 80 if (p->stat.fd != -1) 81 close(p->stat.fd); 82 83 p->stat.fd = -1; 84 85 /* Now clear the in-transfer information. */ 86 87 p->stat.offs = 0; 88 p->stat.hint = 0; 89 p->stat.curst = BLKSTAT_NONE; 90 p->primed = 0; 91} 92 93/* 94 * Enqueue a download request, getting it off the read channel as 95 * quickly a possible. 96 * This frees up the read channel for further incoming requests. 97 * We'll handle each element in turn, up to and including the last 98 * request (phase change), which is always a -1 idx. 99 * Returns zero on failure, non-zero on success. 100 */ 101static int 102send_dl_enqueue(struct sess *sess, struct send_dlq *q, 103 int32_t idx, const struct flist *fl, size_t flsz, int fd) 104{ 105 struct send_dl *s; 106 107 /* End-of-phase marker. */ 108 109 if (idx == -1) { 110 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) { 111 ERR(sess, "calloc"); 112 return 0; 113 } 114 s->idx = -1; 115 s->blks = NULL; 116 TAILQ_INSERT_TAIL(q, s, entries); 117 return 1; 118 } 119 120 /* Validate the index. 
*/ 121 122 if (idx < 0 || (uint32_t)idx >= flsz) { 123 ERRX(sess, "file index out of bounds: invalid %" 124 PRId32 " out of %zu", idx, flsz); 125 return 0; 126 } else if (S_ISDIR(fl[idx].st.mode)) { 127 ERRX(sess, "blocks requested for " 128 "directory: %s", fl[idx].path); 129 return 0; 130 } else if (S_ISLNK(fl[idx].st.mode)) { 131 ERRX(sess, "blocks requested for " 132 "symlink: %s", fl[idx].path); 133 return 0; 134 } else if (!S_ISREG(fl[idx].st.mode)) { 135 ERRX(sess, "blocks requested for " 136 "special: %s", fl[idx].path); 137 return 0; 138 } 139 140 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) { 141 ERR(sess, "callloc"); 142 return 0; 143 } 144 s->idx = idx; 145 s->blks = NULL; 146 TAILQ_INSERT_TAIL(q, s, entries); 147 148 /* 149 * This blocks til the full blockset has been read. 150 * That's ok, because the most important thing is getting data 151 * off the wire. 152 */ 153 154 if (!sess->opts->dry_run) { 155 s->blks = blk_recv(sess, fd, fl[idx].path); 156 if (s->blks == NULL) { 157 ERRX1(sess, "blk_recv"); 158 return 0; 159 } 160 } 161 return 1; 162} 163 164/* 165 * A client sender manages the read-only source files and sends data to 166 * the receiver as requested. 167 * First it sends its list of files, then it waits for the server to 168 * request updates to individual files. 169 * It queues requests for updates as soon as it receives them. 170 * Returns zero on failure, non-zero on success. 171 * 172 * Pledges: stdio, rpath, unveil. 
173 */ 174int 175rsync_sender(struct sess *sess, int fdin, 176 int fdout, size_t argc, char **argv) 177{ 178 struct flist *fl = NULL; 179 const struct flist *f; 180 size_t i, flsz = 0, phase = 0, excl; 181 off_t sz; 182 int rc = 0, c; 183 int32_t idx; 184 struct pollfd pfd[3]; 185 struct send_dlq sdlq; 186 struct send_dl *dl; 187 struct send_up up; 188 struct stat st; 189 unsigned char filemd[MD4_DIGEST_LENGTH]; 190 void *wbuf = NULL; 191 size_t wbufpos = 0, pos, wbufsz = 0, wbufmax = 0; 192 ssize_t ssz; 193 194 if (pledge("stdio getpw rpath unveil", NULL) == -1) { 195 ERR(sess, "pledge"); 196 return 0; 197 } 198 199 memset(&up, 0, sizeof(struct send_up)); 200 TAILQ_INIT(&sdlq); 201 up.stat.fd = -1; 202 up.stat.map = MAP_FAILED; 203 204 /* 205 * Generate the list of files we want to send from our 206 * command-line input. 207 * This will also remove all invalid files. 208 */ 209 210 if (!flist_gen(sess, argc, argv, &fl, &flsz)) { 211 ERRX1(sess, "flist_gen"); 212 goto out; 213 } 214 215 /* Client sends zero-length exclusions if deleting. */ 216 217 if (!sess->opts->server && sess->opts->del && 218 !io_write_int(sess, fdout, 0)) { 219 ERRX1(sess, "io_write_int"); 220 goto out; 221 } 222 223 /* 224 * Then the file list in any mode. 225 * Finally, the IO error (always zero for us). 226 */ 227 228 if (!flist_send(sess, fdin, fdout, fl, flsz)) { 229 ERRX1(sess, "flist_send"); 230 goto out; 231 } else if (!io_write_int(sess, fdout, 0)) { 232 ERRX1(sess, "io_write_int"); 233 goto out; 234 } 235 236 /* Exit if we're the server with zero files. */ 237 238 if (flsz == 0 && sess->opts->server) { 239 WARNX(sess, "sender has empty file list: exiting"); 240 rc = 1; 241 goto out; 242 } else if (!sess->opts->server) 243 LOG1(sess, "Transfer starting: %zu files", flsz); 244 245 /* 246 * If we're the server, read our exclusion list. 247 * This is always 0 for now. 
248 */ 249 250 if (sess->opts->server) { 251 if (!io_read_size(sess, fdin, &excl)) { 252 ERRX1(sess, "io_read_size"); 253 goto out; 254 } else if (excl != 0) { 255 ERRX1(sess, "exclusion list is non-empty"); 256 goto out; 257 } 258 } 259 260 /* 261 * Set up our poll events. 262 * We start by polling only in receiver requests, enabling other 263 * poll events on demand. 264 */ 265 266 pfd[0].fd = fdin; /* from receiver */ 267 pfd[0].events = POLLIN; 268 pfd[1].fd = -1; /* to receiver */ 269 pfd[1].events = POLLOUT; 270 pfd[2].fd = -1; /* from local file */ 271 pfd[2].events = POLLIN; 272 273 for (;;) { 274 assert(pfd[0].fd != -1); 275 if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { 276 ERR(sess, "poll"); 277 goto out; 278 } else if (c == 0) { 279 ERRX(sess, "poll: timeout"); 280 goto out; 281 } 282 for (i = 0; i < 3; i++) 283 if (pfd[i].revents & (POLLERR|POLLNVAL)) { 284 ERRX(sess, "poll: bad fd"); 285 goto out; 286 } else if (pfd[i].revents & POLLHUP) { 287 ERRX(sess, "poll: hangup"); 288 goto out; 289 } 290 291 /* 292 * If we have a request coming down off the wire, pull 293 * it in as quickly as possible into our buffer. 294 * This unclogs the socket buffers so the data can flow. 295 * FIXME: if we're multiplexing, we might stall here if 296 * there's only a log message and no actual data. 297 * This can be fixed by doing a conditional test. 298 */ 299 300 if (pfd[0].revents & POLLIN) 301 for (;;) { 302 if (!io_read_int(sess, fdin, &idx)) { 303 ERRX1(sess, "io_read_int"); 304 goto out; 305 } 306 if (!send_dl_enqueue(sess, 307 &sdlq, idx, fl, flsz, fdin)) { 308 ERRX1(sess, "send_dl_enqueue"); 309 goto out; 310 } 311 c = io_read_check(sess, fdin); 312 if (c < 0) { 313 ERRX1(sess, "io_read_check"); 314 goto out; 315 } else if (c == 0) 316 break; 317 } 318 319 /* 320 * One of our local files has been opened in response 321 * to a receiver request and now we can map it. 322 * We'll respond to the event by looking at the map when 323 * the writer is available. 
324 * Here we also enable the poll event for output. 325 */ 326 327 if (pfd[2].revents & POLLIN) { 328 assert(up.cur != NULL); 329 assert(up.stat.fd != -1); 330 assert(up.stat.map == MAP_FAILED); 331 assert(up.stat.mapsz == 0); 332 f = &fl[up.cur->idx]; 333 334 if (fstat(up.stat.fd, &st) == -1) { 335 ERR(sess, "%s: fstat", f->path); 336 goto out; 337 } 338 339 /* 340 * If the file is zero-length, the map will 341 * fail, but either way we want to unset that 342 * we're waiting for the file to open and set 343 * that we're ready for the output channel. 344 */ 345 346 if ((up.stat.mapsz = st.st_size) > 0) { 347 up.stat.map = mmap(NULL, 348 up.stat.mapsz, PROT_READ, 349 MAP_SHARED, up.stat.fd, 0); 350 if (up.stat.map == MAP_FAILED) { 351 ERR(sess, "%s: mmap", f->path); 352 goto out; 353 } 354 } 355 356 pfd[2].fd = -1; 357 pfd[1].fd = fdout; 358 } 359 360 /* 361 * If we have buffers waiting to write, write them out 362 * as soon as we can in a non-blocking fashion. 363 * We must not be waiting for any local files. 364 * ALL WRITES MUST HAPPEN HERE. 365 * This keeps the sender deadlock-free. 366 */ 367 368 if ((pfd[1].revents & POLLOUT) && wbufsz > 0) { 369 assert(pfd[2].fd == -1); 370 assert(wbufsz - wbufpos); 371 ssz = write(fdout, 372 wbuf + wbufpos, wbufsz - wbufpos); 373 if (ssz < 0) { 374 ERR(sess, "write"); 375 goto out; 376 } 377 wbufpos += ssz; 378 if (wbufpos == wbufsz) 379 wbufpos = wbufsz = 0; 380 pfd[1].revents &= ~POLLOUT; 381 382 /* This is usually in io.c... */ 383 384 sess->total_write += ssz; 385 } 386 387 if (pfd[1].revents & POLLOUT) { 388 assert(pfd[2].fd == -1); 389 assert(0 == wbufpos && 0 == wbufsz); 390 391 /* 392 * If we have data to write, do it now according 393 * to the data finite state machine. 394 * If we receive an invalid index (-1), then 395 * we're either promoted to the second phase or 396 * it's time to exit, depending upon which phase 397 * we're in. 
398 * Otherwise, we either start a transfer 399 * sequence (if not primed) or continue one. 400 */ 401 402 pos = 0; 403 if (BLKSTAT_DATA == up.stat.curst) { 404 /* 405 * A data segment to be written: buffer 406 * both the length and the data, then 407 * put is in the token phase. 408 */ 409 410 sz = MIN(MAX_CHUNK, 411 up.stat.curlen - up.stat.curpos); 412 if (!io_lowbuffer_alloc(sess, &wbuf, 413 &wbufsz, &wbufmax, sizeof(int32_t))) { 414 ERRX1(sess, "io_lowbuffer_alloc"); 415 goto out; 416 } 417 io_lowbuffer_int(sess, 418 wbuf, &pos, wbufsz, sz); 419 if (!io_lowbuffer_alloc(sess, &wbuf, 420 &wbufsz, &wbufmax, sz)) { 421 ERRX1(sess, "io_lowbuffer_alloc"); 422 goto out; 423 } 424 io_lowbuffer_buf(sess, wbuf, &pos, wbufsz, 425 up.stat.map + up.stat.curpos, sz); 426 up.stat.curpos += sz; 427 if (up.stat.curpos == up.stat.curlen) 428 up.stat.curst = BLKSTAT_TOK; 429 } else if (BLKSTAT_TOK == up.stat.curst) { 430 /* 431 * The data token following (maybe) a 432 * data segment. 433 * These can also come standalone if, 434 * say, the file's being fully written. 435 * It's followed by a hash or another 436 * data segment, depending on the token. 437 */ 438 439 if (!io_lowbuffer_alloc(sess, &wbuf, 440 &wbufsz, &wbufmax, sizeof(int32_t))) { 441 ERRX1(sess, "io_lowbuffer_alloc"); 442 goto out; 443 } 444 io_lowbuffer_int(sess, wbuf, 445 &pos, wbufsz, up.stat.curtok); 446 up.stat.curst = up.stat.curtok ? 447 BLKSTAT_NONE : BLKSTAT_HASH; 448 } else if (BLKSTAT_HASH == up.stat.curst) { 449 /* 450 * The hash following transmission of 451 * all file contents. 452 * This is always followed by the state 453 * that we're finished with the file. 
454 */ 455 456 hash_file(up.stat.map, 457 up.stat.mapsz, filemd, sess); 458 if (!io_lowbuffer_alloc(sess, &wbuf, 459 &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) { 460 ERRX1(sess, "io_lowbuffer_alloc"); 461 goto out; 462 } 463 io_lowbuffer_buf(sess, wbuf, &pos, 464 wbufsz, filemd, MD4_DIGEST_LENGTH); 465 up.stat.curst = BLKSTAT_DONE; 466 } else if (BLKSTAT_DONE == up.stat.curst) { 467 /* 468 * The data has been written. 469 * Clear our current send file and allow 470 * the block below to find another. 471 */ 472 473 LOG3(sess, "%s: flushed %jd KB total, " 474 "%.2f%% uploaded", 475 fl[up.cur->idx].path, 476 (intmax_t)up.stat.total / 1024, 477 100.0 * up.stat.dirty / up.stat.total); 478 send_up_reset(&up); 479 } else if (NULL != up.cur && up.cur->idx < 0) { 480 /* 481 * We've hit the phase change following 482 * the last file (or start, or prior 483 * phase change). 484 * Simply acknowledge it. 485 * FIXME: use buffering. 486 */ 487 488 if (!io_write_int(sess, fdout, -1)) { 489 ERRX1(sess, "io_write_int"); 490 goto out; 491 } 492 if (sess->opts->server && sess->rver > 27 && 493 !io_write_int(sess, fdout, -1)) { 494 ERRX1(sess, "io_write_int"); 495 goto out; 496 } 497 send_up_reset(&up); 498 499 /* 500 * This is where we actually stop the 501 * algorithm: we're already at the 502 * second phase. 503 */ 504 505 if (phase++) 506 break; 507 } else if (NULL != up.cur && 0 == up.primed) { 508 /* 509 * We're getting ready to send the file 510 * contents to the receiver. 511 * FIXME: use buffering. 512 */ 513 514 if (!sess->opts->server) 515 LOG1(sess, "%s", fl[up.cur->idx].wpath); 516 517 /* Dry-running does nothing but a response. */ 518 519 if (sess->opts->dry_run && 520 !io_write_int(sess, fdout, up.cur->idx)) { 521 ERRX1(sess, "io_write_int"); 522 goto out; 523 } 524 525 /* Actually perform the block send. 
*/ 526 527 assert(up.stat.fd != -1); 528 if (!blk_recv_ack(sess, fdout, 529 up.cur->blks, up.cur->idx)) { 530 ERRX1(sess, "blk_recv_ack"); 531 goto out; 532 } 533 LOG3(sess, "%s: primed for %jd B total", 534 fl[up.cur->idx].path, 535 (intmax_t)up.cur->blks->size); 536 up.primed = 1; 537 } else if (NULL != up.cur) { 538 /* 539 * Our last case: we need to find the 540 * next block (and token) to transmit to 541 * the receiver. 542 * These will drive the finite state 543 * machine in the first few conditional 544 * blocks of this set. 545 */ 546 547 assert(up.stat.fd != -1); 548 blk_match(sess, up.cur->blks, 549 fl[up.cur->idx].path, &up.stat); 550 } 551 } 552 553 /* 554 * Incoming queue management. 555 * If we have no queue component that we're waiting on, 556 * then pull off the receiver-request queue and start 557 * processing the request. 558 */ 559 560 if (up.cur == NULL) { 561 assert(pfd[2].fd == -1); 562 assert(up.stat.fd == -1); 563 assert(up.stat.map == MAP_FAILED); 564 assert(up.stat.mapsz == 0); 565 assert(wbufsz == 0 && wbufpos == 0); 566 pfd[1].fd = -1; 567 568 /* 569 * If there's nothing in the queue, then keep 570 * the output channel disabled and wait for 571 * whatever comes next from the reader. 572 */ 573 574 if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) 575 continue; 576 577 TAILQ_REMOVE(&sdlq, up.cur, entries); 578 579 /* 580 * End of phase: enable channel to receiver. 581 * We'll need our output buffer enabled in order 582 * to process this event. 583 */ 584 585 if (up.cur->idx == -1) { 586 pfd[1].fd = fdout; 587 continue; 588 } 589 590 /* 591 * Non-blocking open of file. 592 * This will be picked up in the state machine 593 * block of not being primed. 
594 */ 595 596 up.stat.fd = open(fl[up.cur->idx].path, 597 O_RDONLY|O_NONBLOCK, 0); 598 if (up.stat.fd == -1) { 599 ERR(sess, "%s: open", fl[up.cur->idx].path); 600 goto out; 601 } 602 pfd[2].fd = up.stat.fd; 603 } 604 } 605 606 if (!TAILQ_EMPTY(&sdlq)) { 607 ERRX(sess, "phases complete with files still queued"); 608 goto out; 609 } 610 611 if (!sess_stats_send(sess, fdout)) { 612 ERRX1(sess, "sess_stats_end"); 613 goto out; 614 } 615 616 /* Final "goodbye" message. */ 617 618 if (!io_read_int(sess, fdin, &idx)) { 619 ERRX1(sess, "io_read_int"); 620 goto out; 621 } else if (idx != -1) { 622 ERRX(sess, "read incorrect update complete ack"); 623 goto out; 624 } 625 626 LOG2(sess, "sender finished updating"); 627 rc = 1; 628out: 629 send_up_reset(&up); 630 while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { 631 free(dl->blks); 632 free(dl); 633 } 634 flist_free(fl, flsz); 635 free(wbuf); 636 return rc; 637} 638