regress_bufferevent.c revision 275970
1/*
2 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu>
3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 *    derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "util-internal.h"
28
29/* The old tests here need assertions to work. */
30#undef NDEBUG
31
32#ifdef _WIN32
33#include <winsock2.h>
34#include <windows.h>
35#endif
36
37#include "event2/event-config.h"
38
39#include <sys/types.h>
40#include <sys/stat.h>
41#ifdef EVENT__HAVE_SYS_TIME_H
42#include <sys/time.h>
43#endif
44#include <sys/queue.h>
45#ifndef _WIN32
46#include <sys/socket.h>
47#include <sys/wait.h>
48#include <signal.h>
49#include <unistd.h>
50#include <netdb.h>
51#include <netinet/in.h>
52#endif
53#include <fcntl.h>
54#include <signal.h>
55#include <stdlib.h>
56#include <stdio.h>
57#include <string.h>
58#include <errno.h>
59#include <assert.h>
60
61#ifdef EVENT__HAVE_ARPA_INET_H
62#include <arpa/inet.h>
63#endif
64
65#include "event2/event-config.h"
66#include "event2/event.h"
67#include "event2/event_struct.h"
68#include "event2/event_compat.h"
69#include "event2/tag.h"
70#include "event2/buffer.h"
71#include "event2/bufferevent.h"
72#include "event2/bufferevent_compat.h"
73#include "event2/bufferevent_struct.h"
74#include "event2/listener.h"
75#include "event2/util.h"
76
77#include "bufferevent-internal.h"
78#include "util-internal.h"
79#ifdef _WIN32
80#include "iocp-internal.h"
81#endif
82
83#include "regress.h"
84#include "regress_testutils.h"
85
86/*
87 * simple bufferevent test
88 */
89
90static void
91readcb(struct bufferevent *bev, void *arg)
92{
93	if (evbuffer_get_length(bev->input) == 8333) {
94		struct evbuffer *evbuf = evbuffer_new();
95		assert(evbuf != NULL);
96
97		/* gratuitous test of bufferevent_read_buffer */
98		bufferevent_read_buffer(bev, evbuf);
99
100		bufferevent_disable(bev, EV_READ);
101
102		if (evbuffer_get_length(evbuf) == 8333) {
103			test_ok++;
104		}
105
106		evbuffer_free(evbuf);
107	}
108}
109
110static void
111writecb(struct bufferevent *bev, void *arg)
112{
113	if (evbuffer_get_length(bev->output) == 0) {
114		test_ok++;
115	}
116}
117
118static void
119errorcb(struct bufferevent *bev, short what, void *arg)
120{
121	test_ok = -2;
122}
123
124static void
125test_bufferevent_impl(int use_pair)
126{
127	struct bufferevent *bev1 = NULL, *bev2 = NULL;
128	char buffer[8333];
129	int i;
130
131	if (use_pair) {
132		struct bufferevent *pair[2];
133		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
134		bev1 = pair[0];
135		bev2 = pair[1];
136		bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1);
137		bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
138		tt_int_op(bufferevent_getfd(bev1), ==, -1);
139		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
140		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2);
141		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1);
142	} else {
143		bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
144		bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
145		tt_int_op(bufferevent_getfd(bev1), ==, pair[0]);
146		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
147		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
148		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
149	}
150
151	{
152		/* Test getcb. */
153		bufferevent_data_cb r, w;
154		bufferevent_event_cb e;
155		void *a;
156		bufferevent_getcb(bev1, &r, &w, &e, &a);
157		tt_ptr_op(r, ==, readcb);
158		tt_ptr_op(w, ==, writecb);
159		tt_ptr_op(e, ==, errorcb);
160		tt_ptr_op(a, ==, use_pair ? bev1 : NULL);
161	}
162
163	bufferevent_disable(bev1, EV_READ);
164	bufferevent_enable(bev2, EV_READ);
165
166	tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE);
167	tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ);
168
169	for (i = 0; i < (int)sizeof(buffer); i++)
170		buffer[i] = i;
171
172	bufferevent_write(bev1, buffer, sizeof(buffer));
173
174	event_dispatch();
175
176	bufferevent_free(bev1);
177	tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
178	bufferevent_free(bev2);
179
180	if (test_ok != 2)
181		test_ok = 0;
182end:
183	;
184}
185
186static void
187test_bufferevent(void)
188{
189	test_bufferevent_impl(0);
190}
191
192static void
193test_bufferevent_pair(void)
194{
195	test_bufferevent_impl(1);
196}
197
198/*
199 * test watermarks and bufferevent
200 */
201
202static void
203wm_readcb(struct bufferevent *bev, void *arg)
204{
205	struct evbuffer *evbuf = evbuffer_new();
206	int len = (int)evbuffer_get_length(bev->input);
207	static int nread;
208
209	assert(len >= 10 && len <= 20);
210
211	assert(evbuf != NULL);
212
213	/* gratuitous test of bufferevent_read_buffer */
214	bufferevent_read_buffer(bev, evbuf);
215
216	nread += len;
217	if (nread == 65000) {
218		bufferevent_disable(bev, EV_READ);
219		test_ok++;
220	}
221
222	evbuffer_free(evbuf);
223}
224
225static void
226wm_writecb(struct bufferevent *bev, void *arg)
227{
228	assert(evbuffer_get_length(bev->output) <= 100);
229	if (evbuffer_get_length(bev->output) == 0) {
230		evbuffer_drain(bev->output, evbuffer_get_length(bev->output));
231		test_ok++;
232	}
233}
234
235static void
236wm_errorcb(struct bufferevent *bev, short what, void *arg)
237{
238	test_ok = -2;
239}
240
241static void
242test_bufferevent_watermarks_impl(int use_pair)
243{
244	struct bufferevent *bev1 = NULL, *bev2 = NULL;
245	char buffer[65000];
246	size_t low, high;
247	int i;
248	test_ok = 0;
249
250	if (use_pair) {
251		struct bufferevent *pair[2];
252		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
253		bev1 = pair[0];
254		bev2 = pair[1];
255		bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
256		bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
257	} else {
258		bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
259		bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
260	}
261	tt_assert(bev1);
262	tt_assert(bev2);
263	bufferevent_disable(bev1, EV_READ);
264	bufferevent_enable(bev2, EV_READ);
265
266	/* By default, low watermarks are set to 0 */
267	bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
268	tt_int_op(low, ==, 0);
269	bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
270	tt_int_op(low, ==, 0);
271
272	for (i = 0; i < (int)sizeof(buffer); i++)
273		buffer[i] = (char)i;
274
275	/* limit the reading on the receiving bufferevent */
276	bufferevent_setwatermark(bev2, EV_READ, 10, 20);
277
278	bufferevent_getwatermark(bev2, EV_READ, &low, &high);
279	tt_int_op(low, ==, 10);
280	tt_int_op(high, ==, 20);
281
282	/* Tell the sending bufferevent not to notify us till it's down to
283	   100 bytes. */
284	bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
285
286	bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
287	tt_int_op(low, ==, 100);
288	tt_int_op(high, ==, 2000);
289
290	bufferevent_write(bev1, buffer, sizeof(buffer));
291
292	event_dispatch();
293
294	tt_int_op(test_ok, ==, 2);
295
296	/* The write callback drained all the data from outbuf, so we
297	 * should have removed the write event... */
298	tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
299
300end:
301	if (bev1)
302		bufferevent_free(bev1);
303	if (bev2)
304		bufferevent_free(bev2);
305}
306
307static void
308test_bufferevent_watermarks(void)
309{
310	test_bufferevent_watermarks_impl(0);
311}
312
313static void
314test_bufferevent_pair_watermarks(void)
315{
316	test_bufferevent_watermarks_impl(1);
317}
318
319/*
320 * Test bufferevent filters
321 */
322
323/* strip an 'x' from each byte */
324
325static enum bufferevent_filter_result
326bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
327    ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
328{
329	const unsigned char *buffer;
330	unsigned i;
331
332	buffer = evbuffer_pullup(src, evbuffer_get_length(src));
333	for (i = 0; i < evbuffer_get_length(src); i += 2) {
334		assert(buffer[i] == 'x');
335		evbuffer_add(dst, buffer + i + 1, 1);
336
337		if (i + 2 > evbuffer_get_length(src))
338			break;
339	}
340
341	evbuffer_drain(src, i);
342	return (BEV_OK);
343}
344
345/* add an 'x' before each byte */
346
347static enum bufferevent_filter_result
348bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
349    ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
350{
351	const unsigned char *buffer;
352	unsigned i;
353
354	buffer = evbuffer_pullup(src, evbuffer_get_length(src));
355	for (i = 0; i < evbuffer_get_length(src); ++i) {
356		evbuffer_add(dst, "x", 1);
357		evbuffer_add(dst, buffer + i, 1);
358	}
359
360	evbuffer_drain(src, evbuffer_get_length(src));
361	return (BEV_OK);
362}
363
364static void
365test_bufferevent_filters_impl(int use_pair)
366{
367	struct bufferevent *bev1 = NULL, *bev2 = NULL;
368	struct bufferevent *bev1_base = NULL, *bev2_base = NULL;
369	char buffer[8333];
370	int i;
371
372	test_ok = 0;
373
374	if (use_pair) {
375		struct bufferevent *pair[2];
376		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
377		bev1 = pair[0];
378		bev2 = pair[1];
379	} else {
380		bev1 = bufferevent_socket_new(NULL, pair[0], 0);
381		bev2 = bufferevent_socket_new(NULL, pair[1], 0);
382	}
383	bev1_base = bev1;
384	bev2_base = bev2;
385
386	for (i = 0; i < (int)sizeof(buffer); i++)
387		buffer[i] = i;
388
389	bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
390				      BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
391
392	bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
393				      NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
394	bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
395	bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
396
397	tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base);
398	tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base);
399	tt_int_op(bufferevent_getfd(bev1), ==, -1);
400	tt_int_op(bufferevent_getfd(bev2), ==, -1);
401
402	bufferevent_disable(bev1, EV_READ);
403	bufferevent_enable(bev2, EV_READ);
404	/* insert some filters */
405	bufferevent_write(bev1, buffer, sizeof(buffer));
406
407	event_dispatch();
408
409	if (test_ok != 2)
410		test_ok = 0;
411
412end:
413	if (bev1)
414		bufferevent_free(bev1);
415	if (bev2)
416		bufferevent_free(bev2);
417
418}
419
420static void
421test_bufferevent_filters(void)
422{
423	test_bufferevent_filters_impl(0);
424}
425
426static void
427test_bufferevent_pair_filters(void)
428{
429	test_bufferevent_filters_impl(1);
430}
431
432
433static void
434sender_writecb(struct bufferevent *bev, void *ctx)
435{
436	if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
437		bufferevent_disable(bev,EV_READ|EV_WRITE);
438		TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev)));
439		bufferevent_free(bev);
440	}
441}
442
443static void
444sender_errorcb(struct bufferevent *bev, short what, void *ctx)
445{
446	TT_FAIL(("Got sender error %d",(int)what));
447}
448
449static int bufferevent_connect_test_flags = 0;
450static int bufferevent_trigger_test_flags = 0;
451static int n_strings_read = 0;
452static int n_reads_invoked = 0;
453
454#define TEST_STR "Now is the time for all good events to signal for " \
455	"the good of their protocol"
456static void
457listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
458    struct sockaddr *sa, int socklen, void *arg)
459{
460	struct event_base *base = arg;
461	struct bufferevent *bev;
462	const char s[] = TEST_STR;
463	TT_BLATHER(("Got a request on socket %d", (int)fd ));
464	bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
465	tt_assert(bev);
466	bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
467	bufferevent_write(bev, s, sizeof(s));
468end:
469	;
470}
471
472static void
473reader_eventcb(struct bufferevent *bev, short what, void *ctx)
474{
475	struct event_base *base = ctx;
476	if (what & BEV_EVENT_ERROR) {
477		perror("foobar");
478		TT_FAIL(("got connector error %d", (int)what));
479		return;
480	}
481	if (what & BEV_EVENT_CONNECTED) {
482		TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev)));
483		bufferevent_enable(bev, EV_READ);
484	}
485	if (what & BEV_EVENT_EOF) {
486		char buf[512];
487		size_t n;
488		n = bufferevent_read(bev, buf, sizeof(buf)-1);
489		tt_int_op(n, >=, 0);
490		buf[n] = '\0';
491		tt_str_op(buf, ==, TEST_STR);
492		if (++n_strings_read == 2)
493			event_base_loopexit(base, NULL);
494		TT_BLATHER(("EOF on %d: %d strings read.",
495			(int)bufferevent_getfd(bev), n_strings_read));
496	}
497end:
498	;
499}
500
501static void
502reader_readcb(struct bufferevent *bev, void *ctx)
503{
504	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
505	n_reads_invoked++;
506}
507
508static void
509test_bufferevent_connect(void *arg)
510{
511	struct basic_test_data *data = arg;
512	struct evconnlistener *lev=NULL;
513	struct bufferevent *bev1=NULL, *bev2=NULL;
514	struct sockaddr_in localhost;
515	struct sockaddr_storage ss;
516	struct sockaddr *sa;
517	ev_socklen_t slen;
518
519	int be_flags=BEV_OPT_CLOSE_ON_FREE;
520
521	if (strstr((char*)data->setup_data, "defer")) {
522		be_flags |= BEV_OPT_DEFER_CALLBACKS;
523	}
524	if (strstr((char*)data->setup_data, "unlocked")) {
525		be_flags |= BEV_OPT_UNLOCK_CALLBACKS;
526	}
527	if (strstr((char*)data->setup_data, "lock")) {
528		be_flags |= BEV_OPT_THREADSAFE;
529	}
530	bufferevent_connect_test_flags = be_flags;
531#ifdef _WIN32
532	if (!strcmp((char*)data->setup_data, "unset_connectex")) {
533		struct win32_extension_fns *ext =
534		    (struct win32_extension_fns *)
535		    event_get_win32_extension_fns_();
536		ext->ConnectEx = NULL;
537	}
538#endif
539
540	memset(&localhost, 0, sizeof(localhost));
541
542	localhost.sin_port = 0; /* pick-a-port */
543	localhost.sin_addr.s_addr = htonl(0x7f000001L);
544	localhost.sin_family = AF_INET;
545	sa = (struct sockaddr *)&localhost;
546	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
547	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
548	    16, sa, sizeof(localhost));
549	tt_assert(lev);
550
551	sa = (struct sockaddr *)&ss;
552	slen = sizeof(ss);
553	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
554		tt_abort_perror("getsockname");
555	}
556
557	tt_assert(!evconnlistener_enable(lev));
558	bev1 = bufferevent_socket_new(data->base, -1, be_flags);
559	bev2 = bufferevent_socket_new(data->base, -1, be_flags);
560	tt_assert(bev1);
561	tt_assert(bev2);
562	bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base);
563	bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base);
564
565	bufferevent_enable(bev1, EV_READ);
566	bufferevent_enable(bev2, EV_READ);
567
568	tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
569	tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
570
571	event_base_dispatch(data->base);
572
573	tt_int_op(n_strings_read, ==, 2);
574	tt_int_op(n_reads_invoked, >=, 2);
575end:
576	if (lev)
577		evconnlistener_free(lev);
578
579	if (bev1)
580		bufferevent_free(bev1);
581
582	if (bev2)
583		bufferevent_free(bev2);
584}
585
586static void
587want_fail_eventcb(struct bufferevent *bev, short what, void *ctx)
588{
589	struct event_base *base = ctx;
590	const char *err;
591	evutil_socket_t s;
592
593	if (what & BEV_EVENT_ERROR) {
594		s = bufferevent_getfd(bev);
595		err = evutil_socket_error_to_string(evutil_socket_geterror(s));
596		TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s",
597			EV_SOCK_ARG(s), err));
598		test_ok = 1;
599	} else {
600		TT_FAIL(("didn't fail? what %hd", what));
601	}
602
603	event_base_loopexit(base, NULL);
604}
605
606static void
607close_socket_cb(evutil_socket_t fd, short what, void *arg)
608{
609	evutil_socket_t *fdp = arg;
610	if (*fdp >= 0) {
611		evutil_closesocket(*fdp);
612		*fdp = -1;
613	}
614}
615
616static void
617test_bufferevent_connect_fail(void *arg)
618{
619	struct basic_test_data *data = (struct basic_test_data *)arg;
620	struct bufferevent *bev=NULL;
621	struct sockaddr_in localhost;
622	struct sockaddr *sa = (struct sockaddr*)&localhost;
623	evutil_socket_t fake_listener = -1;
624	ev_socklen_t slen = sizeof(localhost);
625	struct event close_listener_event;
626	int close_listener_event_added = 0;
627	struct timeval one_second = { 1, 0 };
628	int r;
629
630	test_ok = 0;
631
632	memset(&localhost, 0, sizeof(localhost));
633	localhost.sin_port = 0; /* have the kernel pick a port */
634	localhost.sin_addr.s_addr = htonl(0x7f000001L);
635	localhost.sin_family = AF_INET;
636
637	/* bind, but don't listen or accept. should trigger
638	   "Connection refused" reliably on most platforms. */
639	fake_listener = socket(localhost.sin_family, SOCK_STREAM, 0);
640	tt_assert(fake_listener >= 0);
641	tt_assert(bind(fake_listener, sa, slen) == 0);
642	tt_assert(getsockname(fake_listener, sa, &slen) == 0);
643	bev = bufferevent_socket_new(data->base, -1,
644		BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
645	tt_assert(bev);
646	bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base);
647
648	r = bufferevent_socket_connect(bev, sa, slen);
649	/* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
650	 * detects the error immediately, which is not really wrong of it. */
651	tt_want(r == 0 || r == -1);
652
653	/* Close the listener socket after a second. This should trigger
654	   "connection refused" on some other platforms, including OSX. */
655	evtimer_assign(&close_listener_event, data->base, close_socket_cb,
656	    &fake_listener);
657	event_add(&close_listener_event, &one_second);
658	close_listener_event_added = 1;
659
660	event_base_dispatch(data->base);
661
662	tt_int_op(test_ok, ==, 1);
663
664end:
665	if (fake_listener >= 0)
666		evutil_closesocket(fake_listener);
667
668	if (bev)
669		bufferevent_free(bev);
670
671	if (close_listener_event_added)
672		event_del(&close_listener_event);
673}
674
675struct timeout_cb_result {
676	struct timeval read_timeout_at;
677	struct timeval write_timeout_at;
678	struct timeval last_wrote_at;
679	int n_read_timeouts;
680	int n_write_timeouts;
681	int total_calls;
682};
683
684static void
685bev_timeout_write_cb(struct bufferevent *bev, void *arg)
686{
687	struct timeout_cb_result *res = arg;
688	evutil_gettimeofday(&res->last_wrote_at, NULL);
689}
690
691static void
692bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
693{
694	struct timeout_cb_result *res = arg;
695	++res->total_calls;
696
697	if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
698	    == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
699		evutil_gettimeofday(&res->read_timeout_at, NULL);
700		++res->n_read_timeouts;
701	}
702	if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
703	    == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
704		evutil_gettimeofday(&res->write_timeout_at, NULL);
705		++res->n_write_timeouts;
706	}
707}
708
709static void
710test_bufferevent_timeouts(void *arg)
711{
712	/* "arg" is a string containing "pair" and/or "filter". */
713	struct bufferevent *bev1 = NULL, *bev2 = NULL;
714	struct basic_test_data *data = arg;
715	int use_pair = 0, use_filter = 0;
716	struct timeval tv_w, tv_r, started_at;
717	struct timeout_cb_result res1, res2;
718	char buf[1024];
719
720	memset(&res1, 0, sizeof(res1));
721	memset(&res2, 0, sizeof(res2));
722
723	if (strstr((char*)data->setup_data, "pair"))
724		use_pair = 1;
725	if (strstr((char*)data->setup_data, "filter"))
726		use_filter = 1;
727
728	if (use_pair) {
729		struct bufferevent *p[2];
730		tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
731		bev1 = p[0];
732		bev2 = p[1];
733	} else {
734		bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
735		bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
736	}
737
738	tt_assert(bev1);
739	tt_assert(bev2);
740
741	if (use_filter) {
742		struct bufferevent *bevf1, *bevf2;
743		bevf1 = bufferevent_filter_new(bev1, NULL, NULL,
744		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
745		bevf2 = bufferevent_filter_new(bev2, NULL, NULL,
746		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
747		tt_assert(bevf1);
748		tt_assert(bevf2);
749		bev1 = bevf1;
750		bev2 = bevf2;
751	}
752
753	/* Do this nice and early. */
754	bufferevent_disable(bev2, EV_READ);
755
756	/* bev1 will try to write and read.  Both will time out. */
757	evutil_gettimeofday(&started_at, NULL);
758	tv_w.tv_sec = tv_r.tv_sec = 0;
759	tv_w.tv_usec = 100*1000;
760	tv_r.tv_usec = 150*1000;
761	bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
762	    bev_timeout_event_cb, &res1);
763	bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
764	bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
765	if (use_pair) {
766		/* For a pair, the fact that the other side isn't reading
767		 * makes the writer stall */
768		bufferevent_write(bev1, "ABCDEFG", 7);
769	} else {
770		/* For a real socket, the kernel's TCP buffers can eat a
771		 * fair number of bytes; make sure that at some point we
772		 * have some bytes that will stall. */
773		struct evbuffer *output = bufferevent_get_output(bev1);
774		int i;
775		memset(buf, 0xbb, sizeof(buf));
776		for (i=0;i<1024;++i) {
777			evbuffer_add_reference(output, buf, sizeof(buf),
778			    NULL, NULL);
779		}
780	}
781	bufferevent_enable(bev1, EV_READ|EV_WRITE);
782
783	/* bev2 has nothing to say, and isn't listening. */
784	bufferevent_setcb(bev2, NULL,  bev_timeout_write_cb,
785	    bev_timeout_event_cb, &res2);
786	tv_w.tv_sec = tv_r.tv_sec = 0;
787	tv_w.tv_usec = 200*1000;
788	tv_r.tv_usec = 100*1000;
789	bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
790	bufferevent_enable(bev2, EV_WRITE);
791
792	tv_r.tv_sec = 0;
793	tv_r.tv_usec = 350000;
794
795	event_base_loopexit(data->base, &tv_r);
796	event_base_dispatch(data->base);
797
798	/* XXXX Test that actually reading or writing a little resets the
799	 * timeouts. */
800
801	/* Each buf1 timeout happens, and happens only once. */
802	tt_want(res1.n_read_timeouts);
803	tt_want(res1.n_write_timeouts);
804	tt_want(res1.n_read_timeouts == 1);
805	tt_want(res1.n_write_timeouts == 1);
806
807	test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150);
808	test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100);
809
810end:
811	if (bev1)
812		bufferevent_free(bev1);
813	if (bev2)
814		bufferevent_free(bev2);
815}
816
817static void
818trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
819{
820	TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
821}
822
823static void
824trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
825{
826	struct event_base *base = ctx;
827	if (what == ~0) {
828		TT_BLATHER(("Event successfully triggered."));
829		event_base_loopexit(base, NULL);
830		return;
831	}
832	reader_eventcb(bev, what, ctx);
833}
834
835static void
836trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
837{
838	TT_BLATHER(("Read successfully triggered."));
839	n_reads_invoked++;
840	bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
841}
842
843static void
844trigger_readcb(struct bufferevent *bev, void *ctx)
845{
846	struct timeval timeout = { 30, 0 };
847	struct event_base *base = ctx;
848	size_t low, high, len;
849	int expected_reads;
850
851	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
852	expected_reads = ++n_reads_invoked;
853
854	bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
855
856	bufferevent_getwatermark(bev, EV_READ, &low, &high);
857	len = evbuffer_get_length(bufferevent_get_input(bev));
858
859	bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
860	bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
861	/* no callback expected */
862	tt_int_op(n_reads_invoked, ==, expected_reads);
863
864	if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
865	    (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
866		/* will be deferred */
867	} else {
868		expected_reads++;
869	}
870
871	event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
872
873	bufferevent_trigger(bev, EV_READ,
874	    bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
875	tt_int_op(n_reads_invoked, ==, expected_reads);
876
877	bufferevent_setwatermark(bev, EV_READ, low, high);
878end:
879	;
880}
881
882static void
883test_bufferevent_trigger(void *arg)
884{
885	struct basic_test_data *data = arg;
886	struct evconnlistener *lev=NULL;
887	struct bufferevent *bev=NULL;
888	struct sockaddr_in localhost;
889	struct sockaddr_storage ss;
890	struct sockaddr *sa;
891	ev_socklen_t slen;
892
893	int be_flags=BEV_OPT_CLOSE_ON_FREE;
894	int trig_flags=0;
895
896	if (strstr((char*)data->setup_data, "defer")) {
897		be_flags |= BEV_OPT_DEFER_CALLBACKS;
898	}
899	bufferevent_connect_test_flags = be_flags;
900
901	if (strstr((char*)data->setup_data, "postpone")) {
902		trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
903	}
904	bufferevent_trigger_test_flags = trig_flags;
905
906	memset(&localhost, 0, sizeof(localhost));
907
908	localhost.sin_port = 0; /* pick-a-port */
909	localhost.sin_addr.s_addr = htonl(0x7f000001L);
910	localhost.sin_family = AF_INET;
911	sa = (struct sockaddr *)&localhost;
912	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
913	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
914	    16, sa, sizeof(localhost));
915	tt_assert(lev);
916
917	sa = (struct sockaddr *)&ss;
918	slen = sizeof(ss);
919	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
920		tt_abort_perror("getsockname");
921	}
922
923	tt_assert(!evconnlistener_enable(lev));
924	bev = bufferevent_socket_new(data->base, -1, be_flags);
925	tt_assert(bev);
926	bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
927
928	bufferevent_enable(bev, EV_READ);
929
930	tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
931
932	event_base_dispatch(data->base);
933
934	tt_int_op(n_reads_invoked, ==, 2);
935end:
936	if (lev)
937		evconnlistener_free(lev);
938
939	if (bev)
940		bufferevent_free(bev);
941}
942
943struct testcase_t bufferevent_testcases[] = {
944
945	LEGACY(bufferevent, TT_ISOLATED),
946	LEGACY(bufferevent_pair, TT_ISOLATED),
947	LEGACY(bufferevent_watermarks, TT_ISOLATED),
948	LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
949	LEGACY(bufferevent_filters, TT_ISOLATED),
950	LEGACY(bufferevent_pair_filters, TT_ISOLATED),
951	{ "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
952	  &basic_setup, (void*)"" },
953	{ "bufferevent_connect_defer", test_bufferevent_connect,
954	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
955	{ "bufferevent_connect_lock", test_bufferevent_connect,
956	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
957	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
958	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
959	  (void*)"defer lock" },
960	{ "bufferevent_connect_unlocked_cbs", test_bufferevent_connect,
961	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
962	  (void*)"lock defer unlocked" },
963	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
964	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
965	{ "bufferevent_timeout", test_bufferevent_timeouts,
966	  TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
967	{ "bufferevent_timeout_pair", test_bufferevent_timeouts,
968	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
969	{ "bufferevent_timeout_filter", test_bufferevent_timeouts,
970	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
971	{ "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
972	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
973	{ "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
974	  &basic_setup, (void*)"" },
975	{ "bufferevent_trigger_defer", test_bufferevent_trigger,
976	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
977	{ "bufferevent_trigger_postpone", test_bufferevent_trigger,
978	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
979	  (void*)"postpone" },
980	{ "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
981	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
982	  (void*)"defer postpone" },
983#ifdef EVENT__HAVE_LIBZ
984	LEGACY(bufferevent_zlib, TT_ISOLATED),
985#else
986	{ "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL },
987#endif
988
989	END_OF_TESTCASES,
990};
991
992struct testcase_t bufferevent_iocp_testcases[] = {
993
994	LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
995	LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
996	LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
997	{ "bufferevent_connect", test_bufferevent_connect,
998	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
999	{ "bufferevent_connect_defer", test_bufferevent_connect,
1000	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" },
1001	{ "bufferevent_connect_lock", test_bufferevent_connect,
1002	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1003	  (void*)"lock" },
1004	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
1005	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1006	  (void*)"defer lock" },
1007	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
1008	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
1009	{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
1010	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
1011	  (void*)"unset_connectex" },
1012
1013	END_OF_TESTCASES,
1014};
1015