1/*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 *    derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <sys/types.h>
30
31#include "event2/event-config.h"
32
33#ifdef _EVENT_HAVE_SYS_TIME_H
34#include <sys/time.h>
35#endif
36
37#include <errno.h>
38#include <stdio.h>
39#include <stdlib.h>
40#include <string.h>
41#ifdef _EVENT_HAVE_STDARG_H
42#include <stdarg.h>
43#endif
44#ifdef _EVENT_HAVE_UNISTD_H
45#include <unistd.h>
46#endif
47
48#ifdef WIN32
49#include <winsock2.h>
50#include <ws2tcpip.h>
51#endif
52
53#ifdef _EVENT_HAVE_SYS_SOCKET_H
54#include <sys/socket.h>
55#endif
56#ifdef _EVENT_HAVE_NETINET_IN_H
57#include <netinet/in.h>
58#endif
59#ifdef _EVENT_HAVE_NETINET_IN6_H
60#include <netinet/in6.h>
61#endif
62
63#include "event2/util.h"
64#include "event2/bufferevent.h"
65#include "event2/buffer.h"
66#include "event2/bufferevent_struct.h"
67#include "event2/bufferevent_compat.h"
68#include "event2/event.h"
69#include "log-internal.h"
70#include "mm-internal.h"
71#include "bufferevent-internal.h"
72#include "util-internal.h"
73#ifdef WIN32
74#include "iocp-internal.h"
75#endif
76
/* prototypes: the backend hooks installed in bufferevent_ops_socket below */
static int be_socket_enable(struct bufferevent *, short);
static int be_socket_disable(struct bufferevent *, short);
static void be_socket_destruct(struct bufferevent *);
static int be_socket_adj_timeouts(struct bufferevent *);
static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_socket_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

/* Not part of the ops table; reached through be_socket_ctrl(BEV_CTRL_SET_FD). */
static void be_socket_setfd(struct bufferevent *, evutil_socket_t);

/*
 * Operation table for socket-based bufferevents.  The generic bufferevent
 * code compares be_ops against &bufferevent_ops_socket to recognise this
 * backend (see be_socket_destruct, bufferevent_priority_set and
 * bufferevent_base_set below).  The second entry is the offset of the
 * public struct bufferevent inside struct bufferevent_private --
 * presumably used to recover the private struct from a public pointer;
 * confirm against struct bufferevent_ops.
 */
const struct bufferevent_ops bufferevent_ops_socket = {
	"socket",
	evutil_offsetof(struct bufferevent_private, bev),
	be_socket_enable,
	be_socket_disable,
	be_socket_destruct,
	be_socket_adj_timeouts,
	be_socket_flush,
	be_socket_ctrl,
};

/* Add (or re-add) event ev with timeout t using the shared bufferevent
 * helper.  Callers treat a -1 result as failure. */
#define be_socket_add(ev, t)			\
	_bufferevent_add_event((ev), (t))
