1/*-
2 * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 * $FreeBSD$
27 */
28
29#include <sys/param.h>
30#include <sys/select.h>
31#include <sys/socket.h>
32#include <sys/types.h>
33#include <sys/stat.h>
34
35#include <assert.h>
36#include <err.h>
37#include <errno.h>
38#include <inttypes.h>
39#include <netdb.h>
40#include <pthread.h>
41#include <signal.h>
42#include <stdarg.h>
43#include <stddef.h>
44#include <stdio.h>
45#include <stdlib.h>
46#include <string.h>
47#include <unistd.h>
48
49#include "auth.h"
50#include "config.h"
51#include "detailer.h"
52#include "fattr.h"
53#include "fixups.h"
54#include "globtree.h"
55#include "keyword.h"
56#include "lister.h"
57#include "misc.h"
58#include "mux.h"
59#include "proto.h"
60#include "queue.h"
61#include "stream.h"
62#include "threads.h"
63#include "updater.h"
64
65struct killer {
66	pthread_t thread;
67	sigset_t sigset;
68	struct mux *mux;
69	int killedby;
70};
71
/* Signal handling thread. */
static void		 killer_start(struct killer *k, struct mux *m);
static void		*killer_run(void *arg);
static void		 killer_stop(struct killer *k);

/* Connection setup and negotiation steps. */
static int		 proto_waitconnect(int s);
static int		 proto_greet(struct config *config);
static int		 proto_negproto(struct config *config);
static int		 proto_fileattr(struct config *config);
static int		 proto_xchgcoll(struct config *config);
static struct mux	*proto_mux(struct config *config);

/* Token escaping helpers. */
static int		 proto_escape(struct stream *wr, const char *s);
static void		 proto_unescape(char *s);
85
/*
 * Wait for a pending connect(2) on socket `s' to complete.
 *
 * This is called after connect() got interrupted by a signal, in which
 * case the connection keeps being established asynchronously.  POSIX
 * reports the completion of such a connection (successful or not) by
 * marking the socket as ready for writing, so this is what we wait for.
 * The actual outcome is then read back using the SO_ERROR socket option.
 *
 * Returns 0 if the connection was established, -1 otherwise with errno
 * set to the reason of the failure.
 */
static int
proto_waitconnect(int s)
{
	fd_set writefd;
	socklen_t len;
	int error, rv, soerror;

	/* Descriptors outside of the fd_set range can't be select()'ed. */
	if (s < 0) {
		errno = EBADF;
		return (-1);
	}
	if (s >= FD_SETSIZE) {
		errno = EINVAL;
		return (-1);
	}

	do {
		/* Rebuild the set on each try so that it is always sane. */
		FD_ZERO(&writefd);
		FD_SET(s, &writefd);
		rv = select(s + 1, NULL, &writefd, NULL, NULL);
	} while (rv == -1 && errno == EINTR);
	if (rv == -1)
		return (-1);
	/* Check that the connection was really successful. */
	soerror = 0;
	len = sizeof(soerror);
	error = getsockopt(s, SOL_SOCKET, SO_ERROR, &soerror, &len);
	if (error) {
		/* We have no choice but faking an error here. */
		errno = ECONNREFUSED;
		return (-1);
	}
	if (soerror) {
		errno = soerror;
		return (-1);
	}
	return (0);
}
115
/*
 * Connect to the CVSup server.
 *
 * The host name in config->host is resolved for the given address family
 * and every returned address is tried in turn until one connection
 * succeeds.  If `port' is 0, the "cvsup" service is used, falling back to
 * port 5999 when that service is unknown.  When config->laddr is set, the
 * socket is bound to that local address first.
 *
 * On success, the connected socket is stored in config->socket and
 * STATUS_SUCCESS is returned.  Otherwise, STATUS_TRANSIENTFAILURE is
 * returned.
 */
