regress_bufferevent.c revision 275970
/*
 * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu>
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
27336668Skevans#include "util-internal.h"
28336668Skevans
29336668Skevans/* The old tests here need assertions to work. */
30337416Skevans#undef NDEBUG
31337416Skevans
32337416Skevans#ifdef _WIN32
33355662Skevans#include <winsock2.h>
34355662Skevans#include <windows.h>
35336668Skevans#endif
36336668Skevans
37336668Skevans#include "event2/event-config.h"
38337404Skevans
39337358Skevans#include <sys/types.h>
40336668Skevans#include <sys/stat.h>
41336668Skevans#ifdef EVENT__HAVE_SYS_TIME_H
42336668Skevans#include <sys/time.h>
43336668Skevans#endif
44336668Skevans#include <sys/queue.h>
45336668Skevans#ifndef _WIN32
46336701Skevans#include <sys/socket.h>
47346429Skevans#include <sys/wait.h>
48346429Skevans#include <signal.h>
49346429Skevans#include <unistd.h>
50346429Skevans#include <netdb.h>
51336668Skevans#include <netinet/in.h>
52336668Skevans#endif
53336668Skevans#include <fcntl.h>
54336668Skevans#include <signal.h>
55336668Skevans#include <stdlib.h>
56336668Skevans#include <stdio.h>
57336668Skevans#include <string.h>
58336668Skevans#include <errno.h>
59336668Skevans#include <assert.h>
60336701Skevans
61336696Skevans#ifdef EVENT__HAVE_ARPA_INET_H
62336668Skevans#include <arpa/inet.h>
63336668Skevans#endif
64336709Skevans
65336709Skevans#include "event2/event-config.h"
66336709Skevans#include "event2/event.h"
67336709Skevans#include "event2/event_struct.h"
68336709Skevans#include "event2/event_compat.h"
69336709Skevans#include "event2/tag.h"
70336668Skevans#include "event2/buffer.h"
71346429Skevans#include "event2/bufferevent.h"
72346429Skevans#include "event2/bufferevent_compat.h"
73346429Skevans#include "event2/bufferevent_struct.h"
74346429Skevans#include "event2/listener.h"
75336709Skevans#include "event2/util.h"
76336709Skevans
77336709Skevans#include "bufferevent-internal.h"
78336668Skevans#include "util-internal.h"
79336709Skevans#ifdef _WIN32
80336709Skevans#include "iocp-internal.h"
81336709Skevans#endif
82336709Skevans
83336709Skevans#include "regress.h"
84336709Skevans#include "regress_testutils.h"
85336709Skevans
/*
 * simple bufferevent test
 */
