1/*	$OpenBSD: rrdp.c,v 1.33 2024/02/16 11:46:57 tb Exp $ */
2/*
3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18#include <sys/queue.h>
19#include <sys/stat.h>
20
21#include <err.h>
22#include <errno.h>
23#include <fcntl.h>
24#include <limits.h>
25#include <poll.h>
26#include <string.h>
27#include <unistd.h>
28#include <imsg.h>
29
30#include <expat.h>
31#include <openssl/sha.h>
32
33#include "extern.h"
34#include "rrdp.h"
35
/* Upper bound on the number of sessions proc_rrdp() polls at a time. */
#define MAX_SESSIONS	12
/* Size in bytes of the buffer rrdp_data_handler() reads into. */
#define	READ_BUF_SIZE	(32 * 1024)

/* Queue of outgoing messages; proc_rrdp() binds it to the fd it is given. */
static struct msgbuf	msgq;

/*
 * Bits kept in the state member of struct rrdp:
 *  REQ		a request has to be issued, proc_rrdp() sends it
 *  WAIT	request sent, waiting for RRDP_HTTP_INI and its fd
 *  PARSE	fd received, data is read and fed to the XML parser
 *  PARSE_ERROR	XML parsing or the digest check failed, or the request
 *		was aborted while the fd was still open
 *  PARSE_DONE	the input fd hit EOF or was closed
 *  HTTP_DONE	RRDP_HTTP_FIN was received
 *  DONE	both PARSE_DONE and HTTP_DONE are set
 */
#define RRDP_STATE_REQ		0x01
#define RRDP_STATE_WAIT		0x02
#define RRDP_STATE_PARSE	0x04
#define RRDP_STATE_PARSE_ERROR	0x08
#define RRDP_STATE_PARSE_DONE	0x10
#define RRDP_STATE_HTTP_DONE	0x20
#define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
48
/*
 * State of one RRDP session. Sessions are created by rrdp_new(), linked
 * into the states list and released by rrdp_free().
 */
struct rrdp {
	TAILQ_ENTRY(rrdp)	 entry;
	/* id used in every message exchanged for this session */
	unsigned int		 id;
	/* URI requested while the task is NOTIFICATION */
	char			*notifyuri;
	/* name used as prefix in log and warning messages */
	char			*local;
	/* last modified string delivered with RRDP_HTTP_FIN */
	char			*last_mod;

	/* slot in the poll array of proc_rrdp(), NULL when not polled */
	struct pollfd		*pfd;
	/* fd the fetched data is read from, -1 when there is none */
	int			 infd;
	/* combination of RRDP_STATE_* bits */
	int			 state;
	/* set once rrdp_abort_req() was called for this session */
	int			 aborted;
	/* files sent with rrdp_publish_file() that were not answered yet */
	unsigned int		 file_pending;
	/* number of RRDP_FILE answers that reported a failure */
	unsigned int		 file_failed;
	/* HTTP result delivered with RRDP_HTTP_FIN */
	enum http_result	 res;
	/* what is currently fetched: NOTIFICATION, SNAPSHOT or DELTA */
	enum rrdp_task		 task;

	/*
	 * hash is filled in by notification_get_next() and compared with
	 * the SHA256 digest (ctx) of the data read for snapshots and deltas.
	 */
	char			 hash[SHA256_DIGEST_LENGTH];
	SHA256_CTX		 ctx;

	/* session state handed in by the main process at RRDP_START */
	struct rrdp_session	*repository;
	/* session state built by this run, sent back by rrdp_state_send() */
	struct rrdp_session	*current;
	/* expat parser shared by the notification, snapshot and delta code */
	XML_Parser		 parser;
	struct notification_xml	*nxml;
	struct snapshot_xml	*sxml;
	struct delta_xml	*dxml;
};

/* List of all active sessions. */
static TAILQ_HEAD(, rrdp)	states = TAILQ_HEAD_INITIALIZER(states);
77
/*
 * Duplicate the string s with strdup(3).
 * Never returns NULL: the process exits via err(3) if the copy fails.
 */
