1275970Scy/* 2275970Scy * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3275970Scy * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4275970Scy * All rights reserved. 5275970Scy * 6275970Scy * Redistribution and use in source and binary forms, with or without 7275970Scy * modification, are permitted provided that the following conditions 8275970Scy * are met: 9275970Scy * 1. Redistributions of source code must retain the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer. 11275970Scy * 2. Redistributions in binary form must reproduce the above copyright 12275970Scy * notice, this list of conditions and the following disclaimer in the 13275970Scy * documentation and/or other materials provided with the distribution. 14275970Scy * 3. The name of the author may not be used to endorse or promote products 15275970Scy * derived from this software without specific prior written permission. 16275970Scy * 17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27275970Scy */ 28275970Scy 29275970Scy#include "evconfig-private.h" 30275970Scy 31275970Scy#include <sys/types.h> 32275970Scy 33275970Scy#include "event2/event-config.h" 34275970Scy 35275970Scy#ifdef EVENT__HAVE_SYS_TIME_H 36275970Scy#include <sys/time.h> 37275970Scy#endif 38275970Scy 39275970Scy#include <errno.h> 40275970Scy#include <stdio.h> 41275970Scy#include <stdlib.h> 42275970Scy#include <string.h> 43275970Scy#ifdef EVENT__HAVE_STDARG_H 44275970Scy#include <stdarg.h> 45275970Scy#endif 46275970Scy 47275970Scy#ifdef _WIN32 48275970Scy#include <winsock2.h> 49275970Scy#endif 50275970Scy 51275970Scy#include "event2/util.h" 52275970Scy#include "event2/bufferevent.h" 53275970Scy#include "event2/buffer.h" 54275970Scy#include "event2/bufferevent_struct.h" 55275970Scy#include "event2/event.h" 56275970Scy#include "log-internal.h" 57275970Scy#include "mm-internal.h" 58275970Scy#include "bufferevent-internal.h" 59275970Scy#include "util-internal.h" 60275970Scy 61275970Scy/* prototypes */ 62275970Scystatic int be_filter_enable(struct bufferevent *, short); 63275970Scystatic int be_filter_disable(struct bufferevent *, short); 64275970Scystatic void be_filter_unlink(struct bufferevent *); 65275970Scystatic void be_filter_destruct(struct bufferevent *); 66275970Scy 67275970Scystatic void be_filter_readcb(struct bufferevent *, void *); 68275970Scystatic void be_filter_writecb(struct bufferevent *, void *); 69275970Scystatic void be_filter_eventcb(struct bufferevent *, short, void *); 70275970Scystatic int be_filter_flush(struct bufferevent *bufev, 71275970Scy short iotype, enum bufferevent_flush_mode mode); 72275970Scystatic int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 73275970Scy 74275970Scystatic void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 75275970Scy const struct evbuffer_cb_info *info, void *arg); 76275970Scy 77275970Scystruct bufferevent_filtered { 78275970Scy struct bufferevent_private bev; 79275970Scy 80275970Scy /** The bufferevent that we read/write filtered data from/to. */ 81275970Scy struct bufferevent *underlying; 82275970Scy /** A callback on our outbuf to notice when somebody adds data */ 83275970Scy struct evbuffer_cb_entry *outbuf_cb; 84275970Scy /** True iff we have received an EOF callback from the underlying 85275970Scy * bufferevent. */ 86275970Scy unsigned got_eof; 87275970Scy 88275970Scy /** Function to free context when we're done. */ 89275970Scy void (*free_context)(void *); 90275970Scy /** Input filter */ 91275970Scy bufferevent_filter_cb process_in; 92275970Scy /** Output filter */ 93275970Scy bufferevent_filter_cb process_out; 94275970Scy /** User-supplied argument to the filters. */ 95275970Scy void *context; 96275970Scy}; 97275970Scy 98275970Scyconst struct bufferevent_ops bufferevent_ops_filter = { 99275970Scy "filter", 100275970Scy evutil_offsetof(struct bufferevent_filtered, bev.bev), 101275970Scy be_filter_enable, 102275970Scy be_filter_disable, 103275970Scy be_filter_unlink, 104275970Scy be_filter_destruct, 105275970Scy bufferevent_generic_adj_timeouts_, 106275970Scy be_filter_flush, 107275970Scy be_filter_ctrl, 108275970Scy}; 109275970Scy 110275970Scy/* Given a bufferevent that's really the bev filter of a bufferevent_filtered, 111275970Scy * return that bufferevent_filtered. Returns NULL otherwise.*/ 112275970Scystatic inline struct bufferevent_filtered * 113275970Scyupcast(struct bufferevent *bev) 114275970Scy{ 115275970Scy struct bufferevent_filtered *bev_f; 116275970Scy if (bev->be_ops != &bufferevent_ops_filter) 117275970Scy return NULL; 118275970Scy bev_f = (void*)( ((char*)bev) - 119275970Scy evutil_offsetof(struct bufferevent_filtered, bev.bev)); 120275970Scy EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter); 121275970Scy return bev_f; 122275970Scy} 123275970Scy 124275970Scy#define downcast(bev_f) (&(bev_f)->bev.bev) 125275970Scy 126275970Scy/** Return 1 iff bevf's underlying bufferevent's output buffer is at or 127275970Scy * over its high watermark such that we should not write to it in a given 128275970Scy * flush mode. */ 129275970Scystatic int 130275970Scybe_underlying_writebuf_full(struct bufferevent_filtered *bevf, 131275970Scy enum bufferevent_flush_mode state) 132275970Scy{ 133275970Scy struct bufferevent *u = bevf->underlying; 134275970Scy return state == BEV_NORMAL && 135275970Scy u->wm_write.high && 136275970Scy evbuffer_get_length(u->output) >= u->wm_write.high; 137275970Scy} 138275970Scy 139275970Scy/** Return 1 if our input buffer is at or over its high watermark such that we 140275970Scy * should not write to it in a given flush mode. */ 141275970Scystatic int 142275970Scybe_readbuf_full(struct bufferevent_filtered *bevf, 143275970Scy enum bufferevent_flush_mode state) 144275970Scy{ 145275970Scy struct bufferevent *bufev = downcast(bevf); 146275970Scy return state == BEV_NORMAL && 147275970Scy bufev->wm_read.high && 148275970Scy evbuffer_get_length(bufev->input) >= bufev->wm_read.high; 149275970Scy} 150275970Scy 151275970Scy 152275970Scy/* Filter to use when we're created with a NULL filter. */ 153275970Scystatic enum bufferevent_filter_result 154275970Scybe_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim, 155275970Scy enum bufferevent_flush_mode state, void *ctx) 156275970Scy{ 157275970Scy (void)state; 158275970Scy if (evbuffer_remove_buffer(src, dst, lim) == 0) 159275970Scy return BEV_OK; 160275970Scy else 161275970Scy return BEV_ERROR; 162275970Scy} 163275970Scy 164275970Scystruct bufferevent * 165275970Scybufferevent_filter_new(struct bufferevent *underlying, 166275970Scy bufferevent_filter_cb input_filter, 167275970Scy bufferevent_filter_cb output_filter, 168275970Scy int options, 169275970Scy void (*free_context)(void *), 170275970Scy void *ctx) 171275970Scy{ 172275970Scy struct bufferevent_filtered *bufev_f; 173275970Scy int tmp_options = options & ~BEV_OPT_THREADSAFE; 174275970Scy 175275970Scy if (!underlying) 176275970Scy return NULL; 177275970Scy 178275970Scy if (!input_filter) 179275970Scy input_filter = be_null_filter; 180275970Scy if (!output_filter) 181275970Scy output_filter = be_null_filter; 182275970Scy 183275970Scy bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered)); 184275970Scy if (!bufev_f) 185275970Scy return NULL; 186275970Scy 187275970Scy if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, 188275970Scy &bufferevent_ops_filter, tmp_options) < 0) { 189275970Scy mm_free(bufev_f); 190275970Scy return NULL; 191275970Scy } 192275970Scy if (options & BEV_OPT_THREADSAFE) { 193275970Scy bufferevent_enable_locking_(downcast(bufev_f), NULL); 194275970Scy } 195275970Scy 196275970Scy bufev_f->underlying = underlying; 197275970Scy 198275970Scy bufev_f->process_in = input_filter; 199275970Scy bufev_f->process_out = output_filter; 200275970Scy bufev_f->free_context = free_context; 201275970Scy bufev_f->context = ctx; 202275970Scy 203275970Scy bufferevent_setcb(bufev_f->underlying, 204275970Scy be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); 205275970Scy 206275970Scy bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, 207275970Scy bufferevent_filtered_outbuf_cb, bufev_f); 208275970Scy 209275970Scy bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); 210275970Scy bufferevent_incref_(underlying); 211275970Scy 212275970Scy bufferevent_enable(underlying, EV_READ|EV_WRITE); 213275970Scy bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); 214275970Scy 215275970Scy return downcast(bufev_f); 216275970Scy} 217275970Scy 218275970Scystatic void 219275970Scybe_filter_unlink(struct bufferevent *bev) 220275970Scy{ 221275970Scy struct bufferevent_filtered *bevf = upcast(bev); 222275970Scy EVUTIL_ASSERT(bevf); 223275970Scy 224275970Scy if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { 225275970Scy /* Yes, there is also a decref in bufferevent_decref_. 226275970Scy * That decref corresponds to the incref when we set 227275970Scy * underlying for the first time. This decref is an 228275970Scy * extra one to remove the last reference. 229275970Scy */ 230275970Scy if (BEV_UPCAST(bevf->underlying)->refcnt < 2) { 231275970Scy event_warnx("BEV_OPT_CLOSE_ON_FREE set on an " 232275970Scy "bufferevent with too few references"); 233275970Scy } else { 234275970Scy bufferevent_free(bevf->underlying); 235275970Scy } 236275970Scy } else { 237275970Scy if (bevf->underlying) { 238275970Scy if (bevf->underlying->errorcb == be_filter_eventcb) 239275970Scy bufferevent_setcb(bevf->underlying, 240275970Scy NULL, NULL, NULL, NULL); 241275970Scy bufferevent_unsuspend_read_(bevf->underlying, 242275970Scy BEV_SUSPEND_FILT_READ); 243275970Scy } 244275970Scy } 245275970Scy} 246275970Scy 247275970Scystatic void 248275970Scybe_filter_destruct(struct bufferevent *bev) 249275970Scy{ 250275970Scy struct bufferevent_filtered *bevf = upcast(bev); 251275970Scy EVUTIL_ASSERT(bevf); 252275970Scy if (bevf->free_context) 253275970Scy bevf->free_context(bevf->context); 254275970Scy} 255275970Scy 256275970Scystatic int 257275970Scybe_filter_enable(struct bufferevent *bev, short event) 258275970Scy{ 259275970Scy struct bufferevent_filtered *bevf = upcast(bev); 260275970Scy if (event & EV_WRITE) 261275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 262275970Scy 263275970Scy if (event & EV_READ) { 264275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(bev); 265275970Scy bufferevent_unsuspend_read_(bevf->underlying, 266275970Scy BEV_SUSPEND_FILT_READ); 267275970Scy } 268275970Scy return 0; 269275970Scy} 270275970Scy 271275970Scystatic int 272275970Scybe_filter_disable(struct bufferevent *bev, short event) 273275970Scy{ 274275970Scy struct bufferevent_filtered *bevf = upcast(bev); 275275970Scy if (event & EV_WRITE) 276275970Scy BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 277275970Scy if (event & EV_READ) { 278275970Scy BEV_DEL_GENERIC_READ_TIMEOUT(bev); 279275970Scy bufferevent_suspend_read_(bevf->underlying, 280275970Scy BEV_SUSPEND_FILT_READ); 281275970Scy } 282275970Scy return 0; 283275970Scy} 284275970Scy 285275970Scystatic enum bufferevent_filter_result 286275970Scybe_filter_process_input(struct bufferevent_filtered *bevf, 287275970Scy enum bufferevent_flush_mode state, 288275970Scy int *processed_out) 289275970Scy{ 290275970Scy enum bufferevent_filter_result res; 291275970Scy struct bufferevent *bev = downcast(bevf); 292275970Scy 293275970Scy if (state == BEV_NORMAL) { 294275970Scy /* If we're in 'normal' mode, don't urge data on the filter 295275970Scy * unless we're reading data and under our high-water mark.*/ 296275970Scy if (!(bev->enabled & EV_READ) || 297275970Scy be_readbuf_full(bevf, state)) 298275970Scy return BEV_OK; 299275970Scy } 300275970Scy 301275970Scy do { 302275970Scy ev_ssize_t limit = -1; 303275970Scy if (state == BEV_NORMAL && bev->wm_read.high) 304275970Scy limit = bev->wm_read.high - 305275970Scy evbuffer_get_length(bev->input); 306275970Scy 307275970Scy res = bevf->process_in(bevf->underlying->input, 308275970Scy bev->input, limit, state, bevf->context); 309275970Scy 310275970Scy if (res == BEV_OK) 311275970Scy *processed_out = 1; 312275970Scy } while (res == BEV_OK && 313275970Scy (bev->enabled & EV_READ) && 314275970Scy evbuffer_get_length(bevf->underlying->input) && 315275970Scy !be_readbuf_full(bevf, state)); 316275970Scy 317275970Scy if (*processed_out) 318275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(bev); 319275970Scy 320275970Scy return res; 321275970Scy} 322275970Scy 323275970Scy 324275970Scystatic enum bufferevent_filter_result 325275970Scybe_filter_process_output(struct bufferevent_filtered *bevf, 326275970Scy enum bufferevent_flush_mode state, 327275970Scy int *processed_out) 328275970Scy{ 329275970Scy /* Requires references and lock: might call writecb */ 330275970Scy enum bufferevent_filter_result res = BEV_OK; 331275970Scy struct bufferevent *bufev = downcast(bevf); 332275970Scy int again = 0; 333275970Scy 334275970Scy if (state == BEV_NORMAL) { 335275970Scy /* If we're in 'normal' mode, don't urge data on the 336275970Scy * filter unless we're writing data, and the underlying 337275970Scy * bufferevent is accepting data, and we have data to 338275970Scy * give the filter. If we're in 'flush' or 'finish', 339275970Scy * call the filter no matter what. */ 340275970Scy if (!(bufev->enabled & EV_WRITE) || 341275970Scy be_underlying_writebuf_full(bevf, state) || 342275970Scy !evbuffer_get_length(bufev->output)) 343275970Scy return BEV_OK; 344275970Scy } 345275970Scy 346275970Scy /* disable the callback that calls this function 347275970Scy when the user adds to the output buffer. */ 348275970Scy evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); 349275970Scy 350275970Scy do { 351275970Scy int processed = 0; 352275970Scy again = 0; 353275970Scy 354275970Scy do { 355275970Scy ev_ssize_t limit = -1; 356275970Scy if (state == BEV_NORMAL && 357275970Scy bevf->underlying->wm_write.high) 358275970Scy limit = bevf->underlying->wm_write.high - 359275970Scy evbuffer_get_length(bevf->underlying->output); 360275970Scy 361275970Scy res = bevf->process_out(downcast(bevf)->output, 362275970Scy bevf->underlying->output, 363275970Scy limit, 364275970Scy state, 365275970Scy bevf->context); 366275970Scy 367275970Scy if (res == BEV_OK) 368275970Scy processed = *processed_out = 1; 369275970Scy } while (/* Stop if the filter wasn't successful...*/ 370275970Scy res == BEV_OK && 371275970Scy /* Or if we aren't writing any more. */ 372275970Scy (bufev->enabled & EV_WRITE) && 373275970Scy /* Of if we have nothing more to write and we are 374275970Scy * not flushing. */ 375275970Scy evbuffer_get_length(bufev->output) && 376275970Scy /* Or if we have filled the underlying output buffer. */ 377275970Scy !be_underlying_writebuf_full(bevf,state)); 378275970Scy 379275970Scy if (processed) { 380275970Scy /* call the write callback.*/ 381275970Scy bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); 382275970Scy 383275970Scy if (res == BEV_OK && 384275970Scy (bufev->enabled & EV_WRITE) && 385275970Scy evbuffer_get_length(bufev->output) && 386275970Scy !be_underlying_writebuf_full(bevf, state)) { 387275970Scy again = 1; 388275970Scy } 389275970Scy } 390275970Scy } while (again); 391275970Scy 392275970Scy /* reenable the outbuf_cb */ 393275970Scy evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, 394275970Scy EVBUFFER_CB_ENABLED); 395275970Scy 396275970Scy if (*processed_out) 397275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 398275970Scy 399275970Scy return res; 400275970Scy} 401275970Scy 402275970Scy/* Called when the size of our outbuf changes. */ 403275970Scystatic void 404275970Scybufferevent_filtered_outbuf_cb(struct evbuffer *buf, 405275970Scy const struct evbuffer_cb_info *cbinfo, void *arg) 406275970Scy{ 407275970Scy struct bufferevent_filtered *bevf = arg; 408275970Scy struct bufferevent *bev = downcast(bevf); 409275970Scy 410275970Scy if (cbinfo->n_added) { 411275970Scy int processed_any = 0; 412275970Scy /* Somebody added more data to the output buffer. Try to 413275970Scy * process it, if we should. */ 414275970Scy bufferevent_incref_and_lock_(bev); 415275970Scy be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 416275970Scy bufferevent_decref_and_unlock_(bev); 417275970Scy } 418275970Scy} 419275970Scy 420275970Scy/* Called when the underlying socket has read. */ 421275970Scystatic void 422275970Scybe_filter_readcb(struct bufferevent *underlying, void *me_) 423275970Scy{ 424275970Scy struct bufferevent_filtered *bevf = me_; 425275970Scy enum bufferevent_filter_result res; 426275970Scy enum bufferevent_flush_mode state; 427275970Scy struct bufferevent *bufev = downcast(bevf); 428282408Scy struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 429275970Scy int processed_any = 0; 430275970Scy 431282408Scy BEV_LOCK(bufev); 432275970Scy 433282408Scy // It's possible our refcount is 0 at this point if another thread free'd our filterevent 434282408Scy EVUTIL_ASSERT(bufev_private->refcnt >= 0); 435275970Scy 436282408Scy // If our refcount is > 0 437282408Scy if (bufev_private->refcnt > 0) { 438275970Scy 439282408Scy if (bevf->got_eof) 440282408Scy state = BEV_FINISHED; 441282408Scy else 442282408Scy state = BEV_NORMAL; 443275970Scy 444282408Scy /* XXXX use return value */ 445282408Scy res = be_filter_process_input(bevf, state, &processed_any); 446282408Scy (void)res; 447282408Scy 448282408Scy /* XXX This should be in process_input, not here. There are 449282408Scy * other places that can call process-input, and they should 450282408Scy * force readcb calls as needed. */ 451282408Scy if (processed_any) 452282408Scy bufferevent_trigger_nolock_(bufev, EV_READ, 0); 453282408Scy } 454282408Scy 455282408Scy BEV_UNLOCK(bufev); 456275970Scy} 457275970Scy 458275970Scy/* Called when the underlying socket has drained enough that we can write to 459275970Scy it. */ 460275970Scystatic void 461275970Scybe_filter_writecb(struct bufferevent *underlying, void *me_) 462275970Scy{ 463275970Scy struct bufferevent_filtered *bevf = me_; 464275970Scy struct bufferevent *bev = downcast(bevf); 465282408Scy struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 466275970Scy int processed_any = 0; 467275970Scy 468282408Scy BEV_LOCK(bev); 469282408Scy 470282408Scy // It's possible our refcount is 0 at this point if another thread free'd our filterevent 471282408Scy EVUTIL_ASSERT(bufev_private->refcnt >= 0); 472282408Scy 473282408Scy // If our refcount is > 0 474282408Scy if (bufev_private->refcnt > 0) { 475282408Scy be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 476282408Scy } 477282408Scy 478282408Scy BEV_UNLOCK(bev); 479275970Scy} 480275970Scy 481275970Scy/* Called when the underlying socket has given us an error */ 482275970Scystatic void 483275970Scybe_filter_eventcb(struct bufferevent *underlying, short what, void *me_) 484275970Scy{ 485275970Scy struct bufferevent_filtered *bevf = me_; 486275970Scy struct bufferevent *bev = downcast(bevf); 487282408Scy struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 488275970Scy 489282408Scy BEV_LOCK(bev); 490282408Scy 491282408Scy // It's possible our refcount is 0 at this point if another thread free'd our filterevent 492282408Scy EVUTIL_ASSERT(bufev_private->refcnt >= 0); 493282408Scy 494282408Scy // If our refcount is > 0 495282408Scy if (bufev_private->refcnt > 0) { 496282408Scy 497282408Scy /* All we can really to is tell our own eventcb. */ 498282408Scy bufferevent_run_eventcb_(bev, what, 0); 499282408Scy } 500282408Scy 501282408Scy BEV_UNLOCK(bev); 502275970Scy} 503275970Scy 504275970Scystatic int 505275970Scybe_filter_flush(struct bufferevent *bufev, 506275970Scy short iotype, enum bufferevent_flush_mode mode) 507275970Scy{ 508275970Scy struct bufferevent_filtered *bevf = upcast(bufev); 509275970Scy int processed_any = 0; 510275970Scy EVUTIL_ASSERT(bevf); 511275970Scy 512275970Scy bufferevent_incref_and_lock_(bufev); 513275970Scy 514275970Scy if (iotype & EV_READ) { 515275970Scy be_filter_process_input(bevf, mode, &processed_any); 516275970Scy } 517275970Scy if (iotype & EV_WRITE) { 518275970Scy be_filter_process_output(bevf, mode, &processed_any); 519275970Scy } 520275970Scy /* XXX check the return value? */ 521275970Scy /* XXX does this want to recursively call lower-level flushes? */ 522275970Scy bufferevent_flush(bevf->underlying, iotype, mode); 523275970Scy 524275970Scy bufferevent_decref_and_unlock_(bufev); 525275970Scy 526275970Scy return processed_any; 527275970Scy} 528275970Scy 529275970Scystatic int 530275970Scybe_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 531275970Scy union bufferevent_ctrl_data *data) 532275970Scy{ 533275970Scy struct bufferevent_filtered *bevf; 534275970Scy switch (op) { 535275970Scy case BEV_CTRL_GET_UNDERLYING: 536275970Scy bevf = upcast(bev); 537275970Scy data->ptr = bevf->underlying; 538275970Scy return 0; 539275970Scy case BEV_CTRL_GET_FD: 540275970Scy case BEV_CTRL_SET_FD: 541275970Scy case BEV_CTRL_CANCEL_ALL: 542275970Scy default: 543275970Scy return -1; 544275970Scy } 545275970Scy} 546