89336709Skevans
90336668Skevansstatic void
91336668Skevansreadcb(struct bufferevent *bev, void *arg)
92336668Skevans{
93336668Skevans	if (evbuffer_get_length(bev->input) == 8333) {
94336668Skevans		struct evbuffer *evbuf = evbuffer_new();
95336701Skevans		assert(evbuf != NULL);
96336668Skevans
97336668Skevans		/* gratuitous test of bufferevent_read_buffer */
98336668Skevans		bufferevent_read_buffer(bev, evbuf);
99336668Skevans
100336668Skevans		bufferevent_disable(bev, EV_READ);
101336708Skevans
102336708Skevans		if (evbuffer_get_length(evbuf) == 8333) {
103336668Skevans			test_ok++;
104336708Skevans		}
105336708Skevans
106336668Skevans		evbuffer_free(evbuf);
107336668Skevans	}
108336668Skevans}
109336668Skevans
110336708Skevansstatic void
111337228Skevanswritecb(struct bufferevent *bev, void *arg)
112337404Skevans{
113336668Skevans	if (evbuffer_get_length(bev->output) == 0) {
114336668Skevans		test_ok++;
115337228Skevans	}
116337343Skevans}
117337228Skevans
118337228Skevansstatic void
119337228Skevanserrorcb(struct bufferevent *bev, short what, void *arg)
120337228Skevans{
121336668Skevans	test_ok = -2;
122337228Skevans}
123337228Skevans
124337228Skevansstatic void
125337228Skevanstest_bufferevent_impl(int use_pair)
126337343Skevans{
127337228Skevans	struct bufferevent *bev1 = NULL, *bev2 = NULL;
128337228Skevans	char buffer[8333];
129337228Skevans	int i;
130337228Skevans
131337228Skevans	if (use_pair) {
132337228Skevans		struct bufferevent *pair[2];
133337228Skevans		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
134337358Skevans		bev1 = pair[0];
135337358Skevans		bev2 = pair[1];
136337358Skevans		bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1);
137337358Skevans		bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
138337358Skevans		tt_int_op(bufferevent_getfd(bev1), ==, -1);
139337358Skevans		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
140337358Skevans		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2);
141337358Skevans		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1);
142337358Skevans	} else {
143337358Skevans		bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
144337358Skevans		bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
145337358Skevans		tt_int_op(bufferevent_getfd(bev1), ==, pair[0]);
146337358Skevans		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
147337358Skevans		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
148337404Skevans		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
149337358Skevans	}
150337358Skevans
151337358Skevans	{
152337358Skevans		/* Test getcb. */
153336668Skevans		bufferevent_data_cb r, w;
154336668Skevans		bufferevent_event_cb e;
155336668Skevans		void *a;
156336668Skevans		bufferevent_getcb(bev1, &r, &w, &e, &a);
157336729Skevans		tt_ptr_op(r, ==, readcb);
158336668Skevans		tt_ptr_op(w, ==, writecb);
159336668Skevans		tt_ptr_op(e, ==, errorcb);
160336713Skevans		tt_ptr_op(a, ==, use_pair ? bev1 : NULL);
161336701Skevans	}
162336701Skevans
163336701Skevans	bufferevent_disable(bev1, EV_READ);
164336701Skevans	bufferevent_enable(bev2, EV_READ);
165336710Skevans
166336701Skevans	tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE);
167336668Skevans	tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ);
168336701Skevans
169336668Skevans	for (i = 0; i < (int)sizeof(buffer); i++)
170336668Skevans		buffer[i] = i;
171336668Skevans
172336668Skevans	bufferevent_write(bev1, buffer, sizeof(buffer));
173336701Skevans
174336668Skevans	event_dispatch();
175337228Skevans
176337228Skevans	bufferevent_free(bev1);
177337228Skevans	tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
178337228Skevans	bufferevent_free(bev2);
179336668Skevans
180336701Skevans	if (test_ok != 2)
181336668Skevans		test_ok = 0;
182336668Skevansend:
183336701Skevans	;
184336668Skevans}
185336668Skevans
/* Basic bufferevent test over the socket pair (use_pair = 0). */
static void
test_bufferevent(void)
{
	const int use_pair = 0;

	test_bufferevent_impl(use_pair);
}
191337406Skevans
/* Basic bufferevent test over a bufferevent pair (use_pair = 1). */
static void
test_bufferevent_pair(void)
{
	const int use_pair = 1;

	test_bufferevent_impl(use_pair);
}
197336668Skevans
/*
 * test watermarks and bufferevent
 */
