sender.c revision 1.9
1/*	$Id: sender.c,v 1.9 2019/02/16 16:58:39 florian Exp $ */
2/*
3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4 *
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
8 *
9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 */
17#include <sys/mman.h>
18#include <sys/queue.h>
19#include <sys/stat.h>
20
21#include <assert.h>
22#include <fcntl.h>
23#include <inttypes.h>
24#include <poll.h>
25#include <stdlib.h>
26#include <string.h>
27#include <unistd.h>
28
29#include <openssl/md4.h>
30
31#include "extern.h"
32
/*
 * A request from the receiver to download updated file data.
 */
struct	send_dl {
	int32_t	 	     idx; /* index in our file list, or -1 for phase change */
	struct blkset	    *blks; /* the sender's block information (NULL in dry runs) */
	TAILQ_ENTRY(send_dl) entries; /* linkage on the pending-request queue */
};

/*
 * The current file being "updated": sent from sender to receiver.
 * If there is no file being uploaded, "cur" is NULL.
 */
struct	send_up {
	struct send_dl	*cur; /* file being updated or NULL */
	struct blkstat	 stat; /* status of file being updated */
	int		 primed; /* blk_recv_ack() was called */
};

/* Queue of pending receiver download requests, serviced in FIFO order. */
TAILQ_HEAD(send_dlq, send_dl);
53
54/*
55 * We have finished updating the receiver's file with sender data.
56 * Deallocate and wipe clean all resources required for that.
57 */
58static void
59send_up_reset(struct send_up *p)
60{
61
62	assert(NULL != p);
63
64	/* Free the download request, if applicable. */
65
66	if (p->cur != NULL) {
67		free(p->cur->blks);
68		free(p->cur);
69		p->cur = NULL;
70	}
71
72	/* If we mapped a file for scanning, unmap it and close. */
73
74	if (p->stat.map != MAP_FAILED)
75		munmap(p->stat.map, p->stat.mapsz);
76
77	p->stat.map = MAP_FAILED;
78	p->stat.mapsz = 0;
79
80	if (p->stat.fd != -1)
81		close(p->stat.fd);
82
83	p->stat.fd = -1;
84
85	/* Now clear the in-transfer information. */
86
87	p->stat.offs = 0;
88	p->stat.hint = 0;
89	p->stat.curst = BLKSTAT_NONE;
90	p->primed = 0;
91}
92
93/*
94 * Enqueue a download request, getting it off the read channel as
95 * quickly a possible.
96 * This frees up the read channel for further incoming requests.
97 * We'll handle each element in turn, up to and including the last
98 * request (phase change), which is always a -1 idx.
99 * Returns zero on failure, non-zero on success.
100 */
101static int
102send_dl_enqueue(struct sess *sess, struct send_dlq *q,
103	int32_t idx, const struct flist *fl, size_t flsz, int fd)
104{
105	struct send_dl	*s;
106
107	/* End-of-phase marker. */
108
109	if (idx == -1) {
110		if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
111			ERR(sess, "calloc");
112			return 0;
113		}
114		s->idx = -1;
115		s->blks = NULL;
116		TAILQ_INSERT_TAIL(q, s, entries);
117		return 1;
118	}
119
120	/* Validate the index. */
121
122	if (idx < 0 || (uint32_t)idx >= flsz) {
123		ERRX(sess, "file index out of bounds: invalid %"
124			PRId32 " out of %zu", idx, flsz);
125		return 0;
126	} else if (S_ISDIR(fl[idx].st.mode)) {
127		ERRX(sess, "blocks requested for "
128			"directory: %s", fl[idx].path);
129		return 0;
130	} else if (S_ISLNK(fl[idx].st.mode)) {
131		ERRX(sess, "blocks requested for "
132			"symlink: %s", fl[idx].path);
133		return 0;
134	} else if (!S_ISREG(fl[idx].st.mode)) {
135		ERRX(sess, "blocks requested for "
136			"special: %s", fl[idx].path);
137		return 0;
138	}
139
140	if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
141		ERR(sess, "callloc");
142		return 0;
143	}
144	s->idx = idx;
145	s->blks = NULL;
146	TAILQ_INSERT_TAIL(q, s, entries);
147
148	/*
149	 * This blocks til the full blockset has been read.
150	 * That's ok, because the most important thing is getting data
151	 * off the wire.
152	 */
153
154	if (!sess->opts->dry_run) {
155		s->blks = blk_recv(sess, fd, fl[idx].path);
156		if (s->blks == NULL) {
157			ERRX1(sess, "blk_recv");
158			return 0;
159		}
160	}
161	return 1;
162}
163
164/*
165 * A client sender manages the read-only source files and sends data to
166 * the receiver as requested.
167 * First it sends its list of files, then it waits for the server to
168 * request updates to individual files.
169 * It queues requests for updates as soon as it receives them.
170 * Returns zero on failure, non-zero on success.
171 *
172 * Pledges: stdio, rpath, unveil.
173 */
174int
175rsync_sender(struct sess *sess, int fdin,
176	int fdout, size_t argc, char **argv)
177{
178	struct flist	   *fl = NULL;
179	const struct flist *f;
180	size_t		    i, flsz = 0, phase = 0, excl;
181	off_t		    sz;
182	int		    rc = 0, c;
183	int32_t		    idx;
184	struct pollfd	    pfd[3];
185	struct send_dlq	    sdlq;
186	struct send_dl	   *dl;
187	struct send_up	    up;
188	struct stat	    st;
189	unsigned char	    filemd[MD4_DIGEST_LENGTH];
190	void		   *wbuf = NULL;
191	size_t		    wbufpos = 0, pos, wbufsz = 0, wbufmax = 0;
192	ssize_t		    ssz;
193
194	if (pledge("stdio getpw rpath unveil", NULL) == -1) {
195		ERR(sess, "pledge");
196		return 0;
197	}
198
199	memset(&up, 0, sizeof(struct send_up));
200	TAILQ_INIT(&sdlq);
201	up.stat.fd = -1;
202	up.stat.map = MAP_FAILED;
203
204	/*
205	 * Generate the list of files we want to send from our
206	 * command-line input.
207	 * This will also remove all invalid files.
208	 */
209
210	if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
211		ERRX1(sess, "flist_gen");
212		goto out;
213	}
214
215	/* Client sends zero-length exclusions if deleting. */
216
217	if (!sess->opts->server && sess->opts->del &&
218	    !io_write_int(sess, fdout, 0)) {
219		ERRX1(sess, "io_write_int");
220		goto out;
221	}
222
223	/*
224	 * Then the file list in any mode.
225	 * Finally, the IO error (always zero for us).
226	 */
227
228	if (!flist_send(sess, fdin, fdout, fl, flsz)) {
229		ERRX1(sess, "flist_send");
230		goto out;
231	} else if (!io_write_int(sess, fdout, 0)) {
232		ERRX1(sess, "io_write_int");
233		goto out;
234	}
235
236	/* Exit if we're the server with zero files. */
237
238	if (flsz == 0 && sess->opts->server) {
239		WARNX(sess, "sender has empty file list: exiting");
240		rc = 1;
241		goto out;
242	} else if (!sess->opts->server)
243		LOG1(sess, "Transfer starting: %zu files", flsz);
244
245	/*
246	 * If we're the server, read our exclusion list.
247	 * This is always 0 for now.
248	 */
249
250	if (sess->opts->server) {
251		if (!io_read_size(sess, fdin, &excl)) {
252			ERRX1(sess, "io_read_size");
253			goto out;
254		} else if (excl != 0) {
255			ERRX1(sess, "exclusion list is non-empty");
256			goto out;
257		}
258	}
259
260	/*
261	 * Set up our poll events.
262	 * We start by polling only in receiver requests, enabling other
263	 * poll events on demand.
264	 */
265
266	pfd[0].fd = fdin; /* from receiver */
267	pfd[0].events = POLLIN;
268	pfd[1].fd = -1; /* to receiver */
269	pfd[1].events = POLLOUT;
270	pfd[2].fd = -1; /* from local file */
271	pfd[2].events = POLLIN;
272
273	for (;;) {
274		assert(pfd[0].fd != -1);
275		if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
276			ERR(sess, "poll");
277			goto out;
278		} else if (c == 0) {
279			ERRX(sess, "poll: timeout");
280			goto out;
281		}
282		for (i = 0; i < 3; i++)
283			if (pfd[i].revents & (POLLERR|POLLNVAL)) {
284				ERRX(sess, "poll: bad fd");
285				goto out;
286			} else if (pfd[i].revents & POLLHUP) {
287				ERRX(sess, "poll: hangup");
288				goto out;
289			}
290
291		/*
292		 * If we have a request coming down off the wire, pull
293		 * it in as quickly as possible into our buffer.
294		 * This unclogs the socket buffers so the data can flow.
295		 * FIXME: if we're multiplexing, we might stall here if
296		 * there's only a log message and no actual data.
297		 * This can be fixed by doing a conditional test.
298		 */
299
300		if (pfd[0].revents & POLLIN)
301			for (;;) {
302				if (!io_read_int(sess, fdin, &idx)) {
303					ERRX1(sess, "io_read_int");
304					goto out;
305				}
306				if (!send_dl_enqueue(sess,
307				    &sdlq, idx, fl, flsz, fdin)) {
308					ERRX1(sess, "send_dl_enqueue");
309					goto out;
310				}
311				c = io_read_check(sess, fdin);
312				if (c < 0) {
313					ERRX1(sess, "io_read_check");
314					goto out;
315				} else if (c == 0)
316					break;
317			}
318
319		/*
320		 * One of our local files has been opened in response
321		 * to a receiver request and now we can map it.
322		 * We'll respond to the event by looking at the map when
323		 * the writer is available.
324		 * Here we also enable the poll event for output.
325		 */
326
327		if (pfd[2].revents & POLLIN) {
328			assert(up.cur != NULL);
329			assert(up.stat.fd != -1);
330			assert(up.stat.map == MAP_FAILED);
331			assert(up.stat.mapsz == 0);
332			f = &fl[up.cur->idx];
333
334			if (fstat(up.stat.fd, &st) == -1) {
335				ERR(sess, "%s: fstat", f->path);
336				goto out;
337			}
338
339			/*
340			 * If the file is zero-length, the map will
341			 * fail, but either way we want to unset that
342			 * we're waiting for the file to open and set
343			 * that we're ready for the output channel.
344			 */
345
346			if ((up.stat.mapsz = st.st_size) > 0) {
347				up.stat.map = mmap(NULL,
348					up.stat.mapsz, PROT_READ,
349					MAP_SHARED, up.stat.fd, 0);
350				if (up.stat.map == MAP_FAILED) {
351					ERR(sess, "%s: mmap", f->path);
352					goto out;
353				}
354			}
355
356			pfd[2].fd = -1;
357			pfd[1].fd = fdout;
358		}
359
360		/*
361		 * If we have buffers waiting to write, write them out
362		 * as soon as we can in a non-blocking fashion.
363		 * We must not be waiting for any local files.
364		 * ALL WRITES MUST HAPPEN HERE.
365		 * This keeps the sender deadlock-free.
366		 */
367
368		if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
369			assert(pfd[2].fd == -1);
370			assert(wbufsz - wbufpos);
371			ssz = write(fdout,
372				wbuf + wbufpos, wbufsz - wbufpos);
373			if (ssz < 0) {
374				ERR(sess, "write");
375				goto out;
376			}
377			wbufpos += ssz;
378			if (wbufpos == wbufsz)
379				wbufpos = wbufsz = 0;
380			pfd[1].revents &= ~POLLOUT;
381
382			/* This is usually in io.c... */
383
384			sess->total_write += ssz;
385		}
386
387		if (pfd[1].revents & POLLOUT) {
388			assert(pfd[2].fd == -1);
389			assert(0 == wbufpos && 0 == wbufsz);
390
391			/*
392			 * If we have data to write, do it now according
393			 * to the data finite state machine.
394			 * If we receive an invalid index (-1), then
395			 * we're either promoted to the second phase or
396			 * it's time to exit, depending upon which phase
397			 * we're in.
398			 * Otherwise, we either start a transfer
399			 * sequence (if not primed) or continue one.
400			 */
401
402			pos = 0;
403			if (BLKSTAT_DATA == up.stat.curst) {
404				/*
405				 * A data segment to be written: buffer
406				 * both the length and the data, then
407				 * put is in the token phase.
408				 */
409
410				sz = MIN(MAX_CHUNK,
411					up.stat.curlen - up.stat.curpos);
412				if (!io_lowbuffer_alloc(sess, &wbuf,
413				    &wbufsz, &wbufmax, sizeof(int32_t))) {
414					ERRX1(sess, "io_lowbuffer_alloc");
415					goto out;
416				}
417				io_lowbuffer_int(sess,
418					wbuf, &pos, wbufsz, sz);
419				if (!io_lowbuffer_alloc(sess, &wbuf,
420				    &wbufsz, &wbufmax, sz)) {
421					ERRX1(sess, "io_lowbuffer_alloc");
422					goto out;
423				}
424				io_lowbuffer_buf(sess, wbuf, &pos, wbufsz,
425					up.stat.map + up.stat.curpos, sz);
426				up.stat.curpos += sz;
427				if (up.stat.curpos == up.stat.curlen)
428					up.stat.curst = BLKSTAT_TOK;
429			} else if (BLKSTAT_TOK == up.stat.curst) {
430				/*
431				 * The data token following (maybe) a
432				 * data segment.
433				 * These can also come standalone if,
434				 * say, the file's being fully written.
435				 * It's followed by a hash or another
436				 * data segment, depending on the token.
437				 */
438
439				if (!io_lowbuffer_alloc(sess, &wbuf,
440				    &wbufsz, &wbufmax, sizeof(int32_t))) {
441					ERRX1(sess, "io_lowbuffer_alloc");
442					goto out;
443				}
444				io_lowbuffer_int(sess, wbuf,
445					&pos, wbufsz, up.stat.curtok);
446				up.stat.curst = up.stat.curtok ?
447					BLKSTAT_NONE : BLKSTAT_HASH;
448			} else if (BLKSTAT_HASH == up.stat.curst) {
449				/*
450				 * The hash following transmission of
451				 * all file contents.
452				 * This is always followed by the state
453				 * that we're finished with the file.
454				 */
455
456				hash_file(up.stat.map,
457					up.stat.mapsz, filemd, sess);
458				if (!io_lowbuffer_alloc(sess, &wbuf,
459				    &wbufsz, &wbufmax, MD4_DIGEST_LENGTH)) {
460					ERRX1(sess, "io_lowbuffer_alloc");
461					goto out;
462				}
463				io_lowbuffer_buf(sess, wbuf, &pos,
464					wbufsz, filemd, MD4_DIGEST_LENGTH);
465				up.stat.curst = BLKSTAT_DONE;
466			} else if (BLKSTAT_DONE == up.stat.curst) {
467				/*
468				 * The data has been written.
469				 * Clear our current send file and allow
470				 * the block below to find another.
471				 */
472
473				LOG3(sess, "%s: flushed %jd KB total, "
474					"%.2f%% uploaded",
475					fl[up.cur->idx].path,
476					(intmax_t)up.stat.total / 1024,
477					100.0 * up.stat.dirty / up.stat.total);
478				send_up_reset(&up);
479			} else if (NULL != up.cur && up.cur->idx < 0) {
480				/*
481				 * We've hit the phase change following
482				 * the last file (or start, or prior
483				 * phase change).
484				 * Simply acknowledge it.
485				 * FIXME: use buffering.
486				 */
487
488				if (!io_write_int(sess, fdout, -1)) {
489					ERRX1(sess, "io_write_int");
490					goto out;
491				}
492				if (sess->opts->server && sess->rver > 27 &&
493				    !io_write_int(sess, fdout, -1)) {
494					ERRX1(sess, "io_write_int");
495					goto out;
496				}
497				send_up_reset(&up);
498
499				/*
500				 * This is where we actually stop the
501				 * algorithm: we're already at the
502				 * second phase.
503				 */
504
505				if (phase++)
506					break;
507			} else if (NULL != up.cur && 0 == up.primed) {
508				/*
509				 * We're getting ready to send the file
510				 * contents to the receiver.
511				 * FIXME: use buffering.
512				 */
513
514				if (!sess->opts->server)
515					LOG1(sess, "%s", fl[up.cur->idx].wpath);
516
517				/* Dry-running does nothing but a response. */
518
519				if (sess->opts->dry_run &&
520				    !io_write_int(sess, fdout, up.cur->idx)) {
521					ERRX1(sess, "io_write_int");
522					goto out;
523				}
524
525				/* Actually perform the block send. */
526
527				assert(up.stat.fd != -1);
528				if (!blk_recv_ack(sess, fdout,
529				    up.cur->blks, up.cur->idx)) {
530					ERRX1(sess, "blk_recv_ack");
531					goto out;
532				}
533				LOG3(sess, "%s: primed for %jd B total",
534					fl[up.cur->idx].path,
535					(intmax_t)up.cur->blks->size);
536				up.primed = 1;
537			} else if (NULL != up.cur) {
538				/*
539				 * Our last case: we need to find the
540				 * next block (and token) to transmit to
541				 * the receiver.
542				 * These will drive the finite state
543				 * machine in the first few conditional
544				 * blocks of this set.
545				 */
546
547				assert(up.stat.fd != -1);
548				blk_match(sess, up.cur->blks,
549					fl[up.cur->idx].path, &up.stat);
550			}
551		}
552
553		/*
554		 * Incoming queue management.
555		 * If we have no queue component that we're waiting on,
556		 * then pull off the receiver-request queue and start
557		 * processing the request.
558		 */
559
560		if (up.cur == NULL) {
561			assert(pfd[2].fd == -1);
562			assert(up.stat.fd == -1);
563			assert(up.stat.map == MAP_FAILED);
564			assert(up.stat.mapsz == 0);
565			assert(wbufsz == 0 && wbufpos == 0);
566			pfd[1].fd = -1;
567
568			/*
569			 * If there's nothing in the queue, then keep
570			 * the output channel disabled and wait for
571			 * whatever comes next from the reader.
572			 */
573
574			if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
575				continue;
576
577			TAILQ_REMOVE(&sdlq, up.cur, entries);
578
579			/*
580			 * End of phase: enable channel to receiver.
581			 * We'll need our output buffer enabled in order
582			 * to process this event.
583			 */
584
585			if (up.cur->idx == -1) {
586				pfd[1].fd = fdout;
587				continue;
588			}
589
590			/*
591			 * Non-blocking open of file.
592			 * This will be picked up in the state machine
593			 * block of not being primed.
594			 */
595
596			up.stat.fd = open(fl[up.cur->idx].path,
597				O_RDONLY|O_NONBLOCK, 0);
598			if (up.stat.fd == -1) {
599				ERR(sess, "%s: open", fl[up.cur->idx].path);
600				goto out;
601			}
602			pfd[2].fd = up.stat.fd;
603		}
604	}
605
606	if (!TAILQ_EMPTY(&sdlq)) {
607		ERRX(sess, "phases complete with files still queued");
608		goto out;
609	}
610
611	if (!sess_stats_send(sess, fdout)) {
612		ERRX1(sess, "sess_stats_end");
613		goto out;
614	}
615
616	/* Final "goodbye" message. */
617
618	if (!io_read_int(sess, fdin, &idx)) {
619		ERRX1(sess, "io_read_int");
620		goto out;
621	} else if (idx != -1) {
622		ERRX(sess, "read incorrect update complete ack");
623		goto out;
624	}
625
626	LOG2(sess, "sender finished updating");
627	rc = 1;
628out:
629	send_up_reset(&up);
630	while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
631		free(dl->blks);
632		free(dl);
633	}
634	flist_free(fl, flsz);
635	free(wbuf);
636	return rc;
637}
638