proto.c revision 225736
1/*-
2 * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 * $FreeBSD: stable/9/usr.bin/csup/proto.c 204556 2010-03-02 07:26:07Z lulf $
27 */
28
29#include <sys/param.h>
30#include <sys/select.h>
31#include <sys/socket.h>
32#include <sys/types.h>
33#include <sys/stat.h>
34
35#include <assert.h>
36#include <err.h>
37#include <errno.h>
38#include <netdb.h>
39#include <pthread.h>
40#include <signal.h>
41#include <stdarg.h>
42#include <stddef.h>
43#include <stdio.h>
44#include <stdlib.h>
45#include <string.h>
46#include <unistd.h>
47
48#include "auth.h"
49#include "config.h"
50#include "detailer.h"
51#include "fattr.h"
52#include "fixups.h"
53#include "globtree.h"
54#include "keyword.h"
55#include "lister.h"
56#include "misc.h"
57#include "mux.h"
58#include "proto.h"
59#include "queue.h"
60#include "stream.h"
61#include "threads.h"
62#include "updater.h"
63
/*
 * State of the "killer" thread, which waits for termination signals
 * while the worker threads are running (see killer_start()).
 */
struct killer {
	pthread_t thread;	/* The killer thread itself. */
	sigset_t sigset;	/* Signals blocked and waited for. */
	struct mux *mux;	/* Multiplexer to shut down on a signal. */
	int killedby;		/* Signal that interrupted us, or -1. */
};
70
/* Signal handling thread. */
static void		 killer_start(struct killer *k, struct mux *m);
static void		*killer_run(void *arg);
static void		 killer_stop(struct killer *k);

/* Connection setup and negotiation steps. */
static int		 proto_waitconnect(int s);
static int		 proto_greet(struct config *config);
static int		 proto_negproto(struct config *config);
static int		 proto_fileattr(struct config *config);
static int		 proto_xchgcoll(struct config *config);
static struct mux	*proto_mux(struct config *config);

/* Escaping of strings sent on, and received from, the wire. */
static int		 proto_escape(struct stream *wr, const char *s);
static void		 proto_unescape(char *s);
84
/*
 * Wait for a connect() call that was interrupted by a signal to
 * complete.
 *
 * After connect() fails with EINTR the connection keeps being
 * established in the background, and its completion (successful or
 * not) is reported by the socket becoming writable, so that is the
 * condition we wait for.
 *
 * Returns 0 if the socket is connected, or -1 with errno set.
 */
static int
proto_waitconnect(int s)
{
	fd_set writefd;
	socklen_t len;
	int error, rv, soerror;

	/* FD_SET() is undefined for descriptors that don't fit the set. */
	if (s < 0 || s >= FD_SETSIZE) {
		errno = EINVAL;
		return (-1);
	}
	do {
		/* Rebuild the set, a failed select() may have altered it. */
		FD_ZERO(&writefd);
		FD_SET(s, &writefd);
		rv = select(s + 1, NULL, &writefd, NULL, NULL);
	} while (rv == -1 && errno == EINTR);
	if (rv == -1)
		return (-1);
	/* Check that the connection was really successful. */
	soerror = 0;
	len = sizeof(soerror);
	error = getsockopt(s, SOL_SOCKET, SO_ERROR, &soerror, &len);
	if (error) {
		/* We have no choice but faking an error here. */
		errno = ECONNREFUSED;
		return (-1);
	}
	if (soerror) {
		errno = soerror;
		return (-1);
	}
	return (0);
}
114
/*
 * Connect to the CVSup server.
 *
 * Resolves config->host for the requested address family and tries
 * every returned address in turn until one of them accepts the
 * connection.  A port of 0 selects the "cvsup" service.  If
 * config->laddr is set, the socket is bound to it before connecting.
 *
 * On success the connected socket is stored in config->socket and
 * STATUS_SUCCESS is returned; STATUS_TRANSIENTFAILURE is returned if
 * the name lookup fails or no address could be connected to.
 */
