tube.c revision 249141
1238106Sdes/*
2238106Sdes * util/tube.c - pipe service
3238106Sdes *
4238106Sdes * Copyright (c) 2008, NLnet Labs. All rights reserved.
5238106Sdes *
6238106Sdes * This software is open source.
7238106Sdes *
8238106Sdes * Redistribution and use in source and binary forms, with or without
9238106Sdes * modification, are permitted provided that the following conditions
10238106Sdes * are met:
11238106Sdes *
12238106Sdes * Redistributions of source code must retain the above copyright notice,
13238106Sdes * this list of conditions and the following disclaimer.
14238106Sdes *
15238106Sdes * Redistributions in binary form must reproduce the above copyright notice,
16238106Sdes * this list of conditions and the following disclaimer in the documentation
17238106Sdes * and/or other materials provided with the distribution.
18238106Sdes *
19238106Sdes * Neither the name of the NLNET LABS nor the names of its contributors may
20238106Sdes * be used to endorse or promote products derived from this software without
21238106Sdes * specific prior written permission.
22238106Sdes *
23238106Sdes * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24238106Sdes * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25238106Sdes * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26238106Sdes * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
27238106Sdes * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28238106Sdes * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29238106Sdes * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30238106Sdes * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31238106Sdes * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32238106Sdes * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33238106Sdes * POSSIBILITY OF SUCH DAMAGE.
34238106Sdes */
35238106Sdes
/**
 * \file
 *
 * This file contains pipe service functions: creating and deleting a
 * tube, blocking and nonblocking message transfer over it, and the
 * event-driven background listen and write handlers.
 */
41238106Sdes#include "config.h"
42238106Sdes#include "util/tube.h"
43238106Sdes#include "util/log.h"
44238106Sdes#include "util/net_help.h"
45238106Sdes#include "util/netevent.h"
46238106Sdes#include "util/fptr_wlist.h"
47238106Sdes
48238106Sdes#ifndef USE_WINSOCK
49238106Sdes/* on unix */
50238106Sdes
#ifndef HAVE_SOCKETPAIR
/**
 * Fallback for platforms without socketpair() (such as Minix 3.1.7).
 * The domain, type and protocol arguments are dropped and a plain pipe()
 * fills the descriptor pair: fds[0] is the read end, fds[1] the write end.
 */
#define socketpair(domain, type, protocol, fds) pipe(fds)
#endif /* HAVE_SOCKETPAIR */
55238106Sdes
56238106Sdesstruct tube* tube_create(void)
57238106Sdes{
58238106Sdes	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
59238106Sdes	int sv[2];
60238106Sdes	if(!tube) {
61238106Sdes		int err = errno;
62238106Sdes		log_err("tube_create: out of memory");
63238106Sdes		errno = err;
64238106Sdes		return NULL;
65238106Sdes	}
66238106Sdes	tube->sr = -1;
67238106Sdes	tube->sw = -1;
68238106Sdes	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
69238106Sdes		int err = errno;
70238106Sdes		log_err("socketpair: %s", strerror(errno));
71238106Sdes		free(tube);
72238106Sdes		errno = err;
73238106Sdes		return NULL;
74238106Sdes	}
75238106Sdes	tube->sr = sv[0];
76238106Sdes	tube->sw = sv[1];
77238106Sdes	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
78238106Sdes		int err = errno;
79238106Sdes		log_err("tube: cannot set nonblocking");
80238106Sdes		tube_delete(tube);
81238106Sdes		errno = err;
82238106Sdes		return NULL;
83238106Sdes	}
84238106Sdes	return tube;
85238106Sdes}
86238106Sdes
/**
 * Tear down a tube and free it. A NULL tube is ignored.
 * @param tube: the tube to delete, may be NULL.
 */
