1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22#include <errno.h> 23 24#ifdef _WIN32 25# include <fcntl.h> 26#else 27# include <sys/socket.h> 28# include <unistd.h> 29#endif 30 31#include "uv.h" 32#include "task.h" 33 34#ifdef __linux__ 35# include <sys/epoll.h> 36#endif 37 38#ifdef UV_HAVE_KQUEUE 39# include <sys/types.h> 40# include <sys/event.h> 41# include <sys/time.h> 42#endif 43 44 45#define NUM_CLIENTS 5 46#define TRANSFER_BYTES (1 << 16) 47 48#undef MIN 49#define MIN(a, b) (((a) < (b)) ? (a) : (b)); 50 51 52typedef enum { 53 UNIDIRECTIONAL, 54 DUPLEX 55} test_mode_t; 56 57typedef struct connection_context_s { 58 uv_poll_t poll_handle; 59 uv_timer_t timer_handle; 60 uv_os_sock_t sock; 61 size_t read, sent; 62 int is_server_connection; 63 int open_handles; 64 int got_fin, sent_fin, got_disconnect; 65 unsigned int events, delayed_events; 66} connection_context_t; 67 68typedef struct server_context_s { 69 uv_poll_t poll_handle; 70 uv_os_sock_t sock; 71 int connections; 72} server_context_t; 73 74 75static void delay_timer_cb(uv_timer_t* timer); 76 77 78static test_mode_t test_mode = DUPLEX; 79 80static int closed_connections = 0; 81 82static int valid_writable_wakeups = 0; 83static int spurious_writable_wakeups = 0; 84 85#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) 86static int disconnects = 0; 87#endif /* !__sun && !_AIX && !__MVS__ */ 88 89static int got_eagain(void) { 90#ifdef _WIN32 91 return WSAGetLastError() == WSAEWOULDBLOCK; 92#else 93 return errno == EAGAIN 94 || errno == EINPROGRESS 95#ifdef EWOULDBLOCK 96 || errno == EWOULDBLOCK; 97#endif 98 ; 99#endif 100} 101 102 103static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) { 104 uv_os_sock_t sock; 105 int r; 106 107 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); 108#ifdef _WIN32 109 ASSERT(sock != INVALID_SOCKET); 110#else 111 ASSERT(sock >= 0); 112#endif 113 114#ifndef _WIN32 115 { 116 /* Allow reuse of the port. */ 117 int yes = 1; 118 r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); 119 ASSERT(r == 0); 120 } 121#endif 122 123 r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr); 124 ASSERT(r == 0); 125 126 return sock; 127} 128 129 130static void close_socket(uv_os_sock_t sock) { 131 int r; 132#ifdef _WIN32 133 r = closesocket(sock); 134#else 135 r = close(sock); 136#endif 137 /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by 138 * the peer before all pending data was delivered. 139 */ 140 ASSERT(r == 0 || errno == ECONNRESET); 141} 142 143 144static connection_context_t* create_connection_context( 145 uv_os_sock_t sock, int is_server_connection) { 146 int r; 147 connection_context_t* context; 148 149 context = (connection_context_t*) malloc(sizeof *context); 150 ASSERT_NOT_NULL(context); 151 152 context->sock = sock; 153 context->is_server_connection = is_server_connection; 154 context->read = 0; 155 context->sent = 0; 156 context->open_handles = 0; 157 context->events = 0; 158 context->delayed_events = 0; 159 context->got_fin = 0; 160 context->sent_fin = 0; 161 context->got_disconnect = 0; 162 163 r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); 164 context->open_handles++; 165 context->poll_handle.data = context; 166 ASSERT(r == 0); 167 168 r = uv_timer_init(uv_default_loop(), &context->timer_handle); 169 context->open_handles++; 170 context->timer_handle.data = context; 171 ASSERT(r == 0); 172 173 return context; 174} 175 176 177static void connection_close_cb(uv_handle_t* handle) { 178 connection_context_t* context = (connection_context_t*) handle->data; 179 180 if (--context->open_handles == 0) { 181 if (test_mode == DUPLEX || context->is_server_connection) { 182 ASSERT(context->read == TRANSFER_BYTES); 183 } else { 184 ASSERT(context->read == 0); 185 } 186 187 if (test_mode == DUPLEX || !context->is_server_connection) { 188 ASSERT(context->sent == TRANSFER_BYTES); 189 } else { 190 ASSERT(context->sent == 0); 191 } 192 193 closed_connections++; 194 195 free(context); 196 } 197} 198 199 200static void destroy_connection_context(connection_context_t* context) { 201 uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb); 202 uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb); 203} 204 205 206static void connection_poll_cb(uv_poll_t* handle, int status, int events) { 207 connection_context_t* context = (connection_context_t*) handle->data; 208 unsigned int new_events; 209 int r; 210 211 ASSERT(status == 0); 212 ASSERT(events & context->events); 213 ASSERT(!(events & ~context->events)); 214 215 new_events = context->events; 216 217 if (events & UV_READABLE) { 218 int action = rand() % 7; 219 220 switch (action) { 221 case 0: 222 case 1: { 223 /* Read a couple of bytes. */ 224 static char buffer[74]; 225 226 do 227 r = recv(context->sock, buffer, sizeof buffer, 0); 228 while (r == -1 && errno == EINTR); 229 ASSERT(r >= 0); 230 231 if (r > 0) { 232 context->read += r; 233 } else { 234 /* Got FIN. */ 235 context->got_fin = 1; 236 new_events &= ~UV_READABLE; 237 } 238 239 break; 240 } 241 242 case 2: 243 case 3: { 244 /* Read until EAGAIN. */ 245 static char buffer[931]; 246 247 for (;;) { 248 do 249 r = recv(context->sock, buffer, sizeof buffer, 0); 250 while (r == -1 && errno == EINTR); 251 252 if (r <= 0) 253 break; 254 255 context->read += r; 256 } 257 258 if (r == 0) { 259 /* Got FIN. */ 260 context->got_fin = 1; 261 new_events &= ~UV_READABLE; 262 } else { 263 ASSERT(got_eagain()); 264 } 265 266 break; 267 } 268 269 case 4: 270 /* Ignore. */ 271 break; 272 273 case 5: 274 /* Stop reading for a while. Restart in timer callback. */ 275 new_events &= ~UV_READABLE; 276 if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { 277 context->delayed_events = UV_READABLE; 278 uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0); 279 } else { 280 context->delayed_events |= UV_READABLE; 281 } 282 break; 283 284 case 6: 285 /* Fudge with the event mask. */ 286 uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb); 287 uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb); 288 context->events = UV_READABLE; 289 break; 290 291 default: 292 ASSERT(0); 293 } 294 } 295 296 if (events & UV_WRITABLE) { 297 if (context->sent < TRANSFER_BYTES && 298 !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) { 299 /* We have to send more bytes. */ 300 int action = rand() % 7; 301 302 switch (action) { 303 case 0: 304 case 1: { 305 /* Send a couple of bytes. */ 306 static char buffer[103]; 307 308 int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); 309 ASSERT(send_bytes > 0); 310 311 do 312 r = send(context->sock, buffer, send_bytes, 0); 313 while (r == -1 && errno == EINTR); 314 315 if (r < 0) { 316 ASSERT(got_eagain()); 317 spurious_writable_wakeups++; 318 break; 319 } 320 321 ASSERT(r > 0); 322 context->sent += r; 323 valid_writable_wakeups++; 324 break; 325 } 326 327 case 2: 328 case 3: { 329 /* Send until EAGAIN. */ 330 static char buffer[1234]; 331 332 int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); 333 ASSERT(send_bytes > 0); 334 335 do 336 r = send(context->sock, buffer, send_bytes, 0); 337 while (r == -1 && errno == EINTR); 338 339 if (r < 0) { 340 ASSERT(got_eagain()); 341 spurious_writable_wakeups++; 342 break; 343 } 344 345 ASSERT(r > 0); 346 valid_writable_wakeups++; 347 context->sent += r; 348 349 while (context->sent < TRANSFER_BYTES) { 350 send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); 351 ASSERT(send_bytes > 0); 352 353 do 354 r = send(context->sock, buffer, send_bytes, 0); 355 while (r == -1 && errno == EINTR); 356 ASSERT(r != 0); 357 358 if (r < 0) { 359 ASSERT(got_eagain()); 360 break; 361 } 362 363 context->sent += r; 364 } 365 break; 366 } 367 368 case 4: 369 /* Ignore. */ 370 break; 371 372 case 5: 373 /* Stop sending for a while. Restart in timer callback. */ 374 new_events &= ~UV_WRITABLE; 375 if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { 376 context->delayed_events = UV_WRITABLE; 377 uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0); 378 } else { 379 context->delayed_events |= UV_WRITABLE; 380 } 381 break; 382 383 case 6: 384 /* Fudge with the event mask. */ 385 uv_poll_start(&context->poll_handle, 386 UV_READABLE, 387 connection_poll_cb); 388 uv_poll_start(&context->poll_handle, 389 UV_WRITABLE, 390 connection_poll_cb); 391 context->events = UV_WRITABLE; 392 break; 393 394 default: 395 ASSERT(0); 396 } 397 398 } else { 399 /* Nothing more to write. Send FIN. */ 400 int r; 401#ifdef _WIN32 402 r = shutdown(context->sock, SD_SEND); 403#else 404 r = shutdown(context->sock, SHUT_WR); 405#endif 406 ASSERT(r == 0); 407 context->sent_fin = 1; 408 new_events &= ~UV_WRITABLE; 409 } 410 } 411#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) 412 if (events & UV_DISCONNECT) { 413 context->got_disconnect = 1; 414 ++disconnects; 415 new_events &= ~UV_DISCONNECT; 416 } 417 418 if (context->got_fin && context->sent_fin && context->got_disconnect) { 419#else /* __sun && _AIX && __MVS__ */ 420 if (context->got_fin && context->sent_fin) { 421#endif /* !__sun && !_AIX && !__MVS__ */ 422 /* Sent and received FIN. Close and destroy context. */ 423 close_socket(context->sock); 424 destroy_connection_context(context); 425 context->events = 0; 426 427 } else if (new_events != context->events) { 428 /* Poll mask changed. Call uv_poll_start again. */ 429 context->events = new_events; 430 uv_poll_start(handle, new_events, connection_poll_cb); 431 } 432 433 /* Assert that uv_is_active works correctly for poll handles. */ 434 if (context->events != 0) { 435 ASSERT(1 == uv_is_active((uv_handle_t*) handle)); 436 } else { 437 ASSERT(0 == uv_is_active((uv_handle_t*) handle)); 438 } 439} 440 441 442static void delay_timer_cb(uv_timer_t* timer) { 443 connection_context_t* context = (connection_context_t*) timer->data; 444 int r; 445 446 /* Timer should auto stop. */ 447 ASSERT(0 == uv_is_active((uv_handle_t*) timer)); 448 449 /* Add the requested events to the poll mask. */ 450 ASSERT(context->delayed_events != 0); 451 context->events |= context->delayed_events; 452 context->delayed_events = 0; 453 454 r = uv_poll_start(&context->poll_handle, 455 context->events, 456 connection_poll_cb); 457 ASSERT(r == 0); 458} 459 460 461static server_context_t* create_server_context( 462 uv_os_sock_t sock) { 463 int r; 464 server_context_t* context; 465 466 context = (server_context_t*) malloc(sizeof *context); 467 ASSERT_NOT_NULL(context); 468 469 context->sock = sock; 470 context->connections = 0; 471 472 r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); 473 context->poll_handle.data = context; 474 ASSERT(r == 0); 475 476 return context; 477} 478 479 480static void server_close_cb(uv_handle_t* handle) { 481 server_context_t* context = (server_context_t*) handle->data; 482 free(context); 483} 484 485 486static void destroy_server_context(server_context_t* context) { 487 uv_close((uv_handle_t*) &context->poll_handle, server_close_cb); 488} 489 490 491static void server_poll_cb(uv_poll_t* handle, int status, int events) { 492 server_context_t* server_context = (server_context_t*) 493 handle->data; 494 connection_context_t* connection_context; 495 struct sockaddr_in addr; 496 socklen_t addr_len; 497 uv_os_sock_t sock; 498 int r; 499 500 addr_len = sizeof addr; 501 sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len); 502#ifdef _WIN32 503 ASSERT(sock != INVALID_SOCKET); 504#else 505 ASSERT(sock >= 0); 506#endif 507 508 connection_context = create_connection_context(sock, 1); 509 connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT; 510 r = uv_poll_start(&connection_context->poll_handle, 511 UV_READABLE | UV_WRITABLE | UV_DISCONNECT, 512 connection_poll_cb); 513 ASSERT(r == 0); 514 515 if (++server_context->connections == NUM_CLIENTS) { 516 close_socket(server_context->sock); 517 destroy_server_context(server_context); 518 } 519} 520 521 522static void start_server(void) { 523 server_context_t* context; 524 struct sockaddr_in addr; 525 uv_os_sock_t sock; 526 int r; 527 528 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); 529 sock = create_bound_socket(addr); 530 context = create_server_context(sock); 531 532 r = listen(sock, 100); 533 ASSERT(r == 0); 534 535 r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb); 536 ASSERT(r == 0); 537} 538 539 540static void start_client(void) { 541 uv_os_sock_t sock; 542 connection_context_t* context; 543 struct sockaddr_in server_addr; 544 struct sockaddr_in addr; 545 int r; 546 547 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr)); 548 ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr)); 549 550 sock = create_bound_socket(addr); 551 context = create_connection_context(sock, 0); 552 553 context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT; 554 r = uv_poll_start(&context->poll_handle, 555 UV_READABLE | UV_WRITABLE | UV_DISCONNECT, 556 connection_poll_cb); 557 ASSERT(r == 0); 558 559 r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr); 560 ASSERT(r == 0 || got_eagain()); 561} 562 563 564static void start_poll_test(void) { 565 int i, r; 566 567#ifdef _WIN32 568 { 569 struct WSAData wsa_data; 570 int r = WSAStartup(MAKEWORD(2, 2), &wsa_data); 571 ASSERT(r == 0); 572 } 573#endif 574 575 start_server(); 576 577 for (i = 0; i < NUM_CLIENTS; i++) 578 start_client(); 579 580 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); 581 ASSERT(r == 0); 582 583 /* Assert that at most five percent of the writable wakeups was spurious. */ 584 ASSERT(spurious_writable_wakeups == 0 || 585 (valid_writable_wakeups + spurious_writable_wakeups) / 586 spurious_writable_wakeups > 20); 587 588 ASSERT(closed_connections == NUM_CLIENTS * 2); 589#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__) 590 ASSERT(disconnects == NUM_CLIENTS * 2); 591#endif 592 MAKE_VALGRIND_HAPPY(); 593} 594 595 596/* Issuing a shutdown() on IBM i PASE with parameter SHUT_WR 597 * also sends a normal close sequence to the partner program. 598 * This leads to timing issues and ECONNRESET failures in the 599 * test 'poll_duplex' and 'poll_unidirectional'. 600 * 601 * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm 602 */ 603TEST_IMPL(poll_duplex) { 604#if defined(NO_SELF_CONNECT) 605 RETURN_SKIP(NO_SELF_CONNECT); 606#elif defined(__PASE__) 607 RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE"); 608#endif 609 test_mode = DUPLEX; 610 start_poll_test(); 611 return 0; 612} 613 614 615TEST_IMPL(poll_unidirectional) { 616#if defined(NO_SELF_CONNECT) 617 RETURN_SKIP(NO_SELF_CONNECT); 618#elif defined(__PASE__) 619 RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE"); 620#endif 621 test_mode = UNIDIRECTIONAL; 622 start_poll_test(); 623 return 0; 624} 625 626 627/* Windows won't let you open a directory so we open a file instead. 628 * OS X lets you poll a file so open the $PWD instead. Both fail 629 * on Linux so it doesn't matter which one we pick. Both succeed 630 * on FreeBSD, Solaris and AIX so skip the test on those platforms. 631 */ 632TEST_IMPL(poll_bad_fdtype) { 633#if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \ 634 !defined(_AIX) && !defined(__MVS__) && !defined(__FreeBSD_kernel__) && \ 635 !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \ 636 !defined(__NetBSD__) 637 uv_poll_t poll_handle; 638 int fd; 639 640#if defined(_WIN32) 641 fd = open("test/fixtures/empty_file", O_RDONLY); 642#else 643 fd = open(".", O_RDONLY); 644#endif 645 ASSERT(fd != -1); 646 ASSERT(0 != uv_poll_init(uv_default_loop(), &poll_handle, fd)); 647 ASSERT(0 == close(fd)); 648#endif 649 650 MAKE_VALGRIND_HAPPY(); 651 return 0; 652} 653 654 655#ifdef __linux__ 656TEST_IMPL(poll_nested_epoll) { 657 uv_poll_t poll_handle; 658 int fd; 659 660 fd = epoll_create(1); 661 ASSERT(fd != -1); 662 663 ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd)); 664 ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort)); 665 ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT)); 666 667 uv_close((uv_handle_t*) &poll_handle, NULL); 668 ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT)); 669 ASSERT(0 == close(fd)); 670 671 MAKE_VALGRIND_HAPPY(); 672 return 0; 673} 674#endif /* __linux__ */ 675 676 677#ifdef UV_HAVE_KQUEUE 678TEST_IMPL(poll_nested_kqueue) { 679 uv_poll_t poll_handle; 680 int fd; 681 682 fd = kqueue(); 683 ASSERT(fd != -1); 684 685 ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd)); 686 ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort)); 687 ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT)); 688 689 uv_close((uv_handle_t*) &poll_handle, NULL); 690 ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT)); 691 ASSERT(0 == close(fd)); 692 693 MAKE_VALGRIND_HAPPY(); 694 return 0; 695} 696#endif /* UV_HAVE_KQUEUE */ 697