Lines Matching defs:tube

2  * util/tube.c - pipe service
42 #include "util/tube.h"
56 struct tube* tube_create(void)
58 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
60 if(!tube) {
66 tube->sr = -1;
67 tube->sw = -1;
71 free(tube);
75 tube->sr = sv[0];
76 tube->sw = sv[1];
77 if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
79 log_err("tube: cannot set nonblocking");
80 tube_delete(tube);
84 return tube;
87 void tube_delete(struct tube* tube)
89 if(!tube) return;
90 tube_remove_bg_listen(tube);
91 tube_remove_bg_write(tube);
94 tube_close_read(tube);
95 tube_close_write(tube);
96 free(tube);
99 void tube_close_read(struct tube* tube)
101 if(tube->sr != -1) {
102 close(tube->sr);
103 tube->sr = -1;
107 void tube_close_write(struct tube* tube)
109 if(tube->sw != -1) {
110 close(tube->sw);
111 tube->sw = -1;
115 void tube_remove_bg_listen(struct tube* tube)
117 if(tube->listen_com) {
118 comm_point_delete(tube->listen_com);
119 tube->listen_com = NULL;
121 if(tube->cmd_msg) {
122 free(tube->cmd_msg);
123 tube->cmd_msg = NULL;
127 void tube_remove_bg_write(struct tube* tube)
129 if(tube->res_com) {
130 comm_point_delete(tube->res_com);
131 tube->res_com = NULL;
133 if(tube->res_list) {
134 struct tube_res_list* np, *p = tube->res_list;
135 tube->res_list = NULL;
136 tube->res_last = NULL;
150 struct tube* tube = (struct tube*)arg;
153 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
154 (*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
158 if(tube->cmd_read < sizeof(tube->cmd_len)) {
160 r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
161 sizeof(tube->cmd_len) - tube->cmd_read);
165 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
166 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
167 tube->listen_arg);
177 tube->cmd_read += r;
178 if(tube->cmd_read < sizeof(tube->cmd_len)) {
182 tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
183 if(!tube->cmd_msg) {
185 tube->cmd_read = 0;
190 r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
191 tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
195 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
196 (*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
197 tube->listen_arg);
207 tube->cmd_read += r;
208 if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
212 tube->cmd_read = 0;
214 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
215 (*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
216 NETEVENT_NOERROR, tube->listen_arg);
218 tube->cmd_msg = NULL;
226 struct tube* tube = (struct tube*)arg;
227 struct tube_res_list* item = tube->res_list;
239 if(tube->res_write < sizeof(item->len)) {
240 r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
241 sizeof(item->len) - tube->res_write);
253 tube->res_write += r;
254 if(tube->res_write < sizeof(item->len))
257 r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
258 item->len - (tube->res_write - sizeof(item->len)));
270 tube->res_write += r;
271 if(tube->res_write < sizeof(item->len) + item->len)
276 tube->res_list = tube->res_list->next;
278 if(!tube->res_list) {
279 tube->res_last = NULL;
282 tube->res_write = 0;
286 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
290 int fd = tube->sw;
298 log_err("tube msg write failed: %s", strerror(errno));
308 log_err("tube msg write failed: %s", strerror(errno));
317 log_err("tube msg write failed: %s", strerror(errno));
328 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
332 int fd = tube->sr;
341 log_err("tube msg read failed: %s", strerror(errno));
353 log_err("tube msg read failed: %s", strerror(errno));
366 log_err("tube read out of memory");
373 log_err("tube msg read failed: %s", strerror(errno));
408 int tube_poll(struct tube* tube)
412 return pollit(tube->sr, &t);
415 int tube_wait(struct tube* tube)
417 return pollit(tube->sr, NULL);
420 int tube_read_fd(struct tube* tube)
422 return tube->sr;
425 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
428 tube->listen_cb = cb;
429 tube->listen_arg = arg;
430 if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
431 0, tube_handle_listen, tube))) {
440 int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
442 if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
443 1, tube_handle_write, tube))) {
452 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
465 if(tube->res_last)
466 tube->res_last->next = item;
467 else tube->res_list = item;
468 tube->res_last = item;
469 if(tube->res_list == tube->res_last) {
471 comm_point_start_listening(tube->res_com, -1, -1);
486 struct tube* tube_create(void)
491 struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
492 if(!tube) {
498 tube->event = WSACreateEvent();
499 if(tube->event == WSA_INVALID_EVENT) {
500 free(tube);
503 if(!WSAResetEvent(tube->event)) {
506 lock_basic_init(&tube->res_lock);
507 verbose(VERB_ALGO, "tube created");
508 return tube;
511 void tube_delete(struct tube* tube)
513 if(!tube) return;
514 tube_remove_bg_listen(tube);
515 tube_remove_bg_write(tube);
516 tube_close_read(tube);
517 tube_close_write(tube);
518 if(!WSACloseEvent(tube->event))
520 lock_basic_destroy(&tube->res_lock);
521 verbose(VERB_ALGO, "tube deleted");
522 free(tube);
525 void tube_close_read(struct tube* ATTR_UNUSED(tube))
527 verbose(VERB_ALGO, "tube close_read");
530 void tube_close_write(struct tube* ATTR_UNUSED(tube))
532 verbose(VERB_ALGO, "tube close_write");
534 if(!WSASetEvent(tube->event)) {
539 void tube_remove_bg_listen(struct tube* tube)
541 verbose(VERB_ALGO, "tube remove_bg_listen");
542 winsock_unregister_wsaevent(&tube->ev_listen);
545 void tube_remove_bg_write(struct tube* tube)
547 verbose(VERB_ALGO, "tube remove_bg_write");
548 if(tube->res_list) {
549 struct tube_res_list* np, *p = tube->res_list;
550 tube->res_list = NULL;
551 tube->res_last = NULL;
561 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
565 verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
572 return tube_queue_item(tube, a, len);
575 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
579 verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
581 if(!tube_poll(tube)) {
582 verbose(VERB_ALGO, "tube read_msg nodata");
586 if(!tube_wait(tube))
589 lock_basic_lock(&tube->res_lock);
590 if(tube->res_list) {
591 item = tube->res_list;
592 tube->res_list = item->next;
593 if(tube->res_last == item) {
595 tube->res_last = NULL;
596 verbose(VERB_ALGO, "tube read_msg lastdata");
597 if(!WSAResetEvent(tube->event)) {
603 lock_basic_unlock(&tube->res_lock);
609 verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
613 int tube_poll(struct tube* tube)
616 lock_basic_lock(&tube->res_lock);
617 item = tube->res_list;
618 lock_basic_unlock(&tube->res_lock);
624 int tube_wait(struct tube* tube)
629 &tube->event /* the event to wait for, our pipe signal */,
644 int tube_read_fd(struct tube* ATTR_UNUSED(tube))
666 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
669 tube->listen_cb = cb;
670 tube->listen_arg = arg;
674 &tube->ev_listen, tube->event, &tube_handle_signal, tube);
677 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
684 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
688 verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
697 lock_basic_lock(&tube->res_lock);
699 if(tube->res_last)
700 tube->res_last->next = item;
701 else tube->res_list = item;
702 tube->res_last = item;
704 if(!WSASetEvent(tube->event)) {
707 lock_basic_unlock(&tube->res_lock);
714 struct tube* tube = (struct tube*)arg;
717 verbose(VERB_ALGO, "tube handle_signal");
718 while(tube_poll(tube)) {
719 if(tube_read_msg(tube, &buf, &len, 1)) {
720 fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
721 (*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
722 tube->listen_arg);