/* sender.c revision 1.20 */
1/* $Id: sender.c,v 1.20 2019/03/23 16:04:28 deraadt Exp $ */ 2/* 3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17#include <sys/mman.h> 18#include <sys/queue.h> 19#include <sys/stat.h> 20 21#include <assert.h> 22#include <fcntl.h> 23#include <inttypes.h> 24#include <poll.h> 25#include <stdlib.h> 26#include <string.h> 27#include <unistd.h> 28 29#include <openssl/md4.h> 30 31#include "extern.h" 32 33/* 34 * A request from the receiver to download updated file data. 35 */ 36struct send_dl { 37 int32_t idx; /* index in our file list */ 38 struct blkset *blks; /* the sender's block information */ 39 TAILQ_ENTRY(send_dl) entries; 40}; 41 42/* 43 * The current file being "updated": sent from sender to receiver. 44 * If there is no file being uploaded, "cur" is NULL. 45 */ 46struct send_up { 47 struct send_dl *cur; /* file being updated or NULL */ 48 struct blkstat stat; /* status of file being updated */ 49}; 50 51TAILQ_HEAD(send_dlq, send_dl); 52 53/* 54 * We have finished updating the receiver's file with sender data. 55 * Deallocate and wipe clean all resources required for that. 56 */ 57static void 58send_up_reset(struct send_up *p) 59{ 60 61 assert(p != NULL); 62 63 /* Free the download request, if applicable. 
*/ 64 65 if (p->cur != NULL) { 66 free(p->cur->blks); 67 free(p->cur); 68 p->cur = NULL; 69 } 70 71 /* If we mapped a file for scanning, unmap it and close. */ 72 73 if (p->stat.map != MAP_FAILED) 74 munmap(p->stat.map, p->stat.mapsz); 75 76 p->stat.map = MAP_FAILED; 77 p->stat.mapsz = 0; 78 79 if (p->stat.fd != -1) 80 close(p->stat.fd); 81 82 p->stat.fd = -1; 83 84 /* Now clear the in-transfer information. */ 85 86 p->stat.offs = 0; 87 p->stat.hint = 0; 88 p->stat.curst = BLKSTAT_NONE; 89} 90 91/* 92 * This is the bulk of the sender work. 93 * Here we tend to an output buffer that responds to receiver requests 94 * for data. 95 * This does not act upon the output descriptor itself so as to avoid 96 * blocking, which otherwise would deadlock the protocol. 97 * Returns zero on failure, non-zero on success. 98 */ 99static int 100send_up_fsm(struct sess *sess, size_t *phase, 101 struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax, 102 const struct flist *fl) 103{ 104 size_t pos = 0, isz = sizeof(int32_t), 105 dsz = MD4_DIGEST_LENGTH; 106 unsigned char fmd[MD4_DIGEST_LENGTH]; 107 off_t sz; 108 char buf[20]; 109 110 switch (up->stat.curst) { 111 case BLKSTAT_DATA: 112 /* 113 * A data segment to be written: buffer both the length 114 * and the data. 115 * If we've finished the transfer, move on to the token; 116 * otherwise, keep sending data. 
117 */ 118 119 sz = MINIMUM(MAX_CHUNK, 120 up->stat.curlen - up->stat.curpos); 121 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { 122 ERRX1(sess, "io_lowbuffer_alloc"); 123 return 0; 124 } 125 io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz); 126 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) { 127 ERRX1(sess, "io_lowbuffer_alloc"); 128 return 0; 129 } 130 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, 131 up->stat.map + up->stat.curpos, sz); 132 133 up->stat.curpos += sz; 134 if (up->stat.curpos == up->stat.curlen) 135 up->stat.curst = BLKSTAT_TOK; 136 return 1; 137 case BLKSTAT_TOK: 138 /* 139 * The data token following (maybe) a data segment. 140 * These can also come standalone if, say, the file's 141 * being fully written. 142 * It's followed by a hash or another data segment, 143 * depending on the token. 144 */ 145 146 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { 147 ERRX1(sess, "io_lowbuffer_alloc"); 148 return 0; 149 } 150 io_lowbuffer_int(sess, *wb, 151 &pos, *wbsz, up->stat.curtok); 152 up->stat.curst = up->stat.curtok ? 153 BLKSTAT_NEXT : BLKSTAT_HASH; 154 return 1; 155 case BLKSTAT_HASH: 156 /* 157 * The hash following transmission of all file contents. 158 * This is always followed by the state that we're 159 * finished with the file. 160 */ 161 162 hash_file(up->stat.map, up->stat.mapsz, fmd, sess); 163 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) { 164 ERRX1(sess, "io_lowbuffer_alloc"); 165 return 0; 166 } 167 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz); 168 up->stat.curst = BLKSTAT_DONE; 169 return 1; 170 case BLKSTAT_DONE: 171 /* 172 * The data has been written. 173 * Clear our current send file and allow the block below 174 * to find another. 
175 */ 176 177 if (!sess->opts->dry_run) 178 LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded", 179 fl[up->cur->idx].path, 180 (intmax_t)up->stat.total / 1024, 181 100.0 * up->stat.dirty / up->stat.total); 182 send_up_reset(up); 183 return 1; 184 case BLKSTAT_PHASE: 185 /* 186 * This is where we actually stop the algorithm: we're 187 * already at the second phase. 188 */ 189 190 send_up_reset(up); 191 (*phase)++; 192 return 1; 193 case BLKSTAT_NEXT: 194 /* 195 * Our last case: we need to find the 196 * next block (and token) to transmit to 197 * the receiver. 198 * These will drive the finite state 199 * machine in the first few conditional 200 * blocks of this set. 201 */ 202 203 assert(up->stat.fd != -1); 204 blk_match(sess, up->cur->blks, 205 fl[up->cur->idx].path, &up->stat); 206 return 1; 207 case BLKSTAT_NONE: 208 break; 209 } 210 211 assert(BLKSTAT_NONE == up->stat.curst); 212 213 /* 214 * We've either hit the phase change following the last file (or 215 * start, or prior phase change), or we need to prime the next 216 * file for transmission. 217 * We special-case dry-run mode. 
218 */ 219 220 if (up->cur->idx < 0) { 221 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { 222 ERRX1(sess, "io_lowbuffer_alloc"); 223 return 0; 224 } 225 io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); 226 227 if (sess->opts->server && sess->rver > 27) { 228 if (!io_lowbuffer_alloc(sess, 229 wb, wbsz, wbmax, isz)) { 230 ERRX1(sess, "io_lowbuffer_alloc"); 231 return 0; 232 } 233 io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1); 234 } 235 up->stat.curst = BLKSTAT_PHASE; 236 } else if (sess->opts->dry_run) { 237 if (!sess->opts->server) 238 LOG1(sess, "%s", fl[up->cur->idx].wpath); 239 240 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) { 241 ERRX1(sess, "io_lowbuffer_alloc"); 242 return 0; 243 } 244 io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx); 245 up->stat.curst = BLKSTAT_DONE; 246 } else { 247 assert(up->stat.fd != -1); 248 249 /* 250 * FIXME: use the nice output of log_file() and so on in 251 * downloader.c, which means moving this into 252 * BLKSTAT_DONE instead of having it be here. 253 */ 254 255 if (!sess->opts->server) 256 LOG1(sess, "%s", fl[up->cur->idx].wpath); 257 258 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) { 259 ERRX1(sess, "io_lowbuffer_alloc"); 260 return 0; 261 } 262 assert(sizeof(buf) == 20); 263 blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx); 264 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20); 265 266 LOG3(sess, "%s: primed for %jd B total", 267 fl[up->cur->idx].path, (intmax_t)up->cur->blks->size); 268 up->stat.curst = BLKSTAT_NEXT; 269 } 270 271 return 1; 272} 273 274/* 275 * Enqueue a download request, getting it off the read channel as 276 * quickly a possible. 277 * This frees up the read channel for further incoming requests. 278 * We'll handle each element in turn, up to and including the last 279 * request (phase change), which is always a -1 idx. 280 * Returns zero on failure, non-zero on success. 
281 */ 282static int 283send_dl_enqueue(struct sess *sess, struct send_dlq *q, 284 int32_t idx, const struct flist *fl, size_t flsz, int fd) 285{ 286 struct send_dl *s; 287 288 /* End-of-phase marker. */ 289 290 if (idx == -1) { 291 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) { 292 ERR(sess, "calloc"); 293 return 0; 294 } 295 s->idx = -1; 296 s->blks = NULL; 297 TAILQ_INSERT_TAIL(q, s, entries); 298 return 1; 299 } 300 301 /* Validate the index. */ 302 303 if (idx < 0 || (uint32_t)idx >= flsz) { 304 ERRX(sess, "file index out of bounds: invalid %d out of %zu", 305 idx, flsz); 306 return 0; 307 } else if (S_ISDIR(fl[idx].st.mode)) { 308 ERRX(sess, "blocks requested for " 309 "directory: %s", fl[idx].path); 310 return 0; 311 } else if (S_ISLNK(fl[idx].st.mode)) { 312 ERRX(sess, "blocks requested for " 313 "symlink: %s", fl[idx].path); 314 return 0; 315 } else if (!S_ISREG(fl[idx].st.mode)) { 316 ERRX(sess, "blocks requested for " 317 "special: %s", fl[idx].path); 318 return 0; 319 } 320 321 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) { 322 ERR(sess, "callloc"); 323 return 0; 324 } 325 s->idx = idx; 326 s->blks = NULL; 327 TAILQ_INSERT_TAIL(q, s, entries); 328 329 /* 330 * This blocks til the full blockset has been read. 331 * That's ok, because the most important thing is getting data 332 * off the wire. 333 */ 334 335 if (!sess->opts->dry_run) { 336 s->blks = blk_recv(sess, fd, fl[idx].path); 337 if (s->blks == NULL) { 338 ERRX1(sess, "blk_recv"); 339 return 0; 340 } 341 } 342 return 1; 343} 344 345/* 346 * A client sender manages the read-only source files and sends data to 347 * the receiver as requested. 348 * First it sends its list of files, then it waits for the server to 349 * request updates to individual files. 350 * It queues requests for updates as soon as it receives them. 351 * Returns zero on failure, non-zero on success. 352 * 353 * Pledges: stdio, rpath, unveil. 
354 */ 355int 356rsync_sender(struct sess *sess, int fdin, 357 int fdout, size_t argc, char **argv) 358{ 359 struct flist *fl = NULL; 360 const struct flist *f; 361 size_t i, flsz = 0, phase = 0, excl; 362 int rc = 0, c; 363 int32_t idx; 364 struct pollfd pfd[3]; 365 struct send_dlq sdlq; 366 struct send_dl *dl; 367 struct send_up up; 368 struct stat st; 369 void *wbuf = NULL; 370 size_t wbufpos = 0, wbufsz = 0, wbufmax = 0; 371 ssize_t ssz; 372 373 if (pledge("stdio getpw rpath unveil", NULL) == -1) { 374 ERR(sess, "pledge"); 375 return 0; 376 } 377 378 memset(&up, 0, sizeof(struct send_up)); 379 TAILQ_INIT(&sdlq); 380 up.stat.fd = -1; 381 up.stat.map = MAP_FAILED; 382 383 /* 384 * Generate the list of files we want to send from our 385 * command-line input. 386 * This will also remove all invalid files. 387 */ 388 389 if (!flist_gen(sess, argc, argv, &fl, &flsz)) { 390 ERRX1(sess, "flist_gen"); 391 goto out; 392 } 393 394 /* Client sends zero-length exclusions if deleting. */ 395 396 if (!sess->opts->server && sess->opts->del && 397 !io_write_int(sess, fdout, 0)) { 398 ERRX1(sess, "io_write_int"); 399 goto out; 400 } 401 402 /* 403 * Then the file list in any mode. 404 * Finally, the IO error (always zero for us). 405 */ 406 407 if (!flist_send(sess, fdin, fdout, fl, flsz)) { 408 ERRX1(sess, "flist_send"); 409 goto out; 410 } else if (!io_write_int(sess, fdout, 0)) { 411 ERRX1(sess, "io_write_int"); 412 goto out; 413 } 414 415 /* Exit if we're the server with zero files. */ 416 417 if (flsz == 0 && sess->opts->server) { 418 WARNX(sess, "sender has empty file list: exiting"); 419 rc = 1; 420 goto out; 421 } else if (!sess->opts->server) 422 LOG1(sess, "Transfer starting: %zu files", flsz); 423 424 /* 425 * If we're the server, read our exclusion list. 426 * This is always 0 for now. 
427 */ 428 429 if (sess->opts->server) { 430 if (!io_read_size(sess, fdin, &excl)) { 431 ERRX1(sess, "io_read_size"); 432 goto out; 433 } else if (excl != 0) { 434 ERRX1(sess, "exclusion list is non-empty"); 435 goto out; 436 } 437 } 438 439 /* 440 * Set up our poll events. 441 * We start by polling only in receiver requests, enabling other 442 * poll events on demand. 443 */ 444 445 pfd[0].fd = fdin; /* from receiver */ 446 pfd[0].events = POLLIN; 447 pfd[1].fd = -1; /* to receiver */ 448 pfd[1].events = POLLOUT; 449 pfd[2].fd = -1; /* from local file */ 450 pfd[2].events = POLLIN; 451 452 for (;;) { 453 assert(pfd[0].fd != -1); 454 if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) { 455 ERR(sess, "poll"); 456 goto out; 457 } else if (c == 0) { 458 ERRX(sess, "poll: timeout"); 459 goto out; 460 } 461 for (i = 0; i < 3; i++) 462 if (pfd[i].revents & (POLLERR|POLLNVAL)) { 463 ERRX(sess, "poll: bad fd"); 464 goto out; 465 } else if (pfd[i].revents & POLLHUP) { 466 ERRX(sess, "poll: hangup"); 467 goto out; 468 } 469 470 /* 471 * If we have a request coming down off the wire, pull 472 * it in as quickly as possible into our buffer. 473 * Start by seeing if we have a log message. 474 * If we do, pop it off, then see if we have anything 475 * left and hit it again if so (read priority). 476 */ 477 478 if (sess->mplex_reads && (pfd[0].revents & POLLIN)) { 479 if (!io_read_flush(sess, fdin)) { 480 ERRX1(sess, "io_read_flush"); 481 goto out; 482 } else if (sess->mplex_read_remain == 0) { 483 c = io_read_check(sess, fdin); 484 if (c < 0) { 485 ERRX1(sess, "io_read_check"); 486 goto out; 487 } else if (c > 0) 488 continue; 489 pfd[0].revents &= ~POLLIN; 490 } 491 } 492 493 /* 494 * Now that we've handled the log messages, we're left 495 * here if we have any actual data coming down. 496 * Enqueue message requests, then loop again if we see 497 * more data (read priority). 
498 */ 499 500 if (pfd[0].revents & POLLIN) { 501 if (!io_read_int(sess, fdin, &idx)) { 502 ERRX1(sess, "io_read_int"); 503 goto out; 504 } 505 if (!send_dl_enqueue(sess, 506 &sdlq, idx, fl, flsz, fdin)) { 507 ERRX1(sess, "send_dl_enqueue"); 508 goto out; 509 } 510 c = io_read_check(sess, fdin); 511 if (c < 0) { 512 ERRX1(sess, "io_read_check"); 513 goto out; 514 } else if (c > 0) 515 continue; 516 } 517 518 /* 519 * One of our local files has been opened in response 520 * to a receiver request and now we can map it. 521 * We'll respond to the event by looking at the map when 522 * the writer is available. 523 * Here we also enable the poll event for output. 524 */ 525 526 if (pfd[2].revents & POLLIN) { 527 assert(up.cur != NULL); 528 assert(up.stat.fd != -1); 529 assert(up.stat.map == MAP_FAILED); 530 assert(up.stat.mapsz == 0); 531 f = &fl[up.cur->idx]; 532 533 if (fstat(up.stat.fd, &st) == -1) { 534 ERR(sess, "%s: fstat", f->path); 535 goto out; 536 } 537 538 /* 539 * If the file is zero-length, the map will 540 * fail, but either way we want to unset that 541 * we're waiting for the file to open and set 542 * that we're ready for the output channel. 543 */ 544 545 if ((up.stat.mapsz = st.st_size) > 0) { 546 up.stat.map = mmap(NULL, 547 up.stat.mapsz, PROT_READ, 548 MAP_SHARED, up.stat.fd, 0); 549 if (up.stat.map == MAP_FAILED) { 550 ERR(sess, "%s: mmap", f->path); 551 goto out; 552 } 553 } 554 555 pfd[2].fd = -1; 556 pfd[1].fd = fdout; 557 } 558 559 /* 560 * If we have buffers waiting to write, write them out 561 * as soon as we can in a non-blocking fashion. 562 * We must not be waiting for any local files. 563 * ALL WRITES MUST HAPPEN HERE. 564 * This keeps the sender deadlock-free. 
565 */ 566 567 if ((pfd[1].revents & POLLOUT) && wbufsz > 0) { 568 assert(pfd[2].fd == -1); 569 assert(wbufsz - wbufpos); 570 ssz = write(fdout, 571 wbuf + wbufpos, wbufsz - wbufpos); 572 if (ssz < 0) { 573 ERR(sess, "write"); 574 goto out; 575 } 576 wbufpos += ssz; 577 if (wbufpos == wbufsz) 578 wbufpos = wbufsz = 0; 579 pfd[1].revents &= ~POLLOUT; 580 581 /* This is usually in io.c... */ 582 583 sess->total_write += ssz; 584 } 585 586 /* 587 * Engage the FSM for the current transfer. 588 * If our phase changes, stop processing. 589 */ 590 591 if (pfd[1].revents & POLLOUT && up.cur != NULL) { 592 assert(pfd[2].fd == -1); 593 assert(wbufpos == 0 && wbufsz == 0); 594 if (!send_up_fsm(sess, &phase, 595 &up, &wbuf, &wbufsz, &wbufmax, fl)) { 596 ERRX1(sess, "send_up_fsm"); 597 goto out; 598 } else if (phase > 1) 599 break; 600 } 601 602 /* 603 * Incoming queue management. 604 * If we have no queue component that we're waiting on, 605 * then pull off the receiver-request queue and start 606 * processing the request. 607 */ 608 609 if (up.cur == NULL) { 610 assert(pfd[2].fd == -1); 611 assert(up.stat.fd == -1); 612 assert(up.stat.map == MAP_FAILED); 613 assert(up.stat.mapsz == 0); 614 assert(wbufsz == 0 && wbufpos == 0); 615 pfd[1].fd = -1; 616 617 /* 618 * If there's nothing in the queue, then keep 619 * the output channel disabled and wait for 620 * whatever comes next from the reader. 621 */ 622 623 if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL) 624 continue; 625 626 TAILQ_REMOVE(&sdlq, up.cur, entries); 627 628 /* 629 * End of phase: enable channel to receiver. 630 * We'll need our output buffer enabled in order 631 * to process this event. 632 */ 633 634 if (up.cur->idx == -1) { 635 pfd[1].fd = fdout; 636 continue; 637 } 638 639 /* 640 * Non-blocking open of file. 641 * This will be picked up in the state machine 642 * block of not being primed. 
643 */ 644 645 up.stat.fd = open(fl[up.cur->idx].path, 646 O_RDONLY|O_NONBLOCK, 0); 647 if (up.stat.fd == -1) { 648 ERR(sess, "%s: open", fl[up.cur->idx].path); 649 goto out; 650 } 651 pfd[2].fd = up.stat.fd; 652 } 653 } 654 655 if (!TAILQ_EMPTY(&sdlq)) { 656 ERRX(sess, "phases complete with files still queued"); 657 goto out; 658 } 659 660 if (!sess_stats_send(sess, fdout)) { 661 ERRX1(sess, "sess_stats_end"); 662 goto out; 663 } 664 665 /* Final "goodbye" message. */ 666 667 if (!io_read_int(sess, fdin, &idx)) { 668 ERRX1(sess, "io_read_int"); 669 goto out; 670 } else if (idx != -1) { 671 ERRX(sess, "read incorrect update complete ack"); 672 goto out; 673 } 674 675 LOG2(sess, "sender finished updating"); 676 rc = 1; 677out: 678 send_up_reset(&up); 679 while ((dl = TAILQ_FIRST(&sdlq)) != NULL) { 680 TAILQ_REMOVE(&sdlq, dl, entries); 681 free(dl->blks); 682 free(dl); 683 } 684 flist_free(fl, flsz); 685 free(wbuf); 686 return rc; 687} 688