/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
26275970Scy
/**
   @file buffer_iocp.c

   This module implements overlapped read and write functions for evbuffer
   objects on Windows.
*/
33275970Scy#include "event2/event-config.h"
34275970Scy#include "evconfig-private.h"
35275970Scy
36275970Scy#include "event2/buffer.h"
37275970Scy#include "event2/buffer_compat.h"
38275970Scy#include "event2/util.h"
39275970Scy#include "event2/thread.h"
40275970Scy#include "util-internal.h"
41275970Scy#include "evthread-internal.h"
42275970Scy#include "evbuffer-internal.h"
43275970Scy#include "iocp-internal.h"
44275970Scy#include "mm-internal.h"
45275970Scy
46275970Scy#include <winsock2.h>
47275970Scy#include <windows.h>
48275970Scy#include <stdio.h>
49275970Scy
50275970Scy#define MAX_WSABUFS 16
51275970Scy
52275970Scy/** An evbuffer that can handle overlapped IO. */
53275970Scystruct evbuffer_overlapped {
54275970Scy	struct evbuffer buffer;
55275970Scy	/** The socket that we're doing overlapped IO on. */
56275970Scy	evutil_socket_t fd;
57275970Scy
58275970Scy	/** pending I/O type */
59275970Scy	unsigned read_in_progress : 1;
60275970Scy	unsigned write_in_progress : 1;
61275970Scy
62275970Scy	/** The first pinned chain in the buffer. */
63275970Scy	struct evbuffer_chain *first_pinned;
64275970Scy
65275970Scy	/** How many chains are pinned; how many of the fields in buffers
66275970Scy	 * are we using. */
67275970Scy	int n_buffers;
68275970Scy	WSABUF buffers[MAX_WSABUFS];
69275970Scy};
70275970Scy
71275970Scy/** Given an evbuffer, return the correponding evbuffer structure, or NULL if
72275970Scy * the evbuffer isn't overlapped. */
73275970Scystatic inline struct evbuffer_overlapped *
74275970Scyupcast_evbuffer(struct evbuffer *buf)
75275970Scy{
76275970Scy	if (!buf || !buf->is_overlapped)
77275970Scy		return NULL;
78275970Scy	return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
79275970Scy}
80275970Scy
81275970Scy/** Unpin all the chains noted as pinned in 'eo'. */
82275970Scystatic void
83275970Scypin_release(struct evbuffer_overlapped *eo, unsigned flag)
84275970Scy{
85275970Scy	int i;
86275970Scy	struct evbuffer_chain *next, *chain = eo->first_pinned;
87275970Scy
88275970Scy	for (i = 0; i < eo->n_buffers; ++i) {
89275970Scy		EVUTIL_ASSERT(chain);
90275970Scy		next = chain->next;
91275970Scy		evbuffer_chain_unpin_(chain, flag);
92275970Scy		chain = next;
93275970Scy	}
94275970Scy}
95275970Scy
96275970Scyvoid
97275970Scyevbuffer_commit_read_(struct evbuffer *evbuf, ev_ssize_t nBytes)
98275970Scy{
99275970Scy	struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
100275970Scy	struct evbuffer_chain **chainp;
101275970Scy	size_t remaining, len;
102275970Scy	unsigned i;
103275970Scy
104275970Scy	EVBUFFER_LOCK(evbuf);
105275970Scy	EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
106275970Scy	EVUTIL_ASSERT(nBytes >= 0); /* XXXX Can this be false? */
107275970Scy
108275970Scy	evbuffer_unfreeze(evbuf, 0);
109275970Scy
110275970Scy	chainp = evbuf->last_with_datap;
111275970Scy	if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R))
112275970Scy		chainp = &(*chainp)->next;
113275970Scy	remaining = nBytes;
114275970Scy	for (i = 0; remaining > 0 && i < (unsigned)buf->n_buffers; ++i) {
115275970Scy		EVUTIL_ASSERT(*chainp);
116275970Scy		len = buf->buffers[i].len;
117275970Scy		if (remaining < len)
118275970Scy			len = remaining;
119275970Scy		(*chainp)->off += len;
120275970Scy		evbuf->last_with_datap = chainp;
121275970Scy		remaining -= len;
122275970Scy		chainp = &(*chainp)->next;
123275970Scy	}
124275970Scy
125275970Scy	pin_release(buf, EVBUFFER_MEM_PINNED_R);
126275970Scy
127275970Scy	buf->read_in_progress = 0;
128275970Scy
129275970Scy	evbuf->total_len += nBytes;
130275970Scy	evbuf->n_add_for_cb += nBytes;
131275970Scy
132275970Scy	evbuffer_invoke_callbacks_(evbuf);
133275970Scy
134275970Scy	evbuffer_decref_and_unlock_(evbuf);
135275970Scy}
136275970Scy
137275970Scyvoid
138275970Scyevbuffer_commit_write_(struct evbuffer *evbuf, ev_ssize_t nBytes)
139275970Scy{
140275970Scy	struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
141275970Scy
142275970Scy	EVBUFFER_LOCK(evbuf);
143275970Scy	EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
144275970Scy	evbuffer_unfreeze(evbuf, 1);
145275970Scy	evbuffer_drain(evbuf, nBytes);
146275970Scy	pin_release(buf,EVBUFFER_MEM_PINNED_W);
147275970Scy	buf->write_in_progress = 0;
148275970Scy	evbuffer_decref_and_unlock_(evbuf);
149275970Scy}
150275970Scy
151275970Scystruct evbuffer *
152275970Scyevbuffer_overlapped_new_(evutil_socket_t fd)
153275970Scy{
154275970Scy	struct evbuffer_overlapped *evo;
155275970Scy
156275970Scy	evo = mm_calloc(1, sizeof(struct evbuffer_overlapped));
157275970Scy	if (!evo)
158275970Scy		return NULL;
159275970Scy
160275970Scy	LIST_INIT(&evo->buffer.callbacks);
161275970Scy	evo->buffer.refcnt = 1;
162275970Scy	evo->buffer.last_with_datap = &evo->buffer.first;
163275970Scy
164275970Scy	evo->buffer.is_overlapped = 1;
165275970Scy	evo->fd = fd;
166275970Scy
167275970Scy	return &evo->buffer;
168275970Scy}
169275970Scy
170275970Scyint
171275970Scyevbuffer_launch_write_(struct evbuffer *buf, ev_ssize_t at_most,
172275970Scy		struct event_overlapped *ol)
173275970Scy{
174275970Scy	struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
175275970Scy	int r = -1;
176275970Scy	int i;
177275970Scy	struct evbuffer_chain *chain;
178275970Scy	DWORD bytesSent;
179275970Scy
180275970Scy	if (!buf) {
181275970Scy		/* No buffer, or it isn't overlapped */
182275970Scy		return -1;
183275970Scy	}
184275970Scy
185275970Scy	EVBUFFER_LOCK(buf);
186275970Scy	EVUTIL_ASSERT(!buf_o->read_in_progress);
187275970Scy	if (buf->freeze_start || buf_o->write_in_progress)
188275970Scy		goto done;
189275970Scy	if (!buf->total_len) {
190275970Scy		/* Nothing to write */
191275970Scy		r = 0;
192275970Scy		goto done;
193275970Scy	} else if (at_most < 0 || (size_t)at_most > buf->total_len) {
194275970Scy		at_most = buf->total_len;
195275970Scy	}
196275970Scy	evbuffer_freeze(buf, 1);
197275970Scy
198275970Scy	buf_o->first_pinned = NULL;
199275970Scy	buf_o->n_buffers = 0;
200275970Scy	memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
201275970Scy
202275970Scy	chain = buf_o->first_pinned = buf->first;
203275970Scy
204275970Scy	for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
205275970Scy		WSABUF *b = &buf_o->buffers[i];
206275970Scy		b->buf = (char*)( chain->buffer + chain->misalign );
207275970Scy		evbuffer_chain_pin_(chain, EVBUFFER_MEM_PINNED_W);
208275970Scy
209275970Scy		if ((size_t)at_most > chain->off) {
210275970Scy			/* XXXX Cast is safe for now, since win32 has no
211275970Scy			   mmaped chains.  But later, we need to have this
212275970Scy			   add more WSAbufs if chain->off is greater than
213275970Scy			   ULONG_MAX */
214275970Scy			b->len = (unsigned long)chain->off;
215275970Scy			at_most -= chain->off;
216275970Scy		} else {
217275970Scy			b->len = (unsigned long)at_most;
218275970Scy			++i;
219275970Scy			break;
220275970Scy		}
221275970Scy	}
222275970Scy
223275970Scy	buf_o->n_buffers = i;
224275970Scy	evbuffer_incref_(buf);
225275970Scy	if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
226275970Scy		&ol->overlapped, NULL)) {
227275970Scy		int error = WSAGetLastError();
228275970Scy		if (error != WSA_IO_PENDING) {
229275970Scy			/* An actual error. */
230275970Scy			pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
231275970Scy			evbuffer_unfreeze(buf, 1);
232275970Scy			evbuffer_free(buf); /* decref */
233275970Scy			goto done;
234275970Scy		}
235275970Scy	}
236275970Scy
237275970Scy	buf_o->write_in_progress = 1;
238275970Scy	r = 0;
239275970Scydone:
240275970Scy	EVBUFFER_UNLOCK(buf);
241275970Scy	return r;
242275970Scy}
243275970Scy
244275970Scyint
245275970Scyevbuffer_launch_read_(struct evbuffer *buf, size_t at_most,
246275970Scy		struct event_overlapped *ol)
247275970Scy{
248275970Scy	struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
249275970Scy	int r = -1, i;
250275970Scy	int nvecs;
251275970Scy	int npin=0;
252275970Scy	struct evbuffer_chain *chain=NULL, **chainp;
253275970Scy	DWORD bytesRead;
254275970Scy	DWORD flags = 0;
255275970Scy	struct evbuffer_iovec vecs[MAX_WSABUFS];
256275970Scy
257275970Scy	if (!buf_o)
258275970Scy		return -1;
259275970Scy	EVBUFFER_LOCK(buf);
260275970Scy	EVUTIL_ASSERT(!buf_o->write_in_progress);
261275970Scy	if (buf->freeze_end || buf_o->read_in_progress)
262275970Scy		goto done;
263275970Scy
264275970Scy	buf_o->first_pinned = NULL;
265275970Scy	buf_o->n_buffers = 0;
266275970Scy	memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
267275970Scy
268275970Scy	if (evbuffer_expand_fast_(buf, at_most, MAX_WSABUFS) == -1)
269275970Scy		goto done;
270275970Scy	evbuffer_freeze(buf, 0);
271275970Scy
272275970Scy	nvecs = evbuffer_read_setup_vecs_(buf, at_most,
273275970Scy	    vecs, MAX_WSABUFS, &chainp, 1);
274275970Scy	for (i=0;i<nvecs;++i) {
275275970Scy		WSABUF_FROM_EVBUFFER_IOV(
276275970Scy			&buf_o->buffers[i],
277275970Scy			&vecs[i]);
278275970Scy	}
279275970Scy
280275970Scy	buf_o->n_buffers = nvecs;
281275970Scy	buf_o->first_pinned = chain = *chainp;
282275970Scy
283275970Scy	npin=0;
284275970Scy	for ( ; chain; chain = chain->next) {
285275970Scy		evbuffer_chain_pin_(chain, EVBUFFER_MEM_PINNED_R);
286275970Scy		++npin;
287275970Scy	}
288275970Scy	EVUTIL_ASSERT(npin == nvecs);
289275970Scy
290275970Scy	evbuffer_incref_(buf);
291275970Scy	if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
292275970Scy		    &ol->overlapped, NULL)) {
293275970Scy		int error = WSAGetLastError();
294275970Scy		if (error != WSA_IO_PENDING) {
295275970Scy			/* An actual error. */
296275970Scy			pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
297275970Scy			evbuffer_unfreeze(buf, 0);
298275970Scy			evbuffer_free(buf); /* decref */
299275970Scy			goto done;
300275970Scy		}
301275970Scy	}
302275970Scy
303275970Scy	buf_o->read_in_progress = 1;
304275970Scy	r = 0;
305275970Scydone:
306275970Scy	EVBUFFER_UNLOCK(buf);
307275970Scy	return r;
308275970Scy}
309275970Scy
310275970Scyevutil_socket_t
311275970Scyevbuffer_overlapped_get_fd_(struct evbuffer *buf)
312275970Scy{
313275970Scy	struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
314275970Scy	return buf_o ? buf_o->fd : -1;
315275970Scy}
316275970Scy
317275970Scyvoid
318275970Scyevbuffer_overlapped_set_fd_(struct evbuffer *buf, evutil_socket_t fd)
319275970Scy{
320275970Scy	struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
321275970Scy	EVBUFFER_LOCK(buf);
322275970Scy	/* XXX is this right?, should it cancel current I/O operations? */
323275970Scy	if (buf_o)
324275970Scy		buf_o->fd = fd;
325275970Scy	EVBUFFER_UNLOCK(buf);
326275970Scy}
327