tube.c revision 249141
1238106Sdes/* 2238106Sdes * util/tube.c - pipe service 3238106Sdes * 4238106Sdes * Copyright (c) 2008, NLnet Labs. All rights reserved. 5238106Sdes * 6238106Sdes * This software is open source. 7238106Sdes * 8238106Sdes * Redistribution and use in source and binary forms, with or without 9238106Sdes * modification, are permitted provided that the following conditions 10238106Sdes * are met: 11238106Sdes * 12238106Sdes * Redistributions of source code must retain the above copyright notice, 13238106Sdes * this list of conditions and the following disclaimer. 14238106Sdes * 15238106Sdes * Redistributions in binary form must reproduce the above copyright notice, 16238106Sdes * this list of conditions and the following disclaimer in the documentation 17238106Sdes * and/or other materials provided with the distribution. 18238106Sdes * 19238106Sdes * Neither the name of the NLNET LABS nor the names of its contributors may 20238106Sdes * be used to endorse or promote products derived from this software without 21238106Sdes * specific prior written permission. 22238106Sdes * 23238106Sdes * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24238106Sdes * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 25238106Sdes * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 26238106Sdes * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE 27238106Sdes * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28238106Sdes * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29238106Sdes * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30238106Sdes * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31238106Sdes * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32238106Sdes * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33238106Sdes * POSSIBILITY OF SUCH DAMAGE. 34238106Sdes */ 35238106Sdes 36238106Sdes/** 37238106Sdes * \file 38238106Sdes * 39238106Sdes * This file contains pipe service functions. 40238106Sdes */ 41238106Sdes#include "config.h" 42238106Sdes#include "util/tube.h" 43238106Sdes#include "util/log.h" 44238106Sdes#include "util/net_help.h" 45238106Sdes#include "util/netevent.h" 46238106Sdes#include "util/fptr_wlist.h" 47238106Sdes 48238106Sdes#ifndef USE_WINSOCK 49238106Sdes/* on unix */ 50238106Sdes 51238106Sdes#ifndef HAVE_SOCKETPAIR 52238106Sdes/** no socketpair() available, like on Minix 3.1.7, use pipe */ 53238106Sdes#define socketpair(f, t, p, sv) pipe(sv) 54238106Sdes#endif /* HAVE_SOCKETPAIR */ 55238106Sdes 56238106Sdesstruct tube* tube_create(void) 57238106Sdes{ 58238106Sdes struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 59238106Sdes int sv[2]; 60238106Sdes if(!tube) { 61238106Sdes int err = errno; 62238106Sdes log_err("tube_create: out of memory"); 63238106Sdes errno = err; 64238106Sdes return NULL; 65238106Sdes } 66238106Sdes tube->sr = -1; 67238106Sdes tube->sw = -1; 68238106Sdes if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 69238106Sdes int err = errno; 70238106Sdes log_err("socketpair: %s", strerror(errno)); 71238106Sdes free(tube); 72238106Sdes errno = err; 73238106Sdes return NULL; 74238106Sdes } 75238106Sdes tube->sr = sv[0]; 76238106Sdes tube->sw = sv[1]; 77238106Sdes if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 78238106Sdes int err = errno; 79238106Sdes log_err("tube: cannot set nonblocking"); 80238106Sdes tube_delete(tube); 81238106Sdes errno = err; 82238106Sdes return NULL; 83238106Sdes } 84238106Sdes return tube; 85238106Sdes} 86238106Sdes 87238106Sdesvoid tube_delete(struct tube* tube) 88238106Sdes{ 89238106Sdes if(!tube) return; 90238106Sdes tube_remove_bg_listen(tube); 91238106Sdes tube_remove_bg_write(tube); 92238106Sdes /* close fds after deleting commpoints, to be sure. 93238106Sdes * Also epoll does not like closing fd before event_del */ 94238106Sdes tube_close_read(tube); 95238106Sdes tube_close_write(tube); 96238106Sdes free(tube); 97238106Sdes} 98238106Sdes 99238106Sdesvoid tube_close_read(struct tube* tube) 100238106Sdes{ 101238106Sdes if(tube->sr != -1) { 102238106Sdes close(tube->sr); 103238106Sdes tube->sr = -1; 104238106Sdes } 105238106Sdes} 106238106Sdes 107238106Sdesvoid tube_close_write(struct tube* tube) 108238106Sdes{ 109238106Sdes if(tube->sw != -1) { 110238106Sdes close(tube->sw); 111238106Sdes tube->sw = -1; 112238106Sdes } 113238106Sdes} 114238106Sdes 115238106Sdesvoid tube_remove_bg_listen(struct tube* tube) 116238106Sdes{ 117238106Sdes if(tube->listen_com) { 118238106Sdes comm_point_delete(tube->listen_com); 119238106Sdes tube->listen_com = NULL; 120238106Sdes } 121238106Sdes if(tube->cmd_msg) { 122238106Sdes free(tube->cmd_msg); 123238106Sdes tube->cmd_msg = NULL; 124238106Sdes } 125238106Sdes} 126238106Sdes 127238106Sdesvoid tube_remove_bg_write(struct tube* tube) 128238106Sdes{ 129238106Sdes if(tube->res_com) { 130238106Sdes comm_point_delete(tube->res_com); 131238106Sdes tube->res_com = NULL; 132238106Sdes } 133238106Sdes if(tube->res_list) { 134238106Sdes struct tube_res_list* np, *p = tube->res_list; 135238106Sdes tube->res_list = NULL; 136238106Sdes tube->res_last = NULL; 137238106Sdes while(p) { 138238106Sdes np = p->next; 139238106Sdes free(p->buf); 140238106Sdes free(p); 141238106Sdes p = np; 142238106Sdes } 143238106Sdes } 144238106Sdes} 145238106Sdes 146238106Sdesint 147238106Sdestube_handle_listen(struct comm_point* c, void* arg, int error, 148238106Sdes struct comm_reply* ATTR_UNUSED(reply_info)) 149238106Sdes{ 150238106Sdes struct tube* tube = (struct tube*)arg; 151238106Sdes ssize_t r; 152238106Sdes if(error != NETEVENT_NOERROR) { 153238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 154238106Sdes (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 155238106Sdes return 0; 156238106Sdes } 157238106Sdes 158238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len)) { 159238106Sdes /* complete reading the length of control msg */ 160238106Sdes r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 161238106Sdes sizeof(tube->cmd_len) - tube->cmd_read); 162238106Sdes if(r==0) { 163238106Sdes /* error has happened or */ 164238106Sdes /* parent closed pipe, must have exited somehow */ 165238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 166238106Sdes (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 167238106Sdes tube->listen_arg); 168238106Sdes return 0; 169238106Sdes } 170238106Sdes if(r==-1) { 171238106Sdes if(errno != EAGAIN && errno != EINTR) { 172238106Sdes log_err("rpipe error: %s", strerror(errno)); 173238106Sdes } 174238106Sdes /* nothing to read now, try later */ 175238106Sdes return 0; 176238106Sdes } 177238106Sdes tube->cmd_read += r; 178238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len)) { 179238106Sdes /* not complete, try later */ 180238106Sdes return 0; 181238106Sdes } 182238106Sdes tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 183238106Sdes if(!tube->cmd_msg) { 184238106Sdes log_err("malloc failure"); 185238106Sdes tube->cmd_read = 0; 186238106Sdes return 0; 187238106Sdes } 188238106Sdes } 189238106Sdes /* cmd_len has been read, read remainder */ 190238106Sdes r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 191238106Sdes tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 192238106Sdes if(r==0) { 193238106Sdes /* error has happened or */ 194238106Sdes /* parent closed pipe, must have exited somehow */ 195238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 196238106Sdes (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 197238106Sdes tube->listen_arg); 198238106Sdes return 0; 199238106Sdes } 200238106Sdes if(r==-1) { 201238106Sdes /* nothing to read now, try later */ 202238106Sdes if(errno != EAGAIN && errno != EINTR) { 203238106Sdes log_err("rpipe error: %s", strerror(errno)); 204238106Sdes } 205238106Sdes return 0; 206238106Sdes } 207238106Sdes tube->cmd_read += r; 208238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 209238106Sdes /* not complete, try later */ 210238106Sdes return 0; 211238106Sdes } 212238106Sdes tube->cmd_read = 0; 213238106Sdes 214238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 215238106Sdes (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 216238106Sdes NETEVENT_NOERROR, tube->listen_arg); 217238106Sdes /* also frees the buf */ 218238106Sdes tube->cmd_msg = NULL; 219238106Sdes return 0; 220238106Sdes} 221238106Sdes 222238106Sdesint 223238106Sdestube_handle_write(struct comm_point* c, void* arg, int error, 224238106Sdes struct comm_reply* ATTR_UNUSED(reply_info)) 225238106Sdes{ 226238106Sdes struct tube* tube = (struct tube*)arg; 227238106Sdes struct tube_res_list* item = tube->res_list; 228238106Sdes ssize_t r; 229238106Sdes if(error != NETEVENT_NOERROR) { 230238106Sdes log_err("tube_handle_write net error %d", error); 231238106Sdes return 0; 232238106Sdes } 233238106Sdes 234238106Sdes if(!item) { 235238106Sdes comm_point_stop_listening(c); 236238106Sdes return 0; 237238106Sdes } 238238106Sdes 239238106Sdes if(tube->res_write < sizeof(item->len)) { 240238106Sdes r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 241238106Sdes sizeof(item->len) - tube->res_write); 242238106Sdes if(r == -1) { 243238106Sdes if(errno != EAGAIN && errno != EINTR) { 244238106Sdes log_err("wpipe error: %s", strerror(errno)); 245238106Sdes } 246238106Sdes return 0; /* try again later */ 247238106Sdes } 248238106Sdes if(r == 0) { 249238106Sdes /* error on pipe, must have exited somehow */ 250238106Sdes /* cannot signal this to pipe user */ 251238106Sdes return 0; 252238106Sdes } 253238106Sdes tube->res_write += r; 254238106Sdes if(tube->res_write < sizeof(item->len)) 255238106Sdes return 0; 256238106Sdes } 257238106Sdes r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 258238106Sdes item->len - (tube->res_write - sizeof(item->len))); 259238106Sdes if(r == -1) { 260238106Sdes if(errno != EAGAIN && errno != EINTR) { 261238106Sdes log_err("wpipe error: %s", strerror(errno)); 262238106Sdes } 263238106Sdes return 0; /* try again later */ 264238106Sdes } 265238106Sdes if(r == 0) { 266238106Sdes /* error on pipe, must have exited somehow */ 267238106Sdes /* cannot signal this to pipe user */ 268238106Sdes return 0; 269238106Sdes } 270238106Sdes tube->res_write += r; 271238106Sdes if(tube->res_write < sizeof(item->len) + item->len) 272238106Sdes return 0; 273238106Sdes /* done this result, remove it */ 274238106Sdes free(item->buf); 275238106Sdes item->buf = NULL; 276238106Sdes tube->res_list = tube->res_list->next; 277238106Sdes free(item); 278238106Sdes if(!tube->res_list) { 279238106Sdes tube->res_last = NULL; 280238106Sdes comm_point_stop_listening(c); 281238106Sdes } 282238106Sdes tube->res_write = 0; 283238106Sdes return 0; 284238106Sdes} 285238106Sdes 286238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 287238106Sdes int nonblock) 288238106Sdes{ 289238106Sdes ssize_t r, d; 290238106Sdes int fd = tube->sw; 291238106Sdes 292238106Sdes /* test */ 293238106Sdes if(nonblock) { 294238106Sdes r = write(fd, &len, sizeof(len)); 295238106Sdes if(r == -1) { 296238106Sdes if(errno==EINTR || errno==EAGAIN) 297238106Sdes return -1; 298238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 299238106Sdes return -1; /* can still continue, perhaps */ 300238106Sdes } 301238106Sdes } else r = 0; 302238106Sdes if(!fd_set_block(fd)) 303238106Sdes return 0; 304238106Sdes /* write remainder */ 305238106Sdes d = r; 306238106Sdes while(d != (ssize_t)sizeof(len)) { 307238106Sdes if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 308238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 309238106Sdes (void)fd_set_nonblock(fd); 310238106Sdes return 0; 311238106Sdes } 312238106Sdes d += r; 313238106Sdes } 314238106Sdes d = 0; 315238106Sdes while(d != (ssize_t)len) { 316238106Sdes if((r=write(fd, buf+d, len-d)) == -1) { 317238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 318238106Sdes (void)fd_set_nonblock(fd); 319238106Sdes return 0; 320238106Sdes } 321238106Sdes d += r; 322238106Sdes } 323238106Sdes if(!fd_set_nonblock(fd)) 324238106Sdes return 0; 325238106Sdes return 1; 326238106Sdes} 327238106Sdes 328238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 329238106Sdes int nonblock) 330238106Sdes{ 331238106Sdes ssize_t r, d; 332238106Sdes int fd = tube->sr; 333238106Sdes 334238106Sdes /* test */ 335238106Sdes *len = 0; 336238106Sdes if(nonblock) { 337238106Sdes r = read(fd, len, sizeof(*len)); 338238106Sdes if(r == -1) { 339238106Sdes if(errno==EINTR || errno==EAGAIN) 340238106Sdes return -1; 341238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 342238106Sdes return -1; /* we can still continue, perhaps */ 343238106Sdes } 344238106Sdes if(r == 0) /* EOF */ 345238106Sdes return 0; 346238106Sdes } else r = 0; 347238106Sdes if(!fd_set_block(fd)) 348238106Sdes return 0; 349238106Sdes /* read remainder */ 350238106Sdes d = r; 351238106Sdes while(d != (ssize_t)sizeof(*len)) { 352238106Sdes if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 353238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 354238106Sdes (void)fd_set_nonblock(fd); 355238106Sdes return 0; 356238106Sdes } 357238106Sdes if(r == 0) /* EOF */ { 358238106Sdes (void)fd_set_nonblock(fd); 359238106Sdes return 0; 360238106Sdes } 361238106Sdes d += r; 362238106Sdes } 363249141Sdes log_assert(*len < 65536*2); 364238106Sdes *buf = (uint8_t*)malloc(*len); 365238106Sdes if(!*buf) { 366238106Sdes log_err("tube read out of memory"); 367238106Sdes (void)fd_set_nonblock(fd); 368238106Sdes return 0; 369238106Sdes } 370238106Sdes d = 0; 371238106Sdes while(d != (ssize_t)*len) { 372238106Sdes if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 373238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 374238106Sdes (void)fd_set_nonblock(fd); 375238106Sdes free(*buf); 376238106Sdes return 0; 377238106Sdes } 378238106Sdes if(r == 0) { /* EOF */ 379238106Sdes (void)fd_set_nonblock(fd); 380238106Sdes free(*buf); 381238106Sdes return 0; 382238106Sdes } 383238106Sdes d += r; 384238106Sdes } 385238106Sdes if(!fd_set_nonblock(fd)) { 386238106Sdes free(*buf); 387238106Sdes return 0; 388238106Sdes } 389238106Sdes return 1; 390238106Sdes} 391238106Sdes 392238106Sdes/** perform a select() on the fd */ 393238106Sdesstatic int 394238106Sdespollit(int fd, struct timeval* t) 395238106Sdes{ 396238106Sdes fd_set r; 397238106Sdes#ifndef S_SPLINT_S 398238106Sdes FD_ZERO(&r); 399238106Sdes FD_SET(FD_SET_T fd, &r); 400238106Sdes#endif 401238106Sdes if(select(fd+1, &r, NULL, NULL, t) == -1) { 402238106Sdes return 0; 403238106Sdes } 404238106Sdes errno = 0; 405238106Sdes return (int)(FD_ISSET(fd, &r)); 406238106Sdes} 407238106Sdes 408238106Sdesint tube_poll(struct tube* tube) 409238106Sdes{ 410238106Sdes struct timeval t; 411238106Sdes memset(&t, 0, sizeof(t)); 412238106Sdes return pollit(tube->sr, &t); 413238106Sdes} 414238106Sdes 415238106Sdesint tube_wait(struct tube* tube) 416238106Sdes{ 417238106Sdes return pollit(tube->sr, NULL); 418238106Sdes} 419238106Sdes 420238106Sdesint tube_read_fd(struct tube* tube) 421238106Sdes{ 422238106Sdes return tube->sr; 423238106Sdes} 424238106Sdes 425238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 426238106Sdes tube_callback_t* cb, void* arg) 427238106Sdes{ 428238106Sdes tube->listen_cb = cb; 429238106Sdes tube->listen_arg = arg; 430238106Sdes if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 431238106Sdes 0, tube_handle_listen, tube))) { 432238106Sdes int err = errno; 433238106Sdes log_err("tube_setup_bg_l: commpoint creation failed"); 434238106Sdes errno = err; 435238106Sdes return 0; 436238106Sdes } 437238106Sdes return 1; 438238106Sdes} 439238106Sdes 440238106Sdesint tube_setup_bg_write(struct tube* tube, struct comm_base* base) 441238106Sdes{ 442238106Sdes if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 443238106Sdes 1, tube_handle_write, tube))) { 444238106Sdes int err = errno; 445238106Sdes log_err("tube_setup_bg_w: commpoint creation failed"); 446238106Sdes errno = err; 447238106Sdes return 0; 448238106Sdes } 449238106Sdes return 1; 450238106Sdes} 451238106Sdes 452238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 453238106Sdes{ 454238106Sdes struct tube_res_list* item = 455238106Sdes (struct tube_res_list*)malloc(sizeof(*item)); 456238106Sdes if(!item) { 457238106Sdes free(msg); 458238106Sdes log_err("out of memory for async answer"); 459238106Sdes return 0; 460238106Sdes } 461238106Sdes item->buf = msg; 462238106Sdes item->len = len; 463238106Sdes item->next = NULL; 464238106Sdes /* add at back of list, since the first one may be partially written */ 465238106Sdes if(tube->res_last) 466238106Sdes tube->res_last->next = item; 467238106Sdes else tube->res_list = item; 468238106Sdes tube->res_last = item; 469238106Sdes if(tube->res_list == tube->res_last) { 470238106Sdes /* first added item, start the write process */ 471238106Sdes comm_point_start_listening(tube->res_com, -1, -1); 472238106Sdes } 473238106Sdes return 1; 474238106Sdes} 475238106Sdes 476238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 477238106Sdes void* ATTR_UNUSED(arg)) 478238106Sdes{ 479238106Sdes log_assert(0); 480238106Sdes} 481238106Sdes 482238106Sdes#else /* USE_WINSOCK */ 483238106Sdes/* on windows */ 484238106Sdes 485238106Sdes 486238106Sdesstruct tube* tube_create(void) 487238106Sdes{ 488238106Sdes /* windows does not have forks like unix, so we only support 489238106Sdes * threads on windows. And thus the pipe need only connect 490238106Sdes * threads. We use a mutex and a list of datagrams. */ 491238106Sdes struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 492238106Sdes if(!tube) { 493238106Sdes int err = errno; 494238106Sdes log_err("tube_create: out of memory"); 495238106Sdes errno = err; 496238106Sdes return NULL; 497238106Sdes } 498238106Sdes tube->event = WSACreateEvent(); 499238106Sdes if(tube->event == WSA_INVALID_EVENT) { 500238106Sdes free(tube); 501238106Sdes log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 502238106Sdes } 503238106Sdes if(!WSAResetEvent(tube->event)) { 504238106Sdes log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 505238106Sdes } 506238106Sdes lock_basic_init(&tube->res_lock); 507238106Sdes verbose(VERB_ALGO, "tube created"); 508238106Sdes return tube; 509238106Sdes} 510238106Sdes 511238106Sdesvoid tube_delete(struct tube* tube) 512238106Sdes{ 513238106Sdes if(!tube) return; 514238106Sdes tube_remove_bg_listen(tube); 515238106Sdes tube_remove_bg_write(tube); 516238106Sdes tube_close_read(tube); 517238106Sdes tube_close_write(tube); 518238106Sdes if(!WSACloseEvent(tube->event)) 519238106Sdes log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 520238106Sdes lock_basic_destroy(&tube->res_lock); 521238106Sdes verbose(VERB_ALGO, "tube deleted"); 522238106Sdes free(tube); 523238106Sdes} 524238106Sdes 525238106Sdesvoid tube_close_read(struct tube* ATTR_UNUSED(tube)) 526238106Sdes{ 527238106Sdes verbose(VERB_ALGO, "tube close_read"); 528238106Sdes} 529238106Sdes 530238106Sdesvoid tube_close_write(struct tube* ATTR_UNUSED(tube)) 531238106Sdes{ 532238106Sdes verbose(VERB_ALGO, "tube close_write"); 533238106Sdes /* wake up waiting reader with an empty queue */ 534238106Sdes if(!WSASetEvent(tube->event)) { 535238106Sdes log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 536238106Sdes } 537238106Sdes} 538238106Sdes 539238106Sdesvoid tube_remove_bg_listen(struct tube* tube) 540238106Sdes{ 541238106Sdes verbose(VERB_ALGO, "tube remove_bg_listen"); 542238106Sdes winsock_unregister_wsaevent(&tube->ev_listen); 543238106Sdes} 544238106Sdes 545238106Sdesvoid tube_remove_bg_write(struct tube* tube) 546238106Sdes{ 547238106Sdes verbose(VERB_ALGO, "tube remove_bg_write"); 548238106Sdes if(tube->res_list) { 549238106Sdes struct tube_res_list* np, *p = tube->res_list; 550238106Sdes tube->res_list = NULL; 551238106Sdes tube->res_last = NULL; 552238106Sdes while(p) { 553238106Sdes np = p->next; 554238106Sdes free(p->buf); 555238106Sdes free(p); 556238106Sdes p = np; 557238106Sdes } 558238106Sdes } 559238106Sdes} 560238106Sdes 561238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 562238106Sdes int ATTR_UNUSED(nonblock)) 563238106Sdes{ 564238106Sdes uint8_t* a; 565238106Sdes verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 566238106Sdes a = (uint8_t*)memdup(buf, len); 567238106Sdes if(!a) { 568238106Sdes log_err("out of memory in tube_write_msg"); 569238106Sdes return 0; 570238106Sdes } 571238106Sdes /* always nonblocking, this pipe cannot get full */ 572238106Sdes return tube_queue_item(tube, a, len); 573238106Sdes} 574238106Sdes 575238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 576238106Sdes int nonblock) 577238106Sdes{ 578238106Sdes struct tube_res_list* item = NULL; 579238106Sdes verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 580238106Sdes *buf = NULL; 581238106Sdes if(!tube_poll(tube)) { 582238106Sdes verbose(VERB_ALGO, "tube read_msg nodata"); 583238106Sdes /* nothing ready right now, wait if we want to */ 584238106Sdes if(nonblock) 585238106Sdes return -1; /* would block waiting for items */ 586238106Sdes if(!tube_wait(tube)) 587238106Sdes return 0; 588238106Sdes } 589238106Sdes lock_basic_lock(&tube->res_lock); 590238106Sdes if(tube->res_list) { 591238106Sdes item = tube->res_list; 592238106Sdes tube->res_list = item->next; 593238106Sdes if(tube->res_last == item) { 594238106Sdes /* the list is now empty */ 595238106Sdes tube->res_last = NULL; 596238106Sdes verbose(VERB_ALGO, "tube read_msg lastdata"); 597238106Sdes if(!WSAResetEvent(tube->event)) { 598238106Sdes log_err("WSAResetEvent: %s", 599238106Sdes wsa_strerror(WSAGetLastError())); 600238106Sdes } 601238106Sdes } 602238106Sdes } 603238106Sdes lock_basic_unlock(&tube->res_lock); 604238106Sdes if(!item) 605238106Sdes return 0; /* would block waiting for items */ 606238106Sdes *buf = item->buf; 607238106Sdes *len = item->len; 608238106Sdes free(item); 609238106Sdes verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 610238106Sdes return 1; 611238106Sdes} 612238106Sdes 613238106Sdesint tube_poll(struct tube* tube) 614238106Sdes{ 615238106Sdes struct tube_res_list* item = NULL; 616238106Sdes lock_basic_lock(&tube->res_lock); 617238106Sdes item = tube->res_list; 618238106Sdes lock_basic_unlock(&tube->res_lock); 619238106Sdes if(item) 620238106Sdes return 1; 621238106Sdes return 0; 622238106Sdes} 623238106Sdes 624238106Sdesint tube_wait(struct tube* tube) 625238106Sdes{ 626238106Sdes /* block on eventhandle */ 627238106Sdes DWORD res = WSAWaitForMultipleEvents( 628238106Sdes 1 /* one event in array */, 629238106Sdes &tube->event /* the event to wait for, our pipe signal */, 630238106Sdes 0 /* wait for all events is false */, 631238106Sdes WSA_INFINITE /* wait, no timeout */, 632238106Sdes 0 /* we are not alertable for IO completion routines */ 633238106Sdes ); 634238106Sdes if(res == WSA_WAIT_TIMEOUT) { 635238106Sdes return 0; 636238106Sdes } 637238106Sdes if(res == WSA_WAIT_IO_COMPLETION) { 638238106Sdes /* a bit unexpected, since we were not alertable */ 639238106Sdes return 0; 640238106Sdes } 641238106Sdes return 1; 642238106Sdes} 643238106Sdes 644238106Sdesint tube_read_fd(struct tube* ATTR_UNUSED(tube)) 645238106Sdes{ 646238106Sdes /* nothing sensible on Windows */ 647238106Sdes return -1; 648238106Sdes} 649238106Sdes 650238106Sdesint 651238106Sdestube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 652238106Sdes int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 653238106Sdes{ 654238106Sdes log_assert(0); 655238106Sdes return 0; 656238106Sdes} 657238106Sdes 658238106Sdesint 659238106Sdestube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 660238106Sdes int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 661238106Sdes{ 662238106Sdes log_assert(0); 663238106Sdes return 0; 664238106Sdes} 665238106Sdes 666238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 667238106Sdes tube_callback_t* cb, void* arg) 668238106Sdes{ 669238106Sdes tube->listen_cb = cb; 670238106Sdes tube->listen_arg = arg; 671238106Sdes if(!comm_base_internal(base)) 672238106Sdes return 1; /* ignore when no comm base - testing */ 673238106Sdes return winsock_register_wsaevent(comm_base_internal(base), 674238106Sdes &tube->ev_listen, tube->event, &tube_handle_signal, tube); 675238106Sdes} 676238106Sdes 677238106Sdesint tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 678238106Sdes struct comm_base* ATTR_UNUSED(base)) 679238106Sdes{ 680238106Sdes /* the queue item routine performs the signaling */ 681238106Sdes return 1; 682238106Sdes} 683238106Sdes 684238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 685238106Sdes{ 686238106Sdes struct tube_res_list* item = 687238106Sdes (struct tube_res_list*)malloc(sizeof(*item)); 688238106Sdes verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 689238106Sdes if(!item) { 690238106Sdes free(msg); 691238106Sdes log_err("out of memory for async answer"); 692238106Sdes return 0; 693238106Sdes } 694238106Sdes item->buf = msg; 695238106Sdes item->len = len; 696238106Sdes item->next = NULL; 697238106Sdes lock_basic_lock(&tube->res_lock); 698238106Sdes /* add at back of list, since the first one may be partially written */ 699238106Sdes if(tube->res_last) 700238106Sdes tube->res_last->next = item; 701238106Sdes else tube->res_list = item; 702238106Sdes tube->res_last = item; 703238106Sdes /* signal the eventhandle */ 704238106Sdes if(!WSASetEvent(tube->event)) { 705238106Sdes log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 706238106Sdes } 707238106Sdes lock_basic_unlock(&tube->res_lock); 708238106Sdes return 1; 709238106Sdes} 710238106Sdes 711238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 712238106Sdes void* arg) 713238106Sdes{ 714238106Sdes struct tube* tube = (struct tube*)arg; 715238106Sdes uint8_t* buf; 716238106Sdes uint32_t len = 0; 717238106Sdes verbose(VERB_ALGO, "tube handle_signal"); 718238106Sdes while(tube_poll(tube)) { 719238106Sdes if(tube_read_msg(tube, &buf, &len, 1)) { 720238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 721238106Sdes (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 722238106Sdes tube->listen_arg); 723238106Sdes } 724238106Sdes } 725238106Sdes} 726238106Sdes 727238106Sdes#endif /* USE_WINSOCK */ 728