1238106Sdes/* 2238106Sdes * util/tube.c - pipe service 3238106Sdes * 4238106Sdes * Copyright (c) 2008, NLnet Labs. All rights reserved. 5238106Sdes * 6238106Sdes * This software is open source. 7238106Sdes * 8238106Sdes * Redistribution and use in source and binary forms, with or without 9238106Sdes * modification, are permitted provided that the following conditions 10238106Sdes * are met: 11238106Sdes * 12238106Sdes * Redistributions of source code must retain the above copyright notice, 13238106Sdes * this list of conditions and the following disclaimer. 14238106Sdes * 15238106Sdes * Redistributions in binary form must reproduce the above copyright notice, 16238106Sdes * this list of conditions and the following disclaimer in the documentation 17238106Sdes * and/or other materials provided with the distribution. 18238106Sdes * 19238106Sdes * Neither the name of the NLNET LABS nor the names of its contributors may 20238106Sdes * be used to endorse or promote products derived from this software without 21238106Sdes * specific prior written permission. 22238106Sdes * 23238106Sdes * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24266114Sdes * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25266114Sdes * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 26266114Sdes * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 27266114Sdes * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 28266114Sdes * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 29266114Sdes * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 30266114Sdes * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 31266114Sdes * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 32266114Sdes * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 33266114Sdes * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 34238106Sdes */ 35238106Sdes 36238106Sdes/** 37238106Sdes * \file 38238106Sdes * 39238106Sdes * This file contains pipe service functions. 40238106Sdes */ 41238106Sdes#include "config.h" 42238106Sdes#include "util/tube.h" 43238106Sdes#include "util/log.h" 44238106Sdes#include "util/net_help.h" 45238106Sdes#include "util/netevent.h" 46238106Sdes#include "util/fptr_wlist.h" 47238106Sdes 48238106Sdes#ifndef USE_WINSOCK 49238106Sdes/* on unix */ 50238106Sdes 51238106Sdes#ifndef HAVE_SOCKETPAIR 52238106Sdes/** no socketpair() available, like on Minix 3.1.7, use pipe */ 53238106Sdes#define socketpair(f, t, p, sv) pipe(sv) 54238106Sdes#endif /* HAVE_SOCKETPAIR */ 55238106Sdes 56238106Sdesstruct tube* tube_create(void) 57238106Sdes{ 58238106Sdes struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 59238106Sdes int sv[2]; 60238106Sdes if(!tube) { 61238106Sdes int err = errno; 62238106Sdes log_err("tube_create: out of memory"); 63238106Sdes errno = err; 64238106Sdes return NULL; 65238106Sdes } 66238106Sdes tube->sr = -1; 67238106Sdes tube->sw = -1; 68238106Sdes if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 69238106Sdes int err = errno; 70238106Sdes log_err("socketpair: %s", strerror(errno)); 71238106Sdes free(tube); 72238106Sdes errno = err; 73238106Sdes return NULL; 74238106Sdes } 75238106Sdes tube->sr = sv[0]; 76238106Sdes tube->sw = sv[1]; 77238106Sdes if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) { 78238106Sdes int err = errno; 79238106Sdes log_err("tube: cannot set nonblocking"); 80238106Sdes tube_delete(tube); 81238106Sdes errno = err; 82238106Sdes return NULL; 83238106Sdes } 84238106Sdes return tube; 85238106Sdes} 86238106Sdes 87238106Sdesvoid tube_delete(struct tube* tube) 88238106Sdes{ 89238106Sdes if(!tube) return; 90238106Sdes tube_remove_bg_listen(tube); 91238106Sdes tube_remove_bg_write(tube); 92238106Sdes /* close fds after deleting commpoints, to be sure. 93238106Sdes * Also epoll does not like closing fd before event_del */ 94238106Sdes tube_close_read(tube); 95238106Sdes tube_close_write(tube); 96238106Sdes free(tube); 97238106Sdes} 98238106Sdes 99238106Sdesvoid tube_close_read(struct tube* tube) 100238106Sdes{ 101238106Sdes if(tube->sr != -1) { 102238106Sdes close(tube->sr); 103238106Sdes tube->sr = -1; 104238106Sdes } 105238106Sdes} 106238106Sdes 107238106Sdesvoid tube_close_write(struct tube* tube) 108238106Sdes{ 109238106Sdes if(tube->sw != -1) { 110238106Sdes close(tube->sw); 111238106Sdes tube->sw = -1; 112238106Sdes } 113238106Sdes} 114238106Sdes 115238106Sdesvoid tube_remove_bg_listen(struct tube* tube) 116238106Sdes{ 117238106Sdes if(tube->listen_com) { 118238106Sdes comm_point_delete(tube->listen_com); 119238106Sdes tube->listen_com = NULL; 120238106Sdes } 121296415Sdes free(tube->cmd_msg); 122296415Sdes tube->cmd_msg = NULL; 123238106Sdes} 124238106Sdes 125238106Sdesvoid tube_remove_bg_write(struct tube* tube) 126238106Sdes{ 127238106Sdes if(tube->res_com) { 128238106Sdes comm_point_delete(tube->res_com); 129238106Sdes tube->res_com = NULL; 130238106Sdes } 131238106Sdes if(tube->res_list) { 132238106Sdes struct tube_res_list* np, *p = tube->res_list; 133238106Sdes tube->res_list = NULL; 134238106Sdes tube->res_last = NULL; 135238106Sdes while(p) { 136238106Sdes np = p->next; 137238106Sdes free(p->buf); 138238106Sdes free(p); 139238106Sdes p = np; 140238106Sdes } 141238106Sdes } 142238106Sdes} 143238106Sdes 144238106Sdesint 145238106Sdestube_handle_listen(struct comm_point* c, void* arg, int error, 146238106Sdes struct comm_reply* ATTR_UNUSED(reply_info)) 147238106Sdes{ 148238106Sdes struct tube* tube = (struct tube*)arg; 149238106Sdes ssize_t r; 150238106Sdes if(error != NETEVENT_NOERROR) { 151238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 152238106Sdes (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg); 153238106Sdes return 0; 154238106Sdes } 155238106Sdes 156238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len)) { 157238106Sdes /* complete reading the length of control msg */ 158238106Sdes r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read, 159238106Sdes sizeof(tube->cmd_len) - tube->cmd_read); 160238106Sdes if(r==0) { 161238106Sdes /* error has happened or */ 162238106Sdes /* parent closed pipe, must have exited somehow */ 163238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 164238106Sdes (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 165238106Sdes tube->listen_arg); 166238106Sdes return 0; 167238106Sdes } 168238106Sdes if(r==-1) { 169238106Sdes if(errno != EAGAIN && errno != EINTR) { 170238106Sdes log_err("rpipe error: %s", strerror(errno)); 171238106Sdes } 172238106Sdes /* nothing to read now, try later */ 173238106Sdes return 0; 174238106Sdes } 175238106Sdes tube->cmd_read += r; 176238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len)) { 177238106Sdes /* not complete, try later */ 178238106Sdes return 0; 179238106Sdes } 180238106Sdes tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len); 181238106Sdes if(!tube->cmd_msg) { 182238106Sdes log_err("malloc failure"); 183238106Sdes tube->cmd_read = 0; 184238106Sdes return 0; 185238106Sdes } 186238106Sdes } 187238106Sdes /* cmd_len has been read, read remainder */ 188238106Sdes r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len), 189238106Sdes tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len))); 190238106Sdes if(r==0) { 191238106Sdes /* error has happened or */ 192238106Sdes /* parent closed pipe, must have exited somehow */ 193238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 194238106Sdes (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED, 195238106Sdes tube->listen_arg); 196238106Sdes return 0; 197238106Sdes } 198238106Sdes if(r==-1) { 199238106Sdes /* nothing to read now, try later */ 200238106Sdes if(errno != EAGAIN && errno != EINTR) { 201238106Sdes log_err("rpipe error: %s", strerror(errno)); 202238106Sdes } 203238106Sdes return 0; 204238106Sdes } 205238106Sdes tube->cmd_read += r; 206238106Sdes if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) { 207238106Sdes /* not complete, try later */ 208238106Sdes return 0; 209238106Sdes } 210238106Sdes tube->cmd_read = 0; 211238106Sdes 212238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 213238106Sdes (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len, 214238106Sdes NETEVENT_NOERROR, tube->listen_arg); 215238106Sdes /* also frees the buf */ 216238106Sdes tube->cmd_msg = NULL; 217238106Sdes return 0; 218238106Sdes} 219238106Sdes 220238106Sdesint 221238106Sdestube_handle_write(struct comm_point* c, void* arg, int error, 222238106Sdes struct comm_reply* ATTR_UNUSED(reply_info)) 223238106Sdes{ 224238106Sdes struct tube* tube = (struct tube*)arg; 225238106Sdes struct tube_res_list* item = tube->res_list; 226238106Sdes ssize_t r; 227238106Sdes if(error != NETEVENT_NOERROR) { 228238106Sdes log_err("tube_handle_write net error %d", error); 229238106Sdes return 0; 230238106Sdes } 231238106Sdes 232238106Sdes if(!item) { 233238106Sdes comm_point_stop_listening(c); 234238106Sdes return 0; 235238106Sdes } 236238106Sdes 237238106Sdes if(tube->res_write < sizeof(item->len)) { 238238106Sdes r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write, 239238106Sdes sizeof(item->len) - tube->res_write); 240238106Sdes if(r == -1) { 241238106Sdes if(errno != EAGAIN && errno != EINTR) { 242238106Sdes log_err("wpipe error: %s", strerror(errno)); 243238106Sdes } 244238106Sdes return 0; /* try again later */ 245238106Sdes } 246238106Sdes if(r == 0) { 247238106Sdes /* error on pipe, must have exited somehow */ 248238106Sdes /* cannot signal this to pipe user */ 249238106Sdes return 0; 250238106Sdes } 251238106Sdes tube->res_write += r; 252238106Sdes if(tube->res_write < sizeof(item->len)) 253238106Sdes return 0; 254238106Sdes } 255238106Sdes r = write(c->fd, item->buf + tube->res_write - sizeof(item->len), 256238106Sdes item->len - (tube->res_write - sizeof(item->len))); 257238106Sdes if(r == -1) { 258238106Sdes if(errno != EAGAIN && errno != EINTR) { 259238106Sdes log_err("wpipe error: %s", strerror(errno)); 260238106Sdes } 261238106Sdes return 0; /* try again later */ 262238106Sdes } 263238106Sdes if(r == 0) { 264238106Sdes /* error on pipe, must have exited somehow */ 265238106Sdes /* cannot signal this to pipe user */ 266238106Sdes return 0; 267238106Sdes } 268238106Sdes tube->res_write += r; 269238106Sdes if(tube->res_write < sizeof(item->len) + item->len) 270238106Sdes return 0; 271238106Sdes /* done this result, remove it */ 272238106Sdes free(item->buf); 273238106Sdes item->buf = NULL; 274238106Sdes tube->res_list = tube->res_list->next; 275238106Sdes free(item); 276238106Sdes if(!tube->res_list) { 277238106Sdes tube->res_last = NULL; 278238106Sdes comm_point_stop_listening(c); 279238106Sdes } 280238106Sdes tube->res_write = 0; 281238106Sdes return 0; 282238106Sdes} 283238106Sdes 284238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 285238106Sdes int nonblock) 286238106Sdes{ 287238106Sdes ssize_t r, d; 288238106Sdes int fd = tube->sw; 289238106Sdes 290238106Sdes /* test */ 291238106Sdes if(nonblock) { 292238106Sdes r = write(fd, &len, sizeof(len)); 293238106Sdes if(r == -1) { 294238106Sdes if(errno==EINTR || errno==EAGAIN) 295238106Sdes return -1; 296238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 297238106Sdes return -1; /* can still continue, perhaps */ 298238106Sdes } 299238106Sdes } else r = 0; 300238106Sdes if(!fd_set_block(fd)) 301238106Sdes return 0; 302238106Sdes /* write remainder */ 303238106Sdes d = r; 304238106Sdes while(d != (ssize_t)sizeof(len)) { 305238106Sdes if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) { 306238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 307238106Sdes (void)fd_set_nonblock(fd); 308238106Sdes return 0; 309238106Sdes } 310238106Sdes d += r; 311238106Sdes } 312238106Sdes d = 0; 313238106Sdes while(d != (ssize_t)len) { 314238106Sdes if((r=write(fd, buf+d, len-d)) == -1) { 315238106Sdes log_err("tube msg write failed: %s", strerror(errno)); 316238106Sdes (void)fd_set_nonblock(fd); 317238106Sdes return 0; 318238106Sdes } 319238106Sdes d += r; 320238106Sdes } 321238106Sdes if(!fd_set_nonblock(fd)) 322238106Sdes return 0; 323238106Sdes return 1; 324238106Sdes} 325238106Sdes 326238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 327238106Sdes int nonblock) 328238106Sdes{ 329238106Sdes ssize_t r, d; 330238106Sdes int fd = tube->sr; 331238106Sdes 332238106Sdes /* test */ 333238106Sdes *len = 0; 334238106Sdes if(nonblock) { 335238106Sdes r = read(fd, len, sizeof(*len)); 336238106Sdes if(r == -1) { 337238106Sdes if(errno==EINTR || errno==EAGAIN) 338238106Sdes return -1; 339238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 340238106Sdes return -1; /* we can still continue, perhaps */ 341238106Sdes } 342238106Sdes if(r == 0) /* EOF */ 343238106Sdes return 0; 344238106Sdes } else r = 0; 345238106Sdes if(!fd_set_block(fd)) 346238106Sdes return 0; 347238106Sdes /* read remainder */ 348238106Sdes d = r; 349238106Sdes while(d != (ssize_t)sizeof(*len)) { 350238106Sdes if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) { 351238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 352238106Sdes (void)fd_set_nonblock(fd); 353238106Sdes return 0; 354238106Sdes } 355238106Sdes if(r == 0) /* EOF */ { 356238106Sdes (void)fd_set_nonblock(fd); 357238106Sdes return 0; 358238106Sdes } 359238106Sdes d += r; 360238106Sdes } 361249141Sdes log_assert(*len < 65536*2); 362238106Sdes *buf = (uint8_t*)malloc(*len); 363238106Sdes if(!*buf) { 364238106Sdes log_err("tube read out of memory"); 365238106Sdes (void)fd_set_nonblock(fd); 366238106Sdes return 0; 367238106Sdes } 368238106Sdes d = 0; 369266114Sdes while(d < (ssize_t)*len) { 370238106Sdes if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) { 371238106Sdes log_err("tube msg read failed: %s", strerror(errno)); 372238106Sdes (void)fd_set_nonblock(fd); 373238106Sdes free(*buf); 374238106Sdes return 0; 375238106Sdes } 376238106Sdes if(r == 0) { /* EOF */ 377238106Sdes (void)fd_set_nonblock(fd); 378238106Sdes free(*buf); 379238106Sdes return 0; 380238106Sdes } 381238106Sdes d += r; 382238106Sdes } 383238106Sdes if(!fd_set_nonblock(fd)) { 384238106Sdes free(*buf); 385238106Sdes return 0; 386238106Sdes } 387238106Sdes return 1; 388238106Sdes} 389238106Sdes 390238106Sdes/** perform a select() on the fd */ 391238106Sdesstatic int 392238106Sdespollit(int fd, struct timeval* t) 393238106Sdes{ 394238106Sdes fd_set r; 395238106Sdes#ifndef S_SPLINT_S 396238106Sdes FD_ZERO(&r); 397238106Sdes FD_SET(FD_SET_T fd, &r); 398238106Sdes#endif 399238106Sdes if(select(fd+1, &r, NULL, NULL, t) == -1) { 400238106Sdes return 0; 401238106Sdes } 402238106Sdes errno = 0; 403238106Sdes return (int)(FD_ISSET(fd, &r)); 404238106Sdes} 405238106Sdes 406238106Sdesint tube_poll(struct tube* tube) 407238106Sdes{ 408238106Sdes struct timeval t; 409238106Sdes memset(&t, 0, sizeof(t)); 410238106Sdes return pollit(tube->sr, &t); 411238106Sdes} 412238106Sdes 413238106Sdesint tube_wait(struct tube* tube) 414238106Sdes{ 415238106Sdes return pollit(tube->sr, NULL); 416238106Sdes} 417238106Sdes 418238106Sdesint tube_read_fd(struct tube* tube) 419238106Sdes{ 420238106Sdes return tube->sr; 421238106Sdes} 422238106Sdes 423238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 424238106Sdes tube_callback_t* cb, void* arg) 425238106Sdes{ 426238106Sdes tube->listen_cb = cb; 427238106Sdes tube->listen_arg = arg; 428238106Sdes if(!(tube->listen_com = comm_point_create_raw(base, tube->sr, 429238106Sdes 0, tube_handle_listen, tube))) { 430238106Sdes int err = errno; 431238106Sdes log_err("tube_setup_bg_l: commpoint creation failed"); 432238106Sdes errno = err; 433238106Sdes return 0; 434238106Sdes } 435238106Sdes return 1; 436238106Sdes} 437238106Sdes 438238106Sdesint tube_setup_bg_write(struct tube* tube, struct comm_base* base) 439238106Sdes{ 440238106Sdes if(!(tube->res_com = comm_point_create_raw(base, tube->sw, 441238106Sdes 1, tube_handle_write, tube))) { 442238106Sdes int err = errno; 443238106Sdes log_err("tube_setup_bg_w: commpoint creation failed"); 444238106Sdes errno = err; 445238106Sdes return 0; 446238106Sdes } 447238106Sdes return 1; 448238106Sdes} 449238106Sdes 450238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 451238106Sdes{ 452238106Sdes struct tube_res_list* item = 453238106Sdes (struct tube_res_list*)malloc(sizeof(*item)); 454238106Sdes if(!item) { 455238106Sdes free(msg); 456238106Sdes log_err("out of memory for async answer"); 457238106Sdes return 0; 458238106Sdes } 459238106Sdes item->buf = msg; 460238106Sdes item->len = len; 461238106Sdes item->next = NULL; 462238106Sdes /* add at back of list, since the first one may be partially written */ 463238106Sdes if(tube->res_last) 464238106Sdes tube->res_last->next = item; 465238106Sdes else tube->res_list = item; 466238106Sdes tube->res_last = item; 467238106Sdes if(tube->res_list == tube->res_last) { 468238106Sdes /* first added item, start the write process */ 469238106Sdes comm_point_start_listening(tube->res_com, -1, -1); 470238106Sdes } 471238106Sdes return 1; 472238106Sdes} 473238106Sdes 474238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 475238106Sdes void* ATTR_UNUSED(arg)) 476238106Sdes{ 477238106Sdes log_assert(0); 478238106Sdes} 479238106Sdes 480238106Sdes#else /* USE_WINSOCK */ 481238106Sdes/* on windows */ 482238106Sdes 483238106Sdes 484238106Sdesstruct tube* tube_create(void) 485238106Sdes{ 486238106Sdes /* windows does not have forks like unix, so we only support 487238106Sdes * threads on windows. And thus the pipe need only connect 488238106Sdes * threads. We use a mutex and a list of datagrams. */ 489238106Sdes struct tube* tube = (struct tube*)calloc(1, sizeof(*tube)); 490238106Sdes if(!tube) { 491238106Sdes int err = errno; 492238106Sdes log_err("tube_create: out of memory"); 493238106Sdes errno = err; 494238106Sdes return NULL; 495238106Sdes } 496238106Sdes tube->event = WSACreateEvent(); 497238106Sdes if(tube->event == WSA_INVALID_EVENT) { 498238106Sdes free(tube); 499238106Sdes log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError())); 500238106Sdes } 501238106Sdes if(!WSAResetEvent(tube->event)) { 502238106Sdes log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError())); 503238106Sdes } 504238106Sdes lock_basic_init(&tube->res_lock); 505238106Sdes verbose(VERB_ALGO, "tube created"); 506238106Sdes return tube; 507238106Sdes} 508238106Sdes 509238106Sdesvoid tube_delete(struct tube* tube) 510238106Sdes{ 511238106Sdes if(!tube) return; 512238106Sdes tube_remove_bg_listen(tube); 513238106Sdes tube_remove_bg_write(tube); 514238106Sdes tube_close_read(tube); 515238106Sdes tube_close_write(tube); 516238106Sdes if(!WSACloseEvent(tube->event)) 517238106Sdes log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError())); 518238106Sdes lock_basic_destroy(&tube->res_lock); 519238106Sdes verbose(VERB_ALGO, "tube deleted"); 520238106Sdes free(tube); 521238106Sdes} 522238106Sdes 523238106Sdesvoid tube_close_read(struct tube* ATTR_UNUSED(tube)) 524238106Sdes{ 525238106Sdes verbose(VERB_ALGO, "tube close_read"); 526238106Sdes} 527238106Sdes 528238106Sdesvoid tube_close_write(struct tube* ATTR_UNUSED(tube)) 529238106Sdes{ 530238106Sdes verbose(VERB_ALGO, "tube close_write"); 531238106Sdes /* wake up waiting reader with an empty queue */ 532238106Sdes if(!WSASetEvent(tube->event)) { 533238106Sdes log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 534238106Sdes } 535238106Sdes} 536238106Sdes 537238106Sdesvoid tube_remove_bg_listen(struct tube* tube) 538238106Sdes{ 539238106Sdes verbose(VERB_ALGO, "tube remove_bg_listen"); 540238106Sdes winsock_unregister_wsaevent(&tube->ev_listen); 541238106Sdes} 542238106Sdes 543238106Sdesvoid tube_remove_bg_write(struct tube* tube) 544238106Sdes{ 545238106Sdes verbose(VERB_ALGO, "tube remove_bg_write"); 546238106Sdes if(tube->res_list) { 547238106Sdes struct tube_res_list* np, *p = tube->res_list; 548238106Sdes tube->res_list = NULL; 549238106Sdes tube->res_last = NULL; 550238106Sdes while(p) { 551238106Sdes np = p->next; 552238106Sdes free(p->buf); 553238106Sdes free(p); 554238106Sdes p = np; 555238106Sdes } 556238106Sdes } 557238106Sdes} 558238106Sdes 559238106Sdesint tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len, 560238106Sdes int ATTR_UNUSED(nonblock)) 561238106Sdes{ 562238106Sdes uint8_t* a; 563238106Sdes verbose(VERB_ALGO, "tube write_msg len %d", (int)len); 564238106Sdes a = (uint8_t*)memdup(buf, len); 565238106Sdes if(!a) { 566238106Sdes log_err("out of memory in tube_write_msg"); 567238106Sdes return 0; 568238106Sdes } 569238106Sdes /* always nonblocking, this pipe cannot get full */ 570238106Sdes return tube_queue_item(tube, a, len); 571238106Sdes} 572238106Sdes 573238106Sdesint tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len, 574238106Sdes int nonblock) 575238106Sdes{ 576238106Sdes struct tube_res_list* item = NULL; 577238106Sdes verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking"); 578238106Sdes *buf = NULL; 579238106Sdes if(!tube_poll(tube)) { 580238106Sdes verbose(VERB_ALGO, "tube read_msg nodata"); 581238106Sdes /* nothing ready right now, wait if we want to */ 582238106Sdes if(nonblock) 583238106Sdes return -1; /* would block waiting for items */ 584238106Sdes if(!tube_wait(tube)) 585238106Sdes return 0; 586238106Sdes } 587238106Sdes lock_basic_lock(&tube->res_lock); 588238106Sdes if(tube->res_list) { 589238106Sdes item = tube->res_list; 590238106Sdes tube->res_list = item->next; 591238106Sdes if(tube->res_last == item) { 592238106Sdes /* the list is now empty */ 593238106Sdes tube->res_last = NULL; 594238106Sdes verbose(VERB_ALGO, "tube read_msg lastdata"); 595238106Sdes if(!WSAResetEvent(tube->event)) { 596238106Sdes log_err("WSAResetEvent: %s", 597238106Sdes wsa_strerror(WSAGetLastError())); 598238106Sdes } 599238106Sdes } 600238106Sdes } 601238106Sdes lock_basic_unlock(&tube->res_lock); 602238106Sdes if(!item) 603238106Sdes return 0; /* would block waiting for items */ 604238106Sdes *buf = item->buf; 605238106Sdes *len = item->len; 606238106Sdes free(item); 607238106Sdes verbose(VERB_ALGO, "tube read_msg len %d", (int)*len); 608238106Sdes return 1; 609238106Sdes} 610238106Sdes 611238106Sdesint tube_poll(struct tube* tube) 612238106Sdes{ 613238106Sdes struct tube_res_list* item = NULL; 614238106Sdes lock_basic_lock(&tube->res_lock); 615238106Sdes item = tube->res_list; 616238106Sdes lock_basic_unlock(&tube->res_lock); 617238106Sdes if(item) 618238106Sdes return 1; 619238106Sdes return 0; 620238106Sdes} 621238106Sdes 622238106Sdesint tube_wait(struct tube* tube) 623238106Sdes{ 624238106Sdes /* block on eventhandle */ 625238106Sdes DWORD res = WSAWaitForMultipleEvents( 626238106Sdes 1 /* one event in array */, 627238106Sdes &tube->event /* the event to wait for, our pipe signal */, 628238106Sdes 0 /* wait for all events is false */, 629238106Sdes WSA_INFINITE /* wait, no timeout */, 630238106Sdes 0 /* we are not alertable for IO completion routines */ 631238106Sdes ); 632238106Sdes if(res == WSA_WAIT_TIMEOUT) { 633238106Sdes return 0; 634238106Sdes } 635238106Sdes if(res == WSA_WAIT_IO_COMPLETION) { 636238106Sdes /* a bit unexpected, since we were not alertable */ 637238106Sdes return 0; 638238106Sdes } 639238106Sdes return 1; 640238106Sdes} 641238106Sdes 642238106Sdesint tube_read_fd(struct tube* ATTR_UNUSED(tube)) 643238106Sdes{ 644238106Sdes /* nothing sensible on Windows */ 645238106Sdes return -1; 646238106Sdes} 647238106Sdes 648238106Sdesint 649238106Sdestube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 650238106Sdes int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 651238106Sdes{ 652238106Sdes log_assert(0); 653238106Sdes return 0; 654238106Sdes} 655238106Sdes 656238106Sdesint 657238106Sdestube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg), 658238106Sdes int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info)) 659238106Sdes{ 660238106Sdes log_assert(0); 661238106Sdes return 0; 662238106Sdes} 663238106Sdes 664238106Sdesint tube_setup_bg_listen(struct tube* tube, struct comm_base* base, 665238106Sdes tube_callback_t* cb, void* arg) 666238106Sdes{ 667238106Sdes tube->listen_cb = cb; 668238106Sdes tube->listen_arg = arg; 669238106Sdes if(!comm_base_internal(base)) 670238106Sdes return 1; /* ignore when no comm base - testing */ 671238106Sdes return winsock_register_wsaevent(comm_base_internal(base), 672238106Sdes &tube->ev_listen, tube->event, &tube_handle_signal, tube); 673238106Sdes} 674238106Sdes 675238106Sdesint tube_setup_bg_write(struct tube* ATTR_UNUSED(tube), 676238106Sdes struct comm_base* ATTR_UNUSED(base)) 677238106Sdes{ 678238106Sdes /* the queue item routine performs the signaling */ 679238106Sdes return 1; 680238106Sdes} 681238106Sdes 682238106Sdesint tube_queue_item(struct tube* tube, uint8_t* msg, size_t len) 683238106Sdes{ 684238106Sdes struct tube_res_list* item = 685238106Sdes (struct tube_res_list*)malloc(sizeof(*item)); 686238106Sdes verbose(VERB_ALGO, "tube queue_item len %d", (int)len); 687238106Sdes if(!item) { 688238106Sdes free(msg); 689238106Sdes log_err("out of memory for async answer"); 690238106Sdes return 0; 691238106Sdes } 692238106Sdes item->buf = msg; 693238106Sdes item->len = len; 694238106Sdes item->next = NULL; 695238106Sdes lock_basic_lock(&tube->res_lock); 696238106Sdes /* add at back of list, since the first one may be partially written */ 697238106Sdes if(tube->res_last) 698238106Sdes tube->res_last->next = item; 699238106Sdes else tube->res_list = item; 700238106Sdes tube->res_last = item; 701238106Sdes /* signal the eventhandle */ 702238106Sdes if(!WSASetEvent(tube->event)) { 703238106Sdes log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError())); 704238106Sdes } 705238106Sdes lock_basic_unlock(&tube->res_lock); 706238106Sdes return 1; 707238106Sdes} 708238106Sdes 709238106Sdesvoid tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events), 710238106Sdes void* arg) 711238106Sdes{ 712238106Sdes struct tube* tube = (struct tube*)arg; 713238106Sdes uint8_t* buf; 714238106Sdes uint32_t len = 0; 715238106Sdes verbose(VERB_ALGO, "tube handle_signal"); 716238106Sdes while(tube_poll(tube)) { 717238106Sdes if(tube_read_msg(tube, &buf, &len, 1)) { 718238106Sdes fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb)); 719238106Sdes (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR, 720238106Sdes tube->listen_arg); 721238106Sdes } 722238106Sdes } 723238106Sdes} 724238106Sdes 725238106Sdes#endif /* USE_WINSOCK */ 726