int
proto_connect(struct config *config, int family, uint16_t port)
{
	char addrbuf[NI_MAXHOST];
	/* Enough to hold sizeof("cvsup") or any port number. */
	char servname[8];
	struct addrinfo *res, *ai, hints;
	int error, opt, s, serrno;

	if (port != 0)
		snprintf(servname, sizeof(servname), "%d", port);
	else
		snprintf(servname, sizeof(servname), "%s", "cvsup");
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_socktype = SOCK_STREAM;
	error = getaddrinfo(config->host, servname, &hints, &res);
	/*
	 * Try with the hardcoded port number for OSes that don't
	 * have cvsup defined in the /etc/services file.
	 */
	if (error == EAI_SERVICE) {
		snprintf(servname, sizeof(servname), "%s", "5999");
		error = getaddrinfo(config->host, servname, &hints, &res);
	}
	if (error) {
		lprintf(0, "Name lookup failure for \"%s\": %s\n", config->host,
		    gai_strerror(error));
		return (STATUS_TRANSIENTFAILURE);
	}
	for (ai = res; ai != NULL; ai = ai->ai_next) {
		error = 0;
		serrno = 0;
		s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (s == -1) {
			error = -1;
			serrno = errno;
		} else {
			if (config->laddr != NULL) {
				opt = 1;
				(void)setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
				    &opt, sizeof(opt));
				error = bind(s, config->laddr,
				    config->laddrlen);
			}
			if (!error) {
				error = connect(s, ai->ai_addr, ai->ai_addrlen);
				if (error && errno == EINTR)
					error = proto_waitconnect(s);
			}
			if (error) {
				/*
				 * Remember why we failed before close() and
				 * getnameinfo() get a chance to change errno.
				 */
				serrno = errno;
				close(s);
				s = -1;
			}
		}
		/* Never print an uninitialized buffer if this fails. */
		if (getnameinfo(ai->ai_addr, ai->ai_addrlen, addrbuf,
		    sizeof(addrbuf), NULL, 0, NI_NUMERICHOST) != 0)
			snprintf(addrbuf, sizeof(addrbuf), "%s",
			    "(unknown address)");
		if (error) {
			lprintf(0, "Cannot connect to %s: %s\n", addrbuf,
			    strerror(serrno));
			continue;
		}
		lprintf(1, "Connected to %s\n", addrbuf);
		freeaddrinfo(res);
		config->socket = s;
		return (STATUS_SUCCESS);
	}
	freeaddrinfo(res);
	return (STATUS_TRANSIENTFAILURE);
}
184
185/* Greet the server. */
186static int
187proto_greet(struct config *config)
188{
189	char *line, *cmd, *msg, *swver;
190	struct stream *s;
191
192	s = config->server;
193	line = stream_getln(s, NULL);
194	cmd = proto_get_ascii(&line);
195	if (cmd == NULL)
196		goto bad;
197	if (strcmp(cmd, "OK") == 0) {
198		(void)proto_get_ascii(&line);	/* major number */
199		(void)proto_get_ascii(&line);	/* minor number */
200		swver = proto_get_ascii(&line);
201	} else if (strcmp(cmd, "!") == 0) {
202		msg = proto_get_rest(&line);
203		if (msg == NULL)
204			goto bad;
205		lprintf(-1, "Rejected by server: %s\n", msg);
206		return (STATUS_TRANSIENTFAILURE);
207	} else
208		goto bad;
209	lprintf(2, "Server software version: %s\n",
210	    swver != NULL ? swver : ".");
211	return (STATUS_SUCCESS);
212bad:
213	lprintf(-1, "Invalid greeting from server\n");
214	return (STATUS_FAILURE);
215}
216
217/* Negotiate protocol version with the server. */
218static int
219proto_negproto(struct config *config)
220{
221	struct stream *s;
222	char *cmd, *line, *msg;
223	int error, maj, min;
224
225	s = config->server;
226	proto_printf(s, "PROTO %d %d %s\n", PROTO_MAJ, PROTO_MIN, PROTO_SWVER);
227	stream_flush(s);
228	line = stream_getln(s, NULL);
229	cmd = proto_get_ascii(&line);
230	if (cmd == NULL || line == NULL)
231		goto bad;
232	if (strcmp(cmd, "!") == 0) {
233		msg = proto_get_rest(&line);
234		lprintf(-1, "Protocol negotiation failed: %s\n", msg);
235		return (1);
236	} else if (strcmp(cmd, "PROTO") != 0)
237		goto bad;
238	error = proto_get_int(&line, &maj, 10);
239	if (!error)
240		error = proto_get_int(&line, &min, 10);
241	if (error)
242		goto bad;
243	if (maj != PROTO_MAJ || min != PROTO_MIN) {
244		lprintf(-1, "Server protocol version %d.%d not supported "
245		    "by client\n", maj, min);
246		return (STATUS_FAILURE);
247	}
248	return (STATUS_SUCCESS);
249bad:
250	lprintf(-1, "Invalid PROTO command from server\n");
251	return (STATUS_FAILURE);
252}
253
254/*
255 * File attribute support negotiation.
256 */
257static int
258proto_fileattr(struct config *config)
259{
260	fattr_support_t support;
261	struct stream *s;
262	char *line, *cmd;
263	int error, i, n, attr;
264
265	s = config->server;
266	lprintf(2, "Negotiating file attribute support\n");
267	proto_printf(s, "ATTR %d\n", FT_NUMBER);
268	for (i = 0; i < FT_NUMBER; i++)
269		proto_printf(s, "%x\n", fattr_supported(i));
270	proto_printf(s, ".\n");
271	stream_flush(s);
272	line = stream_getln(s, NULL);
273	if (line == NULL)
274		goto bad;
275	cmd = proto_get_ascii(&line);
276	error = proto_get_int(&line, &n, 10);
277	if (error || line != NULL || strcmp(cmd, "ATTR") != 0 || n > FT_NUMBER)
278		goto bad;
279	for (i = 0; i < n; i++) {
280		line = stream_getln(s, NULL);
281		if (line == NULL)
282			goto bad;
283		error = proto_get_int(&line, &attr, 16);
284		if (error)
285			goto bad;
286		support[i] = fattr_supported(i) & attr;
287	}
288	for (i = n; i < FT_NUMBER; i++)
289		support[i] = 0;
290	line = stream_getln(s, NULL);
291	if (line == NULL || strcmp(line, ".") != 0)
292		goto bad;
293	memcpy(config->fasupport, support, sizeof(config->fasupport));
294	return (STATUS_SUCCESS);
295bad:
296	lprintf(-1, "Protocol error negotiating attribute support\n");
297	return (STATUS_FAILURE);
298}
299
300/*
301 * Exchange collection information.
302 */
303static int
304proto_xchgcoll(struct config *config)
305{
306	struct coll *coll;
307	struct stream *s;
308	struct globtree *diraccept, *dirrefuse;
309	struct globtree *fileaccept, *filerefuse;
310	char *line, *cmd, *collname, *pat;
311	char *msg, *release, *ident, *rcskey, *prefix;
312	size_t i, len;
313	int error, flags, options;
314
315	s = config->server;
316	lprintf(2, "Exchanging collection information\n");
317	STAILQ_FOREACH(coll, &config->colls, co_next) {
318		if (coll->co_options & CO_SKIP)
319			continue;
320		proto_printf(s, "COLL %s %s %o %d\n", coll->co_name,
321		    coll->co_release, coll->co_umask, coll->co_options);
322		for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
323		    proto_printf(s, "ACC %s\n",
324			pattlist_get(coll->co_accepts, i));
325		}
326		for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
327		    proto_printf(s, "REF %s\n",
328			pattlist_get(coll->co_refusals, i));
329		}
330		proto_printf(s, ".\n");
331	}
332	proto_printf(s, ".\n");
333	stream_flush(s);
334
335	STAILQ_FOREACH(coll, &config->colls, co_next) {
336		if (coll->co_options & CO_SKIP)
337			continue;
338		coll->co_norsync = globtree_false();
339		line = stream_getln(s, NULL);
340		if (line == NULL)
341			goto bad;
342		cmd = proto_get_ascii(&line);
343		collname = proto_get_ascii(&line);
344		release = proto_get_ascii(&line);
345		error = proto_get_int(&line, &options, 10);
346		if (error || line != NULL)
347			goto bad;
348		if (strcmp(cmd, "COLL") != 0 ||
349		    strcmp(collname, coll->co_name) != 0 ||
350		    strcmp(release, coll->co_release) != 0)
351			goto bad;
352		coll->co_options =
353		    (coll->co_options | (options & CO_SERVMAYSET)) &
354		    ~(~options & CO_SERVMAYCLEAR);
355		while ((line = stream_getln(s, NULL)) != NULL) {
356		 	if (strcmp(line, ".") == 0)
357				break;
358			cmd = proto_get_ascii(&line);
359			if (cmd == NULL)
360				goto bad;
361			if (strcmp(cmd, "!") == 0) {
362				msg = proto_get_rest(&line);
363				if (msg == NULL)
364					goto bad;
365				lprintf(-1, "Server message: %s\n", msg);
366			} else if (strcmp(cmd, "PRFX") == 0) {
367				prefix = proto_get_ascii(&line);
368				if (prefix == NULL || line != NULL)
369					goto bad;
370				coll->co_cvsroot = xstrdup(prefix);
371			} else if (strcmp(cmd, "KEYALIAS") == 0) {
372				ident = proto_get_ascii(&line);
373				rcskey = proto_get_ascii(&line);
374				if (rcskey == NULL || line != NULL)
375					goto bad;
376				error = keyword_alias(coll->co_keyword, ident,
377				    rcskey);
378				if (error)
379					goto bad;
380			} else if (strcmp(cmd, "KEYON") == 0) {
381				ident = proto_get_ascii(&line);
382				if (ident == NULL || line != NULL)
383					goto bad;
384				error = keyword_enable(coll->co_keyword, ident);
385				if (error)
386					goto bad;
387			} else if (strcmp(cmd, "KEYOFF") == 0) {
388				ident = proto_get_ascii(&line);
389				if (ident == NULL || line != NULL)
390					goto bad;
391				error = keyword_disable(coll->co_keyword,
392				    ident);
393				if (error)
394					goto bad;
395			} else if (strcmp(cmd, "NORS") == 0) {
396				pat = proto_get_ascii(&line);
397				if (pat == NULL || line != NULL)
398					goto bad;
399				coll->co_norsync = globtree_or(coll->co_norsync,
400				    globtree_match(pat, FNM_PATHNAME));
401			} else if (strcmp(cmd, "RNORS") == 0) {
402				pat = proto_get_ascii(&line);
403				if (pat == NULL || line != NULL)
404					goto bad;
405				coll->co_norsync = globtree_or(coll->co_norsync,
406				    globtree_match(pat, FNM_PATHNAME |
407				    FNM_LEADING_DIR));
408			} else
409				goto bad;
410		}
411		if (line == NULL)
412			goto bad;
413		keyword_prepare(coll->co_keyword);
414
415		diraccept = globtree_true();
416		fileaccept = globtree_true();
417		dirrefuse = globtree_false();
418		filerefuse = globtree_false();
419
420		if (pattlist_size(coll->co_accepts) > 0) {
421			globtree_free(diraccept);
422			globtree_free(fileaccept);
423			diraccept = globtree_false();
424			fileaccept = globtree_false();
425			flags = FNM_PATHNAME | FNM_LEADING_DIR |
426			    FNM_PREFIX_DIRS;
427			for (i = 0; i < pattlist_size(coll->co_accepts); i++) {
428				pat = pattlist_get(coll->co_accepts, i);
429				diraccept = globtree_or(diraccept,
430				    globtree_match(pat, flags));
431
432				len = strlen(pat);
433				if (coll->co_options & CO_CHECKOUTMODE &&
434				    (len == 0 || pat[len - 1] != '*')) {
435					/* We must modify the pattern so that it
436					   refers to the RCS file, rather than
437					   the checked-out file. */
438					xasprintf(&pat, "%s,v", pat);
439					fileaccept = globtree_or(fileaccept,
440					    globtree_match(pat, flags));
441					free(pat);
442				} else {
443					fileaccept = globtree_or(fileaccept,
444					    globtree_match(pat, flags));
445				}
446			}
447		}
448
449		for (i = 0; i < pattlist_size(coll->co_refusals); i++) {
450			pat = pattlist_get(coll->co_refusals, i);
451			dirrefuse = globtree_or(dirrefuse,
452			    globtree_match(pat, 0));
453			len = strlen(pat);
454			if (coll->co_options & CO_CHECKOUTMODE &&
455			    (len == 0 || pat[len - 1] != '*')) {
456				/* We must modify the pattern so that it refers
457				   to the RCS file, rather than the checked-out
458				   file. */
459				xasprintf(&pat, "%s,v", pat);
460				filerefuse = globtree_or(filerefuse,
461				    globtree_match(pat, 0));
462				free(pat);
463			} else {
464				filerefuse = globtree_or(filerefuse,
465				    globtree_match(pat, 0));
466			}
467		}
468
469		coll->co_dirfilter = globtree_and(diraccept,
470		    globtree_not(dirrefuse));
471		coll->co_filefilter = globtree_and(fileaccept,
472		    globtree_not(filerefuse));
473
474		/* Set up a mask of file attributes that we don't want to sync
475		   with the server. */
476		if (!(coll->co_options & CO_SETOWNER))
477			coll->co_attrignore |= FA_OWNER | FA_GROUP;
478		if (!(coll->co_options & CO_SETMODE))
479			coll->co_attrignore |= FA_MODE;
480		if (!(coll->co_options & CO_SETFLAGS))
481			coll->co_attrignore |= FA_FLAGS;
482	}
483	return (STATUS_SUCCESS);
484bad:
485	lprintf(-1, "Protocol error during collection exchange\n");
486	return (STATUS_FAILURE);
487}
488
489static struct mux *
490proto_mux(struct config *config)
491{
492	struct mux *m;
493	struct stream *s, *wr;
494	struct chan *chan0, *chan1;
495	int id;
496
497	s = config->server;
498	lprintf(2, "Establishing multiplexed-mode data connection\n");
499	proto_printf(s, "MUX\n");
500	stream_flush(s);
501	m = mux_open(config->socket, &chan0);
502	if (m == NULL) {
503		lprintf(-1, "Cannot open the multiplexer\n");
504		return (NULL);
505	}
506	id = chan_listen(m);
507	if (id == -1) {
508		lprintf(-1, "ChannelMux.Listen failed: %s\n", strerror(errno));
509		mux_close(m);
510		return (NULL);
511	}
512	wr = stream_open(chan0, NULL, (stream_writefn_t *)chan_write, NULL);
513	proto_printf(wr, "CHAN %d\n", id);
514	stream_close(wr);
515	chan1 = chan_accept(m, id);
516	if (chan1 == NULL) {
517		lprintf(-1, "ChannelMux.Accept failed: %s\n", strerror(errno));
518		mux_close(m);
519		return (NULL);
520	}
521	config->chan0 = chan0;
522	config->chan1 = chan1;
523	return (m);
524}
525
526/*
527 * Initializes the connection to the CVSup server, that is handle
528 * the protocol negotiation, logging in, exchanging file attributes
529 * support and collections information, and finally run the update
530 * session.
531 */
532int
533proto_run(struct config *config)
534{
535	struct thread_args lister_args;
536	struct thread_args detailer_args;
537	struct thread_args updater_args;
538	struct thread_args *args;
539	struct killer killer;
540	struct threads *workers;
541	struct mux *m;
542	int i, status;
543
544	/*
545	 * We pass NULL for the close() function because we'll reuse
546	 * the socket after the stream is closed.
547	 */
548	config->server = stream_open_fd(config->socket, stream_read_fd,
549	    stream_write_fd, NULL);
550	status = proto_greet(config);
551	if (status == STATUS_SUCCESS)
552		status = proto_negproto(config);
553	if (status == STATUS_SUCCESS)
554		status = auth_login(config);
555	if (status == STATUS_SUCCESS)
556		status = proto_fileattr(config);
557	if (status == STATUS_SUCCESS)
558		status = proto_xchgcoll(config);
559	if (status != STATUS_SUCCESS)
560		return (status);
561
562	/* Multi-threaded action starts here. */
563	m = proto_mux(config);
564	if (m == NULL)
565		return (STATUS_FAILURE);
566
567	stream_close(config->server);
568	config->server = NULL;
569	config->fixups = fixups_new();
570	killer_start(&killer, m);
571
572	/* Start the worker threads. */
573	workers = threads_new();
574	args = &lister_args;
575	args->config = config;
576	args->status = -1;
577	args->errmsg = NULL;
578	args->rd = NULL;
579	args->wr = stream_open(config->chan0,
580	    NULL, (stream_writefn_t *)chan_write, NULL);
581	threads_create(workers, lister, args);
582
583	args = &detailer_args;
584	args->config = config;
585	args->status = -1;
586	args->errmsg = NULL;
587	args->rd = stream_open(config->chan0,
588	    (stream_readfn_t *)chan_read, NULL, NULL);
589	args->wr = stream_open(config->chan1,
590	    NULL, (stream_writefn_t *)chan_write, NULL);
591	threads_create(workers, detailer, args);
592
593	args = &updater_args;
594	args->config = config;
595	args->status = -1;
596	args->errmsg = NULL;
597	args->rd = stream_open(config->chan1,
598	    (stream_readfn_t *)chan_read, NULL, NULL);
599	args->wr = NULL;
600	threads_create(workers, updater, args);
601
602	lprintf(2, "Running\n");
603	/* Wait for all the worker threads to finish. */
604	status = STATUS_SUCCESS;
605	for (i = 0; i < 3; i++) {
606		args = threads_wait(workers);
607		if (args->rd != NULL)
608			stream_close(args->rd);
609		if (args->wr != NULL)
610			stream_close(args->wr);
611		if (args->status != STATUS_SUCCESS) {
612			assert(args->errmsg != NULL);
613			if (status == STATUS_SUCCESS) {
614				status = args->status;
615				/* Shutdown the multiplexer to wake up all
616				   the other threads. */
617				mux_shutdown(m, args->errmsg, status);
618			}
619			free(args->errmsg);
620		}
621	}
622	threads_free(workers);
623	if (status == STATUS_SUCCESS) {
624		lprintf(2, "Shutting down connection to server\n");
625		chan_close(config->chan0);
626		chan_close(config->chan1);
627		chan_wait(config->chan0);
628		chan_wait(config->chan1);
629		mux_shutdown(m, NULL, STATUS_SUCCESS);
630	}
631	killer_stop(&killer);
632	fixups_free(config->fixups);
633	status = mux_close(m);
634	if (status == STATUS_SUCCESS) {
635		lprintf(1, "Finished successfully\n");
636	} else if (status == STATUS_INTERRUPTED) {
637		lprintf(-1, "Interrupted\n");
638		if (killer.killedby != -1)
639			kill(getpid(), killer.killedby);
640	}
641	return (status);
642}
643
/*
 * Write a string into the stream, escaping characters as needed.
 * Characters escaped:
 *
 * SPACE	-> "\_"
 * TAB		-> "\t"
 * NEWLINE	-> "\n"
 * CR		-> "\r"
 * \		-> "\\"
 *
 * Returns 0 on success and -1 if writing to the stream failed.
 */