201336668Skevans
202336668Skevansstatic void
203336668Skevanswm_readcb(struct bufferevent *bev, void *arg)
204336668Skevans{
205336668Skevans	struct evbuffer *evbuf = evbuffer_new();
206336668Skevans	int len = (int)evbuffer_get_length(bev->input);
207337224Skevans	static int nread;
208336668Skevans
209336668Skevans	assert(len >= 10 && len <= 20);
210336668Skevans
211337224Skevans	assert(evbuf != NULL);
212336668Skevans
213336668Skevans	/* gratuitous test of bufferevent_read_buffer */
214336668Skevans	bufferevent_read_buffer(bev, evbuf);
215337224Skevans
216336668Skevans	nread += len;
217336668Skevans	if (nread == 65000) {
218336668Skevans		bufferevent_disable(bev, EV_READ);
219337224Skevans		test_ok++;
220336668Skevans	}
221336668Skevans
222336668Skevans	evbuffer_free(evbuf);
223337224Skevans}
224336668Skevans
225336668Skevansstatic void
226336709Skevanswm_writecb(struct bufferevent *bev, void *arg)
227336709Skevans{
228336668Skevans	assert(evbuffer_get_length(bev->output) <= 100);
229337228Skevans	if (evbuffer_get_length(bev->output) == 0) {
230337228Skevans		evbuffer_drain(bev->output, evbuffer_get_length(bev->output));
231336668Skevans		test_ok++;
232336668Skevans	}
233336668Skevans}
234336668Skevans
235336668Skevansstatic void
236336668Skevanswm_errorcb(struct bufferevent *bev, short what, void *arg)
237336668Skevans{
238336701Skevans	test_ok = -2;
239336668Skevans}
240336668Skevans
241336729Skevansstatic void
242337404Skevanstest_bufferevent_watermarks_impl(int use_pair)
243336668Skevans{
244336668Skevans	struct bufferevent *bev1 = NULL, *bev2 = NULL;
245336668Skevans	char buffer[65000];
246336701Skevans	size_t low, high;
247336701Skevans	int i;
248336668Skevans	test_ok = 0;
249336668Skevans
250336701Skevans	if (use_pair) {
251336668Skevans		struct bufferevent *pair[2];
252336668Skevans		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
253336668Skevans		bev1 = pair[0];
254336668Skevans		bev2 = pair[1];
255336668Skevans		bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
256336668Skevans		bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
257336668Skevans	} else {
258337358Skevans		bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
259337404Skevans		bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
260337358Skevans	}
261336668Skevans	tt_assert(bev1);
262337358Skevans	tt_assert(bev2);
263337358Skevans	bufferevent_disable(bev1, EV_READ);
264337358Skevans	bufferevent_enable(bev2, EV_READ);
265337358Skevans
266336715Skevans	/* By default, low watermarks are set to 0 */
267336715Skevans	bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
268336715Skevans	tt_int_op(low, ==, 0);
269336715Skevans	bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
270336715Skevans	tt_int_op(low, ==, 0);
271336715Skevans
272336715Skevans	for (i = 0; i < (int)sizeof(buffer); i++)
273336668Skevans		buffer[i] = (char)i;
274336668Skevans
275336668Skevans	/* limit the reading on the receiving bufferevent */
276336668Skevans	bufferevent_setwatermark(bev2, EV_READ, 10, 20);
277336668Skevans
278336668Skevans	bufferevent_getwatermark(bev2, EV_READ, &low, &high);
279336668Skevans	tt_int_op(low, ==, 10);
280336701Skevans	tt_int_op(high, ==, 20);
281336668Skevans
282336701Skevans	/* Tell the sending bufferevent not to notify us till it's down to
283336701Skevans	   100 bytes. */
284336668Skevans	bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
285336668Skevans
286336668Skevans	bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
287336701Skevans	tt_int_op(low, ==, 100);
288336668Skevans	tt_int_op(high, ==, 2000);
289336668Skevans
290336668Skevans	bufferevent_write(bev1, buffer, sizeof(buffer));
291336668Skevans
292336668Skevans	event_dispatch();
293336668Skevans
294336668Skevans	tt_int_op(test_ok, ==, 2);
295336668Skevans
296346429Skevans	/* The write callback drained all the data from outbuf, so we
297336668Skevans	 * should have removed the write event... */
298336668Skevans	tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
299336668Skevans
300336668Skevansend:
301336668Skevans	if (bev1)
302336668Skevans		bufferevent_free(bev1);
303337406Skevans	if (bev2)
304346429Skevans		bufferevent_free(bev2);
305337406Skevans}
306346429Skevans
/* Watermark test over the socket pair (use_pair = 0). */
static void
test_bufferevent_watermarks(void)
{
	const int use_pair = 0;

	test_bufferevent_watermarks_impl(use_pair);
}
312
/* Watermark test over a bufferevent pair (use_pair = 1). */
static void
test_bufferevent_pair_watermarks(void)
{
	const int use_pair = 1;

	test_bufferevent_watermarks_impl(use_pair);
}
318
/*
 * Test bufferevent filters
 */