int
proto_connect(struct config *config, int family, uint16_t port)
{
	char addrbuf[NI_MAXHOST];
	/* Enough to hold sizeof("cvsup") or any port number. */
	char servname[8];
	struct addrinfo *res, *ai, hints;
	int error, opt, s, serrno;

	if (port != 0)
		snprintf(servname, sizeof(servname), "%d", port);
	else
		snprintf(servname, sizeof(servname), "%s", "cvsup");
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_socktype = SOCK_STREAM;
	error = getaddrinfo(config->host, servname, &hints, &res);
	/*
	 * Try with the hardcoded port number for OSes that don't
	 * have cvsup defined in the /etc/services file.
	 */
	if (error == EAI_SERVICE) {
		snprintf(servname, sizeof(servname), "%s", "5999");
		error = getaddrinfo(config->host, servname, &hints, &res);
	}
	if (error) {
		lprintf(0, "Name lookup failure for \"%s\": %s\n", config->host,
		    gai_strerror(error));
		return (STATUS_TRANSIENTFAILURE);
	}
	for (ai = res; ai != NULL; ai = ai->ai_next) {
		error = 0;
		serrno = 0;
		s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (s == -1) {
			serrno = errno;
		} else {
			if (config->laddr != NULL) {
				opt = 1;
				(void)setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
				    &opt, sizeof(opt));
				error = bind(s, config->laddr,
				    config->laddrlen);
			}
			if (!error) {
				error = connect(s, ai->ai_addr, ai->ai_addrlen);
				if (error && errno == EINTR)
					error = proto_waitconnect(s);
			}
			if (error) {
				/*
				 * Save the reason of the failure now, the
				 * calls below may overwrite errno.
				 */
				serrno = errno;
				close(s);
				s = -1;
			}
		}
		/* Don't print garbage if the address can't be formatted. */
		if (getnameinfo(ai->ai_addr, ai->ai_addrlen, addrbuf,
		    sizeof(addrbuf), NULL, 0, NI_NUMERICHOST) != 0)
			snprintf(addrbuf, sizeof(addrbuf), "%s",
			    "(unknown address)");
		if (s == -1) {
			lprintf(0, "Cannot connect to %s: %s\n", addrbuf,
			    strerror(serrno));
			continue;
		}
		lprintf(1, "Connected to %s\n", addrbuf);
		freeaddrinfo(res);
		config->socket = s;
		return (STATUS_SUCCESS);
	}
	freeaddrinfo(res);
	return (STATUS_TRANSIENTFAILURE);
}
185
186/* Greet the server. */
187static int
188proto_greet(struct config *config)
189{
190	char *line, *cmd, *msg, *swver;
191	struct stream *s;
192
193	s = config->server;
194	line = stream_getln(s, NULL);
195	cmd = proto_get_ascii(&line);
196	if (cmd == NULL)
197		goto bad;
198	if (strcmp(cmd, "OK") == 0) {
199		(void)proto_get_ascii(&line);	/* major number */
200		(void)proto_get_ascii(&line);	/* minor number */
201		swver = proto_get_ascii(&line);
202	} else if (strcmp(cmd, "!") == 0) {
203		msg = proto_get_rest(&line);
204		if (msg == NULL)
205			goto bad;
206		lprintf(-1, "Rejected by server: %s\n", msg);
207		return (STATUS_TRANSIENTFAILURE);
208	} else
209		goto bad;
210	lprintf(2, "Server software version: %s\n",
211	    swver != NULL ? swver : ".");
212	return (STATUS_SUCCESS);
213bad:
214	lprintf(-1, "Invalid greeting from server\n");
215	return (STATUS_FAILURE);
216}
217
218/* Negotiate protocol version with the server. */
219static int
220proto_negproto(struct config *config)
221{
222	struct stream *s;
223	char *cmd, *line, *msg;
224	int error, maj, min;
225
226	s = config->server;
227	proto_printf(s, "PROTO %d %d %s\n", PROTO_MAJ, PROTO_MIN, PROTO_SWVER);
228	stream_flush(s);
229	line = stream_getln(s, NULL);
230	cmd = proto_get_ascii(&line);
231	if (cmd == NULL || line == NULL)
232		goto bad;
233	if (strcmp(cmd, "!") == 0) {
234		msg = proto_get_rest(&line);
235		lprintf(-1, "Protocol negotiation failed: %s\n", msg);
236		return (1);
237	} else if (strcmp(cmd, "PROTO") != 0)
238		goto bad;
239	error = proto_get_int(&line, &maj, 10);
240	if (!error)
241		error = proto_get_int(&line, &min, 10);
242	if (error)
243		goto bad;
244	if (maj != PROTO_MAJ || min != PROTO_MIN) {
245		lprintf(-1, "Server protocol version %d.%d not supported "
246		    "by client\n", maj, min);
247		return (STATUS_FAILURE);
248	}
249	return (STATUS_SUCCESS);
250bad:
251	lprintf(-1, "Invalid PROTO command from server\n");
252	return (STATUS_FAILURE);
253}
254
255/*
256 * File attribute support negotiation.
257 */
258static int
259proto_fileattr(struct config *config)
260{
261	fattr_support_t support;
262	struct stream *s;
263	char *line, *cmd;
264	int error, i, n, attr;
265
266	s = config->server;
267	lprintf(2, "Negotiating file attribute support\n");
268	proto_printf(s, "ATTR %d\n", FT_NUMBER);
269	for (i = 0; i < FT_NUMBER; i++)
270		proto_printf(s, "%x\n", fattr_supported(i));
271	proto_printf(s, ".\n");
272	stream_flush(s);
273	line = stream_getln(s, NULL);
274	if (line == NULL)
275		goto bad;
276	cmd = proto_get_ascii(&line);
277	error = proto_get_int(&line, &n, 10);
278	if (error || line != NULL || strcmp(cmd, "ATTR") != 0 || n > FT_NUMBER)
279		goto bad;
280	for (i = 0; i < n; i++) {
281		line = stream_getln(s, NULL);
282		if (line == NULL)
283			goto bad;
284		error = proto_get_int(&line, &attr, 16);
285		if (error)
286			goto bad;
287		support[i] = fattr_supported(i) & attr;
288	}
289	for (i = n; i < FT_NUMBER; i++)
290		support[i] = 0;
291	line = stream_getln(s, NULL);
292	if (line == NULL || strcmp(line, ".") != 0)
293		goto bad;
294	memcpy(config->fasupport, support, sizeof(config->fasupport));
295	return (STATUS_SUCCESS);
296bad:
297	lprintf(-1, "Protocol error negotiating attribute support\n");
298	return (STATUS_FAILURE);
299}
300
301/*
302 * Exchange collection information.
303 */
304static int
305proto_xchgcoll(struct config *config)
306{
307	struct coll *coll;
308	struct stream *s;
309	struct globtree *diraccept, *dirrefuse;
310	struct globtree *fileaccept, *filerefuse;
311	char *line, *cmd, *collname, *pat;
312	char *msg, *release, *ident, *rcskey, *prefix;
313	size_t i, len;
314	int error, flags, options;
315
316	s = config->server;
317	lprintf(2, "Exchanging collection information\n");
318	STAILQ_FOREACH(coll, &config->colls, co_next) {
319		if (coll->co_options & CO_SKIP)
320			continue;
321		proto_printf(s, "COLL %s %s %o %d\n", coll->co_name,
322		    coll->co_release, coll->co_umask, coll->co_options);
323		for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
324		    proto_printf(s, "ACC %s\n",
325			pattlist_get(coll->co_accepts, i));
326		}
327		for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
328		    proto_printf(s, "REF %s\n",
329			pattlist_get(coll->co_refusals, i));
330		}
331		proto_printf(s, ".\n");
332	}
333	proto_printf(s, ".\n");
334	stream_flush(s);
335
336	STAILQ_FOREACH(coll, &config->colls, co_next) {
337		if (coll->co_options & CO_SKIP)
338			continue;
339		coll->co_norsync = globtree_false();
340		line = stream_getln(s, NULL);
341		if (line == NULL)
342			goto bad;
343		cmd = proto_get_ascii(&line);
344		collname = proto_get_ascii(&line);
345		release = proto_get_ascii(&line);
346		error = proto_get_int(&line, &options, 10);
347		if (error || line != NULL)
348			goto bad;
349		if (strcmp(cmd, "COLL") != 0 ||
350		    strcmp(collname, coll->co_name) != 0 ||
351		    strcmp(release, coll->co_release) != 0)
352			goto bad;
353		coll->co_options =
354		    (coll->co_options | (options & CO_SERVMAYSET)) &
355		    ~(~options & CO_SERVMAYCLEAR);
356		while ((line = stream_getln(s, NULL)) != NULL) {
357		 	if (strcmp(line, ".") == 0)
358				break;
359			cmd = proto_get_ascii(&line);
360			if (cmd == NULL)
361				goto bad;
362			if (strcmp(cmd, "!") == 0) {
363				msg = proto_get_rest(&line);
364				if (msg == NULL)
365					goto bad;
366				lprintf(-1, "Server message: %s\n", msg);
367			} else if (strcmp(cmd, "PRFX") == 0) {
368				prefix = proto_get_ascii(&line);
369				if (prefix == NULL || line != NULL)
370					goto bad;
371				coll->co_cvsroot = xstrdup(prefix);
372			} else if (strcmp(cmd, "KEYALIAS") == 0) {
373				ident = proto_get_ascii(&line);
374				rcskey = proto_get_ascii(&line);
375				if (rcskey == NULL || line != NULL)
376					goto bad;
377				error = keyword_alias(coll->co_keyword, ident,
378				    rcskey);
379				if (error)
380					goto bad;
381			} else if (strcmp(cmd, "KEYON") == 0) {
382				ident = proto_get_ascii(&line);
383				if (ident == NULL || line != NULL)
384					goto bad;
385				error = keyword_enable(coll->co_keyword, ident);
386				if (error)
387					goto bad;
388			} else if (strcmp(cmd, "KEYOFF") == 0) {
389				ident = proto_get_ascii(&line);
390				if (ident == NULL || line != NULL)
391					goto bad;
392				error = keyword_disable(coll->co_keyword,
393				    ident);
394				if (error)
395					goto bad;
396			} else if (strcmp(cmd, "NORS") == 0) {
397				pat = proto_get_ascii(&line);
398				if (pat == NULL || line != NULL)
399					goto bad;
400				coll->co_norsync = globtree_or(coll->co_norsync,
401				    globtree_match(pat, FNM_PATHNAME));
402			} else if (strcmp(cmd, "RNORS") == 0) {
403				pat = proto_get_ascii(&line);
404				if (pat == NULL || line != NULL)
405					goto bad;
406				coll->co_norsync = globtree_or(coll->co_norsync,
407				    globtree_match(pat, FNM_PATHNAME |
408				    FNM_LEADING_DIR));
409			} else
410				goto bad;
411		}
412		if (line == NULL)
413			goto bad;
414		keyword_prepare(coll->co_keyword);
415
416		diraccept = globtree_true();
417		fileaccept = globtree_true();
418		dirrefuse = globtree_false();
419		filerefuse = globtree_false();
420
421		if (pattlist_size(coll->co_accepts) > 0) {
422			globtree_free(diraccept);
423			globtree_free(fileaccept);
424			diraccept = globtree_false();
425			fileaccept = globtree_false();
426			flags = FNM_PATHNAME | FNM_LEADING_DIR |
427			    FNM_PREFIX_DIRS;
428			for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
429				pat = pattlist_get(coll->co_accepts, i);
430				diraccept = globtree_or(diraccept,
431				    globtree_match(pat, flags));
432
433				len = strlen(pat);
434				if (coll->co_options & CO_CHECKOUTMODE &&
435				    (len == 0 || pat[len - 1] != '*')) {
436					/* We must modify the pattern so that it
437					   refers to the RCS file, rather than
438					   the checked-out file. */
439					xasprintf(&pat, "%s,v", pat);
440					fileaccept = globtree_or(fileaccept,
441					    globtree_match(pat, flags));
442					free(pat);
443				} else {
444					fileaccept = globtree_or(fileaccept,
445					    globtree_match(pat, flags));
446				}
447			}
448		}
449
450		for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
451			pat = pattlist_get(coll->co_refusals, i);
452			dirrefuse = globtree_or(dirrefuse,
453			    globtree_match(pat, 0));
454			len = strlen(pat);
455			if (coll->co_options & CO_CHECKOUTMODE &&
456			    (len == 0 || pat[len - 1] != '*')) {
457				/* We must modify the pattern so that it refers
458				   to the RCS file, rather than the checked-out
459				   file. */
460				xasprintf(&pat, "%s,v", pat);
461				filerefuse = globtree_or(filerefuse,
462				    globtree_match(pat, 0));
463				free(pat);
464			} else {
465				filerefuse = globtree_or(filerefuse,
466				    globtree_match(pat, 0));
467			}
468		}
469
470		coll->co_dirfilter = globtree_and(diraccept,
471		    globtree_not(dirrefuse));
472		coll->co_filefilter = globtree_and(fileaccept,
473		    globtree_not(filerefuse));
474
475		/* Set up a mask of file attributes that we don't want to sync
476		   with the server. */
477		if (!(coll->co_options & CO_SETOWNER))
478			coll->co_attrignore |= FA_OWNER | FA_GROUP;
479		if (!(coll->co_options & CO_SETMODE))
480			coll->co_attrignore |= FA_MODE;
481		if (!(coll->co_options & CO_SETFLAGS))
482			coll->co_attrignore |= FA_FLAGS;
483	}
484	return (STATUS_SUCCESS);
485bad:
486	lprintf(-1, "Protocol error during collection exchange\n");
487	return (STATUS_FAILURE);
488}
489
490static struct mux *
491proto_mux(struct config *config)
492{
493	struct mux *m;
494	struct stream *s, *wr;
495	struct chan *chan0, *chan1;
496	int id;
497
498	s = config->server;
499	lprintf(2, "Establishing multiplexed-mode data connection\n");
500	proto_printf(s, "MUX\n");
501	stream_flush(s);
502	m = mux_open(config->socket, &chan0);
503	if (m == NULL) {
504		lprintf(-1, "Cannot open the multiplexer\n");
505		return (NULL);
506	}
507	id = chan_listen(m);
508	if (id == -1) {
509		lprintf(-1, "ChannelMux.Listen failed: %s\n", strerror(errno));
510		mux_close(m);
511		return (NULL);
512	}
513	wr = stream_open(chan0, NULL, (stream_writefn_t *)chan_write, NULL);
514	proto_printf(wr, "CHAN %d\n", id);
515	stream_close(wr);
516	chan1 = chan_accept(m, id);
517	if (chan1 == NULL) {
518		lprintf(-1, "ChannelMux.Accept failed: %s\n", strerror(errno));
519		mux_close(m);
520		return (NULL);
521	}
522	config->chan0 = chan0;
523	config->chan1 = chan1;
524	return (m);
525}
526
527/*
528 * Initializes the connection to the CVSup server, that is handle
529 * the protocol negotiation, logging in, exchanging file attributes
530 * support and collections information, and finally run the update
531 * session.
532 */
533int
534proto_run(struct config *config)
535{
536	struct thread_args lister_args;
537	struct thread_args detailer_args;
538	struct thread_args updater_args;
539	struct thread_args *args;
540	struct killer killer;
541	struct threads *workers;
542	struct mux *m;
543	int i, status;
544
545	/*
546	 * We pass NULL for the close() function because we'll reuse
547	 * the socket after the stream is closed.
548	 */
549	config->server = stream_open_fd(config->socket, stream_read_fd,
550	    stream_write_fd, NULL);
551	status = proto_greet(config);
552	if (status == STATUS_SUCCESS)
553		status = proto_negproto(config);
554	if (status == STATUS_SUCCESS)
555		status = auth_login(config);
556	if (status == STATUS_SUCCESS)
557		status = proto_fileattr(config);
558	if (status == STATUS_SUCCESS)
559		status = proto_xchgcoll(config);
560	if (status != STATUS_SUCCESS)
561		return (status);
562
563	/* Multi-threaded action starts here. */
564	m = proto_mux(config);
565	if (m == NULL)
566		return (STATUS_FAILURE);
567
568	stream_close(config->server);
569	config->server = NULL;
570	config->fixups = fixups_new();
571	killer_start(&killer, m);
572
573	/* Start the worker threads. */
574	workers = threads_new();
575	args = &lister_args;
576	args->config = config;
577	args->status = -1;
578	args->errmsg = NULL;
579	args->rd = NULL;
580	args->wr = stream_open(config->chan0,
581	    NULL, (stream_writefn_t *)chan_write, NULL);
582	threads_create(workers, lister, args);
583
584	args = &detailer_args;
585	args->config = config;
586	args->status = -1;
587	args->errmsg = NULL;
588	args->rd = stream_open(config->chan0,
589	    (stream_readfn_t *)chan_read, NULL, NULL);
590	args->wr = stream_open(config->chan1,
591	    NULL, (stream_writefn_t *)chan_write, NULL);
592	threads_create(workers, detailer, args);
593
594	args = &updater_args;
595	args->config = config;
596	args->status = -1;
597	args->errmsg = NULL;
598	args->rd = stream_open(config->chan1,
599	    (stream_readfn_t *)chan_read, NULL, NULL);
600	args->wr = NULL;
601	threads_create(workers, updater, args);
602
603	lprintf(2, "Running\n");
604	/* Wait for all the worker threads to finish. */
605	status = STATUS_SUCCESS;
606	for (i = 0; i < 3; i++) {
607		args = threads_wait(workers);
608		if (args->rd != NULL)
609			stream_close(args->rd);
610		if (args->wr != NULL)
611			stream_close(args->wr);
612		if (args->status != STATUS_SUCCESS) {
613			assert(args->errmsg != NULL);
614			if (status == STATUS_SUCCESS) {
615				status = args->status;
616				/* Shutdown the multiplexer to wake up all
617				   the other threads. */
618				mux_shutdown(m, args->errmsg, status);
619			}
620			free(args->errmsg);
621		}
622	}
623	threads_free(workers);
624	if (status == STATUS_SUCCESS) {
625		lprintf(2, "Shutting down connection to server\n");
626		chan_close(config->chan0);
627		chan_close(config->chan1);
628		chan_wait(config->chan0);
629		chan_wait(config->chan1);
630		mux_shutdown(m, NULL, STATUS_SUCCESS);
631	}
632	killer_stop(&killer);
633	fixups_free(config->fixups);
634	status = mux_close(m);
635	if (status == STATUS_SUCCESS) {
636		lprintf(1, "Finished successfully\n");
637	} else if (status == STATUS_INTERRUPTED) {
638		lprintf(-1, "Interrupted\n");
639		if (killer.killedby != -1)
640			kill(getpid(), killer.killedby);
641	}
642	return (status);
643}
644
/*
 * Write the string `s' into the stream, replacing the characters that
 * have a special meaning in the protocol by an escape sequence:
 *
 * SPACE	-> "\_"
 * TAB		-> "\t"
 * NEWLINE	-> "\n"
 * CR		-> "\r"
 * \		-> "\\"
 *
 * Returns 0 on success, -1 if writing to the stream failed.
 */
