1275970Scy/*
2275970Scy * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
3275970Scy * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
4275970Scy *
5275970Scy * Redistribution and use in source and binary forms, with or without
6275970Scy * modification, are permitted provided that the following conditions
7275970Scy * are met:
8275970Scy * 1. Redistributions of source code must retain the above copyright
9275970Scy *    notice, this list of conditions and the following disclaimer.
10275970Scy * 2. Redistributions in binary form must reproduce the above copyright
11275970Scy *    notice, this list of conditions and the following disclaimer in the
12275970Scy *    documentation and/or other materials provided with the distribution.
13275970Scy * 3. The name of the author may not be used to endorse or promote products
14275970Scy *    derived from this software without specific prior written permission.
15275970Scy *
16275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26275970Scy */
27275970Scy
28275970Scy#include "event2/event-config.h"
29275970Scy#include "evconfig-private.h"
30275970Scy
31275970Scy#include <sys/types.h>
32275970Scy
33275970Scy#ifdef EVENT__HAVE_SYS_TIME_H
34275970Scy#include <sys/time.h>
35275970Scy#endif
36275970Scy
37275970Scy#include <errno.h>
38275970Scy#include <stdio.h>
39275970Scy#include <stdlib.h>
40275970Scy#include <string.h>
41275970Scy#ifdef EVENT__HAVE_STDARG_H
42275970Scy#include <stdarg.h>
43275970Scy#endif
44275970Scy
45275970Scy#ifdef _WIN32
46275970Scy#include <winsock2.h>
47275970Scy#endif
48275970Scy#include <errno.h>
49275970Scy
50275970Scy#include "event2/util.h"
51275970Scy#include "event2/buffer.h"
52275970Scy#include "event2/buffer_compat.h"
53275970Scy#include "event2/bufferevent.h"
54275970Scy#include "event2/bufferevent_struct.h"
55275970Scy#include "event2/bufferevent_compat.h"
56275970Scy#include "event2/event.h"
57275970Scy#include "event-internal.h"
58275970Scy#include "log-internal.h"
59275970Scy#include "mm-internal.h"
60275970Scy#include "bufferevent-internal.h"
61275970Scy#include "evbuffer-internal.h"
62275970Scy#include "util-internal.h"
63275970Scy
/* Prototypes for file-local helpers that are referenced before their
 * definitions appear further down in this file. */