/* strip the 'x' that precedes each byte */
324
325static enum bufferevent_filter_result
326bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
327    ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
328{
329	const unsigned char *buffer;
330	unsigned i;
331
332	buffer = evbuffer_pullup(src, evbuffer_get_length(src));
333	for (i = 0; i < evbuffer_get_length(src); i += 2) {
334		assert(buffer[i] == 'x');
335		evbuffer_add(dst, buffer + i + 1, 1);
336
337		if (i + 2 > evbuffer_get_length(src))
338			break;
339	}
340
341	evbuffer_drain(src, i);
342	return (BEV_OK);
343}
344
/* add an 'x' before each byte */
346
347static enum bufferevent_filter_result
348bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
349    ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
350{
351	const unsigned char *buffer;
352	unsigned i;
353
354	buffer = evbuffer_pullup(src, evbuffer_get_length(src));
355	for (i = 0; i < evbuffer_get_length(src); ++i) {
356		evbuffer_add(dst, "x", 1);
357		evbuffer_add(dst, buffer + i, 1);
358	}
359
360	evbuffer_drain(src, evbuffer_get_length(src));
361	return (BEV_OK);
362}
363
364static void
365test_bufferevent_filters_impl(int use_pair)
366{
367	struct bufferevent *bev1 = NULL, *bev2 = NULL;
368	struct bufferevent *bev1_base = NULL, *bev2_base = NULL;
369	char buffer[8333];
370	int i;
371
372	test_ok = 0;
373
374	if (use_pair) {
375		struct bufferevent *pair[2];
376		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
377		bev1 = pair[0];
378		bev2 = pair[1];
379	} else {
380		bev1 = bufferevent_socket_new(NULL, pair[0], 0);
381		bev2 = bufferevent_socket_new(NULL, pair[1], 0);
382	}
383	bev1_base = bev1;
384	bev2_base = bev2;
385
386	for (i = 0; i < (int)sizeof(buffer); i++)
387		buffer[i] = i;
388
389	bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
390				      BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
391
392	bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
393				      NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
394	bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
395	bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
396
397	tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base);
398	tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base);
399	tt_int_op(bufferevent_getfd(bev1), ==, -1);
400	tt_int_op(bufferevent_getfd(bev2), ==, -1);
401
402	bufferevent_disable(bev1, EV_READ);
403	bufferevent_enable(bev2, EV_READ);
404	/* insert some filters */
405	bufferevent_write(bev1, buffer, sizeof(buffer));
406
407	event_dispatch();
408
409	if (test_ok != 2)
410		test_ok = 0;
411
412end:
413	if (bev1)
414		bufferevent_free(bev1);
415	if (bev2)
416		bufferevent_free(bev2);
417
418}
419
/* Filter test over the socket pair (use_pair = 0). */
static void
test_bufferevent_filters(void)
{
	const int use_pair = 0;

	test_bufferevent_filters_impl(use_pair);
}
425
/* Filter test over a bufferevent pair (use_pair = 1). */
static void
test_bufferevent_pair_filters(void)
{
	const int use_pair = 1;

	test_bufferevent_filters_impl(use_pair);
}
431
432
433static void
434sender_writecb(struct bufferevent *bev, void *ctx)
435{
436	if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
437		bufferevent_disable(bev,EV_READ|EV_WRITE);
438		TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev)));
439		bufferevent_free(bev);
440	}
441}
442
/* Event callback for the sender: every event is reported as a failure. */
static void
sender_errorcb(struct bufferevent *bev, short what, void *ctx)
{
	(void)bev;
	(void)ctx;

	TT_FAIL(("Got sender error %d",(int)what));
}
448
/* File-scope state shared between the connect/trigger tests and callbacks. */

