1/*- 2 * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29#include <sys/param.h> 30#include <sys/socket.h> 31#include <sys/uio.h> 32 33#include <netinet/in.h> 34 35#include <assert.h> 36#include <errno.h> 37#include <pthread.h> 38#include <stdarg.h> 39#include <stdio.h> 40#include <stdlib.h> 41#include <string.h> 42#include <unistd.h> 43 44#include "misc.h" 45#include "mux.h" 46 47/* 48 * Packet types. 49 */ 50#define MUX_STARTUPREQ 0 51#define MUX_STARTUPREP 1 52#define MUX_CONNECT 2 53#define MUX_ACCEPT 3 54#define MUX_RESET 4 55#define MUX_DATA 5 56#define MUX_WINDOW 6 57#define MUX_CLOSE 7 58 59/* 60 * Header sizes. 61 */ 62#define MUX_STARTUPHDRSZ 3 63#define MUX_CONNECTHDRSZ 8 64#define MUX_ACCEPTHDRSZ 8 65#define MUX_RESETHDRSZ 2 66#define MUX_DATAHDRSZ 4 67#define MUX_WINDOWHDRSZ 6 68#define MUX_CLOSEHDRSZ 2 69 70#define MUX_PROTOVER 0 /* Protocol version. */ 71 72struct mux_header { 73 uint8_t type; 74 union { 75 struct { 76 uint16_t version; 77 } __packed mh_startup; 78 struct { 79 uint8_t id; 80 uint16_t mss; 81 uint32_t window; 82 } __packed mh_connect; 83 struct { 84 uint8_t id; 85 uint16_t mss; 86 uint32_t window; 87 } __packed mh_accept; 88 struct { 89 uint8_t id; 90 } __packed mh_reset; 91 struct { 92 uint8_t id; 93 uint16_t len; 94 } __packed mh_data; 95 struct { 96 uint8_t id; 97 uint32_t window; 98 } __packed mh_window; 99 struct { 100 uint8_t id; 101 } __packed mh_close; 102 } mh_u; 103} __packed; 104 105#define mh_startup mh_u.mh_startup 106#define mh_connect mh_u.mh_connect 107#define mh_accept mh_u.mh_accept 108#define mh_reset mh_u.mh_reset 109#define mh_data mh_u.mh_data 110#define mh_window mh_u.mh_window 111#define mh_close mh_u.mh_close 112 113#define MUX_MAXCHAN 2 114 115/* Channel states. */ 116#define CS_UNUSED 0 117#define CS_LISTENING 1 118#define CS_CONNECTING 2 119#define CS_ESTABLISHED 3 120#define CS_RDCLOSED 4 121#define CS_WRCLOSED 5 122#define CS_CLOSED 6 123 124/* Channel flags. */ 125#define CF_CONNECT 0x01 126#define CF_ACCEPT 0x02 127#define CF_RESET 0x04 128#define CF_WINDOW 0x08 129#define CF_DATA 0x10 130#define CF_CLOSE 0x20 131 132#define CHAN_SBSIZE (16 * 1024) /* Send buffer size. */ 133#define CHAN_RBSIZE (16 * 1024) /* Receive buffer size. */ 134#define CHAN_MAXSEGSIZE 1024 /* Maximum segment size. */ 135 136/* Circular buffer. */ 137struct buf { 138 uint8_t *data; 139 size_t size; 140 size_t in; 141 size_t out; 142}; 143 144struct chan { 145 int flags; 146 int state; 147 pthread_mutex_t lock; 148 struct mux *mux; 149 150 /* Receiver state variables. */ 151 struct buf *recvbuf; 152 pthread_cond_t rdready; 153 uint32_t recvseq; 154 uint16_t recvmss; 155 156 /* Sender state variables. */ 157 struct buf *sendbuf; 158 pthread_cond_t wrready; 159 uint32_t sendseq; 160 uint32_t sendwin; 161 uint16_t sendmss; 162}; 163 164struct mux { 165 int closed; 166 int status; 167 int socket; 168 pthread_mutex_t lock; 169 pthread_cond_t done; 170 struct chan *channels[MUX_MAXCHAN]; 171 int nchans; 172 173 /* Sender thread data. */ 174 pthread_t sender; 175 pthread_cond_t sender_newwork; 176 pthread_cond_t sender_started; 177 int sender_waiting; 178 int sender_ready; 179 int sender_lastid; 180 181 /* Receiver thread data. */ 182 pthread_t receiver; 183}; 184 185static int sock_writev(int, struct iovec *, int); 186static int sock_write(int, void *, size_t); 187static ssize_t sock_read(int, void *, size_t); 188static int sock_readwait(int, void *, size_t); 189 190static int mux_init(struct mux *); 191static void mux_lock(struct mux *); 192static void mux_unlock(struct mux *); 193 194static struct chan *chan_new(struct mux *); 195static struct chan *chan_get(struct mux *, int); 196static struct chan *chan_connect(struct mux *, int); 197static void chan_lock(struct chan *); 198static void chan_unlock(struct chan *); 199static int chan_insert(struct mux *, struct chan *); 200static void chan_free(struct chan *); 201 202static struct buf *buf_new(size_t); 203static size_t buf_count(struct buf *); 204static size_t buf_avail(struct buf *); 205static void buf_get(struct buf *, void *, size_t); 206static void buf_put(struct buf *, const void *, size_t); 207static void buf_free(struct buf *); 208 209static void sender_wakeup(struct mux *); 210static void *sender_loop(void *); 211static int sender_waitforwork(struct mux *, int *); 212static int sender_scan(struct mux *, int *); 213static void sender_cleanup(void *); 214 215static void *receiver_loop(void *); 216 217static int 218sock_writev(int s, struct iovec *iov, int iovcnt) 219{ 220 ssize_t nbytes; 221 222again: 223 nbytes = writev(s, iov, iovcnt); 224 if (nbytes != -1) { 225 while (nbytes > 0 && (size_t)nbytes >= iov->iov_len) { 226 nbytes -= iov->iov_len; 227 iov++; 228 iovcnt--; 229 } 230 if (nbytes == 0) 231 return (0); 232 iov->iov_len -= nbytes; 233 iov->iov_base = (char *)iov->iov_base + nbytes; 234 } else if (errno != EINTR) { 235 return (-1); 236 } 237 goto again; 238} 239 240static int 241sock_write(int s, void *buf, size_t size) 242{ 243 struct iovec iov; 244 int ret; 245 246 iov.iov_base = buf; 247 iov.iov_len = size; 248 ret = sock_writev(s, &iov, 1); 249 return (ret); 250} 251 252static ssize_t 253sock_read(int s, void *buf, size_t size) 254{ 255 ssize_t nbytes; 256 257again: 258 nbytes = read(s, buf, size); 259 if (nbytes == -1 && errno == EINTR) 260 goto again; 261 return (nbytes); 262} 263 264static int 265sock_readwait(int s, void *buf, size_t size) 266{ 267 char *cp; 268 ssize_t nbytes; 269 size_t left; 270 271 cp = buf; 272 left = size; 273 while (left > 0) { 274 nbytes = sock_read(s, cp, left); 275 if (nbytes == 0) { 276 errno = ECONNRESET; 277 return (-1); 278 } 279 if (nbytes < 0) 280 return (-1); 281 left -= nbytes; 282 cp += nbytes; 283 } 284 return (0); 285} 286 287static void 288mux_lock(struct mux *m) 289{ 290 int error; 291 292 error = pthread_mutex_lock(&m->lock); 293 assert(!error); 294} 295 296static void 297mux_unlock(struct mux *m) 298{ 299 int error; 300 301 error = pthread_mutex_unlock(&m->lock); 302 assert(!error); 303} 304 305/* Create a TCP multiplexer on the given socket. */ 306struct mux * 307mux_open(int sock, struct chan **chan) 308{ 309 struct mux *m; 310 struct chan *chan0; 311 int error; 312 313 m = xmalloc(sizeof(struct mux)); 314 memset(m->channels, 0, sizeof(m->channels)); 315 m->nchans = 0; 316 m->closed = 0; 317 m->status = -1; 318 m->socket = sock; 319 320 m->sender_waiting = 0; 321 m->sender_lastid = 0; 322 m->sender_ready = 0; 323 pthread_mutex_init(&m->lock, NULL); 324 pthread_cond_init(&m->done, NULL); 325 pthread_cond_init(&m->sender_newwork, NULL); 326 pthread_cond_init(&m->sender_started, NULL); 327 328 error = mux_init(m); 329 if (error) 330 goto bad; 331 chan0 = chan_connect(m, 0); 332 if (chan0 == NULL) 333 goto bad; 334 *chan = chan0; 335 return (m); 336bad: 337 mux_shutdown(m, NULL, STATUS_FAILURE); 338 (void)mux_close(m); 339 return (NULL); 340} 341 342int 343mux_close(struct mux *m) 344{ 345 struct chan *chan; 346 int i, status; 347 348 assert(m->closed); 349 for (i = 0; i < m->nchans; i++) { 350 chan = m->channels[i]; 351 if (chan != NULL) 352 chan_free(chan); 353 } 354 pthread_cond_destroy(&m->sender_started); 355 pthread_cond_destroy(&m->sender_newwork); 356 pthread_cond_destroy(&m->done); 357 pthread_mutex_destroy(&m->lock); 358 status = m->status; 359 free(m); 360 return (status); 361} 362 363/* Close a channel. */ 364int 365chan_close(struct chan *chan) 366{ 367 368 chan_lock(chan); 369 if (chan->state == CS_ESTABLISHED) { 370 chan->state = CS_WRCLOSED; 371 chan->flags |= CF_CLOSE; 372 } else if (chan->state == CS_RDCLOSED) { 373 chan->state = CS_CLOSED; 374 chan->flags |= CF_CLOSE; 375 } else if (chan->state == CS_WRCLOSED || chan->state == CS_CLOSED) { 376 chan_unlock(chan); 377 return (0); 378 } else { 379 chan_unlock(chan); 380 return (-1); 381 } 382 chan_unlock(chan); 383 sender_wakeup(chan->mux); 384 return (0); 385} 386 387void 388chan_wait(struct chan *chan) 389{ 390 391 chan_lock(chan); 392 while (chan->state != CS_CLOSED) 393 pthread_cond_wait(&chan->rdready, &chan->lock); 394 chan_unlock(chan); 395} 396 397/* Returns the ID of an available channel in the listening state. */ 398int 399chan_listen(struct mux *m) 400{ 401 struct chan *chan; 402 int i; 403 404 mux_lock(m); 405 for (i = 0; i < m->nchans; i++) { 406 chan = m->channels[i]; 407 chan_lock(chan); 408 if (chan->state == CS_UNUSED) { 409 mux_unlock(m); 410 chan->state = CS_LISTENING; 411 chan_unlock(chan); 412 return (i); 413 } 414 chan_unlock(chan); 415 } 416 mux_unlock(m); 417 chan = chan_new(m); 418 chan->state = CS_LISTENING; 419 i = chan_insert(m, chan); 420 if (i == -1) 421 chan_free(chan); 422 return (i); 423} 424 425struct chan * 426chan_accept(struct mux *m, int id) 427{ 428 struct chan *chan; 429 430 chan = chan_get(m, id); 431 while (chan->state == CS_LISTENING) 432 pthread_cond_wait(&chan->rdready, &chan->lock); 433 if (chan->state != CS_ESTABLISHED) { 434 errno = ECONNRESET; 435 chan_unlock(chan); 436 return (NULL); 437 } 438 chan_unlock(chan); 439 return (chan); 440} 441 442/* Read bytes from a channel. */ 443ssize_t 444chan_read(struct chan *chan, void *buf, size_t size) 445{ 446 char *cp; 447 size_t count, n; 448 449 cp = buf; 450 chan_lock(chan); 451 for (;;) { 452 if (chan->state == CS_RDCLOSED || chan->state == CS_CLOSED) { 453 chan_unlock(chan); 454 return (0); 455 } 456 if (chan->state != CS_ESTABLISHED && 457 chan->state != CS_WRCLOSED) { 458 chan_unlock(chan); 459 errno = EBADF; 460 return (-1); 461 } 462 count = buf_count(chan->recvbuf); 463 if (count > 0) 464 break; 465 pthread_cond_wait(&chan->rdready, &chan->lock); 466 } 467 n = min(count, size); 468 buf_get(chan->recvbuf, cp, n); 469 chan->recvseq += n; 470 chan->flags |= CF_WINDOW; 471 chan_unlock(chan); 472 /* We need to wake up the sender so that it sends a window update. */ 473 sender_wakeup(chan->mux); 474 return (n); 475} 476 477/* Write bytes to a channel. */ 478ssize_t 479chan_write(struct chan *chan, const void *buf, size_t size) 480{ 481 const char *cp; 482 size_t avail, n, pos; 483 484 pos = 0; 485 cp = buf; 486 chan_lock(chan); 487 while (pos < size) { 488 for (;;) { 489 if (chan->state != CS_ESTABLISHED && 490 chan->state != CS_RDCLOSED) { 491 chan_unlock(chan); 492 errno = EPIPE; 493 return (-1); 494 } 495 avail = buf_avail(chan->sendbuf); 496 if (avail > 0) 497 break; 498 pthread_cond_wait(&chan->wrready, &chan->lock); 499 } 500 n = min(avail, size - pos); 501 buf_put(chan->sendbuf, cp + pos, n); 502 pos += n; 503 } 504 chan_unlock(chan); 505 sender_wakeup(chan->mux); 506 return (size); 507} 508 509/* 510 * Internal channel API. 511 */ 512 513static struct chan * 514chan_connect(struct mux *m, int id) 515{ 516 struct chan *chan; 517 518 chan = chan_get(m, id); 519 if (chan->state != CS_UNUSED) { 520 chan_unlock(chan); 521 return (NULL); 522 } 523 chan->state = CS_CONNECTING; 524 chan->flags |= CF_CONNECT; 525 chan_unlock(chan); 526 sender_wakeup(m); 527 chan_lock(chan); 528 while (chan->state == CS_CONNECTING) 529 pthread_cond_wait(&chan->wrready, &chan->lock); 530 if (chan->state != CS_ESTABLISHED) { 531 chan_unlock(chan); 532 return (NULL); 533 } 534 chan_unlock(chan); 535 return (chan); 536} 537 538/* 539 * Get a channel from its ID, creating it if necessary. 540 * The channel is returned locked. 541 */ 542static struct chan * 543chan_get(struct mux *m, int id) 544{ 545 struct chan *chan; 546 547 assert(id < MUX_MAXCHAN); 548 mux_lock(m); 549 chan = m->channels[id]; 550 if (chan == NULL) { 551 chan = chan_new(m); 552 m->channels[id] = chan; 553 m->nchans++; 554 } 555 chan_lock(chan); 556 mux_unlock(m); 557 return (chan); 558} 559 560/* Lock a channel. */ 561static void 562chan_lock(struct chan *chan) 563{ 564 int error; 565 566 error = pthread_mutex_lock(&chan->lock); 567 assert(!error); 568} 569 570/* Unlock a channel. */ 571static void 572chan_unlock(struct chan *chan) 573{ 574 int error; 575 576 error = pthread_mutex_unlock(&chan->lock); 577 assert(!error); 578} 579 580/* 581 * Create a new channel. 582 */ 583static struct chan * 584chan_new(struct mux *m) 585{ 586 struct chan *chan; 587 588 chan = xmalloc(sizeof(struct chan)); 589 chan->state = CS_UNUSED; 590 chan->flags = 0; 591 chan->mux = m; 592 chan->sendbuf = buf_new(CHAN_SBSIZE); 593 chan->sendseq = 0; 594 chan->sendwin = 0; 595 chan->sendmss = 0; 596 chan->recvbuf = buf_new(CHAN_RBSIZE); 597 chan->recvseq = 0; 598 chan->recvmss = CHAN_MAXSEGSIZE; 599 pthread_mutex_init(&chan->lock, NULL); 600 pthread_cond_init(&chan->rdready, NULL); 601 pthread_cond_init(&chan->wrready, NULL); 602 return (chan); 603} 604 605/* Free any resources associated with a channel. */ 606static void 607chan_free(struct chan *chan) 608{ 609 610 pthread_cond_destroy(&chan->rdready); 611 pthread_cond_destroy(&chan->wrready); 612 pthread_mutex_destroy(&chan->lock); 613 buf_free(chan->recvbuf); 614 buf_free(chan->sendbuf); 615 free(chan); 616} 617 618/* Insert the new channel in the channel list. */ 619static int 620chan_insert(struct mux *m, struct chan *chan) 621{ 622 int i; 623 624 mux_lock(m); 625 for (i = 0; i < MUX_MAXCHAN; i++) { 626 if (m->channels[i] == NULL) { 627 m->channels[i] = chan; 628 m->nchans++; 629 mux_unlock(m); 630 return (i); 631 } 632 } 633 errno = ENOBUFS; 634 return (-1); 635} 636 637/* 638 * Initialize the multiplexer protocol. 639 * 640 * This means negotiating protocol version and starting 641 * the receiver and sender threads. 642 */ 643static int 644mux_init(struct mux *m) 645{ 646 struct mux_header mh; 647 int error; 648 649 mh.type = MUX_STARTUPREQ; 650 mh.mh_startup.version = htons(MUX_PROTOVER); 651 error = sock_write(m->socket, &mh, MUX_STARTUPHDRSZ); 652 if (error) 653 return (-1); 654 error = sock_readwait(m->socket, &mh, MUX_STARTUPHDRSZ); 655 if (error) 656 return (-1); 657 if (mh.type != MUX_STARTUPREP || 658 ntohs(mh.mh_startup.version) != MUX_PROTOVER) 659 return (-1); 660 mux_lock(m); 661 error = pthread_create(&m->sender, NULL, sender_loop, m); 662 if (error) { 663 mux_unlock(m); 664 return (-1); 665 } 666 /* 667 * Make sure the sender thread has run and is waiting for new work 668 * before going on. Otherwise, it might lose the race and a 669 * request, which will cause a deadlock. 670 */ 671 while (!m->sender_ready) 672 pthread_cond_wait(&m->sender_started, &m->lock); 673 674 mux_unlock(m); 675 error = pthread_create(&m->receiver, NULL, receiver_loop, m); 676 if (error) 677 return (-1); 678 return (0); 679} 680 681/* 682 * Close all the channels, terminate the sender and receiver thread. 683 * This is an important function because it is used everytime we need 684 * to wake up all the worker threads to abort the program. 685 * 686 * This function accepts an error message that will be printed if the 687 * multiplexer wasn't already closed. This is useful because it ensures 688 * that only the first error message will be printed, and that it will 689 * be printed before doing the actual shutdown work. If this is a 690 * normal shutdown, NULL can be passed instead. 691 * 692 * The "status" parameter of the first mux_shutdown() call is retained 693 * and then returned by mux_close(), so that the main thread can know 694 * what type of error happened in the end, if any. 695 */ 696void 697mux_shutdown(struct mux *m, const char *errmsg, int status) 698{ 699 pthread_t self, sender, receiver; 700 struct chan *chan; 701 const char *name; 702 void *val; 703 int i, ret; 704 705 mux_lock(m); 706 if (m->closed) { 707 mux_unlock(m); 708 return; 709 } 710 m->closed = 1; 711 m->status = status; 712 self = pthread_self(); 713 sender = m->sender; 714 receiver = m->receiver; 715 if (errmsg != NULL) { 716 if (pthread_equal(self, receiver)) 717 name = "Receiver"; 718 else if (pthread_equal(self, sender)) 719 name = "Sender"; 720 else 721 name = NULL; 722 if (name == NULL) 723 lprintf(-1, "%s\n", errmsg); 724 else 725 lprintf(-1, "%s: %s\n", name, errmsg); 726 } 727 728 for (i = 0; i < MUX_MAXCHAN; i++) { 729 if (m->channels[i] != NULL) { 730 chan = m->channels[i]; 731 chan_lock(chan); 732 if (chan->state != CS_UNUSED) { 733 chan->state = CS_CLOSED; 734 chan->flags = 0; 735 pthread_cond_broadcast(&chan->rdready); 736 pthread_cond_broadcast(&chan->wrready); 737 } 738 chan_unlock(chan); 739 } 740 } 741 mux_unlock(m); 742 743 if (!pthread_equal(self, receiver)) { 744 ret = pthread_cancel(receiver); 745 assert(!ret); 746 pthread_join(receiver, &val); 747 assert(val == PTHREAD_CANCELED); 748 } 749 if (!pthread_equal(self, sender)) { 750 ret = pthread_cancel(sender); 751 assert(!ret); 752 pthread_join(sender, &val); 753 assert(val == PTHREAD_CANCELED); 754 } 755} 756 757static void 758sender_wakeup(struct mux *m) 759{ 760 int waiting; 761 762 mux_lock(m); 763 waiting = m->sender_waiting; 764 mux_unlock(m); 765 /* 766 * We don't care about the race here: if the sender was 767 * waiting and is not anymore, we'll just send a useless 768 * signal; if he wasn't waiting then he won't go to sleep 769 * before having sent what we want him to. 770 */ 771 if (waiting) 772 pthread_cond_signal(&m->sender_newwork); 773} 774 775static void * 776sender_loop(void *arg) 777{ 778 struct iovec iov[3]; 779 struct mux_header mh; 780 struct mux *m; 781 struct chan *chan; 782 struct buf *buf; 783 uint32_t winsize; 784 uint16_t hdrsize, size, len; 785 int error, id, iovcnt, what = 0; 786 787 m = (struct mux *)arg; 788 what = 0; 789again: 790 id = sender_waitforwork(m, &what); 791 chan = chan_get(m, id); 792 hdrsize = size = 0; 793 switch (what) { 794 case CF_CONNECT: 795 mh.type = MUX_CONNECT; 796 mh.mh_connect.id = id; 797 mh.mh_connect.mss = htons(chan->recvmss); 798 mh.mh_connect.window = htonl(chan->recvseq + 799 chan->recvbuf->size); 800 hdrsize = MUX_CONNECTHDRSZ; 801 break; 802 case CF_ACCEPT: 803 mh.type = MUX_ACCEPT; 804 mh.mh_accept.id = id; 805 mh.mh_accept.mss = htons(chan->recvmss); 806 mh.mh_accept.window = htonl(chan->recvseq + 807 chan->recvbuf->size); 808 hdrsize = MUX_ACCEPTHDRSZ; 809 break; 810 case CF_RESET: 811 mh.type = MUX_RESET; 812 mh.mh_reset.id = id; 813 hdrsize = MUX_RESETHDRSZ; 814 break; 815 case CF_WINDOW: 816 mh.type = MUX_WINDOW; 817 mh.mh_window.id = id; 818 mh.mh_window.window = htonl(chan->recvseq + 819 chan->recvbuf->size); 820 hdrsize = MUX_WINDOWHDRSZ; 821 break; 822 case CF_DATA: 823 mh.type = MUX_DATA; 824 mh.mh_data.id = id; 825 size = min(buf_count(chan->sendbuf), chan->sendmss); 826 winsize = chan->sendwin - chan->sendseq; 827 if (winsize < size) 828 size = winsize; 829 mh.mh_data.len = htons(size); 830 hdrsize = MUX_DATAHDRSZ; 831 break; 832 case CF_CLOSE: 833 mh.type = MUX_CLOSE; 834 mh.mh_close.id = id; 835 hdrsize = MUX_CLOSEHDRSZ; 836 break; 837 } 838 if (size > 0) { 839 assert(mh.type == MUX_DATA); 840 /* 841 * Older FreeBSD versions (and maybe other OSes) have the 842 * iov_base field defined as char *. Cast to char * to 843 * silence a warning in this case. 844 */ 845 iov[0].iov_base = (char *)&mh; 846 iov[0].iov_len = hdrsize; 847 iovcnt = 1; 848 /* We access the buffer directly to avoid some copying. */ 849 buf = chan->sendbuf; 850 len = min(size, buf->size + 1 - buf->out); 851 iov[iovcnt].iov_base = buf->data + buf->out; 852 iov[iovcnt].iov_len = len; 853 iovcnt++; 854 if (size > len) { 855 /* Wrapping around. */ 856 iov[iovcnt].iov_base = buf->data; 857 iov[iovcnt].iov_len = size - len; 858 iovcnt++; 859 } 860 /* 861 * Since we're the only thread sending bytes from the 862 * buffer and modifying buf->out, it's safe to unlock 863 * here during I/O. It avoids keeping the channel lock 864 * too long, since write() might block. 865 */ 866 chan_unlock(chan); 867 error = sock_writev(m->socket, iov, iovcnt); 868 if (error) 869 goto bad; 870 chan_lock(chan); 871 chan->sendseq += size; 872 buf->out += size; 873 if (buf->out > buf->size) 874 buf->out -= buf->size + 1; 875 pthread_cond_signal(&chan->wrready); 876 chan_unlock(chan); 877 } else { 878 chan_unlock(chan); 879 error = sock_write(m->socket, &mh, hdrsize); 880 if (error) 881 goto bad; 882 } 883 goto again; 884bad: 885 if (error == EPIPE) 886 mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE); 887 else 888 mux_shutdown(m, strerror(errno), STATUS_FAILURE); 889 return (NULL); 890} 891 892static void 893sender_cleanup(void *arg) 894{ 895 struct mux *m; 896 897 m = (struct mux *)arg; 898 mux_unlock(m); 899} 900 901static int 902sender_waitforwork(struct mux *m, int *what) 903{ 904 int id; 905 906 mux_lock(m); 907 pthread_cleanup_push(sender_cleanup, m); 908 if (!m->sender_ready) { 909 pthread_cond_signal(&m->sender_started); 910 m->sender_ready = 1; 911 } 912 while ((id = sender_scan(m, what)) == -1) { 913 m->sender_waiting = 1; 914 pthread_cond_wait(&m->sender_newwork, &m->lock); 915 } 916 m->sender_waiting = 0; 917 pthread_cleanup_pop(1); 918 return (id); 919} 920 921/* 922 * Scan for work to do for the sender. Has to be called with 923 * the multiplexer lock held. 924 */ 925static int 926sender_scan(struct mux *m, int *what) 927{ 928 struct chan *chan; 929 int id; 930 931 if (m->nchans <= 0) 932 return (-1); 933 id = m->sender_lastid; 934 do { 935 id++; 936 if (id >= m->nchans) 937 id = 0; 938 chan = m->channels[id]; 939 chan_lock(chan); 940 if (chan->state != CS_UNUSED) { 941 if (chan->sendseq != chan->sendwin && 942 buf_count(chan->sendbuf) > 0) 943 chan->flags |= CF_DATA; 944 if (chan->flags) { 945 /* By order of importance. */ 946 if (chan->flags & CF_CONNECT) 947 *what = CF_CONNECT; 948 else if (chan->flags & CF_ACCEPT) 949 *what = CF_ACCEPT; 950 else if (chan->flags & CF_RESET) 951 *what = CF_RESET; 952 else if (chan->flags & CF_WINDOW) 953 *what = CF_WINDOW; 954 else if (chan->flags & CF_DATA) 955 *what = CF_DATA; 956 else if (chan->flags & CF_CLOSE) 957 *what = CF_CLOSE; 958 chan->flags &= ~*what; 959 chan_unlock(chan); 960 m->sender_lastid = id; 961 return (id); 962 } 963 } 964 chan_unlock(chan); 965 } while (id != m->sender_lastid); 966 return (-1); 967} 968 969/* Read the rest of a packet header depending on its type. */ 970#define SOCK_READREST(s, mh, hsize) \ 971 sock_readwait(s, (char *)&mh + sizeof(mh.type), (hsize) - sizeof(mh.type)) 972 973void * 974receiver_loop(void *arg) 975{ 976 struct mux_header mh; 977 struct mux *m; 978 struct chan *chan; 979 struct buf *buf; 980 uint16_t size, len; 981 int error; 982 983 m = (struct mux *)arg; 984 while ((error = sock_readwait(m->socket, &mh.type, 985 sizeof(mh.type))) == 0) { 986 switch (mh.type) { 987 case MUX_CONNECT: 988 error = SOCK_READREST(m->socket, mh, MUX_CONNECTHDRSZ); 989 if (error) 990 goto bad; 991 chan = chan_get(m, mh.mh_connect.id); 992 if (chan->state == CS_LISTENING) { 993 chan->state = CS_ESTABLISHED; 994 chan->sendmss = ntohs(mh.mh_connect.mss); 995 chan->sendwin = ntohl(mh.mh_connect.window); 996 chan->flags |= CF_ACCEPT; 997 pthread_cond_signal(&chan->rdready); 998 } else 999 chan->flags |= CF_RESET; 1000 chan_unlock(chan); 1001 sender_wakeup(m); 1002 break; 1003 case MUX_ACCEPT: 1004 error = SOCK_READREST(m->socket, mh, MUX_ACCEPTHDRSZ); 1005 if (error) 1006 goto bad; 1007 chan = chan_get(m, mh.mh_accept.id); 1008 if (chan->state == CS_CONNECTING) { 1009 chan->sendmss = ntohs(mh.mh_accept.mss); 1010 chan->sendwin = ntohl(mh.mh_accept.window); 1011 chan->state = CS_ESTABLISHED; 1012 pthread_cond_signal(&chan->wrready); 1013 chan_unlock(chan); 1014 } else { 1015 chan->flags |= CF_RESET; 1016 chan_unlock(chan); 1017 sender_wakeup(m); 1018 } 1019 break; 1020 case MUX_RESET: 1021 error = SOCK_READREST(m->socket, mh, MUX_RESETHDRSZ); 1022 if (error) 1023 goto bad; 1024 goto badproto; 1025 case MUX_WINDOW: 1026 error = SOCK_READREST(m->socket, mh, MUX_WINDOWHDRSZ); 1027 if (error) 1028 goto bad; 1029 chan = chan_get(m, mh.mh_window.id); 1030 if (chan->state == CS_ESTABLISHED || 1031 chan->state == CS_RDCLOSED) { 1032 chan->sendwin = ntohl(mh.mh_window.window); 1033 chan_unlock(chan); 1034 sender_wakeup(m); 1035 } else { 1036 chan_unlock(chan); 1037 } 1038 break; 1039 case MUX_DATA: 1040 error = SOCK_READREST(m->socket, mh, MUX_DATAHDRSZ); 1041 if (error) 1042 goto bad; 1043 chan = chan_get(m, mh.mh_data.id); 1044 len = ntohs(mh.mh_data.len); 1045 buf = chan->recvbuf; 1046 if ((chan->state != CS_ESTABLISHED && 1047 chan->state != CS_WRCLOSED) || 1048 (len > buf_avail(buf) || 1049 len > chan->recvmss)) { 1050 chan_unlock(chan); 1051 goto badproto; 1052 return (NULL); 1053 } 1054 /* 1055 * Similarly to the sender code, it's safe to 1056 * unlock the channel here. 1057 */ 1058 chan_unlock(chan); 1059 size = min(buf->size + 1 - buf->in, len); 1060 error = sock_readwait(m->socket, 1061 buf->data + buf->in, size); 1062 if (error) 1063 goto bad; 1064 if (len > size) { 1065 /* Wrapping around. */ 1066 error = sock_readwait(m->socket, 1067 buf->data, len - size); 1068 if (error) 1069 goto bad; 1070 } 1071 chan_lock(chan); 1072 buf->in += len; 1073 if (buf->in > buf->size) 1074 buf->in -= buf->size + 1; 1075 pthread_cond_signal(&chan->rdready); 1076 chan_unlock(chan); 1077 break; 1078 case MUX_CLOSE: 1079 error = SOCK_READREST(m->socket, mh, MUX_CLOSEHDRSZ); 1080 if (error) 1081 goto bad; 1082 chan = chan_get(m, mh.mh_close.id); 1083 if (chan->state == CS_ESTABLISHED) 1084 chan->state = CS_RDCLOSED; 1085 else if (chan->state == CS_WRCLOSED) 1086 chan->state = CS_CLOSED; 1087 else 1088 goto badproto; 1089 pthread_cond_signal(&chan->rdready); 1090 chan_unlock(chan); 1091 break; 1092 default: 1093 goto badproto; 1094 } 1095 } 1096bad: 1097 if (errno == ECONNRESET || errno == ECONNABORTED) 1098 mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE); 1099 else 1100 mux_shutdown(m, strerror(errno), STATUS_FAILURE); 1101 return (NULL); 1102badproto: 1103 mux_shutdown(m, "Protocol error", STATUS_FAILURE); 1104 return (NULL); 1105} 1106 1107/* 1108 * Circular buffers API. 1109 */ 1110 1111static struct buf * 1112buf_new(size_t size) 1113{ 1114 struct buf *buf; 1115 1116 buf = xmalloc(sizeof(struct buf)); 1117 buf->data = xmalloc(size + 1); 1118 buf->size = size; 1119 buf->in = 0; 1120 buf->out = 0; 1121 return (buf); 1122} 1123 1124static void 1125buf_free(struct buf *buf) 1126{ 1127 1128 free(buf->data); 1129 free(buf); 1130} 1131 1132/* Number of bytes stored in the buffer. */ 1133static size_t 1134buf_count(struct buf *buf) 1135{ 1136 size_t count; 1137 1138 if (buf->in >= buf->out) 1139 count = buf->in - buf->out; 1140 else 1141 count = buf->size + 1 + buf->in - buf->out; 1142 return (count); 1143} 1144 1145/* Number of bytes available in the buffer. */ 1146static size_t 1147buf_avail(struct buf *buf) 1148{ 1149 size_t avail; 1150 1151 if (buf->out > buf->in) 1152 avail = buf->out - buf->in - 1; 1153 else 1154 avail = buf->size + buf->out - buf->in; 1155 return (avail); 1156} 1157 1158static void 1159buf_put(struct buf *buf, const void *data, size_t size) 1160{ 1161 const char *cp; 1162 size_t len; 1163 1164 assert(size > 0); 1165 assert(buf_avail(buf) >= size); 1166 cp = data; 1167 len = buf->size + 1 - buf->in; 1168 if (len < size) { 1169 /* Wrapping around. */ 1170 memcpy(buf->data + buf->in, cp, len); 1171 memcpy(buf->data, cp + len, size - len); 1172 } else { 1173 /* Not wrapping around. */ 1174 memcpy(buf->data + buf->in, cp, size); 1175 } 1176 buf->in += size; 1177 if (buf->in > buf->size) 1178 buf->in -= buf->size + 1; 1179} 1180 1181static void 1182buf_get(struct buf *buf, void *data, size_t size) 1183{ 1184 char *cp; 1185 size_t len; 1186 1187 assert(size > 0); 1188 assert(buf_count(buf) >= size); 1189 cp = data; 1190 len = buf->size + 1 - buf->out; 1191 if (len < size) { 1192 /* Wrapping around. */ 1193 memcpy(cp, buf->data + buf->out, len); 1194 memcpy(cp + len, buf->data, size - len); 1195 } else { 1196 /* Not wrapping around. */ 1197 memcpy(cp, buf->data + buf->out, size); 1198 } 1199 buf->out += size; 1200 if (buf->out > buf->size) 1201 buf->out -= buf->size + 1; 1202} 1203