1275970Scy/* 2275970Scy * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3275970Scy * 4275970Scy * All rights reserved. 5275970Scy * 6275970Scy * Redistribution and use in source and binary forms, with or without 7275970Scy * modification, are permitted provided that the following conditions 8275970Scy * are met: 9275970Scy * 1. Redistributions of source code must retain the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer. 11275970Scy * 2. Redistributions in binary form must reproduce the above copyright 12275970Scy * notice, this list of conditions and the following disclaimer in the 13275970Scy * documentation and/or other materials provided with the distribution. 14275970Scy * 3. The name of the author may not be used to endorse or promote products 15275970Scy * derived from this software without specific prior written permission. 16275970Scy * 17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27275970Scy */ 28275970Scy 29275970Scy#include "event2/event-config.h" 30275970Scy#include "evconfig-private.h" 31275970Scy 32275970Scy#ifdef EVENT__HAVE_SYS_TIME_H 33275970Scy#include <sys/time.h> 34275970Scy#endif 35275970Scy 36275970Scy#include <errno.h> 37275970Scy#include <stdio.h> 38275970Scy#include <stdlib.h> 39275970Scy#include <string.h> 40275970Scy#ifdef EVENT__HAVE_STDARG_H 41275970Scy#include <stdarg.h> 42275970Scy#endif 43275970Scy#ifdef EVENT__HAVE_UNISTD_H 44275970Scy#include <unistd.h> 45275970Scy#endif 46275970Scy 47275970Scy#ifdef _WIN32 48275970Scy#include <winsock2.h> 49275970Scy#include <ws2tcpip.h> 50275970Scy#endif 51275970Scy 52275970Scy#include <sys/queue.h> 53275970Scy 54275970Scy#include "event2/util.h" 55275970Scy#include "event2/bufferevent.h" 56275970Scy#include "event2/buffer.h" 57275970Scy#include "event2/bufferevent_struct.h" 58275970Scy#include "event2/event.h" 59275970Scy#include "event2/util.h" 60275970Scy#include "event-internal.h" 61275970Scy#include "log-internal.h" 62275970Scy#include "mm-internal.h" 63275970Scy#include "bufferevent-internal.h" 64275970Scy#include "util-internal.h" 65275970Scy#include "iocp-internal.h" 66275970Scy 67275970Scy#ifndef SO_UPDATE_CONNECT_CONTEXT 68275970Scy/* Mingw is sometimes missing this */ 69275970Scy#define SO_UPDATE_CONNECT_CONTEXT 0x7010 70275970Scy#endif 71275970Scy 72275970Scy/* prototypes */ 73275970Scystatic int be_async_enable(struct bufferevent *, short); 74275970Scystatic int be_async_disable(struct bufferevent *, short); 75275970Scystatic void be_async_destruct(struct bufferevent *); 76275970Scystatic int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 77275970Scystatic int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 78275970Scy 79275970Scystruct bufferevent_async { 80275970Scy struct bufferevent_private bev; 81275970Scy struct event_overlapped connect_overlapped; 82275970Scy struct event_overlapped read_overlapped; 83275970Scy struct event_overlapped write_overlapped; 84275970Scy size_t read_in_progress; 85275970Scy size_t write_in_progress; 86275970Scy unsigned ok : 1; 87275970Scy unsigned read_added : 1; 88275970Scy unsigned write_added : 1; 89275970Scy}; 90275970Scy 91275970Scyconst struct bufferevent_ops bufferevent_ops_async = { 92275970Scy "socket_async", 93275970Scy evutil_offsetof(struct bufferevent_async, bev.bev), 94275970Scy be_async_enable, 95275970Scy be_async_disable, 96275970Scy NULL, /* Unlink */ 97275970Scy be_async_destruct, 98275970Scy bufferevent_generic_adj_timeouts_, 99275970Scy be_async_flush, 100275970Scy be_async_ctrl, 101275970Scy}; 102275970Scy 103275970Scystatic inline struct bufferevent_async * 104275970Scyupcast(struct bufferevent *bev) 105275970Scy{ 106275970Scy struct bufferevent_async *bev_a; 107275970Scy if (bev->be_ops != &bufferevent_ops_async) 108275970Scy return NULL; 109275970Scy bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 110275970Scy return bev_a; 111275970Scy} 112275970Scy 113275970Scystatic inline struct bufferevent_async * 114275970Scyupcast_connect(struct event_overlapped *eo) 115275970Scy{ 116275970Scy struct bufferevent_async *bev_a; 117275970Scy bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 118275970Scy EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 119275970Scy return bev_a; 120275970Scy} 121275970Scy 122275970Scystatic inline struct bufferevent_async * 123275970Scyupcast_read(struct event_overlapped *eo) 124275970Scy{ 125275970Scy struct bufferevent_async *bev_a; 126275970Scy bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 127275970Scy EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 128275970Scy return bev_a; 129275970Scy} 130275970Scy 131275970Scystatic inline struct bufferevent_async * 132275970Scyupcast_write(struct event_overlapped *eo) 133275970Scy{ 134275970Scy struct bufferevent_async *bev_a; 135275970Scy bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 136275970Scy EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 137275970Scy return bev_a; 138275970Scy} 139275970Scy 140275970Scystatic void 141275970Scybev_async_del_write(struct bufferevent_async *beva) 142275970Scy{ 143275970Scy struct bufferevent *bev = &beva->bev.bev; 144275970Scy 145275970Scy if (beva->write_added) { 146275970Scy beva->write_added = 0; 147275970Scy event_base_del_virtual_(bev->ev_base); 148275970Scy } 149275970Scy} 150275970Scy 151275970Scystatic void 152275970Scybev_async_del_read(struct bufferevent_async *beva) 153275970Scy{ 154275970Scy struct bufferevent *bev = &beva->bev.bev; 155275970Scy 156275970Scy if (beva->read_added) { 157275970Scy beva->read_added = 0; 158275970Scy event_base_del_virtual_(bev->ev_base); 159275970Scy } 160275970Scy} 161275970Scy 162275970Scystatic void 163275970Scybev_async_add_write(struct bufferevent_async *beva) 164275970Scy{ 165275970Scy struct bufferevent *bev = &beva->bev.bev; 166275970Scy 167275970Scy if (!beva->write_added) { 168275970Scy beva->write_added = 1; 169275970Scy event_base_add_virtual_(bev->ev_base); 170275970Scy } 171275970Scy} 172275970Scy 173275970Scystatic void 174275970Scybev_async_add_read(struct bufferevent_async *beva) 175275970Scy{ 176275970Scy struct bufferevent *bev = &beva->bev.bev; 177275970Scy 178275970Scy if (!beva->read_added) { 179275970Scy beva->read_added = 1; 180275970Scy event_base_add_virtual_(bev->ev_base); 181275970Scy } 182275970Scy} 183275970Scy 184275970Scystatic void 185275970Scybev_async_consider_writing(struct bufferevent_async *beva) 186275970Scy{ 187275970Scy size_t at_most; 188275970Scy int limit; 189275970Scy struct bufferevent *bev = &beva->bev.bev; 190275970Scy 191275970Scy /* Don't write if there's a write in progress, or we do not 192275970Scy * want to write, or when there's nothing left to write. */ 193275970Scy if (beva->write_in_progress || beva->bev.connecting) 194275970Scy return; 195275970Scy if (!beva->ok || !(bev->enabled&EV_WRITE) || 196275970Scy !evbuffer_get_length(bev->output)) { 197275970Scy bev_async_del_write(beva); 198275970Scy return; 199275970Scy } 200275970Scy 201275970Scy at_most = evbuffer_get_length(bev->output); 202275970Scy 203275970Scy /* This is safe so long as bufferevent_get_write_max never returns 204275970Scy * more than INT_MAX. That's true for now. XXXX */ 205275970Scy limit = (int)bufferevent_get_write_max_(&beva->bev); 206275970Scy if (at_most >= (size_t)limit && limit >= 0) 207275970Scy at_most = limit; 208275970Scy 209275970Scy if (beva->bev.write_suspended) { 210275970Scy bev_async_del_write(beva); 211275970Scy return; 212275970Scy } 213275970Scy 214275970Scy /* XXXX doesn't respect low-water mark very well. */ 215275970Scy bufferevent_incref_(bev); 216275970Scy if (evbuffer_launch_write_(bev->output, at_most, 217275970Scy &beva->write_overlapped)) { 218275970Scy bufferevent_decref_(bev); 219275970Scy beva->ok = 0; 220275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 221275970Scy } else { 222275970Scy beva->write_in_progress = at_most; 223275970Scy bufferevent_decrement_write_buckets_(&beva->bev, at_most); 224275970Scy bev_async_add_write(beva); 225275970Scy } 226275970Scy} 227275970Scy 228275970Scystatic void 229275970Scybev_async_consider_reading(struct bufferevent_async *beva) 230275970Scy{ 231275970Scy size_t cur_size; 232275970Scy size_t read_high; 233275970Scy size_t at_most; 234275970Scy int limit; 235275970Scy struct bufferevent *bev = &beva->bev.bev; 236275970Scy 237275970Scy /* Don't read if there is a read in progress, or we do not 238275970Scy * want to read. */ 239275970Scy if (beva->read_in_progress || beva->bev.connecting) 240275970Scy return; 241275970Scy if (!beva->ok || !(bev->enabled&EV_READ)) { 242275970Scy bev_async_del_read(beva); 243275970Scy return; 244275970Scy } 245275970Scy 246275970Scy /* Don't read if we're full */ 247275970Scy cur_size = evbuffer_get_length(bev->input); 248275970Scy read_high = bev->wm_read.high; 249275970Scy if (read_high) { 250275970Scy if (cur_size >= read_high) { 251275970Scy bev_async_del_read(beva); 252275970Scy return; 253275970Scy } 254275970Scy at_most = read_high - cur_size; 255275970Scy } else { 256275970Scy at_most = 16384; /* FIXME totally magic. */ 257275970Scy } 258275970Scy 259275970Scy /* XXXX This over-commits. */ 260275970Scy /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 261275970Scy limit = (int)bufferevent_get_read_max_(&beva->bev); 262275970Scy if (at_most >= (size_t)limit && limit >= 0) 263275970Scy at_most = limit; 264275970Scy 265275970Scy if (beva->bev.read_suspended) { 266275970Scy bev_async_del_read(beva); 267275970Scy return; 268275970Scy } 269275970Scy 270275970Scy bufferevent_incref_(bev); 271275970Scy if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 272275970Scy beva->ok = 0; 273275970Scy bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 274275970Scy bufferevent_decref_(bev); 275275970Scy } else { 276275970Scy beva->read_in_progress = at_most; 277275970Scy bufferevent_decrement_read_buckets_(&beva->bev, at_most); 278275970Scy bev_async_add_read(beva); 279275970Scy } 280275970Scy 281275970Scy return; 282275970Scy} 283275970Scy 284275970Scystatic void 285275970Scybe_async_outbuf_callback(struct evbuffer *buf, 286275970Scy const struct evbuffer_cb_info *cbinfo, 287275970Scy void *arg) 288275970Scy{ 289275970Scy struct bufferevent *bev = arg; 290275970Scy struct bufferevent_async *bev_async = upcast(bev); 291275970Scy 292275970Scy /* If we added data to the outbuf and were not writing before, 293275970Scy * we may want to write now. */ 294275970Scy 295275970Scy bufferevent_incref_and_lock_(bev); 296275970Scy 297275970Scy if (cbinfo->n_added) 298275970Scy bev_async_consider_writing(bev_async); 299275970Scy 300275970Scy bufferevent_decref_and_unlock_(bev); 301275970Scy} 302275970Scy 303275970Scystatic void 304275970Scybe_async_inbuf_callback(struct evbuffer *buf, 305275970Scy const struct evbuffer_cb_info *cbinfo, 306275970Scy void *arg) 307275970Scy{ 308275970Scy struct bufferevent *bev = arg; 309275970Scy struct bufferevent_async *bev_async = upcast(bev); 310275970Scy 311275970Scy /* If we drained data from the inbuf and were not reading before, 312275970Scy * we may want to read now */ 313275970Scy 314275970Scy bufferevent_incref_and_lock_(bev); 315275970Scy 316275970Scy if (cbinfo->n_deleted) 317275970Scy bev_async_consider_reading(bev_async); 318275970Scy 319275970Scy bufferevent_decref_and_unlock_(bev); 320275970Scy} 321275970Scy 322275970Scystatic int 323275970Scybe_async_enable(struct bufferevent *buf, short what) 324275970Scy{ 325275970Scy struct bufferevent_async *bev_async = upcast(buf); 326275970Scy 327275970Scy if (!bev_async->ok) 328275970Scy return -1; 329275970Scy 330275970Scy if (bev_async->bev.connecting) { 331275970Scy /* Don't launch anything during connection attempts. */ 332275970Scy return 0; 333275970Scy } 334275970Scy 335275970Scy if (what & EV_READ) 336275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(buf); 337275970Scy if (what & EV_WRITE) 338275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 339275970Scy 340275970Scy /* If we newly enable reading or writing, and we aren't reading or 341275970Scy writing already, consider launching a new read or write. */ 342275970Scy 343275970Scy if (what & EV_READ) 344275970Scy bev_async_consider_reading(bev_async); 345275970Scy if (what & EV_WRITE) 346275970Scy bev_async_consider_writing(bev_async); 347275970Scy return 0; 348275970Scy} 349275970Scy 350275970Scystatic int 351275970Scybe_async_disable(struct bufferevent *bev, short what) 352275970Scy{ 353275970Scy struct bufferevent_async *bev_async = upcast(bev); 354275970Scy /* XXXX If we disable reading or writing, we may want to consider 355275970Scy * canceling any in-progress read or write operation, though it might 356275970Scy * not work. */ 357275970Scy 358275970Scy if (what & EV_READ) { 359275970Scy BEV_DEL_GENERIC_READ_TIMEOUT(bev); 360275970Scy bev_async_del_read(bev_async); 361275970Scy } 362275970Scy if (what & EV_WRITE) { 363275970Scy BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 364275970Scy bev_async_del_write(bev_async); 365275970Scy } 366275970Scy 367275970Scy return 0; 368275970Scy} 369275970Scy 370275970Scystatic void 371275970Scybe_async_destruct(struct bufferevent *bev) 372275970Scy{ 373275970Scy struct bufferevent_async *bev_async = upcast(bev); 374275970Scy struct bufferevent_private *bev_p = BEV_UPCAST(bev); 375275970Scy evutil_socket_t fd; 376275970Scy 377275970Scy EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 378275970Scy !upcast(bev)->read_in_progress); 379275970Scy 380275970Scy bev_async_del_read(bev_async); 381275970Scy bev_async_del_write(bev_async); 382275970Scy 383275970Scy fd = evbuffer_overlapped_get_fd_(bev->input); 384282408Scy if (fd != (evutil_socket_t)INVALID_SOCKET && 385282408Scy (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 386275970Scy evutil_closesocket(fd); 387282408Scy evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 388275970Scy } 389275970Scy} 390275970Scy 391275970Scy/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 392275970Scy * we use WSAGetOverlappedResult to translate. */ 393275970Scystatic void 394275970Scybev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 395275970Scy{ 396275970Scy DWORD bytes, flags; 397275970Scy evutil_socket_t fd; 398275970Scy 399275970Scy fd = evbuffer_overlapped_get_fd_(bev->input); 400275970Scy WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 401275970Scy} 402275970Scy 403275970Scystatic int 404275970Scybe_async_flush(struct bufferevent *bev, short what, 405275970Scy enum bufferevent_flush_mode mode) 406275970Scy{ 407275970Scy return 0; 408275970Scy} 409275970Scy 410275970Scystatic void 411275970Scyconnect_complete(struct event_overlapped *eo, ev_uintptr_t key, 412275970Scy ev_ssize_t nbytes, int ok) 413275970Scy{ 414275970Scy struct bufferevent_async *bev_a = upcast_connect(eo); 415275970Scy struct bufferevent *bev = &bev_a->bev.bev; 416275970Scy evutil_socket_t sock; 417275970Scy 418275970Scy BEV_LOCK(bev); 419275970Scy 420275970Scy EVUTIL_ASSERT(bev_a->bev.connecting); 421275970Scy bev_a->bev.connecting = 0; 422275970Scy sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 423275970Scy /* XXXX Handle error? */ 424275970Scy setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 425275970Scy 426275970Scy if (ok) 427275970Scy bufferevent_async_set_connected_(bev); 428275970Scy else 429275970Scy bev_async_set_wsa_error(bev, eo); 430275970Scy 431275970Scy bufferevent_run_eventcb_(bev, 432275970Scy ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 433275970Scy 434275970Scy event_base_del_virtual_(bev->ev_base); 435275970Scy 436275970Scy bufferevent_decref_and_unlock_(bev); 437275970Scy} 438275970Scy 439275970Scystatic void 440275970Scyread_complete(struct event_overlapped *eo, ev_uintptr_t key, 441275970Scy ev_ssize_t nbytes, int ok) 442275970Scy{ 443275970Scy struct bufferevent_async *bev_a = upcast_read(eo); 444275970Scy struct bufferevent *bev = &bev_a->bev.bev; 445275970Scy short what = BEV_EVENT_READING; 446275970Scy ev_ssize_t amount_unread; 447275970Scy BEV_LOCK(bev); 448275970Scy EVUTIL_ASSERT(bev_a->read_in_progress); 449275970Scy 450275970Scy amount_unread = bev_a->read_in_progress - nbytes; 451275970Scy evbuffer_commit_read_(bev->input, nbytes); 452275970Scy bev_a->read_in_progress = 0; 453275970Scy if (amount_unread) 454275970Scy bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 455275970Scy 456275970Scy if (!ok) 457275970Scy bev_async_set_wsa_error(bev, eo); 458275970Scy 459275970Scy if (bev_a->ok) { 460275970Scy if (ok && nbytes) { 461275970Scy BEV_RESET_GENERIC_READ_TIMEOUT(bev); 462275970Scy bufferevent_trigger_nolock_(bev, EV_READ, 0); 463275970Scy bev_async_consider_reading(bev_a); 464275970Scy } else if (!ok) { 465275970Scy what |= BEV_EVENT_ERROR; 466275970Scy bev_a->ok = 0; 467275970Scy bufferevent_run_eventcb_(bev, what, 0); 468275970Scy } else if (!nbytes) { 469275970Scy what |= BEV_EVENT_EOF; 470275970Scy bev_a->ok = 0; 471275970Scy bufferevent_run_eventcb_(bev, what, 0); 472275970Scy } 473275970Scy } 474275970Scy 475275970Scy bufferevent_decref_and_unlock_(bev); 476275970Scy} 477275970Scy 478275970Scystatic void 479275970Scywrite_complete(struct event_overlapped *eo, ev_uintptr_t key, 480275970Scy ev_ssize_t nbytes, int ok) 481275970Scy{ 482275970Scy struct bufferevent_async *bev_a = upcast_write(eo); 483275970Scy struct bufferevent *bev = &bev_a->bev.bev; 484275970Scy short what = BEV_EVENT_WRITING; 485275970Scy ev_ssize_t amount_unwritten; 486275970Scy 487275970Scy BEV_LOCK(bev); 488275970Scy EVUTIL_ASSERT(bev_a->write_in_progress); 489275970Scy 490275970Scy amount_unwritten = bev_a->write_in_progress - nbytes; 491275970Scy evbuffer_commit_write_(bev->output, nbytes); 492275970Scy bev_a->write_in_progress = 0; 493275970Scy 494275970Scy if (amount_unwritten) 495275970Scy bufferevent_decrement_write_buckets_(&bev_a->bev, 496275970Scy -amount_unwritten); 497275970Scy 498275970Scy 499275970Scy if (!ok) 500275970Scy bev_async_set_wsa_error(bev, eo); 501275970Scy 502275970Scy if (bev_a->ok) { 503275970Scy if (ok && nbytes) { 504275970Scy BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 505275970Scy bufferevent_trigger_nolock_(bev, EV_WRITE, 0); 506275970Scy bev_async_consider_writing(bev_a); 507275970Scy } else if (!ok) { 508275970Scy what |= BEV_EVENT_ERROR; 509275970Scy bev_a->ok = 0; 510275970Scy bufferevent_run_eventcb_(bev, what, 0); 511275970Scy } else if (!nbytes) { 512275970Scy what |= BEV_EVENT_EOF; 513275970Scy bev_a->ok = 0; 514275970Scy bufferevent_run_eventcb_(bev, what, 0); 515275970Scy } 516275970Scy } 517275970Scy 518275970Scy bufferevent_decref_and_unlock_(bev); 519275970Scy} 520275970Scy 521275970Scystruct bufferevent * 522275970Scybufferevent_async_new_(struct event_base *base, 523275970Scy evutil_socket_t fd, int options) 524275970Scy{ 525275970Scy struct bufferevent_async *bev_a; 526275970Scy struct bufferevent *bev; 527275970Scy struct event_iocp_port *iocp; 528275970Scy 529275970Scy options |= BEV_OPT_THREADSAFE; 530275970Scy 531275970Scy if (!(iocp = event_base_get_iocp_(base))) 532275970Scy return NULL; 533275970Scy 534275970Scy if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 535275970Scy int err = GetLastError(); 536275970Scy /* We may have alrady associated this fd with a port. 537275970Scy * Let's hope it's this port, and that the error code 538275970Scy * for doing this neer changes. */ 539275970Scy if (err != ERROR_INVALID_PARAMETER) 540275970Scy return NULL; 541275970Scy } 542275970Scy 543275970Scy if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 544275970Scy return NULL; 545275970Scy 546275970Scy bev = &bev_a->bev.bev; 547275970Scy if (!(bev->input = evbuffer_overlapped_new_(fd))) { 548275970Scy mm_free(bev_a); 549275970Scy return NULL; 550275970Scy } 551275970Scy if (!(bev->output = evbuffer_overlapped_new_(fd))) { 552275970Scy evbuffer_free(bev->input); 553275970Scy mm_free(bev_a); 554275970Scy return NULL; 555275970Scy } 556275970Scy 557275970Scy if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 558275970Scy options)<0) 559275970Scy goto err; 560275970Scy 561275970Scy evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 562275970Scy evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 563275970Scy 564275970Scy event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 565275970Scy event_overlapped_init_(&bev_a->read_overlapped, read_complete); 566275970Scy event_overlapped_init_(&bev_a->write_overlapped, write_complete); 567275970Scy 568282408Scy bufferevent_init_generic_timeout_cbs_(bev); 569282408Scy 570275970Scy bev_a->ok = fd >= 0; 571275970Scy 572275970Scy return bev; 573275970Scyerr: 574275970Scy bufferevent_free(&bev_a->bev.bev); 575275970Scy return NULL; 576275970Scy} 577275970Scy 578275970Scyvoid 579275970Scybufferevent_async_set_connected_(struct bufferevent *bev) 580275970Scy{ 581275970Scy struct bufferevent_async *bev_async = upcast(bev); 582275970Scy bev_async->ok = 1; 583275970Scy bufferevent_init_generic_timeout_cbs_(bev); 584275970Scy /* Now's a good time to consider reading/writing */ 585275970Scy be_async_enable(bev, bev->enabled); 586275970Scy} 587275970Scy 588275970Scyint 589275970Scybufferevent_async_can_connect_(struct bufferevent *bev) 590275970Scy{ 591275970Scy const struct win32_extension_fns *ext = 592275970Scy event_get_win32_extension_fns_(); 593275970Scy 594275970Scy if (BEV_IS_ASYNC(bev) && 595275970Scy event_base_get_iocp_(bev->ev_base) && 596275970Scy ext && ext->ConnectEx) 597275970Scy return 1; 598275970Scy 599275970Scy return 0; 600275970Scy} 601275970Scy 602275970Scyint 603275970Scybufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 604275970Scy const struct sockaddr *sa, int socklen) 605275970Scy{ 606275970Scy BOOL rc; 607275970Scy struct bufferevent_async *bev_async = upcast(bev); 608275970Scy struct sockaddr_storage ss; 609275970Scy const struct win32_extension_fns *ext = 610275970Scy event_get_win32_extension_fns_(); 611275970Scy 612275970Scy EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 613275970Scy 614275970Scy /* ConnectEx() requires that the socket be bound to an address 615275970Scy * with bind() before using, otherwise it will fail. We attempt 616275970Scy * to issue a bind() here, taking into account that the error 617275970Scy * code is set to WSAEINVAL when the socket is already bound. */ 618275970Scy memset(&ss, 0, sizeof(ss)); 619275970Scy if (sa->sa_family == AF_INET) { 620275970Scy struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 621275970Scy sin->sin_family = AF_INET; 622275970Scy sin->sin_addr.s_addr = INADDR_ANY; 623275970Scy } else if (sa->sa_family == AF_INET6) { 624275970Scy struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 625275970Scy sin6->sin6_family = AF_INET6; 626275970Scy sin6->sin6_addr = in6addr_any; 627275970Scy } else { 628275970Scy /* Well, the user will have to bind() */ 629275970Scy return -1; 630275970Scy } 631275970Scy if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 632275970Scy WSAGetLastError() != WSAEINVAL) 633275970Scy return -1; 634275970Scy 635275970Scy event_base_add_virtual_(bev->ev_base); 636275970Scy bufferevent_incref_(bev); 637275970Scy rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 638275970Scy &bev_async->connect_overlapped.overlapped); 639275970Scy if (rc || WSAGetLastError() == ERROR_IO_PENDING) 640275970Scy return 0; 641275970Scy 642275970Scy event_base_del_virtual_(bev->ev_base); 643275970Scy bufferevent_decref_(bev); 644275970Scy 645275970Scy return -1; 646275970Scy} 647275970Scy 648275970Scystatic int 649275970Scybe_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 650275970Scy union bufferevent_ctrl_data *data) 651275970Scy{ 652275970Scy switch (op) { 653275970Scy case BEV_CTRL_GET_FD: 654275970Scy data->fd = evbuffer_overlapped_get_fd_(bev->input); 655275970Scy return 0; 656275970Scy case BEV_CTRL_SET_FD: { 657275970Scy struct event_iocp_port *iocp; 658275970Scy 659275970Scy if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 660275970Scy return 0; 661275970Scy if (!(iocp = event_base_get_iocp_(bev->ev_base))) 662275970Scy return -1; 663275970Scy if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) 664275970Scy return -1; 665275970Scy evbuffer_overlapped_set_fd_(bev->input, data->fd); 666275970Scy evbuffer_overlapped_set_fd_(bev->output, data->fd); 667275970Scy return 0; 668275970Scy } 669275970Scy case BEV_CTRL_CANCEL_ALL: { 670275970Scy struct bufferevent_async *bev_a = upcast(bev); 671275970Scy evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 672275970Scy if (fd != (evutil_socket_t)INVALID_SOCKET && 673275970Scy (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 674275970Scy closesocket(fd); 675282408Scy evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 676275970Scy } 677275970Scy bev_a->ok = 0; 678275970Scy return 0; 679275970Scy } 680275970Scy case BEV_CTRL_GET_UNDERLYING: 681275970Scy default: 682275970Scy return -1; 683275970Scy } 684275970Scy} 685275970Scy 686275970Scy 687