1275970Scy/*
2275970Scy * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3275970Scy * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4275970Scy * All rights reserved.
5275970Scy *
6275970Scy * Redistribution and use in source and binary forms, with or without
7275970Scy * modification, are permitted provided that the following conditions
8275970Scy * are met:
9275970Scy * 1. Redistributions of source code must retain the above copyright
10275970Scy *    notice, this list of conditions and the following disclaimer.
11275970Scy * 2. Redistributions in binary form must reproduce the above copyright
12275970Scy *    notice, this list of conditions and the following disclaimer in the
13275970Scy *    documentation and/or other materials provided with the distribution.
14275970Scy * 3. The name of the author may not be used to endorse or promote products
15275970Scy *    derived from this software without specific prior written permission.
16275970Scy *
17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27275970Scy */
28275970Scy
29275970Scy#include "evconfig-private.h"
30275970Scy
31275970Scy#include <sys/types.h>
32275970Scy
33275970Scy#include "event2/event-config.h"
34275970Scy
35275970Scy#ifdef EVENT__HAVE_SYS_TIME_H
36275970Scy#include <sys/time.h>
37275970Scy#endif
38275970Scy
39275970Scy#include <errno.h>
40275970Scy#include <stdio.h>
41275970Scy#include <stdlib.h>
42275970Scy#include <string.h>
43275970Scy#ifdef EVENT__HAVE_STDARG_H
44275970Scy#include <stdarg.h>
45275970Scy#endif
46275970Scy
47275970Scy#ifdef _WIN32
48275970Scy#include <winsock2.h>
49275970Scy#endif
50275970Scy
51275970Scy#include "event2/util.h"
52275970Scy#include "event2/bufferevent.h"
53275970Scy#include "event2/buffer.h"
54275970Scy#include "event2/bufferevent_struct.h"
55275970Scy#include "event2/event.h"
56275970Scy#include "log-internal.h"
57275970Scy#include "mm-internal.h"
58275970Scy#include "bufferevent-internal.h"
59275970Scy#include "util-internal.h"
60275970Scy
/*
 * Forward declarations.
 */

/* Entry points referenced from the bufferevent_ops_filter table. */
static int be_filter_enable(struct bufferevent *bev, short event);
static int be_filter_disable(struct bufferevent *bev, short event);
static void be_filter_unlink(struct bufferevent *bev);
static void be_filter_destruct(struct bufferevent *bev);
static int be_filter_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *bev,
    enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data);

/* Callbacks that bufferevent_filter_new() installs on the underlying
 * bufferevent; 'me_' is the owning struct bufferevent_filtered. */
static void be_filter_readcb(struct bufferevent *underlying, void *me_);
static void be_filter_writecb(struct bufferevent *underlying, void *me_);
static void be_filter_eventcb(struct bufferevent *underlying, short what,
    void *me_);