static int
proto_escape(struct stream *wr, const char *s)
{
	const char *esc;
	size_t span;

	for (;;) {
		/* Write everything up to the next special character. */
		span = strcspn(s, " \t\r\n\\");
		if (stream_write(wr, s, span) == -1)
			return (-1);
		switch (s[span]) {
		case ' ':
			esc = "\\_";
			break;
		case '\t':
			esc = "\\t";
			break;
		case '\r':
			esc = "\\r";
			break;
		case '\n':
			esc = "\\n";
			break;
		case '\\':
			esc = "\\\\";
			break;
		default:
			/* We stopped on the end of the string. */
			return (0);
		}
		if (stream_write(wr, esc, 2) == -1)
			return (-1);
		s += span + 1;
	}
}
692
693/*
694 * A simple printf() implementation specifically tailored for csup.
695 * List of the supported formats:
696 *
697 * %c		Print a char.
698 * %d or %i	Print an int as decimal.
699 * %x		Print an int as hexadecimal.
700 * %o		Print an int as octal.
701 * %t		Print a time_t as decimal.
702 * %s		Print a char * escaping some characters as needed.
703 * %S		Print a char * without escaping.
704 * %f		Print an encoded struct fattr *.
705 * %F		Print an encoded struct fattr *, specifying the supported
706 * 		attributes.
707 */
708int
709proto_printf(struct stream *wr, const char *format, ...)
710{
711	fattr_support_t *support;
712	long long longval;
713	struct fattr *fa;
714	const char *fmt;
715	va_list ap;
716	char *cp, *s, *attr;
717	ssize_t n;
718	size_t size;
719	off_t off;
720	int rv, val, ignore;
721	char c;
722
723	n = 0;
724	rv = 0;
725	fmt = format;
726	va_start(ap, format);
727	while ((cp = strchr(fmt, '%')) != NULL) {
728		if (cp > fmt) {
729			n = stream_write(wr, fmt, cp - fmt);
730			if (n == -1) {
731				va_end(ap);
732				return (-1);
733			}
734		}
735		if (*++cp == '\0')
736			goto done;
737		switch (*cp) {
738		case 'c':
739			c = va_arg(ap, int);
740			rv = stream_printf(wr, "%c", c);
741			break;
742		case 'd':
743		case 'i':
744			val = va_arg(ap, int);
745			rv = stream_printf(wr, "%d", val);
746			break;
747		case 'x':
748			val = va_arg(ap, int);
749			rv = stream_printf(wr, "%x", val);
750			break;
751		case 'o':
752			val = va_arg(ap, int);
753			rv = stream_printf(wr, "%o", val);
754			break;
755		case 'O':
756			off = va_arg(ap, off_t);
757			rv = stream_printf(wr, "%" PRId64, off);
758			break;
759		case 'S':
760			s = va_arg(ap, char *);
761			assert(s != NULL);
762			rv = stream_printf(wr, "%s", s);
763			break;
764		case 's':
765			s = va_arg(ap, char *);
766			assert(s != NULL);
767			rv = proto_escape(wr, s);
768			break;
769		case 't':
770			longval = (long long)va_arg(ap, time_t);
771			rv = stream_printf(wr, "%lld", longval);
772			break;
773		case 'f':
774			fa = va_arg(ap, struct fattr *);
775			attr = fattr_encode(fa, NULL, 0);
776			rv = proto_escape(wr, attr);
777			free(attr);
778			break;
779		case 'F':
780			fa = va_arg(ap, struct fattr *);
781			support = va_arg(ap, fattr_support_t *);
782			ignore = va_arg(ap, int);
783			attr = fattr_encode(fa, *support, ignore);
784			rv = proto_escape(wr, attr);
785			free(attr);
786			break;
787		case 'z':
788			size = va_arg(ap, size_t);
789			rv = stream_printf(wr, "%zu", size);
790			break;
791
792		case '%':
793			n = stream_write(wr, "%", 1);
794			if (n == -1) {
795				va_end(ap);
796				return (-1);
797			}
798			break;
799		}
800		if (rv == -1) {
801			va_end(ap);
802			return (-1);
803		}
804		fmt = cp + 1;
805	}
806	if (*fmt != '\0') {
807		rv = stream_printf(wr, "%s", fmt);
808		if (rv == -1) {
809			va_end(ap);
810			return (-1);
811		}
812	}
813done:
814	va_end(ap);
815	return (0);
816}
817
/*
 * Unescape the string in place, see proto_escape().
 *
 * Each backslash is removed and the character following it is replaced
 * by the character it stands for ("\_" is a space, "\t" a tab, "\r" a
 * carriage return and "\n" a newline); any other escaped character is
 * kept as is.  A lone backslash ending the string is dropped.
 *
 * The string is compacted in a single pass rather than by shifting its
 * tail for every escape sequence, so that the running time stays linear
 * even for lines stuffed with backslashes.
 */
