1275970Scy/*
2275970Scy * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3275970Scy *
4275970Scy * All rights reserved.
5275970Scy *
6275970Scy * Redistribution and use in source and binary forms, with or without
7275970Scy * modification, are permitted provided that the following conditions
8275970Scy * are met:
9275970Scy * 1. Redistributions of source code must retain the above copyright
10275970Scy *    notice, this list of conditions and the following disclaimer.
11275970Scy * 2. Redistributions in binary form must reproduce the above copyright
12275970Scy *    notice, this list of conditions and the following disclaimer in the
13275970Scy *    documentation and/or other materials provided with the distribution.
14275970Scy * 3. The name of the author may not be used to endorse or promote products
15275970Scy *    derived from this software without specific prior written permission.
16275970Scy *
17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27275970Scy */
28275970Scy
29275970Scy#include "event2/event-config.h"
30275970Scy#include "evconfig-private.h"
31275970Scy
32275970Scy#ifdef EVENT__HAVE_SYS_TIME_H
33275970Scy#include <sys/time.h>
34275970Scy#endif
35275970Scy
36275970Scy#include <errno.h>
37275970Scy#include <stdio.h>
38275970Scy#include <stdlib.h>
39275970Scy#include <string.h>
40275970Scy#ifdef EVENT__HAVE_STDARG_H
41275970Scy#include <stdarg.h>
42275970Scy#endif
43275970Scy#ifdef EVENT__HAVE_UNISTD_H
44275970Scy#include <unistd.h>
45275970Scy#endif
46275970Scy
47275970Scy#ifdef _WIN32
48275970Scy#include <winsock2.h>
49275970Scy#include <ws2tcpip.h>
50275970Scy#endif
51275970Scy
52275970Scy#include <sys/queue.h>
53275970Scy
54275970Scy#include "event2/util.h"
55275970Scy#include "event2/bufferevent.h"
56275970Scy#include "event2/buffer.h"
57275970Scy#include "event2/bufferevent_struct.h"
58275970Scy#include "event2/event.h"
59275970Scy#include "event2/util.h"
60275970Scy#include "event-internal.h"
61275970Scy#include "log-internal.h"
62275970Scy#include "mm-internal.h"
63275970Scy#include "bufferevent-internal.h"
64275970Scy#include "util-internal.h"
65275970Scy#include "iocp-internal.h"
66275970Scy
67275970Scy#ifndef SO_UPDATE_CONNECT_CONTEXT
68275970Scy/* Mingw is sometimes missing this */
69275970Scy#define SO_UPDATE_CONNECT_CONTEXT 0x7010
70275970Scy#endif
71275970Scy
72275970Scy/* prototypes */
73275970Scystatic int be_async_enable(struct bufferevent *, short);
74275970Scystatic int be_async_disable(struct bufferevent *, short);
75275970Scystatic void be_async_destruct(struct bufferevent *);
76275970Scystatic int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
77275970Scystatic int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
78275970Scy
/* A bufferevent implemented over Windows overlapped (IOCP) I/O.  At most
 * one connect, one read, and one write may be outstanding at a time; the
 * embedded event_overlapped structures carry their per-operation state. */
struct bufferevent_async {
	struct bufferevent_private bev;
	/* Overlapped state for a pending ConnectEx() operation. */
	struct event_overlapped connect_overlapped;
	/* Overlapped state for a read launched on bev.bev.input. */
	struct event_overlapped read_overlapped;
	/* Overlapped state for a write launched on bev.bev.output. */
	struct event_overlapped write_overlapped;
	/* Number of bytes requested by the in-flight read; 0 if no read
	 * is pending. */
	size_t read_in_progress;
	/* Number of bytes requested by the in-flight write; 0 if no write
	 * is pending. */
	size_t write_in_progress;
	/* True while the socket is usable; cleared on error, EOF, or
	 * cancellation. */
	unsigned ok : 1;
	/* True while a virtual read event is registered with the base. */
	unsigned read_added : 1;
	/* True while a virtual write event is registered with the base. */
	unsigned write_added : 1;
};
90275970Scy
/* Method table that plugs the IOCP-based implementation below into the
 * generic bufferevent machinery. */