static void bufferevent_cancel_all_(struct bufferevent *bev);
static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
66275970Scy
67275970Scyvoid
68275970Scybufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
69275970Scy{
70275970Scy	struct bufferevent_private *bufev_private =
71275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
72275970Scy	BEV_LOCK(bufev);
73275970Scy	if (!bufev_private->read_suspended)
74275970Scy		bufev->be_ops->disable(bufev, EV_READ);
75275970Scy	bufev_private->read_suspended |= what;
76275970Scy	BEV_UNLOCK(bufev);
77275970Scy}
78275970Scy
79275970Scyvoid
80275970Scybufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
81275970Scy{
82275970Scy	struct bufferevent_private *bufev_private =
83275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
84275970Scy	BEV_LOCK(bufev);
85275970Scy	bufev_private->read_suspended &= ~what;
86275970Scy	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
87275970Scy		bufev->be_ops->enable(bufev, EV_READ);
88275970Scy	BEV_UNLOCK(bufev);
89275970Scy}
90275970Scy
91275970Scyvoid
92275970Scybufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
93275970Scy{
94275970Scy	struct bufferevent_private *bufev_private =
95275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
96275970Scy	BEV_LOCK(bufev);
97275970Scy	if (!bufev_private->write_suspended)
98275970Scy		bufev->be_ops->disable(bufev, EV_WRITE);
99275970Scy	bufev_private->write_suspended |= what;
100275970Scy	BEV_UNLOCK(bufev);
101275970Scy}
102275970Scy
103275970Scyvoid
104275970Scybufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
105275970Scy{
106275970Scy	struct bufferevent_private *bufev_private =
107275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
108275970Scy	BEV_LOCK(bufev);
109275970Scy	bufev_private->write_suspended &= ~what;
110275970Scy	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
111275970Scy		bufev->be_ops->enable(bufev, EV_WRITE);
112275970Scy	BEV_UNLOCK(bufev);
113275970Scy}
114275970Scy
115275970Scy
116275970Scy/* Callback to implement watermarks on the input buffer.  Only enabled
117275970Scy * if the watermark is set. */
118275970Scystatic void
119275970Scybufferevent_inbuf_wm_cb(struct evbuffer *buf,
120275970Scy    const struct evbuffer_cb_info *cbinfo,
121275970Scy    void *arg)
122275970Scy{
123275970Scy	struct bufferevent *bufev = arg;
124275970Scy	size_t size;
125275970Scy
126275970Scy	size = evbuffer_get_length(buf);
127275970Scy
128275970Scy	if (size >= bufev->wm_read.high)
129275970Scy		bufferevent_wm_suspend_read(bufev);
130275970Scy	else
131275970Scy		bufferevent_wm_unsuspend_read(bufev);
132275970Scy}
133275970Scy
134275970Scystatic void
135275970Scybufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
136275970Scy{
137275970Scy	struct bufferevent_private *bufev_private = arg;
138275970Scy	struct bufferevent *bufev = &bufev_private->bev;
139275970Scy
140275970Scy	BEV_LOCK(bufev);
141275970Scy	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
142275970Scy	    bufev->errorcb) {
143275970Scy		/* The "connected" happened before any reads or writes, so
144275970Scy		   send it first. */
145275970Scy		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
146275970Scy		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
147275970Scy	}
148275970Scy	if (bufev_private->readcb_pending && bufev->readcb) {
149275970Scy		bufev_private->readcb_pending = 0;
150275970Scy		bufev->readcb(bufev, bufev->cbarg);
151275970Scy	}
152275970Scy	if (bufev_private->writecb_pending && bufev->writecb) {
153275970Scy		bufev_private->writecb_pending = 0;
154275970Scy		bufev->writecb(bufev, bufev->cbarg);
155275970Scy	}
156275970Scy	if (bufev_private->eventcb_pending && bufev->errorcb) {
157275970Scy		short what = bufev_private->eventcb_pending;
158275970Scy		int err = bufev_private->errno_pending;
159275970Scy		bufev_private->eventcb_pending = 0;
160275970Scy		bufev_private->errno_pending = 0;
161275970Scy		EVUTIL_SET_SOCKET_ERROR(err);
162275970Scy		bufev->errorcb(bufev, what, bufev->cbarg);
163275970Scy	}
164275970Scy	bufferevent_decref_and_unlock_(bufev);
165275970Scy}
166275970Scy
167275970Scystatic void
168275970Scybufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
169275970Scy{
170275970Scy	struct bufferevent_private *bufev_private = arg;
171275970Scy	struct bufferevent *bufev = &bufev_private->bev;
172275970Scy
173275970Scy	BEV_LOCK(bufev);
174275970Scy#define UNLOCKED(stmt) \
175275970Scy	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)
176275970Scy
177275970Scy	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
178275970Scy	    bufev->errorcb) {
179275970Scy		/* The "connected" happened before any reads or writes, so
180275970Scy		   send it first. */
181275970Scy		bufferevent_event_cb errorcb = bufev->errorcb;
182275970Scy		void *cbarg = bufev->cbarg;
183275970Scy		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
184275970Scy		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
185275970Scy	}
186275970Scy	if (bufev_private->readcb_pending && bufev->readcb) {
187275970Scy		bufferevent_data_cb readcb = bufev->readcb;
188275970Scy		void *cbarg = bufev->cbarg;
189275970Scy		bufev_private->readcb_pending = 0;
190275970Scy		UNLOCKED(readcb(bufev, cbarg));
191275970Scy	}
192275970Scy	if (bufev_private->writecb_pending && bufev->writecb) {
193275970Scy		bufferevent_data_cb writecb = bufev->writecb;
194275970Scy		void *cbarg = bufev->cbarg;
195275970Scy		bufev_private->writecb_pending = 0;
196275970Scy		UNLOCKED(writecb(bufev, cbarg));
197275970Scy	}
198275970Scy	if (bufev_private->eventcb_pending && bufev->errorcb) {
199275970Scy		bufferevent_event_cb errorcb = bufev->errorcb;
200275970Scy		void *cbarg = bufev->cbarg;
201275970Scy		short what = bufev_private->eventcb_pending;
202275970Scy		int err = bufev_private->errno_pending;
203275970Scy		bufev_private->eventcb_pending = 0;
204275970Scy		bufev_private->errno_pending = 0;
205275970Scy		EVUTIL_SET_SOCKET_ERROR(err);
206275970Scy		UNLOCKED(errorcb(bufev,what,cbarg));
207275970Scy	}
208275970Scy	bufferevent_decref_and_unlock_(bufev);
209275970Scy#undef UNLOCKED
210275970Scy}
211275970Scy
/* Queue the deferred callback of the bufferevent_private 'bevp' on its
 * event base.  When event_deferred_cb_schedule_() reports a nonzero result
 * an extra reference is taken on the bufferevent; the deferred-callback
 * runners in this file drop one reference when they finish. */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_((bevp)->bev.ev_base,	\
			&(bevp)->deferred) != 0) {			\
			bufferevent_incref_(&(bevp)->bev);		\
		}							\
	} while (0)
219275970Scy
220275970Scy
221275970Scyvoid
222275970Scybufferevent_run_readcb_(struct bufferevent *bufev, int options)
223275970Scy{
224275970Scy	/* Requires that we hold the lock and a reference */
225275970Scy	struct bufferevent_private *p =
226275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
227275970Scy	if (bufev->readcb == NULL)
228275970Scy		return;
229275970Scy	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
230275970Scy		p->readcb_pending = 1;
231275970Scy		SCHEDULE_DEFERRED(p);
232275970Scy	} else {
233275970Scy		bufev->readcb(bufev, bufev->cbarg);
234275970Scy	}
235275970Scy}
236275970Scy
237275970Scyvoid
238275970Scybufferevent_run_writecb_(struct bufferevent *bufev, int options)
239275970Scy{
240275970Scy	/* Requires that we hold the lock and a reference */
241275970Scy	struct bufferevent_private *p =
242275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
243275970Scy	if (bufev->writecb == NULL)
244275970Scy		return;
245275970Scy	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
246275970Scy		p->writecb_pending = 1;
247275970Scy		SCHEDULE_DEFERRED(p);
248275970Scy	} else {
249275970Scy		bufev->writecb(bufev, bufev->cbarg);
250275970Scy	}
251275970Scy}
252275970Scy
/* Mask of the option bits that bufferevent_trigger() and
 * bufferevent_trigger_event() pass through; all other bits in the
 * caller's 'options' are stripped. */