static int
proto_escape(struct stream *wr, const char *s)
{
	const char *esc;
	size_t plain;

	for (;;) {
		/* Write as-is everything up to the next special character. */
		plain = strcspn(s, " \t\r\n\\");
		if (stream_write(wr, s, plain) == -1)
			return (-1);
		if (s[plain] == '\0')
			return (0);
		switch (s[plain]) {
		case ' ':
			esc = "\\_";
			break;
		case '\t':
			esc = "\\t";
			break;
		case '\r':
			esc = "\\r";
			break;
		case '\n':
			esc = "\\n";
			break;
		default:
			/* strcspn() only leaves us the backslash. */
			esc = "\\\\";
			break;
		}
		if (stream_write(wr, esc, 2) == -1)
			return (-1);
		s += plain + 1;
	}
}
691
692/*
693 * A simple printf() implementation specifically tailored for csup.
694 * List of the supported formats:
695 *
696 * %c		Print a char.
697 * %d or %i	Print an int as decimal.
698 * %x		Print an int as hexadecimal.
699 * %o		Print an int as octal.
700 * %t		Print a time_t as decimal.
701 * %s		Print a char * escaping some characters as needed.
702 * %S		Print a char * without escaping.
703 * %f		Print an encoded struct fattr *.
704 * %F		Print an encoded struct fattr *, specifying the supported
705 * 		attributes.
706 */
707int
708proto_printf(struct stream *wr, const char *format, ...)
709{
710	fattr_support_t *support;
711	long long longval;
712	struct fattr *fa;
713	const char *fmt;
714	va_list ap;
715	char *cp, *s, *attr;
716	ssize_t n;
717	size_t size;
718	off_t off;
719	int rv, val, ignore;
720	char c;
721
722	n = 0;
723	rv = 0;
724	fmt = format;
725	va_start(ap, format);
726	while ((cp = strchr(fmt, '%')) != NULL) {
727		if (cp > fmt) {
728			n = stream_write(wr, fmt, cp - fmt);
729			if (n == -1)
730				return (-1);
731		}
732		if (*++cp == '\0')
733			goto done;
734		switch (*cp) {
735		case 'c':
736			c = va_arg(ap, int);
737			rv = stream_printf(wr, "%c", c);
738			break;
739		case 'd':
740		case 'i':
741			val = va_arg(ap, int);
742			rv = stream_printf(wr, "%d", val);
743			break;
744		case 'x':
745			val = va_arg(ap, int);
746			rv = stream_printf(wr, "%x", val);
747			break;
748		case 'o':
749			val = va_arg(ap, int);
750			rv = stream_printf(wr, "%o", val);
751			break;
752		case 'O':
753			off = va_arg(ap, off_t);
754			rv = stream_printf(wr, "%llu", off);
755			break;
756		case 'S':
757			s = va_arg(ap, char *);
758			assert(s != NULL);
759			rv = stream_printf(wr, "%s", s);
760			break;
761		case 's':
762			s = va_arg(ap, char *);
763			assert(s != NULL);
764			rv = proto_escape(wr, s);
765			break;
766		case 't':
767			longval = (long long)va_arg(ap, time_t);
768			rv = stream_printf(wr, "%lld", longval);
769			break;
770		case 'f':
771			fa = va_arg(ap, struct fattr *);
772			attr = fattr_encode(fa, NULL, 0);
773			rv = proto_escape(wr, attr);
774			free(attr);
775			break;
776		case 'F':
777			fa = va_arg(ap, struct fattr *);
778			support = va_arg(ap, fattr_support_t *);
779			ignore = va_arg(ap, int);
780			attr = fattr_encode(fa, *support, ignore);
781			rv = proto_escape(wr, attr);
782			free(attr);
783			break;
784		case 'z':
785			size = va_arg(ap, size_t);
786			rv = stream_printf(wr, "%zu", size);
787			break;
788
789		case '%':
790			n = stream_write(wr, "%", 1);
791			if (n == -1)
792				return (-1);
793			break;
794		}
795		if (rv == -1)
796			return (-1);
797		fmt = cp + 1;
798	}
799	if (*fmt != '\0') {
800		rv = stream_printf(wr, "%s", fmt);
801		if (rv == -1)
802			return (-1);
803	}
804done:
805	va_end(ap);
806	return (0);
807}
808
/*
 * Unescape the string in place, see proto_escape().
 *
 * A backslash followed by '_', 't', 'r' or 'n' becomes a space, a tab,
 * a carriage return or a newline respectively.  A backslash followed
 * by any other character stands for that character, and a lone
 * backslash ending the string is dropped.
 *
 * The string is rewritten in a single pass: the result is never longer
 * than the input, so the write position can't overtake the read one.
 */