/* Callback that bufferevent_filter_new() installs on our own output
 * evbuffer. */
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
76275970Scy
77275970Scystruct bufferevent_filtered {
78275970Scy	struct bufferevent_private bev;
79275970Scy
80275970Scy	/** The bufferevent that we read/write filtered data from/to. */
81275970Scy	struct bufferevent *underlying;
82275970Scy	/** A callback on our outbuf to notice when somebody adds data */
83275970Scy	struct evbuffer_cb_entry *outbuf_cb;
84275970Scy	/** True iff we have received an EOF callback from the underlying
85275970Scy	 * bufferevent. */
86275970Scy	unsigned got_eof;
87275970Scy
88275970Scy	/** Function to free context when we're done. */
89275970Scy	void (*free_context)(void *);
90275970Scy	/** Input filter */
91275970Scy	bufferevent_filter_cb process_in;
92275970Scy	/** Output filter */
93275970Scy	bufferevent_filter_cb process_out;
94275970Scy	/** User-supplied argument to the filters. */
95275970Scy	void *context;
96275970Scy};
97275970Scy
98275970Scyconst struct bufferevent_ops bufferevent_ops_filter = {
99275970Scy	"filter",
100275970Scy	evutil_offsetof(struct bufferevent_filtered, bev.bev),
101275970Scy	be_filter_enable,
102275970Scy	be_filter_disable,
103275970Scy	be_filter_unlink,
104275970Scy	be_filter_destruct,
105275970Scy	bufferevent_generic_adj_timeouts_,
106275970Scy	be_filter_flush,
107275970Scy	be_filter_ctrl,
108275970Scy};
109275970Scy
110275970Scy/* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
111275970Scy * return that bufferevent_filtered. Returns NULL otherwise.*/
112275970Scystatic inline struct bufferevent_filtered *
113275970Scyupcast(struct bufferevent *bev)
114275970Scy{
115275970Scy	struct bufferevent_filtered *bev_f;
116275970Scy	if (bev->be_ops != &bufferevent_ops_filter)
117275970Scy		return NULL;
118275970Scy	bev_f = (void*)( ((char*)bev) -
119275970Scy			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
120275970Scy	EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
121275970Scy	return bev_f;
122275970Scy}
123275970Scy
/* Inverse of upcast(): the public struct bufferevent embedded in a
 * struct bufferevent_filtered. */
#define downcast(bev_f) (&((bev_f)->bev.bev))
125275970Scy
126275970Scy/** Return 1 iff bevf's underlying bufferevent's output buffer is at or
127275970Scy * over its high watermark such that we should not write to it in a given
128275970Scy * flush mode. */
129275970Scystatic int
130275970Scybe_underlying_writebuf_full(struct bufferevent_filtered *bevf,
131275970Scy    enum bufferevent_flush_mode state)
132275970Scy{
133275970Scy	struct bufferevent *u = bevf->underlying;
134275970Scy	return state == BEV_NORMAL &&
135275970Scy	    u->wm_write.high &&
136275970Scy	    evbuffer_get_length(u->output) >= u->wm_write.high;
137275970Scy}
138275970Scy
139275970Scy/** Return 1 if our input buffer is at or over its high watermark such that we
140275970Scy * should not write to it in a given flush mode. */
141275970Scystatic int
142275970Scybe_readbuf_full(struct bufferevent_filtered *bevf,
143275970Scy    enum bufferevent_flush_mode state)
144275970Scy{
145275970Scy	struct bufferevent *bufev = downcast(bevf);
146275970Scy	return state == BEV_NORMAL &&
147275970Scy	    bufev->wm_read.high &&
148275970Scy	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
149275970Scy}
150275970Scy
151275970Scy
152275970Scy/* Filter to use when we're created with a NULL filter. */
153275970Scystatic enum bufferevent_filter_result
154275970Scybe_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
155275970Scy	       enum bufferevent_flush_mode state, void *ctx)
156275970Scy{
157275970Scy	(void)state;
158275970Scy	if (evbuffer_remove_buffer(src, dst, lim) == 0)
159275970Scy		return BEV_OK;
160275970Scy	else
161275970Scy		return BEV_ERROR;
162275970Scy}
163275970Scy
164275970Scystruct bufferevent *
165275970Scybufferevent_filter_new(struct bufferevent *underlying,
166275970Scy		       bufferevent_filter_cb input_filter,
167275970Scy		       bufferevent_filter_cb output_filter,
168275970Scy		       int options,
169275970Scy		       void (*free_context)(void *),
170275970Scy		       void *ctx)
171275970Scy{
172275970Scy	struct bufferevent_filtered *bufev_f;
173275970Scy	int tmp_options = options & ~BEV_OPT_THREADSAFE;
174275970Scy
175275970Scy	if (!underlying)
176275970Scy		return NULL;
177275970Scy
178275970Scy	if (!input_filter)
179275970Scy		input_filter = be_null_filter;
180275970Scy	if (!output_filter)
181275970Scy		output_filter = be_null_filter;
182275970Scy
183275970Scy	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
184275970Scy	if (!bufev_f)
185275970Scy		return NULL;
186275970Scy
187275970Scy	if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
188275970Scy				    &bufferevent_ops_filter, tmp_options) < 0) {
189275970Scy		mm_free(bufev_f);
190275970Scy		return NULL;
191275970Scy	}
192275970Scy	if (options & BEV_OPT_THREADSAFE) {
193275970Scy		bufferevent_enable_locking_(downcast(bufev_f), NULL);
194275970Scy	}
195275970Scy
196275970Scy	bufev_f->underlying = underlying;
197275970Scy
198275970Scy	bufev_f->process_in = input_filter;
199275970Scy	bufev_f->process_out = output_filter;
200275970Scy	bufev_f->free_context = free_context;
201275970Scy	bufev_f->context = ctx;
202275970Scy
203275970Scy	bufferevent_setcb(bufev_f->underlying,
204275970Scy	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
205275970Scy
206275970Scy	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
207275970Scy	   bufferevent_filtered_outbuf_cb, bufev_f);
208275970Scy
209275970Scy	bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
210275970Scy	bufferevent_incref_(underlying);
211275970Scy
212275970Scy	bufferevent_enable(underlying, EV_READ|EV_WRITE);
213275970Scy	bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
214275970Scy
215275970Scy	return downcast(bufev_f);
216275970Scy}
217275970Scy
218275970Scystatic void
219275970Scybe_filter_unlink(struct bufferevent *bev)
220275970Scy{
221275970Scy	struct bufferevent_filtered *bevf = upcast(bev);
222275970Scy	EVUTIL_ASSERT(bevf);
223275970Scy
224275970Scy	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
225275970Scy		/* Yes, there is also a decref in bufferevent_decref_.
226275970Scy		 * That decref corresponds to the incref when we set
227275970Scy		 * underlying for the first time.  This decref is an
228275970Scy		 * extra one to remove the last reference.
229275970Scy		 */
230275970Scy		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
231275970Scy			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
232275970Scy			    "bufferevent with too few references");
233275970Scy		} else {
234275970Scy			bufferevent_free(bevf->underlying);
235275970Scy		}
236275970Scy	} else {
237275970Scy		if (bevf->underlying) {
238275970Scy			if (bevf->underlying->errorcb == be_filter_eventcb)
239275970Scy				bufferevent_setcb(bevf->underlying,
240275970Scy				    NULL, NULL, NULL, NULL);
241275970Scy			bufferevent_unsuspend_read_(bevf->underlying,
242275970Scy			    BEV_SUSPEND_FILT_READ);
243275970Scy		}
244275970Scy	}
245275970Scy}
246275970Scy
247275970Scystatic void
248275970Scybe_filter_destruct(struct bufferevent *bev)
249275970Scy{
250275970Scy	struct bufferevent_filtered *bevf = upcast(bev);
251275970Scy	EVUTIL_ASSERT(bevf);
252275970Scy	if (bevf->free_context)
253275970Scy		bevf->free_context(bevf->context);
254275970Scy}
255275970Scy
256275970Scystatic int
257275970Scybe_filter_enable(struct bufferevent *bev, short event)
258275970Scy{
259275970Scy	struct bufferevent_filtered *bevf = upcast(bev);
260275970Scy	if (event & EV_WRITE)
261275970Scy		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
262275970Scy
263275970Scy	if (event & EV_READ) {
264275970Scy		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
265275970Scy		bufferevent_unsuspend_read_(bevf->underlying,
266275970Scy		    BEV_SUSPEND_FILT_READ);
267275970Scy	}
268275970Scy	return 0;
269275970Scy}
270275970Scy
271275970Scystatic int
272275970Scybe_filter_disable(struct bufferevent *bev, short event)
273275970Scy{
274275970Scy	struct bufferevent_filtered *bevf = upcast(bev);
275275970Scy	if (event & EV_WRITE)
276275970Scy		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
277275970Scy	if (event & EV_READ) {
278275970Scy		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
279275970Scy		bufferevent_suspend_read_(bevf->underlying,
280275970Scy		    BEV_SUSPEND_FILT_READ);
281275970Scy	}
282275970Scy	return 0;
283275970Scy}
284275970Scy
285275970Scystatic enum bufferevent_filter_result
286275970Scybe_filter_process_input(struct bufferevent_filtered *bevf,
287275970Scy			enum bufferevent_flush_mode state,
288275970Scy			int *processed_out)
289275970Scy{
290275970Scy	enum bufferevent_filter_result res;
291275970Scy	struct bufferevent *bev = downcast(bevf);
292275970Scy
293275970Scy	if (state == BEV_NORMAL) {
294275970Scy		/* If we're in 'normal' mode, don't urge data on the filter
295275970Scy		 * unless we're reading data and under our high-water mark.*/
296275970Scy		if (!(bev->enabled & EV_READ) ||
297275970Scy		    be_readbuf_full(bevf, state))
298275970Scy			return BEV_OK;
299275970Scy	}
300275970Scy
301275970Scy	do {
302275970Scy		ev_ssize_t limit = -1;
303275970Scy		if (state == BEV_NORMAL && bev->wm_read.high)
304275970Scy			limit = bev->wm_read.high -
305275970Scy			    evbuffer_get_length(bev->input);
306275970Scy
307275970Scy		res = bevf->process_in(bevf->underlying->input,
308275970Scy		    bev->input, limit, state, bevf->context);
309275970Scy
310275970Scy		if (res == BEV_OK)
311275970Scy			*processed_out = 1;
312275970Scy	} while (res == BEV_OK &&
313275970Scy		 (bev->enabled & EV_READ) &&
314275970Scy		 evbuffer_get_length(bevf->underlying->input) &&
315275970Scy		 !be_readbuf_full(bevf, state));
316275970Scy
317275970Scy	if (*processed_out)
318275970Scy		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
319275970Scy
320275970Scy	return res;
321275970Scy}
322275970Scy
323275970Scy
324275970Scystatic enum bufferevent_filter_result
325275970Scybe_filter_process_output(struct bufferevent_filtered *bevf,
326275970Scy			 enum bufferevent_flush_mode state,
327275970Scy			 int *processed_out)
328275970Scy{
329275970Scy	/* Requires references and lock: might call writecb */
330275970Scy	enum bufferevent_filter_result res = BEV_OK;
331275970Scy	struct bufferevent *bufev = downcast(bevf);
332275970Scy	int again = 0;
333275970Scy
334275970Scy	if (state == BEV_NORMAL) {
335275970Scy		/* If we're in 'normal' mode, don't urge data on the
336275970Scy		 * filter unless we're writing data, and the underlying
337275970Scy		 * bufferevent is accepting data, and we have data to
338275970Scy		 * give the filter.  If we're in 'flush' or 'finish',
339275970Scy		 * call the filter no matter what. */
340275970Scy		if (!(bufev->enabled & EV_WRITE) ||
341275970Scy		    be_underlying_writebuf_full(bevf, state) ||
342275970Scy		    !evbuffer_get_length(bufev->output))
343275970Scy			return BEV_OK;
344275970Scy	}
345275970Scy
346275970Scy	/* disable the callback that calls this function
347275970Scy	   when the user adds to the output buffer. */
348275970Scy	evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
349275970Scy
350275970Scy	do {
351275970Scy		int processed = 0;
352275970Scy		again = 0;
353275970Scy
354275970Scy		do {
355275970Scy			ev_ssize_t limit = -1;
356275970Scy			if (state == BEV_NORMAL &&
357275970Scy			    bevf->underlying->wm_write.high)
358275970Scy				limit = bevf->underlying->wm_write.high -
359275970Scy				    evbuffer_get_length(bevf->underlying->output);
360275970Scy
361275970Scy			res = bevf->process_out(downcast(bevf)->output,
362275970Scy			    bevf->underlying->output,
363275970Scy			    limit,
364275970Scy			    state,
365275970Scy			    bevf->context);
366275970Scy
367275970Scy			if (res == BEV_OK)
368275970Scy				processed = *processed_out = 1;
369275970Scy		} while (/* Stop if the filter wasn't successful...*/
370275970Scy			res == BEV_OK &&
371275970Scy			/* Or if we aren't writing any more. */
372275970Scy			(bufev->enabled & EV_WRITE) &&
373275970Scy			/* Of if we have nothing more to write and we are
374275970Scy			 * not flushing. */
375275970Scy			evbuffer_get_length(bufev->output) &&
376275970Scy			/* Or if we have filled the underlying output buffer. */
377275970Scy			!be_underlying_writebuf_full(bevf,state));
378275970Scy
379275970Scy		if (processed) {
380275970Scy			/* call the write callback.*/
381275970Scy			bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
382275970Scy
383275970Scy			if (res == BEV_OK &&
384275970Scy			    (bufev->enabled & EV_WRITE) &&
385275970Scy			    evbuffer_get_length(bufev->output) &&
386275970Scy			    !be_underlying_writebuf_full(bevf, state)) {
387275970Scy				again = 1;
388275970Scy			}
389275970Scy		}
390275970Scy	} while (again);
391275970Scy
392275970Scy	/* reenable the outbuf_cb */
393275970Scy	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
394275970Scy	    EVBUFFER_CB_ENABLED);
395275970Scy
396275970Scy	if (*processed_out)
397275970Scy		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
398275970Scy
399275970Scy	return res;
400275970Scy}
401275970Scy
402275970Scy/* Called when the size of our outbuf changes. */
403275970Scystatic void
404275970Scybufferevent_filtered_outbuf_cb(struct evbuffer *buf,
405275970Scy    const struct evbuffer_cb_info *cbinfo, void *arg)
406275970Scy{
407275970Scy	struct bufferevent_filtered *bevf = arg;
408275970Scy	struct bufferevent *bev = downcast(bevf);
409275970Scy
410275970Scy	if (cbinfo->n_added) {
411275970Scy		int processed_any = 0;
412275970Scy		/* Somebody added more data to the output buffer. Try to
413275970Scy		 * process it, if we should. */
414275970Scy		bufferevent_incref_and_lock_(bev);
415275970Scy		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
416275970Scy		bufferevent_decref_and_unlock_(bev);
417275970Scy	}
418275970Scy}
419275970Scy
420275970Scy/* Called when the underlying socket has read. */
421275970Scystatic void
422275970Scybe_filter_readcb(struct bufferevent *underlying, void *me_)
423275970Scy{
424275970Scy	struct bufferevent_filtered *bevf = me_;
425275970Scy	enum bufferevent_filter_result res;
426275970Scy	enum bufferevent_flush_mode state;
427275970Scy	struct bufferevent *bufev = downcast(bevf);
428282408Scy	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
429275970Scy	int processed_any = 0;
430275970Scy
431282408Scy	BEV_LOCK(bufev);
432275970Scy
433282408Scy	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
434282408Scy	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
435275970Scy
436282408Scy	// If our refcount is > 0
437282408Scy	if (bufev_private->refcnt > 0) {
438275970Scy
439282408Scy		if (bevf->got_eof)
440282408Scy			state = BEV_FINISHED;
441282408Scy		else
442282408Scy			state = BEV_NORMAL;
443275970Scy
444282408Scy		/* XXXX use return value */
445282408Scy		res = be_filter_process_input(bevf, state, &processed_any);
446282408Scy		(void)res;
447282408Scy
448282408Scy		/* XXX This should be in process_input, not here.  There are
449282408Scy		 * other places that can call process-input, and they should
450282408Scy		 * force readcb calls as needed. */
451282408Scy		if (processed_any)
452282408Scy			bufferevent_trigger_nolock_(bufev, EV_READ, 0);
453282408Scy	}
454282408Scy
455282408Scy	BEV_UNLOCK(bufev);
456275970Scy}
457275970Scy
458275970Scy/* Called when the underlying socket has drained enough that we can write to
459275970Scy   it. */
460275970Scystatic void
461275970Scybe_filter_writecb(struct bufferevent *underlying, void *me_)
462275970Scy{
463275970Scy	struct bufferevent_filtered *bevf = me_;
464275970Scy	struct bufferevent *bev = downcast(bevf);
465282408Scy	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
466275970Scy	int processed_any = 0;
467275970Scy
468282408Scy	BEV_LOCK(bev);
469282408Scy
470282408Scy	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
471282408Scy	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
472282408Scy
473282408Scy	// If our refcount is > 0
474282408Scy	if (bufev_private->refcnt > 0) {
475282408Scy		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
476282408Scy	}
477282408Scy
478282408Scy	BEV_UNLOCK(bev);
479275970Scy}
480275970Scy
481275970Scy/* Called when the underlying socket has given us an error */
482275970Scystatic void
483275970Scybe_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
484275970Scy{
485275970Scy	struct bufferevent_filtered *bevf = me_;
486275970Scy	struct bufferevent *bev = downcast(bevf);
487282408Scy	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
488275970Scy
489282408Scy	BEV_LOCK(bev);
490282408Scy
491282408Scy	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
492282408Scy	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
493282408Scy
494282408Scy	// If our refcount is > 0
495282408Scy	if (bufev_private->refcnt > 0) {
496282408Scy
497282408Scy		/* All we can really to is tell our own eventcb. */
498282408Scy		bufferevent_run_eventcb_(bev, what, 0);
499282408Scy	}
500282408Scy
501282408Scy	BEV_UNLOCK(bev);
502275970Scy}
503275970Scy
504275970Scystatic int
505275970Scybe_filter_flush(struct bufferevent *bufev,
506275970Scy    short iotype, enum bufferevent_flush_mode mode)
507275970Scy{
508275970Scy	struct bufferevent_filtered *bevf = upcast(bufev);
509275970Scy	int processed_any = 0;
510275970Scy	EVUTIL_ASSERT(bevf);
511275970Scy
512275970Scy	bufferevent_incref_and_lock_(bufev);
513275970Scy
514275970Scy	if (iotype & EV_READ) {
515275970Scy		be_filter_process_input(bevf, mode, &processed_any);
516275970Scy	}
517275970Scy	if (iotype & EV_WRITE) {
518275970Scy		be_filter_process_output(bevf, mode, &processed_any);
519275970Scy	}
520275970Scy	/* XXX check the return value? */
521275970Scy	/* XXX does this want to recursively call lower-level flushes? */
522275970Scy	bufferevent_flush(bevf->underlying, iotype, mode);
523275970Scy
524275970Scy	bufferevent_decref_and_unlock_(bufev);
525275970Scy
526275970Scy	return processed_any;
527275970Scy}
528275970Scy
529275970Scystatic int
530275970Scybe_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
531275970Scy    union bufferevent_ctrl_data *data)
532275970Scy{
533275970Scy	struct bufferevent_filtered *bevf;
534275970Scy	switch (op) {
535275970Scy	case BEV_CTRL_GET_UNDERLYING:
536275970Scy		bevf = upcast(bev);
537275970Scy		data->ptr = bevf->underlying;
538275970Scy		return 0;
539275970Scy	case BEV_CTRL_GET_FD:
540275970Scy	case BEV_CTRL_SET_FD:
541275970Scy	case BEV_CTRL_CANCEL_ALL:
542275970Scy	default:
543275970Scy		return -1;
544275970Scy	}
545275970Scy}
546