1275970Scy/* 2275970Scy * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 3275970Scy * 4275970Scy * Redistribution and use in source and binary forms, with or without 5275970Scy * modification, are permitted provided that the following conditions 6275970Scy * are met: 7275970Scy * 1. Redistributions of source code must retain the above copyright 8275970Scy * notice, this list of conditions and the following disclaimer. 9275970Scy * 2. Redistributions in binary form must reproduce the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer in the 11275970Scy * documentation and/or other materials provided with the distribution. 12275970Scy * 3. The name of the author may not be used to endorse or promote products 13275970Scy * derived from this software without specific prior written permission. 14275970Scy * 15275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25275970Scy */ 26275970Scy#include "event2/event-config.h" 27275970Scy#include "evconfig-private.h" 28275970Scy 29275970Scy#include <sys/types.h> 30275970Scy 31275970Scy#ifdef _WIN32 32275970Scy#include <winsock2.h> 33275970Scy#endif 34275970Scy 35275970Scy#include "event2/util.h" 36275970Scy#include "event2/buffer.h" 37275970Scy#include "event2/bufferevent.h" 38275970Scy#include "event2/bufferevent_struct.h" 39275970Scy#include "event2/event.h" 40275970Scy#include "defer-internal.h" 41275970Scy#include "bufferevent-internal.h" 42275970Scy#include "mm-internal.h" 43275970Scy#include "util-internal.h" 44275970Scy 45275970Scystruct bufferevent_pair { 46275970Scy struct bufferevent_private bev; 47275970Scy struct bufferevent_pair *partner; 48285612Sdelphij /* For ->destruct() lock checking */ 49285612Sdelphij struct bufferevent_pair *unlinked_partner; 50275970Scy}; 51275970Scy 52275970Scy 53275970Scy/* Given a bufferevent that's really a bev part of a bufferevent_pair, 54275970Scy * return that bufferevent_filtered. Returns NULL otherwise.*/ 55275970Scystatic inline struct bufferevent_pair * 56275970Scyupcast(struct bufferevent *bev) 57275970Scy{ 58275970Scy struct bufferevent_pair *bev_p; 59275970Scy if (bev->be_ops != &bufferevent_ops_pair) 60275970Scy return NULL; 61275970Scy bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 62275970Scy EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); 63275970Scy return bev_p; 64275970Scy} 65275970Scy 66275970Scy#define downcast(bev_pair) (&(bev_pair)->bev.bev) 67275970Scy 68275970Scystatic inline void 69275970Scyincref_and_lock(struct bufferevent *b) 70275970Scy{ 71275970Scy struct bufferevent_pair *bevp; 72275970Scy bufferevent_incref_and_lock_(b); 73275970Scy bevp = upcast(b); 74275970Scy if (bevp->partner) 75275970Scy bufferevent_incref_and_lock_(downcast(bevp->partner)); 76275970Scy} 77275970Scy 78275970Scystatic inline void 79275970Scydecref_and_unlock(struct bufferevent *b) 80275970Scy{ 81275970Scy struct bufferevent_pair *bevp = upcast(b); 82275970Scy if (bevp->partner) 83275970Scy bufferevent_decref_and_unlock_(downcast(bevp->partner)); 84275970Scy bufferevent_decref_and_unlock_(b); 85275970Scy} 86275970Scy 87275970Scy/* XXX Handle close */ 88275970Scy 89275970Scystatic void be_pair_outbuf_cb(struct evbuffer *, 90275970Scy const struct evbuffer_cb_info *, void *); 91275970Scy 92275970Scystatic struct bufferevent_pair * 93275970Scybufferevent_pair_elt_new(struct event_base *base, 94275970Scy int options) 95275970Scy{ 96275970Scy struct bufferevent_pair *bufev; 97275970Scy if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 98275970Scy return NULL; 99275970Scy if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair, 100275970Scy options)) { 101275970Scy mm_free(bufev); 102275970Scy return NULL; 103275970Scy } 104275970Scy if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 105275970Scy bufferevent_free(downcast(bufev)); 106275970Scy return NULL; 107275970Scy } 108275970Scy 109275970Scy bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev); 110275970Scy 111275970Scy return bufev; 112275970Scy} 113275970Scy 114275970Scyint 115275970Scybufferevent_pair_new(struct event_base *base, int options, 116275970Scy struct bufferevent *pair[2]) 117275970Scy{ 118275970Scy struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 119275970Scy int tmp_options; 120275970Scy 121275970Scy options |= BEV_OPT_DEFER_CALLBACKS; 122275970Scy tmp_options = options & ~BEV_OPT_THREADSAFE; 123275970Scy 124275970Scy bufev1 = bufferevent_pair_elt_new(base, options); 125275970Scy if (!bufev1) 126275970Scy return -1; 127275970Scy bufev2 = bufferevent_pair_elt_new(base, tmp_options); 128275970Scy if (!bufev2) { 129275970Scy bufferevent_free(downcast(bufev1)); 130275970Scy return -1; 131275970Scy } 132275970Scy 133275970Scy if (options & BEV_OPT_THREADSAFE) { 134275970Scy /*XXXX check return */ 135275970Scy bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock); 136275970Scy } 137275970Scy 138275970Scy bufev1->partner = bufev2; 139275970Scy bufev2->partner = bufev1; 140275970Scy 141275970Scy evbuffer_freeze(downcast(bufev1)->input, 0); 142275970Scy evbuffer_freeze(downcast(bufev1)->output, 1); 143275970Scy evbuffer_freeze(downcast(bufev2)->input, 0); 144275970Scy evbuffer_freeze(downcast(bufev2)->output, 1); 145275970Scy 146275970Scy pair[0] = downcast(bufev1); 147275970Scy pair[1] = downcast(bufev2); 148275970Scy 149275970Scy return 0; 150275970Scy} 151275970Scy 152275970Scystatic void 153275970Scybe_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 154275970Scy int ignore_wm) 155275970Scy{ 156275970Scy size_t dst_size; 157275970Scy size_t n; 158275970Scy 159275970Scy evbuffer_unfreeze(src->output, 1); 160275970Scy evbuffer_unfreeze(dst->input, 0); 161275970Scy 162275970Scy if (dst->wm_read.high) { 163275970Scy dst_size = evbuffer_get_length(dst->input); 164275970Scy if (dst_size < dst->wm_read.high) { 165275970Scy n = dst->wm_read.high - dst_size; 166275970Scy evbuffer_remove_buffer(src->output, dst->input, n); 167275970Scy } else { 168275970Scy if (!ignore_wm) 169275970Scy goto done; 170275970Scy n = evbuffer_get_length(src->output); 171275970Scy evbuffer_add_buffer(dst->input, src->output); 172275970Scy } 173275970Scy } else { 174275970Scy n = evbuffer_get_length(src->output); 175275970Scy evbuffer_add_buffer(dst->input, src->output); 176275970Scy } 177275970Scy 178275970Scy if (n) { 179275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(dst); 180275970Scy 181275970Scy if (evbuffer_get_length(dst->output)) 182275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 183275970Scy else 184275970Scy BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 185275970Scy } 186275970Scy 187275970Scy bufferevent_trigger_nolock_(dst, EV_READ, 0); 188275970Scy bufferevent_trigger_nolock_(src, EV_WRITE, 0); 189275970Scydone: 190275970Scy evbuffer_freeze(src->output, 1); 191275970Scy evbuffer_freeze(dst->input, 0); 192275970Scy} 193275970Scy 194275970Scystatic inline int 195275970Scybe_pair_wants_to_talk(struct bufferevent_pair *src, 196275970Scy struct bufferevent_pair *dst) 197275970Scy{ 198275970Scy return (downcast(src)->enabled & EV_WRITE) && 199275970Scy (downcast(dst)->enabled & EV_READ) && 200275970Scy !dst->bev.read_suspended && 201275970Scy evbuffer_get_length(downcast(src)->output); 202275970Scy} 203275970Scy 204275970Scystatic void 205275970Scybe_pair_outbuf_cb(struct evbuffer *outbuf, 206275970Scy const struct evbuffer_cb_info *info, void *arg) 207275970Scy{ 208275970Scy struct bufferevent_pair *bev_pair = arg; 209275970Scy struct bufferevent_pair *partner = bev_pair->partner; 210275970Scy 211275970Scy incref_and_lock(downcast(bev_pair)); 212275970Scy 213275970Scy if (info->n_added > info->n_deleted && partner) { 214275970Scy /* We got more data. If the other side's reading, then 215275970Scy hand it over. */ 216275970Scy if (be_pair_wants_to_talk(bev_pair, partner)) { 217275970Scy be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 218275970Scy } 219275970Scy } 220275970Scy 221275970Scy decref_and_unlock(downcast(bev_pair)); 222275970Scy} 223275970Scy 224275970Scystatic int 225275970Scybe_pair_enable(struct bufferevent *bufev, short events) 226275970Scy{ 227275970Scy struct bufferevent_pair *bev_p = upcast(bufev); 228275970Scy struct bufferevent_pair *partner = bev_p->partner; 229275970Scy 230275970Scy incref_and_lock(bufev); 231275970Scy 232275970Scy if (events & EV_READ) { 233275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 234275970Scy } 235275970Scy if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 236275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 237275970Scy 238275970Scy /* We're starting to read! Does the other side have anything to write?*/ 239275970Scy if ((events & EV_READ) && partner && 240275970Scy be_pair_wants_to_talk(partner, bev_p)) { 241275970Scy be_pair_transfer(downcast(partner), bufev, 0); 242275970Scy } 243275970Scy /* We're starting to write! Does the other side want to read? */ 244275970Scy if ((events & EV_WRITE) && partner && 245275970Scy be_pair_wants_to_talk(bev_p, partner)) { 246275970Scy be_pair_transfer(bufev, downcast(partner), 0); 247275970Scy } 248275970Scy decref_and_unlock(bufev); 249275970Scy return 0; 250275970Scy} 251275970Scy 252275970Scystatic int 253275970Scybe_pair_disable(struct bufferevent *bev, short events) 254275970Scy{ 255275970Scy if (events & EV_READ) { 256275970Scy BEV_DEL_GENERIC_READ_TIMEOUT(bev); 257275970Scy } 258275970Scy if (events & EV_WRITE) { 259275970Scy BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 260275970Scy } 261275970Scy return 0; 262275970Scy} 263275970Scy 264275970Scystatic void 265275970Scybe_pair_unlink(struct bufferevent *bev) 266275970Scy{ 267275970Scy struct bufferevent_pair *bev_p = upcast(bev); 268275970Scy 269275970Scy if (bev_p->partner) { 270285612Sdelphij bev_p->unlinked_partner = bev_p->partner; 271275970Scy bev_p->partner->partner = NULL; 272275970Scy bev_p->partner = NULL; 273275970Scy } 274275970Scy} 275275970Scy 276285612Sdelphij/* Free *shared* lock in the latest be (since we share it between two of them). */ 277285612Sdelphijstatic void 278285612Sdelphijbe_pair_destruct(struct bufferevent *bev) 279285612Sdelphij{ 280285612Sdelphij struct bufferevent_pair *bev_p = upcast(bev); 281285612Sdelphij 282285612Sdelphij /* Transfer ownership of the lock into partner, otherwise we will use 283285612Sdelphij * already free'd lock during freeing second bev, see next example: 284285612Sdelphij * 285285612Sdelphij * bev1->own_lock = 1 286285612Sdelphij * bev2->own_lock = 0 287285612Sdelphij * bev2->lock = bev1->lock 288285612Sdelphij * 289285612Sdelphij * bufferevent_free(bev1) # refcnt == 0 -> unlink 290285612Sdelphij * bufferevent_free(bev2) # refcnt == 0 -> unlink 291285612Sdelphij * 292285612Sdelphij * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock) 293285612Sdelphij * -> BEV_LOCK(bev2->lock) <-- already freed 294285612Sdelphij * 295285612Sdelphij * Where bev1 == pair[0], bev2 == pair[1]. 296285612Sdelphij */ 297285612Sdelphij if (bev_p->unlinked_partner && bev_p->bev.own_lock) { 298285612Sdelphij bev_p->unlinked_partner->bev.own_lock = 1; 299285612Sdelphij bev_p->bev.own_lock = 0; 300285612Sdelphij } 301285612Sdelphij bev_p->unlinked_partner = NULL; 302285612Sdelphij} 303285612Sdelphij 304275970Scystatic int 305275970Scybe_pair_flush(struct bufferevent *bev, short iotype, 306275970Scy enum bufferevent_flush_mode mode) 307275970Scy{ 308275970Scy struct bufferevent_pair *bev_p = upcast(bev); 309275970Scy struct bufferevent *partner; 310275970Scy incref_and_lock(bev); 311275970Scy if (!bev_p->partner) 312275970Scy return -1; 313275970Scy 314275970Scy partner = downcast(bev_p->partner); 315275970Scy 316275970Scy if (mode == BEV_NORMAL) 317275970Scy return 0; 318275970Scy 319275970Scy if ((iotype & EV_READ) != 0) 320275970Scy be_pair_transfer(partner, bev, 1); 321275970Scy 322275970Scy if ((iotype & EV_WRITE) != 0) 323275970Scy be_pair_transfer(bev, partner, 1); 324275970Scy 325275970Scy if (mode == BEV_FINISHED) { 326275970Scy bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0); 327275970Scy } 328275970Scy decref_and_unlock(bev); 329275970Scy return 0; 330275970Scy} 331275970Scy 332275970Scystruct bufferevent * 333275970Scybufferevent_pair_get_partner(struct bufferevent *bev) 334275970Scy{ 335275970Scy struct bufferevent_pair *bev_p; 336275970Scy struct bufferevent *partner = NULL; 337275970Scy bev_p = upcast(bev); 338275970Scy if (! bev_p) 339275970Scy return NULL; 340275970Scy 341275970Scy incref_and_lock(bev); 342275970Scy if (bev_p->partner) 343275970Scy partner = downcast(bev_p->partner); 344275970Scy decref_and_unlock(bev); 345275970Scy return partner; 346275970Scy} 347275970Scy 348275970Scyconst struct bufferevent_ops bufferevent_ops_pair = { 349275970Scy "pair_elt", 350275970Scy evutil_offsetof(struct bufferevent_pair, bev.bev), 351275970Scy be_pair_enable, 352275970Scy be_pair_disable, 353275970Scy be_pair_unlink, 354285612Sdelphij be_pair_destruct, 355275970Scy bufferevent_generic_adj_timeouts_, 356275970Scy be_pair_flush, 357275970Scy NULL, /* ctrl */ 358275970Scy}; 359