sender.c revision 1.20
1/*	$Id: sender.c,v 1.20 2019/03/23 16:04:28 deraadt Exp $ */
2/*
3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4 *
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
8 *
9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 */
17#include <sys/mman.h>
18#include <sys/queue.h>
19#include <sys/stat.h>
20
21#include <assert.h>
22#include <fcntl.h>
23#include <inttypes.h>
24#include <poll.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28
29#include <openssl/md4.h>
30
31#include "extern.h"
32
33/*
34 * A request from the receiver to download updated file data.
35 */
struct	send_dl {
	int32_t		     idx; /* index in our file list; -1 is the end-of-phase marker */
	struct blkset	    *blks; /* the sender's block information (NULL in dry-run) */
	TAILQ_ENTRY(send_dl) entries; /* linkage on the pending-request queue */
};
41
42/*
43 * The current file being "updated": sent from sender to receiver.
44 * If there is no file being uploaded, "cur" is NULL.
45 */
struct	send_up {
	struct send_dl	*cur; /* file being updated or NULL if idle */
	struct blkstat	 stat; /* status (fd, mmap, FSM state) of file being updated */
};
50
/* Queue of download requests read off the wire, serviced in FIFO order. */
TAILQ_HEAD(send_dlq, send_dl);
52
53/*
54 * We have finished updating the receiver's file with sender data.
55 * Deallocate and wipe clean all resources required for that.
56 */
57static void
58send_up_reset(struct send_up *p)
59{
60
61	assert(p != NULL);
62
63	/* Free the download request, if applicable. */
64
65	if (p->cur != NULL) {
66		free(p->cur->blks);
67		free(p->cur);
68		p->cur = NULL;
69	}
70
71	/* If we mapped a file for scanning, unmap it and close. */
72
73	if (p->stat.map != MAP_FAILED)
74		munmap(p->stat.map, p->stat.mapsz);
75
76	p->stat.map = MAP_FAILED;
77	p->stat.mapsz = 0;
78
79	if (p->stat.fd != -1)
80		close(p->stat.fd);
81
82	p->stat.fd = -1;
83
84	/* Now clear the in-transfer information. */
85
86	p->stat.offs = 0;
87	p->stat.hint = 0;
88	p->stat.curst = BLKSTAT_NONE;
89}
90
91/*
92 * This is the bulk of the sender work.
93 * Here we tend to an output buffer that responds to receiver requests
94 * for data.
95 * This does not act upon the output descriptor itself so as to avoid
96 * blocking, which otherwise would deadlock the protocol.
97 * Returns zero on failure, non-zero on success.
98 */
static int
send_up_fsm(struct sess *sess, size_t *phase,
	struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
	const struct flist *fl)
{
	/* "pos" is the append offset into *wb for this invocation. */
	size_t		 pos = 0, isz = sizeof(int32_t),
			 dsz = MD4_DIGEST_LENGTH;
	unsigned char	 fmd[MD4_DIGEST_LENGTH];
	off_t		 sz;
	char		 buf[20]; /* holds blk_recv_ack()'s fixed 20-byte reply */

	switch (up->stat.curst) {
	case BLKSTAT_DATA:
		/*
		 * A data segment to be written: buffer both the length
		 * and the data.
		 * If we've finished the transfer, move on to the token;
		 * otherwise, keep sending data.
		 */

		/* Never emit more than MAX_CHUNK bytes per segment. */
		sz = MINIMUM(MAX_CHUNK,
			up->stat.curlen - up->stat.curpos);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
			up->stat.map + up->stat.curpos, sz);

		up->stat.curpos += sz;
		if (up->stat.curpos == up->stat.curlen)
			up->stat.curst = BLKSTAT_TOK;
		return 1;
	case BLKSTAT_TOK:
		/*
		 * The data token following (maybe) a data segment.
		 * These can also come standalone if, say, the file's
		 * being fully written.
		 * It's followed by a hash or another data segment,
		 * depending on the token.
		 */

		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb,
			&pos, *wbsz, up->stat.curtok);
		/* A zero token terminates the file; non-zero continues. */
		up->stat.curst = up->stat.curtok ?
			BLKSTAT_NEXT : BLKSTAT_HASH;
		return 1;
	case BLKSTAT_HASH:
		/*
		 * The hash following transmission of all file contents.
		 * This is always followed by the state that we're
		 * finished with the file.
		 */

		hash_file(up->stat.map, up->stat.mapsz, fmd, sess);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
		up->stat.curst = BLKSTAT_DONE;
		return 1;
	case BLKSTAT_DONE:
		/*
		 * The data has been written.
		 * Clear our current send file and allow the block below
		 * to find another.
		 */

		if (!sess->opts->dry_run)
			LOG3(sess, "%s: flushed %jd KB total, %.2f%% uploaded",
			    fl[up->cur->idx].path,
			    (intmax_t)up->stat.total / 1024,
			    100.0 * up->stat.dirty / up->stat.total);
		send_up_reset(up);
		return 1;
	case BLKSTAT_PHASE:
		/*
		 * This is where we actually stop the algorithm: we're
		 * already at the second phase.
		 */

		send_up_reset(up);
		(*phase)++;
		return 1;
	case BLKSTAT_NEXT:
		/*
		 * Our last case: we need to find the
		 * next block (and token) to transmit to
		 * the receiver.
		 * These will drive the finite state
		 * machine in the first few conditional
		 * blocks of this set.
		 */

		assert(up->stat.fd != -1);
		blk_match(sess, up->cur->blks,
			fl[up->cur->idx].path, &up->stat);
		return 1;
	case BLKSTAT_NONE:
		break;
	}

	assert(BLKSTAT_NONE == up->stat.curst);

	/*
	 * We've either hit the phase change following the last file (or
	 * start, or prior phase change), or we need to prime the next
	 * file for transmission.
	 * We special-case dry-run mode.
	 */

	if (up->cur->idx < 0) {
		/* End of phase: echo the -1 marker back to the receiver. */
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);

		/* Protocol versions after 27 expect a second -1 here. */
		if (sess->opts->server && sess->rver > 27) {
			if (!io_lowbuffer_alloc(sess,
			    wb, wbsz, wbmax, isz)) {
				ERRX1(sess, "io_lowbuffer_alloc");
				return 0;
			}
			io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
		}
		up->stat.curst = BLKSTAT_PHASE;
	} else if (sess->opts->dry_run) {
		/* Dry run: acknowledge the index only, then act done. */
		if (!sess->opts->server)
			LOG1(sess, "%s", fl[up->cur->idx].wpath);

		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
		up->stat.curst = BLKSTAT_DONE;
	} else {
		assert(up->stat.fd != -1);

		/*
		 * FIXME: use the nice output of log_file() and so on in
		 * downloader.c, which means moving this into
		 * BLKSTAT_DONE instead of having it be here.
		 */

		if (!sess->opts->server)
			LOG1(sess, "%s", fl[up->cur->idx].wpath);

		/* Acknowledge the blockset: 20 bytes, per blk_recv_ack(). */
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
			ERRX1(sess, "io_lowbuffer_alloc");
			return 0;
		}
		assert(sizeof(buf) == 20);
		blk_recv_ack(sess, buf, up->cur->blks, up->cur->idx);
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);

		LOG3(sess, "%s: primed for %jd B total",
		    fl[up->cur->idx].path, (intmax_t)up->cur->blks->size);
		up->stat.curst = BLKSTAT_NEXT;
	}

	return 1;
}
273
274/*
275 * Enqueue a download request, getting it off the read channel as
 * quickly as possible.
277 * This frees up the read channel for further incoming requests.
278 * We'll handle each element in turn, up to and including the last
279 * request (phase change), which is always a -1 idx.
280 * Returns zero on failure, non-zero on success.
281 */
282static int
283send_dl_enqueue(struct sess *sess, struct send_dlq *q,
284	int32_t idx, const struct flist *fl, size_t flsz, int fd)
285{
286	struct send_dl	*s;
287
288	/* End-of-phase marker. */
289
290	if (idx == -1) {
291		if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
292			ERR(sess, "calloc");
293			return 0;
294		}
295		s->idx = -1;
296		s->blks = NULL;
297		TAILQ_INSERT_TAIL(q, s, entries);
298		return 1;
299	}
300
301	/* Validate the index. */
302
303	if (idx < 0 || (uint32_t)idx >= flsz) {
304		ERRX(sess, "file index out of bounds: invalid %d out of %zu",
305		    idx, flsz);
306		return 0;
307	} else if (S_ISDIR(fl[idx].st.mode)) {
308		ERRX(sess, "blocks requested for "
309			"directory: %s", fl[idx].path);
310		return 0;
311	} else if (S_ISLNK(fl[idx].st.mode)) {
312		ERRX(sess, "blocks requested for "
313			"symlink: %s", fl[idx].path);
314		return 0;
315	} else if (!S_ISREG(fl[idx].st.mode)) {
316		ERRX(sess, "blocks requested for "
317			"special: %s", fl[idx].path);
318		return 0;
319	}
320
321	if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
322		ERR(sess, "callloc");
323		return 0;
324	}
325	s->idx = idx;
326	s->blks = NULL;
327	TAILQ_INSERT_TAIL(q, s, entries);
328
329	/*
330	 * This blocks til the full blockset has been read.
331	 * That's ok, because the most important thing is getting data
332	 * off the wire.
333	 */
334
335	if (!sess->opts->dry_run) {
336		s->blks = blk_recv(sess, fd, fl[idx].path);
337		if (s->blks == NULL) {
338			ERRX1(sess, "blk_recv");
339			return 0;
340		}
341	}
342	return 1;
343}
344
345/*
346 * A client sender manages the read-only source files and sends data to
347 * the receiver as requested.
348 * First it sends its list of files, then it waits for the server to
349 * request updates to individual files.
350 * It queues requests for updates as soon as it receives them.
351 * Returns zero on failure, non-zero on success.
352 *
 * Pledges: stdio, getpw, rpath, unveil.
354 */
355int
356rsync_sender(struct sess *sess, int fdin,
357	int fdout, size_t argc, char **argv)
358{
359	struct flist	   *fl = NULL;
360	const struct flist *f;
361	size_t		    i, flsz = 0, phase = 0, excl;
362	int		    rc = 0, c;
363	int32_t		    idx;
364	struct pollfd	    pfd[3];
365	struct send_dlq	    sdlq;
366	struct send_dl	   *dl;
367	struct send_up	    up;
368	struct stat	    st;
369	void		   *wbuf = NULL;
370	size_t		    wbufpos = 0, wbufsz = 0, wbufmax = 0;
371	ssize_t		    ssz;
372
373	if (pledge("stdio getpw rpath unveil", NULL) == -1) {
374		ERR(sess, "pledge");
375		return 0;
376	}
377
378	memset(&up, 0, sizeof(struct send_up));
379	TAILQ_INIT(&sdlq);
380	up.stat.fd = -1;
381	up.stat.map = MAP_FAILED;
382
383	/*
384	 * Generate the list of files we want to send from our
385	 * command-line input.
386	 * This will also remove all invalid files.
387	 */
388
389	if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
390		ERRX1(sess, "flist_gen");
391		goto out;
392	}
393
394	/* Client sends zero-length exclusions if deleting. */
395
396	if (!sess->opts->server && sess->opts->del &&
397	     !io_write_int(sess, fdout, 0)) {
398		ERRX1(sess, "io_write_int");
399		goto out;
400	}
401
402	/*
403	 * Then the file list in any mode.
404	 * Finally, the IO error (always zero for us).
405	 */
406
407	if (!flist_send(sess, fdin, fdout, fl, flsz)) {
408		ERRX1(sess, "flist_send");
409		goto out;
410	} else if (!io_write_int(sess, fdout, 0)) {
411		ERRX1(sess, "io_write_int");
412		goto out;
413	}
414
415	/* Exit if we're the server with zero files. */
416
417	if (flsz == 0 && sess->opts->server) {
418		WARNX(sess, "sender has empty file list: exiting");
419		rc = 1;
420		goto out;
421	} else if (!sess->opts->server)
422		LOG1(sess, "Transfer starting: %zu files", flsz);
423
424	/*
425	 * If we're the server, read our exclusion list.
426	 * This is always 0 for now.
427	 */
428
429	if (sess->opts->server) {
430		if (!io_read_size(sess, fdin, &excl)) {
431			ERRX1(sess, "io_read_size");
432			goto out;
433		} else if (excl != 0) {
434			ERRX1(sess, "exclusion list is non-empty");
435			goto out;
436		}
437	}
438
439	/*
440	 * Set up our poll events.
441	 * We start by polling only in receiver requests, enabling other
442	 * poll events on demand.
443	 */
444
445	pfd[0].fd = fdin; /* from receiver */
446	pfd[0].events = POLLIN;
447	pfd[1].fd = -1; /* to receiver */
448	pfd[1].events = POLLOUT;
449	pfd[2].fd = -1; /* from local file */
450	pfd[2].events = POLLIN;
451
452	for (;;) {
453		assert(pfd[0].fd != -1);
454		if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
455			ERR(sess, "poll");
456			goto out;
457		} else if (c == 0) {
458			ERRX(sess, "poll: timeout");
459			goto out;
460		}
461		for (i = 0; i < 3; i++)
462			if (pfd[i].revents & (POLLERR|POLLNVAL)) {
463				ERRX(sess, "poll: bad fd");
464				goto out;
465			} else if (pfd[i].revents & POLLHUP) {
466				ERRX(sess, "poll: hangup");
467				goto out;
468			}
469
470		/*
471		 * If we have a request coming down off the wire, pull
472		 * it in as quickly as possible into our buffer.
473		 * Start by seeing if we have a log message.
474		 * If we do, pop it off, then see if we have anything
475		 * left and hit it again if so (read priority).
476		 */
477
478		if (sess->mplex_reads && (pfd[0].revents & POLLIN)) {
479			if (!io_read_flush(sess, fdin)) {
480				ERRX1(sess, "io_read_flush");
481				goto out;
482			} else if (sess->mplex_read_remain == 0) {
483				c = io_read_check(sess, fdin);
484				if (c < 0) {
485					ERRX1(sess, "io_read_check");
486					goto out;
487				} else if (c > 0)
488					continue;
489				pfd[0].revents &= ~POLLIN;
490			}
491		}
492
493		/*
494		 * Now that we've handled the log messages, we're left
495		 * here if we have any actual data coming down.
496		 * Enqueue message requests, then loop again if we see
497		 * more data (read priority).
498		 */
499
500		if (pfd[0].revents & POLLIN) {
501			if (!io_read_int(sess, fdin, &idx)) {
502				ERRX1(sess, "io_read_int");
503				goto out;
504			}
505			if (!send_dl_enqueue(sess,
506			    &sdlq, idx, fl, flsz, fdin)) {
507				ERRX1(sess, "send_dl_enqueue");
508				goto out;
509			}
510			c = io_read_check(sess, fdin);
511			if (c < 0) {
512				ERRX1(sess, "io_read_check");
513				goto out;
514			} else if (c > 0)
515				continue;
516		}
517
518		/*
519		 * One of our local files has been opened in response
520		 * to a receiver request and now we can map it.
521		 * We'll respond to the event by looking at the map when
522		 * the writer is available.
523		 * Here we also enable the poll event for output.
524		 */
525
526		if (pfd[2].revents & POLLIN) {
527			assert(up.cur != NULL);
528			assert(up.stat.fd != -1);
529			assert(up.stat.map == MAP_FAILED);
530			assert(up.stat.mapsz == 0);
531			f = &fl[up.cur->idx];
532
533			if (fstat(up.stat.fd, &st) == -1) {
534				ERR(sess, "%s: fstat", f->path);
535				goto out;
536			}
537
538			/*
539			 * If the file is zero-length, the map will
540			 * fail, but either way we want to unset that
541			 * we're waiting for the file to open and set
542			 * that we're ready for the output channel.
543			 */
544
545			if ((up.stat.mapsz = st.st_size) > 0) {
546				up.stat.map = mmap(NULL,
547					up.stat.mapsz, PROT_READ,
548					MAP_SHARED, up.stat.fd, 0);
549				if (up.stat.map == MAP_FAILED) {
550					ERR(sess, "%s: mmap", f->path);
551					goto out;
552				}
553			}
554
555			pfd[2].fd = -1;
556			pfd[1].fd = fdout;
557		}
558
559		/*
560		 * If we have buffers waiting to write, write them out
561		 * as soon as we can in a non-blocking fashion.
562		 * We must not be waiting for any local files.
563		 * ALL WRITES MUST HAPPEN HERE.
564		 * This keeps the sender deadlock-free.
565		 */
566
567		if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
568			assert(pfd[2].fd == -1);
569			assert(wbufsz - wbufpos);
570			ssz = write(fdout,
571				wbuf + wbufpos, wbufsz - wbufpos);
572			if (ssz < 0) {
573				ERR(sess, "write");
574				goto out;
575			}
576			wbufpos += ssz;
577			if (wbufpos == wbufsz)
578				wbufpos = wbufsz = 0;
579			pfd[1].revents &= ~POLLOUT;
580
581			/* This is usually in io.c... */
582
583			sess->total_write += ssz;
584		}
585
586		/*
587		 * Engage the FSM for the current transfer.
588		 * If our phase changes, stop processing.
589		 */
590
591		if (pfd[1].revents & POLLOUT && up.cur != NULL) {
592			assert(pfd[2].fd == -1);
593			assert(wbufpos == 0 && wbufsz == 0);
594			if (!send_up_fsm(sess, &phase,
595			    &up, &wbuf, &wbufsz, &wbufmax, fl)) {
596				ERRX1(sess, "send_up_fsm");
597				goto out;
598			} else if (phase > 1)
599				break;
600		}
601
602		/*
603		 * Incoming queue management.
604		 * If we have no queue component that we're waiting on,
605		 * then pull off the receiver-request queue and start
606		 * processing the request.
607		 */
608
609		if (up.cur == NULL) {
610			assert(pfd[2].fd == -1);
611			assert(up.stat.fd == -1);
612			assert(up.stat.map == MAP_FAILED);
613			assert(up.stat.mapsz == 0);
614			assert(wbufsz == 0 && wbufpos == 0);
615			pfd[1].fd = -1;
616
617			/*
618			 * If there's nothing in the queue, then keep
619			 * the output channel disabled and wait for
620			 * whatever comes next from the reader.
621			 */
622
623			if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
624				continue;
625
626			TAILQ_REMOVE(&sdlq, up.cur, entries);
627
628			/*
629			 * End of phase: enable channel to receiver.
630			 * We'll need our output buffer enabled in order
631			 * to process this event.
632			 */
633
634			if (up.cur->idx == -1) {
635				pfd[1].fd = fdout;
636				continue;
637			}
638
639			/*
640			 * Non-blocking open of file.
641			 * This will be picked up in the state machine
642			 * block of not being primed.
643			 */
644
645			up.stat.fd = open(fl[up.cur->idx].path,
646				O_RDONLY|O_NONBLOCK, 0);
647			if (up.stat.fd == -1) {
648				ERR(sess, "%s: open", fl[up.cur->idx].path);
649				goto out;
650			}
651			pfd[2].fd = up.stat.fd;
652		}
653	}
654
655	if (!TAILQ_EMPTY(&sdlq)) {
656		ERRX(sess, "phases complete with files still queued");
657		goto out;
658	}
659
660	if (!sess_stats_send(sess, fdout)) {
661		ERRX1(sess, "sess_stats_end");
662		goto out;
663	}
664
665	/* Final "goodbye" message. */
666
667	if (!io_read_int(sess, fdin, &idx)) {
668		ERRX1(sess, "io_read_int");
669		goto out;
670	} else if (idx != -1) {
671		ERRX(sess, "read incorrect update complete ack");
672		goto out;
673	}
674
675	LOG2(sess, "sender finished updating");
676	rc = 1;
677out:
678	send_up_reset(&up);
679	while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
680		TAILQ_REMOVE(&sdlq, dl, entries);
681		free(dl->blks);
682		free(dl);
683	}
684	flist_free(fl, flsz);
685	free(wbuf);
686	return rc;
687}
688