static void
proto_unescape(char *s)
{
	char *rd, *wr;

	/* Everything before the first backslash stays in place. */
	rd = strchr(s, '\\');
	if (rd == NULL)
		return;
	wr = rd;
	while (*rd != '\0') {
		if (*rd != '\\') {
			*wr++ = *rd++;
			continue;
		}
		/* Nothing is escaped by a trailing backslash. */
		if (rd[1] == '\0')
			break;
		switch (rd[1]) {
		case '_':
			*wr++ = ' ';
			break;
		case 't':
			*wr++ = '\t';
			break;
		case 'r':
			*wr++ = '\r';
			break;
		case 'n':
			*wr++ = '\n';
			break;
		default:
			/* This covers "\\" as well as unknown escapes. */
			*wr++ = rd[1];
			break;
		}
		rd += 2;
	}
	*wr = '\0';
}
854
/*
 * Get the next space-separated token of the string and unescape it.
 *
 * `*s' is moved past the token, or set to NULL once the end of the
 * string has been reached.  Empty tokens are not allowed: when one is
 * found, NULL is returned and `*s' is set to NULL so that further calls
 * fail as well.
 */
char *
proto_get_ascii(char **s)
{
	char *tok;

	tok = strsep(s, " ");
	if (tok != NULL && *tok == '\0') {
		/* Make sure we disallow 0-length fields. */
		*s = NULL;
		tok = NULL;
	}
	if (tok != NULL)
		proto_unescape(tok);
	return (tok);
}
874
/*
 * Get the rest of the string, unescaped.
 *
 * `*s' is set to NULL to mark the string as fully consumed.  NULL is
 * returned if there is nothing left to get, that is if `s' or `*s' is
 * NULL.
 */