#define BEV_TRIG_ALL_OPTS \
	(BEV_TRIG_IGNORE_WATERMARKS | BEV_TRIG_DEFER_CALLBACKS)
257275970Scy
258275970Scyvoid
259275970Scybufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
260275970Scy{
261275970Scy	bufferevent_incref_and_lock_(bufev);
262275970Scy	bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS);
263275970Scy	bufferevent_decref_and_unlock_(bufev);
264275970Scy}
265275970Scy
266275970Scyvoid
267275970Scybufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
268275970Scy{
269275970Scy	/* Requires that we hold the lock and a reference */
270275970Scy	struct bufferevent_private *p =
271275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
272275970Scy	if (bufev->errorcb == NULL)
273275970Scy		return;
274275970Scy	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
275275970Scy		p->eventcb_pending |= what;
276275970Scy		p->errno_pending = EVUTIL_SOCKET_ERROR();
277275970Scy		SCHEDULE_DEFERRED(p);
278275970Scy	} else {
279275970Scy		bufev->errorcb(bufev, what, bufev->cbarg);
280275970Scy	}
281275970Scy}
282275970Scy
283275970Scyvoid
284275970Scybufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
285275970Scy{
286275970Scy	bufferevent_incref_and_lock_(bufev);
287275970Scy	bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS);
288275970Scy	bufferevent_decref_and_unlock_(bufev);
289275970Scy}
290275970Scy
291275970Scyint
292275970Scybufferevent_init_common_(struct bufferevent_private *bufev_private,
293275970Scy    struct event_base *base,
294275970Scy    const struct bufferevent_ops *ops,
295275970Scy    enum bufferevent_options options)
296275970Scy{
297275970Scy	struct bufferevent *bufev = &bufev_private->bev;
298275970Scy
299275970Scy	if (!bufev->input) {
300275970Scy		if ((bufev->input = evbuffer_new()) == NULL)
301275970Scy			return -1;
302275970Scy	}
303275970Scy
304275970Scy	if (!bufev->output) {
305275970Scy		if ((bufev->output = evbuffer_new()) == NULL) {
306275970Scy			evbuffer_free(bufev->input);
307275970Scy			return -1;
308275970Scy		}
309275970Scy	}
310275970Scy
311275970Scy	bufev_private->refcnt = 1;
312275970Scy	bufev->ev_base = base;
313275970Scy
314275970Scy	/* Disable timeouts. */
315275970Scy	evutil_timerclear(&bufev->timeout_read);
316275970Scy	evutil_timerclear(&bufev->timeout_write);
317275970Scy
318275970Scy	bufev->be_ops = ops;
319275970Scy
320275970Scy	bufferevent_ratelim_init_(bufev_private);
321275970Scy
322275970Scy	/*
323275970Scy	 * Set to EV_WRITE so that using bufferevent_write is going to
324275970Scy	 * trigger a callback.  Reading needs to be explicitly enabled
325275970Scy	 * because otherwise no data will be available.
326275970Scy	 */
327275970Scy	bufev->enabled = EV_WRITE;
328275970Scy
329275970Scy#ifndef EVENT__DISABLE_THREAD_SUPPORT
330275970Scy	if (options & BEV_OPT_THREADSAFE) {
331275970Scy		if (bufferevent_enable_locking_(bufev, NULL) < 0) {
332275970Scy			/* cleanup */
333275970Scy			evbuffer_free(bufev->input);
334275970Scy			evbuffer_free(bufev->output);
335275970Scy			bufev->input = NULL;
336275970Scy			bufev->output = NULL;
337275970Scy			return -1;
338275970Scy		}
339275970Scy	}
340275970Scy#endif
341275970Scy	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
342275970Scy	    == BEV_OPT_UNLOCK_CALLBACKS) {
343275970Scy		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
344275970Scy		return -1;
345275970Scy	}
346275970Scy	if (options & BEV_OPT_UNLOCK_CALLBACKS)
347275970Scy		event_deferred_cb_init_(
348275970Scy		    &bufev_private->deferred,
349275970Scy		    event_base_get_npriorities(base) / 2,
350275970Scy		    bufferevent_run_deferred_callbacks_unlocked,
351275970Scy		    bufev_private);
352275970Scy	else
353275970Scy		event_deferred_cb_init_(
354275970Scy		    &bufev_private->deferred,
355275970Scy		    event_base_get_npriorities(base) / 2,
356275970Scy		    bufferevent_run_deferred_callbacks_locked,
357275970Scy		    bufev_private);
358275970Scy
359275970Scy	bufev_private->options = options;
360275970Scy
361275970Scy	evbuffer_set_parent_(bufev->input, bufev);
362275970Scy	evbuffer_set_parent_(bufev->output, bufev);
363275970Scy
364275970Scy	return 0;
365275970Scy}
366275970Scy
367275970Scyvoid
368275970Scybufferevent_setcb(struct bufferevent *bufev,
369275970Scy    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
370275970Scy    bufferevent_event_cb eventcb, void *cbarg)
371275970Scy{
372275970Scy	BEV_LOCK(bufev);
373275970Scy
374275970Scy	bufev->readcb = readcb;
375275970Scy	bufev->writecb = writecb;
376275970Scy	bufev->errorcb = eventcb;
377275970Scy
378275970Scy	bufev->cbarg = cbarg;
379275970Scy	BEV_UNLOCK(bufev);
380275970Scy}
381275970Scy
382275970Scyvoid
383275970Scybufferevent_getcb(struct bufferevent *bufev,
384275970Scy    bufferevent_data_cb *readcb_ptr,
385275970Scy    bufferevent_data_cb *writecb_ptr,
386275970Scy    bufferevent_event_cb *eventcb_ptr,
387275970Scy    void **cbarg_ptr)
388275970Scy{
389275970Scy	BEV_LOCK(bufev);
390275970Scy	if (readcb_ptr)
391275970Scy		*readcb_ptr = bufev->readcb;
392275970Scy	if (writecb_ptr)
393275970Scy		*writecb_ptr = bufev->writecb;
394275970Scy	if (eventcb_ptr)
395275970Scy		*eventcb_ptr = bufev->errorcb;
396275970Scy	if (cbarg_ptr)
397275970Scy		*cbarg_ptr = bufev->cbarg;
398275970Scy
399275970Scy	BEV_UNLOCK(bufev);
400275970Scy}
401275970Scy
402275970Scystruct evbuffer *
403275970Scybufferevent_get_input(struct bufferevent *bufev)
404275970Scy{
405275970Scy	return bufev->input;
406275970Scy}
407275970Scy
408275970Scystruct evbuffer *
409275970Scybufferevent_get_output(struct bufferevent *bufev)
410275970Scy{
411275970Scy	return bufev->output;
412275970Scy}
413275970Scy
414275970Scystruct event_base *
415275970Scybufferevent_get_base(struct bufferevent *bufev)
416275970Scy{
417275970Scy	return bufev->ev_base;
418275970Scy}
419275970Scy
420275970Scyint
421275970Scybufferevent_get_priority(const struct bufferevent *bufev)
422275970Scy{
423275970Scy	if (event_initialized(&bufev->ev_read)) {
424275970Scy		return event_get_priority(&bufev->ev_read);
425275970Scy	} else {
426275970Scy		return event_base_get_npriorities(bufev->ev_base) / 2;
427275970Scy	}
428275970Scy}
429275970Scy
430275970Scyint
431275970Scybufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
432275970Scy{
433275970Scy	if (evbuffer_add(bufev->output, data, size) == -1)
434275970Scy		return (-1);
435275970Scy
436275970Scy	return 0;
437275970Scy}
438275970Scy
439275970Scyint
440275970Scybufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
441275970Scy{
442275970Scy	if (evbuffer_add_buffer(bufev->output, buf) == -1)
443275970Scy		return (-1);
444275970Scy
445275970Scy	return 0;
446275970Scy}
447275970Scy
448275970Scysize_t
449275970Scybufferevent_read(struct bufferevent *bufev, void *data, size_t size)
450275970Scy{
451275970Scy	return (evbuffer_remove(bufev->input, data, size));
452275970Scy}
453275970Scy
454275970Scyint
455275970Scybufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
456275970Scy{
457275970Scy	return (evbuffer_add_buffer(buf, bufev->input));
458275970Scy}
459275970Scy
460275970Scyint
461275970Scybufferevent_enable(struct bufferevent *bufev, short event)
462275970Scy{
463275970Scy	struct bufferevent_private *bufev_private =
464275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
465275970Scy	short impl_events = event;
466275970Scy	int r = 0;
467275970Scy
468275970Scy	bufferevent_incref_and_lock_(bufev);
469275970Scy	if (bufev_private->read_suspended)
470275970Scy		impl_events &= ~EV_READ;
471275970Scy	if (bufev_private->write_suspended)
472275970Scy		impl_events &= ~EV_WRITE;
473275970Scy
474275970Scy	bufev->enabled |= event;
475275970Scy
476275970Scy	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
477275970Scy		r = -1;
478275970Scy
479275970Scy	bufferevent_decref_and_unlock_(bufev);
480275970Scy	return r;
481275970Scy}
482275970Scy
483275970Scyint
484275970Scybufferevent_set_timeouts(struct bufferevent *bufev,
485275970Scy			 const struct timeval *tv_read,
486275970Scy			 const struct timeval *tv_write)
487275970Scy{
488275970Scy	int r = 0;
489275970Scy	BEV_LOCK(bufev);
490275970Scy	if (tv_read) {
491275970Scy		bufev->timeout_read = *tv_read;
492275970Scy	} else {
493275970Scy		evutil_timerclear(&bufev->timeout_read);
494275970Scy	}
495275970Scy	if (tv_write) {
496275970Scy		bufev->timeout_write = *tv_write;
497275970Scy	} else {
498275970Scy		evutil_timerclear(&bufev->timeout_write);
499275970Scy	}
500275970Scy
501275970Scy	if (bufev->be_ops->adj_timeouts)
502275970Scy		r = bufev->be_ops->adj_timeouts(bufev);
503275970Scy	BEV_UNLOCK(bufev);
504275970Scy
505275970Scy	return r;
506275970Scy}
507275970Scy
508275970Scy
/* Obsolete; use bufferevent_set_timeouts.
 *
 * Set the read and write timeouts of 'bufev' from whole seconds.  A value
 * of 0 for either argument results in NULL being passed for that
 * direction to bufferevent_set_timeouts(). */
