1156230Smux/*- 2156230Smux * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org> 3156230Smux * All rights reserved. 4156230Smux * 5156230Smux * Redistribution and use in source and binary forms, with or without 6156230Smux * modification, are permitted provided that the following conditions 7156230Smux * are met: 8156230Smux * 1. Redistributions of source code must retain the above copyright 9156230Smux * notice, this list of conditions and the following disclaimer. 10156230Smux * 2. Redistributions in binary form must reproduce the above copyright 11156230Smux * notice, this list of conditions and the following disclaimer in the 12156230Smux * documentation and/or other materials provided with the distribution. 13156230Smux * 14156230Smux * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15156230Smux * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16156230Smux * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17156230Smux * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18156230Smux * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19156230Smux * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20156230Smux * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21156230Smux * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22156230Smux * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23156230Smux * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24156230Smux * SUCH DAMAGE. 25156230Smux * 26156230Smux * $FreeBSD: releng/10.3/usr.bin/csup/mux.c 228992 2011-12-30 11:02:40Z uqs $ 27156230Smux */ 28156230Smux 29156230Smux#include <sys/param.h> 30156230Smux#include <sys/socket.h> 31156230Smux#include <sys/uio.h> 32156230Smux 33156230Smux#include <netinet/in.h> 34156230Smux 35156230Smux#include <assert.h> 36156230Smux#include <errno.h> 37156230Smux#include <pthread.h> 38156230Smux#include <stdarg.h> 39156230Smux#include <stdio.h> 40156230Smux#include <stdlib.h> 41156230Smux#include <string.h> 42156230Smux#include <unistd.h> 43156230Smux 44156230Smux#include "misc.h" 45156230Smux#include "mux.h" 46156230Smux 47156230Smux/* 48156230Smux * Packet types. 49156230Smux */ 50156230Smux#define MUX_STARTUPREQ 0 51156230Smux#define MUX_STARTUPREP 1 52156230Smux#define MUX_CONNECT 2 53156230Smux#define MUX_ACCEPT 3 54156230Smux#define MUX_RESET 4 55156230Smux#define MUX_DATA 5 56156230Smux#define MUX_WINDOW 6 57156230Smux#define MUX_CLOSE 7 58156230Smux 59156230Smux/* 60156230Smux * Header sizes. 61156230Smux */ 62156230Smux#define MUX_STARTUPHDRSZ 3 63156230Smux#define MUX_CONNECTHDRSZ 8 64156230Smux#define MUX_ACCEPTHDRSZ 8 65156230Smux#define MUX_RESETHDRSZ 2 66156230Smux#define MUX_DATAHDRSZ 4 67156230Smux#define MUX_WINDOWHDRSZ 6 68156230Smux#define MUX_CLOSEHDRSZ 2 69156230Smux 70156230Smux#define MUX_PROTOVER 0 /* Protocol version. */ 71156230Smux 72156230Smuxstruct mux_header { 73156230Smux uint8_t type; 74156230Smux union { 75156230Smux struct { 76156230Smux uint16_t version; 77156230Smux } __packed mh_startup; 78156230Smux struct { 79156230Smux uint8_t id; 80156230Smux uint16_t mss; 81156230Smux uint32_t window; 82156230Smux } __packed mh_connect; 83156230Smux struct { 84156230Smux uint8_t id; 85156230Smux uint16_t mss; 86156230Smux uint32_t window; 87156230Smux } __packed mh_accept; 88156230Smux struct { 89156230Smux uint8_t id; 90156230Smux } __packed mh_reset; 91156230Smux struct { 92156230Smux uint8_t id; 93156230Smux uint16_t len; 94156230Smux } __packed mh_data; 95156230Smux struct { 96156230Smux uint8_t id; 97156230Smux uint32_t window; 98156230Smux } __packed mh_window; 99156230Smux struct { 100156230Smux uint8_t id; 101156230Smux } __packed mh_close; 102156230Smux } mh_u; 103156230Smux} __packed; 104156230Smux 105156230Smux#define mh_startup mh_u.mh_startup 106156230Smux#define mh_connect mh_u.mh_connect 107156230Smux#define mh_accept mh_u.mh_accept 108156230Smux#define mh_reset mh_u.mh_reset 109156230Smux#define mh_data mh_u.mh_data 110156230Smux#define mh_window mh_u.mh_window 111156230Smux#define mh_close mh_u.mh_close 112156230Smux 113156230Smux#define MUX_MAXCHAN 2 114156230Smux 115156230Smux/* Channel states. */ 116156230Smux#define CS_UNUSED 0 117156230Smux#define CS_LISTENING 1 118156230Smux#define CS_CONNECTING 2 119156230Smux#define CS_ESTABLISHED 3 120156230Smux#define CS_RDCLOSED 4 121156230Smux#define CS_WRCLOSED 5 122156230Smux#define CS_CLOSED 6 123156230Smux 124156230Smux/* Channel flags. */ 125156230Smux#define CF_CONNECT 0x01 126156230Smux#define CF_ACCEPT 0x02 127156230Smux#define CF_RESET 0x04 128156230Smux#define CF_WINDOW 0x08 129156230Smux#define CF_DATA 0x10 130156230Smux#define CF_CLOSE 0x20 131156230Smux 132156230Smux#define CHAN_SBSIZE (16 * 1024) /* Send buffer size. */ 133156230Smux#define CHAN_RBSIZE (16 * 1024) /* Receive buffer size. */ 134156230Smux#define CHAN_MAXSEGSIZE 1024 /* Maximum segment size. */ 135156230Smux 136156230Smux/* Circular buffer. */ 137156230Smuxstruct buf { 138156230Smux uint8_t *data; 139156230Smux size_t size; 140156230Smux size_t in; 141156230Smux size_t out; 142156230Smux}; 143156230Smux 144156230Smuxstruct chan { 145156230Smux int flags; 146156230Smux int state; 147156230Smux pthread_mutex_t lock; 148156230Smux struct mux *mux; 149156230Smux 150156230Smux /* Receiver state variables. */ 151156230Smux struct buf *recvbuf; 152156230Smux pthread_cond_t rdready; 153156230Smux uint32_t recvseq; 154156230Smux uint16_t recvmss; 155156230Smux 156156230Smux /* Sender state variables. */ 157156230Smux struct buf *sendbuf; 158156230Smux pthread_cond_t wrready; 159156230Smux uint32_t sendseq; 160156230Smux uint32_t sendwin; 161156230Smux uint16_t sendmss; 162156230Smux}; 163156230Smux 164156230Smuxstruct mux { 165156230Smux int closed; 166156230Smux int status; 167156230Smux int socket; 168156230Smux pthread_mutex_t lock; 169156230Smux pthread_cond_t done; 170156230Smux struct chan *channels[MUX_MAXCHAN]; 171156230Smux int nchans; 172156230Smux 173156230Smux /* Sender thread data. */ 174156230Smux pthread_t sender; 175156230Smux pthread_cond_t sender_newwork; 176156230Smux pthread_cond_t sender_started; 177156230Smux int sender_waiting; 178156230Smux int sender_ready; 179156230Smux int sender_lastid; 180156230Smux 181156230Smux /* Receiver thread data. */ 182156230Smux pthread_t receiver; 183156230Smux}; 184156230Smux 185156230Smuxstatic int sock_writev(int, struct iovec *, int); 186156230Smuxstatic int sock_write(int, void *, size_t); 187156230Smuxstatic ssize_t sock_read(int, void *, size_t); 188156230Smuxstatic int sock_readwait(int, void *, size_t); 189156230Smux 190156230Smuxstatic int mux_init(struct mux *); 191156230Smuxstatic void mux_lock(struct mux *); 192156230Smuxstatic void mux_unlock(struct mux *); 193156230Smux 194156230Smuxstatic struct chan *chan_new(struct mux *); 195156230Smuxstatic struct chan *chan_get(struct mux *, int); 196156230Smuxstatic struct chan *chan_connect(struct mux *, int); 197156230Smuxstatic void chan_lock(struct chan *); 198156230Smuxstatic void chan_unlock(struct chan *); 199156230Smuxstatic int chan_insert(struct mux *, struct chan *); 200156230Smuxstatic void chan_free(struct chan *); 201156230Smux 202156230Smuxstatic struct buf *buf_new(size_t); 203156230Smuxstatic size_t buf_count(struct buf *); 204156230Smuxstatic size_t buf_avail(struct buf *); 205156230Smuxstatic void buf_get(struct buf *, void *, size_t); 206156230Smuxstatic void buf_put(struct buf *, const void *, size_t); 207156230Smuxstatic void buf_free(struct buf *); 208156230Smux 209156230Smuxstatic void sender_wakeup(struct mux *); 210156230Smuxstatic void *sender_loop(void *); 211156230Smuxstatic int sender_waitforwork(struct mux *, int *); 212156230Smuxstatic int sender_scan(struct mux *, int *); 213156230Smuxstatic void sender_cleanup(void *); 214156230Smux 215156230Smuxstatic void *receiver_loop(void *); 216156230Smux 217156230Smuxstatic int 218156230Smuxsock_writev(int s, struct iovec *iov, int iovcnt) 219156230Smux{ 220156230Smux ssize_t nbytes; 221156230Smux 222156230Smuxagain: 223156230Smux nbytes = writev(s, iov, iovcnt); 224156230Smux if (nbytes != -1) { 225156230Smux while (nbytes > 0 && (size_t)nbytes >= iov->iov_len) { 226156230Smux nbytes -= iov->iov_len; 227156230Smux iov++; 228156230Smux iovcnt--; 229156230Smux } 230156230Smux if (nbytes == 0) 231156230Smux return (0); 232156230Smux iov->iov_len -= nbytes; 233156230Smux iov->iov_base = (char *)iov->iov_base + nbytes; 234156230Smux } else if (errno != EINTR) { 235156230Smux return (-1); 236156230Smux } 237156230Smux goto again; 238156230Smux} 239156230Smux 240156230Smuxstatic int 241156230Smuxsock_write(int s, void *buf, size_t size) 242156230Smux{ 243156230Smux struct iovec iov; 244156230Smux int ret; 245156230Smux 246156230Smux iov.iov_base = buf; 247156230Smux iov.iov_len = size; 248156230Smux ret = sock_writev(s, &iov, 1); 249156230Smux return (ret); 250156230Smux} 251156230Smux 252156230Smuxstatic ssize_t 253156230Smuxsock_read(int s, void *buf, size_t size) 254156230Smux{ 255156230Smux ssize_t nbytes; 256156230Smux 257156230Smuxagain: 258156230Smux nbytes = read(s, buf, size); 259156230Smux if (nbytes == -1 && errno == EINTR) 260156230Smux goto again; 261156230Smux return (nbytes); 262156230Smux} 263156230Smux 264156230Smuxstatic int 265156230Smuxsock_readwait(int s, void *buf, size_t size) 266156230Smux{ 267156230Smux char *cp; 268156230Smux ssize_t nbytes; 269156230Smux size_t left; 270156230Smux 271156230Smux cp = buf; 272156230Smux left = size; 273156230Smux while (left > 0) { 274156230Smux nbytes = sock_read(s, cp, left); 275156230Smux if (nbytes == 0) { 276156230Smux errno = ECONNRESET; 277156230Smux return (-1); 278156230Smux } 279156230Smux if (nbytes < 0) 280156230Smux return (-1); 281156230Smux left -= nbytes; 282156230Smux cp += nbytes; 283156230Smux } 284156230Smux return (0); 285156230Smux} 286156230Smux 287156230Smuxstatic void 288156230Smuxmux_lock(struct mux *m) 289156230Smux{ 290156230Smux int error; 291156230Smux 292156230Smux error = pthread_mutex_lock(&m->lock); 293156230Smux assert(!error); 294156230Smux} 295156230Smux 296156230Smuxstatic void 297156230Smuxmux_unlock(struct mux *m) 298156230Smux{ 299156230Smux int error; 300156230Smux 301156230Smux error = pthread_mutex_unlock(&m->lock); 302156230Smux assert(!error); 303156230Smux} 304156230Smux 305156230Smux/* Create a TCP multiplexer on the given socket. */ 306156230Smuxstruct mux * 307156230Smuxmux_open(int sock, struct chan **chan) 308156230Smux{ 309156230Smux struct mux *m; 310156230Smux struct chan *chan0; 311156230Smux int error; 312156230Smux 313156230Smux m = xmalloc(sizeof(struct mux)); 314156230Smux memset(m->channels, 0, sizeof(m->channels)); 315156230Smux m->nchans = 0; 316156230Smux m->closed = 0; 317156230Smux m->status = -1; 318156230Smux m->socket = sock; 319156230Smux 320156230Smux m->sender_waiting = 0; 321156230Smux m->sender_lastid = 0; 322156230Smux m->sender_ready = 0; 323156230Smux pthread_mutex_init(&m->lock, NULL); 324156230Smux pthread_cond_init(&m->done, NULL); 325156230Smux pthread_cond_init(&m->sender_newwork, NULL); 326156230Smux pthread_cond_init(&m->sender_started, NULL); 327156230Smux 328156230Smux error = mux_init(m); 329156230Smux if (error) 330156230Smux goto bad; 331156230Smux chan0 = chan_connect(m, 0); 332156230Smux if (chan0 == NULL) 333156230Smux goto bad; 334156230Smux *chan = chan0; 335156230Smux return (m); 336156230Smuxbad: 337156230Smux mux_shutdown(m, NULL, STATUS_FAILURE); 338156230Smux (void)mux_close(m); 339156230Smux return (NULL); 340156230Smux} 341156230Smux 342156230Smuxint 343156230Smuxmux_close(struct mux *m) 344156230Smux{ 345156230Smux struct chan *chan; 346156230Smux int i, status; 347156230Smux 348156230Smux assert(m->closed); 349156230Smux for (i = 0; i < m->nchans; i++) { 350156230Smux chan = m->channels[i]; 351156230Smux if (chan != NULL) 352156230Smux chan_free(chan); 353156230Smux } 354156230Smux pthread_cond_destroy(&m->sender_started); 355156230Smux pthread_cond_destroy(&m->sender_newwork); 356156230Smux pthread_cond_destroy(&m->done); 357156230Smux pthread_mutex_destroy(&m->lock); 358156230Smux status = m->status; 359156230Smux free(m); 360156230Smux return (status); 361156230Smux} 362156230Smux 363156230Smux/* Close a channel. */ 364156230Smuxint 365156230Smuxchan_close(struct chan *chan) 366156230Smux{ 367156230Smux 368156230Smux chan_lock(chan); 369156230Smux if (chan->state == CS_ESTABLISHED) { 370156230Smux chan->state = CS_WRCLOSED; 371156230Smux chan->flags |= CF_CLOSE; 372156230Smux } else if (chan->state == CS_RDCLOSED) { 373156230Smux chan->state = CS_CLOSED; 374156230Smux chan->flags |= CF_CLOSE; 375156230Smux } else if (chan->state == CS_WRCLOSED || chan->state == CS_CLOSED) { 376156230Smux chan_unlock(chan); 377156230Smux return (0); 378156230Smux } else { 379156230Smux chan_unlock(chan); 380156230Smux return (-1); 381156230Smux } 382156230Smux chan_unlock(chan); 383156230Smux sender_wakeup(chan->mux); 384156230Smux return (0); 385156230Smux} 386156230Smux 387156230Smuxvoid 388156230Smuxchan_wait(struct chan *chan) 389156230Smux{ 390156230Smux 391156230Smux chan_lock(chan); 392156230Smux while (chan->state != CS_CLOSED) 393156230Smux pthread_cond_wait(&chan->rdready, &chan->lock); 394156230Smux chan_unlock(chan); 395156230Smux} 396156230Smux 397156230Smux/* Returns the ID of an available channel in the listening state. */ 398156230Smuxint 399156230Smuxchan_listen(struct mux *m) 400156230Smux{ 401156230Smux struct chan *chan; 402156230Smux int i; 403156230Smux 404156230Smux mux_lock(m); 405156230Smux for (i = 0; i < m->nchans; i++) { 406156230Smux chan = m->channels[i]; 407156230Smux chan_lock(chan); 408156230Smux if (chan->state == CS_UNUSED) { 409156230Smux mux_unlock(m); 410156230Smux chan->state = CS_LISTENING; 411156230Smux chan_unlock(chan); 412156230Smux return (i); 413156230Smux } 414156230Smux chan_unlock(chan); 415156230Smux } 416156230Smux mux_unlock(m); 417156230Smux chan = chan_new(m); 418156230Smux chan->state = CS_LISTENING; 419156230Smux i = chan_insert(m, chan); 420156230Smux if (i == -1) 421156230Smux chan_free(chan); 422156230Smux return (i); 423156230Smux} 424156230Smux 425156230Smuxstruct chan * 426156230Smuxchan_accept(struct mux *m, int id) 427156230Smux{ 428156230Smux struct chan *chan; 429156230Smux 430156230Smux chan = chan_get(m, id); 431156230Smux while (chan->state == CS_LISTENING) 432156230Smux pthread_cond_wait(&chan->rdready, &chan->lock); 433156230Smux if (chan->state != CS_ESTABLISHED) { 434156230Smux errno = ECONNRESET; 435156230Smux chan_unlock(chan); 436156230Smux return (NULL); 437156230Smux } 438156230Smux chan_unlock(chan); 439156230Smux return (chan); 440156230Smux} 441156230Smux 442156230Smux/* Read bytes from a channel. */ 443156230Smuxssize_t 444156230Smuxchan_read(struct chan *chan, void *buf, size_t size) 445156230Smux{ 446156230Smux char *cp; 447156230Smux size_t count, n; 448156230Smux 449156230Smux cp = buf; 450156230Smux chan_lock(chan); 451156230Smux for (;;) { 452156230Smux if (chan->state == CS_RDCLOSED || chan->state == CS_CLOSED) { 453156230Smux chan_unlock(chan); 454156230Smux return (0); 455156230Smux } 456156230Smux if (chan->state != CS_ESTABLISHED && 457156230Smux chan->state != CS_WRCLOSED) { 458156230Smux chan_unlock(chan); 459156230Smux errno = EBADF; 460156230Smux return (-1); 461156230Smux } 462156230Smux count = buf_count(chan->recvbuf); 463156230Smux if (count > 0) 464156230Smux break; 465156230Smux pthread_cond_wait(&chan->rdready, &chan->lock); 466156230Smux } 467156230Smux n = min(count, size); 468156230Smux buf_get(chan->recvbuf, cp, n); 469156230Smux chan->recvseq += n; 470156230Smux chan->flags |= CF_WINDOW; 471156230Smux chan_unlock(chan); 472156230Smux /* We need to wake up the sender so that it sends a window update. */ 473156230Smux sender_wakeup(chan->mux); 474156230Smux return (n); 475156230Smux} 476156230Smux 477156230Smux/* Write bytes to a channel. */ 478156230Smuxssize_t 479156230Smuxchan_write(struct chan *chan, const void *buf, size_t size) 480156230Smux{ 481156230Smux const char *cp; 482156230Smux size_t avail, n, pos; 483156230Smux 484156230Smux pos = 0; 485156230Smux cp = buf; 486156230Smux chan_lock(chan); 487156230Smux while (pos < size) { 488156230Smux for (;;) { 489156230Smux if (chan->state != CS_ESTABLISHED && 490156230Smux chan->state != CS_RDCLOSED) { 491156230Smux chan_unlock(chan); 492156230Smux errno = EPIPE; 493156230Smux return (-1); 494156230Smux } 495156230Smux avail = buf_avail(chan->sendbuf); 496156230Smux if (avail > 0) 497156230Smux break; 498156230Smux pthread_cond_wait(&chan->wrready, &chan->lock); 499156230Smux } 500156230Smux n = min(avail, size - pos); 501156230Smux buf_put(chan->sendbuf, cp + pos, n); 502156230Smux pos += n; 503156230Smux } 504156230Smux chan_unlock(chan); 505156230Smux sender_wakeup(chan->mux); 506156230Smux return (size); 507156230Smux} 508156230Smux 509156230Smux/* 510156230Smux * Internal channel API. 511156230Smux */ 512156230Smux 513156230Smuxstatic struct chan * 514156230Smuxchan_connect(struct mux *m, int id) 515156230Smux{ 516156230Smux struct chan *chan; 517156230Smux 518156230Smux chan = chan_get(m, id); 519156230Smux if (chan->state != CS_UNUSED) { 520156230Smux chan_unlock(chan); 521156230Smux return (NULL); 522156230Smux } 523156230Smux chan->state = CS_CONNECTING; 524156230Smux chan->flags |= CF_CONNECT; 525156230Smux chan_unlock(chan); 526156230Smux sender_wakeup(m); 527156230Smux chan_lock(chan); 528156230Smux while (chan->state == CS_CONNECTING) 529156230Smux pthread_cond_wait(&chan->wrready, &chan->lock); 530156230Smux if (chan->state != CS_ESTABLISHED) { 531156230Smux chan_unlock(chan); 532156230Smux return (NULL); 533156230Smux } 534156230Smux chan_unlock(chan); 535156230Smux return (chan); 536156230Smux} 537156230Smux 538156230Smux/* 539156230Smux * Get a channel from its ID, creating it if necessary. 540156230Smux * The channel is returned locked. 541156230Smux */ 542156230Smuxstatic struct chan * 543156230Smuxchan_get(struct mux *m, int id) 544156230Smux{ 545156230Smux struct chan *chan; 546156230Smux 547156230Smux assert(id < MUX_MAXCHAN); 548156230Smux mux_lock(m); 549156230Smux chan = m->channels[id]; 550156230Smux if (chan == NULL) { 551156230Smux chan = chan_new(m); 552156230Smux m->channels[id] = chan; 553156230Smux m->nchans++; 554156230Smux } 555156230Smux chan_lock(chan); 556156230Smux mux_unlock(m); 557156230Smux return (chan); 558156230Smux} 559156230Smux 560156230Smux/* Lock a channel. */ 561156230Smuxstatic void 562156230Smuxchan_lock(struct chan *chan) 563156230Smux{ 564156230Smux int error; 565156230Smux 566156230Smux error = pthread_mutex_lock(&chan->lock); 567156230Smux assert(!error); 568156230Smux} 569156230Smux 570156230Smux/* Unlock a channel. */ 571156230Smuxstatic void 572156230Smuxchan_unlock(struct chan *chan) 573156230Smux{ 574156230Smux int error; 575156230Smux 576156230Smux error = pthread_mutex_unlock(&chan->lock); 577156230Smux assert(!error); 578156230Smux} 579156230Smux 580156230Smux/* 581156230Smux * Create a new channel. 582156230Smux */ 583156230Smuxstatic struct chan * 584156230Smuxchan_new(struct mux *m) 585156230Smux{ 586156230Smux struct chan *chan; 587156230Smux 588156230Smux chan = xmalloc(sizeof(struct chan)); 589156230Smux chan->state = CS_UNUSED; 590156230Smux chan->flags = 0; 591156230Smux chan->mux = m; 592156230Smux chan->sendbuf = buf_new(CHAN_SBSIZE); 593156230Smux chan->sendseq = 0; 594156230Smux chan->sendwin = 0; 595156230Smux chan->sendmss = 0; 596156230Smux chan->recvbuf = buf_new(CHAN_RBSIZE); 597156230Smux chan->recvseq = 0; 598156230Smux chan->recvmss = CHAN_MAXSEGSIZE; 599156230Smux pthread_mutex_init(&chan->lock, NULL); 600156230Smux pthread_cond_init(&chan->rdready, NULL); 601156230Smux pthread_cond_init(&chan->wrready, NULL); 602156230Smux return (chan); 603156230Smux} 604156230Smux 605156230Smux/* Free any resources associated with a channel. */ 606156230Smuxstatic void 607156230Smuxchan_free(struct chan *chan) 608156230Smux{ 609156230Smux 610156230Smux pthread_cond_destroy(&chan->rdready); 611156230Smux pthread_cond_destroy(&chan->wrready); 612156230Smux pthread_mutex_destroy(&chan->lock); 613156230Smux buf_free(chan->recvbuf); 614156230Smux buf_free(chan->sendbuf); 615156230Smux free(chan); 616156230Smux} 617156230Smux 618156230Smux/* Insert the new channel in the channel list. */ 619156230Smuxstatic int 620156230Smuxchan_insert(struct mux *m, struct chan *chan) 621156230Smux{ 622156230Smux int i; 623156230Smux 624156230Smux mux_lock(m); 625156230Smux for (i = 0; i < MUX_MAXCHAN; i++) { 626156230Smux if (m->channels[i] == NULL) { 627156230Smux m->channels[i] = chan; 628156230Smux m->nchans++; 629156230Smux mux_unlock(m); 630156230Smux return (i); 631156230Smux } 632156230Smux } 633156230Smux errno = ENOBUFS; 634156230Smux return (-1); 635156230Smux} 636156230Smux 637156230Smux/* 638156230Smux * Initialize the multiplexer protocol. 639156230Smux * 640156230Smux * This means negotiating protocol version and starting 641156230Smux * the receiver and sender threads. 642156230Smux */ 643156230Smuxstatic int 644156230Smuxmux_init(struct mux *m) 645156230Smux{ 646156230Smux struct mux_header mh; 647156230Smux int error; 648156230Smux 649156230Smux mh.type = MUX_STARTUPREQ; 650156230Smux mh.mh_startup.version = htons(MUX_PROTOVER); 651156230Smux error = sock_write(m->socket, &mh, MUX_STARTUPHDRSZ); 652156230Smux if (error) 653156230Smux return (-1); 654156230Smux error = sock_readwait(m->socket, &mh, MUX_STARTUPHDRSZ); 655156230Smux if (error) 656156230Smux return (-1); 657156230Smux if (mh.type != MUX_STARTUPREP || 658156230Smux ntohs(mh.mh_startup.version) != MUX_PROTOVER) 659156230Smux return (-1); 660156230Smux mux_lock(m); 661156230Smux error = pthread_create(&m->sender, NULL, sender_loop, m); 662156230Smux if (error) { 663156230Smux mux_unlock(m); 664156230Smux return (-1); 665156230Smux } 666156230Smux /* 667156230Smux * Make sure the sender thread has run and is waiting for new work 668156230Smux * before going on. Otherwise, it might lose the race and a 669156230Smux * request, which will cause a deadlock. 670156230Smux */ 671156230Smux while (!m->sender_ready) 672156230Smux pthread_cond_wait(&m->sender_started, &m->lock); 673156230Smux 674156230Smux mux_unlock(m); 675156230Smux error = pthread_create(&m->receiver, NULL, receiver_loop, m); 676156230Smux if (error) 677156230Smux return (-1); 678156230Smux return (0); 679156230Smux} 680156230Smux 681156230Smux/* 682156230Smux * Close all the channels, terminate the sender and receiver thread. 683228992Suqs * This is an important function because it is used every time we need 684156230Smux * to wake up all the worker threads to abort the program. 685156230Smux * 686156230Smux * This function accepts an error message that will be printed if the 687156230Smux * multiplexer wasn't already closed. This is useful because it ensures 688156230Smux * that only the first error message will be printed, and that it will 689156230Smux * be printed before doing the actual shutdown work. If this is a 690156230Smux * normal shutdown, NULL can be passed instead. 691156230Smux * 692156230Smux * The "status" parameter of the first mux_shutdown() call is retained 693156230Smux * and then returned by mux_close(), so that the main thread can know 694156230Smux * what type of error happened in the end, if any. 695156230Smux */ 696156230Smuxvoid 697156230Smuxmux_shutdown(struct mux *m, const char *errmsg, int status) 698156230Smux{ 699156230Smux pthread_t self, sender, receiver; 700156230Smux struct chan *chan; 701156230Smux const char *name; 702156230Smux void *val; 703156230Smux int i, ret; 704156230Smux 705156230Smux mux_lock(m); 706156230Smux if (m->closed) { 707156230Smux mux_unlock(m); 708156230Smux return; 709156230Smux } 710156230Smux m->closed = 1; 711156230Smux m->status = status; 712156230Smux self = pthread_self(); 713156230Smux sender = m->sender; 714156230Smux receiver = m->receiver; 715156230Smux if (errmsg != NULL) { 716156230Smux if (pthread_equal(self, receiver)) 717156230Smux name = "Receiver"; 718156230Smux else if (pthread_equal(self, sender)) 719156230Smux name = "Sender"; 720156230Smux else 721156230Smux name = NULL; 722156230Smux if (name == NULL) 723156230Smux lprintf(-1, "%s\n", errmsg); 724156230Smux else 725156230Smux lprintf(-1, "%s: %s\n", name, errmsg); 726156230Smux } 727156230Smux 728156230Smux for (i = 0; i < MUX_MAXCHAN; i++) { 729156230Smux if (m->channels[i] != NULL) { 730156230Smux chan = m->channels[i]; 731156230Smux chan_lock(chan); 732156230Smux if (chan->state != CS_UNUSED) { 733156230Smux chan->state = CS_CLOSED; 734156230Smux chan->flags = 0; 735156230Smux pthread_cond_broadcast(&chan->rdready); 736156230Smux pthread_cond_broadcast(&chan->wrready); 737156230Smux } 738156230Smux chan_unlock(chan); 739156230Smux } 740156230Smux } 741156230Smux mux_unlock(m); 742156230Smux 743156230Smux if (!pthread_equal(self, receiver)) { 744156230Smux ret = pthread_cancel(receiver); 745156230Smux assert(!ret); 746156230Smux pthread_join(receiver, &val); 747156230Smux assert(val == PTHREAD_CANCELED); 748156230Smux } 749156230Smux if (!pthread_equal(self, sender)) { 750156230Smux ret = pthread_cancel(sender); 751156230Smux assert(!ret); 752156230Smux pthread_join(sender, &val); 753156230Smux assert(val == PTHREAD_CANCELED); 754156230Smux } 755156230Smux} 756156230Smux 757156230Smuxstatic void 758156230Smuxsender_wakeup(struct mux *m) 759156230Smux{ 760156230Smux int waiting; 761156230Smux 762156230Smux mux_lock(m); 763156230Smux waiting = m->sender_waiting; 764156230Smux mux_unlock(m); 765156230Smux /* 766156230Smux * We don't care about the race here: if the sender was 767156230Smux * waiting and is not anymore, we'll just send a useless 768156230Smux * signal; if he wasn't waiting then he won't go to sleep 769156230Smux * before having sent what we want him to. 770156230Smux */ 771156230Smux if (waiting) 772156230Smux pthread_cond_signal(&m->sender_newwork); 773156230Smux} 774156230Smux 775156230Smuxstatic void * 776156230Smuxsender_loop(void *arg) 777156230Smux{ 778156230Smux struct iovec iov[3]; 779156230Smux struct mux_header mh; 780156230Smux struct mux *m; 781156230Smux struct chan *chan; 782156230Smux struct buf *buf; 783156230Smux uint32_t winsize; 784156230Smux uint16_t hdrsize, size, len; 785173715Sjb int error, id, iovcnt, what = 0; 786156230Smux 787156230Smux m = (struct mux *)arg; 788186781Slulf what = 0; 789156230Smuxagain: 790156230Smux id = sender_waitforwork(m, &what); 791156230Smux chan = chan_get(m, id); 792156230Smux hdrsize = size = 0; 793156230Smux switch (what) { 794156230Smux case CF_CONNECT: 795156230Smux mh.type = MUX_CONNECT; 796156230Smux mh.mh_connect.id = id; 797156230Smux mh.mh_connect.mss = htons(chan->recvmss); 798156230Smux mh.mh_connect.window = htonl(chan->recvseq + 799156230Smux chan->recvbuf->size); 800156230Smux hdrsize = MUX_CONNECTHDRSZ; 801156230Smux break; 802156230Smux case CF_ACCEPT: 803156230Smux mh.type = MUX_ACCEPT; 804156230Smux mh.mh_accept.id = id; 805156230Smux mh.mh_accept.mss = htons(chan->recvmss); 806156230Smux mh.mh_accept.window = htonl(chan->recvseq + 807156230Smux chan->recvbuf->size); 808156230Smux hdrsize = MUX_ACCEPTHDRSZ; 809156230Smux break; 810156230Smux case CF_RESET: 811156230Smux mh.type = MUX_RESET; 812156230Smux mh.mh_reset.id = id; 813156230Smux hdrsize = MUX_RESETHDRSZ; 814156230Smux break; 815156230Smux case CF_WINDOW: 816156230Smux mh.type = MUX_WINDOW; 817156230Smux mh.mh_window.id = id; 818156230Smux mh.mh_window.window = htonl(chan->recvseq + 819156230Smux chan->recvbuf->size); 820156230Smux hdrsize = MUX_WINDOWHDRSZ; 821156230Smux break; 822156230Smux case CF_DATA: 823156230Smux mh.type = MUX_DATA; 824156230Smux mh.mh_data.id = id; 825156230Smux size = min(buf_count(chan->sendbuf), chan->sendmss); 826156230Smux winsize = chan->sendwin - chan->sendseq; 827156230Smux if (winsize < size) 828156230Smux size = winsize; 829156230Smux mh.mh_data.len = htons(size); 830156230Smux hdrsize = MUX_DATAHDRSZ; 831156230Smux break; 832156230Smux case CF_CLOSE: 833156230Smux mh.type = MUX_CLOSE; 834156230Smux mh.mh_close.id = id; 835156230Smux hdrsize = MUX_CLOSEHDRSZ; 836156230Smux break; 837156230Smux } 838156230Smux if (size > 0) { 839156230Smux assert(mh.type == MUX_DATA); 840156230Smux /* 841156230Smux * Older FreeBSD versions (and maybe other OSes) have the 842156230Smux * iov_base field defined as char *. Cast to char * to 843156230Smux * silence a warning in this case. 844156230Smux */ 845156230Smux iov[0].iov_base = (char *)&mh; 846156230Smux iov[0].iov_len = hdrsize; 847156230Smux iovcnt = 1; 848156230Smux /* We access the buffer directly to avoid some copying. */ 849156230Smux buf = chan->sendbuf; 850156230Smux len = min(size, buf->size + 1 - buf->out); 851156230Smux iov[iovcnt].iov_base = buf->data + buf->out; 852156230Smux iov[iovcnt].iov_len = len; 853156230Smux iovcnt++; 854156230Smux if (size > len) { 855156230Smux /* Wrapping around. */ 856156230Smux iov[iovcnt].iov_base = buf->data; 857156230Smux iov[iovcnt].iov_len = size - len; 858156230Smux iovcnt++; 859156230Smux } 860156230Smux /* 861156230Smux * Since we're the only thread sending bytes from the 862156230Smux * buffer and modifying buf->out, it's safe to unlock 863156230Smux * here during I/O. It avoids keeping the channel lock 864156230Smux * too long, since write() might block. 865156230Smux */ 866156230Smux chan_unlock(chan); 867156230Smux error = sock_writev(m->socket, iov, iovcnt); 868156230Smux if (error) 869156230Smux goto bad; 870156230Smux chan_lock(chan); 871156230Smux chan->sendseq += size; 872156230Smux buf->out += size; 873156230Smux if (buf->out > buf->size) 874156230Smux buf->out -= buf->size + 1; 875156230Smux pthread_cond_signal(&chan->wrready); 876156230Smux chan_unlock(chan); 877156230Smux } else { 878156230Smux chan_unlock(chan); 879156230Smux error = sock_write(m->socket, &mh, hdrsize); 880156230Smux if (error) 881156230Smux goto bad; 882156230Smux } 883156230Smux goto again; 884156230Smuxbad: 885156230Smux if (error == EPIPE) 886156230Smux mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE); 887156230Smux else 888156230Smux mux_shutdown(m, strerror(errno), STATUS_FAILURE); 889156230Smux return (NULL); 890156230Smux} 891156230Smux 892156230Smuxstatic void 893156230Smuxsender_cleanup(void *arg) 894156230Smux{ 895156230Smux struct mux *m; 896156230Smux 897156230Smux m = (struct mux *)arg; 898156230Smux mux_unlock(m); 899156230Smux} 900156230Smux 901156230Smuxstatic int 902156230Smuxsender_waitforwork(struct mux *m, int *what) 903156230Smux{ 904156230Smux int id; 905156230Smux 906156230Smux mux_lock(m); 907156230Smux pthread_cleanup_push(sender_cleanup, m); 908156230Smux if (!m->sender_ready) { 909156230Smux pthread_cond_signal(&m->sender_started); 910156230Smux m->sender_ready = 1; 911156230Smux } 912156230Smux while ((id = sender_scan(m, what)) == -1) { 913156230Smux m->sender_waiting = 1; 914156230Smux pthread_cond_wait(&m->sender_newwork, &m->lock); 915156230Smux } 916156230Smux m->sender_waiting = 0; 917156230Smux pthread_cleanup_pop(1); 918156230Smux return (id); 919156230Smux} 920156230Smux 921156230Smux/* 922156230Smux * Scan for work to do for the sender. Has to be called with 923156230Smux * the multiplexer lock held. 924156230Smux */ 925156230Smuxstatic int 926156230Smuxsender_scan(struct mux *m, int *what) 927156230Smux{ 928156230Smux struct chan *chan; 929156230Smux int id; 930156230Smux 931156230Smux if (m->nchans <= 0) 932156230Smux return (-1); 933156230Smux id = m->sender_lastid; 934156230Smux do { 935156230Smux id++; 936156230Smux if (id >= m->nchans) 937156230Smux id = 0; 938156230Smux chan = m->channels[id]; 939156230Smux chan_lock(chan); 940156230Smux if (chan->state != CS_UNUSED) { 941156230Smux if (chan->sendseq != chan->sendwin && 942156230Smux buf_count(chan->sendbuf) > 0) 943156230Smux chan->flags |= CF_DATA; 944156230Smux if (chan->flags) { 945156230Smux /* By order of importance. */ 946156230Smux if (chan->flags & CF_CONNECT) 947156230Smux *what = CF_CONNECT; 948156230Smux else if (chan->flags & CF_ACCEPT) 949156230Smux *what = CF_ACCEPT; 950156230Smux else if (chan->flags & CF_RESET) 951156230Smux *what = CF_RESET; 952156230Smux else if (chan->flags & CF_WINDOW) 953156230Smux *what = CF_WINDOW; 954156230Smux else if (chan->flags & CF_DATA) 955156230Smux *what = CF_DATA; 956156230Smux else if (chan->flags & CF_CLOSE) 957156230Smux *what = CF_CLOSE; 958156230Smux chan->flags &= ~*what; 959156230Smux chan_unlock(chan); 960156230Smux m->sender_lastid = id; 961156230Smux return (id); 962156230Smux } 963156230Smux } 964156230Smux chan_unlock(chan); 965156230Smux } while (id != m->sender_lastid); 966156230Smux return (-1); 967156230Smux} 968156230Smux 969156230Smux/* Read the rest of a packet header depending on its type. */ 970156230Smux#define SOCK_READREST(s, mh, hsize) \ 971156230Smux sock_readwait(s, (char *)&mh + sizeof(mh.type), (hsize) - sizeof(mh.type)) 972156230Smux 973156230Smuxvoid * 974156230Smuxreceiver_loop(void *arg) 975156230Smux{ 976156230Smux struct mux_header mh; 977156230Smux struct mux *m; 978156230Smux struct chan *chan; 979156230Smux struct buf *buf; 980156230Smux uint16_t size, len; 981156230Smux int error; 982156230Smux 983156230Smux m = (struct mux *)arg; 984156230Smux while ((error = sock_readwait(m->socket, &mh.type, 985156230Smux sizeof(mh.type))) == 0) { 986156230Smux switch (mh.type) { 987156230Smux case MUX_CONNECT: 988156230Smux error = SOCK_READREST(m->socket, mh, MUX_CONNECTHDRSZ); 989156230Smux if (error) 990156230Smux goto bad; 991156230Smux chan = chan_get(m, mh.mh_connect.id); 992156230Smux if (chan->state == CS_LISTENING) { 993156230Smux chan->state = CS_ESTABLISHED; 994156230Smux chan->sendmss = ntohs(mh.mh_connect.mss); 995156230Smux chan->sendwin = ntohl(mh.mh_connect.window); 996156230Smux chan->flags |= CF_ACCEPT; 997156230Smux pthread_cond_signal(&chan->rdready); 998156230Smux } else 999156230Smux chan->flags |= CF_RESET; 1000156230Smux chan_unlock(chan); 1001156230Smux sender_wakeup(m); 1002156230Smux break; 1003156230Smux case MUX_ACCEPT: 1004156230Smux error = SOCK_READREST(m->socket, mh, MUX_ACCEPTHDRSZ); 1005156230Smux if (error) 1006156230Smux goto bad; 1007156230Smux chan = chan_get(m, mh.mh_accept.id); 1008156230Smux if (chan->state == CS_CONNECTING) { 1009156230Smux chan->sendmss = ntohs(mh.mh_accept.mss); 1010156230Smux chan->sendwin = ntohl(mh.mh_accept.window); 1011156230Smux chan->state = CS_ESTABLISHED; 1012156230Smux pthread_cond_signal(&chan->wrready); 1013156230Smux chan_unlock(chan); 1014156230Smux } else { 1015156230Smux chan->flags |= CF_RESET; 1016156230Smux chan_unlock(chan); 1017156230Smux sender_wakeup(m); 1018156230Smux } 1019156230Smux break; 1020156230Smux case MUX_RESET: 1021156230Smux error = SOCK_READREST(m->socket, mh, MUX_RESETHDRSZ); 1022156230Smux if (error) 1023156230Smux goto bad; 1024156230Smux goto badproto; 1025156230Smux case MUX_WINDOW: 1026156230Smux error = SOCK_READREST(m->socket, mh, MUX_WINDOWHDRSZ); 1027156230Smux if (error) 1028156230Smux goto bad; 1029156230Smux chan = chan_get(m, mh.mh_window.id); 1030156230Smux if (chan->state == CS_ESTABLISHED || 1031156230Smux chan->state == CS_RDCLOSED) { 1032156230Smux chan->sendwin = ntohl(mh.mh_window.window); 1033156230Smux chan_unlock(chan); 1034156230Smux sender_wakeup(m); 1035156230Smux } else { 1036156230Smux chan_unlock(chan); 1037156230Smux } 1038156230Smux break; 1039156230Smux case MUX_DATA: 1040156230Smux error = SOCK_READREST(m->socket, mh, MUX_DATAHDRSZ); 1041156230Smux if (error) 1042156230Smux goto bad; 1043156230Smux chan = chan_get(m, mh.mh_data.id); 1044156230Smux len = ntohs(mh.mh_data.len); 1045156230Smux buf = chan->recvbuf; 1046156230Smux if ((chan->state != CS_ESTABLISHED && 1047156230Smux chan->state != CS_WRCLOSED) || 1048156230Smux (len > buf_avail(buf) || 1049156230Smux len > chan->recvmss)) { 1050156230Smux chan_unlock(chan); 1051156230Smux goto badproto; 1052156230Smux return (NULL); 1053156230Smux } 1054156230Smux /* 1055156230Smux * Similarly to the sender code, it's safe to 1056156230Smux * unlock the channel here. 1057156230Smux */ 1058156230Smux chan_unlock(chan); 1059156230Smux size = min(buf->size + 1 - buf->in, len); 1060156230Smux error = sock_readwait(m->socket, 1061156230Smux buf->data + buf->in, size); 1062156230Smux if (error) 1063156230Smux goto bad; 1064156230Smux if (len > size) { 1065156230Smux /* Wrapping around. */ 1066156230Smux error = sock_readwait(m->socket, 1067156230Smux buf->data, len - size); 1068156230Smux if (error) 1069156230Smux goto bad; 1070156230Smux } 1071156230Smux chan_lock(chan); 1072156230Smux buf->in += len; 1073156230Smux if (buf->in > buf->size) 1074156230Smux buf->in -= buf->size + 1; 1075156230Smux pthread_cond_signal(&chan->rdready); 1076156230Smux chan_unlock(chan); 1077156230Smux break; 1078156230Smux case MUX_CLOSE: 1079156230Smux error = SOCK_READREST(m->socket, mh, MUX_CLOSEHDRSZ); 1080156230Smux if (error) 1081156230Smux goto bad; 1082156230Smux chan = chan_get(m, mh.mh_close.id); 1083156230Smux if (chan->state == CS_ESTABLISHED) 1084156230Smux chan->state = CS_RDCLOSED; 1085156230Smux else if (chan->state == CS_WRCLOSED) 1086156230Smux chan->state = CS_CLOSED; 1087156230Smux else 1088156230Smux goto badproto; 1089156230Smux pthread_cond_signal(&chan->rdready); 1090156230Smux chan_unlock(chan); 1091156230Smux break; 1092156230Smux default: 1093156230Smux goto badproto; 1094156230Smux } 1095156230Smux } 1096156230Smuxbad: 1097156230Smux if (errno == ECONNRESET || errno == ECONNABORTED) 1098156230Smux mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE); 1099156230Smux else 1100156230Smux mux_shutdown(m, strerror(errno), STATUS_FAILURE); 1101156230Smux return (NULL); 1102156230Smuxbadproto: 1103156230Smux mux_shutdown(m, "Protocol error", STATUS_FAILURE); 1104156230Smux return (NULL); 1105156230Smux} 1106156230Smux 1107156230Smux/* 1108156230Smux * Circular buffers API. 1109156230Smux */ 1110156230Smux 1111156230Smuxstatic struct buf * 1112156230Smuxbuf_new(size_t size) 1113156230Smux{ 1114156230Smux struct buf *buf; 1115156230Smux 1116156230Smux buf = xmalloc(sizeof(struct buf)); 1117156230Smux buf->data = xmalloc(size + 1); 1118156230Smux buf->size = size; 1119156230Smux buf->in = 0; 1120156230Smux buf->out = 0; 1121156230Smux return (buf); 1122156230Smux} 1123156230Smux 1124156230Smuxstatic void 1125156230Smuxbuf_free(struct buf *buf) 1126156230Smux{ 1127156230Smux 1128156230Smux free(buf->data); 1129156230Smux free(buf); 1130156230Smux} 1131156230Smux 1132156230Smux/* Number of bytes stored in the buffer. */ 1133156230Smuxstatic size_t 1134156230Smuxbuf_count(struct buf *buf) 1135156230Smux{ 1136156230Smux size_t count; 1137156230Smux 1138156230Smux if (buf->in >= buf->out) 1139156230Smux count = buf->in - buf->out; 1140156230Smux else 1141156230Smux count = buf->size + 1 + buf->in - buf->out; 1142156230Smux return (count); 1143156230Smux} 1144156230Smux 1145156230Smux/* Number of bytes available in the buffer. */ 1146156230Smuxstatic size_t 1147156230Smuxbuf_avail(struct buf *buf) 1148156230Smux{ 1149156230Smux size_t avail; 1150156230Smux 1151156230Smux if (buf->out > buf->in) 1152156230Smux avail = buf->out - buf->in - 1; 1153156230Smux else 1154156230Smux avail = buf->size + buf->out - buf->in; 1155156230Smux return (avail); 1156156230Smux} 1157156230Smux 1158156230Smuxstatic void 1159156230Smuxbuf_put(struct buf *buf, const void *data, size_t size) 1160156230Smux{ 1161156230Smux const char *cp; 1162156230Smux size_t len; 1163156230Smux 1164156230Smux assert(size > 0); 1165156230Smux assert(buf_avail(buf) >= size); 1166156230Smux cp = data; 1167156230Smux len = buf->size + 1 - buf->in; 1168156230Smux if (len < size) { 1169156230Smux /* Wrapping around. */ 1170156230Smux memcpy(buf->data + buf->in, cp, len); 1171156230Smux memcpy(buf->data, cp + len, size - len); 1172156230Smux } else { 1173156230Smux /* Not wrapping around. */ 1174156230Smux memcpy(buf->data + buf->in, cp, size); 1175156230Smux } 1176156230Smux buf->in += size; 1177156230Smux if (buf->in > buf->size) 1178156230Smux buf->in -= buf->size + 1; 1179156230Smux} 1180156230Smux 1181156230Smuxstatic void 1182156230Smuxbuf_get(struct buf *buf, void *data, size_t size) 1183156230Smux{ 1184156230Smux char *cp; 1185156230Smux size_t len; 1186156230Smux 1187156230Smux assert(size > 0); 1188156230Smux assert(buf_count(buf) >= size); 1189156230Smux cp = data; 1190156230Smux len = buf->size + 1 - buf->out; 1191156230Smux if (len < size) { 1192156230Smux /* Wrapping around. */ 1193156230Smux memcpy(cp, buf->data + buf->out, len); 1194156230Smux memcpy(cp + len, buf->data, size - len); 1195156230Smux } else { 1196156230Smux /* Not wrapping around. */ 1197156230Smux memcpy(cp, buf->data + buf->out, size); 1198156230Smux } 1199156230Smux buf->out += size; 1200156230Smux if (buf->out > buf->size) 1201156230Smux buf->out -= buf->size + 1; 1202156230Smux} 1203