1238106Sdes/*
2238106Sdes * util/tube.c - pipe service
3238106Sdes *
4238106Sdes * Copyright (c) 2008, NLnet Labs. All rights reserved.
5238106Sdes *
6238106Sdes * This software is open source.
7238106Sdes *
8238106Sdes * Redistribution and use in source and binary forms, with or without
9238106Sdes * modification, are permitted provided that the following conditions
10238106Sdes * are met:
11238106Sdes *
12238106Sdes * Redistributions of source code must retain the above copyright notice,
13238106Sdes * this list of conditions and the following disclaimer.
14238106Sdes *
15238106Sdes * Redistributions in binary form must reproduce the above copyright notice,
16238106Sdes * this list of conditions and the following disclaimer in the documentation
17238106Sdes * and/or other materials provided with the distribution.
18238106Sdes *
19238106Sdes * Neither the name of the NLNET LABS nor the names of its contributors may
20238106Sdes * be used to endorse or promote products derived from this software without
21238106Sdes * specific prior written permission.
22238106Sdes *
23238106Sdes * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24266114Sdes * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25266114Sdes * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26266114Sdes * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27266114Sdes * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28266114Sdes * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
29266114Sdes * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30266114Sdes * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31266114Sdes * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32266114Sdes * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33266114Sdes * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34238106Sdes */
35238106Sdes
36238106Sdes/**
37238106Sdes * \file
38238106Sdes *
39238106Sdes * This file contains pipe service functions.
40238106Sdes */
41238106Sdes#include "config.h"
42238106Sdes#include "util/tube.h"
43238106Sdes#include "util/log.h"
44238106Sdes#include "util/net_help.h"
45238106Sdes#include "util/netevent.h"
46238106Sdes#include "util/fptr_wlist.h"
47238106Sdes
48238106Sdes#ifndef USE_WINSOCK
49238106Sdes/* on unix */
50238106Sdes
51238106Sdes#ifndef HAVE_SOCKETPAIR
52238106Sdes/** no socketpair() available, like on Minix 3.1.7, use pipe */
53238106Sdes#define socketpair(f, t, p, sv) pipe(sv)
54238106Sdes#endif /* HAVE_SOCKETPAIR */
55238106Sdes
56238106Sdesstruct tube* tube_create(void)
57238106Sdes{
58238106Sdes	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
59238106Sdes	int sv[2];
60238106Sdes	if(!tube) {
61238106Sdes		int err = errno;
62238106Sdes		log_err("tube_create: out of memory");
63238106Sdes		errno = err;
64238106Sdes		return NULL;
65238106Sdes	}
66238106Sdes	tube->sr = -1;
67238106Sdes	tube->sw = -1;
68238106Sdes	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
69238106Sdes		int err = errno;
70238106Sdes		log_err("socketpair: %s", strerror(errno));
71238106Sdes		free(tube);
72238106Sdes		errno = err;
73238106Sdes		return NULL;
74238106Sdes	}
75238106Sdes	tube->sr = sv[0];
76238106Sdes	tube->sw = sv[1];
77238106Sdes	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
78238106Sdes		int err = errno;
79238106Sdes		log_err("tube: cannot set nonblocking");
80238106Sdes		tube_delete(tube);
81238106Sdes		errno = err;
82238106Sdes		return NULL;
83238106Sdes	}
84238106Sdes	return tube;
85238106Sdes}
86238106Sdes
/**
 * Release a tube and everything it owns.
 * @param tube: the tube to delete; NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if(tube != NULL) {
		/* The commpoints are removed before the descriptors are
		 * closed, to be sure; epoll also does not like an fd being
		 * closed before its event_del. */
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
98238106Sdes
99238106Sdesvoid tube_close_read(struct tube* tube)
100238106Sdes{
101238106Sdes	if(tube->sr != -1) {
102238106Sdes		close(tube->sr);
103238106Sdes		tube->sr = -1;
104238106Sdes	}
105238106Sdes}
106238106Sdes
107238106Sdesvoid tube_close_write(struct tube* tube)
108238106Sdes{
109238106Sdes	if(tube->sw != -1) {
110238106Sdes		close(tube->sw);
111238106Sdes		tube->sw = -1;
112238106Sdes	}
113238106Sdes}
114238106Sdes
115238106Sdesvoid tube_remove_bg_listen(struct tube* tube)
116238106Sdes{
117238106Sdes	if(tube->listen_com) {
118238106Sdes		comm_point_delete(tube->listen_com);
119238106Sdes		tube->listen_com = NULL;
120238106Sdes	}
121296415Sdes	free(tube->cmd_msg);
122296415Sdes	tube->cmd_msg = NULL;
123238106Sdes}
124238106Sdes
125238106Sdesvoid tube_remove_bg_write(struct tube* tube)
126238106Sdes{
127238106Sdes	if(tube->res_com) {
128238106Sdes		comm_point_delete(tube->res_com);
129238106Sdes		tube->res_com = NULL;
130238106Sdes	}
131238106Sdes	if(tube->res_list) {
132238106Sdes		struct tube_res_list* np, *p = tube->res_list;
133238106Sdes		tube->res_list = NULL;
134238106Sdes		tube->res_last = NULL;
135238106Sdes		while(p) {
136238106Sdes			np = p->next;
137238106Sdes			free(p->buf);
138238106Sdes			free(p);
139238106Sdes			p = np;
140238106Sdes		}
141238106Sdes	}
142238106Sdes}
143238106Sdes
144238106Sdesint
145238106Sdestube_handle_listen(struct comm_point* c, void* arg, int error,
146238106Sdes        struct comm_reply* ATTR_UNUSED(reply_info))
147238106Sdes{
148238106Sdes	struct tube* tube = (struct tube*)arg;
149238106Sdes	ssize_t r;
150238106Sdes	if(error != NETEVENT_NOERROR) {
151238106Sdes		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
152238106Sdes		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
153238106Sdes		return 0;
154238106Sdes	}
155238106Sdes
156238106Sdes	if(tube->cmd_read < sizeof(tube->cmd_len)) {
157238106Sdes		/* complete reading the length of control msg */
158238106Sdes		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
159238106Sdes			sizeof(tube->cmd_len) - tube->cmd_read);
160238106Sdes		if(r==0) {
161238106Sdes			/* error has happened or */
162238106Sdes			/* parent closed pipe, must have exited somehow */
163238106Sdes			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
164238106Sdes			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
165238106Sdes				tube->listen_arg);
166238106Sdes			return 0;
167238106Sdes		}
168238106Sdes		if(r==-1) {
169238106Sdes			if(errno != EAGAIN && errno != EINTR) {
170238106Sdes				log_err("rpipe error: %s", strerror(errno));
171238106Sdes			}
172238106Sdes			/* nothing to read now, try later */
173238106Sdes			return 0;
174238106Sdes		}
175238106Sdes		tube->cmd_read += r;
176238106Sdes		if(tube->cmd_read < sizeof(tube->cmd_len)) {
177238106Sdes			/* not complete, try later */
178238106Sdes			return 0;
179238106Sdes		}
180238106Sdes		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
181238106Sdes		if(!tube->cmd_msg) {
182238106Sdes			log_err("malloc failure");
183238106Sdes			tube->cmd_read = 0;
184238106Sdes			return 0;
185238106Sdes		}
186238106Sdes	}
187238106Sdes	/* cmd_len has been read, read remainder */
188238106Sdes	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
189238106Sdes		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
190238106Sdes	if(r==0) {
191238106Sdes		/* error has happened or */
192238106Sdes		/* parent closed pipe, must have exited somehow */
193238106Sdes		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
194238106Sdes		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
195238106Sdes			tube->listen_arg);
196238106Sdes		return 0;
197238106Sdes	}
198238106Sdes	if(r==-1) {
199238106Sdes		/* nothing to read now, try later */
200238106Sdes		if(errno != EAGAIN && errno != EINTR) {
201238106Sdes			log_err("rpipe error: %s", strerror(errno));
202238106Sdes		}
203238106Sdes		return 0;
204238106Sdes	}
205238106Sdes	tube->cmd_read += r;
206238106Sdes	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
207238106Sdes		/* not complete, try later */
208238106Sdes		return 0;
209238106Sdes	}
210238106Sdes	tube->cmd_read = 0;
211238106Sdes
212238106Sdes	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
213238106Sdes	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
214238106Sdes		NETEVENT_NOERROR, tube->listen_arg);
215238106Sdes		/* also frees the buf */
216238106Sdes	tube->cmd_msg = NULL;
217238106Sdes	return 0;
218238106Sdes}
219238106Sdes
220238106Sdesint
221238106Sdestube_handle_write(struct comm_point* c, void* arg, int error,
222238106Sdes        struct comm_reply* ATTR_UNUSED(reply_info))
223238106Sdes{
224238106Sdes	struct tube* tube = (struct tube*)arg;
225238106Sdes	struct tube_res_list* item = tube->res_list;
226238106Sdes	ssize_t r;
227238106Sdes	if(error != NETEVENT_NOERROR) {
228238106Sdes		log_err("tube_handle_write net error %d", error);
229238106Sdes		return 0;
230238106Sdes	}
231238106Sdes
232238106Sdes	if(!item) {
233238106Sdes		comm_point_stop_listening(c);
234238106Sdes		return 0;
235238106Sdes	}
236238106Sdes
237238106Sdes	if(tube->res_write < sizeof(item->len)) {
238238106Sdes		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
239238106Sdes			sizeof(item->len) - tube->res_write);
240238106Sdes		if(r == -1) {
241238106Sdes			if(errno != EAGAIN && errno != EINTR) {
242238106Sdes				log_err("wpipe error: %s", strerror(errno));
243238106Sdes			}
244238106Sdes			return 0; /* try again later */
245238106Sdes		}
246238106Sdes		if(r == 0) {
247238106Sdes			/* error on pipe, must have exited somehow */
248238106Sdes			/* cannot signal this to pipe user */
249238106Sdes			return 0;
250238106Sdes		}
251238106Sdes		tube->res_write += r;
252238106Sdes		if(tube->res_write < sizeof(item->len))
253238106Sdes			return 0;
254238106Sdes	}
255238106Sdes	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
256238106Sdes		item->len - (tube->res_write - sizeof(item->len)));
257238106Sdes	if(r == -1) {
258238106Sdes		if(errno != EAGAIN && errno != EINTR) {
259238106Sdes			log_err("wpipe error: %s", strerror(errno));
260238106Sdes		}
261238106Sdes		return 0; /* try again later */
262238106Sdes	}
263238106Sdes	if(r == 0) {
264238106Sdes		/* error on pipe, must have exited somehow */
265238106Sdes		/* cannot signal this to pipe user */
266238106Sdes		return 0;
267238106Sdes	}
268238106Sdes	tube->res_write += r;
269238106Sdes	if(tube->res_write < sizeof(item->len) + item->len)
270238106Sdes		return 0;
271238106Sdes	/* done this result, remove it */
272238106Sdes	free(item->buf);
273238106Sdes	item->buf = NULL;
274238106Sdes	tube->res_list = tube->res_list->next;
275238106Sdes	free(item);
276238106Sdes	if(!tube->res_list) {
277238106Sdes		tube->res_last = NULL;
278238106Sdes		comm_point_stop_listening(c);
279238106Sdes	}
280238106Sdes	tube->res_write = 0;
281238106Sdes	return 0;
282238106Sdes}
283238106Sdes
284238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
285238106Sdes        int nonblock)
286238106Sdes{
287238106Sdes	ssize_t r, d;
288238106Sdes	int fd = tube->sw;
289238106Sdes
290238106Sdes	/* test */
291238106Sdes	if(nonblock) {
292238106Sdes		r = write(fd, &len, sizeof(len));
293238106Sdes		if(r == -1) {
294238106Sdes			if(errno==EINTR || errno==EAGAIN)
295238106Sdes				return -1;
296238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
297238106Sdes			return -1; /* can still continue, perhaps */
298238106Sdes		}
299238106Sdes	} else r = 0;
300238106Sdes	if(!fd_set_block(fd))
301238106Sdes		return 0;
302238106Sdes	/* write remainder */
303238106Sdes	d = r;
304238106Sdes	while(d != (ssize_t)sizeof(len)) {
305238106Sdes		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
306238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
307238106Sdes			(void)fd_set_nonblock(fd);
308238106Sdes			return 0;
309238106Sdes		}
310238106Sdes		d += r;
311238106Sdes	}
312238106Sdes	d = 0;
313238106Sdes	while(d != (ssize_t)len) {
314238106Sdes		if((r=write(fd, buf+d, len-d)) == -1) {
315238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
316238106Sdes			(void)fd_set_nonblock(fd);
317238106Sdes			return 0;
318238106Sdes		}
319238106Sdes		d += r;
320238106Sdes	}
321238106Sdes	if(!fd_set_nonblock(fd))
322238106Sdes		return 0;
323238106Sdes	return 1;
324238106Sdes}
325238106Sdes
326238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
327238106Sdes        int nonblock)
328238106Sdes{
329238106Sdes	ssize_t r, d;
330238106Sdes	int fd = tube->sr;
331238106Sdes
332238106Sdes	/* test */
333238106Sdes	*len = 0;
334238106Sdes	if(nonblock) {
335238106Sdes		r = read(fd, len, sizeof(*len));
336238106Sdes		if(r == -1) {
337238106Sdes			if(errno==EINTR || errno==EAGAIN)
338238106Sdes				return -1;
339238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
340238106Sdes			return -1; /* we can still continue, perhaps */
341238106Sdes		}
342238106Sdes		if(r == 0) /* EOF */
343238106Sdes			return 0;
344238106Sdes	} else r = 0;
345238106Sdes	if(!fd_set_block(fd))
346238106Sdes		return 0;
347238106Sdes	/* read remainder */
348238106Sdes	d = r;
349238106Sdes	while(d != (ssize_t)sizeof(*len)) {
350238106Sdes		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
351238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
352238106Sdes			(void)fd_set_nonblock(fd);
353238106Sdes			return 0;
354238106Sdes		}
355238106Sdes		if(r == 0) /* EOF */ {
356238106Sdes			(void)fd_set_nonblock(fd);
357238106Sdes			return 0;
358238106Sdes		}
359238106Sdes		d += r;
360238106Sdes	}
361249141Sdes	log_assert(*len < 65536*2);
362238106Sdes	*buf = (uint8_t*)malloc(*len);
363238106Sdes	if(!*buf) {
364238106Sdes		log_err("tube read out of memory");
365238106Sdes		(void)fd_set_nonblock(fd);
366238106Sdes		return 0;
367238106Sdes	}
368238106Sdes	d = 0;
369266114Sdes	while(d < (ssize_t)*len) {
370238106Sdes		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
371238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
372238106Sdes			(void)fd_set_nonblock(fd);
373238106Sdes			free(*buf);
374238106Sdes			return 0;
375238106Sdes		}
376238106Sdes		if(r == 0) { /* EOF */
377238106Sdes			(void)fd_set_nonblock(fd);
378238106Sdes			free(*buf);
379238106Sdes			return 0;
380238106Sdes		}
381238106Sdes		d += r;
382238106Sdes	}
383238106Sdes	if(!fd_set_nonblock(fd)) {
384238106Sdes		free(*buf);
385238106Sdes		return 0;
386238106Sdes	}
387238106Sdes	return 1;
388238106Sdes}
389238106Sdes
390238106Sdes/** perform a select() on the fd */
391238106Sdesstatic int
392238106Sdespollit(int fd, struct timeval* t)
393238106Sdes{
394238106Sdes	fd_set r;
395238106Sdes#ifndef S_SPLINT_S
396238106Sdes	FD_ZERO(&r);
397238106Sdes	FD_SET(FD_SET_T fd, &r);
398238106Sdes#endif
399238106Sdes	if(select(fd+1, &r, NULL, NULL, t) == -1) {
400238106Sdes		return 0;
401238106Sdes	}
402238106Sdes	errno = 0;
403238106Sdes	return (int)(FD_ISSET(fd, &r));
404238106Sdes}
405238106Sdes
406238106Sdesint tube_poll(struct tube* tube)
407238106Sdes{
408238106Sdes	struct timeval t;
409238106Sdes	memset(&t, 0, sizeof(t));
410238106Sdes	return pollit(tube->sr, &t);
411238106Sdes}
412238106Sdes
413238106Sdesint tube_wait(struct tube* tube)
414238106Sdes{
415238106Sdes	return pollit(tube->sr, NULL);
416238106Sdes}
417238106Sdes
418238106Sdesint tube_read_fd(struct tube* tube)
419238106Sdes{
420238106Sdes	return tube->sr;
421238106Sdes}
422238106Sdes
423238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
424238106Sdes        tube_callback_t* cb, void* arg)
425238106Sdes{
426238106Sdes	tube->listen_cb = cb;
427238106Sdes	tube->listen_arg = arg;
428238106Sdes	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
429238106Sdes		0, tube_handle_listen, tube))) {
430238106Sdes		int err = errno;
431238106Sdes		log_err("tube_setup_bg_l: commpoint creation failed");
432238106Sdes		errno = err;
433238106Sdes		return 0;
434238106Sdes	}
435238106Sdes	return 1;
436238106Sdes}
437238106Sdes
438238106Sdesint tube_setup_bg_write(struct tube* tube, struct comm_base* base)
439238106Sdes{
440238106Sdes	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
441238106Sdes		1, tube_handle_write, tube))) {
442238106Sdes		int err = errno;
443238106Sdes		log_err("tube_setup_bg_w: commpoint creation failed");
444238106Sdes		errno = err;
445238106Sdes		return 0;
446238106Sdes	}
447238106Sdes	return 1;
448238106Sdes}
449238106Sdes
450238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
451238106Sdes{
452238106Sdes	struct tube_res_list* item =
453238106Sdes		(struct tube_res_list*)malloc(sizeof(*item));
454238106Sdes	if(!item) {
455238106Sdes		free(msg);
456238106Sdes		log_err("out of memory for async answer");
457238106Sdes		return 0;
458238106Sdes	}
459238106Sdes	item->buf = msg;
460238106Sdes	item->len = len;
461238106Sdes	item->next = NULL;
462238106Sdes	/* add at back of list, since the first one may be partially written */
463238106Sdes	if(tube->res_last)
464238106Sdes		tube->res_last->next = item;
465238106Sdes	else    tube->res_list = item;
466238106Sdes	tube->res_last = item;
467238106Sdes	if(tube->res_list == tube->res_last) {
468238106Sdes		/* first added item, start the write process */
469238106Sdes		comm_point_start_listening(tube->res_com, -1, -1);
470238106Sdes	}
471238106Sdes	return 1;
472238106Sdes}
473238106Sdes
474238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
475238106Sdes	void* ATTR_UNUSED(arg))
476238106Sdes{
477238106Sdes	log_assert(0);
478238106Sdes}
479238106Sdes
480238106Sdes#else /* USE_WINSOCK */
481238106Sdes/* on windows */
482238106Sdes
483238106Sdes
484238106Sdesstruct tube* tube_create(void)
485238106Sdes{
486238106Sdes	/* windows does not have forks like unix, so we only support
487238106Sdes	 * threads on windows. And thus the pipe need only connect
488238106Sdes	 * threads. We use a mutex and a list of datagrams. */
489238106Sdes	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
490238106Sdes	if(!tube) {
491238106Sdes		int err = errno;
492238106Sdes		log_err("tube_create: out of memory");
493238106Sdes		errno = err;
494238106Sdes		return NULL;
495238106Sdes	}
496238106Sdes	tube->event = WSACreateEvent();
497238106Sdes	if(tube->event == WSA_INVALID_EVENT) {
498238106Sdes		free(tube);
499238106Sdes		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
500238106Sdes	}
501238106Sdes	if(!WSAResetEvent(tube->event)) {
502238106Sdes		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
503238106Sdes	}
504238106Sdes	lock_basic_init(&tube->res_lock);
505238106Sdes	verbose(VERB_ALGO, "tube created");
506238106Sdes	return tube;
507238106Sdes}
508238106Sdes
509238106Sdesvoid tube_delete(struct tube* tube)
510238106Sdes{
511238106Sdes	if(!tube) return;
512238106Sdes	tube_remove_bg_listen(tube);
513238106Sdes	tube_remove_bg_write(tube);
514238106Sdes	tube_close_read(tube);
515238106Sdes	tube_close_write(tube);
516238106Sdes	if(!WSACloseEvent(tube->event))
517238106Sdes		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
518238106Sdes	lock_basic_destroy(&tube->res_lock);
519238106Sdes	verbose(VERB_ALGO, "tube deleted");
520238106Sdes	free(tube);
521238106Sdes}
522238106Sdes
523238106Sdesvoid tube_close_read(struct tube* ATTR_UNUSED(tube))
524238106Sdes{
525238106Sdes	verbose(VERB_ALGO, "tube close_read");
526238106Sdes}
527238106Sdes
528238106Sdesvoid tube_close_write(struct tube* ATTR_UNUSED(tube))
529238106Sdes{
530238106Sdes	verbose(VERB_ALGO, "tube close_write");
531238106Sdes	/* wake up waiting reader with an empty queue */
532238106Sdes	if(!WSASetEvent(tube->event)) {
533238106Sdes		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
534238106Sdes	}
535238106Sdes}
536238106Sdes
537238106Sdesvoid tube_remove_bg_listen(struct tube* tube)
538238106Sdes{
539238106Sdes	verbose(VERB_ALGO, "tube remove_bg_listen");
540238106Sdes	winsock_unregister_wsaevent(&tube->ev_listen);
541238106Sdes}
542238106Sdes
543238106Sdesvoid tube_remove_bg_write(struct tube* tube)
544238106Sdes{
545238106Sdes	verbose(VERB_ALGO, "tube remove_bg_write");
546238106Sdes	if(tube->res_list) {
547238106Sdes		struct tube_res_list* np, *p = tube->res_list;
548238106Sdes		tube->res_list = NULL;
549238106Sdes		tube->res_last = NULL;
550238106Sdes		while(p) {
551238106Sdes			np = p->next;
552238106Sdes			free(p->buf);
553238106Sdes			free(p);
554238106Sdes			p = np;
555238106Sdes		}
556238106Sdes	}
557238106Sdes}
558238106Sdes
559238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
560238106Sdes        int ATTR_UNUSED(nonblock))
561238106Sdes{
562238106Sdes	uint8_t* a;
563238106Sdes	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
564238106Sdes	a = (uint8_t*)memdup(buf, len);
565238106Sdes	if(!a) {
566238106Sdes		log_err("out of memory in tube_write_msg");
567238106Sdes		return 0;
568238106Sdes	}
569238106Sdes	/* always nonblocking, this pipe cannot get full */
570238106Sdes	return tube_queue_item(tube, a, len);
571238106Sdes}
572238106Sdes
573238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
574238106Sdes        int nonblock)
575238106Sdes{
576238106Sdes	struct tube_res_list* item = NULL;
577238106Sdes	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
578238106Sdes	*buf = NULL;
579238106Sdes	if(!tube_poll(tube)) {
580238106Sdes		verbose(VERB_ALGO, "tube read_msg nodata");
581238106Sdes		/* nothing ready right now, wait if we want to */
582238106Sdes		if(nonblock)
583238106Sdes			return -1; /* would block waiting for items */
584238106Sdes		if(!tube_wait(tube))
585238106Sdes			return 0;
586238106Sdes	}
587238106Sdes	lock_basic_lock(&tube->res_lock);
588238106Sdes	if(tube->res_list) {
589238106Sdes		item = tube->res_list;
590238106Sdes		tube->res_list = item->next;
591238106Sdes		if(tube->res_last == item) {
592238106Sdes			/* the list is now empty */
593238106Sdes			tube->res_last = NULL;
594238106Sdes			verbose(VERB_ALGO, "tube read_msg lastdata");
595238106Sdes			if(!WSAResetEvent(tube->event)) {
596238106Sdes				log_err("WSAResetEvent: %s",
597238106Sdes					wsa_strerror(WSAGetLastError()));
598238106Sdes			}
599238106Sdes		}
600238106Sdes	}
601238106Sdes	lock_basic_unlock(&tube->res_lock);
602238106Sdes	if(!item)
603238106Sdes		return 0; /* would block waiting for items */
604238106Sdes	*buf = item->buf;
605238106Sdes	*len = item->len;
606238106Sdes	free(item);
607238106Sdes	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
608238106Sdes	return 1;
609238106Sdes}
610238106Sdes
611238106Sdesint tube_poll(struct tube* tube)
612238106Sdes{
613238106Sdes	struct tube_res_list* item = NULL;
614238106Sdes	lock_basic_lock(&tube->res_lock);
615238106Sdes	item = tube->res_list;
616238106Sdes	lock_basic_unlock(&tube->res_lock);
617238106Sdes	if(item)
618238106Sdes		return 1;
619238106Sdes	return 0;
620238106Sdes}
621238106Sdes
622238106Sdesint tube_wait(struct tube* tube)
623238106Sdes{
624238106Sdes	/* block on eventhandle */
625238106Sdes	DWORD res = WSAWaitForMultipleEvents(
626238106Sdes		1 /* one event in array */,
627238106Sdes		&tube->event /* the event to wait for, our pipe signal */,
628238106Sdes		0 /* wait for all events is false */,
629238106Sdes		WSA_INFINITE /* wait, no timeout */,
630238106Sdes		0 /* we are not alertable for IO completion routines */
631238106Sdes		);
632238106Sdes	if(res == WSA_WAIT_TIMEOUT) {
633238106Sdes		return 0;
634238106Sdes	}
635238106Sdes	if(res == WSA_WAIT_IO_COMPLETION) {
636238106Sdes		/* a bit unexpected, since we were not alertable */
637238106Sdes		return 0;
638238106Sdes	}
639238106Sdes	return 1;
640238106Sdes}
641238106Sdes
/**
 * Get the descriptor of the read end (winsock build).
 * This implementation has no descriptor to offer.
 * @param tube: unused.
 * @return -1 always.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	/* there is nothing sensible to return on Windows */
	const int no_fd = -1;
	return no_fd;
}
647238106Sdes
648238106Sdesint
649238106Sdestube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
650238106Sdes	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
651238106Sdes{
652238106Sdes	log_assert(0);
653238106Sdes	return 0;
654238106Sdes}
655238106Sdes
656238106Sdesint
657238106Sdestube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
658238106Sdes	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
659238106Sdes{
660238106Sdes	log_assert(0);
661238106Sdes	return 0;
662238106Sdes}
663238106Sdes
664238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
665238106Sdes        tube_callback_t* cb, void* arg)
666238106Sdes{
667238106Sdes	tube->listen_cb = cb;
668238106Sdes	tube->listen_arg = arg;
669238106Sdes	if(!comm_base_internal(base))
670238106Sdes		return 1; /* ignore when no comm base - testing */
671238106Sdes	return winsock_register_wsaevent(comm_base_internal(base),
672238106Sdes		&tube->ev_listen, tube->event, &tube_handle_signal, tube);
673238106Sdes}
674238106Sdes
675238106Sdesint tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
676238106Sdes	struct comm_base* ATTR_UNUSED(base))
677238106Sdes{
678238106Sdes	/* the queue item routine performs the signaling */
679238106Sdes	return 1;
680238106Sdes}
681238106Sdes
682238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
683238106Sdes{
684238106Sdes	struct tube_res_list* item =
685238106Sdes		(struct tube_res_list*)malloc(sizeof(*item));
686238106Sdes	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
687238106Sdes	if(!item) {
688238106Sdes		free(msg);
689238106Sdes		log_err("out of memory for async answer");
690238106Sdes		return 0;
691238106Sdes	}
692238106Sdes	item->buf = msg;
693238106Sdes	item->len = len;
694238106Sdes	item->next = NULL;
695238106Sdes	lock_basic_lock(&tube->res_lock);
696238106Sdes	/* add at back of list, since the first one may be partially written */
697238106Sdes	if(tube->res_last)
698238106Sdes		tube->res_last->next = item;
699238106Sdes	else    tube->res_list = item;
700238106Sdes	tube->res_last = item;
701238106Sdes	/* signal the eventhandle */
702238106Sdes	if(!WSASetEvent(tube->event)) {
703238106Sdes		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
704238106Sdes	}
705238106Sdes	lock_basic_unlock(&tube->res_lock);
706238106Sdes	return 1;
707238106Sdes}
708238106Sdes
709238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
710238106Sdes	void* arg)
711238106Sdes{
712238106Sdes	struct tube* tube = (struct tube*)arg;
713238106Sdes	uint8_t* buf;
714238106Sdes	uint32_t len = 0;
715238106Sdes	verbose(VERB_ALGO, "tube handle_signal");
716238106Sdes	while(tube_poll(tube)) {
717238106Sdes		if(tube_read_msg(tube, &buf, &len, 1)) {
718238106Sdes			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
719238106Sdes			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
720238106Sdes				tube->listen_arg);
721238106Sdes		}
722238106Sdes	}
723238106Sdes}
724238106Sdes
725238106Sdes#endif /* USE_WINSOCK */
726