char *
proto_get_rest(char **s)
{
	char *ret;

	/* A previous token may have consumed the whole string already. */
	if (s == NULL || *s == NULL)
		return (NULL);
	ret = *s;
	proto_unescape(ret);
	*s = NULL;
	return (ret);
}
890
/*
 * Get the next token of the string and convert it to an int using
 * asciitoint() in the given base.
 *
 * Returns -1 if there is no token left, and whatever asciitoint()
 * returns otherwise.
 */
int
proto_get_int(char **s, int *val, int base)
{
	char *tok;

	tok = proto_get_ascii(s);
	return (tok == NULL ? -1 : asciitoint(tok, val, base));
}
906
/*
 * Get a size_t token.
 *
 * The token must be a non-negative number in the given base that fits
 * in a size_t, with nothing following it.  Returns 0 and stores the
 * value in `*val' on success, -1 otherwise.
 */
int
proto_get_sizet(char **s, size_t *val, int base)
{
	unsigned long long tmp;
	char *cp, *end;

	cp = proto_get_ascii(s);
	if (cp == NULL)
		return (-1);
	/*
	 * strtoull() happily negates a number preceded by a minus sign
	 * instead of failing, so negative sizes have to be refused here.
	 */
	if (strchr(cp, '-') != NULL)
		return (-1);
	errno = 0;
	tmp = strtoull(cp, &end, base);
	if (errno || end == cp || *end != '\0')
		return (-1);
	/* Don't silently truncate where size_t is the narrower type. */
	if (tmp > SIZE_MAX)
		return (-1);
	*val = (size_t)tmp;
	return (0);
}
926
/*
 * Get a time_t token, written as a decimal number.
 *
 * Ideally, we would use an intmax_t and strtoimax() here, but strtoll()
 * is more portable and 64bits should be enough for a timestamp.
 *
 * Returns 0 and stores the timestamp in `*val' on success, -1 if the
 * token is missing or isn't a valid number.
 */
