1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22#include "uv.h" 23#include "task.h" 24#include <stdio.h> 25#include <stdlib.h> 26 27typedef struct { 28 uv_write_t req; 29 uv_buf_t buf; 30} write_req_t; 31 32static uv_loop_t* loop; 33 34static int server_closed; 35static stream_type serverType; 36static uv_tcp_t tcpServer; 37static uv_udp_t udpServer; 38static uv_pipe_t pipeServer; 39static uv_handle_t* server; 40static uv_udp_send_t* send_freelist; 41 42static void after_write(uv_write_t* req, int status); 43static void after_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf); 44static void on_close(uv_handle_t* peer); 45static void on_server_close(uv_handle_t* handle); 46static void on_connection(uv_stream_t*, int status); 47 48 49static void after_write(uv_write_t* req, int status) { 50 write_req_t* wr; 51 52 /* Free the read/write buffer and the request */ 53 wr = (write_req_t*) req; 54 free(wr->buf.base); 55 free(wr); 56 57 if (status == 0) 58 return; 59 60 fprintf(stderr, 61 "uv_write error: %s - %s\n", 62 uv_err_name(status), 63 uv_strerror(status)); 64} 65 66 67static void after_shutdown(uv_shutdown_t* req, int status) { 68 ASSERT_EQ(status, 0); 69 uv_close((uv_handle_t*) req->handle, on_close); 70 free(req); 71} 72 73 74static void on_shutdown(uv_shutdown_t* req, int status) { 75 ASSERT_EQ(status, 0); 76 free(req); 77} 78 79 80static void after_read(uv_stream_t* handle, 81 ssize_t nread, 82 const uv_buf_t* buf) { 83 int i; 84 write_req_t *wr; 85 uv_shutdown_t* sreq; 86 int shutdown = 0; 87 88 if (nread < 0) { 89 /* Error or EOF */ 90 ASSERT_EQ(nread, UV_EOF); 91 92 free(buf->base); 93 sreq = malloc(sizeof* sreq); 94 if (uv_is_writable(handle)) { 95 ASSERT_EQ(0, uv_shutdown(sreq, handle, after_shutdown)); 96 } 97 return; 98 } 99 100 if (nread == 0) { 101 /* Everything OK, but nothing read. */ 102 free(buf->base); 103 return; 104 } 105 106 /* 107 * Scan for the letter Q which signals that we should quit the server. 108 * If we get QS it means close the stream. 109 * If we get QSS it means shutdown the stream. 110 * If we get QSH it means disable linger before close the socket. 111 */ 112 for (i = 0; i < nread; i++) { 113 if (buf->base[i] == 'Q') { 114 if (i + 1 < nread && buf->base[i + 1] == 'S') { 115 int reset = 0; 116 if (i + 2 < nread && buf->base[i + 2] == 'S') 117 shutdown = 1; 118 if (i + 2 < nread && buf->base[i + 2] == 'H') 119 reset = 1; 120 if (reset && handle->type == UV_TCP) 121 ASSERT_EQ(0, uv_tcp_close_reset((uv_tcp_t*) handle, on_close)); 122 else if (shutdown) 123 break; 124 else 125 uv_close((uv_handle_t*) handle, on_close); 126 free(buf->base); 127 return; 128 } else if (!server_closed) { 129 uv_close(server, on_server_close); 130 server_closed = 1; 131 } 132 } 133 } 134 135 wr = (write_req_t*) malloc(sizeof *wr); 136 ASSERT_NOT_NULL(wr); 137 wr->buf = uv_buf_init(buf->base, nread); 138 139 if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) { 140 FATAL("uv_write failed"); 141 } 142 143 if (shutdown) 144 ASSERT_EQ(0, uv_shutdown(malloc(sizeof* sreq), handle, on_shutdown)); 145} 146 147 148static void on_close(uv_handle_t* peer) { 149 free(peer); 150} 151 152 153static void echo_alloc(uv_handle_t* handle, 154 size_t suggested_size, 155 uv_buf_t* buf) { 156 buf->base = malloc(suggested_size); 157 buf->len = suggested_size; 158} 159 160static void slab_alloc(uv_handle_t* handle, 161 size_t suggested_size, 162 uv_buf_t* buf) { 163 /* up to 16 datagrams at once */ 164 static char slab[16 * 64 * 1024]; 165 buf->base = slab; 166 buf->len = sizeof(slab); 167} 168 169static void on_connection(uv_stream_t* server, int status) { 170 uv_stream_t* stream; 171 int r; 172 173 if (status != 0) { 174 fprintf(stderr, "Connect error %s\n", uv_err_name(status)); 175 } 176 ASSERT(status == 0); 177 178 switch (serverType) { 179 case TCP: 180 stream = malloc(sizeof(uv_tcp_t)); 181 ASSERT_NOT_NULL(stream); 182 r = uv_tcp_init(loop, (uv_tcp_t*)stream); 183 ASSERT(r == 0); 184 break; 185 186 case PIPE: 187 stream = malloc(sizeof(uv_pipe_t)); 188 ASSERT_NOT_NULL(stream); 189 r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); 190 ASSERT(r == 0); 191 break; 192 193 default: 194 ASSERT(0 && "Bad serverType"); 195 abort(); 196 } 197 198 /* associate server with stream */ 199 stream->data = server; 200 201 r = uv_accept(server, stream); 202 ASSERT(r == 0); 203 204 r = uv_read_start(stream, echo_alloc, after_read); 205 ASSERT(r == 0); 206} 207 208 209static void on_server_close(uv_handle_t* handle) { 210 ASSERT(handle == server); 211} 212 213static uv_udp_send_t* send_alloc(void) { 214 uv_udp_send_t* req = send_freelist; 215 if (req != NULL) 216 send_freelist = req->data; 217 else 218 req = malloc(sizeof(*req)); 219 return req; 220} 221 222static void on_send(uv_udp_send_t* req, int status) { 223 ASSERT_NOT_NULL(req); 224 ASSERT(status == 0); 225 req->data = send_freelist; 226 send_freelist = req; 227} 228 229static void on_recv(uv_udp_t* handle, 230 ssize_t nread, 231 const uv_buf_t* rcvbuf, 232 const struct sockaddr* addr, 233 unsigned flags) { 234 uv_buf_t sndbuf; 235 uv_udp_send_t* req; 236 237 if (nread == 0) { 238 /* Everything OK, but nothing read. */ 239 return; 240 } 241 242 ASSERT(nread > 0); 243 ASSERT(addr->sa_family == AF_INET); 244 245 req = send_alloc(); 246 ASSERT_NOT_NULL(req); 247 sndbuf = uv_buf_init(rcvbuf->base, nread); 248 ASSERT(0 <= uv_udp_send(req, handle, &sndbuf, 1, addr, on_send)); 249} 250 251static int tcp4_echo_start(int port) { 252 struct sockaddr_in addr; 253 int r; 254 255 ASSERT(0 == uv_ip4_addr("127.0.0.1", port, &addr)); 256 257 server = (uv_handle_t*)&tcpServer; 258 serverType = TCP; 259 260 r = uv_tcp_init(loop, &tcpServer); 261 if (r) { 262 /* TODO: Error codes */ 263 fprintf(stderr, "Socket creation error\n"); 264 return 1; 265 } 266 267 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0); 268 if (r) { 269 /* TODO: Error codes */ 270 fprintf(stderr, "Bind error\n"); 271 return 1; 272 } 273 274 r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection); 275 if (r) { 276 /* TODO: Error codes */ 277 fprintf(stderr, "Listen error %s\n", uv_err_name(r)); 278 return 1; 279 } 280 281 return 0; 282} 283 284 285static int tcp6_echo_start(int port) { 286 struct sockaddr_in6 addr6; 287 int r; 288 289 ASSERT(0 == uv_ip6_addr("::1", port, &addr6)); 290 291 server = (uv_handle_t*)&tcpServer; 292 serverType = TCP; 293 294 r = uv_tcp_init(loop, &tcpServer); 295 if (r) { 296 /* TODO: Error codes */ 297 fprintf(stderr, "Socket creation error\n"); 298 return 1; 299 } 300 301 /* IPv6 is optional as not all platforms support it */ 302 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr6, 0); 303 if (r) { 304 /* show message but return OK */ 305 fprintf(stderr, "IPv6 not supported\n"); 306 return 0; 307 } 308 309 r = uv_listen((uv_stream_t*)&tcpServer, SOMAXCONN, on_connection); 310 if (r) { 311 /* TODO: Error codes */ 312 fprintf(stderr, "Listen error\n"); 313 return 1; 314 } 315 316 return 0; 317} 318 319 320static int udp4_echo_start(int port) { 321 struct sockaddr_in addr; 322 int r; 323 324 ASSERT(0 == uv_ip4_addr("127.0.0.1", port, &addr)); 325 server = (uv_handle_t*)&udpServer; 326 serverType = UDP; 327 328 r = uv_udp_init(loop, &udpServer); 329 if (r) { 330 fprintf(stderr, "uv_udp_init: %s\n", uv_strerror(r)); 331 return 1; 332 } 333 334 r = uv_udp_bind(&udpServer, (const struct sockaddr*) &addr, 0); 335 if (r) { 336 fprintf(stderr, "uv_udp_bind: %s\n", uv_strerror(r)); 337 return 1; 338 } 339 340 r = uv_udp_recv_start(&udpServer, slab_alloc, on_recv); 341 if (r) { 342 fprintf(stderr, "uv_udp_recv_start: %s\n", uv_strerror(r)); 343 return 1; 344 } 345 346 return 0; 347} 348 349 350static int pipe_echo_start(char* pipeName) { 351 int r; 352 353#ifndef _WIN32 354 { 355 uv_fs_t req; 356 uv_fs_unlink(NULL, &req, pipeName, NULL); 357 uv_fs_req_cleanup(&req); 358 } 359#endif 360 361 server = (uv_handle_t*)&pipeServer; 362 serverType = PIPE; 363 364 r = uv_pipe_init(loop, &pipeServer, 0); 365 if (r) { 366 fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(r)); 367 return 1; 368 } 369 370 r = uv_pipe_bind(&pipeServer, pipeName); 371 if (r) { 372 fprintf(stderr, "uv_pipe_bind: %s\n", uv_strerror(r)); 373 return 1; 374 } 375 376 r = uv_listen((uv_stream_t*)&pipeServer, SOMAXCONN, on_connection); 377 if (r) { 378 fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(r)); 379 return 1; 380 } 381 382 return 0; 383} 384 385 386HELPER_IMPL(tcp4_echo_server) { 387 loop = uv_default_loop(); 388 389 if (tcp4_echo_start(TEST_PORT)) 390 return 1; 391 392 notify_parent_process(); 393 uv_run(loop, UV_RUN_DEFAULT); 394 return 0; 395} 396 397 398HELPER_IMPL(tcp6_echo_server) { 399 loop = uv_default_loop(); 400 401 if (tcp6_echo_start(TEST_PORT)) 402 return 1; 403 404 notify_parent_process(); 405 uv_run(loop, UV_RUN_DEFAULT); 406 return 0; 407} 408 409 410HELPER_IMPL(pipe_echo_server) { 411 loop = uv_default_loop(); 412 413 if (pipe_echo_start(TEST_PIPENAME)) 414 return 1; 415 416 notify_parent_process(); 417 uv_run(loop, UV_RUN_DEFAULT); 418 return 0; 419} 420 421 422HELPER_IMPL(udp4_echo_server) { 423 loop = uv_default_loop(); 424 425 if (udp4_echo_start(TEST_PORT)) 426 return 1; 427 428 notify_parent_process(); 429 uv_run(loop, UV_RUN_DEFAULT); 430 return 0; 431} 432