1275970Scy/* 2275970Scy * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3275970Scy * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4275970Scy * All rights reserved. 5275970Scy * 6275970Scy * Redistribution and use in source and binary forms, with or without 7275970Scy * modification, are permitted provided that the following conditions 8275970Scy * are met: 9275970Scy * 1. Redistributions of source code must retain the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer. 11275970Scy * 2. Redistributions in binary form must reproduce the above copyright 12275970Scy * notice, this list of conditions and the following disclaimer in the 13275970Scy * documentation and/or other materials provided with the distribution. 14275970Scy * 3. The name of the author may not be used to endorse or promote products 15275970Scy * derived from this software without specific prior written permission. 16275970Scy * 17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27275970Scy */ 28275970Scy 29275970Scy#include "event2/event-config.h" 30275970Scy#include "evconfig-private.h" 31275970Scy 32275970Scy#include <sys/types.h> 33275970Scy 34275970Scy#ifdef EVENT__HAVE_SYS_TIME_H 35275970Scy#include <sys/time.h> 36275970Scy#endif 37275970Scy 38275970Scy#include <errno.h> 39275970Scy#include <stdio.h> 40275970Scy#include <stdlib.h> 41275970Scy#include <string.h> 42275970Scy#ifdef EVENT__HAVE_STDARG_H 43275970Scy#include <stdarg.h> 44275970Scy#endif 45275970Scy#ifdef EVENT__HAVE_UNISTD_H 46275970Scy#include <unistd.h> 47275970Scy#endif 48275970Scy 49275970Scy#ifdef _WIN32 50275970Scy#include <winsock2.h> 51275970Scy#include <ws2tcpip.h> 52275970Scy#endif 53275970Scy 54275970Scy#ifdef EVENT__HAVE_SYS_SOCKET_H 55275970Scy#include <sys/socket.h> 56275970Scy#endif 57275970Scy#ifdef EVENT__HAVE_NETINET_IN_H 58275970Scy#include <netinet/in.h> 59275970Scy#endif 60275970Scy#ifdef EVENT__HAVE_NETINET_IN6_H 61275970Scy#include <netinet/in6.h> 62275970Scy#endif 63275970Scy 64275970Scy#include "event2/util.h" 65275970Scy#include "event2/bufferevent.h" 66275970Scy#include "event2/buffer.h" 67275970Scy#include "event2/bufferevent_struct.h" 68275970Scy#include "event2/bufferevent_compat.h" 69275970Scy#include "event2/event.h" 70275970Scy#include "log-internal.h" 71275970Scy#include "mm-internal.h" 72275970Scy#include "bufferevent-internal.h" 73275970Scy#include "util-internal.h" 74275970Scy#ifdef _WIN32 75275970Scy#include "iocp-internal.h" 76275970Scy#endif 77275970Scy 78275970Scy/* prototypes */ 79275970Scystatic int be_socket_enable(struct bufferevent *, short); 80275970Scystatic int be_socket_disable(struct bufferevent *, short); 81275970Scystatic void be_socket_destruct(struct bufferevent *); 82275970Scystatic int be_socket_adj_timeouts(struct bufferevent *); 83275970Scystatic int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 84275970Scystatic int be_socket_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 85275970Scy 86275970Scystatic void be_socket_setfd(struct bufferevent *, evutil_socket_t); 87275970Scy 88275970Scyconst struct bufferevent_ops bufferevent_ops_socket = { 89275970Scy "socket", 90275970Scy evutil_offsetof(struct bufferevent_private, bev), 91275970Scy be_socket_enable, 92275970Scy be_socket_disable, 93275970Scy NULL, /* unlink */ 94275970Scy be_socket_destruct, 95275970Scy be_socket_adj_timeouts, 96275970Scy be_socket_flush, 97275970Scy be_socket_ctrl, 98275970Scy}; 99275970Scy 100275970Scy#define be_socket_add(ev, t) \ 101275970Scy bufferevent_add_event_((ev), (t)) 102275970Scy 103275970Scystatic void 104275970Scybufferevent_socket_outbuf_cb(struct evbuffer *buf, 105275970Scy const struct evbuffer_cb_info *cbinfo, 106275970Scy void *arg) 107275970Scy{ 108275970Scy struct bufferevent *bufev = arg; 109275970Scy struct bufferevent_private *bufev_p = 110275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 111275970Scy 112275970Scy if (cbinfo->n_added && 113275970Scy (bufev->enabled & EV_WRITE) && 114275970Scy !event_pending(&bufev->ev_write, EV_WRITE, NULL) && 115275970Scy !bufev_p->write_suspended) { 116275970Scy /* Somebody added data to the buffer, and we would like to 117275970Scy * write, and we were not writing. So, start writing. */ 118275970Scy if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) { 119275970Scy /* Should we log this? */ 120275970Scy } 121275970Scy } 122275970Scy} 123275970Scy 124275970Scystatic void 125275970Scybufferevent_readcb(evutil_socket_t fd, short event, void *arg) 126275970Scy{ 127275970Scy struct bufferevent *bufev = arg; 128275970Scy struct bufferevent_private *bufev_p = 129275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 130275970Scy struct evbuffer *input; 131275970Scy int res = 0; 132275970Scy short what = BEV_EVENT_READING; 133275970Scy ev_ssize_t howmuch = -1, readmax=-1; 134275970Scy 135275970Scy bufferevent_incref_and_lock_(bufev); 136275970Scy 137275970Scy if (event == EV_TIMEOUT) { 138275970Scy /* Note that we only check for event==EV_TIMEOUT. If 139275970Scy * event==EV_TIMEOUT|EV_READ, we can safely ignore the 140275970Scy * timeout, since a read has occurred */ 141275970Scy what |= BEV_EVENT_TIMEOUT; 142275970Scy goto error; 143275970Scy } 144275970Scy 145275970Scy input = bufev->input; 146275970Scy 147275970Scy /* 148275970Scy * If we have a high watermark configured then we don't want to 149275970Scy * read more data than would make us reach the watermark. 150275970Scy */ 151275970Scy if (bufev->wm_read.high != 0) { 152275970Scy howmuch = bufev->wm_read.high - evbuffer_get_length(input); 153275970Scy /* we somehow lowered the watermark, stop reading */ 154275970Scy if (howmuch <= 0) { 155275970Scy bufferevent_wm_suspend_read(bufev); 156275970Scy goto done; 157275970Scy } 158275970Scy } 159275970Scy readmax = bufferevent_get_read_max_(bufev_p); 160275970Scy if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited" 161275970Scy * uglifies this code. XXXX */ 162275970Scy howmuch = readmax; 163275970Scy if (bufev_p->read_suspended) 164275970Scy goto done; 165275970Scy 166275970Scy evbuffer_unfreeze(input, 0); 167275970Scy res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */ 168275970Scy evbuffer_freeze(input, 0); 169275970Scy 170275970Scy if (res == -1) { 171275970Scy int err = evutil_socket_geterror(fd); 172275970Scy if (EVUTIL_ERR_RW_RETRIABLE(err)) 173275970Scy goto reschedule; 174275970Scy /* error case */ 175275970Scy what |= BEV_EVENT_ERROR; 176275970Scy } else if (res == 0) { 177275970Scy /* eof case */ 178275970Scy what |= BEV_EVENT_EOF; 179275970Scy } 180275970Scy 181275970Scy if (res <= 0) 182275970Scy goto error; 183275970Scy 184275970Scy bufferevent_decrement_read_buckets_(bufev_p, res); 185275970Scy 186275970Scy /* Invoke the user callback - must always be called last */ 187275970Scy bufferevent_trigger_nolock_(bufev, EV_READ, 0); 188275970Scy 189275970Scy goto done; 190275970Scy 191275970Scy reschedule: 192275970Scy goto done; 193275970Scy 194275970Scy error: 195275970Scy bufferevent_disable(bufev, EV_READ); 196275970Scy bufferevent_run_eventcb_(bufev, what, 0); 197275970Scy 198275970Scy done: 199275970Scy bufferevent_decref_and_unlock_(bufev); 200275970Scy} 201275970Scy 202275970Scystatic void 203275970Scybufferevent_writecb(evutil_socket_t fd, short event, void *arg) 204275970Scy{ 205275970Scy struct bufferevent *bufev = arg; 206275970Scy struct bufferevent_private *bufev_p = 207275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 208275970Scy int res = 0; 209275970Scy short what = BEV_EVENT_WRITING; 210275970Scy int connected = 0; 211275970Scy ev_ssize_t atmost = -1; 212275970Scy 213275970Scy bufferevent_incref_and_lock_(bufev); 214275970Scy 215275970Scy if (event == EV_TIMEOUT) { 216275970Scy /* Note that we only check for event==EV_TIMEOUT. If 217275970Scy * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the 218275970Scy * timeout, since a read has occurred */ 219275970Scy what |= BEV_EVENT_TIMEOUT; 220275970Scy goto error; 221275970Scy } 222275970Scy if (bufev_p->connecting) { 223275970Scy int c = evutil_socket_finished_connecting_(fd); 224275970Scy /* we need to fake the error if the connection was refused 225275970Scy * immediately - usually connection to localhost on BSD */ 226275970Scy if (bufev_p->connection_refused) { 227275970Scy bufev_p->connection_refused = 0; 228275970Scy c = -1; 229275970Scy } 230275970Scy 231275970Scy if (c == 0) 232275970Scy goto done; 233275970Scy 234275970Scy bufev_p->connecting = 0; 235275970Scy if (c < 0) { 236275970Scy event_del(&bufev->ev_write); 237275970Scy event_del(&bufev->ev_read); 238275970Scy bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR, 0); 239275970Scy goto done; 240275970Scy } else { 241275970Scy connected = 1; 242275970Scy#ifdef _WIN32 243275970Scy if (BEV_IS_ASYNC(bufev)) { 244275970Scy event_del(&bufev->ev_write); 245275970Scy bufferevent_async_set_connected_(bufev); 246275970Scy bufferevent_run_eventcb_(bufev, 247275970Scy BEV_EVENT_CONNECTED, 0); 248275970Scy goto done; 249275970Scy } 250275970Scy#endif 251275970Scy bufferevent_run_eventcb_(bufev, 252275970Scy BEV_EVENT_CONNECTED, 0); 253275970Scy if (!(bufev->enabled & EV_WRITE) || 254275970Scy bufev_p->write_suspended) { 255275970Scy event_del(&bufev->ev_write); 256275970Scy goto done; 257275970Scy } 258275970Scy } 259275970Scy } 260275970Scy 261275970Scy atmost = bufferevent_get_write_max_(bufev_p); 262275970Scy 263275970Scy if (bufev_p->write_suspended) 264275970Scy goto done; 265275970Scy 266275970Scy if (evbuffer_get_length(bufev->output)) { 267275970Scy evbuffer_unfreeze(bufev->output, 1); 268275970Scy res = evbuffer_write_atmost(bufev->output, fd, atmost); 269275970Scy evbuffer_freeze(bufev->output, 1); 270275970Scy if (res == -1) { 271275970Scy int err = evutil_socket_geterror(fd); 272275970Scy if (EVUTIL_ERR_RW_RETRIABLE(err)) 273275970Scy goto reschedule; 274275970Scy what |= BEV_EVENT_ERROR; 275275970Scy } else if (res == 0) { 276275970Scy /* eof case 277275970Scy XXXX Actually, a 0 on write doesn't indicate 278275970Scy an EOF. An ECONNRESET might be more typical. 279275970Scy */ 280275970Scy what |= BEV_EVENT_EOF; 281275970Scy } 282275970Scy if (res <= 0) 283275970Scy goto error; 284275970Scy 285275970Scy bufferevent_decrement_write_buckets_(bufev_p, res); 286275970Scy } 287275970Scy 288275970Scy if (evbuffer_get_length(bufev->output) == 0) { 289275970Scy event_del(&bufev->ev_write); 290275970Scy } 291275970Scy 292275970Scy /* 293275970Scy * Invoke the user callback if our buffer is drained or below the 294275970Scy * low watermark. 295275970Scy */ 296275970Scy if (res || !connected) { 297275970Scy bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); 298275970Scy } 299275970Scy 300275970Scy goto done; 301275970Scy 302275970Scy reschedule: 303275970Scy if (evbuffer_get_length(bufev->output) == 0) { 304275970Scy event_del(&bufev->ev_write); 305275970Scy } 306275970Scy goto done; 307275970Scy 308275970Scy error: 309275970Scy bufferevent_disable(bufev, EV_WRITE); 310275970Scy bufferevent_run_eventcb_(bufev, what, 0); 311275970Scy 312275970Scy done: 313275970Scy bufferevent_decref_and_unlock_(bufev); 314275970Scy} 315275970Scy 316275970Scystruct bufferevent * 317275970Scybufferevent_socket_new(struct event_base *base, evutil_socket_t fd, 318275970Scy int options) 319275970Scy{ 320275970Scy struct bufferevent_private *bufev_p; 321275970Scy struct bufferevent *bufev; 322275970Scy 323275970Scy#ifdef _WIN32 324275970Scy if (base && event_base_get_iocp_(base)) 325275970Scy return bufferevent_async_new_(base, fd, options); 326275970Scy#endif 327275970Scy 328275970Scy if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) 329275970Scy return NULL; 330275970Scy 331275970Scy if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket, 332275970Scy options) < 0) { 333275970Scy mm_free(bufev_p); 334275970Scy return NULL; 335275970Scy } 336275970Scy bufev = &bufev_p->bev; 337275970Scy evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); 338275970Scy 339275970Scy event_assign(&bufev->ev_read, bufev->ev_base, fd, 340275970Scy EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); 341275970Scy event_assign(&bufev->ev_write, bufev->ev_base, fd, 342275970Scy EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); 343275970Scy 344275970Scy evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); 345275970Scy 346275970Scy evbuffer_freeze(bufev->input, 0); 347275970Scy evbuffer_freeze(bufev->output, 1); 348275970Scy 349275970Scy return bufev; 350275970Scy} 351275970Scy 352275970Scyint 353275970Scybufferevent_socket_connect(struct bufferevent *bev, 354275970Scy struct sockaddr *sa, int socklen) 355275970Scy{ 356275970Scy struct bufferevent_private *bufev_p = 357275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 358275970Scy 359275970Scy evutil_socket_t fd; 360275970Scy int r = 0; 361275970Scy int result=-1; 362275970Scy int ownfd = 0; 363275970Scy 364275970Scy bufferevent_incref_and_lock_(bev); 365275970Scy 366275970Scy if (!bufev_p) 367275970Scy goto done; 368275970Scy 369275970Scy fd = bufferevent_getfd(bev); 370275970Scy if (fd < 0) { 371275970Scy if (!sa) 372275970Scy goto done; 373275970Scy fd = evutil_socket_(sa->sa_family, 374275970Scy SOCK_STREAM|EVUTIL_SOCK_NONBLOCK, 0); 375275970Scy if (fd < 0) 376275970Scy goto done; 377275970Scy ownfd = 1; 378275970Scy } 379275970Scy if (sa) { 380275970Scy#ifdef _WIN32 381275970Scy if (bufferevent_async_can_connect_(bev)) { 382275970Scy bufferevent_setfd(bev, fd); 383275970Scy r = bufferevent_async_connect_(bev, fd, sa, socklen); 384275970Scy if (r < 0) 385275970Scy goto freesock; 386275970Scy bufev_p->connecting = 1; 387275970Scy result = 0; 388275970Scy goto done; 389275970Scy } else 390275970Scy#endif 391275970Scy r = evutil_socket_connect_(&fd, sa, socklen); 392275970Scy if (r < 0) 393275970Scy goto freesock; 394275970Scy } 395275970Scy#ifdef _WIN32 396275970Scy /* ConnectEx() isn't always around, even when IOCP is enabled. 397275970Scy * Here, we borrow the socket object's write handler to fall back 398275970Scy * on a non-blocking connect() when ConnectEx() is unavailable. */ 399275970Scy if (BEV_IS_ASYNC(bev)) { 400275970Scy event_assign(&bev->ev_write, bev->ev_base, fd, 401275970Scy EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev); 402275970Scy } 403275970Scy#endif 404275970Scy bufferevent_setfd(bev, fd); 405275970Scy if (r == 0) { 406275970Scy if (! be_socket_enable(bev, EV_WRITE)) { 407275970Scy bufev_p->connecting = 1; 408275970Scy result = 0; 409275970Scy goto done; 410275970Scy } 411275970Scy } else if (r == 1) { 412275970Scy /* The connect succeeded already. How very BSD of it. */ 413275970Scy result = 0; 414275970Scy bufev_p->connecting = 1; 415275970Scy event_active(&bev->ev_write, EV_WRITE, 1); 416275970Scy } else { 417275970Scy /* The connect failed already. How very BSD of it. */ 418275970Scy bufev_p->connection_refused = 1; 419275970Scy bufev_p->connecting = 1; 420275970Scy result = 0; 421275970Scy event_active(&bev->ev_write, EV_WRITE, 1); 422275970Scy } 423275970Scy 424275970Scy goto done; 425275970Scy 426275970Scyfreesock: 427275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 428275970Scy if (ownfd) 429275970Scy evutil_closesocket(fd); 430275970Scy /* do something about the error? */ 431275970Scydone: 432275970Scy bufferevent_decref_and_unlock_(bev); 433275970Scy return result; 434275970Scy} 435275970Scy 436275970Scystatic void 437275970Scybufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai, 438275970Scy void *arg) 439275970Scy{ 440275970Scy struct bufferevent *bev = arg; 441275970Scy struct bufferevent_private *bev_p = 442275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 443275970Scy int r; 444275970Scy BEV_LOCK(bev); 445275970Scy 446275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_LOOKUP); 447275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_LOOKUP); 448275970Scy 449275970Scy if (result != 0) { 450275970Scy bev_p->dns_error = result; 451275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 452275970Scy bufferevent_decref_and_unlock_(bev); 453275970Scy if (ai) 454275970Scy evutil_freeaddrinfo(ai); 455275970Scy return; 456275970Scy } 457275970Scy 458275970Scy /* XXX use the other addrinfos? */ 459275970Scy /* XXX use this return value */ 460275970Scy r = bufferevent_socket_connect(bev, ai->ai_addr, (int)ai->ai_addrlen); 461275970Scy (void)r; 462275970Scy bufferevent_decref_and_unlock_(bev); 463275970Scy evutil_freeaddrinfo(ai); 464275970Scy} 465275970Scy 466275970Scyint 467275970Scybufferevent_socket_connect_hostname(struct bufferevent *bev, 468275970Scy struct evdns_base *evdns_base, int family, const char *hostname, int port) 469275970Scy{ 470275970Scy char portbuf[10]; 471275970Scy struct evutil_addrinfo hint; 472275970Scy int err; 473275970Scy struct bufferevent_private *bev_p = 474275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 475275970Scy 476275970Scy if (family != AF_INET && family != AF_INET6 && family != AF_UNSPEC) 477275970Scy return -1; 478275970Scy if (port < 1 || port > 65535) 479275970Scy return -1; 480275970Scy 481275970Scy BEV_LOCK(bev); 482275970Scy bev_p->dns_error = 0; 483275970Scy BEV_UNLOCK(bev); 484275970Scy 485275970Scy evutil_snprintf(portbuf, sizeof(portbuf), "%d", port); 486275970Scy 487275970Scy memset(&hint, 0, sizeof(hint)); 488275970Scy hint.ai_family = family; 489275970Scy hint.ai_protocol = IPPROTO_TCP; 490275970Scy hint.ai_socktype = SOCK_STREAM; 491275970Scy 492275970Scy bufferevent_suspend_write_(bev, BEV_SUSPEND_LOOKUP); 493275970Scy bufferevent_suspend_read_(bev, BEV_SUSPEND_LOOKUP); 494275970Scy 495275970Scy bufferevent_incref_(bev); 496275970Scy err = evutil_getaddrinfo_async_(evdns_base, hostname, portbuf, 497275970Scy &hint, bufferevent_connect_getaddrinfo_cb, bev); 498275970Scy 499275970Scy if (err == 0) { 500275970Scy return 0; 501275970Scy } else { 502275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_LOOKUP); 503275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_LOOKUP); 504275970Scy bufferevent_decref_(bev); 505275970Scy return -1; 506275970Scy } 507275970Scy} 508275970Scy 509275970Scyint 510275970Scybufferevent_socket_get_dns_error(struct bufferevent *bev) 511275970Scy{ 512275970Scy int rv; 513275970Scy struct bufferevent_private *bev_p = 514275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 515275970Scy 516275970Scy BEV_LOCK(bev); 517275970Scy rv = bev_p->dns_error; 518275970Scy BEV_UNLOCK(bev); 519275970Scy 520275970Scy return rv; 521275970Scy} 522275970Scy 523275970Scy/* 524275970Scy * Create a new buffered event object. 525275970Scy * 526275970Scy * The read callback is invoked whenever we read new data. 527275970Scy * The write callback is invoked whenever the output buffer is drained. 528275970Scy * The error callback is invoked on a write/read error or on EOF. 529275970Scy * 530275970Scy * Both read and write callbacks maybe NULL. The error callback is not 531275970Scy * allowed to be NULL and have to be provided always. 532275970Scy */ 533275970Scy 534275970Scystruct bufferevent * 535275970Scybufferevent_new(evutil_socket_t fd, 536275970Scy bufferevent_data_cb readcb, bufferevent_data_cb writecb, 537275970Scy bufferevent_event_cb eventcb, void *cbarg) 538275970Scy{ 539275970Scy struct bufferevent *bufev; 540275970Scy 541275970Scy if (!(bufev = bufferevent_socket_new(NULL, fd, 0))) 542275970Scy return NULL; 543275970Scy 544275970Scy bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg); 545275970Scy 546275970Scy return bufev; 547275970Scy} 548275970Scy 549275970Scy 550275970Scystatic int 551275970Scybe_socket_enable(struct bufferevent *bufev, short event) 552275970Scy{ 553275970Scy if (event & EV_READ) { 554275970Scy if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1) 555275970Scy return -1; 556275970Scy } 557275970Scy if (event & EV_WRITE) { 558275970Scy if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1) 559275970Scy return -1; 560275970Scy } 561275970Scy return 0; 562275970Scy} 563275970Scy 564275970Scystatic int 565275970Scybe_socket_disable(struct bufferevent *bufev, short event) 566275970Scy{ 567275970Scy struct bufferevent_private *bufev_p = 568275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 569275970Scy if (event & EV_READ) { 570275970Scy if (event_del(&bufev->ev_read) == -1) 571275970Scy return -1; 572275970Scy } 573275970Scy /* Don't actually disable the write if we are trying to connect. */ 574275970Scy if ((event & EV_WRITE) && ! bufev_p->connecting) { 575275970Scy if (event_del(&bufev->ev_write) == -1) 576275970Scy return -1; 577275970Scy } 578275970Scy return 0; 579275970Scy} 580275970Scy 581275970Scystatic void 582275970Scybe_socket_destruct(struct bufferevent *bufev) 583275970Scy{ 584275970Scy struct bufferevent_private *bufev_p = 585275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 586275970Scy evutil_socket_t fd; 587275970Scy EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); 588275970Scy 589275970Scy fd = event_get_fd(&bufev->ev_read); 590275970Scy 591275970Scy if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) 592275970Scy EVUTIL_CLOSESOCKET(fd); 593275970Scy} 594275970Scy 595275970Scystatic int 596275970Scybe_socket_adj_timeouts(struct bufferevent *bufev) 597275970Scy{ 598275970Scy int r = 0; 599275970Scy if (event_pending(&bufev->ev_read, EV_READ, NULL)) { 600275970Scy if (evutil_timerisset(&bufev->timeout_read)) { 601275970Scy if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0) 602275970Scy r = -1; 603275970Scy } else { 604275970Scy event_remove_timer(&bufev->ev_read); 605275970Scy } 606275970Scy } 607275970Scy if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) { 608275970Scy if (evutil_timerisset(&bufev->timeout_write)) { 609275970Scy if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0) 610275970Scy r = -1; 611275970Scy } else { 612275970Scy event_remove_timer(&bufev->ev_write); 613275970Scy } 614275970Scy } 615275970Scy return r; 616275970Scy} 617275970Scy 618275970Scystatic int 619275970Scybe_socket_flush(struct bufferevent *bev, short iotype, 620275970Scy enum bufferevent_flush_mode mode) 621275970Scy{ 622275970Scy return 0; 623275970Scy} 624275970Scy 625275970Scy 626275970Scystatic void 627275970Scybe_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) 628275970Scy{ 629275970Scy BEV_LOCK(bufev); 630275970Scy EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); 631275970Scy 632275970Scy event_del(&bufev->ev_read); 633275970Scy event_del(&bufev->ev_write); 634275970Scy 635275970Scy event_assign(&bufev->ev_read, bufev->ev_base, fd, 636275970Scy EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); 637275970Scy event_assign(&bufev->ev_write, bufev->ev_base, fd, 638275970Scy EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); 639275970Scy 640275970Scy if (fd >= 0) 641275970Scy bufferevent_enable(bufev, bufev->enabled); 642275970Scy 643275970Scy BEV_UNLOCK(bufev); 644275970Scy} 645275970Scy 646275970Scy/* XXXX Should non-socket bufferevents support this? */ 647275970Scyint 648275970Scybufferevent_priority_set(struct bufferevent *bufev, int priority) 649275970Scy{ 650275970Scy int r = -1; 651275970Scy struct bufferevent_private *bufev_p = 652275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 653275970Scy 654275970Scy BEV_LOCK(bufev); 655275970Scy if (bufev->be_ops != &bufferevent_ops_socket) 656275970Scy goto done; 657275970Scy 658275970Scy if (event_priority_set(&bufev->ev_read, priority) == -1) 659275970Scy goto done; 660275970Scy if (event_priority_set(&bufev->ev_write, priority) == -1) 661275970Scy goto done; 662275970Scy 663275970Scy event_deferred_cb_set_priority_(&bufev_p->deferred, priority); 664275970Scy 665275970Scy r = 0; 666275970Scydone: 667275970Scy BEV_UNLOCK(bufev); 668275970Scy return r; 669275970Scy} 670275970Scy 671275970Scy/* XXXX Should non-socket bufferevents support this? */ 672275970Scyint 673275970Scybufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 674275970Scy{ 675275970Scy int res = -1; 676275970Scy 677275970Scy BEV_LOCK(bufev); 678275970Scy if (bufev->be_ops != &bufferevent_ops_socket) 679275970Scy goto done; 680275970Scy 681275970Scy bufev->ev_base = base; 682275970Scy 683275970Scy res = event_base_set(base, &bufev->ev_read); 684275970Scy if (res == -1) 685275970Scy goto done; 686275970Scy 687275970Scy res = event_base_set(base, &bufev->ev_write); 688275970Scydone: 689275970Scy BEV_UNLOCK(bufev); 690275970Scy return res; 691275970Scy} 692275970Scy 693275970Scystatic int 694275970Scybe_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 695275970Scy union bufferevent_ctrl_data *data) 696275970Scy{ 697275970Scy switch (op) { 698275970Scy case BEV_CTRL_SET_FD: 699275970Scy be_socket_setfd(bev, data->fd); 700275970Scy return 0; 701275970Scy case BEV_CTRL_GET_FD: 702275970Scy data->fd = event_get_fd(&bev->ev_read); 703275970Scy return 0; 704275970Scy case BEV_CTRL_GET_UNDERLYING: 705275970Scy case BEV_CTRL_CANCEL_ALL: 706275970Scy default: 707275970Scy return -1; 708275970Scy } 709275970Scy} 710