static void
proto_unescape(char *s)
{
	char *rd, *wr;
	char c;

	/* Nothing has to be moved before the first backslash. */
	rd = strchr(s, '\\');
	if (rd == NULL)
		return;
	wr = rd;
	while (*rd != '\0') {
		if (*rd != '\\') {
			*wr++ = *rd++;
			continue;
		}
		/* Skip the backslash and look at the escaped character. */
		c = *++rd;
		if (c == '\0')
			break;
		rd++;
		switch (c) {
		case '_':
			*wr++ = ' ';
			break;
		case 't':
			*wr++ = '\t';
			break;
		case 'r':
			*wr++ = '\r';
			break;
		case 'n':
			*wr++ = '\n';
			break;
		default:
			/* This also handles the escaped backslash. */
			*wr++ = c;
			break;
		}
	}
	*wr = '\0';
}
845
/*
 * Get the next space-separated token of the string, unescaped.
 *
 * The string pointer is advanced past the token, or set to NULL when
 * the token was the last one.  Returns NULL when there is no token
 * left, or when the token is empty, in which case the rest of the
 * string is discarded too.
 */
char *
proto_get_ascii(char **s)
{
	char *tok;

	tok = strsep(s, " ");
	if (tok != NULL && *tok == '\0') {
		/* Make sure we disallow 0-length fields. */
		*s = NULL;
		tok = NULL;
	}
	if (tok != NULL)
		proto_unescape(tok);
	return (tok);
}
865
/*
 * Get the rest of the string, unescaped.
 *
 * The string pointer is set to NULL since nothing is left afterwards.
 * Returns NULL if there was nothing left to get in the first place,
 * which is the case when the previous token was the last one.
 */
