1275970Scy/* 2275970Scy * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> 3275970Scy * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson 4275970Scy * 5275970Scy * Redistribution and use in source and binary forms, with or without 6275970Scy * modification, are permitted provided that the following conditions 7275970Scy * are met: 8275970Scy * 1. Redistributions of source code must retain the above copyright 9275970Scy * notice, this list of conditions and the following disclaimer. 10275970Scy * 2. Redistributions in binary form must reproduce the above copyright 11275970Scy * notice, this list of conditions and the following disclaimer in the 12275970Scy * documentation and/or other materials provided with the distribution. 13275970Scy * 3. The name of the author may not be used to endorse or promote products 14275970Scy * derived from this software without specific prior written permission. 15275970Scy * 16275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26275970Scy */ 27275970Scy 28275970Scy#include "event2/event-config.h" 29275970Scy#include "evconfig-private.h" 30275970Scy 31275970Scy#include <sys/types.h> 32275970Scy 33275970Scy#ifdef EVENT__HAVE_SYS_TIME_H 34275970Scy#include <sys/time.h> 35275970Scy#endif 36275970Scy 37275970Scy#include <errno.h> 38275970Scy#include <stdio.h> 39275970Scy#include <stdlib.h> 40275970Scy#include <string.h> 41275970Scy#ifdef EVENT__HAVE_STDARG_H 42275970Scy#include <stdarg.h> 43275970Scy#endif 44275970Scy 45275970Scy#ifdef _WIN32 46275970Scy#include <winsock2.h> 47275970Scy#endif 48275970Scy#include <errno.h> 49275970Scy 50275970Scy#include "event2/util.h" 51275970Scy#include "event2/buffer.h" 52275970Scy#include "event2/buffer_compat.h" 53275970Scy#include "event2/bufferevent.h" 54275970Scy#include "event2/bufferevent_struct.h" 55275970Scy#include "event2/bufferevent_compat.h" 56275970Scy#include "event2/event.h" 57275970Scy#include "event-internal.h" 58275970Scy#include "log-internal.h" 59275970Scy#include "mm-internal.h" 60275970Scy#include "bufferevent-internal.h" 61275970Scy#include "evbuffer-internal.h" 62275970Scy#include "util-internal.h" 63275970Scy 64275970Scystatic void bufferevent_cancel_all_(struct bufferevent *bev); 65275970Scystatic void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); 66275970Scy 67275970Scyvoid 68275970Scybufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 69275970Scy{ 70275970Scy struct bufferevent_private *bufev_private = 71275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 72275970Scy BEV_LOCK(bufev); 73275970Scy if (!bufev_private->read_suspended) 74275970Scy bufev->be_ops->disable(bufev, EV_READ); 75275970Scy bufev_private->read_suspended |= what; 76275970Scy BEV_UNLOCK(bufev); 77275970Scy} 78275970Scy 79275970Scyvoid 80275970Scybufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) 81275970Scy{ 82275970Scy struct bufferevent_private *bufev_private = 83275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 84275970Scy BEV_LOCK(bufev); 85275970Scy bufev_private->read_suspended &= ~what; 86275970Scy if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) 87275970Scy bufev->be_ops->enable(bufev, EV_READ); 88275970Scy BEV_UNLOCK(bufev); 89275970Scy} 90275970Scy 91275970Scyvoid 92275970Scybufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 93275970Scy{ 94275970Scy struct bufferevent_private *bufev_private = 95275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 96275970Scy BEV_LOCK(bufev); 97275970Scy if (!bufev_private->write_suspended) 98275970Scy bufev->be_ops->disable(bufev, EV_WRITE); 99275970Scy bufev_private->write_suspended |= what; 100275970Scy BEV_UNLOCK(bufev); 101275970Scy} 102275970Scy 103275970Scyvoid 104275970Scybufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what) 105275970Scy{ 106275970Scy struct bufferevent_private *bufev_private = 107275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 108275970Scy BEV_LOCK(bufev); 109275970Scy bufev_private->write_suspended &= ~what; 110275970Scy if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) 111275970Scy bufev->be_ops->enable(bufev, EV_WRITE); 112275970Scy BEV_UNLOCK(bufev); 113275970Scy} 114275970Scy 115275970Scy 116275970Scy/* Callback to implement watermarks on the input buffer. Only enabled 117275970Scy * if the watermark is set. */ 118275970Scystatic void 119275970Scybufferevent_inbuf_wm_cb(struct evbuffer *buf, 120275970Scy const struct evbuffer_cb_info *cbinfo, 121275970Scy void *arg) 122275970Scy{ 123275970Scy struct bufferevent *bufev = arg; 124275970Scy size_t size; 125275970Scy 126275970Scy size = evbuffer_get_length(buf); 127275970Scy 128275970Scy if (size >= bufev->wm_read.high) 129275970Scy bufferevent_wm_suspend_read(bufev); 130275970Scy else 131275970Scy bufferevent_wm_unsuspend_read(bufev); 132275970Scy} 133275970Scy 134275970Scystatic void 135275970Scybufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg) 136275970Scy{ 137275970Scy struct bufferevent_private *bufev_private = arg; 138275970Scy struct bufferevent *bufev = &bufev_private->bev; 139275970Scy 140275970Scy BEV_LOCK(bufev); 141275970Scy if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 142275970Scy bufev->errorcb) { 143275970Scy /* The "connected" happened before any reads or writes, so 144275970Scy send it first. */ 145275970Scy bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 146275970Scy bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); 147275970Scy } 148275970Scy if (bufev_private->readcb_pending && bufev->readcb) { 149275970Scy bufev_private->readcb_pending = 0; 150275970Scy bufev->readcb(bufev, bufev->cbarg); 151275970Scy } 152275970Scy if (bufev_private->writecb_pending && bufev->writecb) { 153275970Scy bufev_private->writecb_pending = 0; 154275970Scy bufev->writecb(bufev, bufev->cbarg); 155275970Scy } 156275970Scy if (bufev_private->eventcb_pending && bufev->errorcb) { 157275970Scy short what = bufev_private->eventcb_pending; 158275970Scy int err = bufev_private->errno_pending; 159275970Scy bufev_private->eventcb_pending = 0; 160275970Scy bufev_private->errno_pending = 0; 161275970Scy EVUTIL_SET_SOCKET_ERROR(err); 162275970Scy bufev->errorcb(bufev, what, bufev->cbarg); 163275970Scy } 164275970Scy bufferevent_decref_and_unlock_(bufev); 165275970Scy} 166275970Scy 167275970Scystatic void 168275970Scybufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg) 169275970Scy{ 170275970Scy struct bufferevent_private *bufev_private = arg; 171275970Scy struct bufferevent *bufev = &bufev_private->bev; 172275970Scy 173275970Scy BEV_LOCK(bufev); 174275970Scy#define UNLOCKED(stmt) \ 175275970Scy do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) 176275970Scy 177275970Scy if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 178275970Scy bufev->errorcb) { 179275970Scy /* The "connected" happened before any reads or writes, so 180275970Scy send it first. */ 181275970Scy bufferevent_event_cb errorcb = bufev->errorcb; 182275970Scy void *cbarg = bufev->cbarg; 183275970Scy bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 184275970Scy UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); 185275970Scy } 186275970Scy if (bufev_private->readcb_pending && bufev->readcb) { 187275970Scy bufferevent_data_cb readcb = bufev->readcb; 188275970Scy void *cbarg = bufev->cbarg; 189275970Scy bufev_private->readcb_pending = 0; 190275970Scy UNLOCKED(readcb(bufev, cbarg)); 191275970Scy } 192275970Scy if (bufev_private->writecb_pending && bufev->writecb) { 193275970Scy bufferevent_data_cb writecb = bufev->writecb; 194275970Scy void *cbarg = bufev->cbarg; 195275970Scy bufev_private->writecb_pending = 0; 196275970Scy UNLOCKED(writecb(bufev, cbarg)); 197275970Scy } 198275970Scy if (bufev_private->eventcb_pending && bufev->errorcb) { 199275970Scy bufferevent_event_cb errorcb = bufev->errorcb; 200275970Scy void *cbarg = bufev->cbarg; 201275970Scy short what = bufev_private->eventcb_pending; 202275970Scy int err = bufev_private->errno_pending; 203275970Scy bufev_private->eventcb_pending = 0; 204275970Scy bufev_private->errno_pending = 0; 205275970Scy EVUTIL_SET_SOCKET_ERROR(err); 206275970Scy UNLOCKED(errorcb(bufev,what,cbarg)); 207275970Scy } 208275970Scy bufferevent_decref_and_unlock_(bufev); 209275970Scy#undef UNLOCKED 210275970Scy} 211275970Scy 212275970Scy#define SCHEDULE_DEFERRED(bevp) \ 213275970Scy do { \ 214275970Scy if (event_deferred_cb_schedule_( \ 215275970Scy (bevp)->bev.ev_base, \ 216275970Scy &(bevp)->deferred)) \ 217275970Scy bufferevent_incref_(&(bevp)->bev); \ 218275970Scy } while (0) 219275970Scy 220275970Scy 221275970Scyvoid 222275970Scybufferevent_run_readcb_(struct bufferevent *bufev, int options) 223275970Scy{ 224275970Scy /* Requires that we hold the lock and a reference */ 225275970Scy struct bufferevent_private *p = 226275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 227275970Scy if (bufev->readcb == NULL) 228275970Scy return; 229275970Scy if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 230275970Scy p->readcb_pending = 1; 231275970Scy SCHEDULE_DEFERRED(p); 232275970Scy } else { 233275970Scy bufev->readcb(bufev, bufev->cbarg); 234275970Scy } 235275970Scy} 236275970Scy 237275970Scyvoid 238275970Scybufferevent_run_writecb_(struct bufferevent *bufev, int options) 239275970Scy{ 240275970Scy /* Requires that we hold the lock and a reference */ 241275970Scy struct bufferevent_private *p = 242275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 243275970Scy if (bufev->writecb == NULL) 244275970Scy return; 245275970Scy if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 246275970Scy p->writecb_pending = 1; 247275970Scy SCHEDULE_DEFERRED(p); 248275970Scy } else { 249275970Scy bufev->writecb(bufev, bufev->cbarg); 250275970Scy } 251275970Scy} 252275970Scy 253275970Scy#define BEV_TRIG_ALL_OPTS ( \ 254275970Scy BEV_TRIG_IGNORE_WATERMARKS| \ 255275970Scy BEV_TRIG_DEFER_CALLBACKS \ 256275970Scy ) 257275970Scy 258275970Scyvoid 259275970Scybufferevent_trigger(struct bufferevent *bufev, short iotype, int options) 260275970Scy{ 261275970Scy bufferevent_incref_and_lock_(bufev); 262275970Scy bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS); 263275970Scy bufferevent_decref_and_unlock_(bufev); 264275970Scy} 265275970Scy 266275970Scyvoid 267275970Scybufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options) 268275970Scy{ 269275970Scy /* Requires that we hold the lock and a reference */ 270275970Scy struct bufferevent_private *p = 271275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 272275970Scy if (bufev->errorcb == NULL) 273275970Scy return; 274275970Scy if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) { 275275970Scy p->eventcb_pending |= what; 276275970Scy p->errno_pending = EVUTIL_SOCKET_ERROR(); 277275970Scy SCHEDULE_DEFERRED(p); 278275970Scy } else { 279275970Scy bufev->errorcb(bufev, what, bufev->cbarg); 280275970Scy } 281275970Scy} 282275970Scy 283275970Scyvoid 284275970Scybufferevent_trigger_event(struct bufferevent *bufev, short what, int options) 285275970Scy{ 286275970Scy bufferevent_incref_and_lock_(bufev); 287275970Scy bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS); 288275970Scy bufferevent_decref_and_unlock_(bufev); 289275970Scy} 290275970Scy 291275970Scyint 292275970Scybufferevent_init_common_(struct bufferevent_private *bufev_private, 293275970Scy struct event_base *base, 294275970Scy const struct bufferevent_ops *ops, 295275970Scy enum bufferevent_options options) 296275970Scy{ 297275970Scy struct bufferevent *bufev = &bufev_private->bev; 298275970Scy 299275970Scy if (!bufev->input) { 300275970Scy if ((bufev->input = evbuffer_new()) == NULL) 301275970Scy return -1; 302275970Scy } 303275970Scy 304275970Scy if (!bufev->output) { 305275970Scy if ((bufev->output = evbuffer_new()) == NULL) { 306275970Scy evbuffer_free(bufev->input); 307275970Scy return -1; 308275970Scy } 309275970Scy } 310275970Scy 311275970Scy bufev_private->refcnt = 1; 312275970Scy bufev->ev_base = base; 313275970Scy 314275970Scy /* Disable timeouts. */ 315275970Scy evutil_timerclear(&bufev->timeout_read); 316275970Scy evutil_timerclear(&bufev->timeout_write); 317275970Scy 318275970Scy bufev->be_ops = ops; 319275970Scy 320275970Scy bufferevent_ratelim_init_(bufev_private); 321275970Scy 322275970Scy /* 323275970Scy * Set to EV_WRITE so that using bufferevent_write is going to 324275970Scy * trigger a callback. Reading needs to be explicitly enabled 325275970Scy * because otherwise no data will be available. 326275970Scy */ 327275970Scy bufev->enabled = EV_WRITE; 328275970Scy 329275970Scy#ifndef EVENT__DISABLE_THREAD_SUPPORT 330275970Scy if (options & BEV_OPT_THREADSAFE) { 331275970Scy if (bufferevent_enable_locking_(bufev, NULL) < 0) { 332275970Scy /* cleanup */ 333275970Scy evbuffer_free(bufev->input); 334275970Scy evbuffer_free(bufev->output); 335275970Scy bufev->input = NULL; 336275970Scy bufev->output = NULL; 337275970Scy return -1; 338275970Scy } 339275970Scy } 340275970Scy#endif 341275970Scy if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) 342275970Scy == BEV_OPT_UNLOCK_CALLBACKS) { 343275970Scy event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); 344275970Scy return -1; 345275970Scy } 346275970Scy if (options & BEV_OPT_UNLOCK_CALLBACKS) 347275970Scy event_deferred_cb_init_( 348275970Scy &bufev_private->deferred, 349275970Scy event_base_get_npriorities(base) / 2, 350275970Scy bufferevent_run_deferred_callbacks_unlocked, 351275970Scy bufev_private); 352275970Scy else 353275970Scy event_deferred_cb_init_( 354275970Scy &bufev_private->deferred, 355275970Scy event_base_get_npriorities(base) / 2, 356275970Scy bufferevent_run_deferred_callbacks_locked, 357275970Scy bufev_private); 358275970Scy 359275970Scy bufev_private->options = options; 360275970Scy 361275970Scy evbuffer_set_parent_(bufev->input, bufev); 362275970Scy evbuffer_set_parent_(bufev->output, bufev); 363275970Scy 364275970Scy return 0; 365275970Scy} 366275970Scy 367275970Scyvoid 368275970Scybufferevent_setcb(struct bufferevent *bufev, 369275970Scy bufferevent_data_cb readcb, bufferevent_data_cb writecb, 370275970Scy bufferevent_event_cb eventcb, void *cbarg) 371275970Scy{ 372275970Scy BEV_LOCK(bufev); 373275970Scy 374275970Scy bufev->readcb = readcb; 375275970Scy bufev->writecb = writecb; 376275970Scy bufev->errorcb = eventcb; 377275970Scy 378275970Scy bufev->cbarg = cbarg; 379275970Scy BEV_UNLOCK(bufev); 380275970Scy} 381275970Scy 382275970Scyvoid 383275970Scybufferevent_getcb(struct bufferevent *bufev, 384275970Scy bufferevent_data_cb *readcb_ptr, 385275970Scy bufferevent_data_cb *writecb_ptr, 386275970Scy bufferevent_event_cb *eventcb_ptr, 387275970Scy void **cbarg_ptr) 388275970Scy{ 389275970Scy BEV_LOCK(bufev); 390275970Scy if (readcb_ptr) 391275970Scy *readcb_ptr = bufev->readcb; 392275970Scy if (writecb_ptr) 393275970Scy *writecb_ptr = bufev->writecb; 394275970Scy if (eventcb_ptr) 395275970Scy *eventcb_ptr = bufev->errorcb; 396275970Scy if (cbarg_ptr) 397275970Scy *cbarg_ptr = bufev->cbarg; 398275970Scy 399275970Scy BEV_UNLOCK(bufev); 400275970Scy} 401275970Scy 402275970Scystruct evbuffer * 403275970Scybufferevent_get_input(struct bufferevent *bufev) 404275970Scy{ 405275970Scy return bufev->input; 406275970Scy} 407275970Scy 408275970Scystruct evbuffer * 409275970Scybufferevent_get_output(struct bufferevent *bufev) 410275970Scy{ 411275970Scy return bufev->output; 412275970Scy} 413275970Scy 414275970Scystruct event_base * 415275970Scybufferevent_get_base(struct bufferevent *bufev) 416275970Scy{ 417275970Scy return bufev->ev_base; 418275970Scy} 419275970Scy 420275970Scyint 421275970Scybufferevent_get_priority(const struct bufferevent *bufev) 422275970Scy{ 423275970Scy if (event_initialized(&bufev->ev_read)) { 424275970Scy return event_get_priority(&bufev->ev_read); 425275970Scy } else { 426275970Scy return event_base_get_npriorities(bufev->ev_base) / 2; 427275970Scy } 428275970Scy} 429275970Scy 430275970Scyint 431275970Scybufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 432275970Scy{ 433275970Scy if (evbuffer_add(bufev->output, data, size) == -1) 434275970Scy return (-1); 435275970Scy 436275970Scy return 0; 437275970Scy} 438275970Scy 439275970Scyint 440275970Scybufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 441275970Scy{ 442275970Scy if (evbuffer_add_buffer(bufev->output, buf) == -1) 443275970Scy return (-1); 444275970Scy 445275970Scy return 0; 446275970Scy} 447275970Scy 448275970Scysize_t 449275970Scybufferevent_read(struct bufferevent *bufev, void *data, size_t size) 450275970Scy{ 451275970Scy return (evbuffer_remove(bufev->input, data, size)); 452275970Scy} 453275970Scy 454275970Scyint 455275970Scybufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) 456275970Scy{ 457275970Scy return (evbuffer_add_buffer(buf, bufev->input)); 458275970Scy} 459275970Scy 460275970Scyint 461275970Scybufferevent_enable(struct bufferevent *bufev, short event) 462275970Scy{ 463275970Scy struct bufferevent_private *bufev_private = 464275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 465275970Scy short impl_events = event; 466275970Scy int r = 0; 467275970Scy 468275970Scy bufferevent_incref_and_lock_(bufev); 469275970Scy if (bufev_private->read_suspended) 470275970Scy impl_events &= ~EV_READ; 471275970Scy if (bufev_private->write_suspended) 472275970Scy impl_events &= ~EV_WRITE; 473275970Scy 474275970Scy bufev->enabled |= event; 475275970Scy 476275970Scy if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) 477275970Scy r = -1; 478275970Scy 479275970Scy bufferevent_decref_and_unlock_(bufev); 480275970Scy return r; 481275970Scy} 482275970Scy 483275970Scyint 484275970Scybufferevent_set_timeouts(struct bufferevent *bufev, 485275970Scy const struct timeval *tv_read, 486275970Scy const struct timeval *tv_write) 487275970Scy{ 488275970Scy int r = 0; 489275970Scy BEV_LOCK(bufev); 490275970Scy if (tv_read) { 491275970Scy bufev->timeout_read = *tv_read; 492275970Scy } else { 493275970Scy evutil_timerclear(&bufev->timeout_read); 494275970Scy } 495275970Scy if (tv_write) { 496275970Scy bufev->timeout_write = *tv_write; 497275970Scy } else { 498275970Scy evutil_timerclear(&bufev->timeout_write); 499275970Scy } 500275970Scy 501275970Scy if (bufev->be_ops->adj_timeouts) 502275970Scy r = bufev->be_ops->adj_timeouts(bufev); 503275970Scy BEV_UNLOCK(bufev); 504275970Scy 505275970Scy return r; 506275970Scy} 507275970Scy 508275970Scy 509275970Scy/* Obsolete; use bufferevent_set_timeouts */ 510275970Scyvoid 511275970Scybufferevent_settimeout(struct bufferevent *bufev, 512275970Scy int timeout_read, int timeout_write) 513275970Scy{ 514275970Scy struct timeval tv_read, tv_write; 515275970Scy struct timeval *ptv_read = NULL, *ptv_write = NULL; 516275970Scy 517275970Scy memset(&tv_read, 0, sizeof(tv_read)); 518275970Scy memset(&tv_write, 0, sizeof(tv_write)); 519275970Scy 520275970Scy if (timeout_read) { 521275970Scy tv_read.tv_sec = timeout_read; 522275970Scy ptv_read = &tv_read; 523275970Scy } 524275970Scy if (timeout_write) { 525275970Scy tv_write.tv_sec = timeout_write; 526275970Scy ptv_write = &tv_write; 527275970Scy } 528275970Scy 529275970Scy bufferevent_set_timeouts(bufev, ptv_read, ptv_write); 530275970Scy} 531275970Scy 532275970Scy 533275970Scyint 534275970Scybufferevent_disable_hard_(struct bufferevent *bufev, short event) 535275970Scy{ 536275970Scy int r = 0; 537275970Scy struct bufferevent_private *bufev_private = 538275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 539275970Scy 540275970Scy BEV_LOCK(bufev); 541275970Scy bufev->enabled &= ~event; 542275970Scy 543275970Scy bufev_private->connecting = 0; 544275970Scy if (bufev->be_ops->disable(bufev, event) < 0) 545275970Scy r = -1; 546275970Scy 547275970Scy BEV_UNLOCK(bufev); 548275970Scy return r; 549275970Scy} 550275970Scy 551275970Scyint 552275970Scybufferevent_disable(struct bufferevent *bufev, short event) 553275970Scy{ 554275970Scy int r = 0; 555275970Scy 556275970Scy BEV_LOCK(bufev); 557275970Scy bufev->enabled &= ~event; 558275970Scy 559275970Scy if (bufev->be_ops->disable(bufev, event) < 0) 560275970Scy r = -1; 561275970Scy 562275970Scy BEV_UNLOCK(bufev); 563275970Scy return r; 564275970Scy} 565275970Scy 566275970Scy/* 567275970Scy * Sets the water marks 568275970Scy */ 569275970Scy 570275970Scyvoid 571275970Scybufferevent_setwatermark(struct bufferevent *bufev, short events, 572275970Scy size_t lowmark, size_t highmark) 573275970Scy{ 574275970Scy struct bufferevent_private *bufev_private = 575275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 576275970Scy 577275970Scy BEV_LOCK(bufev); 578275970Scy if (events & EV_WRITE) { 579275970Scy bufev->wm_write.low = lowmark; 580275970Scy bufev->wm_write.high = highmark; 581275970Scy } 582275970Scy 583275970Scy if (events & EV_READ) { 584275970Scy bufev->wm_read.low = lowmark; 585275970Scy bufev->wm_read.high = highmark; 586275970Scy 587275970Scy if (highmark) { 588275970Scy /* There is now a new high-water mark for read. 589275970Scy enable the callback if needed, and see if we should 590275970Scy suspend/bufferevent_wm_unsuspend. */ 591275970Scy 592275970Scy if (bufev_private->read_watermarks_cb == NULL) { 593275970Scy bufev_private->read_watermarks_cb = 594275970Scy evbuffer_add_cb(bufev->input, 595275970Scy bufferevent_inbuf_wm_cb, 596275970Scy bufev); 597275970Scy } 598275970Scy evbuffer_cb_set_flags(bufev->input, 599275970Scy bufev_private->read_watermarks_cb, 600275970Scy EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER); 601275970Scy 602290000Sglebius if (evbuffer_get_length(bufev->input) >= highmark) 603275970Scy bufferevent_wm_suspend_read(bufev); 604275970Scy else if (evbuffer_get_length(bufev->input) < highmark) 605275970Scy bufferevent_wm_unsuspend_read(bufev); 606275970Scy } else { 607275970Scy /* There is now no high-water mark for read. */ 608275970Scy if (bufev_private->read_watermarks_cb) 609275970Scy evbuffer_cb_clear_flags(bufev->input, 610275970Scy bufev_private->read_watermarks_cb, 611275970Scy EVBUFFER_CB_ENABLED); 612275970Scy bufferevent_wm_unsuspend_read(bufev); 613275970Scy } 614275970Scy } 615275970Scy BEV_UNLOCK(bufev); 616275970Scy} 617275970Scy 618290000Sglebiusint 619275970Scybufferevent_getwatermark(struct bufferevent *bufev, short events, 620275970Scy size_t *lowmark, size_t *highmark) 621275970Scy{ 622275970Scy if (events == EV_WRITE) { 623290000Sglebius BEV_LOCK(bufev); 624275970Scy if (lowmark) 625275970Scy *lowmark = bufev->wm_write.low; 626275970Scy if (highmark) 627275970Scy *highmark = bufev->wm_write.high; 628290000Sglebius BEV_UNLOCK(bufev); 629290000Sglebius return 0; 630275970Scy } 631275970Scy 632275970Scy if (events == EV_READ) { 633290000Sglebius BEV_LOCK(bufev); 634275970Scy if (lowmark) 635275970Scy *lowmark = bufev->wm_read.low; 636275970Scy if (highmark) 637275970Scy *highmark = bufev->wm_read.high; 638290000Sglebius BEV_UNLOCK(bufev); 639290000Sglebius return 0; 640275970Scy } 641290000Sglebius return -1; 642275970Scy} 643275970Scy 644275970Scyint 645275970Scybufferevent_flush(struct bufferevent *bufev, 646275970Scy short iotype, 647275970Scy enum bufferevent_flush_mode mode) 648275970Scy{ 649275970Scy int r = -1; 650275970Scy BEV_LOCK(bufev); 651275970Scy if (bufev->be_ops->flush) 652275970Scy r = bufev->be_ops->flush(bufev, iotype, mode); 653275970Scy BEV_UNLOCK(bufev); 654275970Scy return r; 655275970Scy} 656275970Scy 657275970Scyvoid 658275970Scybufferevent_incref_and_lock_(struct bufferevent *bufev) 659275970Scy{ 660275970Scy struct bufferevent_private *bufev_private = 661275970Scy BEV_UPCAST(bufev); 662275970Scy BEV_LOCK(bufev); 663275970Scy ++bufev_private->refcnt; 664275970Scy} 665275970Scy 666275970Scy#if 0 667275970Scystatic void 668275970Scybufferevent_transfer_lock_ownership_(struct bufferevent *donor, 669275970Scy struct bufferevent *recipient) 670275970Scy{ 671275970Scy struct bufferevent_private *d = BEV_UPCAST(donor); 672275970Scy struct bufferevent_private *r = BEV_UPCAST(recipient); 673275970Scy if (d->lock != r->lock) 674275970Scy return; 675275970Scy if (r->own_lock) 676275970Scy return; 677275970Scy if (d->own_lock) { 678275970Scy d->own_lock = 0; 679275970Scy r->own_lock = 1; 680275970Scy } 681275970Scy} 682275970Scy#endif 683275970Scy 684275970Scyint 685275970Scybufferevent_decref_and_unlock_(struct bufferevent *bufev) 686275970Scy{ 687275970Scy struct bufferevent_private *bufev_private = 688275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 689275970Scy int n_cbs = 0; 690275970Scy#define MAX_CBS 16 691275970Scy struct event_callback *cbs[MAX_CBS]; 692275970Scy 693275970Scy EVUTIL_ASSERT(bufev_private->refcnt > 0); 694275970Scy 695275970Scy if (--bufev_private->refcnt) { 696275970Scy BEV_UNLOCK(bufev); 697275970Scy return 0; 698275970Scy } 699275970Scy 700275970Scy if (bufev->be_ops->unlink) 701275970Scy bufev->be_ops->unlink(bufev); 702275970Scy 703275970Scy /* Okay, we're out of references. Let's finalize this once all the 704275970Scy * callbacks are done running. */ 705275970Scy cbs[0] = &bufev->ev_read.ev_evcallback; 706275970Scy cbs[1] = &bufev->ev_write.ev_evcallback; 707275970Scy cbs[2] = &bufev_private->deferred; 708275970Scy n_cbs = 3; 709275970Scy if (bufev_private->rate_limiting) { 710275970Scy struct event *e = &bufev_private->rate_limiting->refill_bucket_event; 711275970Scy if (event_initialized(e)) 712275970Scy cbs[n_cbs++] = &e->ev_evcallback; 713275970Scy } 714275970Scy n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); 715275970Scy n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); 716275970Scy 717275970Scy event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, 718275970Scy bufferevent_finalize_cb_); 719275970Scy 720275970Scy#undef MAX_CBS 721275970Scy BEV_UNLOCK(bufev); 722275970Scy 723275970Scy return 1; 724275970Scy} 725275970Scy 726275970Scystatic void 727275970Scybufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) 728275970Scy{ 729275970Scy struct bufferevent *bufev = arg_; 730275970Scy struct bufferevent *underlying; 731275970Scy struct bufferevent_private *bufev_private = 732275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 733275970Scy 734275970Scy BEV_LOCK(bufev); 735275970Scy underlying = bufferevent_get_underlying(bufev); 736275970Scy 737275970Scy /* Clean up the shared info */ 738275970Scy if (bufev->be_ops->destruct) 739275970Scy bufev->be_ops->destruct(bufev); 740275970Scy 741275970Scy /* XXX what happens if refcnt for these buffers is > 1? 742275970Scy * The buffers can share a lock with this bufferevent object, 743275970Scy * but the lock might be destroyed below. */ 744275970Scy /* evbuffer will free the callbacks */ 745275970Scy evbuffer_free(bufev->input); 746275970Scy evbuffer_free(bufev->output); 747275970Scy 748275970Scy if (bufev_private->rate_limiting) { 749275970Scy if (bufev_private->rate_limiting->group) 750275970Scy bufferevent_remove_from_rate_limit_group_internal_(bufev,0); 751275970Scy mm_free(bufev_private->rate_limiting); 752275970Scy bufev_private->rate_limiting = NULL; 753275970Scy } 754275970Scy 755275970Scy 756275970Scy BEV_UNLOCK(bufev); 757275970Scy 758275970Scy if (bufev_private->own_lock) 759275970Scy EVTHREAD_FREE_LOCK(bufev_private->lock, 760275970Scy EVTHREAD_LOCKTYPE_RECURSIVE); 761275970Scy 762275970Scy /* Free the actual allocated memory. */ 763275970Scy mm_free(((char*)bufev) - bufev->be_ops->mem_offset); 764275970Scy 765275970Scy /* Release the reference to underlying now that we no longer need the 766275970Scy * reference to it. We wait this long mainly in case our lock is 767275970Scy * shared with underlying. 768275970Scy * 769275970Scy * The 'destruct' function will also drop a reference to underlying 770275970Scy * if BEV_OPT_CLOSE_ON_FREE is set. 771275970Scy * 772275970Scy * XXX Should we/can we just refcount evbuffer/bufferevent locks? 773275970Scy * It would probably save us some headaches. 774275970Scy */ 775275970Scy if (underlying) 776275970Scy bufferevent_decref_(underlying); 777275970Scy} 778275970Scy 779275970Scyint 780275970Scybufferevent_decref_(struct bufferevent *bufev) 781275970Scy{ 782275970Scy BEV_LOCK(bufev); 783275970Scy return bufferevent_decref_and_unlock_(bufev); 784275970Scy} 785275970Scy 786275970Scyvoid 787275970Scybufferevent_free(struct bufferevent *bufev) 788275970Scy{ 789275970Scy BEV_LOCK(bufev); 790275970Scy bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); 791275970Scy bufferevent_cancel_all_(bufev); 792275970Scy bufferevent_decref_and_unlock_(bufev); 793275970Scy} 794275970Scy 795275970Scyvoid 796275970Scybufferevent_incref_(struct bufferevent *bufev) 797275970Scy{ 798275970Scy struct bufferevent_private *bufev_private = 799275970Scy EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 800275970Scy 801275970Scy BEV_LOCK(bufev); 802275970Scy ++bufev_private->refcnt; 803275970Scy BEV_UNLOCK(bufev); 804275970Scy} 805275970Scy 806275970Scyint 807275970Scybufferevent_enable_locking_(struct bufferevent *bufev, void *lock) 808275970Scy{ 809275970Scy#ifdef EVENT__DISABLE_THREAD_SUPPORT 810275970Scy return -1; 811275970Scy#else 812275970Scy struct bufferevent *underlying; 813275970Scy 814275970Scy if (BEV_UPCAST(bufev)->lock) 815275970Scy return -1; 816275970Scy underlying = bufferevent_get_underlying(bufev); 817275970Scy 818275970Scy if (!lock && underlying && BEV_UPCAST(underlying)->lock) { 819275970Scy lock = BEV_UPCAST(underlying)->lock; 820275970Scy BEV_UPCAST(bufev)->lock = lock; 821275970Scy BEV_UPCAST(bufev)->own_lock = 0; 822275970Scy } else if (!lock) { 823275970Scy EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE); 824275970Scy if (!lock) 825275970Scy return -1; 826275970Scy BEV_UPCAST(bufev)->lock = lock; 827275970Scy BEV_UPCAST(bufev)->own_lock = 1; 828275970Scy } else { 829275970Scy BEV_UPCAST(bufev)->lock = lock; 830275970Scy BEV_UPCAST(bufev)->own_lock = 0; 831275970Scy } 832275970Scy evbuffer_enable_locking(bufev->input, lock); 833275970Scy evbuffer_enable_locking(bufev->output, lock); 834275970Scy 835275970Scy if (underlying && !BEV_UPCAST(underlying)->lock) 836275970Scy bufferevent_enable_locking_(underlying, lock); 837275970Scy 838275970Scy return 0; 839275970Scy#endif 840275970Scy} 841275970Scy 842275970Scyint 843275970Scybufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) 844275970Scy{ 845275970Scy union bufferevent_ctrl_data d; 846275970Scy int res = -1; 847275970Scy d.fd = fd; 848275970Scy BEV_LOCK(bev); 849275970Scy if (bev->be_ops->ctrl) 850275970Scy res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); 851275970Scy BEV_UNLOCK(bev); 852275970Scy return res; 853275970Scy} 854275970Scy 855275970Scyevutil_socket_t 856275970Scybufferevent_getfd(struct bufferevent *bev) 857275970Scy{ 858275970Scy union bufferevent_ctrl_data d; 859275970Scy int res = -1; 860275970Scy d.fd = -1; 861275970Scy BEV_LOCK(bev); 862275970Scy if (bev->be_ops->ctrl) 863275970Scy res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); 864275970Scy BEV_UNLOCK(bev); 865275970Scy return (res<0) ? -1 : d.fd; 866275970Scy} 867275970Scy 868275970Scyenum bufferevent_options 869275970Scybufferevent_get_options_(struct bufferevent *bev) 870275970Scy{ 871275970Scy struct bufferevent_private *bev_p = 872275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 873275970Scy enum bufferevent_options options; 874275970Scy 875275970Scy BEV_LOCK(bev); 876275970Scy options = bev_p->options; 877275970Scy BEV_UNLOCK(bev); 878275970Scy return options; 879275970Scy} 880275970Scy 881275970Scy 882275970Scystatic void 883275970Scybufferevent_cancel_all_(struct bufferevent *bev) 884275970Scy{ 885275970Scy union bufferevent_ctrl_data d; 886275970Scy memset(&d, 0, sizeof(d)); 887275970Scy BEV_LOCK(bev); 888275970Scy if (bev->be_ops->ctrl) 889275970Scy bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); 890275970Scy BEV_UNLOCK(bev); 891275970Scy} 892275970Scy 893275970Scyshort 894275970Scybufferevent_get_enabled(struct bufferevent *bufev) 895275970Scy{ 896275970Scy short r; 897275970Scy BEV_LOCK(bufev); 898275970Scy r = bufev->enabled; 899275970Scy BEV_UNLOCK(bufev); 900275970Scy return r; 901275970Scy} 902275970Scy 903275970Scystruct bufferevent * 904275970Scybufferevent_get_underlying(struct bufferevent *bev) 905275970Scy{ 906275970Scy union bufferevent_ctrl_data d; 907275970Scy int res = -1; 908275970Scy d.ptr = NULL; 909275970Scy BEV_LOCK(bev); 910275970Scy if (bev->be_ops->ctrl) 911275970Scy res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d); 912275970Scy BEV_UNLOCK(bev); 913275970Scy return (res<0) ? NULL : d.ptr; 914275970Scy} 915275970Scy 916275970Scystatic void 917275970Scybufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) 918275970Scy{ 919275970Scy struct bufferevent *bev = ctx; 920275970Scy bufferevent_incref_and_lock_(bev); 921275970Scy bufferevent_disable(bev, EV_READ); 922275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0); 923275970Scy bufferevent_decref_and_unlock_(bev); 924275970Scy} 925275970Scystatic void 926275970Scybufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) 927275970Scy{ 928275970Scy struct bufferevent *bev = ctx; 929275970Scy bufferevent_incref_and_lock_(bev); 930275970Scy bufferevent_disable(bev, EV_WRITE); 931275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0); 932275970Scy bufferevent_decref_and_unlock_(bev); 933275970Scy} 934275970Scy 935275970Scyvoid 936275970Scybufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) 937275970Scy{ 938275970Scy event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, 939275970Scy bufferevent_generic_read_timeout_cb, bev); 940275970Scy event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, 941275970Scy bufferevent_generic_write_timeout_cb, bev); 942275970Scy} 943275970Scy 944275970Scyint 945275970Scybufferevent_generic_adj_timeouts_(struct bufferevent *bev) 946275970Scy{ 947275970Scy const short enabled = bev->enabled; 948275970Scy struct bufferevent_private *bev_p = 949275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 950275970Scy int r1=0, r2=0; 951275970Scy if ((enabled & EV_READ) && !bev_p->read_suspended && 952275970Scy evutil_timerisset(&bev->timeout_read)) 953275970Scy r1 = event_add(&bev->ev_read, &bev->timeout_read); 954275970Scy else 955275970Scy r1 = event_del(&bev->ev_read); 956275970Scy 957275970Scy if ((enabled & EV_WRITE) && !bev_p->write_suspended && 958275970Scy evutil_timerisset(&bev->timeout_write) && 959275970Scy evbuffer_get_length(bev->output)) 960275970Scy r2 = event_add(&bev->ev_write, &bev->timeout_write); 961275970Scy else 962275970Scy r2 = event_del(&bev->ev_write); 963275970Scy if (r1 < 0 || r2 < 0) 964275970Scy return -1; 965275970Scy return 0; 966275970Scy} 967275970Scy 968275970Scyint 969275970Scybufferevent_add_event_(struct event *ev, const struct timeval *tv) 970275970Scy{ 971275970Scy if (tv->tv_sec == 0 && tv->tv_usec == 0) 972275970Scy return event_add(ev, NULL); 973275970Scy else 974275970Scy return event_add(ev, tv); 975275970Scy} 976275970Scy 977275970Scy/* For use by user programs only; internally, we should be calling 978275970Scy either bufferevent_incref_and_lock_(), or BEV_LOCK. */ 979275970Scyvoid 980275970Scybufferevent_lock(struct bufferevent *bev) 981275970Scy{ 982275970Scy bufferevent_incref_and_lock_(bev); 983275970Scy} 984275970Scy 985275970Scyvoid 986275970Scybufferevent_unlock(struct bufferevent *bev) 987275970Scy{ 988275970Scy bufferevent_decref_and_unlock_(bev); 989275970Scy} 990