1275970Scy/* 2275970Scy * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 3275970Scy * 4275970Scy * Redistribution and use in source and binary forms, with or without 5275970Scy * modification, are permitted provided that the following conditions 6275970Scy * are met: 7275970Scy * 1. Redistributions of source code must retain the above copyright 8275970Scy * notice, this list of conditions and the following disclaimer. 9275970Scy * 2. Redistributions in binary form must reproduce the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer in the 11275970Scy * documentation and/or other materials provided with the distribution. 12275970Scy * 3. The name of the author may not be used to endorse or promote products 13275970Scy * derived from this software without specific prior written permission. 14275970Scy * 15275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25275970Scy */ 26275970Scy#include "evconfig-private.h" 27275970Scy 28275970Scy#ifndef _WIN32_WINNT 29275970Scy/* Minimum required for InitializeCriticalSectionAndSpinCount */ 30275970Scy#define _WIN32_WINNT 0x0403 31275970Scy#endif 32275970Scy#include <winsock2.h> 33275970Scy#include <windows.h> 34275970Scy#include <process.h> 35275970Scy#include <stdio.h> 36275970Scy#include <mswsock.h> 37275970Scy 38275970Scy#include "event2/util.h" 39275970Scy#include "util-internal.h" 40275970Scy#include "iocp-internal.h" 41275970Scy#include "log-internal.h" 42275970Scy#include "mm-internal.h" 43275970Scy#include "event-internal.h" 44275970Scy#include "evthread-internal.h" 45275970Scy 46275970Scy#define NOTIFICATION_KEY ((ULONG_PTR)-1) 47275970Scy 48275970Scyvoid 49275970Scyevent_overlapped_init_(struct event_overlapped *o, iocp_callback cb) 50275970Scy{ 51275970Scy memset(o, 0, sizeof(struct event_overlapped)); 52275970Scy o->cb = cb; 53275970Scy} 54275970Scy 55275970Scystatic void 56275970Scyhandle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok) 57275970Scy{ 58275970Scy struct event_overlapped *eo = 59275970Scy EVUTIL_UPCAST(o, struct event_overlapped, overlapped); 60275970Scy eo->cb(eo, completion_key, nBytes, ok); 61275970Scy} 62275970Scy 63275970Scystatic void 64275970Scyloop(void *port_) 65275970Scy{ 66275970Scy struct event_iocp_port *port = port_; 67275970Scy long ms = port->ms; 68275970Scy HANDLE p = port->port; 69275970Scy 70275970Scy if (ms <= 0) 71275970Scy ms = INFINITE; 72275970Scy 73275970Scy while (1) { 74275970Scy OVERLAPPED *overlapped=NULL; 75275970Scy ULONG_PTR key=0; 76275970Scy DWORD bytes=0; 77275970Scy int ok = GetQueuedCompletionStatus(p, &bytes, &key, 78275970Scy &overlapped, ms); 79275970Scy EnterCriticalSection(&port->lock); 80275970Scy if (port->shutdown) { 81275970Scy if (--port->n_live_threads == 0) 82275970Scy ReleaseSemaphore(port->shutdownSemaphore, 1, 83275970Scy NULL); 84275970Scy LeaveCriticalSection(&port->lock); 85275970Scy return; 86275970Scy } 87275970Scy LeaveCriticalSection(&port->lock); 88275970Scy 89275970Scy if (key != NOTIFICATION_KEY && overlapped) 90275970Scy handle_entry(overlapped, key, bytes, ok); 91275970Scy else if (!overlapped) 92275970Scy break; 93275970Scy } 94275970Scy event_warnx("GetQueuedCompletionStatus exited with no event."); 95275970Scy EnterCriticalSection(&port->lock); 96275970Scy if (--port->n_live_threads == 0) 97275970Scy ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); 98275970Scy LeaveCriticalSection(&port->lock); 99275970Scy} 100275970Scy 101275970Scyint 102275970Scyevent_iocp_port_associate_(struct event_iocp_port *port, evutil_socket_t fd, 103275970Scy ev_uintptr_t key) 104275970Scy{ 105275970Scy HANDLE h; 106275970Scy h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads); 107275970Scy if (!h) 108275970Scy return -1; 109275970Scy return 0; 110275970Scy} 111275970Scy 112275970Scystatic void * 113275970Scyget_extension_function(SOCKET s, const GUID *which_fn) 114275970Scy{ 115275970Scy void *ptr = NULL; 116275970Scy DWORD bytes=0; 117275970Scy WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, 118275970Scy (GUID*)which_fn, sizeof(*which_fn), 119275970Scy &ptr, sizeof(ptr), 120275970Scy &bytes, NULL, NULL); 121275970Scy 122275970Scy /* No need to detect errors here: if ptr is set, then we have a good 123275970Scy function pointer. Otherwise, we should behave as if we had no 124275970Scy function pointer. 125275970Scy */ 126275970Scy return ptr; 127275970Scy} 128275970Scy 129275970Scy/* Mingw doesn't have these in its mswsock.h. The values are copied from 130275970Scy wine.h. Perhaps if we copy them exactly, the cargo will come again. 131275970Scy*/ 132275970Scy#ifndef WSAID_ACCEPTEX 133275970Scy#define WSAID_ACCEPTEX \ 134275970Scy {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 135275970Scy#endif 136275970Scy#ifndef WSAID_CONNECTEX 137275970Scy#define WSAID_CONNECTEX \ 138275970Scy {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} 139275970Scy#endif 140275970Scy#ifndef WSAID_GETACCEPTEXSOCKADDRS 141275970Scy#define WSAID_GETACCEPTEXSOCKADDRS \ 142275970Scy {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} 143275970Scy#endif 144275970Scy 145275970Scystatic int extension_fns_initialized = 0; 146275970Scy 147275970Scystatic void 148275970Scyinit_extension_functions(struct win32_extension_fns *ext) 149275970Scy{ 150275970Scy const GUID acceptex = WSAID_ACCEPTEX; 151275970Scy const GUID connectex = WSAID_CONNECTEX; 152275970Scy const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS; 153275970Scy SOCKET s = socket(AF_INET, SOCK_STREAM, 0); 154275970Scy if (s == INVALID_SOCKET) 155275970Scy return; 156275970Scy ext->AcceptEx = get_extension_function(s, &acceptex); 157275970Scy ext->ConnectEx = get_extension_function(s, &connectex); 158275970Scy ext->GetAcceptExSockaddrs = get_extension_function(s, 159275970Scy &getacceptexsockaddrs); 160275970Scy closesocket(s); 161275970Scy 162275970Scy extension_fns_initialized = 1; 163275970Scy} 164275970Scy 165275970Scystatic struct win32_extension_fns the_extension_fns; 166275970Scy 167275970Scyconst struct win32_extension_fns * 168275970Scyevent_get_win32_extension_fns_(void) 169275970Scy{ 170275970Scy return &the_extension_fns; 171275970Scy} 172275970Scy 173275970Scy#define N_CPUS_DEFAULT 2 174275970Scy 175275970Scystruct event_iocp_port * 176275970Scyevent_iocp_port_launch_(int n_cpus) 177275970Scy{ 178275970Scy struct event_iocp_port *port; 179275970Scy int i; 180275970Scy 181275970Scy if (!extension_fns_initialized) 182275970Scy init_extension_functions(&the_extension_fns); 183275970Scy 184275970Scy if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) 185275970Scy return NULL; 186275970Scy 187275970Scy if (n_cpus <= 0) 188275970Scy n_cpus = N_CPUS_DEFAULT; 189275970Scy port->n_threads = n_cpus * 2; 190275970Scy port->threads = mm_calloc(port->n_threads, sizeof(HANDLE)); 191275970Scy if (!port->threads) 192275970Scy goto err; 193275970Scy 194275970Scy port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 195275970Scy n_cpus); 196275970Scy port->ms = -1; 197275970Scy if (!port->port) 198275970Scy goto err; 199275970Scy 200275970Scy port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL); 201275970Scy if (!port->shutdownSemaphore) 202275970Scy goto err; 203275970Scy 204275970Scy for (i=0; i<port->n_threads; ++i) { 205275970Scy ev_uintptr_t th = _beginthread(loop, 0, port); 206275970Scy if (th == (ev_uintptr_t)-1) 207275970Scy goto err; 208275970Scy port->threads[i] = (HANDLE)th; 209275970Scy ++port->n_live_threads; 210275970Scy } 211275970Scy 212275970Scy InitializeCriticalSectionAndSpinCount(&port->lock, 1000); 213275970Scy 214275970Scy return port; 215275970Scyerr: 216275970Scy if (port->port) 217275970Scy CloseHandle(port->port); 218275970Scy if (port->threads) 219275970Scy mm_free(port->threads); 220275970Scy if (port->shutdownSemaphore) 221275970Scy CloseHandle(port->shutdownSemaphore); 222275970Scy mm_free(port); 223275970Scy return NULL; 224275970Scy} 225275970Scy 226275970Scystatic void 227275970Scyevent_iocp_port_unlock_and_free_(struct event_iocp_port *port) 228275970Scy{ 229275970Scy DeleteCriticalSection(&port->lock); 230275970Scy CloseHandle(port->port); 231275970Scy CloseHandle(port->shutdownSemaphore); 232275970Scy mm_free(port->threads); 233275970Scy mm_free(port); 234275970Scy} 235275970Scy 236275970Scystatic int 237275970Scyevent_iocp_notify_all(struct event_iocp_port *port) 238275970Scy{ 239275970Scy int i, r, ok=1; 240275970Scy for (i=0; i<port->n_threads; ++i) { 241275970Scy r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY, 242275970Scy NULL); 243275970Scy if (!r) 244275970Scy ok = 0; 245275970Scy } 246275970Scy return ok ? 0 : -1; 247275970Scy} 248275970Scy 249275970Scyint 250275970Scyevent_iocp_shutdown_(struct event_iocp_port *port, long waitMsec) 251275970Scy{ 252275970Scy DWORD ms = INFINITE; 253275970Scy int n; 254275970Scy 255275970Scy EnterCriticalSection(&port->lock); 256275970Scy port->shutdown = 1; 257275970Scy LeaveCriticalSection(&port->lock); 258275970Scy event_iocp_notify_all(port); 259275970Scy 260275970Scy if (waitMsec >= 0) 261275970Scy ms = waitMsec; 262275970Scy 263275970Scy WaitForSingleObject(port->shutdownSemaphore, ms); 264275970Scy EnterCriticalSection(&port->lock); 265275970Scy n = port->n_live_threads; 266275970Scy LeaveCriticalSection(&port->lock); 267275970Scy if (n == 0) { 268275970Scy event_iocp_port_unlock_and_free_(port); 269275970Scy return 0; 270275970Scy } else { 271275970Scy return -1; 272275970Scy } 273275970Scy} 274275970Scy 275275970Scyint 276275970Scyevent_iocp_activate_overlapped_( 277275970Scy struct event_iocp_port *port, struct event_overlapped *o, 278275970Scy ev_uintptr_t key, ev_uint32_t n) 279275970Scy{ 280275970Scy BOOL r; 281275970Scy 282275970Scy r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped); 283275970Scy return (r==0) ? -1 : 0; 284275970Scy} 285275970Scy 286275970Scystruct event_iocp_port * 287275970Scyevent_base_get_iocp_(struct event_base *base) 288275970Scy{ 289275970Scy#ifdef _WIN32 290275970Scy return base->iocp; 291275970Scy#else 292275970Scy return NULL; 293275970Scy#endif 294275970Scy} 295