char *
proto_get_rest(char **s)
{
	char *ret;

	/* An exhausted string is a NULL pointer; don't unescape that. */
	if (s == NULL || *s == NULL)
		return (NULL);
	ret = *s;
	proto_unescape(ret);
	*s = NULL;
	return (ret);
}
881
/*
 * Get the next token of the string and convert it to an int using
 * the given base.  Returns -1 if there is no token, or the result of
 * asciitoint() otherwise.
 */
int
proto_get_int(char **s, int *val, int base)
{
	char *tok;

	tok = proto_get_ascii(s);
	return (tok == NULL ? -1 : asciitoint(tok, val, base));
}
897
/*
 * Get a size_t token.
 *
 * The token must be a non-negative number in the given base that fits
 * in a size_t.  Returns 0 and stores the value in *val on success, or
 * -1 if the token is missing or invalid.
 */
int
proto_get_sizet(char **s, size_t *val, int base)
{
	unsigned long long tmp;
	char *cp, *end;

	cp = proto_get_ascii(s);
	if (cp == NULL)
		return (-1);
	/*
	 * strtoull() accepts a minus sign and quietly negates the
	 * value, which would turn a negative size into a huge one.
	 */
	if (strchr(cp, '-') != NULL)
		return (-1);
	errno = 0;
	tmp = strtoull(cp, &end, base);
	if (errno || end == cp || *end != '\0')
		return (-1);
	/* Refuse values that a narrower size_t would truncate. */
	if ((unsigned long long)(size_t)tmp != tmp)
		return (-1);
	*val = (size_t)tmp;
	return (0);
}
917
/*
 * Get a time_t token, written as a decimal number.
 *
 * Ideally, we would use an intmax_t and strtoimax() here, but strtoll()
 * is more portable and 64bits should be enough for a timestamp.
 *
 * Returns 0 and stores the timestamp in *val on success, or -1 if the
 * token is missing or is not entirely a number strtoll() can convert.
 */
