1/*
2 * util/tube.c - pipe service
3 *
4 * Copyright (c) 2008, NLnet Labs. All rights reserved.
5 *
6 * This software is open source.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 *
15 * Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * Neither the name of the NLNET LABS nor the names of its contributors may
20 * be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/**
37 * \file
38 *
39 * This file contains pipe service functions.
40 */
41#include "config.h"
42#include "util/tube.h"
43#include "util/log.h"
44#include "util/net_help.h"
45#include "util/netevent.h"
46#include "util/fptr_wlist.h"
47#include "util/ub_event.h"
48#ifdef HAVE_POLL_H
49#include <poll.h>
50#endif
51
52#ifndef USE_WINSOCK
53/* on unix */
54
55#ifndef HAVE_SOCKETPAIR
56/** no socketpair() available, like on Minix 3.1.7, use pipe */
57#define socketpair(f, t, p, sv) pipe(sv)
58#endif /* HAVE_SOCKETPAIR */
59
60struct tube* tube_create(void)
61{
62	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
63	int sv[2];
64	if(!tube) {
65		int err = errno;
66		log_err("tube_create: out of memory");
67		errno = err;
68		return NULL;
69	}
70	tube->sr = -1;
71	tube->sw = -1;
72	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
73		int err = errno;
74		log_err("socketpair: %s", strerror(errno));
75		free(tube);
76		errno = err;
77		return NULL;
78	}
79	tube->sr = sv[0];
80	tube->sw = sv[1];
81	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
82		int err = errno;
83		log_err("tube: cannot set nonblocking");
84		tube_delete(tube);
85		errno = err;
86		return NULL;
87	}
88	return tube;
89}
90
/**
 * Tear down a tube: remove the background listen and write handlers,
 * close both descriptors and free the structure.
 * @param tube: the tube to delete; NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube != NULL) {
		/* The commpoints are deleted before the fds are closed, to
		 * be sure. Also epoll does not like closing an fd before
		 * event_del. */
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
102
103void tube_close_read(struct tube* tube)
104{
105	if(tube->sr != -1) {
106		close(tube->sr);
107		tube->sr = -1;
108	}
109}
110
111void tube_close_write(struct tube* tube)
112{
113	if(tube->sw != -1) {
114		close(tube->sw);
115		tube->sw = -1;
116	}
117}
118
119void tube_remove_bg_listen(struct tube* tube)
120{
121	if(tube->listen_com) {
122		comm_point_delete(tube->listen_com);
123		tube->listen_com = NULL;
124	}
125	free(tube->cmd_msg);
126	tube->cmd_msg = NULL;
127}
128
129void tube_remove_bg_write(struct tube* tube)
130{
131	if(tube->res_com) {
132		comm_point_delete(tube->res_com);
133		tube->res_com = NULL;
134	}
135	if(tube->res_list) {
136		struct tube_res_list* np, *p = tube->res_list;
137		tube->res_list = NULL;
138		tube->res_last = NULL;
139		while(p) {
140			np = p->next;
141			free(p->buf);
142			free(p);
143			p = np;
144		}
145	}
146}
147
148int
149tube_handle_listen(struct comm_point* c, void* arg, int error,
150        struct comm_reply* ATTR_UNUSED(reply_info))
151{
152	struct tube* tube = (struct tube*)arg;
153	ssize_t r;
154	if(error != NETEVENT_NOERROR) {
155		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
156		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
157		return 0;
158	}
159
160	if(tube->cmd_read < sizeof(tube->cmd_len)) {
161		/* complete reading the length of control msg */
162		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
163			sizeof(tube->cmd_len) - tube->cmd_read);
164		if(r==0) {
165			/* error has happened or */
166			/* parent closed pipe, must have exited somehow */
167			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
168			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
169				tube->listen_arg);
170			return 0;
171		}
172		if(r==-1) {
173			if(errno != EAGAIN && errno != EINTR) {
174				log_err("rpipe error: %s", strerror(errno));
175			}
176			/* nothing to read now, try later */
177			return 0;
178		}
179		tube->cmd_read += r;
180		if(tube->cmd_read < sizeof(tube->cmd_len)) {
181			/* not complete, try later */
182			return 0;
183		}
184		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
185		if(!tube->cmd_msg) {
186			log_err("malloc failure");
187			tube->cmd_read = 0;
188			return 0;
189		}
190	}
191	/* cmd_len has been read, read remainder */
192	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
193		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
194	if(r==0) {
195		/* error has happened or */
196		/* parent closed pipe, must have exited somehow */
197		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
198		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
199			tube->listen_arg);
200		return 0;
201	}
202	if(r==-1) {
203		/* nothing to read now, try later */
204		if(errno != EAGAIN && errno != EINTR) {
205			log_err("rpipe error: %s", strerror(errno));
206		}
207		return 0;
208	}
209	tube->cmd_read += r;
210	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
211		/* not complete, try later */
212		return 0;
213	}
214	tube->cmd_read = 0;
215
216	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
217	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
218		NETEVENT_NOERROR, tube->listen_arg);
219		/* also frees the buf */
220	tube->cmd_msg = NULL;
221	return 0;
222}
223
224int
225tube_handle_write(struct comm_point* c, void* arg, int error,
226        struct comm_reply* ATTR_UNUSED(reply_info))
227{
228	struct tube* tube = (struct tube*)arg;
229	struct tube_res_list* item = tube->res_list;
230	ssize_t r;
231	if(error != NETEVENT_NOERROR) {
232		log_err("tube_handle_write net error %d", error);
233		return 0;
234	}
235
236	if(!item) {
237		comm_point_stop_listening(c);
238		return 0;
239	}
240
241	if(tube->res_write < sizeof(item->len)) {
242		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
243			sizeof(item->len) - tube->res_write);
244		if(r == -1) {
245			if(errno != EAGAIN && errno != EINTR) {
246				log_err("wpipe error: %s", strerror(errno));
247			}
248			return 0; /* try again later */
249		}
250		if(r == 0) {
251			/* error on pipe, must have exited somehow */
252			/* cannot signal this to pipe user */
253			return 0;
254		}
255		tube->res_write += r;
256		if(tube->res_write < sizeof(item->len))
257			return 0;
258	}
259	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
260		item->len - (tube->res_write - sizeof(item->len)));
261	if(r == -1) {
262		if(errno != EAGAIN && errno != EINTR) {
263			log_err("wpipe error: %s", strerror(errno));
264		}
265		return 0; /* try again later */
266	}
267	if(r == 0) {
268		/* error on pipe, must have exited somehow */
269		/* cannot signal this to pipe user */
270		return 0;
271	}
272	tube->res_write += r;
273	if(tube->res_write < sizeof(item->len) + item->len)
274		return 0;
275	/* done this result, remove it */
276	free(item->buf);
277	item->buf = NULL;
278	tube->res_list = tube->res_list->next;
279	free(item);
280	if(!tube->res_list) {
281		tube->res_last = NULL;
282		comm_point_stop_listening(c);
283	}
284	tube->res_write = 0;
285	return 0;
286}
287
288int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
289        int nonblock)
290{
291	ssize_t r, d;
292	int fd = tube->sw;
293
294	/* test */
295	if(nonblock) {
296		r = write(fd, &len, sizeof(len));
297		if(r == -1) {
298			if(errno==EINTR || errno==EAGAIN)
299				return -1;
300			log_err("tube msg write failed: %s", strerror(errno));
301			return -1; /* can still continue, perhaps */
302		}
303	} else r = 0;
304	if(!fd_set_block(fd))
305		return 0;
306	/* write remainder */
307	d = r;
308	while(d != (ssize_t)sizeof(len)) {
309		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
310			if(errno == EAGAIN)
311				continue; /* temporarily unavail: try again*/
312			log_err("tube msg write failed: %s", strerror(errno));
313			(void)fd_set_nonblock(fd);
314			return 0;
315		}
316		d += r;
317	}
318	d = 0;
319	while(d != (ssize_t)len) {
320		if((r=write(fd, buf+d, len-d)) == -1) {
321			if(errno == EAGAIN)
322				continue; /* temporarily unavail: try again*/
323			log_err("tube msg write failed: %s", strerror(errno));
324			(void)fd_set_nonblock(fd);
325			return 0;
326		}
327		d += r;
328	}
329	if(!fd_set_nonblock(fd))
330		return 0;
331	return 1;
332}
333
334int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
335        int nonblock)
336{
337	ssize_t r, d;
338	int fd = tube->sr;
339
340	/* test */
341	*len = 0;
342	if(nonblock) {
343		r = read(fd, len, sizeof(*len));
344		if(r == -1) {
345			if(errno==EINTR || errno==EAGAIN)
346				return -1;
347			log_err("tube msg read failed: %s", strerror(errno));
348			return -1; /* we can still continue, perhaps */
349		}
350		if(r == 0) /* EOF */
351			return 0;
352	} else r = 0;
353	if(!fd_set_block(fd))
354		return 0;
355	/* read remainder */
356	d = r;
357	while(d != (ssize_t)sizeof(*len)) {
358		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
359			log_err("tube msg read failed: %s", strerror(errno));
360			(void)fd_set_nonblock(fd);
361			return 0;
362		}
363		if(r == 0) /* EOF */ {
364			(void)fd_set_nonblock(fd);
365			return 0;
366		}
367		d += r;
368	}
369	if (*len >= 65536*2) {
370		log_err("tube msg length %u is too big", (unsigned)*len);
371		(void)fd_set_nonblock(fd);
372		return 0;
373	}
374	*buf = (uint8_t*)malloc(*len);
375	if(!*buf) {
376		log_err("tube read out of memory");
377		(void)fd_set_nonblock(fd);
378		return 0;
379	}
380	d = 0;
381	while(d < (ssize_t)*len) {
382		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
383			log_err("tube msg read failed: %s", strerror(errno));
384			(void)fd_set_nonblock(fd);
385			free(*buf);
386			return 0;
387		}
388		if(r == 0) { /* EOF */
389			(void)fd_set_nonblock(fd);
390			free(*buf);
391			return 0;
392		}
393		d += r;
394	}
395	if(!fd_set_nonblock(fd)) {
396		free(*buf);
397		return 0;
398	}
399	return 1;
400}
401
/**
 * Perform poll() on the fd, waiting for POLLIN, POLLERR or POLLHUP.
 * @param fd: descriptor to poll.
 * @param t: timeout; converted to milliseconds. NULL passes -1 to poll().
 * @return 1 if poll() reported the fd, 0 on timeout or if poll() failed.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1;
	int nready;

	memset(&pfd, 0, sizeof(pfd));
	pfd.fd = fd;
	pfd.events = POLLIN | POLLERR | POLLHUP;
#ifndef S_SPLINT_S
	if(t != NULL)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	nready = poll(&pfd, 1, timeout_ms);

	/* a failure (-1) and a timeout (0) both mean: nothing to read */
	return (nready != -1 && nready != 0) ? 1 : 0;
}
425
426int tube_poll(struct tube* tube)
427{
428	struct timeval t;
429	memset(&t, 0, sizeof(t));
430	return pollit(tube->sr, &t);
431}
432
433int tube_wait(struct tube* tube)
434{
435	return pollit(tube->sr, NULL);
436}
437
438int tube_wait_timeout(struct tube* tube, int msec)
439{
440	int ret = 0;
441
442	while(1) {
443		struct pollfd fds;
444		memset(&fds, 0, sizeof(fds));
445
446		fds.fd = tube->sr;
447		fds.events = POLLIN | POLLERR | POLLHUP;
448		ret = poll(&fds, 1, msec);
449
450		if(ret == -1) {
451			if(errno == EAGAIN || errno == EINTR)
452				continue;
453			return -1;
454		}
455		break;
456	}
457
458	if(ret != 0)
459		return 1;
460	return 0;
461}
462
463int tube_read_fd(struct tube* tube)
464{
465	return tube->sr;
466}
467
468int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
469        tube_callback_type* cb, void* arg)
470{
471	tube->listen_cb = cb;
472	tube->listen_arg = arg;
473	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
474		0, tube_handle_listen, tube))) {
475		int err = errno;
476		log_err("tube_setup_bg_l: commpoint creation failed");
477		errno = err;
478		return 0;
479	}
480	return 1;
481}
482
483int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
484{
485	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
486		1, tube_handle_write, tube))) {
487		int err = errno;
488		log_err("tube_setup_bg_w: commpoint creation failed");
489		errno = err;
490		return 0;
491	}
492	return 1;
493}
494
495int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
496{
497	struct tube_res_list* item;
498	if(!tube || !tube->res_com) return 0;
499	item = (struct tube_res_list*)malloc(sizeof(*item));
500	if(!item) {
501		free(msg);
502		log_err("out of memory for async answer");
503		return 0;
504	}
505	item->buf = msg;
506	item->len = len;
507	item->next = NULL;
508	/* add at back of list, since the first one may be partially written */
509	if(tube->res_last)
510		tube->res_last->next = item;
511	else    tube->res_list = item;
512	tube->res_last = item;
513	if(tube->res_list == tube->res_last) {
514		/* first added item, start the write process */
515		comm_point_start_listening(tube->res_com, -1, -1);
516	}
517	return 1;
518}
519
520void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
521	void* ATTR_UNUSED(arg))
522{
523	log_assert(0);
524}
525
526#else /* USE_WINSOCK */
527/* on windows */
528
529
530struct tube* tube_create(void)
531{
532	/* windows does not have forks like unix, so we only support
533	 * threads on windows. And thus the pipe need only connect
534	 * threads. We use a mutex and a list of datagrams. */
535	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
536	if(!tube) {
537		int err = errno;
538		log_err("tube_create: out of memory");
539		errno = err;
540		return NULL;
541	}
542	tube->event = WSACreateEvent();
543	if(tube->event == WSA_INVALID_EVENT) {
544		free(tube);
545		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
546		return NULL;
547	}
548	if(!WSAResetEvent(tube->event)) {
549		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
550	}
551	lock_basic_init(&tube->res_lock);
552	verbose(VERB_ALGO, "tube created");
553	return tube;
554}
555
556void tube_delete(struct tube* tube)
557{
558	if(!tube) return;
559	tube_remove_bg_listen(tube);
560	tube_remove_bg_write(tube);
561	tube_close_read(tube);
562	tube_close_write(tube);
563	if(!WSACloseEvent(tube->event))
564		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
565	lock_basic_destroy(&tube->res_lock);
566	verbose(VERB_ALGO, "tube deleted");
567	free(tube);
568}
569
570void tube_close_read(struct tube* ATTR_UNUSED(tube))
571{
572	verbose(VERB_ALGO, "tube close_read");
573}
574
575void tube_close_write(struct tube* ATTR_UNUSED(tube))
576{
577	verbose(VERB_ALGO, "tube close_write");
578	/* wake up waiting reader with an empty queue */
579	if(!WSASetEvent(tube->event)) {
580		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
581	}
582}
583
584void tube_remove_bg_listen(struct tube* tube)
585{
586	verbose(VERB_ALGO, "tube remove_bg_listen");
587	ub_winsock_unregister_wsaevent(tube->ev_listen);
588}
589
590void tube_remove_bg_write(struct tube* tube)
591{
592	verbose(VERB_ALGO, "tube remove_bg_write");
593	if(tube->res_list) {
594		struct tube_res_list* np, *p = tube->res_list;
595		tube->res_list = NULL;
596		tube->res_last = NULL;
597		while(p) {
598			np = p->next;
599			free(p->buf);
600			free(p);
601			p = np;
602		}
603	}
604}
605
606int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
607        int ATTR_UNUSED(nonblock))
608{
609	uint8_t* a;
610	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
611	a = (uint8_t*)memdup(buf, len);
612	if(!a) {
613		log_err("out of memory in tube_write_msg");
614		return 0;
615	}
616	/* always nonblocking, this pipe cannot get full */
617	return tube_queue_item(tube, a, len);
618}
619
620int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
621        int nonblock)
622{
623	struct tube_res_list* item = NULL;
624	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
625	*buf = NULL;
626	if(!tube_poll(tube)) {
627		verbose(VERB_ALGO, "tube read_msg nodata");
628		/* nothing ready right now, wait if we want to */
629		if(nonblock)
630			return -1; /* would block waiting for items */
631		if(!tube_wait(tube))
632			return 0;
633	}
634	lock_basic_lock(&tube->res_lock);
635	if(tube->res_list) {
636		item = tube->res_list;
637		tube->res_list = item->next;
638		if(tube->res_last == item) {
639			/* the list is now empty */
640			tube->res_last = NULL;
641			verbose(VERB_ALGO, "tube read_msg lastdata");
642			if(!WSAResetEvent(tube->event)) {
643				log_err("WSAResetEvent: %s",
644					wsa_strerror(WSAGetLastError()));
645			}
646		}
647	}
648	lock_basic_unlock(&tube->res_lock);
649	if(!item)
650		return 0; /* would block waiting for items */
651	*buf = item->buf;
652	*len = item->len;
653	free(item);
654	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
655	return 1;
656}
657
658int tube_poll(struct tube* tube)
659{
660	struct tube_res_list* item = NULL;
661	lock_basic_lock(&tube->res_lock);
662	item = tube->res_list;
663	lock_basic_unlock(&tube->res_lock);
664	if(item)
665		return 1;
666	return 0;
667}
668
669int tube_wait(struct tube* tube)
670{
671	/* block on eventhandle */
672	DWORD res = WSAWaitForMultipleEvents(
673		1 /* one event in array */,
674		&tube->event /* the event to wait for, our pipe signal */,
675		0 /* wait for all events is false */,
676		WSA_INFINITE /* wait, no timeout */,
677		0 /* we are not alertable for IO completion routines */
678		);
679	if(res == WSA_WAIT_TIMEOUT) {
680		return 0;
681	}
682	if(res == WSA_WAIT_IO_COMPLETION) {
683		/* a bit unexpected, since we were not alertable */
684		return 0;
685	}
686	return 1;
687}
688
689int tube_wait_timeout(struct tube* tube, int msec)
690{
691	/* block on eventhandle */
692	DWORD res = WSAWaitForMultipleEvents(
693		1 /* one event in array */,
694		&tube->event /* the event to wait for, our pipe signal */,
695		0 /* wait for all events is false */,
696		msec /* wait for timeout */,
697		0 /* we are not alertable for IO completion routines */
698		);
699	if(res == WSA_WAIT_TIMEOUT) {
700		return 0;
701	}
702	if(res == WSA_WAIT_IO_COMPLETION) {
703		/* a bit unexpected, since we were not alertable */
704		return -1;
705	}
706	return 1;
707}
708
/**
 * Get the read descriptor of the tube. This variant has no descriptor
 * to offer.
 * @param tube: unused.
 * @return -1 always.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	/* there is nothing sensible to return on Windows */
	return -1;
}
714
715int
716tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
717	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
718{
719	log_assert(0);
720	return 0;
721}
722
723int
724tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
725	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
726{
727	log_assert(0);
728	return 0;
729}
730
731int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
732        tube_callback_type* cb, void* arg)
733{
734	tube->listen_cb = cb;
735	tube->listen_arg = arg;
736	if(!comm_base_internal(base))
737		return 1; /* ignore when no comm base - testing */
738	tube->ev_listen = ub_winsock_register_wsaevent(
739	    comm_base_internal(base), tube->event, &tube_handle_signal, tube);
740	return tube->ev_listen ? 1 : 0;
741}
742
743int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
744	struct comm_base* ATTR_UNUSED(base))
745{
746	/* the queue item routine performs the signaling */
747	return 1;
748}
749
750int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
751{
752	struct tube_res_list* item;
753	if(!tube) return 0;
754	item = (struct tube_res_list*)malloc(sizeof(*item));
755	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
756	if(!item) {
757		free(msg);
758		log_err("out of memory for async answer");
759		return 0;
760	}
761	item->buf = msg;
762	item->len = len;
763	item->next = NULL;
764	lock_basic_lock(&tube->res_lock);
765	/* add at back of list, since the first one may be partially written */
766	if(tube->res_last)
767		tube->res_last->next = item;
768	else    tube->res_list = item;
769	tube->res_last = item;
770	/* signal the eventhandle */
771	if(!WSASetEvent(tube->event)) {
772		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
773	}
774	lock_basic_unlock(&tube->res_lock);
775	return 1;
776}
777
778void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
779	void* arg)
780{
781	struct tube* tube = (struct tube*)arg;
782	uint8_t* buf;
783	uint32_t len = 0;
784	verbose(VERB_ALGO, "tube handle_signal");
785	while(tube_poll(tube)) {
786		if(tube_read_msg(tube, &buf, &len, 1)) {
787			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
788			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
789				tube->listen_arg);
790		}
791	}
792}
793
794#endif /* USE_WINSOCK */
795