1171169Smlaier/*
2171169Smlaier * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3171169Smlaier * All rights reserved.
4171169Smlaier *
5171169Smlaier * Redistribution and use in source and binary forms, with or without
6171169Smlaier * modification, are permitted provided that the following conditions
7171169Smlaier * are met:
8171169Smlaier * 1. Redistributions of source code must retain the above copyright
9171169Smlaier *    notice, this list of conditions and the following disclaimer.
10171169Smlaier * 2. Redistributions in binary form must reproduce the above copyright
11171169Smlaier *    notice, this list of conditions and the following disclaimer in the
12171169Smlaier *    documentation and/or other materials provided with the distribution.
13171169Smlaier * 3. The name of the author may not be used to endorse or promote products
14171169Smlaier *    derived from this software without specific prior written permission.
15171169Smlaier *
16171169Smlaier * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17171169Smlaier * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18171169Smlaier * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19171169Smlaier * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20171169Smlaier * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21171169Smlaier * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22171169Smlaier * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23171169Smlaier * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24171169Smlaier * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25171169Smlaier * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26171169Smlaier */
27171169Smlaier
28171169Smlaier#include <sys/types.h>
29171169Smlaier
30171169Smlaier#ifdef HAVE_CONFIG_H
31171169Smlaier#include "config.h"
32171169Smlaier#endif
33171169Smlaier
34171169Smlaier#ifdef HAVE_SYS_TIME_H
35171169Smlaier#include <sys/time.h>
36171169Smlaier#endif
37171169Smlaier
38171169Smlaier#include <errno.h>
39171169Smlaier#include <stdio.h>
40171169Smlaier#include <stdlib.h>
41171169Smlaier#include <string.h>
42171169Smlaier#ifdef HAVE_STDARG_H
43171169Smlaier#include <stdarg.h>
44171169Smlaier#endif
45171169Smlaier
46171169Smlaier#include "event.h"
47171169Smlaier
48171169Smlaier/* prototypes */
49171169Smlaier
50171169Smlaiervoid bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
51171169Smlaiervoid bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
52171169Smlaier
/*
 * Schedule an event, optionally bounded by a timeout in whole seconds.
 * A timeout of zero adds the event without any timeout.
 * Returns the result of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval limit;

	if (timeout == 0)
		return (event_add(ev, NULL));

	limit.tv_sec = timeout;
	limit.tv_usec = 0;

	return (event_add(ev, &limit));
}
66171169Smlaier
67171169Smlaier/*
68171169Smlaier * This callback is executed when the size of the input buffer changes.
69171169Smlaier * We use it to apply back pressure on the reading side.
70171169Smlaier */
71171169Smlaier
72171169Smlaiervoid
73171169Smlaierbufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
74171169Smlaier    void *arg) {
75171169Smlaier	struct bufferevent *bufev = arg;
76171169Smlaier	/*
77171169Smlaier	 * If we are below the watermark then reschedule reading if it's
78171169Smlaier	 * still enabled.
79171169Smlaier	 */
80171169Smlaier	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
81171169Smlaier		evbuffer_setcb(buf, NULL, NULL);
82171169Smlaier
83171169Smlaier		if (bufev->enabled & EV_READ)
84171169Smlaier			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
85171169Smlaier	}
86171169Smlaier}
87171169Smlaier
88171169Smlaierstatic void
89171169Smlaierbufferevent_readcb(int fd, short event, void *arg)
90171169Smlaier{
91171169Smlaier	struct bufferevent *bufev = arg;
92171169Smlaier	int res = 0;
93171169Smlaier	short what = EVBUFFER_READ;
94171169Smlaier	size_t len;
95171169Smlaier	int howmuch = -1;
96171169Smlaier
97171169Smlaier	if (event == EV_TIMEOUT) {
98171169Smlaier		what |= EVBUFFER_TIMEOUT;
99171169Smlaier		goto error;
100171169Smlaier	}
101171169Smlaier
102171169Smlaier	/*
103171169Smlaier	 * If we have a high watermark configured then we don't want to
104171169Smlaier	 * read more data than would make us reach the watermark.
105171169Smlaier	 */
106171169Smlaier	if (bufev->wm_read.high != 0)
107171169Smlaier		howmuch = bufev->wm_read.high;
108171169Smlaier
109171169Smlaier	res = evbuffer_read(bufev->input, fd, howmuch);
110171169Smlaier	if (res == -1) {
111171169Smlaier		if (errno == EAGAIN || errno == EINTR)
112171169Smlaier			goto reschedule;
113171169Smlaier		/* error case */
114171169Smlaier		what |= EVBUFFER_ERROR;
115171169Smlaier	} else if (res == 0) {
116171169Smlaier		/* eof case */
117171169Smlaier		what |= EVBUFFER_EOF;
118171169Smlaier	}
119171169Smlaier
120171169Smlaier	if (res <= 0)
121171169Smlaier		goto error;
122171169Smlaier
123171169Smlaier	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
124171169Smlaier
125171169Smlaier	/* See if this callbacks meets the water marks */
126171169Smlaier	len = EVBUFFER_LENGTH(bufev->input);
127171169Smlaier	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
128171169Smlaier		return;
129171169Smlaier	if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
130171169Smlaier		struct evbuffer *buf = bufev->input;
131171169Smlaier		event_del(&bufev->ev_read);
132171169Smlaier
133171169Smlaier		/* Now schedule a callback for us */
134171169Smlaier		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
135171169Smlaier		return;
136171169Smlaier	}
137171169Smlaier
138171169Smlaier	/* Invoke the user callback - must always be called last */
139171169Smlaier	if (bufev->readcb != NULL)
140171169Smlaier		(*bufev->readcb)(bufev, bufev->cbarg);
141171169Smlaier	return;
142171169Smlaier
143171169Smlaier reschedule:
144171169Smlaier	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145171169Smlaier	return;
146171169Smlaier
147171169Smlaier error:
148171169Smlaier	(*bufev->errorcb)(bufev, what, bufev->cbarg);
149171169Smlaier}
150171169Smlaier
151171169Smlaierstatic void
152171169Smlaierbufferevent_writecb(int fd, short event, void *arg)
153171169Smlaier{
154171169Smlaier	struct bufferevent *bufev = arg;
155171169Smlaier	int res = 0;
156171169Smlaier	short what = EVBUFFER_WRITE;
157171169Smlaier
158171169Smlaier	if (event == EV_TIMEOUT) {
159171169Smlaier		what |= EVBUFFER_TIMEOUT;
160171169Smlaier		goto error;
161171169Smlaier	}
162171169Smlaier
163171169Smlaier	if (EVBUFFER_LENGTH(bufev->output)) {
164171169Smlaier	    res = evbuffer_write(bufev->output, fd);
165171169Smlaier	    if (res == -1) {
166171169Smlaier#ifndef WIN32
167171169Smlaier/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
168171169Smlaier *set errno. thus this error checking is not portable*/
169171169Smlaier		    if (errno == EAGAIN ||
170171169Smlaier			errno == EINTR ||
171171169Smlaier			errno == EINPROGRESS)
172171169Smlaier			    goto reschedule;
173171169Smlaier		    /* error case */
174171169Smlaier		    what |= EVBUFFER_ERROR;
175171169Smlaier
176171169Smlaier#else
177171169Smlaier				goto reschedule;
178171169Smlaier#endif
179171169Smlaier
180171169Smlaier	    } else if (res == 0) {
181171169Smlaier		    /* eof case */
182171169Smlaier		    what |= EVBUFFER_EOF;
183171169Smlaier	    }
184171169Smlaier	    if (res <= 0)
185171169Smlaier		    goto error;
186171169Smlaier	}
187171169Smlaier
188171169Smlaier	if (EVBUFFER_LENGTH(bufev->output) != 0)
189171169Smlaier		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
190171169Smlaier
191171169Smlaier	/*
192171169Smlaier	 * Invoke the user callback if our buffer is drained or below the
193171169Smlaier	 * low watermark.
194171169Smlaier	 */
195171169Smlaier	if (bufev->writecb != NULL &&
196171169Smlaier	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
197171169Smlaier		(*bufev->writecb)(bufev, bufev->cbarg);
198171169Smlaier
199171169Smlaier	return;
200171169Smlaier
201171169Smlaier reschedule:
202171169Smlaier	if (EVBUFFER_LENGTH(bufev->output) != 0)
203171169Smlaier		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
204171169Smlaier	return;
205171169Smlaier
206171169Smlaier error:
207171169Smlaier	(*bufev->errorcb)(bufev, what, bufev->cbarg);
208171169Smlaier}
209171169Smlaier
210171169Smlaier/*
211171169Smlaier * Create a new buffered event object.
212171169Smlaier *
213171169Smlaier * The read callback is invoked whenever we read new data.
214171169Smlaier * The write callback is invoked whenever the output buffer is drained.
215171169Smlaier * The error callback is invoked on a write/read error or on EOF.
216171169Smlaier *
217171169Smlaier * Both read and write callbacks maybe NULL.  The error callback is not
218171169Smlaier * allowed to be NULL and have to be provided always.
219171169Smlaier */
220171169Smlaier
221171169Smlaierstruct bufferevent *
222171169Smlaierbufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
223171169Smlaier    everrorcb errorcb, void *cbarg)
224171169Smlaier{
225171169Smlaier	struct bufferevent *bufev;
226171169Smlaier
227171169Smlaier	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
228171169Smlaier		return (NULL);
229171169Smlaier
230171169Smlaier	if ((bufev->input = evbuffer_new()) == NULL) {
231171169Smlaier		free(bufev);
232171169Smlaier		return (NULL);
233171169Smlaier	}
234171169Smlaier
235171169Smlaier	if ((bufev->output = evbuffer_new()) == NULL) {
236171169Smlaier		evbuffer_free(bufev->input);
237171169Smlaier		free(bufev);
238171169Smlaier		return (NULL);
239171169Smlaier	}
240171169Smlaier
241171169Smlaier	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
242171169Smlaier	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
243171169Smlaier
244171169Smlaier	bufev->readcb = readcb;
245171169Smlaier	bufev->writecb = writecb;
246171169Smlaier	bufev->errorcb = errorcb;
247171169Smlaier
248171169Smlaier	bufev->cbarg = cbarg;
249171169Smlaier
250171169Smlaier	/*
251171169Smlaier	 * Set to EV_WRITE so that using bufferevent_write is going to
252171169Smlaier	 * trigger a callback.  Reading needs to be explicitly enabled
253171169Smlaier	 * because otherwise no data will be available.
254171169Smlaier	 */
255171169Smlaier	bufev->enabled = EV_WRITE;
256171169Smlaier
257171169Smlaier	return (bufev);
258171169Smlaier}
259171169Smlaier
260171169Smlaierint
261171169Smlaierbufferevent_priority_set(struct bufferevent *bufev, int priority)
262171169Smlaier{
263171169Smlaier	if (event_priority_set(&bufev->ev_read, priority) == -1)
264171169Smlaier		return (-1);
265171169Smlaier	if (event_priority_set(&bufev->ev_write, priority) == -1)
266171169Smlaier		return (-1);
267171169Smlaier
268171169Smlaier	return (0);
269171169Smlaier}
270171169Smlaier
271171169Smlaier/* Closing the file descriptor is the responsibility of the caller */
272171169Smlaier
273171169Smlaiervoid
274171169Smlaierbufferevent_free(struct bufferevent *bufev)
275171169Smlaier{
276171169Smlaier	event_del(&bufev->ev_read);
277171169Smlaier	event_del(&bufev->ev_write);
278171169Smlaier
279171169Smlaier	evbuffer_free(bufev->input);
280171169Smlaier	evbuffer_free(bufev->output);
281171169Smlaier
282171169Smlaier	free(bufev);
283171169Smlaier}
284171169Smlaier
285171169Smlaier/*
286171169Smlaier * Returns 0 on success;
287171169Smlaier *        -1 on failure.
288171169Smlaier */
289171169Smlaier
290171169Smlaierint
291171169Smlaierbufferevent_write(struct bufferevent *bufev, void *data, size_t size)
292171169Smlaier{
293171169Smlaier	int res;
294171169Smlaier
295171169Smlaier	res = evbuffer_add(bufev->output, data, size);
296171169Smlaier
297171169Smlaier	if (res == -1)
298171169Smlaier		return (res);
299171169Smlaier
300171169Smlaier	/* If everything is okay, we need to schedule a write */
301171169Smlaier	if (size > 0 && (bufev->enabled & EV_WRITE))
302171169Smlaier		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
303171169Smlaier
304171169Smlaier	return (res);
305171169Smlaier}
306171169Smlaier
307171169Smlaierint
308171169Smlaierbufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
309171169Smlaier{
310171169Smlaier	int res;
311171169Smlaier
312171169Smlaier	res = bufferevent_write(bufev, buf->buffer, buf->off);
313171169Smlaier	if (res != -1)
314171169Smlaier		evbuffer_drain(buf, buf->off);
315171169Smlaier
316171169Smlaier	return (res);
317171169Smlaier}
318171169Smlaier
319171169Smlaiersize_t
320171169Smlaierbufferevent_read(struct bufferevent *bufev, void *data, size_t size)
321171169Smlaier{
322171169Smlaier	struct evbuffer *buf = bufev->input;
323171169Smlaier
324171169Smlaier	if (buf->off < size)
325171169Smlaier		size = buf->off;
326171169Smlaier
327171169Smlaier	/* Copy the available data to the user buffer */
328171169Smlaier	memcpy(data, buf->buffer, size);
329171169Smlaier
330171169Smlaier	if (size)
331171169Smlaier		evbuffer_drain(buf, size);
332171169Smlaier
333171169Smlaier	return (size);
334171169Smlaier}
335171169Smlaier
336171169Smlaierint
337171169Smlaierbufferevent_enable(struct bufferevent *bufev, short event)
338171169Smlaier{
339171169Smlaier	if (event & EV_READ) {
340171169Smlaier		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
341171169Smlaier			return (-1);
342171169Smlaier	}
343171169Smlaier	if (event & EV_WRITE) {
344171169Smlaier		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
345171169Smlaier			return (-1);
346171169Smlaier	}
347171169Smlaier
348171169Smlaier	bufev->enabled |= event;
349171169Smlaier	return (0);
350171169Smlaier}
351171169Smlaier
352171169Smlaierint
353171169Smlaierbufferevent_disable(struct bufferevent *bufev, short event)
354171169Smlaier{
355171169Smlaier	if (event & EV_READ) {
356171169Smlaier		if (event_del(&bufev->ev_read) == -1)
357171169Smlaier			return (-1);
358171169Smlaier	}
359171169Smlaier	if (event & EV_WRITE) {
360171169Smlaier		if (event_del(&bufev->ev_write) == -1)
361171169Smlaier			return (-1);
362171169Smlaier	}
363171169Smlaier
364171169Smlaier	bufev->enabled &= ~event;
365171169Smlaier	return (0);
366171169Smlaier}
367171169Smlaier
368171169Smlaier/*
369171169Smlaier * Sets the read and write timeout for a buffered event.
370171169Smlaier */
371171169Smlaier
372171169Smlaiervoid
373171169Smlaierbufferevent_settimeout(struct bufferevent *bufev,
374171169Smlaier    int timeout_read, int timeout_write) {
375171169Smlaier	bufev->timeout_read = timeout_read;
376171169Smlaier	bufev->timeout_write = timeout_write;
377171169Smlaier}
378171169Smlaier
379171169Smlaier/*
380171169Smlaier * Sets the water marks
381171169Smlaier */
382171169Smlaier
383171169Smlaiervoid
384171169Smlaierbufferevent_setwatermark(struct bufferevent *bufev, short events,
385171169Smlaier    size_t lowmark, size_t highmark)
386171169Smlaier{
387171169Smlaier	if (events & EV_READ) {
388171169Smlaier		bufev->wm_read.low = lowmark;
389171169Smlaier		bufev->wm_read.high = highmark;
390171169Smlaier	}
391171169Smlaier
392171169Smlaier	if (events & EV_WRITE) {
393171169Smlaier		bufev->wm_write.low = lowmark;
394171169Smlaier		bufev->wm_write.high = highmark;
395171169Smlaier	}
396171169Smlaier
397171169Smlaier	/* If the watermarks changed then see if we should call read again */
398171169Smlaier	bufferevent_read_pressure_cb(bufev->input,
399171169Smlaier	    0, EVBUFFER_LENGTH(bufev->input), bufev);
400171169Smlaier}
401171169Smlaier
402171169Smlaierint
403171169Smlaierbufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
404171169Smlaier{
405171169Smlaier	int res;
406171169Smlaier
407171169Smlaier	res = event_base_set(base, &bufev->ev_read);
408171169Smlaier	if (res == -1)
409171169Smlaier		return (res);
410171169Smlaier
411171169Smlaier	res = event_base_set(base, &bufev->ev_write);
412171169Smlaier	return (res);
413171169Smlaier}
414