100
101static void
102bufferevent_socket_outbuf_cb(struct evbuffer *buf,
103    const struct evbuffer_cb_info *cbinfo,
104    void *arg)
105{
106	struct bufferevent *bufev = arg;
107	struct bufferevent_private *bufev_p =
108	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
109
110	if (cbinfo->n_added &&
111	    (bufev->enabled & EV_WRITE) &&
112	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
113	    !bufev_p->write_suspended) {
114		/* Somebody added data to the buffer, and we would like to
115		 * write, and we were not writing.  So, start writing. */
116		if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
117		    /* Should we log this? */
118		}
119	}
120}
121
122static void
123bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
124{
125	struct bufferevent *bufev = arg;
126	struct bufferevent_private *bufev_p =
127	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
128	struct evbuffer *input;
129	int res = 0;
130	short what = BEV_EVENT_READING;
131	ev_ssize_t howmuch = -1, readmax=-1;
132
133	_bufferevent_incref_and_lock(bufev);
134
135	if (event == EV_TIMEOUT) {
136		what |= BEV_EVENT_TIMEOUT;
137		goto error;
138	}
139
140	input = bufev->input;
141
142	/*
143	 * If we have a high watermark configured then we don't want to
144	 * read more data than would make us reach the watermark.
145	 */
146	if (bufev->wm_read.high != 0) {
147		howmuch = bufev->wm_read.high - evbuffer_get_length(input);
148		/* we somehow lowered the watermark, stop reading */
149		if (howmuch <= 0) {
150			bufferevent_wm_suspend_read(bufev);
151			goto done;
152		}
153	}
154	readmax = _bufferevent_get_read_max(bufev_p);
155	if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
156					       * uglifies this code. XXXX */
157		howmuch = readmax;
158	if (bufev_p->read_suspended)
159		goto done;
160
161	evbuffer_unfreeze(input, 0);
162	res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
163	evbuffer_freeze(input, 0);
164
165	if (res == -1) {
166		int err = evutil_socket_geterror(fd);
167		if (EVUTIL_ERR_RW_RETRIABLE(err))
168			goto reschedule;
169		/* error case */
170		what |= BEV_EVENT_ERROR;
171	} else if (res == 0) {
172		/* eof case */
173		what |= BEV_EVENT_EOF;
174	}
175
176	if (res <= 0)
177		goto error;
178
179	_bufferevent_decrement_read_buckets(bufev_p, res);
180
181	/* Invoke the user callback - must always be called last */
182	if (evbuffer_get_length(input) >= bufev->wm_read.low)
183		_bufferevent_run_readcb(bufev);
184
185	goto done;
186
187 reschedule:
188	goto done;
189
190 error:
191	bufferevent_disable(bufev, EV_READ);
192	_bufferevent_run_eventcb(bufev, what);
193
194 done:
195	_bufferevent_decref_and_unlock(bufev);
196}
197
/*
 * Event callback for ev_write: invoked when the socket is writable or when
 * the write timeout fires.
 *
 * While a connect is in progress (bufev_p->connecting), this first checks
 * whether the connect has finished.  It then reports BEV_EVENT_CONNECTED or
 * BEV_EVENT_ERROR through the event callback.
 *
 * It then writes as much of the output buffer as
 * _bufferevent_get_write_max() permits.  It stops watching for writability
 * once the output buffer is empty.  The user's write callback runs when the
 * output is at or below the low write watermark.
 *
 * A timeout, a 0-byte write, or a non-retriable write error disables
 * writing and is reported through the event callback.
 */
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
	struct bufferevent *bufev = arg;
	struct bufferevent_private *bufev_p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int res = 0;
	short what = BEV_EVENT_WRITING;
	/* Set when this invocation is the one that completed a connect. */
	int connected = 0;
	ev_ssize_t atmost = -1;

	_bufferevent_incref_and_lock(bufev);

	if (event == EV_TIMEOUT) {
		what |= BEV_EVENT_TIMEOUT;
		goto error;
	}
	if (bufev_p->connecting) {
		int c = evutil_socket_finished_connecting(fd);
		/* we need to fake the error if the connection was refused
		 * immediately - usually connection to localhost on BSD */
		if (bufev_p->connection_refused) {
		  bufev_p->connection_refused = 0;
		  c = -1;
		}

		/* c == 0: not finished yet; stay in the connecting state and
		 * wait for the next writability notification. */
		if (c == 0)
			goto done;

		bufev_p->connecting = 0;
		if (c < 0) {
			/* Connect failed: stop watching the socket entirely. */
			event_del(&bufev->ev_write);
			event_del(&bufev->ev_read);
			_bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
			goto done;
		} else {
			connected = 1;
#ifdef WIN32
			/* IOCP bufferevents only borrowed ev_write for the
			 * connect (see bufferevent_socket_connect); hand
			 * control back to the async backend. */
			if (BEV_IS_ASYNC(bufev)) {
				event_del(&bufev->ev_write);
				bufferevent_async_set_connected(bufev);
				_bufferevent_run_eventcb(bufev,
						BEV_EVENT_CONNECTED);
				goto done;
			}
#endif
			_bufferevent_run_eventcb(bufev,
					BEV_EVENT_CONNECTED);
			/* ev_write was kept armed only for the connect; drop
			 * it if the user does not actually want to write. */
			if (!(bufev->enabled & EV_WRITE) ||
			    bufev_p->write_suspended) {
				event_del(&bufev->ev_write);
				goto done;
			}
		}
	}

	atmost = _bufferevent_get_write_max(bufev_p);

	if (bufev_p->write_suspended)
		goto done;

	if (evbuffer_get_length(bufev->output)) {
		/* The output buffer's front stays frozen except while we
		 * drain it to the socket. */
		evbuffer_unfreeze(bufev->output, 1);
		res = evbuffer_write_atmost(bufev->output, fd, atmost);
		evbuffer_freeze(bufev->output, 1);
		if (res == -1) {
			int err = evutil_socket_geterror(fd);
			if (EVUTIL_ERR_RW_RETRIABLE(err))
				goto reschedule;
			what |= BEV_EVENT_ERROR;
		} else if (res == 0) {
			/* eof case
			   XXXX Actually, a 0 on write doesn't indicate
			   an EOF. An ECONNRESET might be more typical.
			 */
			what |= BEV_EVENT_EOF;
		}
		if (res <= 0)
			goto error;

		_bufferevent_decrement_write_buckets(bufev_p, res);
	}

	/* Nothing left to send: stop polling for writability.  The output
	 * buffer callback re-arms ev_write when more data is added. */
	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}

	/*
	 * Invoke the user callback if our buffer is drained or below the
	 * low watermark.  When this call only completed a connect and wrote
	 * nothing (res == 0 && connected), the write callback is skipped.
	 */
	if ((res || !connected) &&
	    evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
		_bufferevent_run_writecb(bufev);
	}

	goto done;

 reschedule:
	/* Retriable error: keep ev_write armed unless nothing is queued. */
	if (evbuffer_get_length(bufev->output) == 0) {
		event_del(&bufev->ev_write);
	}
	goto done;

 error:
	bufferevent_disable(bufev, EV_WRITE);
	_bufferevent_run_eventcb(bufev, what);

 done:
	_bufferevent_decref_and_unlock(bufev);
}
309
310struct bufferevent *
311bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
312    int options)
313{
314	struct bufferevent_private *bufev_p;
315	struct bufferevent *bufev;
316
317#ifdef WIN32
318	if (base && event_base_get_iocp(base))
319		return bufferevent_async_new(base, fd, options);
320#endif
321
322	if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
323		return NULL;
324
325	if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
326				    options) < 0) {
327		mm_free(bufev_p);
328		return NULL;
329	}
330	bufev = &bufev_p->bev;
331	evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
332
333	event_assign(&bufev->ev_read, bufev->ev_base, fd,
334	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
335	event_assign(&bufev->ev_write, bufev->ev_base, fd,
336	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
337
338	evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
339
340	evbuffer_freeze(bufev->input, 0);
341	evbuffer_freeze(bufev->output, 1);
342
343	return bufev;
344}
345
346int
347bufferevent_socket_connect(struct bufferevent *bev,
348    struct sockaddr *sa, int socklen)
349{
350	struct bufferevent_private *bufev_p =
351	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
352
353	evutil_socket_t fd;
354	int r = 0;
355	int result=-1;
356	int ownfd = 0;
357
358	_bufferevent_incref_and_lock(bev);
359
360	if (!bufev_p)
361		goto done;
362
363	fd = bufferevent_getfd(bev);
364	if (fd < 0) {
365		if (!sa)
366			goto done;
367		fd = socket(sa->sa_family, SOCK_STREAM, 0);
368		if (fd < 0)
369			goto done;
370		if (evutil_make_socket_nonblocking(fd)<0)
371			goto done;
372		ownfd = 1;
373	}
374	if (sa) {
375#ifdef WIN32
376		if (bufferevent_async_can_connect(bev)) {
377			bufferevent_setfd(bev, fd);
378			r = bufferevent_async_connect(bev, fd, sa, socklen);
379			if (r < 0)
380				goto freesock;
381			bufev_p->connecting = 1;
382			result = 0;
383			goto done;
384		} else
385#endif
386		r = evutil_socket_connect(&fd, sa, socklen);
387		if (r < 0)
388			goto freesock;
389	}
390#ifdef WIN32
391	/* ConnectEx() isn't always around, even when IOCP is enabled.
392	 * Here, we borrow the socket object's write handler to fall back
393	 * on a non-blocking connect() when ConnectEx() is unavailable. */
394	if (BEV_IS_ASYNC(bev)) {
395		event_assign(&bev->ev_write, bev->ev_base, fd,
396		    EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
397	}
398#endif
399	bufferevent_setfd(bev, fd);
400	if (r == 0) {
401		if (! be_socket_enable(bev, EV_WRITE)) {
402			bufev_p->connecting = 1;
403			result = 0;
404			goto done;
405		}
406	} else if (r == 1) {
407		/* The connect succeeded already. How very BSD of it. */
408		result = 0;
409		bufev_p->connecting = 1;
410		event_active(&bev->ev_write, EV_WRITE, 1);
411	} else {
412		/* The connect failed already.  How very BSD of it. */
413		bufev_p->connection_refused = 1;
414		bufev_p->connecting = 1;
415		result = 0;
416		event_active(&bev->ev_write, EV_WRITE, 1);
417	}
418
419	goto done;
420
421freesock:
422	_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
423	if (ownfd)
424		evutil_closesocket(fd);
425	/* do something about the error? */
426done:
427	_bufferevent_decref_and_unlock(bev);
428	return result;
429}
430
/*
 * Completion callback for the lookup started by
 * bufferevent_socket_connect_hostname().
 *
 * It lifts the BEV_SUSPEND_LOOKUP suspension.  On success it connects to
 * the first returned address.  On failure it records the resolver error
 * (see bufferevent_socket_get_dns_error()) and reports BEV_EVENT_ERROR.
 *
 * In both cases it drops the reference taken before the lookup started and
 * frees the address list.
 */
static void
bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

	BEV_LOCK(bev);

	bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
	bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);

	if (result == 0) {
		/* XXX use the other addrinfos? */
		/* XXX use the return value of bufferevent_socket_connect */
		(void) bufferevent_socket_connect(bev, ai->ai_addr,
		    (int)ai->ai_addrlen);
	} else {
		priv->dns_error = result;
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	}

	/* Balances bufferevent_incref() in connect_hostname; may free bev. */
	_bufferevent_decref_and_unlock(bev);
	if (ai)
		evutil_freeaddrinfo(ai);
}
460
/*
 * Resolve hostname with evutil_getaddrinfo_async() (using evdns_base) and
 * then connect bev to the first address found.
 *
 * family must be AF_INET, AF_INET6 or AF_UNSPEC, and port must be in
 * 1..65535.  Reading and writing are suspended (BEV_SUSPEND_LOOKUP) until
 * the lookup callback runs.  A resolver error is stored so that
 * bufferevent_socket_get_dns_error() can return it.
 *
 * Returns 0 if the lookup was started, or -1 on invalid arguments or if the
 * lookup could not be started.
 */