void
bufferevent_settimeout(struct bufferevent *bufev,
		       int timeout_read, int timeout_write)
{
	struct timeval rd, wr;
	struct timeval *rd_ptr = NULL;
	struct timeval *wr_ptr = NULL;

	memset(&rd, 0, sizeof(rd));
	memset(&wr, 0, sizeof(wr));

	if (timeout_read != 0) {
		rd.tv_sec = timeout_read;
		rd_ptr = &rd;
	}
	if (timeout_write != 0) {
		wr.tv_sec = timeout_write;
		wr_ptr = &wr;
	}

	bufferevent_set_timeouts(bufev, rd_ptr, wr_ptr);
}
531275970Scy
532275970Scy
533275970Scyint
534275970Scybufferevent_disable_hard_(struct bufferevent *bufev, short event)
535275970Scy{
536275970Scy	int r = 0;
537275970Scy	struct bufferevent_private *bufev_private =
538275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
539275970Scy
540275970Scy	BEV_LOCK(bufev);
541275970Scy	bufev->enabled &= ~event;
542275970Scy
543275970Scy	bufev_private->connecting = 0;
544275970Scy	if (bufev->be_ops->disable(bufev, event) < 0)
545275970Scy		r = -1;
546275970Scy
547275970Scy	BEV_UNLOCK(bufev);
548275970Scy	return r;
549275970Scy}
550275970Scy
551275970Scyint
552275970Scybufferevent_disable(struct bufferevent *bufev, short event)
553275970Scy{
554275970Scy	int r = 0;
555275970Scy
556275970Scy	BEV_LOCK(bufev);
557275970Scy	bufev->enabled &= ~event;
558275970Scy
559275970Scy	if (bufev->be_ops->disable(bufev, event) < 0)
560275970Scy		r = -1;
561275970Scy
562275970Scy	BEV_UNLOCK(bufev);
563275970Scy	return r;
564275970Scy}
565275970Scy
566275970Scy/*
567275970Scy * Sets the water marks
568275970Scy */
569275970Scy
570275970Scyvoid
571275970Scybufferevent_setwatermark(struct bufferevent *bufev, short events,
572275970Scy    size_t lowmark, size_t highmark)
573275970Scy{
574275970Scy	struct bufferevent_private *bufev_private =
575275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
576275970Scy
577275970Scy	BEV_LOCK(bufev);
578275970Scy	if (events & EV_WRITE) {
579275970Scy		bufev->wm_write.low = lowmark;
580275970Scy		bufev->wm_write.high = highmark;
581275970Scy	}
582275970Scy
583275970Scy	if (events & EV_READ) {
584275970Scy		bufev->wm_read.low = lowmark;
585275970Scy		bufev->wm_read.high = highmark;
586275970Scy
587275970Scy		if (highmark) {
588275970Scy			/* There is now a new high-water mark for read.
589275970Scy			   enable the callback if needed, and see if we should
590275970Scy			   suspend/bufferevent_wm_unsuspend. */
591275970Scy
592275970Scy			if (bufev_private->read_watermarks_cb == NULL) {
593275970Scy				bufev_private->read_watermarks_cb =
594275970Scy				    evbuffer_add_cb(bufev->input,
595275970Scy						    bufferevent_inbuf_wm_cb,
596275970Scy						    bufev);
597275970Scy			}
598275970Scy			evbuffer_cb_set_flags(bufev->input,
599275970Scy				      bufev_private->read_watermarks_cb,
600275970Scy				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
601275970Scy
602290000Sglebius			if (evbuffer_get_length(bufev->input) >= highmark)
603275970Scy				bufferevent_wm_suspend_read(bufev);
604275970Scy			else if (evbuffer_get_length(bufev->input) < highmark)
605275970Scy				bufferevent_wm_unsuspend_read(bufev);
606275970Scy		} else {
607275970Scy			/* There is now no high-water mark for read. */
608275970Scy			if (bufev_private->read_watermarks_cb)
609275970Scy				evbuffer_cb_clear_flags(bufev->input,
610275970Scy				    bufev_private->read_watermarks_cb,
611275970Scy				    EVBUFFER_CB_ENABLED);
612275970Scy			bufferevent_wm_unsuspend_read(bufev);
613275970Scy		}
614275970Scy	}
615275970Scy	BEV_UNLOCK(bufev);
616275970Scy}
617275970Scy
618290000Sglebiusint
619275970Scybufferevent_getwatermark(struct bufferevent *bufev, short events,
620275970Scy    size_t *lowmark, size_t *highmark)
621275970Scy{
622275970Scy	if (events == EV_WRITE) {
623290000Sglebius		BEV_LOCK(bufev);
624275970Scy		if (lowmark)
625275970Scy			*lowmark = bufev->wm_write.low;
626275970Scy		if (highmark)
627275970Scy			*highmark = bufev->wm_write.high;
628290000Sglebius		BEV_UNLOCK(bufev);
629290000Sglebius		return 0;
630275970Scy	}
631275970Scy
632275970Scy	if (events == EV_READ) {
633290000Sglebius		BEV_LOCK(bufev);
634275970Scy		if (lowmark)
635275970Scy			*lowmark = bufev->wm_read.low;
636275970Scy		if (highmark)
637275970Scy			*highmark = bufev->wm_read.high;
638290000Sglebius		BEV_UNLOCK(bufev);
639290000Sglebius		return 0;
640275970Scy	}
641290000Sglebius	return -1;
642275970Scy}
643275970Scy
644275970Scyint
645275970Scybufferevent_flush(struct bufferevent *bufev,
646275970Scy    short iotype,
647275970Scy    enum bufferevent_flush_mode mode)
648275970Scy{
649275970Scy	int r = -1;
650275970Scy	BEV_LOCK(bufev);
651275970Scy	if (bufev->be_ops->flush)
652275970Scy		r = bufev->be_ops->flush(bufev, iotype, mode);
653275970Scy	BEV_UNLOCK(bufev);
654275970Scy	return r;
655275970Scy}
656275970Scy
657275970Scyvoid
658275970Scybufferevent_incref_and_lock_(struct bufferevent *bufev)
659275970Scy{
660275970Scy	struct bufferevent_private *bufev_private =
661275970Scy	    BEV_UPCAST(bufev);
662275970Scy	BEV_LOCK(bufev);
663275970Scy	++bufev_private->refcnt;
664275970Scy}
665275970Scy
#if 0
/* Compiled out.  Would move ownership of a shared lock from 'donor' to
 * 'recipient': nothing happens unless both use the same lock, the
 * recipient does not already own it, and the donor does. */
static void
bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *from = BEV_UPCAST(donor);
	struct bufferevent_private *to = BEV_UPCAST(recipient);

	if (from->lock != to->lock || to->own_lock)
		return;

	if (from->own_lock) {
		from->own_lock = 0;
		to->own_lock = 1;
	}
}
#endif
683275970Scy
684275970Scyint
685275970Scybufferevent_decref_and_unlock_(struct bufferevent *bufev)
686275970Scy{
687275970Scy	struct bufferevent_private *bufev_private =
688275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
689275970Scy	int n_cbs = 0;
690275970Scy#define MAX_CBS 16
691275970Scy	struct event_callback *cbs[MAX_CBS];
692275970Scy
693275970Scy	EVUTIL_ASSERT(bufev_private->refcnt > 0);
694275970Scy
695275970Scy	if (--bufev_private->refcnt) {
696275970Scy		BEV_UNLOCK(bufev);
697275970Scy		return 0;
698275970Scy	}
699275970Scy
700275970Scy	if (bufev->be_ops->unlink)
701275970Scy		bufev->be_ops->unlink(bufev);
702275970Scy
703275970Scy	/* Okay, we're out of references. Let's finalize this once all the
704275970Scy	 * callbacks are done running. */
705275970Scy	cbs[0] = &bufev->ev_read.ev_evcallback;
706275970Scy	cbs[1] = &bufev->ev_write.ev_evcallback;
707275970Scy	cbs[2] = &bufev_private->deferred;
708275970Scy	n_cbs = 3;
709275970Scy	if (bufev_private->rate_limiting) {
710275970Scy		struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
711275970Scy		if (event_initialized(e))
712275970Scy			cbs[n_cbs++] = &e->ev_evcallback;
713275970Scy	}
714275970Scy	n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
715275970Scy	n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);
716275970Scy
717275970Scy	event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
718275970Scy	    bufferevent_finalize_cb_);
719275970Scy
720275970Scy#undef MAX_CBS
721275970Scy	BEV_UNLOCK(bufev);
722275970Scy
723275970Scy	return 1;
724275970Scy}
725275970Scy
726275970Scystatic void
727275970Scybufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
728275970Scy{
729275970Scy	struct bufferevent *bufev = arg_;
730275970Scy	struct bufferevent *underlying;
731275970Scy	struct bufferevent_private *bufev_private =
732275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
733275970Scy
734275970Scy	BEV_LOCK(bufev);
735275970Scy	underlying = bufferevent_get_underlying(bufev);
736275970Scy
737275970Scy	/* Clean up the shared info */
738275970Scy	if (bufev->be_ops->destruct)
739275970Scy		bufev->be_ops->destruct(bufev);
740275970Scy
741275970Scy	/* XXX what happens if refcnt for these buffers is > 1?
742275970Scy	 * The buffers can share a lock with this bufferevent object,
743275970Scy	 * but the lock might be destroyed below. */
744275970Scy	/* evbuffer will free the callbacks */
745275970Scy	evbuffer_free(bufev->input);
746275970Scy	evbuffer_free(bufev->output);
747275970Scy
748275970Scy	if (bufev_private->rate_limiting) {
749275970Scy		if (bufev_private->rate_limiting->group)
750275970Scy			bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
751275970Scy		mm_free(bufev_private->rate_limiting);
752275970Scy		bufev_private->rate_limiting = NULL;
753275970Scy	}
754275970Scy
755275970Scy
756275970Scy	BEV_UNLOCK(bufev);
757275970Scy
758275970Scy	if (bufev_private->own_lock)
759275970Scy		EVTHREAD_FREE_LOCK(bufev_private->lock,
760275970Scy		    EVTHREAD_LOCKTYPE_RECURSIVE);
761275970Scy
762275970Scy	/* Free the actual allocated memory. */
763275970Scy	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);
764275970Scy
765275970Scy	/* Release the reference to underlying now that we no longer need the
766275970Scy	 * reference to it.  We wait this long mainly in case our lock is
767275970Scy	 * shared with underlying.
768275970Scy	 *
769275970Scy	 * The 'destruct' function will also drop a reference to underlying
770275970Scy	 * if BEV_OPT_CLOSE_ON_FREE is set.
771275970Scy	 *
772275970Scy	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
773275970Scy	 * It would probably save us some headaches.
774275970Scy	 */
775275970Scy	if (underlying)
776275970Scy		bufferevent_decref_(underlying);
777275970Scy}
778275970Scy
/* Take the lock of 'bufev', then drop one reference and release the lock
 * via bufferevent_decref_and_unlock_(), returning that call's result. */
