1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include <errno.h>
23
24#ifdef _WIN32
25# include <fcntl.h>
26#else
27# include <sys/socket.h>
28# include <unistd.h>
29#endif
30
31#include "uv.h"
32#include "task.h"
33
34#ifdef __linux__
35# include <sys/epoll.h>
36#endif
37
38#ifdef UV_HAVE_KQUEUE
39# include <sys/types.h>
40# include <sys/event.h>
41# include <sys/time.h>
42#endif
43
44
/* Number of client connections opened against the test server. */
#define NUM_CLIENTS 5
/* Payload size, in bytes, that each sending endpoint must transfer. */
#define TRANSFER_BYTES (1 << 16)

/* Smaller of two values. Each argument may be evaluated twice, so pass only
 * side-effect-free expressions. The expansion carries no trailing semicolon,
 * which lets the macro be used as part of a larger expression.
 */
#undef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
50
51
/* Direction(s) in which payload data flows during a test run. */
typedef enum {
  /* Only the client side sends payload; the server side only receives. */
  UNIDIRECTIONAL = 0,
  /* Both ends of every connection send and receive the payload. */
  DUPLEX = 1
} test_mode_t;
56
57typedef struct connection_context_s {
58  uv_poll_t poll_handle;
59  uv_timer_t timer_handle;
60  uv_os_sock_t sock;
61  size_t read, sent;
62  int is_server_connection;
63  int open_handles;
64  int got_fin, sent_fin, got_disconnect;
65  unsigned int events, delayed_events;
66} connection_context_t;
67
68typedef struct server_context_s {
69  uv_poll_t poll_handle;
70  uv_os_sock_t sock;
71  int connections;
72} server_context_t;
73
74
75static void delay_timer_cb(uv_timer_t* timer);
76
77
78static test_mode_t test_mode = DUPLEX;
79
80static int closed_connections = 0;
81
82static int valid_writable_wakeups = 0;
83static int spurious_writable_wakeups = 0;
84
85#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
86static int disconnects = 0;
87#endif /* !__sun && !_AIX  && !__MVS__ */
88
/* Reports whether the most recent socket call failed only because it would
 * have blocked or, for a non-blocking connect(), is still in progress.
 *
 * Returns non-zero when the last Winsock error is WSAEWOULDBLOCK on Windows,
 * or when errno is EAGAIN, EINPROGRESS or (where defined) EWOULDBLOCK on other
 * platforms; returns 0 otherwise.
 */