const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	NULL, /* Unlink */
	be_async_destruct,
	bufferevent_generic_adj_timeouts_,
	be_async_flush,
	be_async_ctrl,
};
102275970Scy
103275970Scystatic inline struct bufferevent_async *
104275970Scyupcast(struct bufferevent *bev)
105275970Scy{
106275970Scy	struct bufferevent_async *bev_a;
107275970Scy	if (bev->be_ops != &bufferevent_ops_async)
108275970Scy		return NULL;
109275970Scy	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
110275970Scy	return bev_a;
111275970Scy}
112275970Scy
113275970Scystatic inline struct bufferevent_async *
114275970Scyupcast_connect(struct event_overlapped *eo)
115275970Scy{
116275970Scy	struct bufferevent_async *bev_a;
117275970Scy	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
118275970Scy	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
119275970Scy	return bev_a;
120275970Scy}
121275970Scy
122275970Scystatic inline struct bufferevent_async *
123275970Scyupcast_read(struct event_overlapped *eo)
124275970Scy{
125275970Scy	struct bufferevent_async *bev_a;
126275970Scy	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
127275970Scy	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
128275970Scy	return bev_a;
129275970Scy}
130275970Scy
131275970Scystatic inline struct bufferevent_async *
132275970Scyupcast_write(struct event_overlapped *eo)
133275970Scy{
134275970Scy	struct bufferevent_async *bev_a;
135275970Scy	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
136275970Scy	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
137275970Scy	return bev_a;
138275970Scy}
139275970Scy
140275970Scystatic void
141275970Scybev_async_del_write(struct bufferevent_async *beva)
142275970Scy{
143275970Scy	struct bufferevent *bev = &beva->bev.bev;
144275970Scy
145275970Scy	if (beva->write_added) {
146275970Scy		beva->write_added = 0;
147275970Scy		event_base_del_virtual_(bev->ev_base);
148275970Scy	}
149275970Scy}
150275970Scy
151275970Scystatic void
152275970Scybev_async_del_read(struct bufferevent_async *beva)
153275970Scy{
154275970Scy	struct bufferevent *bev = &beva->bev.bev;
155275970Scy
156275970Scy	if (beva->read_added) {
157275970Scy		beva->read_added = 0;
158275970Scy		event_base_del_virtual_(bev->ev_base);
159275970Scy	}
160275970Scy}
161275970Scy
162275970Scystatic void
163275970Scybev_async_add_write(struct bufferevent_async *beva)
164275970Scy{
165275970Scy	struct bufferevent *bev = &beva->bev.bev;
166275970Scy
167275970Scy	if (!beva->write_added) {
168275970Scy		beva->write_added = 1;
169275970Scy		event_base_add_virtual_(bev->ev_base);
170275970Scy	}
171275970Scy}
172275970Scy
173275970Scystatic void
174275970Scybev_async_add_read(struct bufferevent_async *beva)
175275970Scy{
176275970Scy	struct bufferevent *bev = &beva->bev.bev;
177275970Scy
178275970Scy	if (!beva->read_added) {
179275970Scy		beva->read_added = 1;
180275970Scy		event_base_add_virtual_(bev->ev_base);
181275970Scy	}
182275970Scy}
183275970Scy
184275970Scystatic void
185275970Scybev_async_consider_writing(struct bufferevent_async *beva)
186275970Scy{
187275970Scy	size_t at_most;
188275970Scy	int limit;
189275970Scy	struct bufferevent *bev = &beva->bev.bev;
190275970Scy
191275970Scy	/* Don't write if there's a write in progress, or we do not
192275970Scy	 * want to write, or when there's nothing left to write. */
193275970Scy	if (beva->write_in_progress || beva->bev.connecting)
194275970Scy		return;
195275970Scy	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
196275970Scy	    !evbuffer_get_length(bev->output)) {
197275970Scy		bev_async_del_write(beva);
198275970Scy		return;
199275970Scy	}
200275970Scy
201275970Scy	at_most = evbuffer_get_length(bev->output);
202275970Scy
203275970Scy	/* This is safe so long as bufferevent_get_write_max never returns
204275970Scy	 * more than INT_MAX.  That's true for now. XXXX */
205275970Scy	limit = (int)bufferevent_get_write_max_(&beva->bev);
206275970Scy	if (at_most >= (size_t)limit && limit >= 0)
207275970Scy		at_most = limit;
208275970Scy
209275970Scy	if (beva->bev.write_suspended) {
210275970Scy		bev_async_del_write(beva);
211275970Scy		return;
212275970Scy	}
213275970Scy
214275970Scy	/*  XXXX doesn't respect low-water mark very well. */
215275970Scy	bufferevent_incref_(bev);
216275970Scy	if (evbuffer_launch_write_(bev->output, at_most,
217275970Scy	    &beva->write_overlapped)) {
218275970Scy		bufferevent_decref_(bev);
219275970Scy		beva->ok = 0;
220275970Scy		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
221275970Scy	} else {
222275970Scy		beva->write_in_progress = at_most;
223275970Scy		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
224275970Scy		bev_async_add_write(beva);
225275970Scy	}
226275970Scy}
227275970Scy
/* Launch an overlapped read sized by the read high-watermark and rate
 * limits, unless a read is already pending or reading is disabled. */
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full: only request up to the high watermark. */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also the note above on the cast of
	 * bufferevent_get_write_max_() */
	limit = (int)bufferevent_get_read_max_(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	/* Hold a reference across the overlapped operation; it is released
	 * in read_complete(), or immediately below on launch failure. */
	bufferevent_incref_(bev);
	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
		bufferevent_decref_(bev);
	} else {
		beva->read_in_progress = at_most;
		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}