int
proto_get_time(char **s, time_t *val)
{
	long long stamp;
	char *tok, *endp;

	tok = proto_get_ascii(s);
	if (tok == NULL)
		return (-1);
	/* strtoll() only reports range errors through errno. */
	errno = 0;
	stamp = strtoll(tok, &endp, 10);
	if (errno != 0)
		return (-1);
	/* The whole token must have been consumed. */
	if (*endp != '\0')
		return (-1);
	*val = (time_t)stamp;
	return (0);
}
949
950/* Start the killer thread.  It is used to protect against some signals
951   during the multi-threaded run so that we can gracefully fail.  */
952static void
953killer_start(struct killer *k, struct mux *m)
954{
955	int error;
956
957	k->mux = m;
958	k->killedby = -1;
959	sigemptyset(&k->sigset);
960	sigaddset(&k->sigset, SIGINT);
961	sigaddset(&k->sigset, SIGHUP);
962	sigaddset(&k->sigset, SIGTERM);
963	sigaddset(&k->sigset, SIGPIPE);
964	pthread_sigmask(SIG_BLOCK, &k->sigset, NULL);
965	error = pthread_create(&k->thread, NULL, killer_run, k);
966	if (error)
967		err(1, "pthread_create");
968}
969
970/* The main loop of the killer thread. */
971static void *
972killer_run(void *arg)
973{
974	struct killer *k;
975	int error, sig, old;
976
977	k = arg;
978again:
979	error = sigwait(&k->sigset, &sig);
980	assert(!error);
981	if (sig == SIGINT || sig == SIGHUP || sig == SIGTERM) {
982		if (k->killedby == -1) {
983			k->killedby = sig;
984			/* Ensure we don't get canceled during the shutdown. */
985			pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
986			mux_shutdown(k->mux, "Cleaning up ...",
987			    STATUS_INTERRUPTED);
988			pthread_setcancelstate(old, NULL);
989		}
990	}
991	goto again;
992}
993
994/* Stop the killer thread. */
995static void
996killer_stop(struct killer *k)
997{
998	void *val;
999	int error;
1000
1001	error = pthread_cancel(k->thread);
1002	assert(!error);
1003	pthread_join(k->thread, &val);
1004	assert(val == PTHREAD_CANCELED);
1005	pthread_sigmask(SIG_UNBLOCK, &k->sigset, NULL);
1006}
1007