int
bufferevent_decref_(struct bufferevent *bufev)
{
	int result;

	BEV_LOCK(bufev);
	result = bufferevent_decref_and_unlock_(bufev);
	return result;
}
785275970Scy
/* Release the caller's reference to 'bufev'.  All user callbacks and the
 * callback argument are cleared, the backend is asked to cancel
 * outstanding work via bufferevent_cancel_all_(), and then one reference
 * is dropped together with the lock taken at the top. */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);

	/* Clear the callbacks so no user code runs from here on. */
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);

	bufferevent_decref_and_unlock_(bufev);
}
794275970Scy
795275970Scyvoid
796275970Scybufferevent_incref_(struct bufferevent *bufev)
797275970Scy{
798275970Scy	struct bufferevent_private *bufev_private =
799275970Scy	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
800275970Scy
801275970Scy	BEV_LOCK(bufev);
802275970Scy	++bufev_private->refcnt;
803275970Scy	BEV_UNLOCK(bufev);
804275970Scy}
805275970Scy
806275970Scyint
807275970Scybufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
808275970Scy{
809275970Scy#ifdef EVENT__DISABLE_THREAD_SUPPORT
810275970Scy	return -1;
811275970Scy#else
812275970Scy	struct bufferevent *underlying;
813275970Scy
814275970Scy	if (BEV_UPCAST(bufev)->lock)
815275970Scy		return -1;
816275970Scy	underlying = bufferevent_get_underlying(bufev);
817275970Scy
818275970Scy	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
819275970Scy		lock = BEV_UPCAST(underlying)->lock;
820275970Scy		BEV_UPCAST(bufev)->lock = lock;
821275970Scy		BEV_UPCAST(bufev)->own_lock = 0;
822275970Scy	} else if (!lock) {
823275970Scy		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
824275970Scy		if (!lock)
825275970Scy			return -1;
826275970Scy		BEV_UPCAST(bufev)->lock = lock;
827275970Scy		BEV_UPCAST(bufev)->own_lock = 1;
828275970Scy	} else {
829275970Scy		BEV_UPCAST(bufev)->lock = lock;
830275970Scy		BEV_UPCAST(bufev)->own_lock = 0;
831275970Scy	}
832275970Scy	evbuffer_enable_locking(bufev->input, lock);
833275970Scy	evbuffer_enable_locking(bufev->output, lock);
834275970Scy
835275970Scy	if (underlying && !BEV_UPCAST(underlying)->lock)
836275970Scy		bufferevent_enable_locking_(underlying, lock);
837275970Scy
838275970Scy	return 0;
839275970Scy#endif
840275970Scy}
841275970Scy
842275970Scyint
843275970Scybufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
844275970Scy{
845275970Scy	union bufferevent_ctrl_data d;
846275970Scy	int res = -1;
847275970Scy	d.fd = fd;
848275970Scy	BEV_LOCK(bev);
849275970Scy	if (bev->be_ops->ctrl)
850275970Scy		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
851275970Scy	BEV_UNLOCK(bev);
852275970Scy	return res;
853275970Scy}
854275970Scy
855275970Scyevutil_socket_t
856275970Scybufferevent_getfd(struct bufferevent *bev)
857275970Scy{
858275970Scy	union bufferevent_ctrl_data d;
859275970Scy	int res = -1;
860275970Scy	d.fd = -1;
861275970Scy	BEV_LOCK(bev);
862275970Scy	if (bev->be_ops->ctrl)
863275970Scy		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
864275970Scy	BEV_UNLOCK(bev);
865275970Scy	return (res<0) ? -1 : d.fd;
866275970Scy}
867275970Scy
868275970Scyenum bufferevent_options
869275970Scybufferevent_get_options_(struct bufferevent *bev)
870275970Scy{
871275970Scy	struct bufferevent_private *bev_p =
872275970Scy	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
873275970Scy	enum bufferevent_options options;
874275970Scy
875275970Scy	BEV_LOCK(bev);
876275970Scy	options = bev_p->options;
877275970Scy	BEV_UNLOCK(bev);
878275970Scy	return options;
879275970Scy}
880275970Scy
881275970Scy
882275970Scystatic void
883275970Scybufferevent_cancel_all_(struct bufferevent *bev)
884275970Scy{
885275970Scy	union bufferevent_ctrl_data d;
886275970Scy	memset(&d, 0, sizeof(d));
887275970Scy	BEV_LOCK(bev);
888275970Scy	if (bev->be_ops->ctrl)
889275970Scy		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
890275970Scy	BEV_UNLOCK(bev);
891275970Scy}
892275970Scy
893275970Scyshort
894275970Scybufferevent_get_enabled(struct bufferevent *bufev)
895275970Scy{
896275970Scy	short r;
897275970Scy	BEV_LOCK(bufev);
898275970Scy	r = bufev->enabled;
899275970Scy	BEV_UNLOCK(bufev);
900275970Scy	return r;
901275970Scy}
902275970Scy
903275970Scystruct bufferevent *
904275970Scybufferevent_get_underlying(struct bufferevent *bev)
905275970Scy{
906275970Scy	union bufferevent_ctrl_data d;
907275970Scy	int res = -1;
908275970Scy	d.ptr = NULL;
909275970Scy	BEV_LOCK(bev);
910275970Scy	if (bev->be_ops->ctrl)
911275970Scy		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
912275970Scy	BEV_UNLOCK(bev);
913275970Scy	return (res<0) ? NULL : d.ptr;
914275970Scy}
915275970Scy
916275970Scystatic void
917275970Scybufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
918275970Scy{
919275970Scy	struct bufferevent *bev = ctx;
920275970Scy	bufferevent_incref_and_lock_(bev);
921275970Scy	bufferevent_disable(bev, EV_READ);
922275970Scy	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
923275970Scy	bufferevent_decref_and_unlock_(bev);
924275970Scy}
925275970Scystatic void
926275970Scybufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
927275970Scy{
928275970Scy	struct bufferevent *bev = ctx;
929275970Scy	bufferevent_incref_and_lock_(bev);
930275970Scy	bufferevent_disable(bev, EV_WRITE);
931275970Scy	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
932275970Scy	bufferevent_decref_and_unlock_(bev);
933275970Scy}
934275970Scy
935275970Scyvoid
936275970Scybufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
937275970Scy{
938275970Scy	event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
939275970Scy	    bufferevent_generic_read_timeout_cb, bev);
940275970Scy	event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
941275970Scy	    bufferevent_generic_write_timeout_cb, bev);
942275970Scy}
943275970Scy
944275970Scyint
945275970Scybufferevent_generic_adj_timeouts_(struct bufferevent *bev)
946275970Scy{
947275970Scy	const short enabled = bev->enabled;
948275970Scy	struct bufferevent_private *bev_p =
949275970Scy	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
950275970Scy	int r1=0, r2=0;
951275970Scy	if ((enabled & EV_READ) && !bev_p->read_suspended &&
952275970Scy	    evutil_timerisset(&bev->timeout_read))
953275970Scy		r1 = event_add(&bev->ev_read, &bev->timeout_read);
954275970Scy	else
955275970Scy		r1 = event_del(&bev->ev_read);
956275970Scy
957275970Scy	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
958275970Scy	    evutil_timerisset(&bev->timeout_write) &&
959275970Scy	    evbuffer_get_length(bev->output))
960275970Scy		r2 = event_add(&bev->ev_write, &bev->timeout_write);
961275970Scy	else
962275970Scy		r2 = event_del(&bev->ev_write);
963275970Scy	if (r1 < 0 || r2 < 0)
964275970Scy		return -1;
965275970Scy	return 0;
966275970Scy}
967275970Scy
/* Add ev with timeout *tv, except that an all-zero timeval is passed to
 * event_add() as NULL instead.  Returns whatever event_add() returns.
 * tv itself must not be NULL: it is dereferenced unconditionally. */
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv)
{
	const struct timeval *timeout = tv;

	if (tv->tv_sec == 0 && tv->tv_usec == 0)
		timeout = NULL;

	return event_add(ev, timeout);
}
976275970Scy
/* Public locking entry point, intended for user programs only.  Code
 * inside the library should call bufferevent_incref_and_lock_() or use
 * BEV_LOCK directly instead of going through this wrapper. */
void
bufferevent_lock(struct bufferevent *bev)
{
	bufferevent_incref_and_lock_(bev);
}
984275970Scy
/* Public unlocking entry point; simply forwards to
 * bufferevent_decref_and_unlock_(). */
void
bufferevent_unlock(struct bufferevent *bev)
{
	bufferevent_decref_and_unlock_(bev);
}
990