int
proto_get_time(char **s, time_t *val)
{
	long long secs;
	char *tok, *stop;

	if ((tok = proto_get_ascii(s)) == NULL)
		return (-1);
	errno = 0;
	secs = strtoll(tok, &stop, 10);
	if (errno != 0 || *stop != '\0')
		return (-1);
	*val = (time_t)secs;
	return (0);
}
940
941/* Start the killer thread.  It is used to protect against some signals
942   during the multi-threaded run so that we can gracefully fail.  */
943static void
944killer_start(struct killer *k, struct mux *m)
945{
946	int error;
947
948	k->mux = m;
949	k->killedby = -1;
950	sigemptyset(&k->sigset);
951	sigaddset(&k->sigset, SIGINT);
952	sigaddset(&k->sigset, SIGHUP);
953	sigaddset(&k->sigset, SIGTERM);
954	sigaddset(&k->sigset, SIGPIPE);
955	pthread_sigmask(SIG_BLOCK, &k->sigset, NULL);
956	error = pthread_create(&k->thread, NULL, killer_run, k);
957	if (error)
958		err(1, "pthread_create");
959}
960
961/* The main loop of the killer thread. */
962static void *
963killer_run(void *arg)
964{
965	struct killer *k;
966	int error, sig, old;
967
968	k = arg;
969again:
970	error = sigwait(&k->sigset, &sig);
971	assert(!error);
972	if (sig == SIGINT || sig == SIGHUP || sig == SIGTERM) {
973		if (k->killedby == -1) {
974			k->killedby = sig;
975			/* Ensure we don't get canceled during the shutdown. */
976			pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
977			mux_shutdown(k->mux, "Cleaning up ...",
978			    STATUS_INTERRUPTED);
979			pthread_setcancelstate(old, NULL);
980		}
981	}
982	goto again;
983}
984
985/* Stop the killer thread. */
986static void
987killer_stop(struct killer *k)
988{
989	void *val;
990	int error;
991
992	error = pthread_cancel(k->thread);
993	assert(!error);
994	pthread_join(k->thread, &val);
995	assert(val == PTHREAD_CANCELED);
996	pthread_sigmask(SIG_UNBLOCK, &k->sigset, NULL);
997}
998