char *
xstrdup(const char *s)
{
	char *copy = strdup(s);

	if (copy == NULL)
		err(1, "strdup");
	return copy;
}
86
87/*
88 * Report back that a RRDP request finished.
89 * ok should only be set to 1 if the cache is now up-to-date.
90 */
91static void
92rrdp_done(unsigned int id, int ok)
93{
94	enum rrdp_msg type = RRDP_END;
95	struct ibuf *b;
96
97	b = io_new_buffer();
98	io_simple_buffer(b, &type, sizeof(type));
99	io_simple_buffer(b, &id, sizeof(id));
100	io_simple_buffer(b, &ok, sizeof(ok));
101	io_close_buffer(&msgq, b);
102}
103
104/*
105 * Request an URI to be fetched via HTTPS.
106 * The main process will respond with a RRDP_HTTP_INI which includes
107 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
108 * end of the request with the HTTP status code and last modified timestamp.
109 * If the request should not set the If-Modified-Since: header then last_mod
110 * should be set to NULL, else it should point to a proper date string.
111 */
112static void
113rrdp_http_req(unsigned int id, const char *uri, const char *last_mod)
114{
115	enum rrdp_msg type = RRDP_HTTP_REQ;
116	struct ibuf *b;
117
118	b = io_new_buffer();
119	io_simple_buffer(b, &type, sizeof(type));
120	io_simple_buffer(b, &id, sizeof(id));
121	io_str_buffer(b, uri);
122	io_str_buffer(b, last_mod);
123	io_close_buffer(&msgq, b);
124}
125
126/*
127 * Send the session state to the main process so it gets stored.
128 */
129static void
130rrdp_state_send(struct rrdp *s)
131{
132	enum rrdp_msg type = RRDP_SESSION;
133	struct ibuf *b;
134
135	b = io_new_buffer();
136	io_simple_buffer(b, &type, sizeof(type));
137	io_simple_buffer(b, &s->id, sizeof(s->id));
138	rrdp_session_buffer(b, s->current);
139	io_close_buffer(&msgq, b);
140}
141
142/*
143 * Inform parent to clear the RRDP repository before start of snapshot.
144 */
145static void
146rrdp_clear_repo(struct rrdp *s)
147{
148	enum rrdp_msg type = RRDP_CLEAR;
149	struct ibuf *b;
150
151	b = io_new_buffer();
152	io_simple_buffer(b, &type, sizeof(type));
153	io_simple_buffer(b, &s->id, sizeof(s->id));
154	io_close_buffer(&msgq, b);
155}
156
157/*
158 * Send a blob of data to the main process to store it in the repository.
159 */
160void
161rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml,
162    unsigned char *data, size_t datasz)
163{
164	enum rrdp_msg type = RRDP_FILE;
165	struct ibuf *b;
166
167	/* only send files if the fetch did not fail already */
168	if (s->file_failed == 0) {
169		b = io_new_buffer();
170		io_simple_buffer(b, &type, sizeof(type));
171		io_simple_buffer(b, &s->id, sizeof(s->id));
172		io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
173		if (pxml->type != PUB_ADD)
174			io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
175		io_str_buffer(b, pxml->uri);
176		io_buf_buffer(b, data, datasz);
177		io_close_buffer(&msgq, b);
178		s->file_pending++;
179	}
180}
181
182static void
183rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state)
184{
185	struct rrdp *s;
186
187	if ((s = calloc(1, sizeof(*s))) == NULL)
188		err(1, NULL);
189
190	s->infd = -1;
191	s->id = id;
192	s->local = local;
193	s->notifyuri = notify;
194	s->repository = state;
195	if ((s->current = calloc(1, sizeof(*s->current))) == NULL)
196		err(1, NULL);
197
198	s->state = RRDP_STATE_REQ;
199	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
200		err(1, "XML_ParserCreate");
201
202	s->nxml = new_notification_xml(s->parser, s->repository, s->current,
203	    notify);
204
205	TAILQ_INSERT_TAIL(&states, s, entry);
206}
207
208static void
209rrdp_free(struct rrdp *s)
210{
211	if (s == NULL)
212		return;
213
214	TAILQ_REMOVE(&states, s, entry);
215
216	free_notification_xml(s->nxml);
217	free_snapshot_xml(s->sxml);
218	free_delta_xml(s->dxml);
219
220	if (s->parser)
221		XML_ParserFree(s->parser);
222	if (s->infd != -1)
223		close(s->infd);
224	free(s->notifyuri);
225	free(s->local);
226	free(s->last_mod);
227	rrdp_session_free(s->repository);
228	rrdp_session_free(s->current);
229
230	free(s);
231}
232
233static struct rrdp *
234rrdp_get(unsigned int id)
235{
236	struct rrdp *s;
237
238	TAILQ_FOREACH(s, &states, entry)
239		if (s->id == id)
240			break;
241	return s;
242}
243
244static void
245rrdp_failed(struct rrdp *s)
246{
247	unsigned int id = s->id;
248
249	/* reset file state before retrying */
250	s->file_failed = 0;
251
252	if (s->task == DELTA && !s->aborted) {
253		/* fallback to a snapshot as per RFC8182 */
254		free_delta_xml(s->dxml);
255		s->dxml = NULL;
256		rrdp_clear_repo(s);
257		s->sxml = new_snapshot_xml(s->parser, s->current, s);
258		s->task = SNAPSHOT;
259		s->state = RRDP_STATE_REQ;
260		logx("%s: delta sync failed, fallback to snapshot", s->local);
261	} else {
262		/*
263		 * TODO: update state to track recurring failures
264		 * and fall back to rsync after a while.
265		 * This should probably happen in the main process.
266		 */
267		rrdp_free(s);
268		rrdp_done(id, 0);
269	}
270}
271
272static void
273rrdp_finished(struct rrdp *s)
274{
275	unsigned int id = s->id;
276
277	/* check if all parts of the process have finished */
278	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
279		return;
280
281	/* still some files pending */
282	if (s->file_pending > 0)
283		return;
284
285	if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) {
286		rrdp_failed(s);
287		return;
288	}
289
290	if (s->res == HTTP_OK) {
291		XML_Parser p = s->parser;
292
293		/*
294		 * Finalize parsing on success to be sure that
295		 * all of the XML is correct. Needs to be done here
296		 * since the call would most probably fail for non
297		 * successful data fetches.
298		 */
299		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
300			warnx("%s: XML error at line %llu: %s", s->local,
301			    (unsigned long long)XML_GetCurrentLineNumber(p),
302			    XML_ErrorString(XML_GetErrorCode(p)));
303			rrdp_failed(s);
304			return;
305		}
306
307		/* If a file caused an error fail the update */
308		if (s->file_failed > 0) {
309			rrdp_failed(s);
310			return;
311		}
312
313		switch (s->task) {
314		case NOTIFICATION:
315			s->task = notification_done(s->nxml, s->last_mod);
316			s->last_mod = NULL;
317			switch (s->task) {
318			case NOTIFICATION:
319				logx("%s: repository not modified (%s#%lld)",
320				    s->local, s->repository->session_id,
321				    s->repository->serial);
322				rrdp_state_send(s);
323				rrdp_free(s);
324				rrdp_done(id, 1);
325				break;
326			case SNAPSHOT:
327				logx("%s: downloading snapshot (%s#%lld)",
328				    s->local, s->current->session_id,
329				    s->current->serial);
330				rrdp_clear_repo(s);
331				s->sxml = new_snapshot_xml(p, s->current, s);
332				s->state = RRDP_STATE_REQ;
333				break;
334			case DELTA:
335				logx("%s: downloading %lld deltas (%s#%lld)",
336				    s->local,
337				    s->repository->serial - s->current->serial,
338				    s->current->session_id, s->current->serial);
339				s->dxml = new_delta_xml(p, s->current, s);
340				s->state = RRDP_STATE_REQ;
341				break;
342			}
343			break;
344		case SNAPSHOT:
345			rrdp_state_send(s);
346			rrdp_free(s);
347			rrdp_done(id, 1);
348			break;
349		case DELTA:
350			if (notification_delta_done(s->nxml)) {
351				/* finished */
352				rrdp_state_send(s);
353				rrdp_free(s);
354				rrdp_done(id, 1);
355			} else {
356				/* reset delta parser for next delta */
357				free_delta_xml(s->dxml);
358				s->dxml = new_delta_xml(p, s->current, s);
359				s->state = RRDP_STATE_REQ;
360			}
361			break;
362		}
363	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
364		logx("%s: notification file not modified (%s#%lld)", s->local,
365		    s->repository->session_id, s->repository->serial);
366		/* no need to update state file */
367		rrdp_free(s);
368		rrdp_done(id, 1);
369	} else {
370		rrdp_failed(s);
371	}
372}
373
374static void
375rrdp_abort_req(struct rrdp *s)
376{
377	unsigned int id = s->id;
378
379	s->aborted = 1;
380	if (s->state == RRDP_STATE_REQ) {
381		/* nothing is pending, just abort */
382		rrdp_free(s);
383		rrdp_done(id, 1);
384		return;
385	}
386	if (s->state == RRDP_STATE_WAIT)
387		/* wait for HTTP_INI which will progress the state */
388		return;
389
390	/*
391	 * RRDP_STATE_PARSE or later, close infd, abort parser but
392	 * wait for HTTP_FIN and file_pending to drop to 0.
393	 */
394	if (s->infd != -1) {
395		close(s->infd);
396		s->infd = -1;
397		s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR;
398	}
399	rrdp_finished(s);
400}
401
402static void
403rrdp_input_handler(int fd)
404{
405	static struct ibuf *inbuf;
406	struct rrdp_session *state;
407	char *local, *notify, *last_mod;
408	struct ibuf *b;
409	struct rrdp *s;
410	enum rrdp_msg type;
411	enum http_result res;
412	unsigned int id;
413	int ok;
414
415	b = io_buf_recvfd(fd, &inbuf);
416	if (b == NULL)
417		return;
418
419	io_read_buf(b, &type, sizeof(type));
420	io_read_buf(b, &id, sizeof(id));
421
422	switch (type) {
423	case RRDP_START:
424		if (ibuf_fd_avail(b))
425			errx(1, "received unexpected fd");
426		io_read_str(b, &local);
427		io_read_str(b, &notify);
428		state = rrdp_session_read(b);
429		rrdp_new(id, local, notify, state);
430		break;
431	case RRDP_HTTP_INI:
432		s = rrdp_get(id);
433		if (s == NULL)
434			errx(1, "http ini, rrdp session %u does not exist", id);
435		if (s->state != RRDP_STATE_WAIT)
436			errx(1, "%s: bad internal state", s->local);
437		s->infd = ibuf_fd_get(b);
438		if (s->infd == -1)
439			errx(1, "expected fd not received");
440		s->state = RRDP_STATE_PARSE;
441		if (s->aborted) {
442			rrdp_abort_req(s);
443			break;
444		}
445		break;
446	case RRDP_HTTP_FIN:
447		io_read_buf(b, &res, sizeof(res));
448		io_read_str(b, &last_mod);
449		if (ibuf_fd_avail(b))
450			errx(1, "received unexpected fd");
451
452		s = rrdp_get(id);
453		if (s == NULL)
454			errx(1, "http fin, rrdp session %u does not exist", id);
455		if (!(s->state & RRDP_STATE_PARSE))
456			errx(1, "%s: bad internal state", s->local);
457		s->state |= RRDP_STATE_HTTP_DONE;
458		s->res = res;
459		free(s->last_mod);
460		s->last_mod = last_mod;
461		rrdp_finished(s);
462		break;
463	case RRDP_FILE:
464		s = rrdp_get(id);
465		if (s == NULL)
466			errx(1, "file, rrdp session %u does not exist", id);
467		if (ibuf_fd_avail(b))
468			errx(1, "received unexpected fd");
469		io_read_buf(b, &ok, sizeof(ok));
470		if (ok != 1)
471			s->file_failed++;
472		s->file_pending--;
473		if (s->file_pending == 0)
474			rrdp_finished(s);
475		break;
476	case RRDP_ABORT:
477		if (ibuf_fd_avail(b))
478			errx(1, "received unexpected fd");
479		s = rrdp_get(id);
480		if (s != NULL)
481			rrdp_abort_req(s);
482		break;
483	default:
484		errx(1, "unexpected message %d", type);
485	}
486	ibuf_free(b);
487}
488
489static void
490rrdp_data_handler(struct rrdp *s)
491{
492	char buf[READ_BUF_SIZE];
493	XML_Parser p = s->parser;
494	ssize_t len;
495
496	len = read(s->infd, buf, sizeof(buf));
497	if (len == -1) {
498		warn("%s: read failure", s->local);
499		rrdp_abort_req(s);
500		return;
501	}
502	if ((s->state & RRDP_STATE_PARSE) == 0)
503		errx(1, "%s: bad parser state", s->local);
504	if (len == 0) {
505		/* parser stage finished */
506		close(s->infd);
507		s->infd = -1;
508
509		if (s->task != NOTIFICATION) {
510			char h[SHA256_DIGEST_LENGTH];
511
512			SHA256_Final(h, &s->ctx);
513			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
514				s->state |= RRDP_STATE_PARSE_ERROR;
515				warnx("%s: bad message digest", s->local);
516			}
517		}
518
519		s->state |= RRDP_STATE_PARSE_DONE;
520		rrdp_finished(s);
521		return;
522	}
523
524	/* parse and maybe hash the bytes just read */
525	if (s->task != NOTIFICATION)
526		SHA256_Update(&s->ctx, buf, len);
527	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
528	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
529		warnx("%s: parse error at line %llu: %s", s->local,
530		    (unsigned long long)XML_GetCurrentLineNumber(p),
531		    XML_ErrorString(XML_GetErrorCode(p)));
532		s->state |= RRDP_STATE_PARSE_ERROR;
533	}
534}
535
536void
537proc_rrdp(int fd)
538{
539	struct pollfd pfds[MAX_SESSIONS + 1];
540	struct rrdp *s, *ns;
541	size_t i;
542
543	if (pledge("stdio recvfd", NULL) == -1)
544		err(1, "pledge");
545
546	msgbuf_init(&msgq);
547	msgq.fd = fd;
548
549	for (;;) {
550		i = 1;
551		memset(&pfds, 0, sizeof(pfds));
552		TAILQ_FOREACH(s, &states, entry) {
553			if (i >= MAX_SESSIONS + 1) {
554				/* not enough sessions, wait for better times */
555				s->pfd = NULL;
556				continue;
557			}
558			/* request new assets when there are free sessions */
559			if (s->state == RRDP_STATE_REQ) {
560				const char *uri;
561				switch (s->task) {
562				case NOTIFICATION:
563					rrdp_http_req(s->id, s->notifyuri,
564					    s->repository->last_mod);
565					break;
566				case SNAPSHOT:
567				case DELTA:
568					uri = notification_get_next(s->nxml,
569					    s->hash, sizeof(s->hash),
570					    s->task);
571					SHA256_Init(&s->ctx);
572					rrdp_http_req(s->id, uri, NULL);
573					break;
574				}
575				s->state = RRDP_STATE_WAIT;
576			}
577			s->pfd = pfds + i++;
578			s->pfd->fd = s->infd;
579			s->pfd->events = POLLIN;
580		}
581
582		/*
583		 * Update main fd last.
584		 * The previous loop may have enqueue messages.
585		 */
586		pfds[0].fd = fd;
587		pfds[0].events = POLLIN;
588		if (msgq.queued)
589			pfds[0].events |= POLLOUT;
590
591		if (poll(pfds, i, INFTIM) == -1) {
592			if (errno == EINTR)
593				continue;
594			err(1, "poll");
595		}
596
597		if (pfds[0].revents & POLLHUP)
598			break;
599		if (pfds[0].revents & POLLOUT) {
600			switch (msgbuf_write(&msgq)) {
601			case 0:
602				errx(1, "write: connection closed");
603			case -1:
604				err(1, "write");
605			}
606		}
607		if (pfds[0].revents & POLLIN)
608			rrdp_input_handler(fd);
609
610		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
611			if (s->pfd == NULL)
612				continue;
613			if (s->pfd->revents != 0)
614				rrdp_data_handler(s);
615		}
616	}
617
618	exit(0);
619}
620