283275970Scy
284275970Scystatic void
285275970Scybe_async_outbuf_callback(struct evbuffer *buf,
286275970Scy    const struct evbuffer_cb_info *cbinfo,
287275970Scy    void *arg)
288275970Scy{
289275970Scy	struct bufferevent *bev = arg;
290275970Scy	struct bufferevent_async *bev_async = upcast(bev);
291275970Scy
292275970Scy	/* If we added data to the outbuf and were not writing before,
293275970Scy	 * we may want to write now. */
294275970Scy
295275970Scy	bufferevent_incref_and_lock_(bev);
296275970Scy
297275970Scy	if (cbinfo->n_added)
298275970Scy		bev_async_consider_writing(bev_async);
299275970Scy
300275970Scy	bufferevent_decref_and_unlock_(bev);
301275970Scy}
302275970Scy
303275970Scystatic void
304275970Scybe_async_inbuf_callback(struct evbuffer *buf,
305275970Scy    const struct evbuffer_cb_info *cbinfo,
306275970Scy    void *arg)
307275970Scy{
308275970Scy	struct bufferevent *bev = arg;
309275970Scy	struct bufferevent_async *bev_async = upcast(bev);
310275970Scy
311275970Scy	/* If we drained data from the inbuf and were not reading before,
312275970Scy	 * we may want to read now */
313275970Scy
314275970Scy	bufferevent_incref_and_lock_(bev);
315275970Scy
316275970Scy	if (cbinfo->n_deleted)
317275970Scy		bev_async_consider_reading(bev_async);
318275970Scy
319275970Scy	bufferevent_decref_and_unlock_(bev);
320275970Scy}
321275970Scy
322275970Scystatic int
323275970Scybe_async_enable(struct bufferevent *buf, short what)
324275970Scy{
325275970Scy	struct bufferevent_async *bev_async = upcast(buf);
326275970Scy
327275970Scy	if (!bev_async->ok)
328275970Scy		return -1;
329275970Scy
330275970Scy	if (bev_async->bev.connecting) {
331275970Scy		/* Don't launch anything during connection attempts. */
332275970Scy		return 0;
333275970Scy	}
334275970Scy
335275970Scy	if (what & EV_READ)
336275970Scy		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
337275970Scy	if (what & EV_WRITE)
338275970Scy		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
339275970Scy
340275970Scy	/* If we newly enable reading or writing, and we aren't reading or
341275970Scy	   writing already, consider launching a new read or write. */
342275970Scy
343275970Scy	if (what & EV_READ)
344275970Scy		bev_async_consider_reading(bev_async);
345275970Scy	if (what & EV_WRITE)
346275970Scy		bev_async_consider_writing(bev_async);
347275970Scy	return 0;
348275970Scy}
349275970Scy
350275970Scystatic int
351275970Scybe_async_disable(struct bufferevent *bev, short what)
352275970Scy{
353275970Scy	struct bufferevent_async *bev_async = upcast(bev);
354275970Scy	/* XXXX If we disable reading or writing, we may want to consider
355275970Scy	 * canceling any in-progress read or write operation, though it might
356275970Scy	 * not work. */
357275970Scy
358275970Scy	if (what & EV_READ) {
359275970Scy		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
360275970Scy		bev_async_del_read(bev_async);
361275970Scy	}
362275970Scy	if (what & EV_WRITE) {
363275970Scy		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
364275970Scy		bev_async_del_write(bev_async);
365275970Scy	}
366275970Scy
367275970Scy	return 0;
368275970Scy}
369275970Scy
370275970Scystatic void
371275970Scybe_async_destruct(struct bufferevent *bev)
372275970Scy{
373275970Scy	struct bufferevent_async *bev_async = upcast(bev);
374275970Scy	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
375275970Scy	evutil_socket_t fd;
376275970Scy
377275970Scy	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
378275970Scy			!upcast(bev)->read_in_progress);
379275970Scy
380275970Scy	bev_async_del_read(bev_async);
381275970Scy	bev_async_del_write(bev_async);
382275970Scy
383275970Scy	fd = evbuffer_overlapped_get_fd_(bev->input);
384282408Scy	if (fd != (evutil_socket_t)INVALID_SOCKET &&
385282408Scy		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
386275970Scy		evutil_closesocket(fd);
387282408Scy		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
388275970Scy	}
389275970Scy}
390275970Scy
391275970Scy/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
392275970Scy * we use WSAGetOverlappedResult to translate. */
393275970Scystatic void
394275970Scybev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
395275970Scy{
396275970Scy	DWORD bytes, flags;
397275970Scy	evutil_socket_t fd;
398275970Scy
399275970Scy	fd = evbuffer_overlapped_get_fd_(bev->input);
400275970Scy	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
401275970Scy}
402275970Scy
403275970Scystatic int
404275970Scybe_async_flush(struct bufferevent *bev, short what,
405275970Scy    enum bufferevent_flush_mode mode)
406275970Scy{
407275970Scy	return 0;
408275970Scy}
409275970Scy
/* IOCP completion callback for a ConnectEx() launched by
 * bufferevent_async_connect_().  Runs with the completion-port thread;
 * takes the bufferevent lock and releases the reference taken when the
 * connect was launched. */
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
	/* Required after ConnectEx() so the socket behaves like one from
	 * connect(); XXXX handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected_(bev);
	else
		/* Translate the completion status into a WSA error code. */
		bev_async_set_wsa_error(bev, eo);

	bufferevent_run_eventcb_(bev,
			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);

	/* Matches the event_base_add_virtual_ in bufferevent_async_connect_. */
	event_base_del_virtual_(bev->ev_base);

	bufferevent_decref_and_unlock_(bev);
}
438275970Scy
/* IOCP completion callback for an overlapped read launched by
 * bev_async_consider_reading().  Commits the received bytes into the
 * input buffer, reports data/EOF/error, and releases the reference taken
 * when the read was launched. */