/* Bufferevent option flags given to sockets created in listen_cb(). */
static int bufferevent_connect_test_flags = 0;
/* Flags passed to bufferevent_trigger()/bufferevent_trigger_event(). */
static int bufferevent_trigger_test_flags = 0;
/* Number of complete TEST_STR payloads seen by reader_eventcb(). */
static int n_strings_read = 0;
/* Number of read-callback invocations counted so far. */
static int n_reads_invoked = 0;
453
454#define TEST_STR "Now is the time for all good events to signal for " \
455	"the good of their protocol"
456static void
457listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
458    struct sockaddr *sa, int socklen, void *arg)
459{
460	struct event_base *base = arg;
461	struct bufferevent *bev;
462	const char s[] = TEST_STR;
463	TT_BLATHER(("Got a request on socket %d", (int)fd ));
464	bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
465	tt_assert(bev);
466	bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
467	bufferevent_write(bev, s, sizeof(s));
468end:
469	;
470}
471
472static void
473reader_eventcb(struct bufferevent *bev, short what, void *ctx)
474{
475	struct event_base *base = ctx;
476	if (what & BEV_EVENT_ERROR) {
477		perror("foobar");
478		TT_FAIL(("got connector error %d", (int)what));
479		return;
480	}
481	if (what & BEV_EVENT_CONNECTED) {
482		TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev)));
483		bufferevent_enable(bev, EV_READ);
484	}
485	if (what & BEV_EVENT_EOF) {
486		char buf[512];
487		size_t n;
488		n = bufferevent_read(bev, buf, sizeof(buf)-1);
489		tt_int_op(n, >=, 0);
490		buf[n] = '\0';
491		tt_str_op(buf, ==, TEST_STR);
492		if (++n_strings_read == 2)
493			event_base_loopexit(base, NULL);
494		TT_BLATHER(("EOF on %d: %d strings read.",
495			(int)bufferevent_getfd(bev), n_strings_read));
496	}
497end:
498	;
499}
500
501static void
502reader_readcb(struct bufferevent *bev, void *ctx)
503{
504	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
505	n_reads_invoked++;
506}
507
508static void
509test_bufferevent_connect(void *arg)
510{
511	struct basic_test_data *data = arg;
512	struct evconnlistener *lev=NULL;
513	struct bufferevent *bev1=NULL, *bev2=NULL;
514	struct sockaddr_in localhost;
515	struct sockaddr_storage ss;
516	struct sockaddr *sa;
517	ev_socklen_t slen;
518
519	int be_flags=BEV_OPT_CLOSE_ON_FREE;
520
521	if (strstr((char*)data->setup_data, "defer")) {
522		be_flags |= BEV_OPT_DEFER_CALLBACKS;
523	}
524	if (strstr((char*)data->setup_data, "unlocked")) {
525		be_flags |= BEV_OPT_UNLOCK_CALLBACKS;
526	}
527	if (strstr((char*)data->setup_data, "lock")) {
528		be_flags |= BEV_OPT_THREADSAFE;
529	}
530	bufferevent_connect_test_flags = be_flags;
531#ifdef _WIN32
532	if (!strcmp((char*)data->setup_data, "unset_connectex")) {
533		struct win32_extension_fns *ext =
534		    (struct win32_extension_fns *)
535		    event_get_win32_extension_fns_();
536		ext->ConnectEx = NULL;
537	}
538#endif
539
540	memset(&localhost, 0, sizeof(localhost));
541
542	localhost.sin_port = 0; /* pick-a-port */
543	localhost.sin_addr.s_addr = htonl(0x7f000001L);
544	localhost.sin_family = AF_INET;
545	sa = (struct sockaddr *)&localhost;
546	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
547	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
548	    16, sa, sizeof(localhost));
549	tt_assert(lev);
550
551	sa = (struct sockaddr *)&ss;
552	slen = sizeof(ss);
553	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
554		tt_abort_perror("getsockname");
555	}
556
557	tt_assert(!evconnlistener_enable(lev));
558	bev1 = bufferevent_socket_new(data->base, -1, be_flags);
559	bev2 = bufferevent_socket_new(data->base, -1, be_flags);
560	tt_assert(bev1);
561	tt_assert(bev2);
562	bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base);
563	bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base);
564
565	bufferevent_enable(bev1, EV_READ);
566	bufferevent_enable(bev2, EV_READ);
567
568	tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
569	tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
570
571	event_base_dispatch(data->base);
572
573	tt_int_op(n_strings_read, ==, 2);
574	tt_int_op(n_reads_invoked, >=, 2);
575end:
576	if (lev)
577		evconnlistener_free(lev);
578
579	if (bev1)
580		bufferevent_free(bev1);
581
582	if (bev2)
583		bufferevent_free(bev2);
584}
585
586static void
587want_fail_eventcb(struct bufferevent *bev, short what, void *ctx)
588{
589	struct event_base *base = ctx;
590	const char *err;
591	evutil_socket_t s;
592
593	if (what & BEV_EVENT_ERROR) {
594		s = bufferevent_getfd(bev);
595		err = evutil_socket_error_to_string(evutil_socket_geterror(s));
596		TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s",
597			EV_SOCK_ARG(s), err));
598		test_ok = 1;
599	} else {
600		TT_FAIL(("didn't fail? what %hd", what));
601	}
602
603	event_base_loopexit(base, NULL);
604}
605
606static void
607close_socket_cb(evutil_socket_t fd, short what, void *arg)
608{
609	evutil_socket_t *fdp = arg;
610	if (*fdp >= 0) {
611		evutil_closesocket(*fdp);
612		*fdp = -1;
613	}
614}
615
616static void
617test_bufferevent_connect_fail(void *arg)
618{
619	struct basic_test_data *data = (struct basic_test_data *)arg;
620	struct bufferevent *bev=NULL;
621	struct sockaddr_in localhost;
622	struct sockaddr *sa = (struct sockaddr*)&localhost;
623	evutil_socket_t fake_listener = -1;
624	ev_socklen_t slen = sizeof(localhost);
625	struct event close_listener_event;
626	int close_listener_event_added = 0;
627	struct timeval one_second = { 1, 0 };
628	int r;
629
630	test_ok = 0;
631
632	memset(&localhost, 0, sizeof(localhost));
633	localhost.sin_port = 0; /* have the kernel pick a port */
634	localhost.sin_addr.s_addr = htonl(0x7f000001L);
635	localhost.sin_family = AF_INET;
636
637	/* bind, but don't listen or accept. should trigger
638	   "Connection refused" reliably on most platforms. */
639	fake_listener = socket(localhost.sin_family, SOCK_STREAM, 0);
640	tt_assert(fake_listener >= 0);
641	tt_assert(bind(fake_listener, sa, slen) == 0);
642	tt_assert(getsockname(fake_listener, sa, &slen) == 0);
643	bev = bufferevent_socket_new(data->base, -1,
644		BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
645	tt_assert(bev);
646	bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base);
647
648	r = bufferevent_socket_connect(bev, sa, slen);
649	/* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
650	 * detects the error immediately, which is not really wrong of it. */
651	tt_want(r == 0 || r == -1);
652
653	/* Close the listener socket after a second. This should trigger
654	   "connection refused" on some other platforms, including OSX. */
655	evtimer_assign(&close_listener_event, data->base, close_socket_cb,
656	    &fake_listener);
657	event_add(&close_listener_event, &one_second);
658	close_listener_event_added = 1;
659
660	event_base_dispatch(data->base);
661
662	tt_int_op(test_ok, ==, 1);
663
664end:
665	if (fake_listener >= 0)
666		evutil_closesocket(fake_listener);
667
668	if (bev)
669		bufferevent_free(bev);
670
671	if (close_listener_event_added)
672		event_del(&close_listener_event);
673}
674
/* Per-bufferevent record filled in by the timeout test callbacks. */
struct timeout_cb_result {
	/* Wall-clock time of the most recent read timeout event. */
	struct timeval read_timeout_at;
	/* Wall-clock time of the most recent write timeout event. */
	struct timeval write_timeout_at;
	/* Wall-clock time of the most recent write callback. */
	struct timeval last_wrote_at;
	/* Number of read timeout events seen. */
	int n_read_timeouts;
	/* Number of write timeout events seen. */
	int n_write_timeouts;
	/* Number of event-callback invocations of any kind. */
	int total_calls;
};
683
684static void
685bev_timeout_write_cb(struct bufferevent *bev, void *arg)
686{
687	struct timeout_cb_result *res = arg;
688	evutil_gettimeofday(&res->last_wrote_at, NULL);
689}
690
691static void
692bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
693{
694	struct timeout_cb_result *res = arg;
695	++res->total_calls;
696
697	if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
698	    == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
699		evutil_gettimeofday(&res->read_timeout_at, NULL);
700		++res->n_read_timeouts;
701	}
702	if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
703	    == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
704		evutil_gettimeofday(&res->write_timeout_at, NULL);
705		++res->n_write_timeouts;
706	}
707}
708
709static void
710test_bufferevent_timeouts(void *arg)
711{
712	/* "arg" is a string containing "pair" and/or "filter". */
713	struct bufferevent *bev1 = NULL, *bev2 = NULL;
714	struct basic_test_data *data = arg;
715	int use_pair = 0, use_filter = 0;
716	struct timeval tv_w, tv_r, started_at;
717	struct timeout_cb_result res1, res2;
718	char buf[1024];
719
720	memset(&res1, 0, sizeof(res1));
721	memset(&res2, 0, sizeof(res2));
722
723	if (strstr((char*)data->setup_data, "pair"))
724		use_pair = 1;
725	if (strstr((char*)data->setup_data, "filter"))
726		use_filter = 1;
727
728	if (use_pair) {
729		struct bufferevent *p[2];
730		tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
731		bev1 = p[0];
732		bev2 = p[1];
733	} else {
734		bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
735		bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
736	}
737
738	tt_assert(bev1);
739	tt_assert(bev2);
740
741	if (use_filter) {
742		struct bufferevent *bevf1, *bevf2;
743		bevf1 = bufferevent_filter_new(bev1, NULL, NULL,
744		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
745		bevf2 = bufferevent_filter_new(bev2, NULL, NULL,
746		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
747		tt_assert(bevf1);
748		tt_assert(bevf2);
749		bev1 = bevf1;
750		bev2 = bevf2;
751	}
752
753	/* Do this nice and early. */
754	bufferevent_disable(bev2, EV_READ);
755
756	/* bev1 will try to write and read.  Both will time out. */
757	evutil_gettimeofday(&started_at, NULL);
758	tv_w.tv_sec = tv_r.tv_sec = 0;
759	tv_w.tv_usec = 100*1000;
760	tv_r.tv_usec = 150*1000;
761	bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
762	    bev_timeout_event_cb, &res1);
763	bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
764	bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
765	if (use_pair) {
766		/* For a pair, the fact that the other side isn't reading
767		 * makes the writer stall */
768		bufferevent_write(bev1, "ABCDEFG", 7);
769	} else {
770		/* For a real socket, the kernel's TCP buffers can eat a
771		 * fair number of bytes; make sure that at some point we
772		 * have some bytes that will stall. */
773		struct evbuffer *output = bufferevent_get_output(bev1);
774		int i;
775		memset(buf, 0xbb, sizeof(buf));
776		for (i=0;i<1024;++i) {
777			evbuffer_add_reference(output, buf, sizeof(buf),
778			    NULL, NULL);
779		}
780	}
781	bufferevent_enable(bev1, EV_READ|EV_WRITE);
782
783	/* bev2 has nothing to say, and isn't listening. */
784	bufferevent_setcb(bev2, NULL,  bev_timeout_write_cb,
785	    bev_timeout_event_cb, &res2);
786	tv_w.tv_sec = tv_r.tv_sec = 0;
787	tv_w.tv_usec = 200*1000;
788	tv_r.tv_usec = 100*1000;
789	bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
790	bufferevent_enable(bev2, EV_WRITE);
791
792	tv_r.tv_sec = 0;
793	tv_r.tv_usec = 350000;
794
795	event_base_loopexit(data->base, &tv_r);
796	event_base_dispatch(data->base);
797
798	/* XXXX Test that actually reading or writing a little resets the
799	 * timeouts. */
800
801	/* Each buf1 timeout happens, and happens only once. */
802	tt_want(res1.n_read_timeouts);
803	tt_want(res1.n_write_timeouts);
804	tt_want(res1.n_read_timeouts == 1);
805	tt_want(res1.n_write_timeouts == 1);
806
807	test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150);
808	test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100);
809
810end:
811	if (bev1)
812		bufferevent_free(bev1);
813	if (bev2)
814		bufferevent_free(bev2);
815}
816
817static void
818trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
819{
820	TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
821}
822
/*
 * Event callback for the trigger test.  The all-ones value ~0 is what
 * trigger_readcb_triggered() passes to bufferevent_trigger_event(); seeing
 * it ends the event loop (ctx).  Every other event is handed to
 * reader_eventcb().
 */
