1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "uv.h"
23#include "internal.h"
24
25#include <assert.h>
26#include <string.h>
27#include <errno.h>
28#include <stdlib.h>
29#include <unistd.h>
30#if defined(__MVS__)
31#include <xti.h>
32#endif
33#include <sys/un.h>
34
35#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37#endif
38
39#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41#endif
42
43union uv__sockaddr {
44  struct sockaddr_in6 in6;
45  struct sockaddr_in in;
46  struct sockaddr addr;
47};
48
49static void uv__udp_run_completed(uv_udp_t* handle);
50static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
51static void uv__udp_recvmsg(uv_udp_t* handle);
52static void uv__udp_sendmsg(uv_udp_t* handle);
53static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
54                                       int domain,
55                                       unsigned int flags);
56
57#if HAVE_MMSG
58
59#define UV__MMSG_MAXWIDTH 20
60
61static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
62static void uv__udp_sendmmsg(uv_udp_t* handle);
63
64static int uv__recvmmsg_avail;
65static int uv__sendmmsg_avail;
66static uv_once_t once = UV_ONCE_INIT;
67
68static void uv__udp_mmsg_init(void) {
69  int ret;
70  int s;
71  s = uv__socket(AF_INET, SOCK_DGRAM, 0);
72  if (s < 0)
73    return;
74  ret = uv__sendmmsg(s, NULL, 0);
75  if (ret == 0 || errno != ENOSYS) {
76    uv__sendmmsg_avail = 1;
77    uv__recvmmsg_avail = 1;
78  } else {
79    ret = uv__recvmmsg(s, NULL, 0);
80    if (ret == 0 || errno != ENOSYS)
81      uv__recvmmsg_avail = 1;
82  }
83  uv__close(s);
84}
85
86#endif
87
88void uv__udp_close(uv_udp_t* handle) {
89  uv__io_close(handle->loop, &handle->io_watcher);
90  uv__handle_stop(handle);
91
92  if (handle->io_watcher.fd != -1) {
93    uv__close(handle->io_watcher.fd);
94    handle->io_watcher.fd = -1;
95  }
96}
97
98
99void uv__udp_finish_close(uv_udp_t* handle) {
100  uv_udp_send_t* req;
101  QUEUE* q;
102
103  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
104  assert(handle->io_watcher.fd == -1);
105
106  while (!QUEUE_EMPTY(&handle->write_queue)) {
107    q = QUEUE_HEAD(&handle->write_queue);
108    QUEUE_REMOVE(q);
109
110    req = QUEUE_DATA(q, uv_udp_send_t, queue);
111    req->status = UV_ECANCELED;
112    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
113  }
114
115  uv__udp_run_completed(handle);
116
117  assert(handle->send_queue_size == 0);
118  assert(handle->send_queue_count == 0);
119
120  /* Now tear down the handle. */
121  handle->recv_cb = NULL;
122  handle->alloc_cb = NULL;
123  /* but _do not_ touch close_cb */
124}
125
126
127static void uv__udp_run_completed(uv_udp_t* handle) {
128  uv_udp_send_t* req;
129  QUEUE* q;
130
131  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
132  handle->flags |= UV_HANDLE_UDP_PROCESSING;
133
134  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
135    q = QUEUE_HEAD(&handle->write_completed_queue);
136    QUEUE_REMOVE(q);
137
138    req = QUEUE_DATA(q, uv_udp_send_t, queue);
139    uv__req_unregister(handle->loop, req);
140
141    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
142    handle->send_queue_count--;
143
144    if (req->bufs != req->bufsml)
145      uv__free(req->bufs);
146    req->bufs = NULL;
147
148    if (req->send_cb == NULL)
149      continue;
150
151    /* req->status >= 0 == bytes written
152     * req->status <  0 == errno
153     */
154    if (req->status >= 0)
155      req->send_cb(req, 0);
156    else
157      req->send_cb(req, req->status);
158  }
159
160  if (QUEUE_EMPTY(&handle->write_queue)) {
161    /* Pending queue and completion queue empty, stop watcher. */
162    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
163    if (!uv__io_active(&handle->io_watcher, POLLIN))
164      uv__handle_stop(handle);
165  }
166
167  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
168}
169
170
171static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
172  uv_udp_t* handle;
173
174  handle = container_of(w, uv_udp_t, io_watcher);
175  assert(handle->type == UV_UDP);
176
177  if (revents & POLLIN)
178    uv__udp_recvmsg(handle);
179
180  if (revents & POLLOUT) {
181    uv__udp_sendmsg(handle);
182    uv__udp_run_completed(handle);
183  }
184}
185
186#if HAVE_MMSG
187static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
188  struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
189  struct iovec iov[UV__MMSG_MAXWIDTH];
190  struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
191  ssize_t nread;
192  uv_buf_t chunk_buf;
193  size_t chunks;
194  int flags;
195  size_t k;
196
197  /* prepare structures for recvmmsg */
198  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
199  if (chunks > ARRAY_SIZE(iov))
200    chunks = ARRAY_SIZE(iov);
201  for (k = 0; k < chunks; ++k) {
202    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
203    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
204    memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
205    msgs[k].msg_hdr.msg_iov = iov + k;
206    msgs[k].msg_hdr.msg_iovlen = 1;
207    msgs[k].msg_hdr.msg_name = peers + k;
208    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
209    msgs[k].msg_hdr.msg_control = NULL;
210    msgs[k].msg_hdr.msg_controllen = 0;
211    msgs[k].msg_hdr.msg_flags = 0;
212  }
213
214  do
215    nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks);
216  while (nread == -1 && errno == EINTR);
217
218  if (nread < 1) {
219    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
220      handle->recv_cb(handle, 0, buf, NULL, 0);
221    else
222      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
223  } else {
224    /* pass each chunk to the application */
225    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
226      flags = UV_UDP_MMSG_CHUNK;
227      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
228        flags |= UV_UDP_PARTIAL;
229
230      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
231      handle->recv_cb(handle,
232                      msgs[k].msg_len,
233                      &chunk_buf,
234                      msgs[k].msg_hdr.msg_name,
235                      flags);
236    }
237
238    /* one last callback so the original buffer is freed */
239    if (handle->recv_cb != NULL)
240      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
241  }
242  return nread;
243}
244#endif
245
246static void uv__udp_recvmsg(uv_udp_t* handle) {
247  struct sockaddr_storage peer;
248  struct msghdr h;
249  ssize_t nread;
250  uv_buf_t buf;
251  int flags;
252  int count;
253
254  assert(handle->recv_cb != NULL);
255  assert(handle->alloc_cb != NULL);
256
257  /* Prevent loop starvation when the data comes in as fast as (or faster than)
258   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
259   */
260  count = 32;
261
262  do {
263    buf = uv_buf_init(NULL, 0);
264    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
265    if (buf.base == NULL || buf.len == 0) {
266      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
267      return;
268    }
269    assert(buf.base != NULL);
270
271#if HAVE_MMSG
272    if (uv_udp_using_recvmmsg(handle)) {
273      nread = uv__udp_recvmmsg(handle, &buf);
274      if (nread > 0)
275        count -= nread;
276      continue;
277    }
278#endif
279
280    memset(&h, 0, sizeof(h));
281    memset(&peer, 0, sizeof(peer));
282    h.msg_name = &peer;
283    h.msg_namelen = sizeof(peer);
284    h.msg_iov = (void*) &buf;
285    h.msg_iovlen = 1;
286
287    do {
288      nread = recvmsg(handle->io_watcher.fd, &h, 0);
289    }
290    while (nread == -1 && errno == EINTR);
291
292    if (nread == -1) {
293      if (errno == EAGAIN || errno == EWOULDBLOCK)
294        handle->recv_cb(handle, 0, &buf, NULL, 0);
295      else
296        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
297    }
298    else {
299      flags = 0;
300      if (h.msg_flags & MSG_TRUNC)
301        flags |= UV_UDP_PARTIAL;
302
303      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
304    }
305    count--;
306  }
307  /* recv_cb callback may decide to pause or close the handle */
308  while (nread != -1
309      && count > 0
310      && handle->io_watcher.fd != -1
311      && handle->recv_cb != NULL);
312}
313
314#if HAVE_MMSG
315static void uv__udp_sendmmsg(uv_udp_t* handle) {
316  uv_udp_send_t* req;
317  struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
318  struct uv__mmsghdr *p;
319  QUEUE* q;
320  ssize_t npkts;
321  size_t pkts;
322  size_t i;
323
324  if (QUEUE_EMPTY(&handle->write_queue))
325    return;
326
327write_queue_drain:
328  for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
329       pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
330       ++pkts, q = QUEUE_HEAD(q)) {
331    assert(q != NULL);
332    req = QUEUE_DATA(q, uv_udp_send_t, queue);
333    assert(req != NULL);
334
335    p = &h[pkts];
336    memset(p, 0, sizeof(*p));
337    if (req->addr.ss_family == AF_UNSPEC) {
338      p->msg_hdr.msg_name = NULL;
339      p->msg_hdr.msg_namelen = 0;
340    } else {
341      p->msg_hdr.msg_name = &req->addr;
342      if (req->addr.ss_family == AF_INET6)
343        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
344      else if (req->addr.ss_family == AF_INET)
345        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
346      else if (req->addr.ss_family == AF_UNIX)
347        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
348      else {
349        assert(0 && "unsupported address family");
350        abort();
351      }
352    }
353    h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
354    h[pkts].msg_hdr.msg_iovlen = req->nbufs;
355  }
356
357  do
358    npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts);
359  while (npkts == -1 && errno == EINTR);
360
361  if (npkts < 1) {
362    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
363      return;
364    for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
365         i < pkts && q != &handle->write_queue;
366         ++i, q = QUEUE_HEAD(&handle->write_queue)) {
367      assert(q != NULL);
368      req = QUEUE_DATA(q, uv_udp_send_t, queue);
369      assert(req != NULL);
370
371      req->status = UV__ERR(errno);
372      QUEUE_REMOVE(&req->queue);
373      QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
374    }
375    uv__io_feed(handle->loop, &handle->io_watcher);
376    return;
377  }
378
379  /* Safety: npkts known to be >0 below. Hence cast from ssize_t
380   * to size_t safe.
381   */
382  for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
383       i < (size_t)npkts && q != &handle->write_queue;
384       ++i, q = QUEUE_HEAD(&handle->write_queue)) {
385    assert(q != NULL);
386    req = QUEUE_DATA(q, uv_udp_send_t, queue);
387    assert(req != NULL);
388
389    req->status = req->bufs[0].len;
390
391    /* Sending a datagram is an atomic operation: either all data
392     * is written or nothing is (and EMSGSIZE is raised). That is
393     * why we don't handle partial writes. Just pop the request
394     * off the write queue and onto the completed queue, done.
395     */
396    QUEUE_REMOVE(&req->queue);
397    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
398  }
399
400  /* couldn't batch everything, continue sending (jump to avoid stack growth) */
401  if (!QUEUE_EMPTY(&handle->write_queue))
402    goto write_queue_drain;
403  uv__io_feed(handle->loop, &handle->io_watcher);
404  return;
405}
406#endif
407
408static void uv__udp_sendmsg(uv_udp_t* handle) {
409  uv_udp_send_t* req;
410  struct msghdr h;
411  QUEUE* q;
412  ssize_t size;
413
414#if HAVE_MMSG
415  uv_once(&once, uv__udp_mmsg_init);
416  if (uv__sendmmsg_avail) {
417    uv__udp_sendmmsg(handle);
418    return;
419  }
420#endif
421
422  while (!QUEUE_EMPTY(&handle->write_queue)) {
423    q = QUEUE_HEAD(&handle->write_queue);
424    assert(q != NULL);
425
426    req = QUEUE_DATA(q, uv_udp_send_t, queue);
427    assert(req != NULL);
428
429    memset(&h, 0, sizeof h);
430    if (req->addr.ss_family == AF_UNSPEC) {
431      h.msg_name = NULL;
432      h.msg_namelen = 0;
433    } else {
434      h.msg_name = &req->addr;
435      if (req->addr.ss_family == AF_INET6)
436        h.msg_namelen = sizeof(struct sockaddr_in6);
437      else if (req->addr.ss_family == AF_INET)
438        h.msg_namelen = sizeof(struct sockaddr_in);
439      else if (req->addr.ss_family == AF_UNIX)
440        h.msg_namelen = sizeof(struct sockaddr_un);
441      else {
442        assert(0 && "unsupported address family");
443        abort();
444      }
445    }
446    h.msg_iov = (struct iovec*) req->bufs;
447    h.msg_iovlen = req->nbufs;
448
449    do {
450      size = sendmsg(handle->io_watcher.fd, &h, 0);
451    } while (size == -1 && errno == EINTR);
452
453    if (size == -1) {
454      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
455        break;
456    }
457
458    req->status = (size == -1 ? UV__ERR(errno) : size);
459
460    /* Sending a datagram is an atomic operation: either all data
461     * is written or nothing is (and EMSGSIZE is raised). That is
462     * why we don't handle partial writes. Just pop the request
463     * off the write queue and onto the completed queue, done.
464     */
465    QUEUE_REMOVE(&req->queue);
466    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
467    uv__io_feed(handle->loop, &handle->io_watcher);
468  }
469}
470
471/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
472 * refinements for programs that use multicast.
473 *
474 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
475 * are different from the BSDs: it _shares_ the port rather than steal it
476 * from the current listener.  While useful, it's not something we can emulate
477 * on other platforms so we don't enable it.
478 *
479 * zOS does not support getsockname with SO_REUSEPORT option when using
480 * AF_UNIX.
481 */
482static int uv__set_reuse(int fd) {
483  int yes;
484  yes = 1;
485
486#if defined(SO_REUSEPORT) && defined(__MVS__)
487  struct sockaddr_in sockfd;
488  unsigned int sockfd_len = sizeof(sockfd);
489  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
490      return UV__ERR(errno);
491  if (sockfd.sin_family == AF_UNIX) {
492    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
493      return UV__ERR(errno);
494  } else {
495    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
496       return UV__ERR(errno);
497  }
498#elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__)
499  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
500    return UV__ERR(errno);
501#else
502  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
503    return UV__ERR(errno);
504#endif
505
506  return 0;
507}
508
509/*
510 * The Linux kernel suppresses some ICMP error messages by default for UDP
511 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
512 * error reporting, hopefully resulting in faster failover to working name
513 * servers.
514 */
515static int uv__set_recverr(int fd, sa_family_t ss_family) {
516#if defined(__linux__)
517  int yes;
518
519  yes = 1;
520  if (ss_family == AF_INET) {
521    if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes)))
522      return UV__ERR(errno);
523  } else if (ss_family == AF_INET6) {
524    if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes)))
525       return UV__ERR(errno);
526  }
527#endif
528  return 0;
529}
530
531
532int uv__udp_bind(uv_udp_t* handle,
533                 const struct sockaddr* addr,
534                 unsigned int addrlen,
535                 unsigned int flags) {
536  int err;
537  int yes;
538  int fd;
539
540  /* Check for bad flags. */
541  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR))
542    return UV_EINVAL;
543
544  /* Cannot set IPv6-only mode on non-IPv6 socket. */
545  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
546    return UV_EINVAL;
547
548  fd = handle->io_watcher.fd;
549  if (fd == -1) {
550    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
551    if (err < 0)
552      return err;
553    fd = err;
554    handle->io_watcher.fd = fd;
555  }
556
557  if (flags & UV_UDP_LINUX_RECVERR) {
558    err = uv__set_recverr(fd, addr->sa_family);
559    if (err)
560      return err;
561  }
562
563  if (flags & UV_UDP_REUSEADDR) {
564    err = uv__set_reuse(fd);
565    if (err)
566      return err;
567  }
568
569  if (flags & UV_UDP_IPV6ONLY) {
570#ifdef IPV6_V6ONLY
571    yes = 1;
572    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
573      err = UV__ERR(errno);
574      return err;
575    }
576#else
577    err = UV_ENOTSUP;
578    return err;
579#endif
580  }
581
582  if (bind(fd, addr, addrlen)) {
583    err = UV__ERR(errno);
584    if (errno == EAFNOSUPPORT)
585      /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
586       * socket created with AF_INET to an AF_INET6 address or vice versa. */
587      err = UV_EINVAL;
588    return err;
589  }
590
591  if (addr->sa_family == AF_INET6)
592    handle->flags |= UV_HANDLE_IPV6;
593
594  handle->flags |= UV_HANDLE_BOUND;
595  return 0;
596}
597
598
599static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
600                                       int domain,
601                                       unsigned int flags) {
602  union uv__sockaddr taddr;
603  socklen_t addrlen;
604
605  if (handle->io_watcher.fd != -1)
606    return 0;
607
608  switch (domain) {
609  case AF_INET:
610  {
611    struct sockaddr_in* addr = &taddr.in;
612    memset(addr, 0, sizeof *addr);
613    addr->sin_family = AF_INET;
614    addr->sin_addr.s_addr = INADDR_ANY;
615    addrlen = sizeof *addr;
616    break;
617  }
618  case AF_INET6:
619  {
620    struct sockaddr_in6* addr = &taddr.in6;
621    memset(addr, 0, sizeof *addr);
622    addr->sin6_family = AF_INET6;
623    addr->sin6_addr = in6addr_any;
624    addrlen = sizeof *addr;
625    break;
626  }
627  default:
628    assert(0 && "unsupported address family");
629    abort();
630  }
631
632  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
633}
634
635
636int uv__udp_connect(uv_udp_t* handle,
637                    const struct sockaddr* addr,
638                    unsigned int addrlen) {
639  int err;
640
641  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
642  if (err)
643    return err;
644
645  do {
646    errno = 0;
647    err = connect(handle->io_watcher.fd, addr, addrlen);
648  } while (err == -1 && errno == EINTR);
649
650  if (err)
651    return UV__ERR(errno);
652
653  handle->flags |= UV_HANDLE_UDP_CONNECTED;
654
655  return 0;
656}
657
658/* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
659 * Any of uv supported UNIXs kernel should be standardized, but the kernel
660 * implementation logic not same, let's use pseudocode to explain the udp
661 * disconnect behaviors:
662 *
663 * Predefined stubs for pseudocode:
664 *   1. sodisconnect: The function to perform the real udp disconnect
665 *   2. pru_connect: The function to perform the real udp connect
666 *   3. so: The kernel object match with socket fd
667 *   4. addr: The sockaddr parameter from user space
668 *
669 * BSDs:
670 *   if(sodisconnect(so) == 0) { // udp disconnect succeed
671 *     if (addr->sa_len != so->addr->sa_len) return EINVAL;
672 *     if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
673 *     pru_connect(so);
674 *   }
675 *   else return EISCONN;
676 *
677 * z/OS (same with Windows):
678 *   if(addr->sa_len < so->addr->sa_len) return EINVAL;
679 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
680 *
681 * AIX:
682 *   if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
683 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
684 *
685 * Linux,Others:
686 *   if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
687 *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
688 */
689int uv__udp_disconnect(uv_udp_t* handle) {
690    int r;
691#if defined(__MVS__)
692    struct sockaddr_storage addr;
693#else
694    struct sockaddr addr;
695#endif
696
697    memset(&addr, 0, sizeof(addr));
698
699#if defined(__MVS__)
700    addr.ss_family = AF_UNSPEC;
701#else
702    addr.sa_family = AF_UNSPEC;
703#endif
704
705    do {
706      errno = 0;
707#ifdef __PASE__
708      /* On IBMi a connectionless transport socket can be disconnected by
709       * either setting the addr parameter to NULL or setting the
710       * addr_length parameter to zero, and issuing another connect().
711       * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
712       */
713      r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
714#else
715      r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
716#endif
717    } while (r == -1 && errno == EINTR);
718
719    if (r == -1) {
720#if defined(BSD)  /* The macro BSD is from sys/param.h */
721      if (errno != EAFNOSUPPORT && errno != EINVAL)
722        return UV__ERR(errno);
723#else
724      return UV__ERR(errno);
725#endif
726    }
727
728    handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
729    return 0;
730}
731
732int uv__udp_send(uv_udp_send_t* req,
733                 uv_udp_t* handle,
734                 const uv_buf_t bufs[],
735                 unsigned int nbufs,
736                 const struct sockaddr* addr,
737                 unsigned int addrlen,
738                 uv_udp_send_cb send_cb) {
739  int err;
740  int empty_queue;
741
742  assert(nbufs > 0);
743
744  if (addr) {
745    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
746    if (err)
747      return err;
748  }
749
750  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
751   * it means there are error-state requests in the write_completed_queue that
752   * will touch up send_queue_size/count later.
753   */
754  empty_queue = (handle->send_queue_count == 0);
755
756  uv__req_init(handle->loop, req, UV_UDP_SEND);
757  assert(addrlen <= sizeof(req->addr));
758  if (addr == NULL)
759    req->addr.ss_family = AF_UNSPEC;
760  else
761    memcpy(&req->addr, addr, addrlen);
762  req->send_cb = send_cb;
763  req->handle = handle;
764  req->nbufs = nbufs;
765
766  req->bufs = req->bufsml;
767  if (nbufs > ARRAY_SIZE(req->bufsml))
768    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
769
770  if (req->bufs == NULL) {
771    uv__req_unregister(handle->loop, req);
772    return UV_ENOMEM;
773  }
774
775  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
776  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
777  handle->send_queue_count++;
778  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
779  uv__handle_start(handle);
780
781  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
782    uv__udp_sendmsg(handle);
783
784    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
785     * away. In such cases the `io_watcher` has to be queued for asynchronous
786     * write.
787     */
788    if (!QUEUE_EMPTY(&handle->write_queue))
789      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
790  } else {
791    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
792  }
793
794  return 0;
795}
796
797
798int uv__udp_try_send(uv_udp_t* handle,
799                     const uv_buf_t bufs[],
800                     unsigned int nbufs,
801                     const struct sockaddr* addr,
802                     unsigned int addrlen) {
803  int err;
804  struct msghdr h;
805  ssize_t size;
806
807  assert(nbufs > 0);
808
809  /* already sending a message */
810  if (handle->send_queue_count != 0)
811    return UV_EAGAIN;
812
813  if (addr) {
814    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
815    if (err)
816      return err;
817  } else {
818    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
819  }
820
821  memset(&h, 0, sizeof h);
822  h.msg_name = (struct sockaddr*) addr;
823  h.msg_namelen = addrlen;
824  h.msg_iov = (struct iovec*) bufs;
825  h.msg_iovlen = nbufs;
826
827  do {
828    size = sendmsg(handle->io_watcher.fd, &h, 0);
829  } while (size == -1 && errno == EINTR);
830
831  if (size == -1) {
832    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
833      return UV_EAGAIN;
834    else
835      return UV__ERR(errno);
836  }
837
838  return size;
839}
840
841
842static int uv__udp_set_membership4(uv_udp_t* handle,
843                                   const struct sockaddr_in* multicast_addr,
844                                   const char* interface_addr,
845                                   uv_membership membership) {
846  struct ip_mreq mreq;
847  int optname;
848  int err;
849
850  memset(&mreq, 0, sizeof mreq);
851
852  if (interface_addr) {
853    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
854    if (err)
855      return err;
856  } else {
857    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
858  }
859
860  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
861
862  switch (membership) {
863  case UV_JOIN_GROUP:
864    optname = IP_ADD_MEMBERSHIP;
865    break;
866  case UV_LEAVE_GROUP:
867    optname = IP_DROP_MEMBERSHIP;
868    break;
869  default:
870    return UV_EINVAL;
871  }
872
873  if (setsockopt(handle->io_watcher.fd,
874                 IPPROTO_IP,
875                 optname,
876                 &mreq,
877                 sizeof(mreq))) {
878#if defined(__MVS__)
879  if (errno == ENXIO)
880    return UV_ENODEV;
881#endif
882    return UV__ERR(errno);
883  }
884
885  return 0;
886}
887
888
889static int uv__udp_set_membership6(uv_udp_t* handle,
890                                   const struct sockaddr_in6* multicast_addr,
891                                   const char* interface_addr,
892                                   uv_membership membership) {
893  int optname;
894  struct ipv6_mreq mreq;
895  struct sockaddr_in6 addr6;
896
897  memset(&mreq, 0, sizeof mreq);
898
899  if (interface_addr) {
900    if (uv_ip6_addr(interface_addr, 0, &addr6))
901      return UV_EINVAL;
902    mreq.ipv6mr_interface = addr6.sin6_scope_id;
903  } else {
904    mreq.ipv6mr_interface = 0;
905  }
906
907  mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
908
909  switch (membership) {
910  case UV_JOIN_GROUP:
911    optname = IPV6_ADD_MEMBERSHIP;
912    break;
913  case UV_LEAVE_GROUP:
914    optname = IPV6_DROP_MEMBERSHIP;
915    break;
916  default:
917    return UV_EINVAL;
918  }
919
920  if (setsockopt(handle->io_watcher.fd,
921                 IPPROTO_IPV6,
922                 optname,
923                 &mreq,
924                 sizeof(mreq))) {
925#if defined(__MVS__)
926  if (errno == ENXIO)
927    return UV_ENODEV;
928#endif
929    return UV__ERR(errno);
930  }
931
932  return 0;
933}
934
935
936#if !defined(__OpenBSD__) &&                                        \
937    !defined(__NetBSD__) &&                                         \
938    !defined(__ANDROID__) &&                                        \
939    !defined(__DragonFly__) &&                                      \
940    !defined(__QNX__) &&                                            \
941    !defined(__GNU__)
942static int uv__udp_set_source_membership4(uv_udp_t* handle,
943                                          const struct sockaddr_in* multicast_addr,
944                                          const char* interface_addr,
945                                          const struct sockaddr_in* source_addr,
946                                          uv_membership membership) {
947  struct ip_mreq_source mreq;
948  int optname;
949  int err;
950
951  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
952  if (err)
953    return err;
954
955  memset(&mreq, 0, sizeof(mreq));
956
957  if (interface_addr != NULL) {
958    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
959    if (err)
960      return err;
961  } else {
962    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
963  }
964
965  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
966  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
967
968  if (membership == UV_JOIN_GROUP)
969    optname = IP_ADD_SOURCE_MEMBERSHIP;
970  else if (membership == UV_LEAVE_GROUP)
971    optname = IP_DROP_SOURCE_MEMBERSHIP;
972  else
973    return UV_EINVAL;
974
975  if (setsockopt(handle->io_watcher.fd,
976                 IPPROTO_IP,
977                 optname,
978                 &mreq,
979                 sizeof(mreq))) {
980    return UV__ERR(errno);
981  }
982
983  return 0;
984}
985
986
987static int uv__udp_set_source_membership6(uv_udp_t* handle,
988                                          const struct sockaddr_in6* multicast_addr,
989                                          const char* interface_addr,
990                                          const struct sockaddr_in6* source_addr,
991                                          uv_membership membership) {
992  struct group_source_req mreq;
993  struct sockaddr_in6 addr6;
994  int optname;
995  int err;
996
997  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
998  if (err)
999    return err;
1000
1001  memset(&mreq, 0, sizeof(mreq));
1002
1003  if (interface_addr != NULL) {
1004    err = uv_ip6_addr(interface_addr, 0, &addr6);
1005    if (err)
1006      return err;
1007    mreq.gsr_interface = addr6.sin6_scope_id;
1008  } else {
1009    mreq.gsr_interface = 0;
1010  }
1011
1012  STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
1013  STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
1014  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
1015  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));
1016
1017  if (membership == UV_JOIN_GROUP)
1018    optname = MCAST_JOIN_SOURCE_GROUP;
1019  else if (membership == UV_LEAVE_GROUP)
1020    optname = MCAST_LEAVE_SOURCE_GROUP;
1021  else
1022    return UV_EINVAL;
1023
1024  if (setsockopt(handle->io_watcher.fd,
1025                 IPPROTO_IPV6,
1026                 optname,
1027                 &mreq,
1028                 sizeof(mreq))) {
1029    return UV__ERR(errno);
1030  }
1031
1032  return 0;
1033}
1034#endif
1035
1036
1037int uv__udp_init_ex(uv_loop_t* loop,
1038                    uv_udp_t* handle,
1039                    unsigned flags,
1040                    int domain) {
1041  int fd;
1042
1043  fd = -1;
1044  if (domain != AF_UNSPEC) {
1045    fd = uv__socket(domain, SOCK_DGRAM, 0);
1046    if (fd < 0)
1047      return fd;
1048  }
1049
1050  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
1051  handle->alloc_cb = NULL;
1052  handle->recv_cb = NULL;
1053  handle->send_queue_size = 0;
1054  handle->send_queue_count = 0;
1055  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
1056  QUEUE_INIT(&handle->write_queue);
1057  QUEUE_INIT(&handle->write_completed_queue);
1058
1059  return 0;
1060}
1061
1062
1063int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
1064#if HAVE_MMSG
1065  if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
1066    uv_once(&once, uv__udp_mmsg_init);
1067    return uv__recvmmsg_avail;
1068  }
1069#endif
1070  return 0;
1071}
1072
1073
1074int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
1075  int err;
1076
1077  /* Check for already active socket. */
1078  if (handle->io_watcher.fd != -1)
1079    return UV_EBUSY;
1080
1081  if (uv__fd_exists(handle->loop, sock))
1082    return UV_EEXIST;
1083
1084  err = uv__nonblock(sock, 1);
1085  if (err)
1086    return err;
1087
1088  err = uv__set_reuse(sock);
1089  if (err)
1090    return err;
1091
1092  handle->io_watcher.fd = sock;
1093  if (uv__udp_is_connected(handle))
1094    handle->flags |= UV_HANDLE_UDP_CONNECTED;
1095
1096  return 0;
1097}
1098
1099
1100int uv_udp_set_membership(uv_udp_t* handle,
1101                          const char* multicast_addr,
1102                          const char* interface_addr,
1103                          uv_membership membership) {
1104  int err;
1105  struct sockaddr_in addr4;
1106  struct sockaddr_in6 addr6;
1107
1108  if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
1109    err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
1110    if (err)
1111      return err;
1112    return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
1113  } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
1114    err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
1115    if (err)
1116      return err;
1117    return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
1118  } else {
1119    return UV_EINVAL;
1120  }
1121}
1122
1123
1124int uv_udp_set_source_membership(uv_udp_t* handle,
1125                                 const char* multicast_addr,
1126                                 const char* interface_addr,
1127                                 const char* source_addr,
1128                                 uv_membership membership) {
1129#if !defined(__OpenBSD__) &&                                        \
1130    !defined(__NetBSD__) &&                                         \
1131    !defined(__ANDROID__) &&                                        \
1132    !defined(__DragonFly__) &&                                      \
1133    !defined(__QNX__) &&                                            \
1134    !defined(__GNU__)
1135  int err;
1136  union uv__sockaddr mcast_addr;
1137  union uv__sockaddr src_addr;
1138
1139  err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
1140  if (err) {
1141    err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
1142    if (err)
1143      return err;
1144    err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
1145    if (err)
1146      return err;
1147    return uv__udp_set_source_membership6(handle,
1148                                          &mcast_addr.in6,
1149                                          interface_addr,
1150                                          &src_addr.in6,
1151                                          membership);
1152  }
1153
1154  err = uv_ip4_addr(source_addr, 0, &src_addr.in);
1155  if (err)
1156    return err;
1157  return uv__udp_set_source_membership4(handle,
1158                                        &mcast_addr.in,
1159                                        interface_addr,
1160                                        &src_addr.in,
1161                                        membership);
1162#else
1163  return UV_ENOSYS;
1164#endif
1165}
1166
1167
1168static int uv__setsockopt(uv_udp_t* handle,
1169                         int option4,
1170                         int option6,
1171                         const void* val,
1172                         socklen_t size) {
1173  int r;
1174
1175  if (handle->flags & UV_HANDLE_IPV6)
1176    r = setsockopt(handle->io_watcher.fd,
1177                   IPPROTO_IPV6,
1178                   option6,
1179                   val,
1180                   size);
1181  else
1182    r = setsockopt(handle->io_watcher.fd,
1183                   IPPROTO_IP,
1184                   option4,
1185                   val,
1186                   size);
1187  if (r)
1188    return UV__ERR(errno);
1189
1190  return 0;
1191}
1192
1193static int uv__setsockopt_maybe_char(uv_udp_t* handle,
1194                                     int option4,
1195                                     int option6,
1196                                     int val) {
1197#if defined(__sun) || defined(_AIX) || defined(__MVS__)
1198  char arg = val;
1199#elif defined(__OpenBSD__)
1200  unsigned char arg = val;
1201#else
1202  int arg = val;
1203#endif
1204
1205  if (val < 0 || val > 255)
1206    return UV_EINVAL;
1207
1208  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
1209}
1210
1211
1212int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
1213  if (setsockopt(handle->io_watcher.fd,
1214                 SOL_SOCKET,
1215                 SO_BROADCAST,
1216                 &on,
1217                 sizeof(on))) {
1218    return UV__ERR(errno);
1219  }
1220
1221  return 0;
1222}
1223
1224
1225int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
1226  if (ttl < 1 || ttl > 255)
1227    return UV_EINVAL;
1228
1229#if defined(__MVS__)
1230  if (!(handle->flags & UV_HANDLE_IPV6))
1231    return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
1232#endif
1233
1234/*
1235 * On Solaris and derivatives such as SmartOS, the length of socket options
1236 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
1237 * so hardcode the size of these options on this platform,
1238 * and use the general uv__setsockopt_maybe_char call on other platforms.
1239 */
1240#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1241    defined(__MVS__) || defined(__QNX__)
1242
1243  return uv__setsockopt(handle,
1244                        IP_TTL,
1245                        IPV6_UNICAST_HOPS,
1246                        &ttl,
1247                        sizeof(ttl));
1248
1249#else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1250           defined(__MVS__) || defined(__QNX__)) */
1251
1252  return uv__setsockopt_maybe_char(handle,
1253                                   IP_TTL,
1254                                   IPV6_UNICAST_HOPS,
1255                                   ttl);
1256
1257#endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
1258          defined(__MVS__) || defined(__QNX__) */
1259}
1260
1261
1262int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
1263/*
1264 * On Solaris and derivatives such as SmartOS, the length of socket options
1265 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
1266 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
1267 * and use the general uv__setsockopt_maybe_char call otherwise.
1268 */
1269#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1270    defined(__MVS__) || defined(__QNX__)
1271  if (handle->flags & UV_HANDLE_IPV6)
1272    return uv__setsockopt(handle,
1273                          IP_MULTICAST_TTL,
1274                          IPV6_MULTICAST_HOPS,
1275                          &ttl,
1276                          sizeof(ttl));
1277#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1278    defined(__MVS__) || defined(__QNX__) */
1279
1280  return uv__setsockopt_maybe_char(handle,
1281                                   IP_MULTICAST_TTL,
1282                                   IPV6_MULTICAST_HOPS,
1283                                   ttl);
1284}
1285
1286
1287int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
1288/*
1289 * On Solaris and derivatives such as SmartOS, the length of socket options
1290 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
1291 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
1292 * and use the general uv__setsockopt_maybe_char call otherwise.
1293 */
1294#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
1295    defined(__MVS__) || defined(__QNX__)
1296  if (handle->flags & UV_HANDLE_IPV6)
1297    return uv__setsockopt(handle,
1298                          IP_MULTICAST_LOOP,
1299                          IPV6_MULTICAST_LOOP,
1300                          &on,
1301                          sizeof(on));
1302#endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
1303    defined(__MVS__) || defined(__QNX__) */
1304
1305  return uv__setsockopt_maybe_char(handle,
1306                                   IP_MULTICAST_LOOP,
1307                                   IPV6_MULTICAST_LOOP,
1308                                   on);
1309}
1310
1311int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
1312  struct sockaddr_storage addr_st;
1313  struct sockaddr_in* addr4;
1314  struct sockaddr_in6* addr6;
1315
1316  addr4 = (struct sockaddr_in*) &addr_st;
1317  addr6 = (struct sockaddr_in6*) &addr_st;
1318
1319  if (!interface_addr) {
1320    memset(&addr_st, 0, sizeof addr_st);
1321    if (handle->flags & UV_HANDLE_IPV6) {
1322      addr_st.ss_family = AF_INET6;
1323      addr6->sin6_scope_id = 0;
1324    } else {
1325      addr_st.ss_family = AF_INET;
1326      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
1327    }
1328  } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
1329    /* nothing, address was parsed */
1330  } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
1331    /* nothing, address was parsed */
1332  } else {
1333    return UV_EINVAL;
1334  }
1335
1336  if (addr_st.ss_family == AF_INET) {
1337    if (setsockopt(handle->io_watcher.fd,
1338                   IPPROTO_IP,
1339                   IP_MULTICAST_IF,
1340                   (void*) &addr4->sin_addr,
1341                   sizeof(addr4->sin_addr)) == -1) {
1342      return UV__ERR(errno);
1343    }
1344  } else if (addr_st.ss_family == AF_INET6) {
1345    if (setsockopt(handle->io_watcher.fd,
1346                   IPPROTO_IPV6,
1347                   IPV6_MULTICAST_IF,
1348                   &addr6->sin6_scope_id,
1349                   sizeof(addr6->sin6_scope_id)) == -1) {
1350      return UV__ERR(errno);
1351    }
1352  } else {
1353    assert(0 && "unexpected address family");
1354    abort();
1355  }
1356
1357  return 0;
1358}
1359
1360int uv_udp_getpeername(const uv_udp_t* handle,
1361                       struct sockaddr* name,
1362                       int* namelen) {
1363
1364  return uv__getsockpeername((const uv_handle_t*) handle,
1365                             getpeername,
1366                             name,
1367                             namelen);
1368}
1369
1370int uv_udp_getsockname(const uv_udp_t* handle,
1371                       struct sockaddr* name,
1372                       int* namelen) {
1373
1374  return uv__getsockpeername((const uv_handle_t*) handle,
1375                             getsockname,
1376                             name,
1377                             namelen);
1378}
1379
1380
1381int uv__udp_recv_start(uv_udp_t* handle,
1382                       uv_alloc_cb alloc_cb,
1383                       uv_udp_recv_cb recv_cb) {
1384  int err;
1385
1386  if (alloc_cb == NULL || recv_cb == NULL)
1387    return UV_EINVAL;
1388
1389  if (uv__io_active(&handle->io_watcher, POLLIN))
1390    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
1391
1392  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
1393  if (err)
1394    return err;
1395
1396  handle->alloc_cb = alloc_cb;
1397  handle->recv_cb = recv_cb;
1398
1399  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
1400  uv__handle_start(handle);
1401
1402  return 0;
1403}
1404
1405
1406int uv__udp_recv_stop(uv_udp_t* handle) {
1407  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
1408
1409  if (!uv__io_active(&handle->io_watcher, POLLOUT))
1410    uv__handle_stop(handle);
1411
1412  handle->alloc_cb = NULL;
1413  handle->recv_cb = NULL;
1414
1415  return 0;
1416}
1417