int
bufferevent_socket_connect_hostname(struct bufferevent *bev,
    struct evdns_base *evdns_base, int family, const char *hostname, int port)
{
	char portbuf[10];
	struct evutil_addrinfo hint;
	int err;
	struct bufferevent_private *bev_p =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

	if (family != AF_INET && family != AF_INET6 && family != AF_UNSPEC)
		return -1;
	if (port < 1 || port > 65535)
		return -1;

	BEV_LOCK(bev);
	bev_p->dns_error = 0;
	BEV_UNLOCK(bev);

	evutil_snprintf(portbuf, sizeof(portbuf), "%d", port);

	memset(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_protocol = IPPROTO_TCP;
	hint.ai_socktype = SOCK_STREAM;

	bufferevent_suspend_write(bev, BEV_SUSPEND_LOOKUP);
	bufferevent_suspend_read(bev, BEV_SUSPEND_LOOKUP);

	/* Keep bev alive until bufferevent_connect_getaddrinfo_cb runs; the
	 * callback drops this reference. */
	bufferevent_incref(bev);
	err = evutil_getaddrinfo_async(evdns_base, hostname, portbuf,
	    &hint, bufferevent_connect_getaddrinfo_cb, bev);

	if (err == 0) {
		return 0;
	} else {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);
		/* The lookup was not started, so no callback will release the
		 * reference taken above; drop it here. */
		bufferevent_decref(bev);
		return -1;
	}
}
502
503int
504bufferevent_socket_get_dns_error(struct bufferevent *bev)
505{
506	int rv;
507	struct bufferevent_private *bev_p =
508	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
509
510	BEV_LOCK(bev);
511	rv = bev_p->dns_error;
512	BEV_LOCK(bev);
513
514	return rv;
515}
516
517/*
518 * Create a new buffered event object.
519 *
520 * The read callback is invoked whenever we read new data.
521 * The write callback is invoked whenever the output buffer is drained.
522 * The error callback is invoked on a write/read error or on EOF.
523 *
524 * Both read and write callbacks maybe NULL.  The error callback is not
525 * allowed to be NULL and have to be provided always.
526 */
527
528struct bufferevent *
529bufferevent_new(evutil_socket_t fd,
530    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
531    bufferevent_event_cb eventcb, void *cbarg)
532{
533	struct bufferevent *bufev;
534
535	if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
536		return NULL;
537
538	bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);
539
540	return bufev;
541}
542
543
544static int
545be_socket_enable(struct bufferevent *bufev, short event)
546{
547	if (event & EV_READ) {
548		if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1)
549			return -1;
550	}
551	if (event & EV_WRITE) {
552		if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1)
553			return -1;
554	}
555	return 0;
556}
557
558static int
559be_socket_disable(struct bufferevent *bufev, short event)
560{
561	struct bufferevent_private *bufev_p =
562	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
563	if (event & EV_READ) {
564		if (event_del(&bufev->ev_read) == -1)
565			return -1;
566	}
567	/* Don't actually disable the write if we are trying to connect. */
568	if ((event & EV_WRITE) && ! bufev_p->connecting) {
569		if (event_del(&bufev->ev_write) == -1)
570			return -1;
571	}
572	return 0;
573}
574
575static void
576be_socket_destruct(struct bufferevent *bufev)
577{
578	struct bufferevent_private *bufev_p =
579	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
580	evutil_socket_t fd;
581	EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);
582
583	fd = event_get_fd(&bufev->ev_read);
584
585	event_del(&bufev->ev_read);
586	event_del(&bufev->ev_write);
587
588	if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
589		EVUTIL_CLOSESOCKET(fd);
590}
591
592static int
593be_socket_adj_timeouts(struct bufferevent *bufev)
594{
595	int r = 0;
596	if (event_pending(&bufev->ev_read, EV_READ, NULL))
597		if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0)
598			r = -1;
599	if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) {
600		if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0)
601			r = -1;
602	}
603	return r;
604}
605
606static int
607be_socket_flush(struct bufferevent *bev, short iotype,
608    enum bufferevent_flush_mode mode)
609{
610	return 0;
611}
612
613
614static void
615be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
616{
617	BEV_LOCK(bufev);
618	EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);
619
620	event_del(&bufev->ev_read);
621	event_del(&bufev->ev_write);
622
623	event_assign(&bufev->ev_read, bufev->ev_base, fd,
624	    EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
625	event_assign(&bufev->ev_write, bufev->ev_base, fd,
626	    EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
627
628	if (fd >= 0)
629		bufferevent_enable(bufev, bufev->enabled);
630
631	BEV_UNLOCK(bufev);
632}
633
634/* XXXX Should non-socket bufferevents support this? */
635int
636bufferevent_priority_set(struct bufferevent *bufev, int priority)
637{
638	int r = -1;
639
640	BEV_LOCK(bufev);
641	if (bufev->be_ops != &bufferevent_ops_socket)
642		goto done;
643
644	if (event_priority_set(&bufev->ev_read, priority) == -1)
645		goto done;
646	if (event_priority_set(&bufev->ev_write, priority) == -1)
647		goto done;
648
649	r = 0;
650done:
651	BEV_UNLOCK(bufev);
652	return r;
653}
654
655/* XXXX Should non-socket bufferevents support this? */
656int
657bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
658{
659	int res = -1;
660
661	BEV_LOCK(bufev);
662	if (bufev->be_ops != &bufferevent_ops_socket)
663		goto done;
664
665	bufev->ev_base = base;
666
667	res = event_base_set(base, &bufev->ev_read);
668	if (res == -1)
669		goto done;
670
671	res = event_base_set(base, &bufev->ev_write);
672done:
673	BEV_UNLOCK(bufev);
674	return res;
675}
676
677static int
678be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
679    union bufferevent_ctrl_data *data)
680{
681	switch (op) {
682	case BEV_CTRL_SET_FD:
683		be_socket_setfd(bev, data->fd);
684		return 0;
685	case BEV_CTRL_GET_FD:
686		data->fd = event_get_fd(&bev->ev_read);
687		return 0;
688	case BEV_CTRL_GET_UNDERLYING:
689	case BEV_CTRL_CANCEL_ALL:
690	default:
691		return -1;
692	}
693}
694