static void
trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
{
	struct event_base *base = ctx;

	if (what != ~0) {
		reader_eventcb(bev, what, ctx);
		return;
	}

	TT_BLATHER(("Event successfully triggered."));
	event_base_loopexit(base, NULL);
}
834
835static void
836trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
837{
838	TT_BLATHER(("Read successfully triggered."));
839	n_reads_invoked++;
840	bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
841}
842
843static void
844trigger_readcb(struct bufferevent *bev, void *ctx)
845{
846	struct timeval timeout = { 30, 0 };
847	struct event_base *base = ctx;
848	size_t low, high, len;
849	int expected_reads;
850
851	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
852	expected_reads = ++n_reads_invoked;
853
854	bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
855
856	bufferevent_getwatermark(bev, EV_READ, &low, &high);
857	len = evbuffer_get_length(bufferevent_get_input(bev));
858
859	bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
860	bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
861	/* no callback expected */
862	tt_int_op(n_reads_invoked, ==, expected_reads);
863
864	if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
865	    (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
866		/* will be deferred */
867	} else {
868		expected_reads++;
869	}
870
871	event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
872
873	bufferevent_trigger(bev, EV_READ,
874	    bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
875	tt_int_op(n_reads_invoked, ==, expected_reads);
876
877	bufferevent_setwatermark(bev, EV_READ, low, high);
878end:
879	;
880}
881
882static void
883test_bufferevent_trigger(void *arg)
884{
885	struct basic_test_data *data = arg;
886	struct evconnlistener *lev=NULL;
887	struct bufferevent *bev=NULL;
888	struct sockaddr_in localhost;
889	struct sockaddr_storage ss;
890	struct sockaddr *sa;
891	ev_socklen_t slen;
892
893	int be_flags=BEV_OPT_CLOSE_ON_FREE;
894	int trig_flags=0;
895
896	if (strstr((char*)data->setup_data, "defer")) {
897		be_flags |= BEV_OPT_DEFER_CALLBACKS;
898	}
899	bufferevent_connect_test_flags = be_flags;
900
901	if (strstr((char*)data->setup_data, "postpone")) {
902		trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
903	}
904	bufferevent_trigger_test_flags = trig_flags;
905
906	memset(&localhost, 0, sizeof(localhost));
907
908	localhost.sin_port = 0; /* pick-a-port */
909	localhost.sin_addr.s_addr = htonl(0x7f000001L);
910	localhost.sin_family = AF_INET;
911	sa = (struct sockaddr *)&localhost;
912	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
913	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
914	    16, sa, sizeof(localhost));
915	tt_assert(lev);
916
917	sa = (struct sockaddr *)&ss;
918	slen = sizeof(ss);
919	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
920		tt_abort_perror("getsockname");
921	}
922
923	tt_assert(!evconnlistener_enable(lev));
924	bev = bufferevent_socket_new(data->base, -1, be_flags);
925	tt_assert(bev);
926	bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
927
928	bufferevent_enable(bev, EV_READ);
929
930	tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
931
932	event_base_dispatch(data->base);
933
934	tt_int_op(n_reads_invoked, ==, 2);
935end:
936	if (lev)
937		evconnlistener_free(lev);
938
939	if (bev)
940		bufferevent_free(bev);
941}
942
943struct testcase_t bufferevent_testcases[] = {
944
945	LEGACY(bufferevent, TT_ISOLATED),
946	LEGACY(bufferevent_pair, TT_ISOLATED),
947	LEGACY(bufferevent_watermarks, TT_ISOLATED),
948	LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
949	LEGACY(bufferevent_filters, TT_ISOLATED),
950	LEGACY(bufferevent_pair_filters, TT_ISOLATED),
951	{ "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
952	  &basic_setup, (void*)"" },
953	{ "bufferevent_connect_defer", test_bufferevent_connect,
954	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
955	{ "bufferevent_connect_lock", test_bufferevent_connect,
956	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
957	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
958	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
959	  (void*)"defer lock" },
960	{ "bufferevent_connect_unlocked_cbs", test_bufferevent_connect,
961	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
962	  (void*)"lock defer unlocked" },
963	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
964	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
965	{ "bufferevent_timeout", test_bufferevent_timeouts,
966	  TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
967	{ "bufferevent_timeout_pair", test_bufferevent_timeouts,
968	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
969	{ "bufferevent_timeout_filter", test_bufferevent_timeouts,
970	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
971	{ "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
972	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
973	{ "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
974	  &basic_setup, (void*)"" },
975	{ "bufferevent_trigger_defer", test_bufferevent_trigger,
976	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
977	{ "bufferevent_trigger_postpone", test_bufferevent_trigger,
978	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
979	  (void*)"postpone" },
980	{ "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
981	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
982	  (void*)"defer postpone" },
983#ifdef EVENT__HAVE_LIBZ
984	LEGACY(bufferevent_zlib, TT_ISOLATED),
985#else
986	{ "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL },
987#endif
988
989	END_OF_TESTCASES,
990};
991
992struct testcase_t bufferevent_iocp_testcases[] = {
993
994	LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
995	LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
996	LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
997	{ "bufferevent_connect", test_bufferevent_connect,
998	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
999	{ "bufferevent_connect_defer", test_bufferevent_connect,
1000	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" },
1001	{ "bufferevent_connect_lock", test_bufferevent_connect,
1002	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1003	  (void*)"lock" },
1004	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
1005	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1006	  (void*)"defer lock" },
1007	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
1008	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
1009	{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
1010	  TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
1011	  (void*)"unset_connectex" },
1012
1013	END_OF_TESTCASES,
1014};
1015