static int got_eagain(void) {
#ifdef _WIN32
  return WSAGetLastError() == WSAEWOULDBLOCK;
#else
  return errno == EAGAIN
      || errno == EINPROGRESS
#ifdef EWOULDBLOCK
      || errno == EWOULDBLOCK
#endif
      ;
#endif
}
101
102
103static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) {
104  uv_os_sock_t sock;
105  int r;
106
107  sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
108#ifdef _WIN32
109  ASSERT(sock != INVALID_SOCKET);
110#else
111  ASSERT(sock >= 0);
112#endif
113
114#ifndef _WIN32
115  {
116    /* Allow reuse of the port. */
117    int yes = 1;
118    r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
119    ASSERT(r == 0);
120  }
121#endif
122
123  r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
124  ASSERT(r == 0);
125
126  return sock;
127}
128
129
130static void close_socket(uv_os_sock_t sock) {
131  int r;
132#ifdef _WIN32
133  r = closesocket(sock);
134#else
135  r = close(sock);
136#endif
137  /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by
138   * the peer before all pending data was delivered.
139   */
140  ASSERT(r == 0 || errno == ECONNRESET);
141}
142
143
144static connection_context_t* create_connection_context(
145    uv_os_sock_t sock, int is_server_connection) {
146  int r;
147  connection_context_t* context;
148
149  context = (connection_context_t*) malloc(sizeof *context);
150  ASSERT_NOT_NULL(context);
151
152  context->sock = sock;
153  context->is_server_connection = is_server_connection;
154  context->read = 0;
155  context->sent = 0;
156  context->open_handles = 0;
157  context->events = 0;
158  context->delayed_events = 0;
159  context->got_fin = 0;
160  context->sent_fin = 0;
161  context->got_disconnect = 0;
162
163  r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
164  context->open_handles++;
165  context->poll_handle.data = context;
166  ASSERT(r == 0);
167
168  r = uv_timer_init(uv_default_loop(), &context->timer_handle);
169  context->open_handles++;
170  context->timer_handle.data = context;
171  ASSERT(r == 0);
172
173  return context;
174}
175
176
177static void connection_close_cb(uv_handle_t* handle) {
178  connection_context_t* context = (connection_context_t*) handle->data;
179
180  if (--context->open_handles == 0) {
181    if (test_mode == DUPLEX || context->is_server_connection) {
182      ASSERT(context->read == TRANSFER_BYTES);
183    } else {
184      ASSERT(context->read == 0);
185    }
186
187    if (test_mode == DUPLEX || !context->is_server_connection) {
188      ASSERT(context->sent == TRANSFER_BYTES);
189    } else {
190      ASSERT(context->sent == 0);
191    }
192
193    closed_connections++;
194
195    free(context);
196  }
197}
198
199
200static void destroy_connection_context(connection_context_t* context) {
201  uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
202  uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
203}
204
205
206static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
207  connection_context_t* context = (connection_context_t*) handle->data;
208  unsigned int new_events;
209  int r;
210
211  ASSERT(status == 0);
212  ASSERT(events & context->events);
213  ASSERT(!(events & ~context->events));
214
215  new_events = context->events;
216
217  if (events & UV_READABLE) {
218    int action = rand() % 7;
219
220    switch (action) {
221      case 0:
222      case 1: {
223        /* Read a couple of bytes. */
224        static char buffer[74];
225
226        do
227          r = recv(context->sock, buffer, sizeof buffer, 0);
228        while (r == -1 && errno == EINTR);
229        ASSERT(r >= 0);
230
231        if (r > 0) {
232          context->read += r;
233        } else {
234          /* Got FIN. */
235          context->got_fin = 1;
236          new_events &= ~UV_READABLE;
237        }
238
239        break;
240      }
241
242      case 2:
243      case 3: {
244        /* Read until EAGAIN. */
245        static char buffer[931];
246
247        for (;;) {
248          do
249            r = recv(context->sock, buffer, sizeof buffer, 0);
250          while (r == -1 && errno == EINTR);
251
252          if (r <= 0)
253            break;
254
255          context->read += r;
256        }
257
258        if (r == 0) {
259          /* Got FIN. */
260          context->got_fin = 1;
261          new_events &= ~UV_READABLE;
262        } else {
263          ASSERT(got_eagain());
264        }
265
266        break;
267      }
268
269      case 4:
270        /* Ignore. */
271        break;
272
273      case 5:
274        /* Stop reading for a while. Restart in timer callback. */
275        new_events &= ~UV_READABLE;
276        if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
277          context->delayed_events = UV_READABLE;
278          uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
279        } else {
280          context->delayed_events |= UV_READABLE;
281        }
282        break;
283
284      case 6:
285        /* Fudge with the event mask. */
286        uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
287        uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
288        context->events = UV_READABLE;
289        break;
290
291      default:
292        ASSERT(0);
293    }
294  }
295
296  if (events & UV_WRITABLE) {
297    if (context->sent < TRANSFER_BYTES &&
298        !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
299      /* We have to send more bytes. */
300      int action = rand() % 7;
301
302      switch (action) {
303        case 0:
304        case 1: {
305          /* Send a couple of bytes. */
306          static char buffer[103];
307
308          int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
309          ASSERT(send_bytes > 0);
310
311          do
312            r = send(context->sock, buffer, send_bytes, 0);
313          while (r == -1 && errno == EINTR);
314
315          if (r < 0) {
316            ASSERT(got_eagain());
317            spurious_writable_wakeups++;
318            break;
319          }
320
321          ASSERT(r > 0);
322          context->sent += r;
323          valid_writable_wakeups++;
324          break;
325        }
326
327        case 2:
328        case 3: {
329          /* Send until EAGAIN. */
330          static char buffer[1234];
331
332          int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
333          ASSERT(send_bytes > 0);
334
335          do
336            r = send(context->sock, buffer, send_bytes, 0);
337          while (r == -1 && errno == EINTR);
338
339          if (r < 0) {
340            ASSERT(got_eagain());
341            spurious_writable_wakeups++;
342            break;
343          }
344
345          ASSERT(r > 0);
346          valid_writable_wakeups++;
347          context->sent += r;
348
349          while (context->sent < TRANSFER_BYTES) {
350            send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
351            ASSERT(send_bytes > 0);
352
353            do
354              r = send(context->sock, buffer, send_bytes, 0);
355            while (r == -1 && errno == EINTR);
356            ASSERT(r != 0);
357
358            if (r < 0) {
359              ASSERT(got_eagain());
360              break;
361            }
362
363            context->sent += r;
364          }
365          break;
366        }
367
368        case 4:
369          /* Ignore. */
370         break;
371
372        case 5:
373          /* Stop sending for a while. Restart in timer callback. */
374          new_events &= ~UV_WRITABLE;
375          if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
376            context->delayed_events = UV_WRITABLE;
377            uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
378          } else {
379            context->delayed_events |= UV_WRITABLE;
380          }
381          break;
382
383        case 6:
384          /* Fudge with the event mask. */
385          uv_poll_start(&context->poll_handle,
386                        UV_READABLE,
387                        connection_poll_cb);
388          uv_poll_start(&context->poll_handle,
389                        UV_WRITABLE,
390                        connection_poll_cb);
391          context->events = UV_WRITABLE;
392          break;
393
394        default:
395          ASSERT(0);
396      }
397
398    } else {
399      /* Nothing more to write. Send FIN. */
400      int r;
401#ifdef _WIN32
402      r = shutdown(context->sock, SD_SEND);
403#else
404      r = shutdown(context->sock, SHUT_WR);
405#endif
406      ASSERT(r == 0);
407      context->sent_fin = 1;
408      new_events &= ~UV_WRITABLE;
409    }
410  }
411#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
412  if (events & UV_DISCONNECT) {
413    context->got_disconnect = 1;
414    ++disconnects;
415    new_events &= ~UV_DISCONNECT;
416  }
417
418  if (context->got_fin && context->sent_fin && context->got_disconnect) {
419#else /* __sun && _AIX  && __MVS__ */
420  if (context->got_fin && context->sent_fin) {
421#endif /* !__sun && !_AIX && !__MVS__  */
422    /* Sent and received FIN. Close and destroy context. */
423    close_socket(context->sock);
424    destroy_connection_context(context);
425    context->events = 0;
426
427  } else if (new_events != context->events) {
428    /* Poll mask changed. Call uv_poll_start again. */
429    context->events = new_events;
430    uv_poll_start(handle, new_events, connection_poll_cb);
431  }
432
433  /* Assert that uv_is_active works correctly for poll handles. */
434  if (context->events != 0) {
435    ASSERT(1 == uv_is_active((uv_handle_t*) handle));
436  } else {
437    ASSERT(0 == uv_is_active((uv_handle_t*) handle));
438  }
439}
440
441
442static void delay_timer_cb(uv_timer_t* timer) {
443  connection_context_t* context = (connection_context_t*) timer->data;
444  int r;
445
446  /* Timer should auto stop. */
447  ASSERT(0 == uv_is_active((uv_handle_t*) timer));
448
449  /* Add the requested events to the poll mask. */
450  ASSERT(context->delayed_events != 0);
451  context->events |= context->delayed_events;
452  context->delayed_events = 0;
453
454  r = uv_poll_start(&context->poll_handle,
455                    context->events,
456                    connection_poll_cb);
457  ASSERT(r == 0);
458}
459
460
461static server_context_t* create_server_context(
462    uv_os_sock_t sock) {
463  int r;
464  server_context_t* context;
465
466  context = (server_context_t*) malloc(sizeof *context);
467  ASSERT_NOT_NULL(context);
468
469  context->sock = sock;
470  context->connections = 0;
471
472  r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
473  context->poll_handle.data = context;
474  ASSERT(r == 0);
475
476  return context;
477}
478
479
480static void server_close_cb(uv_handle_t* handle) {
481  server_context_t* context = (server_context_t*) handle->data;
482  free(context);
483}
484
485
486static void destroy_server_context(server_context_t* context) {
487  uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
488}
489
490
491static void server_poll_cb(uv_poll_t* handle, int status, int events) {
492  server_context_t* server_context = (server_context_t*)
493                                          handle->data;
494  connection_context_t* connection_context;
495  struct sockaddr_in addr;
496  socklen_t addr_len;
497  uv_os_sock_t sock;
498  int r;
499
500  addr_len = sizeof addr;
501  sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
502#ifdef _WIN32
503  ASSERT(sock != INVALID_SOCKET);
504#else
505  ASSERT(sock >= 0);
506#endif
507
508  connection_context = create_connection_context(sock, 1);
509  connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
510  r = uv_poll_start(&connection_context->poll_handle,
511                    UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
512                    connection_poll_cb);
513  ASSERT(r == 0);
514
515  if (++server_context->connections == NUM_CLIENTS) {
516    close_socket(server_context->sock);
517    destroy_server_context(server_context);
518  }
519}
520
521
522static void start_server(void) {
523  server_context_t* context;
524  struct sockaddr_in addr;
525  uv_os_sock_t sock;
526  int r;
527
528  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
529  sock = create_bound_socket(addr);
530  context = create_server_context(sock);
531
532  r = listen(sock, 100);
533  ASSERT(r == 0);
534
535  r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
536  ASSERT(r == 0);
537}
538
539
540static void start_client(void) {
541  uv_os_sock_t sock;
542  connection_context_t* context;
543  struct sockaddr_in server_addr;
544  struct sockaddr_in addr;
545  int r;
546
547  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
548  ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
549
550  sock = create_bound_socket(addr);
551  context = create_connection_context(sock, 0);
552
553  context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
554  r = uv_poll_start(&context->poll_handle,
555                    UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
556                    connection_poll_cb);
557  ASSERT(r == 0);
558
559  r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
560  ASSERT(r == 0 || got_eagain());
561}
562
563
564static void start_poll_test(void) {
565  int i, r;
566
567#ifdef _WIN32
568  {
569    struct WSAData wsa_data;
570    int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
571    ASSERT(r == 0);
572  }
573#endif
574
575  start_server();
576
577  for (i = 0; i < NUM_CLIENTS; i++)
578    start_client();
579
580  r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
581  ASSERT(r == 0);
582
583  /* Assert that at most five percent of the writable wakeups was spurious. */
584  ASSERT(spurious_writable_wakeups == 0 ||
585         (valid_writable_wakeups + spurious_writable_wakeups) /
586         spurious_writable_wakeups > 20);
587
588  ASSERT(closed_connections == NUM_CLIENTS * 2);
589#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
590  ASSERT(disconnects == NUM_CLIENTS * 2);
591#endif
592  MAKE_VALGRIND_HAPPY();
593}
594
595
/* Issuing shutdown() with SHUT_WR on IBM i PASE also sends a normal close
 * sequence to the partner program. This leads to timing issues and
 * ECONNRESET failures in the tests 'poll_duplex' and 'poll_unidirectional',
 * so both tests are skipped on that platform.
 *
 * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm
 */
