tube.c revision 303975
1/*
2 * util/tube.c - pipe service
3 *
4 * Copyright (c) 2008, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/**
37 * \file
38 *
39 * This file contains pipe service functions.
40 */
41#include "config.h"
42#include "util/tube.h"
43#include "util/log.h"
44#include "util/net_help.h"
45#include "util/netevent.h"
46#include "util/fptr_wlist.h"
47
48#ifndef USE_WINSOCK
49/* on unix */
50
#ifndef HAVE_SOCKETPAIR
/**
 * Fallback for platforms without socketpair() (for example Minix 3.1.7):
 * emulate it with pipe(). Only the descriptor array is passed on; the
 * domain, type and protocol arguments are dropped by the preprocessor.
 * pipe() returns the read end in sv[0] and the write end in sv[1], which
 * is how tube_create() assigns sr and sw.
 */
#define socketpair(domain, type, protocol, sv) pipe(sv)
#endif /* HAVE_SOCKETPAIR */
55
56struct tube* tube_create(void)
57{
58	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
59	int sv[2];
60	if(!tube) {
61		int err = errno;
62		log_err("tube_create: out of memory");
63		errno = err;
64		return NULL;
65	}
66	tube->sr = -1;
67	tube->sw = -1;
68	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
69		int err = errno;
70		log_err("socketpair: %s", strerror(errno));
71		free(tube);
72		errno = err;
73		return NULL;
74	}
75	tube->sr = sv[0];
76	tube->sw = sv[1];
77	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
78		int err = errno;
79		log_err("tube: cannot set nonblocking");
80		tube_delete(tube);
81		errno = err;
82		return NULL;
83	}
84	return tube;
85}
86
/**
 * Destroy a tube: remove the background commpoints, drop queued results,
 * close both descriptors and free the struct. NULL is ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube == NULL)
		return;
	/* delete the commpoints while the fds are still open, to be sure;
	 * epoll also does not like an fd closed before event_del */
	tube_remove_bg_listen(tube);
	tube_remove_bg_write(tube);
	tube_close_read(tube);
	tube_close_write(tube);
	free(tube);
}
98
99void tube_close_read(struct tube* tube)
100{
101	if(tube->sr != -1) {
102		close(tube->sr);
103		tube->sr = -1;
104	}
105}
106
107void tube_close_write(struct tube* tube)
108{
109	if(tube->sw != -1) {
110		close(tube->sw);
111		tube->sw = -1;
112	}
113}
114
115void tube_remove_bg_listen(struct tube* tube)
116{
117	if(tube->listen_com) {
118		comm_point_delete(tube->listen_com);
119		tube->listen_com = NULL;
120	}
121	free(tube->cmd_msg);
122	tube->cmd_msg = NULL;
123}
124
125void tube_remove_bg_write(struct tube* tube)
126{
127	if(tube->res_com) {
128		comm_point_delete(tube->res_com);
129		tube->res_com = NULL;
130	}
131	if(tube->res_list) {
132		struct tube_res_list* np, *p = tube->res_list;
133		tube->res_list = NULL;
134		tube->res_last = NULL;
135		while(p) {
136			np = p->next;
137			free(p->buf);
138			free(p);
139			p = np;
140		}
141	}
142}
143
144int
145tube_handle_listen(struct comm_point* c, void* arg, int error,
146        struct comm_reply* ATTR_UNUSED(reply_info))
147{
148	struct tube* tube = (struct tube*)arg;
149	ssize_t r;
150	if(error != NETEVENT_NOERROR) {
151		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
152		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
153		return 0;
154	}
155
156	if(tube->cmd_read < sizeof(tube->cmd_len)) {
157		/* complete reading the length of control msg */
158		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
159			sizeof(tube->cmd_len) - tube->cmd_read);
160		if(r==0) {
161			/* error has happened or */
162			/* parent closed pipe, must have exited somehow */
163			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
164			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
165				tube->listen_arg);
166			return 0;
167		}
168		if(r==-1) {
169			if(errno != EAGAIN && errno != EINTR) {
170				log_err("rpipe error: %s", strerror(errno));
171			}
172			/* nothing to read now, try later */
173			return 0;
174		}
175		tube->cmd_read += r;
176		if(tube->cmd_read < sizeof(tube->cmd_len)) {
177			/* not complete, try later */
178			return 0;
179		}
180		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
181		if(!tube->cmd_msg) {
182			log_err("malloc failure");
183			tube->cmd_read = 0;
184			return 0;
185		}
186	}
187	/* cmd_len has been read, read remainder */
188	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
189		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
190	if(r==0) {
191		/* error has happened or */
192		/* parent closed pipe, must have exited somehow */
193		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
194		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
195			tube->listen_arg);
196		return 0;
197	}
198	if(r==-1) {
199		/* nothing to read now, try later */
200		if(errno != EAGAIN && errno != EINTR) {
201			log_err("rpipe error: %s", strerror(errno));
202		}
203		return 0;
204	}
205	tube->cmd_read += r;
206	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
207		/* not complete, try later */
208		return 0;
209	}
210	tube->cmd_read = 0;
211
212	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
213	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
214		NETEVENT_NOERROR, tube->listen_arg);
215		/* also frees the buf */
216	tube->cmd_msg = NULL;
217	return 0;
218}
219
220int
221tube_handle_write(struct comm_point* c, void* arg, int error,
222        struct comm_reply* ATTR_UNUSED(reply_info))
223{
224	struct tube* tube = (struct tube*)arg;
225	struct tube_res_list* item = tube->res_list;
226	ssize_t r;
227	if(error != NETEVENT_NOERROR) {
228		log_err("tube_handle_write net error %d", error);
229		return 0;
230	}
231
232	if(!item) {
233		comm_point_stop_listening(c);
234		return 0;
235	}
236
237	if(tube->res_write < sizeof(item->len)) {
238		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
239			sizeof(item->len) - tube->res_write);
240		if(r == -1) {
241			if(errno != EAGAIN && errno != EINTR) {
242				log_err("wpipe error: %s", strerror(errno));
243			}
244			return 0; /* try again later */
245		}
246		if(r == 0) {
247			/* error on pipe, must have exited somehow */
248			/* cannot signal this to pipe user */
249			return 0;
250		}
251		tube->res_write += r;
252		if(tube->res_write < sizeof(item->len))
253			return 0;
254	}
255	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
256		item->len - (tube->res_write - sizeof(item->len)));
257	if(r == -1) {
258		if(errno != EAGAIN && errno != EINTR) {
259			log_err("wpipe error: %s", strerror(errno));
260		}
261		return 0; /* try again later */
262	}
263	if(r == 0) {
264		/* error on pipe, must have exited somehow */
265		/* cannot signal this to pipe user */
266		return 0;
267	}
268	tube->res_write += r;
269	if(tube->res_write < sizeof(item->len) + item->len)
270		return 0;
271	/* done this result, remove it */
272	free(item->buf);
273	item->buf = NULL;
274	tube->res_list = tube->res_list->next;
275	free(item);
276	if(!tube->res_list) {
277		tube->res_last = NULL;
278		comm_point_stop_listening(c);
279	}
280	tube->res_write = 0;
281	return 0;
282}
283
284int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
285        int nonblock)
286{
287	ssize_t r, d;
288	int fd = tube->sw;
289
290	/* test */
291	if(nonblock) {
292		r = write(fd, &len, sizeof(len));
293		if(r == -1) {
294			if(errno==EINTR || errno==EAGAIN)
295				return -1;
296			log_err("tube msg write failed: %s", strerror(errno));
297			return -1; /* can still continue, perhaps */
298		}
299	} else r = 0;
300	if(!fd_set_block(fd))
301		return 0;
302	/* write remainder */
303	d = r;
304	while(d != (ssize_t)sizeof(len)) {
305		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
306			log_err("tube msg write failed: %s", strerror(errno));
307			(void)fd_set_nonblock(fd);
308			return 0;
309		}
310		d += r;
311	}
312	d = 0;
313	while(d != (ssize_t)len) {
314		if((r=write(fd, buf+d, len-d)) == -1) {
315			log_err("tube msg write failed: %s", strerror(errno));
316			(void)fd_set_nonblock(fd);
317			return 0;
318		}
319		d += r;
320	}
321	if(!fd_set_nonblock(fd))
322		return 0;
323	return 1;
324}
325
326int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
327        int nonblock)
328{
329	ssize_t r, d;
330	int fd = tube->sr;
331
332	/* test */
333	*len = 0;
334	if(nonblock) {
335		r = read(fd, len, sizeof(*len));
336		if(r == -1) {
337			if(errno==EINTR || errno==EAGAIN)
338				return -1;
339			log_err("tube msg read failed: %s", strerror(errno));
340			return -1; /* we can still continue, perhaps */
341		}
342		if(r == 0) /* EOF */
343			return 0;
344	} else r = 0;
345	if(!fd_set_block(fd))
346		return 0;
347	/* read remainder */
348	d = r;
349	while(d != (ssize_t)sizeof(*len)) {
350		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
351			log_err("tube msg read failed: %s", strerror(errno));
352			(void)fd_set_nonblock(fd);
353			return 0;
354		}
355		if(r == 0) /* EOF */ {
356			(void)fd_set_nonblock(fd);
357			return 0;
358		}
359		d += r;
360	}
361	log_assert(*len < 65536*2);
362	*buf = (uint8_t*)malloc(*len);
363	if(!*buf) {
364		log_err("tube read out of memory");
365		(void)fd_set_nonblock(fd);
366		return 0;
367	}
368	d = 0;
369	while(d < (ssize_t)*len) {
370		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
371			log_err("tube msg read failed: %s", strerror(errno));
372			(void)fd_set_nonblock(fd);
373			free(*buf);
374			return 0;
375		}
376		if(r == 0) { /* EOF */
377			(void)fd_set_nonblock(fd);
378			free(*buf);
379			return 0;
380		}
381		d += r;
382	}
383	if(!fd_set_nonblock(fd)) {
384		free(*buf);
385		return 0;
386	}
387	return 1;
388}
389
390/** perform a select() on the fd */
391static int
392pollit(int fd, struct timeval* t)
393{
394	fd_set r;
395#ifndef S_SPLINT_S
396	FD_ZERO(&r);
397	FD_SET(FD_SET_T fd, &r);
398#endif
399	if(select(fd+1, &r, NULL, NULL, t) == -1) {
400		return 0;
401	}
402	errno = 0;
403	return (int)(FD_ISSET(fd, &r));
404}
405
406int tube_poll(struct tube* tube)
407{
408	struct timeval t;
409	memset(&t, 0, sizeof(t));
410	return pollit(tube->sr, &t);
411}
412
413int tube_wait(struct tube* tube)
414{
415	return pollit(tube->sr, NULL);
416}
417
418int tube_read_fd(struct tube* tube)
419{
420	return tube->sr;
421}
422
423int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
424        tube_callback_t* cb, void* arg)
425{
426	tube->listen_cb = cb;
427	tube->listen_arg = arg;
428	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
429		0, tube_handle_listen, tube))) {
430		int err = errno;
431		log_err("tube_setup_bg_l: commpoint creation failed");
432		errno = err;
433		return 0;
434	}
435	return 1;
436}
437
438int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
439{
440	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
441		1, tube_handle_write, tube))) {
442		int err = errno;
443		log_err("tube_setup_bg_w: commpoint creation failed");
444		errno = err;
445		return 0;
446	}
447	return 1;
448}
449
450int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
451{
452	struct tube_res_list* item =
453		(struct tube_res_list*)malloc(sizeof(*item));
454	if(!item) {
455		free(msg);
456		log_err("out of memory for async answer");
457		return 0;
458	}
459	item->buf = msg;
460	item->len = len;
461	item->next = NULL;
462	/* add at back of list, since the first one may be partially written */
463	if(tube->res_last)
464		tube->res_last->next = item;
465	else    tube->res_list = item;
466	tube->res_last = item;
467	if(tube->res_list == tube->res_last) {
468		/* first added item, start the write process */
469		comm_point_start_listening(tube->res_com, -1, -1);
470	}
471	return 1;
472}
473
474void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
475	void* ATTR_UNUSED(arg))
476{
477	log_assert(0);
478}
479
480#else /* USE_WINSOCK */
481/* on windows */
482
483
484struct tube* tube_create(void)
485{
486	/* windows does not have forks like unix, so we only support
487	 * threads on windows. And thus the pipe need only connect
488	 * threads. We use a mutex and a list of datagrams. */
489	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
490	if(!tube) {
491		int err = errno;
492		log_err("tube_create: out of memory");
493		errno = err;
494		return NULL;
495	}
496	tube->event = WSACreateEvent();
497	if(tube->event == WSA_INVALID_EVENT) {
498		free(tube);
499		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
500	}
501	if(!WSAResetEvent(tube->event)) {
502		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
503	}
504	lock_basic_init(&tube->res_lock);
505	verbose(VERB_ALGO, "tube created");
506	return tube;
507}
508
509void tube_delete(struct tube* tube)
510{
511	if(!tube) return;
512	tube_remove_bg_listen(tube);
513	tube_remove_bg_write(tube);
514	tube_close_read(tube);
515	tube_close_write(tube);
516	if(!WSACloseEvent(tube->event))
517		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
518	lock_basic_destroy(&tube->res_lock);
519	verbose(VERB_ALGO, "tube deleted");
520	free(tube);
521}
522
523void tube_close_read(struct tube* ATTR_UNUSED(tube))
524{
525	verbose(VERB_ALGO, "tube close_read");
526}
527
528void tube_close_write(struct tube* ATTR_UNUSED(tube))
529{
530	verbose(VERB_ALGO, "tube close_write");
531	/* wake up waiting reader with an empty queue */
532	if(!WSASetEvent(tube->event)) {
533		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
534	}
535}
536
537void tube_remove_bg_listen(struct tube* tube)
538{
539	verbose(VERB_ALGO, "tube remove_bg_listen");
540	winsock_unregister_wsaevent(&tube->ev_listen);
541}
542
543void tube_remove_bg_write(struct tube* tube)
544{
545	verbose(VERB_ALGO, "tube remove_bg_write");
546	if(tube->res_list) {
547		struct tube_res_list* np, *p = tube->res_list;
548		tube->res_list = NULL;
549		tube->res_last = NULL;
550		while(p) {
551			np = p->next;
552			free(p->buf);
553			free(p);
554			p = np;
555		}
556	}
557}
558
559int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
560        int ATTR_UNUSED(nonblock))
561{
562	uint8_t* a;
563	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
564	a = (uint8_t*)memdup(buf, len);
565	if(!a) {
566		log_err("out of memory in tube_write_msg");
567		return 0;
568	}
569	/* always nonblocking, this pipe cannot get full */
570	return tube_queue_item(tube, a, len);
571}
572
573int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
574        int nonblock)
575{
576	struct tube_res_list* item = NULL;
577	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
578	*buf = NULL;
579	if(!tube_poll(tube)) {
580		verbose(VERB_ALGO, "tube read_msg nodata");
581		/* nothing ready right now, wait if we want to */
582		if(nonblock)
583			return -1; /* would block waiting for items */
584		if(!tube_wait(tube))
585			return 0;
586	}
587	lock_basic_lock(&tube->res_lock);
588	if(tube->res_list) {
589		item = tube->res_list;
590		tube->res_list = item->next;
591		if(tube->res_last == item) {
592			/* the list is now empty */
593			tube->res_last = NULL;
594			verbose(VERB_ALGO, "tube read_msg lastdata");
595			if(!WSAResetEvent(tube->event)) {
596				log_err("WSAResetEvent: %s",
597					wsa_strerror(WSAGetLastError()));
598			}
599		}
600	}
601	lock_basic_unlock(&tube->res_lock);
602	if(!item)
603		return 0; /* would block waiting for items */
604	*buf = item->buf;
605	*len = item->len;
606	free(item);
607	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
608	return 1;
609}
610
611int tube_poll(struct tube* tube)
612{
613	struct tube_res_list* item = NULL;
614	lock_basic_lock(&tube->res_lock);
615	item = tube->res_list;
616	lock_basic_unlock(&tube->res_lock);
617	if(item)
618		return 1;
619	return 0;
620}
621
622int tube_wait(struct tube* tube)
623{
624	/* block on eventhandle */
625	DWORD res = WSAWaitForMultipleEvents(
626		1 /* one event in array */,
627		&tube->event /* the event to wait for, our pipe signal */,
628		0 /* wait for all events is false */,
629		WSA_INFINITE /* wait, no timeout */,
630		0 /* we are not alertable for IO completion routines */
631		);
632	if(res == WSA_WAIT_TIMEOUT) {
633		return 0;
634	}
635	if(res == WSA_WAIT_IO_COMPLETION) {
636		/* a bit unexpected, since we were not alertable */
637		return 0;
638	}
639	return 1;
640}
641
/** Windows tubes have no readable descriptor; always returns -1. */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	const int no_fd = -1;
	return no_fd;
}
647
648int
649tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
650	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
651{
652	log_assert(0);
653	return 0;
654}
655
656int
657tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
658	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
659{
660	log_assert(0);
661	return 0;
662}
663
664int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
665        tube_callback_t* cb, void* arg)
666{
667	tube->listen_cb = cb;
668	tube->listen_arg = arg;
669	if(!comm_base_internal(base))
670		return 1; /* ignore when no comm base - testing */
671	return winsock_register_wsaevent(comm_base_internal(base),
672		&tube->ev_listen, tube->event, &tube_handle_signal, tube);
673}
674
675int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
676	struct comm_base* ATTR_UNUSED(base))
677{
678	/* the queue item routine performs the signaling */
679	return 1;
680}
681
682int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
683{
684	struct tube_res_list* item =
685		(struct tube_res_list*)malloc(sizeof(*item));
686	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
687	if(!item) {
688		free(msg);
689		log_err("out of memory for async answer");
690		return 0;
691	}
692	item->buf = msg;
693	item->len = len;
694	item->next = NULL;
695	lock_basic_lock(&tube->res_lock);
696	/* add at back of list, since the first one may be partially written */
697	if(tube->res_last)
698		tube->res_last->next = item;
699	else    tube->res_list = item;
700	tube->res_last = item;
701	/* signal the eventhandle */
702	if(!WSASetEvent(tube->event)) {
703		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
704	}
705	lock_basic_unlock(&tube->res_lock);
706	return 1;
707}
708
709void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
710	void* arg)
711{
712	struct tube* tube = (struct tube*)arg;
713	uint8_t* buf;
714	uint32_t len = 0;
715	verbose(VERB_ALGO, "tube handle_signal");
716	while(tube_poll(tube)) {
717		if(tube_read_msg(tube, &buf, &len, 1)) {
718			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
719			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
720				tube->listen_arg);
721		}
722	}
723}
724
725#endif /* USE_WINSOCK */
726