static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;
	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	/* Refund rate-limit budget for bytes we requested but never got. */
	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read_(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);

	if (!ok)
		/* Translate the completion status into a WSA error code. */
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			/* Data arrived: re-arm the timeout, notify the user,
			 * and possibly launch the next read. */
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_READ, 0);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			/* Successful completion of zero bytes means EOF. */
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
477275970Scy
/* IOCP completion callback for an overlapped write launched by
 * bev_async_consider_writing().  Removes the written bytes from the
 * output buffer, reports progress/EOF/error, and releases the reference
 * taken when the write was launched. */
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	/* Refund rate-limit budget for bytes we committed to write but
	 * didn't. */
	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write_(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		bufferevent_decrement_write_buckets_(&bev_a->bev,
		                                     -amount_unwritten);


	if (!ok)
		/* Translate the completion status into a WSA error code. */
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			/* Progress was made: re-arm the timeout, notify the
			 * user, and possibly launch the next write. */
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		} else if (!nbytes) {
			/* Successful completion of zero bytes means EOF. */
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			bufferevent_run_eventcb_(bev, what, 0);
		}
	}

	bufferevent_decref_and_unlock_(bev);
}
520275970Scy
/* Create a new IOCP-backed bufferevent for socket 'fd' on 'base'.
 * The base must have an IOCP port; the socket is associated with it.
 * BEV_OPT_THREADSAFE is forced on, since completions arrive on IOCP
 * threads.  Returns NULL on failure.  Passing a negative fd creates a
 * not-yet-connected bufferevent (ok stays 0 until connected). */