603TEST_IMPL(poll_duplex) {
604#if defined(NO_SELF_CONNECT)
605  RETURN_SKIP(NO_SELF_CONNECT);
606#elif defined(__PASE__)
607  RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
608#endif
609  test_mode = DUPLEX;
610  start_poll_test();
611  return 0;
612}
613
614
615TEST_IMPL(poll_unidirectional) {
616#if defined(NO_SELF_CONNECT)
617  RETURN_SKIP(NO_SELF_CONNECT);
618#elif defined(__PASE__)
619  RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
620#endif
621  test_mode = UNIDIRECTIONAL;
622  start_poll_test();
623  return 0;
624}
625
626
627/* Windows won't let you open a directory so we open a file instead.
628 * OS X lets you poll a file so open the $PWD instead.  Both fail
629 * on Linux so it doesn't matter which one we pick.  Both succeed
630 * on FreeBSD, Solaris and AIX so skip the test on those platforms.
631 */
632TEST_IMPL(poll_bad_fdtype) {
633#if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \
634    !defined(_AIX) && !defined(__MVS__) && !defined(__FreeBSD_kernel__) && \
635    !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \
636    !defined(__NetBSD__)
637  uv_poll_t poll_handle;
638  int fd;
639
640#if defined(_WIN32)
641  fd = open("test/fixtures/empty_file", O_RDONLY);
642#else
643  fd = open(".", O_RDONLY);
644#endif
645  ASSERT(fd != -1);
646  ASSERT(0 != uv_poll_init(uv_default_loop(), &poll_handle, fd));
647  ASSERT(0 == close(fd));
648#endif
649
650  MAKE_VALGRIND_HAPPY();
651  return 0;
652}
653
654
655#ifdef __linux__
656TEST_IMPL(poll_nested_epoll) {
657  uv_poll_t poll_handle;
658  int fd;
659
660  fd = epoll_create(1);
661  ASSERT(fd != -1);
662
663  ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
664  ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
665  ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
666
667  uv_close((uv_handle_t*) &poll_handle, NULL);
668  ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
669  ASSERT(0 == close(fd));
670
671  MAKE_VALGRIND_HAPPY();
672  return 0;
673}
674#endif  /* __linux__ */
675
676
#ifdef UV_HAVE_KQUEUE
/* Polls a kqueue descriptor through uv_poll. The poll callback is abort(), so
 * the test dies if the idle kqueue descriptor is ever reported readable.
 */
TEST_IMPL(poll_nested_kqueue) {
  uv_poll_t poll_handle;
  int fd = kqueue();

  ASSERT(fd != -1);

  ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
  ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));

  /* The started poll handle must keep the loop alive (non-zero result). */
  ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));

  /* After closing the handle the loop must run to completion. */
  uv_close((uv_handle_t*) &poll_handle, NULL);
  ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
  ASSERT(0 == close(fd));

  MAKE_VALGRIND_HAPPY();
  return 0;
}
#endif  /* UV_HAVE_KQUEUE */
697