1290001Sglebius/* 2290001Sglebius * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3290001Sglebius * 4290001Sglebius * All rights reserved. 5290001Sglebius * 6290001Sglebius * Redistribution and use in source and binary forms, with or without 7290001Sglebius * modification, are permitted provided that the following conditions 8290001Sglebius * are met: 9290001Sglebius * 1. Redistributions of source code must retain the above copyright 10290001Sglebius * notice, this list of conditions and the following disclaimer. 11290001Sglebius * 2. Redistributions in binary form must reproduce the above copyright 12290001Sglebius * notice, this list of conditions and the following disclaimer in the 13290001Sglebius * documentation and/or other materials provided with the distribution. 14290001Sglebius * 3. The name of the author may not be used to endorse or promote products 15290001Sglebius * derived from this software without specific prior written permission. 16290001Sglebius * 17290001Sglebius * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18290001Sglebius * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19290001Sglebius * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20290001Sglebius * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21290001Sglebius * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22290001Sglebius * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23290001Sglebius * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24290001Sglebius * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25290001Sglebius * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26290001Sglebius * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27290001Sglebius */ 28290001Sglebius 29290001Sglebius#include "event2/event-config.h" 30290001Sglebius#include "evconfig-private.h" 31290001Sglebius 32290001Sglebius#ifdef EVENT__HAVE_SYS_TIME_H 33290001Sglebius#include <sys/time.h> 34290001Sglebius#endif 35290001Sglebius 36290001Sglebius#include <errno.h> 37290001Sglebius#include <stdio.h> 38290001Sglebius#include <stdlib.h> 39290001Sglebius#include <string.h> 40290001Sglebius#ifdef EVENT__HAVE_STDARG_H 41290001Sglebius#include <stdarg.h> 42290001Sglebius#endif 43290001Sglebius#ifdef EVENT__HAVE_UNISTD_H 44290001Sglebius#include <unistd.h> 45290001Sglebius#endif 46290001Sglebius 47290001Sglebius#ifdef _WIN32 48290001Sglebius#include <winsock2.h> 49290001Sglebius#include <ws2tcpip.h> 50290001Sglebius#endif 51290001Sglebius 52290001Sglebius#include <sys/queue.h> 53290001Sglebius 54290001Sglebius#include "event2/util.h" 55290001Sglebius#include "event2/bufferevent.h" 56290001Sglebius#include "event2/buffer.h" 57290001Sglebius#include "event2/bufferevent_struct.h" 58290001Sglebius#include "event2/event.h" 59290001Sglebius#include "event2/util.h" 60290001Sglebius#include "event-internal.h" 61290001Sglebius#include "log-internal.h" 62290001Sglebius#include "mm-internal.h" 63290001Sglebius#include "bufferevent-internal.h" 64290001Sglebius#include "util-internal.h" 65290001Sglebius#include "iocp-internal.h" 66290001Sglebius 67290001Sglebius#ifndef SO_UPDATE_CONNECT_CONTEXT 68290001Sglebius/* Mingw is sometimes missing this */ 69290001Sglebius#define SO_UPDATE_CONNECT_CONTEXT 0x7010 70290001Sglebius#endif 71290001Sglebius 72290001Sglebius/* prototypes */ 73290001Sglebiusstatic int be_async_enable(struct bufferevent *, short); 74290001Sglebiusstatic int be_async_disable(struct bufferevent *, short); 75290001Sglebiusstatic void be_async_destruct(struct bufferevent *); 76290001Sglebiusstatic int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 77290001Sglebiusstatic int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 78290001Sglebius 79290001Sglebiusstruct bufferevent_async { 80290001Sglebius struct bufferevent_private bev; 81290001Sglebius struct event_overlapped connect_overlapped; 82290001Sglebius struct event_overlapped read_overlapped; 83290001Sglebius struct event_overlapped write_overlapped; 84290001Sglebius size_t read_in_progress; 85290001Sglebius size_t write_in_progress; 86290001Sglebius unsigned ok : 1; 87290001Sglebius unsigned read_added : 1; 88290001Sglebius unsigned write_added : 1; 89290001Sglebius}; 90290001Sglebius 91290001Sglebiusconst struct bufferevent_ops bufferevent_ops_async = { 92290001Sglebius "socket_async", 93290001Sglebius evutil_offsetof(struct bufferevent_async, bev.bev), 94290001Sglebius be_async_enable, 95290001Sglebius be_async_disable, 96290001Sglebius NULL, /* Unlink */ 97290001Sglebius be_async_destruct, 98290001Sglebius bufferevent_generic_adj_timeouts_, 99290001Sglebius be_async_flush, 100290001Sglebius be_async_ctrl, 101290001Sglebius}; 102290001Sglebius 103290001Sglebiusstatic inline struct bufferevent_async * 104290001Sglebiusupcast(struct bufferevent *bev) 105290001Sglebius{ 106290001Sglebius struct bufferevent_async *bev_a; 107290001Sglebius if (bev->be_ops != &bufferevent_ops_async) 108290001Sglebius return NULL; 109290001Sglebius bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 110290001Sglebius return bev_a; 111290001Sglebius} 112290001Sglebius 113290001Sglebiusstatic inline struct bufferevent_async * 114290001Sglebiusupcast_connect(struct event_overlapped *eo) 115290001Sglebius{ 116290001Sglebius struct bufferevent_async *bev_a; 117290001Sglebius bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 118290001Sglebius EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 119290001Sglebius return bev_a; 120290001Sglebius} 121290001Sglebius 122290001Sglebiusstatic inline struct bufferevent_async * 123290001Sglebiusupcast_read(struct event_overlapped *eo) 124290001Sglebius{ 125290001Sglebius struct bufferevent_async *bev_a; 126290001Sglebius bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 127290001Sglebius EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 128290001Sglebius return bev_a; 129290001Sglebius} 130290001Sglebius 131290001Sglebiusstatic inline struct bufferevent_async * 132290001Sglebiusupcast_write(struct event_overlapped *eo) 133290001Sglebius{ 134290001Sglebius struct bufferevent_async *bev_a; 135290001Sglebius bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 136290001Sglebius EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 137290001Sglebius return bev_a; 138290001Sglebius} 139290001Sglebius 140290001Sglebiusstatic void 141290001Sglebiusbev_async_del_write(struct bufferevent_async *beva) 142290001Sglebius{ 143290001Sglebius struct bufferevent *bev = &beva->bev.bev; 144290001Sglebius 145290001Sglebius if (beva->write_added) { 146290001Sglebius beva->write_added = 0; 147290001Sglebius event_base_del_virtual_(bev->ev_base); 148290001Sglebius } 149290001Sglebius} 150290001Sglebius 151290001Sglebiusstatic void 152290001Sglebiusbev_async_del_read(struct bufferevent_async *beva) 153290001Sglebius{ 154290001Sglebius struct bufferevent *bev = &beva->bev.bev; 155290001Sglebius 156290001Sglebius if (beva->read_added) { 157290001Sglebius beva->read_added = 0; 158290001Sglebius event_base_del_virtual_(bev->ev_base); 159290001Sglebius } 160290001Sglebius} 161290001Sglebius 162290001Sglebiusstatic void 163290001Sglebiusbev_async_add_write(struct bufferevent_async *beva) 164290001Sglebius{ 165290001Sglebius struct bufferevent *bev = &beva->bev.bev; 166290001Sglebius 167290001Sglebius if (!beva->write_added) { 168290001Sglebius beva->write_added = 1; 169290001Sglebius event_base_add_virtual_(bev->ev_base); 170290001Sglebius } 171290001Sglebius} 172290001Sglebius 173290001Sglebiusstatic void 174290001Sglebiusbev_async_add_read(struct bufferevent_async *beva) 175290001Sglebius{ 176290001Sglebius struct bufferevent *bev = &beva->bev.bev; 177290001Sglebius 178290001Sglebius if (!beva->read_added) { 179290001Sglebius beva->read_added = 1; 180290001Sglebius event_base_add_virtual_(bev->ev_base); 181290001Sglebius } 182290001Sglebius} 183290001Sglebius 184290001Sglebiusstatic void 185290001Sglebiusbev_async_consider_writing(struct bufferevent_async *beva) 186290001Sglebius{ 187290001Sglebius size_t at_most; 188290001Sglebius int limit; 189290001Sglebius struct bufferevent *bev = &beva->bev.bev; 190290001Sglebius 191290001Sglebius /* Don't write if there's a write in progress, or we do not 192290001Sglebius * want to write, or when there's nothing left to write. */ 193290001Sglebius if (beva->write_in_progress || beva->bev.connecting) 194290001Sglebius return; 195290001Sglebius if (!beva->ok || !(bev->enabled&EV_WRITE) || 196290001Sglebius !evbuffer_get_length(bev->output)) { 197290001Sglebius bev_async_del_write(beva); 198290001Sglebius return; 199290001Sglebius } 200290001Sglebius 201290001Sglebius at_most = evbuffer_get_length(bev->output); 202290001Sglebius 203290001Sglebius /* This is safe so long as bufferevent_get_write_max never returns 204290001Sglebius * more than INT_MAX. That's true for now. XXXX */ 205290001Sglebius limit = (int)bufferevent_get_write_max_(&beva->bev); 206290001Sglebius if (at_most >= (size_t)limit && limit >= 0) 207290001Sglebius at_most = limit; 208290001Sglebius 209290001Sglebius if (beva->bev.write_suspended) { 210290001Sglebius bev_async_del_write(beva); 211290001Sglebius return; 212290001Sglebius } 213290001Sglebius 214290001Sglebius /* XXXX doesn't respect low-water mark very well. */ 215290001Sglebius bufferevent_incref_(bev); 216290001Sglebius if (evbuffer_launch_write_(bev->output, at_most, 217290001Sglebius &beva->write_overlapped)) { 218290001Sglebius bufferevent_decref_(bev); 219290001Sglebius beva->ok = 0; 220290001Sglebius bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 221290001Sglebius } else { 222290001Sglebius beva->write_in_progress = at_most; 223290001Sglebius bufferevent_decrement_write_buckets_(&beva->bev, at_most); 224290001Sglebius bev_async_add_write(beva); 225290001Sglebius } 226290001Sglebius} 227290001Sglebius 228290001Sglebiusstatic void 229290001Sglebiusbev_async_consider_reading(struct bufferevent_async *beva) 230290001Sglebius{ 231290001Sglebius size_t cur_size; 232290001Sglebius size_t read_high; 233290001Sglebius size_t at_most; 234290001Sglebius int limit; 235290001Sglebius struct bufferevent *bev = &beva->bev.bev; 236290001Sglebius 237290001Sglebius /* Don't read if there is a read in progress, or we do not 238290001Sglebius * want to read. */ 239290001Sglebius if (beva->read_in_progress || beva->bev.connecting) 240290001Sglebius return; 241290001Sglebius if (!beva->ok || !(bev->enabled&EV_READ)) { 242290001Sglebius bev_async_del_read(beva); 243290001Sglebius return; 244290001Sglebius } 245290001Sglebius 246290001Sglebius /* Don't read if we're full */ 247290001Sglebius cur_size = evbuffer_get_length(bev->input); 248290001Sglebius read_high = bev->wm_read.high; 249290001Sglebius if (read_high) { 250290001Sglebius if (cur_size >= read_high) { 251290001Sglebius bev_async_del_read(beva); 252290001Sglebius return; 253290001Sglebius } 254290001Sglebius at_most = read_high - cur_size; 255290001Sglebius } else { 256290001Sglebius at_most = 16384; /* FIXME totally magic. */ 257290001Sglebius } 258290001Sglebius 259290001Sglebius /* XXXX This over-commits. */ 260290001Sglebius /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 261290001Sglebius limit = (int)bufferevent_get_read_max_(&beva->bev); 262290001Sglebius if (at_most >= (size_t)limit && limit >= 0) 263290001Sglebius at_most = limit; 264290001Sglebius 265290001Sglebius if (beva->bev.read_suspended) { 266290001Sglebius bev_async_del_read(beva); 267290001Sglebius return; 268290001Sglebius } 269290001Sglebius 270290001Sglebius bufferevent_incref_(bev); 271290001Sglebius if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 272290001Sglebius beva->ok = 0; 273290001Sglebius bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 274290001Sglebius bufferevent_decref_(bev); 275290001Sglebius } else { 276290001Sglebius beva->read_in_progress = at_most; 277290001Sglebius bufferevent_decrement_read_buckets_(&beva->bev, at_most); 278290001Sglebius bev_async_add_read(beva); 279290001Sglebius } 280290001Sglebius 281290001Sglebius return; 282290001Sglebius} 283290001Sglebius 284290001Sglebiusstatic void 285290001Sglebiusbe_async_outbuf_callback(struct evbuffer *buf, 286290001Sglebius const struct evbuffer_cb_info *cbinfo, 287290001Sglebius void *arg) 288290001Sglebius{ 289290001Sglebius struct bufferevent *bev = arg; 290290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 291290001Sglebius 292290001Sglebius /* If we added data to the outbuf and were not writing before, 293290001Sglebius * we may want to write now. */ 294290001Sglebius 295290001Sglebius bufferevent_incref_and_lock_(bev); 296290001Sglebius 297290001Sglebius if (cbinfo->n_added) 298290001Sglebius bev_async_consider_writing(bev_async); 299290001Sglebius 300290001Sglebius bufferevent_decref_and_unlock_(bev); 301290001Sglebius} 302290001Sglebius 303290001Sglebiusstatic void 304290001Sglebiusbe_async_inbuf_callback(struct evbuffer *buf, 305290001Sglebius const struct evbuffer_cb_info *cbinfo, 306290001Sglebius void *arg) 307290001Sglebius{ 308290001Sglebius struct bufferevent *bev = arg; 309290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 310290001Sglebius 311290001Sglebius /* If we drained data from the inbuf and were not reading before, 312290001Sglebius * we may want to read now */ 313290001Sglebius 314290001Sglebius bufferevent_incref_and_lock_(bev); 315290001Sglebius 316290001Sglebius if (cbinfo->n_deleted) 317290001Sglebius bev_async_consider_reading(bev_async); 318290001Sglebius 319290001Sglebius bufferevent_decref_and_unlock_(bev); 320290001Sglebius} 321290001Sglebius 322290001Sglebiusstatic int 323290001Sglebiusbe_async_enable(struct bufferevent *buf, short what) 324290001Sglebius{ 325290001Sglebius struct bufferevent_async *bev_async = upcast(buf); 326290001Sglebius 327290001Sglebius if (!bev_async->ok) 328290001Sglebius return -1; 329290001Sglebius 330290001Sglebius if (bev_async->bev.connecting) { 331290001Sglebius /* Don't launch anything during connection attempts. */ 332290001Sglebius return 0; 333290001Sglebius } 334290001Sglebius 335290001Sglebius if (what & EV_READ) 336290001Sglebius BEV_RESET_GENERIC_READ_TIMEOUT(buf); 337290001Sglebius if (what & EV_WRITE) 338290001Sglebius BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 339290001Sglebius 340290001Sglebius /* If we newly enable reading or writing, and we aren't reading or 341290001Sglebius writing already, consider launching a new read or write. */ 342290001Sglebius 343290001Sglebius if (what & EV_READ) 344290001Sglebius bev_async_consider_reading(bev_async); 345290001Sglebius if (what & EV_WRITE) 346290001Sglebius bev_async_consider_writing(bev_async); 347290001Sglebius return 0; 348290001Sglebius} 349290001Sglebius 350290001Sglebiusstatic int 351290001Sglebiusbe_async_disable(struct bufferevent *bev, short what) 352290001Sglebius{ 353290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 354290001Sglebius /* XXXX If we disable reading or writing, we may want to consider 355290001Sglebius * canceling any in-progress read or write operation, though it might 356290001Sglebius * not work. */ 357290001Sglebius 358290001Sglebius if (what & EV_READ) { 359290001Sglebius BEV_DEL_GENERIC_READ_TIMEOUT(bev); 360290001Sglebius bev_async_del_read(bev_async); 361290001Sglebius } 362290001Sglebius if (what & EV_WRITE) { 363290001Sglebius BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 364290001Sglebius bev_async_del_write(bev_async); 365290001Sglebius } 366290001Sglebius 367290001Sglebius return 0; 368290001Sglebius} 369290001Sglebius 370290001Sglebiusstatic void 371290001Sglebiusbe_async_destruct(struct bufferevent *bev) 372290001Sglebius{ 373290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 374290001Sglebius struct bufferevent_private *bev_p = BEV_UPCAST(bev); 375290001Sglebius evutil_socket_t fd; 376290001Sglebius 377290001Sglebius EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 378290001Sglebius !upcast(bev)->read_in_progress); 379290001Sglebius 380290001Sglebius bev_async_del_read(bev_async); 381290001Sglebius bev_async_del_write(bev_async); 382290001Sglebius 383290001Sglebius fd = evbuffer_overlapped_get_fd_(bev->input); 384290001Sglebius if (fd != (evutil_socket_t)INVALID_SOCKET && 385290001Sglebius (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 386290001Sglebius evutil_closesocket(fd); 387290001Sglebius evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 388290001Sglebius } 389290001Sglebius} 390290001Sglebius 391290001Sglebius/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 392290001Sglebius * we use WSAGetOverlappedResult to translate. */ 393290001Sglebiusstatic void 394290001Sglebiusbev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 395290001Sglebius{ 396290001Sglebius DWORD bytes, flags; 397290001Sglebius evutil_socket_t fd; 398290001Sglebius 399290001Sglebius fd = evbuffer_overlapped_get_fd_(bev->input); 400290001Sglebius WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 401290001Sglebius} 402290001Sglebius 403290001Sglebiusstatic int 404290001Sglebiusbe_async_flush(struct bufferevent *bev, short what, 405290001Sglebius enum bufferevent_flush_mode mode) 406290001Sglebius{ 407290001Sglebius return 0; 408290001Sglebius} 409290001Sglebius 410290001Sglebiusstatic void 411290001Sglebiusconnect_complete(struct event_overlapped *eo, ev_uintptr_t key, 412290001Sglebius ev_ssize_t nbytes, int ok) 413290001Sglebius{ 414290001Sglebius struct bufferevent_async *bev_a = upcast_connect(eo); 415290001Sglebius struct bufferevent *bev = &bev_a->bev.bev; 416290001Sglebius evutil_socket_t sock; 417290001Sglebius 418290001Sglebius BEV_LOCK(bev); 419290001Sglebius 420290001Sglebius EVUTIL_ASSERT(bev_a->bev.connecting); 421290001Sglebius bev_a->bev.connecting = 0; 422290001Sglebius sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 423290001Sglebius /* XXXX Handle error? */ 424290001Sglebius setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 425290001Sglebius 426290001Sglebius if (ok) 427290001Sglebius bufferevent_async_set_connected_(bev); 428290001Sglebius else 429290001Sglebius bev_async_set_wsa_error(bev, eo); 430290001Sglebius 431290001Sglebius bufferevent_run_eventcb_(bev, 432290001Sglebius ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 433290001Sglebius 434290001Sglebius event_base_del_virtual_(bev->ev_base); 435290001Sglebius 436290001Sglebius bufferevent_decref_and_unlock_(bev); 437290001Sglebius} 438290001Sglebius 439290001Sglebiusstatic void 440290001Sglebiusread_complete(struct event_overlapped *eo, ev_uintptr_t key, 441290001Sglebius ev_ssize_t nbytes, int ok) 442290001Sglebius{ 443290001Sglebius struct bufferevent_async *bev_a = upcast_read(eo); 444290001Sglebius struct bufferevent *bev = &bev_a->bev.bev; 445290001Sglebius short what = BEV_EVENT_READING; 446290001Sglebius ev_ssize_t amount_unread; 447290001Sglebius BEV_LOCK(bev); 448290001Sglebius EVUTIL_ASSERT(bev_a->read_in_progress); 449290001Sglebius 450290001Sglebius amount_unread = bev_a->read_in_progress - nbytes; 451290001Sglebius evbuffer_commit_read_(bev->input, nbytes); 452290001Sglebius bev_a->read_in_progress = 0; 453290001Sglebius if (amount_unread) 454290001Sglebius bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 455290001Sglebius 456290001Sglebius if (!ok) 457290001Sglebius bev_async_set_wsa_error(bev, eo); 458290001Sglebius 459290001Sglebius if (bev_a->ok) { 460290001Sglebius if (ok && nbytes) { 461290001Sglebius BEV_RESET_GENERIC_READ_TIMEOUT(bev); 462290001Sglebius bufferevent_trigger_nolock_(bev, EV_READ, 0); 463290001Sglebius bev_async_consider_reading(bev_a); 464290001Sglebius } else if (!ok) { 465290001Sglebius what |= BEV_EVENT_ERROR; 466290001Sglebius bev_a->ok = 0; 467290001Sglebius bufferevent_run_eventcb_(bev, what, 0); 468290001Sglebius } else if (!nbytes) { 469290001Sglebius what |= BEV_EVENT_EOF; 470290001Sglebius bev_a->ok = 0; 471290001Sglebius bufferevent_run_eventcb_(bev, what, 0); 472290001Sglebius } 473290001Sglebius } 474290001Sglebius 475290001Sglebius bufferevent_decref_and_unlock_(bev); 476290001Sglebius} 477290001Sglebius 478290001Sglebiusstatic void 479290001Sglebiuswrite_complete(struct event_overlapped *eo, ev_uintptr_t key, 480290001Sglebius ev_ssize_t nbytes, int ok) 481290001Sglebius{ 482290001Sglebius struct bufferevent_async *bev_a = upcast_write(eo); 483290001Sglebius struct bufferevent *bev = &bev_a->bev.bev; 484290001Sglebius short what = BEV_EVENT_WRITING; 485290001Sglebius ev_ssize_t amount_unwritten; 486290001Sglebius 487290001Sglebius BEV_LOCK(bev); 488290001Sglebius EVUTIL_ASSERT(bev_a->write_in_progress); 489290001Sglebius 490290001Sglebius amount_unwritten = bev_a->write_in_progress - nbytes; 491290001Sglebius evbuffer_commit_write_(bev->output, nbytes); 492290001Sglebius bev_a->write_in_progress = 0; 493290001Sglebius 494290001Sglebius if (amount_unwritten) 495290001Sglebius bufferevent_decrement_write_buckets_(&bev_a->bev, 496290001Sglebius -amount_unwritten); 497290001Sglebius 498290001Sglebius 499290001Sglebius if (!ok) 500290001Sglebius bev_async_set_wsa_error(bev, eo); 501290001Sglebius 502290001Sglebius if (bev_a->ok) { 503290001Sglebius if (ok && nbytes) { 504290001Sglebius BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 505290001Sglebius bufferevent_trigger_nolock_(bev, EV_WRITE, 0); 506290001Sglebius bev_async_consider_writing(bev_a); 507290001Sglebius } else if (!ok) { 508290001Sglebius what |= BEV_EVENT_ERROR; 509290001Sglebius bev_a->ok = 0; 510290001Sglebius bufferevent_run_eventcb_(bev, what, 0); 511290001Sglebius } else if (!nbytes) { 512290001Sglebius what |= BEV_EVENT_EOF; 513290001Sglebius bev_a->ok = 0; 514290001Sglebius bufferevent_run_eventcb_(bev, what, 0); 515290001Sglebius } 516290001Sglebius } 517290001Sglebius 518290001Sglebius bufferevent_decref_and_unlock_(bev); 519290001Sglebius} 520290001Sglebius 521290001Sglebiusstruct bufferevent * 522290001Sglebiusbufferevent_async_new_(struct event_base *base, 523290001Sglebius evutil_socket_t fd, int options) 524290001Sglebius{ 525290001Sglebius struct bufferevent_async *bev_a; 526290001Sglebius struct bufferevent *bev; 527290001Sglebius struct event_iocp_port *iocp; 528290001Sglebius 529290001Sglebius options |= BEV_OPT_THREADSAFE; 530290001Sglebius 531290001Sglebius if (!(iocp = event_base_get_iocp_(base))) 532290001Sglebius return NULL; 533290001Sglebius 534290001Sglebius if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 535290001Sglebius int err = GetLastError(); 536290001Sglebius /* We may have alrady associated this fd with a port. 537290001Sglebius * Let's hope it's this port, and that the error code 538290001Sglebius * for doing this neer changes. */ 539290001Sglebius if (err != ERROR_INVALID_PARAMETER) 540290001Sglebius return NULL; 541290001Sglebius } 542290001Sglebius 543290001Sglebius if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 544290001Sglebius return NULL; 545290001Sglebius 546290001Sglebius bev = &bev_a->bev.bev; 547290001Sglebius if (!(bev->input = evbuffer_overlapped_new_(fd))) { 548290001Sglebius mm_free(bev_a); 549290001Sglebius return NULL; 550290001Sglebius } 551290001Sglebius if (!(bev->output = evbuffer_overlapped_new_(fd))) { 552290001Sglebius evbuffer_free(bev->input); 553290001Sglebius mm_free(bev_a); 554290001Sglebius return NULL; 555290001Sglebius } 556290001Sglebius 557290001Sglebius if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 558290001Sglebius options)<0) 559290001Sglebius goto err; 560290001Sglebius 561290001Sglebius evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 562290001Sglebius evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 563290001Sglebius 564290001Sglebius event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 565290001Sglebius event_overlapped_init_(&bev_a->read_overlapped, read_complete); 566290001Sglebius event_overlapped_init_(&bev_a->write_overlapped, write_complete); 567290001Sglebius 568290001Sglebius bufferevent_init_generic_timeout_cbs_(bev); 569290001Sglebius 570290001Sglebius bev_a->ok = fd >= 0; 571290001Sglebius 572290001Sglebius return bev; 573290001Sglebiuserr: 574290001Sglebius bufferevent_free(&bev_a->bev.bev); 575290001Sglebius return NULL; 576290001Sglebius} 577290001Sglebius 578290001Sglebiusvoid 579290001Sglebiusbufferevent_async_set_connected_(struct bufferevent *bev) 580290001Sglebius{ 581290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 582290001Sglebius bev_async->ok = 1; 583290001Sglebius bufferevent_init_generic_timeout_cbs_(bev); 584290001Sglebius /* Now's a good time to consider reading/writing */ 585290001Sglebius be_async_enable(bev, bev->enabled); 586290001Sglebius} 587290001Sglebius 588290001Sglebiusint 589290001Sglebiusbufferevent_async_can_connect_(struct bufferevent *bev) 590290001Sglebius{ 591290001Sglebius const struct win32_extension_fns *ext = 592290001Sglebius event_get_win32_extension_fns_(); 593290001Sglebius 594290001Sglebius if (BEV_IS_ASYNC(bev) && 595290001Sglebius event_base_get_iocp_(bev->ev_base) && 596290001Sglebius ext && ext->ConnectEx) 597290001Sglebius return 1; 598290001Sglebius 599290001Sglebius return 0; 600290001Sglebius} 601290001Sglebius 602290001Sglebiusint 603290001Sglebiusbufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 604290001Sglebius const struct sockaddr *sa, int socklen) 605290001Sglebius{ 606290001Sglebius BOOL rc; 607290001Sglebius struct bufferevent_async *bev_async = upcast(bev); 608290001Sglebius struct sockaddr_storage ss; 609290001Sglebius const struct win32_extension_fns *ext = 610290001Sglebius event_get_win32_extension_fns_(); 611290001Sglebius 612290001Sglebius EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 613290001Sglebius 614290001Sglebius /* ConnectEx() requires that the socket be bound to an address 615290001Sglebius * with bind() before using, otherwise it will fail. We attempt 616290001Sglebius * to issue a bind() here, taking into account that the error 617290001Sglebius * code is set to WSAEINVAL when the socket is already bound. */ 618290001Sglebius memset(&ss, 0, sizeof(ss)); 619290001Sglebius if (sa->sa_family == AF_INET) { 620290001Sglebius struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 621290001Sglebius sin->sin_family = AF_INET; 622290001Sglebius sin->sin_addr.s_addr = INADDR_ANY; 623290001Sglebius } else if (sa->sa_family == AF_INET6) { 624290001Sglebius struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 625290001Sglebius sin6->sin6_family = AF_INET6; 626290001Sglebius sin6->sin6_addr = in6addr_any; 627290001Sglebius } else { 628290001Sglebius /* Well, the user will have to bind() */ 629290001Sglebius return -1; 630290001Sglebius } 631290001Sglebius if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 632290001Sglebius WSAGetLastError() != WSAEINVAL) 633290001Sglebius return -1; 634290001Sglebius 635290001Sglebius event_base_add_virtual_(bev->ev_base); 636290001Sglebius bufferevent_incref_(bev); 637290001Sglebius rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 638290001Sglebius &bev_async->connect_overlapped.overlapped); 639290001Sglebius if (rc || WSAGetLastError() == ERROR_IO_PENDING) 640290001Sglebius return 0; 641290001Sglebius 642290001Sglebius event_base_del_virtual_(bev->ev_base); 643290001Sglebius bufferevent_decref_(bev); 644290001Sglebius 645290001Sglebius return -1; 646290001Sglebius} 647290001Sglebius 648290001Sglebiusstatic int 649290001Sglebiusbe_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 650290001Sglebius union bufferevent_ctrl_data *data) 651290001Sglebius{ 652290001Sglebius switch (op) { 653290001Sglebius case BEV_CTRL_GET_FD: 654290001Sglebius data->fd = evbuffer_overlapped_get_fd_(bev->input); 655290001Sglebius return 0; 656290001Sglebius case BEV_CTRL_SET_FD: { 657290001Sglebius struct event_iocp_port *iocp; 658290001Sglebius 659290001Sglebius if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 660290001Sglebius return 0; 661290001Sglebius if (!(iocp = event_base_get_iocp_(bev->ev_base))) 662290001Sglebius return -1; 663290001Sglebius if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) 664290001Sglebius return -1; 665290001Sglebius evbuffer_overlapped_set_fd_(bev->input, data->fd); 666290001Sglebius evbuffer_overlapped_set_fd_(bev->output, data->fd); 667290001Sglebius return 0; 668290001Sglebius } 669290001Sglebius case BEV_CTRL_CANCEL_ALL: { 670290001Sglebius struct bufferevent_async *bev_a = upcast(bev); 671290001Sglebius evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); 672290001Sglebius if (fd != (evutil_socket_t)INVALID_SOCKET && 673290001Sglebius (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 674290001Sglebius closesocket(fd); 675290001Sglebius evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 676290001Sglebius } 677290001Sglebius bev_a->ok = 0; 678290001Sglebius return 0; 679290001Sglebius } 680290001Sglebius case BEV_CTRL_GET_UNDERLYING: 681290001Sglebius default: 682290001Sglebius return -1; 683290001Sglebius } 684290001Sglebius} 685290001Sglebius 686290001Sglebius 687