void tube_delete(struct tube* tube)
{
	if(tube != NULL) {
		/* the commpoints must go before the descriptors are closed:
		 * epoll does not like a closed fd before event_del */
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
98238106Sdes
99238106Sdesvoid tube_close_read(struct tube* tube)
100238106Sdes{
101238106Sdes	if(tube->sr != -1) {
102238106Sdes		close(tube->sr);
103238106Sdes		tube->sr = -1;
104238106Sdes	}
105238106Sdes}
106238106Sdes
107238106Sdesvoid tube_close_write(struct tube* tube)
108238106Sdes{
109238106Sdes	if(tube->sw != -1) {
110238106Sdes		close(tube->sw);
111238106Sdes		tube->sw = -1;
112238106Sdes	}
113238106Sdes}
114238106Sdes
115238106Sdesvoid tube_remove_bg_listen(struct tube* tube)
116238106Sdes{
117238106Sdes	if(tube->listen_com) {
118238106Sdes		comm_point_delete(tube->listen_com);
119238106Sdes		tube->listen_com = NULL;
120238106Sdes	}
121238106Sdes	if(tube->cmd_msg) {
122238106Sdes		free(tube->cmd_msg);
123238106Sdes		tube->cmd_msg = NULL;
124238106Sdes	}
125238106Sdes}
126238106Sdes
127238106Sdesvoid tube_remove_bg_write(struct tube* tube)
128238106Sdes{
129238106Sdes	if(tube->res_com) {
130238106Sdes		comm_point_delete(tube->res_com);
131238106Sdes		tube->res_com = NULL;
132238106Sdes	}
133238106Sdes	if(tube->res_list) {
134238106Sdes		struct tube_res_list* np, *p = tube->res_list;
135238106Sdes		tube->res_list = NULL;
136238106Sdes		tube->res_last = NULL;
137238106Sdes		while(p) {
138238106Sdes			np = p->next;
139238106Sdes			free(p->buf);
140238106Sdes			free(p);
141238106Sdes			p = np;
142238106Sdes		}
143238106Sdes	}
144238106Sdes}
145238106Sdes
146238106Sdesint
147238106Sdestube_handle_listen(struct comm_point* c, void* arg, int error,
148238106Sdes        struct comm_reply* ATTR_UNUSED(reply_info))
149238106Sdes{
150238106Sdes	struct tube* tube = (struct tube*)arg;
151238106Sdes	ssize_t r;
152238106Sdes	if(error != NETEVENT_NOERROR) {
153238106Sdes		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
154238106Sdes		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
155238106Sdes		return 0;
156238106Sdes	}
157238106Sdes
158238106Sdes	if(tube->cmd_read < sizeof(tube->cmd_len)) {
159238106Sdes		/* complete reading the length of control msg */
160238106Sdes		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
161238106Sdes			sizeof(tube->cmd_len) - tube->cmd_read);
162238106Sdes		if(r==0) {
163238106Sdes			/* error has happened or */
164238106Sdes			/* parent closed pipe, must have exited somehow */
165238106Sdes			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
166238106Sdes			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
167238106Sdes				tube->listen_arg);
168238106Sdes			return 0;
169238106Sdes		}
170238106Sdes		if(r==-1) {
171238106Sdes			if(errno != EAGAIN && errno != EINTR) {
172238106Sdes				log_err("rpipe error: %s", strerror(errno));
173238106Sdes			}
174238106Sdes			/* nothing to read now, try later */
175238106Sdes			return 0;
176238106Sdes		}
177238106Sdes		tube->cmd_read += r;
178238106Sdes		if(tube->cmd_read < sizeof(tube->cmd_len)) {
179238106Sdes			/* not complete, try later */
180238106Sdes			return 0;
181238106Sdes		}
182238106Sdes		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
183238106Sdes		if(!tube->cmd_msg) {
184238106Sdes			log_err("malloc failure");
185238106Sdes			tube->cmd_read = 0;
186238106Sdes			return 0;
187238106Sdes		}
188238106Sdes	}
189238106Sdes	/* cmd_len has been read, read remainder */
190238106Sdes	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
191238106Sdes		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
192238106Sdes	if(r==0) {
193238106Sdes		/* error has happened or */
194238106Sdes		/* parent closed pipe, must have exited somehow */
195238106Sdes		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
196238106Sdes		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
197238106Sdes			tube->listen_arg);
198238106Sdes		return 0;
199238106Sdes	}
200238106Sdes	if(r==-1) {
201238106Sdes		/* nothing to read now, try later */
202238106Sdes		if(errno != EAGAIN && errno != EINTR) {
203238106Sdes			log_err("rpipe error: %s", strerror(errno));
204238106Sdes		}
205238106Sdes		return 0;
206238106Sdes	}
207238106Sdes	tube->cmd_read += r;
208238106Sdes	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
209238106Sdes		/* not complete, try later */
210238106Sdes		return 0;
211238106Sdes	}
212238106Sdes	tube->cmd_read = 0;
213238106Sdes
214238106Sdes	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
215238106Sdes	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
216238106Sdes		NETEVENT_NOERROR, tube->listen_arg);
217238106Sdes		/* also frees the buf */
218238106Sdes	tube->cmd_msg = NULL;
219238106Sdes	return 0;
220238106Sdes}
221238106Sdes
222238106Sdesint
223238106Sdestube_handle_write(struct comm_point* c, void* arg, int error,
224238106Sdes        struct comm_reply* ATTR_UNUSED(reply_info))
225238106Sdes{
226238106Sdes	struct tube* tube = (struct tube*)arg;
227238106Sdes	struct tube_res_list* item = tube->res_list;
228238106Sdes	ssize_t r;
229238106Sdes	if(error != NETEVENT_NOERROR) {
230238106Sdes		log_err("tube_handle_write net error %d", error);
231238106Sdes		return 0;
232238106Sdes	}
233238106Sdes
234238106Sdes	if(!item) {
235238106Sdes		comm_point_stop_listening(c);
236238106Sdes		return 0;
237238106Sdes	}
238238106Sdes
239238106Sdes	if(tube->res_write < sizeof(item->len)) {
240238106Sdes		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
241238106Sdes			sizeof(item->len) - tube->res_write);
242238106Sdes		if(r == -1) {
243238106Sdes			if(errno != EAGAIN && errno != EINTR) {
244238106Sdes				log_err("wpipe error: %s", strerror(errno));
245238106Sdes			}
246238106Sdes			return 0; /* try again later */
247238106Sdes		}
248238106Sdes		if(r == 0) {
249238106Sdes			/* error on pipe, must have exited somehow */
250238106Sdes			/* cannot signal this to pipe user */
251238106Sdes			return 0;
252238106Sdes		}
253238106Sdes		tube->res_write += r;
254238106Sdes		if(tube->res_write < sizeof(item->len))
255238106Sdes			return 0;
256238106Sdes	}
257238106Sdes	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
258238106Sdes		item->len - (tube->res_write - sizeof(item->len)));
259238106Sdes	if(r == -1) {
260238106Sdes		if(errno != EAGAIN && errno != EINTR) {
261238106Sdes			log_err("wpipe error: %s", strerror(errno));
262238106Sdes		}
263238106Sdes		return 0; /* try again later */
264238106Sdes	}
265238106Sdes	if(r == 0) {
266238106Sdes		/* error on pipe, must have exited somehow */
267238106Sdes		/* cannot signal this to pipe user */
268238106Sdes		return 0;
269238106Sdes	}
270238106Sdes	tube->res_write += r;
271238106Sdes	if(tube->res_write < sizeof(item->len) + item->len)
272238106Sdes		return 0;
273238106Sdes	/* done this result, remove it */
274238106Sdes	free(item->buf);
275238106Sdes	item->buf = NULL;
276238106Sdes	tube->res_list = tube->res_list->next;
277238106Sdes	free(item);
278238106Sdes	if(!tube->res_list) {
279238106Sdes		tube->res_last = NULL;
280238106Sdes		comm_point_stop_listening(c);
281238106Sdes	}
282238106Sdes	tube->res_write = 0;
283238106Sdes	return 0;
284238106Sdes}
285238106Sdes
286238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
287238106Sdes        int nonblock)
288238106Sdes{
289238106Sdes	ssize_t r, d;
290238106Sdes	int fd = tube->sw;
291238106Sdes
292238106Sdes	/* test */
293238106Sdes	if(nonblock) {
294238106Sdes		r = write(fd, &len, sizeof(len));
295238106Sdes		if(r == -1) {
296238106Sdes			if(errno==EINTR || errno==EAGAIN)
297238106Sdes				return -1;
298238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
299238106Sdes			return -1; /* can still continue, perhaps */
300238106Sdes		}
301238106Sdes	} else r = 0;
302238106Sdes	if(!fd_set_block(fd))
303238106Sdes		return 0;
304238106Sdes	/* write remainder */
305238106Sdes	d = r;
306238106Sdes	while(d != (ssize_t)sizeof(len)) {
307238106Sdes		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
308238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
309238106Sdes			(void)fd_set_nonblock(fd);
310238106Sdes			return 0;
311238106Sdes		}
312238106Sdes		d += r;
313238106Sdes	}
314238106Sdes	d = 0;
315238106Sdes	while(d != (ssize_t)len) {
316238106Sdes		if((r=write(fd, buf+d, len-d)) == -1) {
317238106Sdes			log_err("tube msg write failed: %s", strerror(errno));
318238106Sdes			(void)fd_set_nonblock(fd);
319238106Sdes			return 0;
320238106Sdes		}
321238106Sdes		d += r;
322238106Sdes	}
323238106Sdes	if(!fd_set_nonblock(fd))
324238106Sdes		return 0;
325238106Sdes	return 1;
326238106Sdes}
327238106Sdes
328238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
329238106Sdes        int nonblock)
330238106Sdes{
331238106Sdes	ssize_t r, d;
332238106Sdes	int fd = tube->sr;
333238106Sdes
334238106Sdes	/* test */
335238106Sdes	*len = 0;
336238106Sdes	if(nonblock) {
337238106Sdes		r = read(fd, len, sizeof(*len));
338238106Sdes		if(r == -1) {
339238106Sdes			if(errno==EINTR || errno==EAGAIN)
340238106Sdes				return -1;
341238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
342238106Sdes			return -1; /* we can still continue, perhaps */
343238106Sdes		}
344238106Sdes		if(r == 0) /* EOF */
345238106Sdes			return 0;
346238106Sdes	} else r = 0;
347238106Sdes	if(!fd_set_block(fd))
348238106Sdes		return 0;
349238106Sdes	/* read remainder */
350238106Sdes	d = r;
351238106Sdes	while(d != (ssize_t)sizeof(*len)) {
352238106Sdes		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
353238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
354238106Sdes			(void)fd_set_nonblock(fd);
355238106Sdes			return 0;
356238106Sdes		}
357238106Sdes		if(r == 0) /* EOF */ {
358238106Sdes			(void)fd_set_nonblock(fd);
359238106Sdes			return 0;
360238106Sdes		}
361238106Sdes		d += r;
362238106Sdes	}
363249141Sdes	log_assert(*len < 65536*2);
364238106Sdes	*buf = (uint8_t*)malloc(*len);
365238106Sdes	if(!*buf) {
366238106Sdes		log_err("tube read out of memory");
367238106Sdes		(void)fd_set_nonblock(fd);
368238106Sdes		return 0;
369238106Sdes	}
370238106Sdes	d = 0;
371238106Sdes	while(d != (ssize_t)*len) {
372238106Sdes		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
373238106Sdes			log_err("tube msg read failed: %s", strerror(errno));
374238106Sdes			(void)fd_set_nonblock(fd);
375238106Sdes			free(*buf);
376238106Sdes			return 0;
377238106Sdes		}
378238106Sdes		if(r == 0) { /* EOF */
379238106Sdes			(void)fd_set_nonblock(fd);
380238106Sdes			free(*buf);
381238106Sdes			return 0;
382238106Sdes		}
383238106Sdes		d += r;
384238106Sdes	}
385238106Sdes	if(!fd_set_nonblock(fd)) {
386238106Sdes		free(*buf);
387238106Sdes		return 0;
388238106Sdes	}
389238106Sdes	return 1;
390238106Sdes}
391238106Sdes
392238106Sdes/** perform a select() on the fd */
393238106Sdesstatic int
394238106Sdespollit(int fd, struct timeval* t)
395238106Sdes{
396238106Sdes	fd_set r;
397238106Sdes#ifndef S_SPLINT_S
398238106Sdes	FD_ZERO(&r);
399238106Sdes	FD_SET(FD_SET_T fd, &r);
400238106Sdes#endif
401238106Sdes	if(select(fd+1, &r, NULL, NULL, t) == -1) {
402238106Sdes		return 0;
403238106Sdes	}
404238106Sdes	errno = 0;
405238106Sdes	return (int)(FD_ISSET(fd, &r));
406238106Sdes}
407238106Sdes
408238106Sdesint tube_poll(struct tube* tube)
409238106Sdes{
410238106Sdes	struct timeval t;
411238106Sdes	memset(&t, 0, sizeof(t));
412238106Sdes	return pollit(tube->sr, &t);
413238106Sdes}
414238106Sdes
415238106Sdesint tube_wait(struct tube* tube)
416238106Sdes{
417238106Sdes	return pollit(tube->sr, NULL);
418238106Sdes}
419238106Sdes
420238106Sdesint tube_read_fd(struct tube* tube)
421238106Sdes{
422238106Sdes	return tube->sr;
423238106Sdes}
424238106Sdes
425238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
426238106Sdes        tube_callback_t* cb, void* arg)
427238106Sdes{
428238106Sdes	tube->listen_cb = cb;
429238106Sdes	tube->listen_arg = arg;
430238106Sdes	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
431238106Sdes		0, tube_handle_listen, tube))) {
432238106Sdes		int err = errno;
433238106Sdes		log_err("tube_setup_bg_l: commpoint creation failed");
434238106Sdes		errno = err;
435238106Sdes		return 0;
436238106Sdes	}
437238106Sdes	return 1;
438238106Sdes}
439238106Sdes
440238106Sdesint tube_setup_bg_write(struct tube* tube, struct comm_base* base)
441238106Sdes{
442238106Sdes	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
443238106Sdes		1, tube_handle_write, tube))) {
444238106Sdes		int err = errno;
445238106Sdes		log_err("tube_setup_bg_w: commpoint creation failed");
446238106Sdes		errno = err;
447238106Sdes		return 0;
448238106Sdes	}
449238106Sdes	return 1;
450238106Sdes}
451238106Sdes
452238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
453238106Sdes{
454238106Sdes	struct tube_res_list* item =
455238106Sdes		(struct tube_res_list*)malloc(sizeof(*item));
456238106Sdes	if(!item) {
457238106Sdes		free(msg);
458238106Sdes		log_err("out of memory for async answer");
459238106Sdes		return 0;
460238106Sdes	}
461238106Sdes	item->buf = msg;
462238106Sdes	item->len = len;
463238106Sdes	item->next = NULL;
464238106Sdes	/* add at back of list, since the first one may be partially written */
465238106Sdes	if(tube->res_last)
466238106Sdes		tube->res_last->next = item;
467238106Sdes	else    tube->res_list = item;
468238106Sdes	tube->res_last = item;
469238106Sdes	if(tube->res_list == tube->res_last) {
470238106Sdes		/* first added item, start the write process */
471238106Sdes		comm_point_start_listening(tube->res_com, -1, -1);
472238106Sdes	}
473238106Sdes	return 1;
474238106Sdes}
475238106Sdes
476238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
477238106Sdes	void* ATTR_UNUSED(arg))
478238106Sdes{
479238106Sdes	log_assert(0);
480238106Sdes}
481238106Sdes
482238106Sdes#else /* USE_WINSOCK */
483238106Sdes/* on windows */
484238106Sdes
485238106Sdes
486238106Sdesstruct tube* tube_create(void)
487238106Sdes{
488238106Sdes	/* windows does not have forks like unix, so we only support
489238106Sdes	 * threads on windows. And thus the pipe need only connect
490238106Sdes	 * threads. We use a mutex and a list of datagrams. */
491238106Sdes	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
492238106Sdes	if(!tube) {
493238106Sdes		int err = errno;
494238106Sdes		log_err("tube_create: out of memory");
495238106Sdes		errno = err;
496238106Sdes		return NULL;
497238106Sdes	}
498238106Sdes	tube->event = WSACreateEvent();
499238106Sdes	if(tube->event == WSA_INVALID_EVENT) {
500238106Sdes		free(tube);
501238106Sdes		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
502238106Sdes	}
503238106Sdes	if(!WSAResetEvent(tube->event)) {
504238106Sdes		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
505238106Sdes	}
506238106Sdes	lock_basic_init(&tube->res_lock);
507238106Sdes	verbose(VERB_ALGO, "tube created");
508238106Sdes	return tube;
509238106Sdes}
510238106Sdes
511238106Sdesvoid tube_delete(struct tube* tube)
512238106Sdes{
513238106Sdes	if(!tube) return;
514238106Sdes	tube_remove_bg_listen(tube);
515238106Sdes	tube_remove_bg_write(tube);
516238106Sdes	tube_close_read(tube);
517238106Sdes	tube_close_write(tube);
518238106Sdes	if(!WSACloseEvent(tube->event))
519238106Sdes		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
520238106Sdes	lock_basic_destroy(&tube->res_lock);
521238106Sdes	verbose(VERB_ALGO, "tube deleted");
522238106Sdes	free(tube);
523238106Sdes}
524238106Sdes
525238106Sdesvoid tube_close_read(struct tube* ATTR_UNUSED(tube))
526238106Sdes{
527238106Sdes	verbose(VERB_ALGO, "tube close_read");
528238106Sdes}
529238106Sdes
530238106Sdesvoid tube_close_write(struct tube* ATTR_UNUSED(tube))
531238106Sdes{
532238106Sdes	verbose(VERB_ALGO, "tube close_write");
533238106Sdes	/* wake up waiting reader with an empty queue */
534238106Sdes	if(!WSASetEvent(tube->event)) {
535238106Sdes		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
536238106Sdes	}
537238106Sdes}
538238106Sdes
539238106Sdesvoid tube_remove_bg_listen(struct tube* tube)
540238106Sdes{
541238106Sdes	verbose(VERB_ALGO, "tube remove_bg_listen");
542238106Sdes	winsock_unregister_wsaevent(&tube->ev_listen);
543238106Sdes}
544238106Sdes
545238106Sdesvoid tube_remove_bg_write(struct tube* tube)
546238106Sdes{
547238106Sdes	verbose(VERB_ALGO, "tube remove_bg_write");
548238106Sdes	if(tube->res_list) {
549238106Sdes		struct tube_res_list* np, *p = tube->res_list;
550238106Sdes		tube->res_list = NULL;
551238106Sdes		tube->res_last = NULL;
552238106Sdes		while(p) {
553238106Sdes			np = p->next;
554238106Sdes			free(p->buf);
555238106Sdes			free(p);
556238106Sdes			p = np;
557238106Sdes		}
558238106Sdes	}
559238106Sdes}
560238106Sdes
561238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
562238106Sdes        int ATTR_UNUSED(nonblock))
563238106Sdes{
564238106Sdes	uint8_t* a;
565238106Sdes	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
566238106Sdes	a = (uint8_t*)memdup(buf, len);
567238106Sdes	if(!a) {
568238106Sdes		log_err("out of memory in tube_write_msg");
569238106Sdes		return 0;
570238106Sdes	}
571238106Sdes	/* always nonblocking, this pipe cannot get full */
572238106Sdes	return tube_queue_item(tube, a, len);
573238106Sdes}
574238106Sdes
575238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
576238106Sdes        int nonblock)
577238106Sdes{
578238106Sdes	struct tube_res_list* item = NULL;
579238106Sdes	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
580238106Sdes	*buf = NULL;
581238106Sdes	if(!tube_poll(tube)) {
582238106Sdes		verbose(VERB_ALGO, "tube read_msg nodata");
583238106Sdes		/* nothing ready right now, wait if we want to */
584238106Sdes		if(nonblock)
585238106Sdes			return -1; /* would block waiting for items */
586238106Sdes		if(!tube_wait(tube))
587238106Sdes			return 0;
588238106Sdes	}
589238106Sdes	lock_basic_lock(&tube->res_lock);
590238106Sdes	if(tube->res_list) {
591238106Sdes		item = tube->res_list;
592238106Sdes		tube->res_list = item->next;
593238106Sdes		if(tube->res_last == item) {
594238106Sdes			/* the list is now empty */
595238106Sdes			tube->res_last = NULL;
596238106Sdes			verbose(VERB_ALGO, "tube read_msg lastdata");
597238106Sdes			if(!WSAResetEvent(tube->event)) {
598238106Sdes				log_err("WSAResetEvent: %s",
599238106Sdes					wsa_strerror(WSAGetLastError()));
600238106Sdes			}
601238106Sdes		}
602238106Sdes	}
603238106Sdes	lock_basic_unlock(&tube->res_lock);
604238106Sdes	if(!item)
605238106Sdes		return 0; /* would block waiting for items */
606238106Sdes	*buf = item->buf;
607238106Sdes	*len = item->len;
608238106Sdes	free(item);
609238106Sdes	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
610238106Sdes	return 1;
611238106Sdes}
612238106Sdes
613238106Sdesint tube_poll(struct tube* tube)
614238106Sdes{
615238106Sdes	struct tube_res_list* item = NULL;
616238106Sdes	lock_basic_lock(&tube->res_lock);
617238106Sdes	item = tube->res_list;
618238106Sdes	lock_basic_unlock(&tube->res_lock);
619238106Sdes	if(item)
620238106Sdes		return 1;
621238106Sdes	return 0;
622238106Sdes}
623238106Sdes
624238106Sdesint tube_wait(struct tube* tube)
625238106Sdes{
626238106Sdes	/* block on eventhandle */
627238106Sdes	DWORD res = WSAWaitForMultipleEvents(
628238106Sdes		1 /* one event in array */,
629238106Sdes		&tube->event /* the event to wait for, our pipe signal */,
630238106Sdes		0 /* wait for all events is false */,
631238106Sdes		WSA_INFINITE /* wait, no timeout */,
632238106Sdes		0 /* we are not alertable for IO completion routines */
633238106Sdes		);
634238106Sdes	if(res == WSA_WAIT_TIMEOUT) {
635238106Sdes		return 0;
636238106Sdes	}
637238106Sdes	if(res == WSA_WAIT_IO_COMPLETION) {
638238106Sdes		/* a bit unexpected, since we were not alertable */
639238106Sdes		return 0;
640238106Sdes	}
641238106Sdes	return 1;
642238106Sdes}
643238106Sdes
/**
 * Get the read descriptor (windows implementation): the tube has no
 * descriptor here.
 * @param tube: unused.
 * @return always -1.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	/* no meaningful descriptor exists on Windows */
	return -1;
}
649238106Sdes
650238106Sdesint
651238106Sdestube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
652238106Sdes	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
653238106Sdes{
654238106Sdes	log_assert(0);
655238106Sdes	return 0;
656238106Sdes}
657238106Sdes
658238106Sdesint
659238106Sdestube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
660238106Sdes	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
661238106Sdes{
662238106Sdes	log_assert(0);
663238106Sdes	return 0;
664238106Sdes}
665238106Sdes
666238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
667238106Sdes        tube_callback_t* cb, void* arg)
668238106Sdes{
669238106Sdes	tube->listen_cb = cb;
670238106Sdes	tube->listen_arg = arg;
671238106Sdes	if(!comm_base_internal(base))
672238106Sdes		return 1; /* ignore when no comm base - testing */
673238106Sdes	return winsock_register_wsaevent(comm_base_internal(base),
674238106Sdes		&tube->ev_listen, tube->event, &tube_handle_signal, tube);
675238106Sdes}
676238106Sdes
677238106Sdesint tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
678238106Sdes	struct comm_base* ATTR_UNUSED(base))
679238106Sdes{
680238106Sdes	/* the queue item routine performs the signaling */
681238106Sdes	return 1;
682238106Sdes}
683238106Sdes
684238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
685238106Sdes{
686238106Sdes	struct tube_res_list* item =
687238106Sdes		(struct tube_res_list*)malloc(sizeof(*item));
688238106Sdes	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
689238106Sdes	if(!item) {
690238106Sdes		free(msg);
691238106Sdes		log_err("out of memory for async answer");
692238106Sdes		return 0;
693238106Sdes	}
694238106Sdes	item->buf = msg;
695238106Sdes	item->len = len;
696238106Sdes	item->next = NULL;
697238106Sdes	lock_basic_lock(&tube->res_lock);
698238106Sdes	/* add at back of list, since the first one may be partially written */
699238106Sdes	if(tube->res_last)
700238106Sdes		tube->res_last->next = item;
701238106Sdes	else    tube->res_list = item;
702238106Sdes	tube->res_last = item;
703238106Sdes	/* signal the eventhandle */
704238106Sdes	if(!WSASetEvent(tube->event)) {
705238106Sdes		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
706238106Sdes	}
707238106Sdes	lock_basic_unlock(&tube->res_lock);
708238106Sdes	return 1;
709238106Sdes}
710238106Sdes
711238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
712238106Sdes	void* arg)
713238106Sdes{
714238106Sdes	struct tube* tube = (struct tube*)arg;
715238106Sdes	uint8_t* buf;
716238106Sdes	uint32_t len = 0;
717238106Sdes	verbose(VERB_ALGO, "tube handle_signal");
718238106Sdes	while(tube_poll(tube)) {
719238106Sdes		if(tube_read_msg(tube, &buf, &len, 1)) {
720238106Sdes			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
721238106Sdes			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
722238106Sdes				tube->listen_arg);
723238106Sdes		}
724238106Sdes	}
725238106Sdes}
726238106Sdes
727238106Sdes#endif /* USE_WINSOCK */
728