1/* 2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29#include <sys/types.h> 30 31#include "event2/event-config.h" 32 33#ifdef _EVENT_HAVE_SYS_TIME_H 34#include <sys/time.h> 35#endif 36 37#include <errno.h> 38#include <stdio.h> 39#include <stdlib.h> 40#include <string.h> 41#ifdef _EVENT_HAVE_STDARG_H 42#include <stdarg.h> 43#endif 44#ifdef _EVENT_HAVE_UNISTD_H 45#include <unistd.h> 46#endif 47 48#ifdef WIN32 49#include <winsock2.h> 50#include <ws2tcpip.h> 51#endif 52 53#ifdef _EVENT_HAVE_SYS_SOCKET_H 54#include <sys/socket.h> 55#endif 56#ifdef _EVENT_HAVE_NETINET_IN_H 57#include <netinet/in.h> 58#endif 59#ifdef _EVENT_HAVE_NETINET_IN6_H 60#include <netinet/in6.h> 61#endif 62 63#include "event2/util.h" 64#include "event2/bufferevent.h" 65#include "event2/buffer.h" 66#include "event2/bufferevent_struct.h" 67#include "event2/bufferevent_compat.h" 68#include "event2/event.h" 69#include "log-internal.h" 70#include "mm-internal.h" 71#include "bufferevent-internal.h" 72#include "util-internal.h" 73#ifdef WIN32 74#include "iocp-internal.h" 75#endif 76 77/* prototypes */ 78static int be_socket_enable(struct bufferevent *, short); 79static int be_socket_disable(struct bufferevent *, short); 80static void be_socket_destruct(struct bufferevent *); 81static int be_socket_adj_timeouts(struct bufferevent *); 82static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 83static int be_socket_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 84 85static void be_socket_setfd(struct bufferevent *, evutil_socket_t); 86 87const struct bufferevent_ops bufferevent_ops_socket = { 88 "socket", 89 evutil_offsetof(struct bufferevent_private, bev), 90 be_socket_enable, 91 be_socket_disable, 92 be_socket_destruct, 93 be_socket_adj_timeouts, 94 be_socket_flush, 95 be_socket_ctrl, 96}; 97 98#define be_socket_add(ev, t) \ 99 _bufferevent_add_event((ev), (t)) 100 101static void 102bufferevent_socket_outbuf_cb(struct evbuffer *buf, 103 const struct evbuffer_cb_info *cbinfo, 104 void *arg) 105{ 106 struct bufferevent *bufev = arg; 107 struct bufferevent_private *bufev_p = 108 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 109 110 if (cbinfo->n_added && 111 (bufev->enabled & EV_WRITE) && 112 !event_pending(&bufev->ev_write, EV_WRITE, NULL) && 113 !bufev_p->write_suspended) { 114 /* Somebody added data to the buffer, and we would like to 115 * write, and we were not writing. So, start writing. */ 116 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) { 117 /* Should we log this? */ 118 } 119 } 120} 121 122static void 123bufferevent_readcb(evutil_socket_t fd, short event, void *arg) 124{ 125 struct bufferevent *bufev = arg; 126 struct bufferevent_private *bufev_p = 127 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 128 struct evbuffer *input; 129 int res = 0; 130 short what = BEV_EVENT_READING; 131 ev_ssize_t howmuch = -1, readmax=-1; 132 133 _bufferevent_incref_and_lock(bufev); 134 135 if (event == EV_TIMEOUT) { 136 what |= BEV_EVENT_TIMEOUT; 137 goto error; 138 } 139 140 input = bufev->input; 141 142 /* 143 * If we have a high watermark configured then we don't want to 144 * read more data than would make us reach the watermark. 145 */ 146 if (bufev->wm_read.high != 0) { 147 howmuch = bufev->wm_read.high - evbuffer_get_length(input); 148 /* we somehow lowered the watermark, stop reading */ 149 if (howmuch <= 0) { 150 bufferevent_wm_suspend_read(bufev); 151 goto done; 152 } 153 } 154 readmax = _bufferevent_get_read_max(bufev_p); 155 if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited" 156 * uglifies this code. XXXX */ 157 howmuch = readmax; 158 if (bufev_p->read_suspended) 159 goto done; 160 161 evbuffer_unfreeze(input, 0); 162 res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */ 163 evbuffer_freeze(input, 0); 164 165 if (res == -1) { 166 int err = evutil_socket_geterror(fd); 167 if (EVUTIL_ERR_RW_RETRIABLE(err)) 168 goto reschedule; 169 /* error case */ 170 what |= BEV_EVENT_ERROR; 171 } else if (res == 0) { 172 /* eof case */ 173 what |= BEV_EVENT_EOF; 174 } 175 176 if (res <= 0) 177 goto error; 178 179 _bufferevent_decrement_read_buckets(bufev_p, res); 180 181 /* Invoke the user callback - must always be called last */ 182 if (evbuffer_get_length(input) >= bufev->wm_read.low) 183 _bufferevent_run_readcb(bufev); 184 185 goto done; 186 187 reschedule: 188 goto done; 189 190 error: 191 bufferevent_disable(bufev, EV_READ); 192 _bufferevent_run_eventcb(bufev, what); 193 194 done: 195 _bufferevent_decref_and_unlock(bufev); 196} 197 198static void 199bufferevent_writecb(evutil_socket_t fd, short event, void *arg) 200{ 201 struct bufferevent *bufev = arg; 202 struct bufferevent_private *bufev_p = 203 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 204 int res = 0; 205 short what = BEV_EVENT_WRITING; 206 int connected = 0; 207 ev_ssize_t atmost = -1; 208 209 _bufferevent_incref_and_lock(bufev); 210 211 if (event == EV_TIMEOUT) { 212 what |= BEV_EVENT_TIMEOUT; 213 goto error; 214 } 215 if (bufev_p->connecting) { 216 int c = evutil_socket_finished_connecting(fd); 217 /* we need to fake the error if the connection was refused 218 * immediately - usually connection to localhost on BSD */ 219 if (bufev_p->connection_refused) { 220 bufev_p->connection_refused = 0; 221 c = -1; 222 } 223 224 if (c == 0) 225 goto done; 226 227 bufev_p->connecting = 0; 228 if (c < 0) { 229 event_del(&bufev->ev_write); 230 event_del(&bufev->ev_read); 231 _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); 232 goto done; 233 } else { 234 connected = 1; 235#ifdef WIN32 236 if (BEV_IS_ASYNC(bufev)) { 237 event_del(&bufev->ev_write); 238 bufferevent_async_set_connected(bufev); 239 _bufferevent_run_eventcb(bufev, 240 BEV_EVENT_CONNECTED); 241 goto done; 242 } 243#endif 244 _bufferevent_run_eventcb(bufev, 245 BEV_EVENT_CONNECTED); 246 if (!(bufev->enabled & EV_WRITE) || 247 bufev_p->write_suspended) { 248 event_del(&bufev->ev_write); 249 goto done; 250 } 251 } 252 } 253 254 atmost = _bufferevent_get_write_max(bufev_p); 255 256 if (bufev_p->write_suspended) 257 goto done; 258 259 if (evbuffer_get_length(bufev->output)) { 260 evbuffer_unfreeze(bufev->output, 1); 261 res = evbuffer_write_atmost(bufev->output, fd, atmost); 262 evbuffer_freeze(bufev->output, 1); 263 if (res == -1) { 264 int err = evutil_socket_geterror(fd); 265 if (EVUTIL_ERR_RW_RETRIABLE(err)) 266 goto reschedule; 267 what |= BEV_EVENT_ERROR; 268 } else if (res == 0) { 269 /* eof case 270 XXXX Actually, a 0 on write doesn't indicate 271 an EOF. An ECONNRESET might be more typical. 272 */ 273 what |= BEV_EVENT_EOF; 274 } 275 if (res <= 0) 276 goto error; 277 278 _bufferevent_decrement_write_buckets(bufev_p, res); 279 } 280 281 if (evbuffer_get_length(bufev->output) == 0) { 282 event_del(&bufev->ev_write); 283 } 284 285 /* 286 * Invoke the user callback if our buffer is drained or below the 287 * low watermark. 288 */ 289 if ((res || !connected) && 290 evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { 291 _bufferevent_run_writecb(bufev); 292 } 293 294 goto done; 295 296 reschedule: 297 if (evbuffer_get_length(bufev->output) == 0) { 298 event_del(&bufev->ev_write); 299 } 300 goto done; 301 302 error: 303 bufferevent_disable(bufev, EV_WRITE); 304 _bufferevent_run_eventcb(bufev, what); 305 306 done: 307 _bufferevent_decref_and_unlock(bufev); 308} 309 310struct bufferevent * 311bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, 312 int options) 313{ 314 struct bufferevent_private *bufev_p; 315 struct bufferevent *bufev; 316 317#ifdef WIN32 318 if (base && event_base_get_iocp(base)) 319 return bufferevent_async_new(base, fd, options); 320#endif 321 322 if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) 323 return NULL; 324 325 if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, 326 options) < 0) { 327 mm_free(bufev_p); 328 return NULL; 329 } 330 bufev = &bufev_p->bev; 331 evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); 332 333 event_assign(&bufev->ev_read, bufev->ev_base, fd, 334 EV_READ|EV_PERSIST, bufferevent_readcb, bufev); 335 event_assign(&bufev->ev_write, bufev->ev_base, fd, 336 EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); 337 338 evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); 339 340 evbuffer_freeze(bufev->input, 0); 341 evbuffer_freeze(bufev->output, 1); 342 343 return bufev; 344} 345 346int 347bufferevent_socket_connect(struct bufferevent *bev, 348 struct sockaddr *sa, int socklen) 349{ 350 struct bufferevent_private *bufev_p = 351 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 352 353 evutil_socket_t fd; 354 int r = 0; 355 int result=-1; 356 int ownfd = 0; 357 358 _bufferevent_incref_and_lock(bev); 359 360 if (!bufev_p) 361 goto done; 362 363 fd = bufferevent_getfd(bev); 364 if (fd < 0) { 365 if (!sa) 366 goto done; 367 fd = socket(sa->sa_family, SOCK_STREAM, 0); 368 if (fd < 0) 369 goto done; 370 if (evutil_make_socket_nonblocking(fd)<0) 371 goto done; 372 ownfd = 1; 373 } 374 if (sa) { 375#ifdef WIN32 376 if (bufferevent_async_can_connect(bev)) { 377 bufferevent_setfd(bev, fd); 378 r = bufferevent_async_connect(bev, fd, sa, socklen); 379 if (r < 0) 380 goto freesock; 381 bufev_p->connecting = 1; 382 result = 0; 383 goto done; 384 } else 385#endif 386 r = evutil_socket_connect(&fd, sa, socklen); 387 if (r < 0) 388 goto freesock; 389 } 390#ifdef WIN32 391 /* ConnectEx() isn't always around, even when IOCP is enabled. 392 * Here, we borrow the socket object's write handler to fall back 393 * on a non-blocking connect() when ConnectEx() is unavailable. */ 394 if (BEV_IS_ASYNC(bev)) { 395 event_assign(&bev->ev_write, bev->ev_base, fd, 396 EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); 397 } 398#endif 399 bufferevent_setfd(bev, fd); 400 if (r == 0) { 401 if (! be_socket_enable(bev, EV_WRITE)) { 402 bufev_p->connecting = 1; 403 result = 0; 404 goto done; 405 } 406 } else if (r == 1) { 407 /* The connect succeeded already. How very BSD of it. */ 408 result = 0; 409 bufev_p->connecting = 1; 410 event_active(&bev->ev_write, EV_WRITE, 1); 411 } else { 412 /* The connect failed already. How very BSD of it. */ 413 bufev_p->connection_refused = 1; 414 bufev_p->connecting = 1; 415 result = 0; 416 event_active(&bev->ev_write, EV_WRITE, 1); 417 } 418 419 goto done; 420 421freesock: 422 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 423 if (ownfd) 424 evutil_closesocket(fd); 425 /* do something about the error? */ 426done: 427 _bufferevent_decref_and_unlock(bev); 428 return result; 429} 430 431static void 432bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai, 433 void *arg) 434{ 435 struct bufferevent *bev = arg; 436 struct bufferevent_private *bev_p = 437 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 438 int r; 439 BEV_LOCK(bev); 440 441 bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP); 442 bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP); 443 444 if (result != 0) { 445 bev_p->dns_error = result; 446 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 447 _bufferevent_decref_and_unlock(bev); 448 if (ai) 449 evutil_freeaddrinfo(ai); 450 return; 451 } 452 453 /* XXX use the other addrinfos? */ 454 /* XXX use this return value */ 455 r = bufferevent_socket_connect(bev, ai->ai_addr, (int)ai->ai_addrlen); 456 (void)r; 457 _bufferevent_decref_and_unlock(bev); 458 evutil_freeaddrinfo(ai); 459} 460 461int 462bufferevent_socket_connect_hostname(struct bufferevent *bev, 463 struct evdns_base *evdns_base, int family, const char *hostname, int port) 464{ 465 char portbuf[10]; 466 struct evutil_addrinfo hint; 467 int err; 468 struct bufferevent_private *bev_p = 469 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 470 471 if (family != AF_INET && family != AF_INET6 && family != AF_UNSPEC) 472 return -1; 473 if (port < 1 || port > 65535) 474 return -1; 475 476 BEV_LOCK(bev); 477 bev_p->dns_error = 0; 478 BEV_UNLOCK(bev); 479 480 evutil_snprintf(portbuf, sizeof(portbuf), "%d", port); 481 482 memset(&hint, 0, sizeof(hint)); 483 hint.ai_family = family; 484 hint.ai_protocol = IPPROTO_TCP; 485 hint.ai_socktype = SOCK_STREAM; 486 487 bufferevent_suspend_write(bev, BEV_SUSPEND_LOOKUP); 488 bufferevent_suspend_read(bev, BEV_SUSPEND_LOOKUP); 489 490 bufferevent_incref(bev); 491 err = evutil_getaddrinfo_async(evdns_base, hostname, portbuf, 492 &hint, bufferevent_connect_getaddrinfo_cb, bev); 493 494 if (err == 0) { 495 return 0; 496 } else { 497 bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP); 498 bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP); 499 return -1; 500 } 501} 502 503int 504bufferevent_socket_get_dns_error(struct bufferevent *bev) 505{ 506 int rv; 507 struct bufferevent_private *bev_p = 508 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 509 510 BEV_LOCK(bev); 511 rv = bev_p->dns_error; 512 BEV_LOCK(bev); 513 514 return rv; 515} 516 517/* 518 * Create a new buffered event object. 519 * 520 * The read callback is invoked whenever we read new data. 521 * The write callback is invoked whenever the output buffer is drained. 522 * The error callback is invoked on a write/read error or on EOF. 523 * 524 * Both read and write callbacks maybe NULL. The error callback is not 525 * allowed to be NULL and have to be provided always. 526 */ 527 528struct bufferevent * 529bufferevent_new(evutil_socket_t fd, 530 bufferevent_data_cb readcb, bufferevent_data_cb writecb, 531 bufferevent_event_cb eventcb, void *cbarg) 532{ 533 struct bufferevent *bufev; 534 535 if (!(bufev = bufferevent_socket_new(NULL, fd, 0))) 536 return NULL; 537 538 bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg); 539 540 return bufev; 541} 542 543 544static int 545be_socket_enable(struct bufferevent *bufev, short event) 546{ 547 if (event & EV_READ) { 548 if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1) 549 return -1; 550 } 551 if (event & EV_WRITE) { 552 if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1) 553 return -1; 554 } 555 return 0; 556} 557 558static int 559be_socket_disable(struct bufferevent *bufev, short event) 560{ 561 struct bufferevent_private *bufev_p = 562 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 563 if (event & EV_READ) { 564 if (event_del(&bufev->ev_read) == -1) 565 return -1; 566 } 567 /* Don't actually disable the write if we are trying to connect. */ 568 if ((event & EV_WRITE) && ! bufev_p->connecting) { 569 if (event_del(&bufev->ev_write) == -1) 570 return -1; 571 } 572 return 0; 573} 574 575static void 576be_socket_destruct(struct bufferevent *bufev) 577{ 578 struct bufferevent_private *bufev_p = 579 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 580 evutil_socket_t fd; 581 EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); 582 583 fd = event_get_fd(&bufev->ev_read); 584 585 event_del(&bufev->ev_read); 586 event_del(&bufev->ev_write); 587 588 if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) 589 EVUTIL_CLOSESOCKET(fd); 590} 591 592static int 593be_socket_adj_timeouts(struct bufferevent *bufev) 594{ 595 int r = 0; 596 if (event_pending(&bufev->ev_read, EV_READ, NULL)) 597 if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0) 598 r = -1; 599 if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) { 600 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0) 601 r = -1; 602 } 603 return r; 604} 605 606static int 607be_socket_flush(struct bufferevent *bev, short iotype, 608 enum bufferevent_flush_mode mode) 609{ 610 return 0; 611} 612 613 614static void 615be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) 616{ 617 BEV_LOCK(bufev); 618 EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); 619 620 event_del(&bufev->ev_read); 621 event_del(&bufev->ev_write); 622 623 event_assign(&bufev->ev_read, bufev->ev_base, fd, 624 EV_READ|EV_PERSIST, bufferevent_readcb, bufev); 625 event_assign(&bufev->ev_write, bufev->ev_base, fd, 626 EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); 627 628 if (fd >= 0) 629 bufferevent_enable(bufev, bufev->enabled); 630 631 BEV_UNLOCK(bufev); 632} 633 634/* XXXX Should non-socket bufferevents support this? */ 635int 636bufferevent_priority_set(struct bufferevent *bufev, int priority) 637{ 638 int r = -1; 639 640 BEV_LOCK(bufev); 641 if (bufev->be_ops != &bufferevent_ops_socket) 642 goto done; 643 644 if (event_priority_set(&bufev->ev_read, priority) == -1) 645 goto done; 646 if (event_priority_set(&bufev->ev_write, priority) == -1) 647 goto done; 648 649 r = 0; 650done: 651 BEV_UNLOCK(bufev); 652 return r; 653} 654 655/* XXXX Should non-socket bufferevents support this? */ 656int 657bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 658{ 659 int res = -1; 660 661 BEV_LOCK(bufev); 662 if (bufev->be_ops != &bufferevent_ops_socket) 663 goto done; 664 665 bufev->ev_base = base; 666 667 res = event_base_set(base, &bufev->ev_read); 668 if (res == -1) 669 goto done; 670 671 res = event_base_set(base, &bufev->ev_write); 672done: 673 BEV_UNLOCK(bufev); 674 return res; 675} 676 677static int 678be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 679 union bufferevent_ctrl_data *data) 680{ 681 switch (op) { 682 case BEV_CTRL_SET_FD: 683 be_socket_setfd(bev, data->fd); 684 return 0; 685 case BEV_CTRL_GET_FD: 686 data->fd = event_get_fd(&bev->ev_read); 687 return 0; 688 case BEV_CTRL_GET_UNDERLYING: 689 case BEV_CTRL_CANCEL_ALL: 690 default: 691 return -1; 692 } 693} 694