1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "uv.h"
23#include "task.h"
24#include <stdio.h>
25#include <stdlib.h>
26
27typedef struct {
28  uv_write_t req;
29  uv_buf_t buf;
30} write_req_t;
31
32static uv_loop_t* loop;
33
34static int server_closed;
35static stream_type serverType;
36static uv_tcp_t tcpServer;
37static uv_udp_t udpServer;
38static uv_pipe_t pipeServer;
39static uv_handle_t* server;
40static uv_udp_send_t* send_freelist;
41
42static void after_write(uv_write_t* req, int status);
43static void after_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf);
44static void on_close(uv_handle_t* peer);
45static void on_server_close(uv_handle_t* handle);
46static void on_connection(uv_stream_t*, int status);
47
48
49static void after_write(uv_write_t* req, int status) {
50  write_req_t* wr;
51
52  /* Free the read/write buffer and the request */
53  wr = (write_req_t*) req;
54  free(wr->buf.base);
55  free(wr);
56
57  if (status == 0)
58    return;
59
60  fprintf(stderr,
61          "uv_write error: %s - %s\n",
62          uv_err_name(status),
63          uv_strerror(status));
64}
65
66
67static void after_shutdown(uv_shutdown_t* req, int status) {
68  ASSERT_EQ(status, 0);
69  uv_close((uv_handle_t*) req->handle, on_close);
70  free(req);
71}
72
73
74static void on_shutdown(uv_shutdown_t* req, int status) {
75  ASSERT_EQ(status, 0);
76  free(req);
77}
78
79
80static void after_read(uv_stream_t* handle,
81                       ssize_t nread,
82                       const uv_buf_t* buf) {
83  int i;
84  write_req_t *wr;
85  uv_shutdown_t* sreq;
86  int shutdown = 0;
87
88  if (nread < 0) {
89    /* Error or EOF */
90    ASSERT_EQ(nread, UV_EOF);
91
92    free(buf->base);
93    sreq = malloc(sizeof* sreq);
94    if (uv_is_writable(handle)) {
95      ASSERT_EQ(0, uv_shutdown(sreq, handle, after_shutdown));
96    }
97    return;
98  }
99
100  if (nread == 0) {
101    /* Everything OK, but nothing read. */
102    free(buf->base);
103    return;
104  }
105
106  /*
107   * Scan for the letter Q which signals that we should quit the server.
108   * If we get QS it means close the stream.
109   * If we get QSS it means shutdown the stream.
110   * If we get QSH it means disable linger before close the socket.
111   */
112  for (i = 0; i < nread; i++) {
113    if (buf->base[i] == 'Q') {
114      if (i + 1 < nread && buf->base[i + 1] == 'S') {
115        int reset = 0;
116        if (i + 2 < nread && buf->base[i + 2] == 'S')
117          shutdown = 1;
118        if (i + 2 < nread && buf->base[i + 2] == 'H')
119          reset = 1;
120        if (reset && handle->type == UV_TCP)
121          ASSERT_EQ(0, uv_tcp_close_reset((uv_tcp_t*) handle, on_close));
122        else if (shutdown)
123          break;
124        else
125          uv_close((uv_handle_t*) handle, on_close);
126        free(buf->base);
127        return;
128      } else if (!server_closed) {
129        uv_close(server, on_server_close);
130        server_closed = 1;
131      }
132    }
133  }
134
135  wr = (write_req_t*) malloc(sizeof *wr);
136  ASSERT_NOT_NULL(wr);
137  wr->buf = uv_buf_init(buf->base, nread);
138
139  if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
140    FATAL("uv_write failed");
141  }
142
143  if (shutdown)
144    ASSERT_EQ(0, uv_shutdown(malloc(sizeof* sreq), handle, on_shutdown));
145}
146
147
148static void on_close(uv_handle_t* peer) {
149  free(peer);
150}
151
152
153static void echo_alloc(uv_handle_t* handle,
154                       size_t suggested_size,
155                       uv_buf_t* buf) {
156  buf->base = malloc(suggested_size);
157  buf->len = suggested_size;
158}
159
160static void slab_alloc(uv_handle_t* handle,
161                       size_t suggested_size,
162                       uv_buf_t* buf) {
163  /* up to 16 datagrams at once */
164  static char slab[16 * 64 * 1024];
165  buf->base = slab;
166  buf->len = sizeof(slab);
167}
168
169static void on_connection(uv_stream_t* server, int status) {
170  uv_stream_t* stream;
171  int r;
172
173  if (status != 0) {
174    fprintf(stderr, "Connect error %s\n", uv_err_name(status));
175  }
176  ASSERT(status == 0);
177
178  switch (serverType) {
179  case TCP:
180    stream = malloc(sizeof(uv_tcp_t));
181    ASSERT_NOT_NULL(stream);
182    r = uv_tcp_init(loop, (uv_tcp_t*)stream);
183    ASSERT(r == 0);
184    break;
185
186  case PIPE:
187    stream = malloc(sizeof(uv_pipe_t));
188    ASSERT_NOT_NULL(stream);
189    r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
190    ASSERT(r == 0);
191    break;
192
193  default:
194    ASSERT(0 && "Bad serverType");
195    abort();
196  }
197
198  /* associate server with stream */
199  stream->data = server;
200
201  r = uv_accept(server, stream);
202  ASSERT(r == 0);
203
204  r = uv_read_start(stream, echo_alloc, after_read);
205  ASSERT(r == 0);
206}
207
208
209static void on_server_close(uv_handle_t* handle) {
210  ASSERT(handle == server);
211}
212
213static uv_udp_send_t* send_alloc(void) {
214  uv_udp_send_t* req = send_freelist;
215  if (req != NULL)
216    send_freelist = req->data;
217  else
218    req = malloc(sizeof(*req));
219  return req;
220}
221
222static void on_send(uv_udp_send_t* req, int status) {
223  ASSERT_NOT_NULL(req);
224  ASSERT(status == 0);
225  req->data = send_freelist;
226  send_freelist = req;
227}
228
229static void on_recv(uv_udp_t* handle,
230                    ssize_t nread,
231                    const uv_buf_t* rcvbuf,
232                    const struct sockaddr* addr,
233                    unsigned flags) {
234  uv_buf_t sndbuf;
235  uv_udp_send_t* req;
236
237  if (nread == 0) {
238    /* Everything OK, but nothing read. */
239    return;
240  }
241
242  ASSERT(nread > 0);
243  ASSERT(addr->sa_family == AF_INET);
244
245  req = send_alloc();
246  ASSERT_NOT_NULL(req);
247  sndbuf = uv_buf_init(rcvbuf->base, nread);
248  ASSERT(0 <= uv_udp_send(req, handle, &sndbuf, 1, addr, on_send));
249}
250
251static int tcp4_echo_start(int port) {
252  struct sockaddr_in addr;
253  int r;
254
255  ASSERT(0 == uv_ip4_addr("127.0.0.1", port, &addr));
256
257  server = (uv_handle_t*)&tcpServer;
258  serverType = TCP;
259
260  r = uv_tcp_init(loop, &tcpServer);
261  if (r) {
262    /* TODO: Error codes */
263    fprintf(stderr, "Socket creation error\n");
264    return 1;
265  }
266
267  r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0);
268  if (r) {
269    /* TODO: Error codes */
270    fprintf(stderr, "Bind error\n");
271    return 1;
272  }
273
274  r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection);
275  if (r) {
276    /* TODO: Error codes */
277    fprintf(stderr, "Listen error %s\n", uv_err_name(r));
278    return 1;
279  }
280
281  return 0;
282}
283
284
285static int tcp6_echo_start(int port) {
286  struct sockaddr_in6 addr6;
287  int r;
288
289  ASSERT(0 == uv_ip6_addr("::1", port, &addr6));
290
291  server = (uv_handle_t*)&tcpServer;
292  serverType = TCP;
293
294  r = uv_tcp_init(loop, &tcpServer);
295  if (r) {
296    /* TODO: Error codes */
297    fprintf(stderr, "Socket creation error\n");
298    return 1;
299  }
300
301  /* IPv6 is optional as not all platforms support it */
302  r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr6, 0);
303  if (r) {
304    /* show message but return OK */
305    fprintf(stderr, "IPv6 not supported\n");
306    return 0;
307  }
308
309  r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection);
310  if (r) {
311    /* TODO: Error codes */
312    fprintf(stderr, "Listen error\n");
313    return 1;
314  }
315
316  return 0;
317}
318
319
320static int udp4_echo_start(int port) {
321  struct sockaddr_in addr;
322  int r;
323
324  ASSERT(0 == uv_ip4_addr("127.0.0.1", port, &addr));
325  server = (uv_handle_t*)&udpServer;
326  serverType = UDP;
327
328  r = uv_udp_init(loop, &udpServer);
329  if (r) {
330    fprintf(stderr, "uv_udp_init: %s\n", uv_strerror(r));
331    return 1;
332  }
333
334  r = uv_udp_bind(&udpServer, (const struct sockaddr*) &addr, 0);
335  if (r) {
336    fprintf(stderr, "uv_udp_bind: %s\n", uv_strerror(r));
337    return 1;
338  }
339
340  r = uv_udp_recv_start(&udpServer, slab_alloc, on_recv);
341  if (r) {
342    fprintf(stderr, "uv_udp_recv_start: %s\n", uv_strerror(r));
343    return 1;
344  }
345
346  return 0;
347}
348
349
350static int pipe_echo_start(char* pipeName) {
351  int r;
352
353#ifndef _WIN32
354  {
355    uv_fs_t req;
356    uv_fs_unlink(NULL, &req, pipeName, NULL);
357    uv_fs_req_cleanup(&req);
358  }
359#endif
360
361  server = (uv_handle_t*)&pipeServer;
362  serverType = PIPE;
363
364  r = uv_pipe_init(loop, &pipeServer, 0);
365  if (r) {
366    fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(r));
367    return 1;
368  }
369
370  r = uv_pipe_bind(&pipeServer, pipeName);
371  if (r) {
372    fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(r));
373    return 1;
374  }
375
376  r = uv_listen((uv_stream_t*)&pipeServer, SOMAXCONN, on_connection);
377  if (r) {
378    fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(r));
379    return 1;
380  }
381
382  return 0;
383}
384
385
386HELPER_IMPL(tcp4_echo_server) {
387  loop = uv_default_loop();
388
389  if (tcp4_echo_start(TEST_PORT))
390    return 1;
391
392  notify_parent_process();
393  uv_run(loop, UV_RUN_DEFAULT);
394  return 0;
395}
396
397
398HELPER_IMPL(tcp6_echo_server) {
399  loop = uv_default_loop();
400
401  if (tcp6_echo_start(TEST_PORT))
402    return 1;
403
404  notify_parent_process();
405  uv_run(loop, UV_RUN_DEFAULT);
406  return 0;
407}
408
409
410HELPER_IMPL(pipe_echo_server) {
411  loop = uv_default_loop();
412
413  if (pipe_echo_start(TEST_PIPENAME))
414    return 1;
415
416  notify_parent_process();
417  uv_run(loop, UV_RUN_DEFAULT);
418  return 0;
419}
420
421
422HELPER_IMPL(udp4_echo_server) {
423  loop = uv_default_loop();
424
425  if (udp4_echo_start(TEST_PORT))
426    return 1;
427
428  notify_parent_process();
429  uv_run(loop, UV_RUN_DEFAULT);
430  return 0;
431}
432