struct bufferevent *
bufferevent_async_new_(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp_(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
		int err = GetLastError();
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		if (err != ERROR_INVALID_PARAMETER)
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	/* Both buffers wrap the same socket: input for reads, output for
	 * writes. */
	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
		options)<0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
	event_overlapped_init_(&bev_a->write_overlapped, write_complete);

	bufferevent_init_generic_timeout_cbs_(bev);

	bev_a->ok = fd >= 0;

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}
577275970Scy
578275970Scyvoid
579275970Scybufferevent_async_set_connected_(struct bufferevent *bev)
580275970Scy{
581275970Scy	struct bufferevent_async *bev_async = upcast(bev);
582275970Scy	bev_async->ok = 1;
583275970Scy	bufferevent_init_generic_timeout_cbs_(bev);
584275970Scy	/* Now's a good time to consider reading/writing */
585275970Scy	be_async_enable(bev, bev->enabled);
586275970Scy}
587275970Scy
588275970Scyint
589275970Scybufferevent_async_can_connect_(struct bufferevent *bev)
590275970Scy{
591275970Scy	const struct win32_extension_fns *ext =
592275970Scy	    event_get_win32_extension_fns_();
593275970Scy
594275970Scy	if (BEV_IS_ASYNC(bev) &&
595275970Scy	    event_base_get_iocp_(bev->ev_base) &&
596275970Scy	    ext && ext->ConnectEx)
597275970Scy		return 1;
598275970Scy
599275970Scy	return 0;
600275970Scy}
601275970Scy
/* Start an asynchronous connect on 'fd' to 'sa' using ConnectEx().
 * Returns 0 if the connect was launched (completion arrives in
 * connect_complete()), -1 on immediate failure.  Only AF_INET and
 * AF_INET6 are supported, since we must be able to auto-bind. */
int
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
	const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns_();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	/* Keep the loop alive and the bufferevent pinned until
	 * connect_complete() runs; it undoes both. */
	event_base_add_virtual_(bev->ev_base);
	bufferevent_incref_(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
			    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	/* Immediate failure: undo the virtual event and reference. */
	event_base_del_virtual_(bev->ev_base);
	bufferevent_decref_(bev);

	return -1;
}
647275970Scy
648275970Scystatic int
649275970Scybe_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
650275970Scy    union bufferevent_ctrl_data *data)
651275970Scy{
652275970Scy	switch (op) {
653275970Scy	case BEV_CTRL_GET_FD:
654275970Scy		data->fd = evbuffer_overlapped_get_fd_(bev->input);
655275970Scy		return 0;
656275970Scy	case BEV_CTRL_SET_FD: {
657275970Scy		struct event_iocp_port *iocp;
658275970Scy
659275970Scy		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
660275970Scy			return 0;
661275970Scy		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
662275970Scy			return -1;
663275970Scy		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
664275970Scy			return -1;
665275970Scy		evbuffer_overlapped_set_fd_(bev->input, data->fd);
666275970Scy		evbuffer_overlapped_set_fd_(bev->output, data->fd);
667275970Scy		return 0;
668275970Scy	}
669275970Scy	case BEV_CTRL_CANCEL_ALL: {
670275970Scy		struct bufferevent_async *bev_a = upcast(bev);
671275970Scy		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
672275970Scy		if (fd != (evutil_socket_t)INVALID_SOCKET &&
673275970Scy		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
674275970Scy			closesocket(fd);
675282408Scy			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
676275970Scy		}
677275970Scy		bev_a->ok = 0;
678275970Scy		return 0;
679275970Scy	}
680275970Scy	case BEV_CTRL_GET_UNDERLYING:
681275970Scy	default:
682275970Scy		return -1;
683275970Scy	}
684275970Scy}
685275970Scy
686275970Scy
687