split out unix's udp source
This commit is contained in:
parent
cec94ee075
commit
58461d5ae7
@ -85,8 +85,8 @@ endif
|
||||
RUNNER_LIBS=
|
||||
RUNNER_SRC=test/runner-unix.c
|
||||
|
||||
uv.a: src/uv-unix.o src/unix/fs.o src/uv-common.o src/uv-platform.o src/unix/ev/ev.o src/unix/uv-eio.o src/unix/eio/eio.o $(CARES_OBJS)
|
||||
$(AR) rcs uv.a src/uv-unix.o src/unix/fs.o src/uv-platform.o src/uv-common.o src/unix/uv-eio.o src/unix/ev/ev.o \
|
||||
uv.a: src/uv-unix.o src/unix/fs.o src/unix/udp.o src/uv-common.o src/uv-platform.o src/unix/ev/ev.o src/unix/uv-eio.o src/unix/eio/eio.o $(CARES_OBJS)
|
||||
$(AR) rcs uv.a src/uv-unix.o src/unix/fs.o src/unix/udp.o src/uv-platform.o src/uv-common.o src/unix/uv-eio.o src/unix/ev/ev.o \
|
||||
src/unix/eio/eio.o $(CARES_OBJS)
|
||||
|
||||
src/uv-platform.o: src/unix/$(UV_OS_FILE) include/uv.h include/uv-private/uv-unix.h
|
||||
@ -98,6 +98,9 @@ src/uv-unix.o: src/uv-unix.c include/uv.h include/uv-private/uv-unix.h src/unix/
|
||||
src/unix/fs.o: src/unix/fs.c include/uv.h include/uv-private/uv-unix.h src/unix/internal.h
|
||||
$(CC) $(CSTDFLAG) $(CPPFLAGS) -Isrc/ $(CFLAGS) -c src/unix/fs.c -o src/unix/fs.o
|
||||
|
||||
src/unix/udp.o: src/unix/udp.c include/uv.h include/uv-private/uv-unix.h src/unix/internal.h
|
||||
$(CC) $(CSTDFLAG) $(CPPFLAGS) -Isrc/ $(CFLAGS) -c src/unix/udp.c -o src/unix/udp.o
|
||||
|
||||
src/uv-common.o: src/uv-common.c include/uv.h include/uv-private/uv-unix.h
|
||||
$(CC) $(CSTDFLAG) $(CPPFLAGS) $(CFLAGS) -c src/uv-common.c -o src/uv-common.o
|
||||
|
||||
|
||||
@ -28,7 +28,14 @@
|
||||
int uv__close(int fd);
|
||||
void uv__req_init(uv_req_t*);
|
||||
uv_err_t uv_err_new(uv_loop_t* loop, int sys_error);
|
||||
uv_err_t uv_err_new_artificial(uv_loop_t* loop, int code);
|
||||
int uv__nonblock(int fd, int set) __attribute__((unused));
|
||||
int uv__cloexec(int fd, int set) __attribute__((unused));
|
||||
int uv__socket(int domain, int type, int protocol);
|
||||
void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle, uv_handle_type type);
|
||||
|
||||
/* udp */
|
||||
void uv__udp_destroy(uv_udp_t* handle);
|
||||
void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w);
|
||||
|
||||
#endif /* UV_UNIX_INTERNAL_H_ */
|
||||
|
||||
524
src/unix/udp.c
Normal file
524
src/unix/udp.c
Normal file
@ -0,0 +1,524 @@
|
||||
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "uv.h"
|
||||
#include "internal.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
|
||||
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
|
||||
static void uv__udp_run_completed(uv_udp_t* handle);
|
||||
static void uv__udp_run_pending(uv_udp_t* handle);
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle);
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle);
|
||||
static void uv__udp_io(EV_P_ ev_io* w, int events);
|
||||
static int uv__udp_bind(uv_udp_t* handle, int domain, struct sockaddr* addr,
|
||||
socklen_t len, unsigned flags);
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
|
||||
static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
|
||||
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);
|
||||
|
||||
|
||||
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
|
||||
int flags;
|
||||
|
||||
assert(w == &handle->read_watcher
|
||||
|| w == &handle->write_watcher);
|
||||
|
||||
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
|
||||
|
||||
w->data = handle;
|
||||
ev_set_cb(w, uv__udp_io);
|
||||
ev_io_set(w, handle->fd, flags);
|
||||
ev_io_start(handle->loop->ev, w);
|
||||
}
|
||||
|
||||
|
||||
void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
|
||||
int flags;
|
||||
|
||||
assert(w == &handle->read_watcher
|
||||
|| w == &handle->write_watcher);
|
||||
|
||||
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
|
||||
|
||||
ev_io_stop(handle->loop->ev, w);
|
||||
ev_io_set(w, -1, flags);
|
||||
ev_set_cb(w, NULL);
|
||||
w->data = (void*)0xDEADBABE;
|
||||
}
|
||||
|
||||
|
||||
void uv__udp_destroy(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_queue)) {
|
||||
q = ngx_queue_head(&handle->write_queue);
|
||||
ngx_queue_remove(q);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
if (req->send_cb) {
|
||||
/* FIXME proper error code like UV_EABORTED */
|
||||
uv_err_new_artificial(handle->loop, UV_EINTR);
|
||||
req->send_cb(req, -1);
|
||||
}
|
||||
}
|
||||
|
||||
/* Now tear down the handle. */
|
||||
handle->flags = 0;
|
||||
handle->recv_cb = NULL;
|
||||
handle->alloc_cb = NULL;
|
||||
/* but _do not_ touch close_cb */
|
||||
|
||||
if (handle->fd != -1) {
|
||||
uv__close(handle->fd);
|
||||
handle->fd = -1;
|
||||
}
|
||||
|
||||
uv__udp_watcher_stop(handle, &handle->read_watcher);
|
||||
uv__udp_watcher_stop(handle, &handle->write_watcher);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_pending(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
struct msghdr h;
|
||||
ssize_t size;
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_queue)) {
|
||||
q = ngx_queue_head(&handle->write_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &req->addr;
|
||||
h.msg_namelen = req->addrlen;
|
||||
h.msg_iov = (struct iovec*)req->bufs;
|
||||
h.msg_iovlen = req->bufcnt;
|
||||
|
||||
do {
|
||||
size = sendmsg(handle->fd, &h, 0);
|
||||
}
|
||||
while (size == -1 && errno == EINTR);
|
||||
|
||||
/* TODO try to write once or twice more in the
|
||||
* hope that the socket becomes readable again?
|
||||
*/
|
||||
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
break;
|
||||
|
||||
req->status = (size == -1 ? -errno : size);
|
||||
|
||||
#ifndef NDEBUG
|
||||
/* Sanity check. */
|
||||
if (size != -1) {
|
||||
ssize_t nbytes;
|
||||
int i;
|
||||
|
||||
for (nbytes = i = 0; i < req->bufcnt; i++)
|
||||
nbytes += req->bufs[i].len;
|
||||
|
||||
assert(size == nbytes);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Sending a datagram is an atomic operation: either all data
|
||||
* is written or nothing is (and EMSGSIZE is raised). That is
|
||||
* why we don't handle partial writes. Just pop the request
|
||||
* off the write queue and onto the completed queue, done.
|
||||
*/
|
||||
ngx_queue_remove(&req->queue);
|
||||
ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_completed_queue)) {
|
||||
q = ngx_queue_head(&handle->write_completed_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
ngx_queue_remove(q);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
if (req->bufs != req->bufsml)
|
||||
free(req->bufs);
|
||||
|
||||
if (req->send_cb == NULL)
|
||||
continue;
|
||||
|
||||
/* req->status >= 0 == bytes written
|
||||
* req->status < 0 == errno
|
||||
*/
|
||||
if (req->status >= 0) {
|
||||
req->send_cb(req, 0);
|
||||
}
|
||||
else {
|
||||
uv_err_new(handle->loop, -req->status);
|
||||
req->send_cb(req, -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle) {
|
||||
struct sockaddr_storage peer;
|
||||
struct msghdr h;
|
||||
ssize_t nread;
|
||||
uv_buf_t buf;
|
||||
int flags;
|
||||
|
||||
assert(handle->recv_cb != NULL);
|
||||
assert(handle->alloc_cb != NULL);
|
||||
|
||||
do {
|
||||
/* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */
|
||||
buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
|
||||
assert(buf.len > 0);
|
||||
assert(buf.base != NULL);
|
||||
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &peer;
|
||||
h.msg_namelen = sizeof peer;
|
||||
h.msg_iov = (struct iovec*)&buf;
|
||||
h.msg_iovlen = 1;
|
||||
|
||||
do {
|
||||
nread = recvmsg(handle->fd, &h, 0);
|
||||
}
|
||||
while (nread == -1 && errno == EINTR);
|
||||
|
||||
if (nread == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
uv_err_new(handle->loop, EAGAIN);
|
||||
handle->recv_cb(handle, 0, buf, NULL, 0);
|
||||
}
|
||||
else {
|
||||
uv_err_new(handle->loop, errno);
|
||||
handle->recv_cb(handle, -1, buf, NULL, 0);
|
||||
}
|
||||
}
|
||||
else {
|
||||
flags = 0;
|
||||
|
||||
if (h.msg_flags & MSG_TRUNC)
|
||||
flags |= UV_UDP_PARTIAL;
|
||||
|
||||
handle->recv_cb(handle,
|
||||
nread,
|
||||
buf,
|
||||
(struct sockaddr*)&peer,
|
||||
flags);
|
||||
}
|
||||
}
|
||||
/* recv_cb callback may decide to pause or close the handle */
|
||||
while (nread != -1
|
||||
&& handle->fd != -1
|
||||
&& handle->recv_cb != NULL);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle) {
|
||||
assert(!ngx_queue_empty(&handle->write_queue)
|
||||
|| !ngx_queue_empty(&handle->write_completed_queue));
|
||||
|
||||
/* Write out pending data first. */
|
||||
uv__udp_run_pending(handle);
|
||||
|
||||
/* Drain 'request completed' queue. */
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
if (!ngx_queue_empty(&handle->write_completed_queue)) {
|
||||
/* Schedule completion callbacks. */
|
||||
ev_feed_event(handle->loop->ev, &handle->write_watcher, EV_WRITE);
|
||||
}
|
||||
else if (ngx_queue_empty(&handle->write_queue)) {
|
||||
/* Pending queue and completion queue empty, stop watcher. */
|
||||
uv__udp_watcher_stop(handle, &handle->write_watcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_io(EV_P_ ev_io* w, int events) {
|
||||
uv_udp_t* handle;
|
||||
|
||||
handle = w->data;
|
||||
assert(handle != NULL);
|
||||
assert(handle->type == UV_UDP);
|
||||
assert(handle->fd >= 0);
|
||||
assert(!(events & ~(EV_READ|EV_WRITE)));
|
||||
|
||||
if (events & EV_READ)
|
||||
uv__udp_recvmsg(handle);
|
||||
|
||||
if (events & EV_WRITE)
|
||||
uv__udp_sendmsg(handle);
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_bind(uv_udp_t* handle,
|
||||
int domain,
|
||||
struct sockaddr* addr,
|
||||
socklen_t len,
|
||||
unsigned flags) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
int yes;
|
||||
int fd;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
/* Check for bad flags. */
|
||||
if (flags & ~UV_UDP_IPV6ONLY) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Cannot set IPv6-only mode on non-IPv6 socket. */
|
||||
if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Check for already active socket. */
|
||||
if (handle->fd != -1) {
|
||||
uv_err_new_artificial(handle->loop, UV_EALREADY);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (flags & UV_UDP_IPV6ONLY) {
|
||||
#ifdef IPV6_V6ONLY
|
||||
yes = 1;
|
||||
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
#else
|
||||
uv_err_new((uv_handle_t*)handle, ENOTSUP);
|
||||
goto out;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (bind(fd, addr, len) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
|
||||
handle->fd = fd;
|
||||
status = 0;
|
||||
|
||||
out:
|
||||
if (status)
|
||||
uv__close(fd);
|
||||
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
|
||||
struct sockaddr_storage taddr;
|
||||
socklen_t addrlen;
|
||||
|
||||
assert(domain == AF_INET || domain == AF_INET6);
|
||||
|
||||
if (handle->fd != -1)
|
||||
return 0;
|
||||
|
||||
switch (domain) {
|
||||
case AF_INET:
|
||||
{
|
||||
struct sockaddr_in* addr = (void*)&taddr;
|
||||
memset(addr, 0, sizeof *addr);
|
||||
addr->sin_family = AF_INET;
|
||||
addr->sin_addr.s_addr = INADDR_ANY;
|
||||
addrlen = sizeof *addr;
|
||||
break;
|
||||
}
|
||||
case AF_INET6:
|
||||
{
|
||||
struct sockaddr_in6* addr = (void*)&taddr;
|
||||
memset(addr, 0, sizeof *addr);
|
||||
addr->sin6_family = AF_INET6;
|
||||
addr->sin6_addr = in6addr_any;
|
||||
addrlen = sizeof *addr;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(0 && "unsupported address family");
|
||||
abort();
|
||||
}
|
||||
|
||||
return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_send(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr* addr,
|
||||
socklen_t addrlen,
|
||||
uv_udp_send_cb send_cb) {
|
||||
if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
|
||||
return -1;
|
||||
|
||||
/* Don't use uv__req_init(), it zeroes the data field. */
|
||||
handle->loop->counters.req_init++;
|
||||
|
||||
memcpy(&req->addr, addr, addrlen);
|
||||
req->addrlen = addrlen;
|
||||
req->send_cb = send_cb;
|
||||
req->handle = handle;
|
||||
req->bufcnt = bufcnt;
|
||||
req->type = UV_UDP_SEND;
|
||||
|
||||
if (bufcnt <= UV_REQ_BUFSML_SIZE) {
|
||||
req->bufs = req->bufsml;
|
||||
}
|
||||
else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
|
||||
uv_err_new(handle->loop, ENOMEM);
|
||||
return -1;
|
||||
}
|
||||
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
|
||||
|
||||
ngx_queue_insert_tail(&handle->write_queue, &req->queue);
|
||||
uv__udp_watcher_start(handle, &handle->write_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
|
||||
memset(handle, 0, sizeof *handle);
|
||||
|
||||
uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
|
||||
loop->counters.udp_init++;
|
||||
|
||||
handle->fd = -1;
|
||||
ngx_queue_init(&handle->write_queue);
|
||||
ngx_queue_init(&handle->write_completed_queue);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
|
||||
return uv__udp_bind(handle,
|
||||
AF_INET,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
flags);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
|
||||
return uv__udp_bind(handle,
|
||||
AF_INET6,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
flags);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_send(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr_in addr,
|
||||
uv_udp_send_cb send_cb) {
|
||||
return uv__udp_send(req,
|
||||
handle,
|
||||
bufs,
|
||||
bufcnt,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
send_cb);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_send6(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr_in6 addr,
|
||||
uv_udp_send_cb send_cb) {
|
||||
return uv__udp_send(req,
|
||||
handle,
|
||||
bufs,
|
||||
bufcnt,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
send_cb);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_recv_start(uv_udp_t* handle,
|
||||
uv_alloc_cb alloc_cb,
|
||||
uv_udp_recv_cb recv_cb) {
|
||||
if (alloc_cb == NULL || recv_cb == NULL) {
|
||||
uv_err_new_artificial(handle->loop, UV_EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ev_is_active(&handle->read_watcher)) {
|
||||
uv_err_new_artificial(handle->loop, UV_EALREADY);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (uv__udp_maybe_deferred_bind(handle, AF_INET))
|
||||
return -1;
|
||||
|
||||
handle->alloc_cb = alloc_cb;
|
||||
handle->recv_cb = recv_cb;
|
||||
uv__udp_watcher_start(handle, &handle->read_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_recv_stop(uv_udp_t* handle) {
|
||||
uv__udp_watcher_stop(handle, &handle->read_watcher);
|
||||
handle->alloc_cb = NULL;
|
||||
handle->recv_cb = NULL;
|
||||
return 0;
|
||||
}
|
||||
511
src/uv-unix.c
511
src/uv-unix.c
@ -103,34 +103,12 @@ static void uv__stream_connect(uv_stream_t*);
|
||||
static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
|
||||
static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
|
||||
|
||||
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
|
||||
static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w);
|
||||
static void uv__udp_run_completed(uv_udp_t* handle);
|
||||
static void uv__udp_run_pending(uv_udp_t* handle);
|
||||
static void uv__udp_destroy(uv_udp_t* handle);
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle);
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle);
|
||||
static void uv__udp_io(EV_P_ ev_io* w, int events);
|
||||
static int uv__udp_bind(uv_udp_t* handle,
|
||||
int domain,
|
||||
struct sockaddr* addr,
|
||||
socklen_t len,
|
||||
unsigned flags);
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
|
||||
static int uv__udp_send(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr* addr,
|
||||
socklen_t addrlen,
|
||||
uv_udp_send_cb send_cb);
|
||||
|
||||
#ifndef __GNUC__
|
||||
#define __attribute__(a)
|
||||
#endif
|
||||
|
||||
|
||||
static int uv__socket(int domain, int type, int protocol);
|
||||
static int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
|
||||
|
||||
size_t uv__strlcpy(char* dst, const char* src, size_t size);
|
||||
@ -211,7 +189,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
|
||||
}
|
||||
|
||||
|
||||
static uv_err_t uv_err_new_artificial(uv_loop_t* loop, int code) {
|
||||
uv_err_t uv_err_new_artificial(uv_loop_t* loop, int code) {
|
||||
uv_err_t err;
|
||||
err.sys_errno_ = 0;
|
||||
err.code = code;
|
||||
@ -340,7 +318,7 @@ int uv_run(uv_loop_t* loop) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
|
||||
void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
|
||||
uv_handle_type type) {
|
||||
loop->counters.handle_init++;
|
||||
|
||||
@ -356,489 +334,6 @@ static void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
|
||||
int flags;
|
||||
|
||||
assert(w == &handle->read_watcher
|
||||
|| w == &handle->write_watcher);
|
||||
|
||||
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
|
||||
|
||||
w->data = handle;
|
||||
ev_set_cb(w, uv__udp_io);
|
||||
ev_io_set(w, handle->fd, flags);
|
||||
ev_io_start(handle->loop->ev, w);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
|
||||
int flags;
|
||||
|
||||
assert(w == &handle->read_watcher
|
||||
|| w == &handle->write_watcher);
|
||||
|
||||
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
|
||||
|
||||
ev_io_stop(handle->loop->ev, w);
|
||||
ev_io_set(w, -1, flags);
|
||||
ev_set_cb(w, NULL);
|
||||
w->data = (void*)0xDEADBABE;
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_destroy(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_queue)) {
|
||||
q = ngx_queue_head(&handle->write_queue);
|
||||
ngx_queue_remove(q);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
if (req->send_cb) {
|
||||
/* FIXME proper error code like UV_EABORTED */
|
||||
uv_err_new_artificial(handle->loop, UV_EINTR);
|
||||
req->send_cb(req, -1);
|
||||
}
|
||||
}
|
||||
|
||||
/* Now tear down the handle. */
|
||||
handle->flags = 0;
|
||||
handle->recv_cb = NULL;
|
||||
handle->alloc_cb = NULL;
|
||||
/* but _do not_ touch close_cb */
|
||||
|
||||
if (handle->fd != -1) {
|
||||
uv__close(handle->fd);
|
||||
handle->fd = -1;
|
||||
}
|
||||
|
||||
uv__udp_watcher_stop(handle, &handle->read_watcher);
|
||||
uv__udp_watcher_stop(handle, &handle->write_watcher);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_pending(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
struct msghdr h;
|
||||
ssize_t size;
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_queue)) {
|
||||
q = ngx_queue_head(&handle->write_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &req->addr;
|
||||
h.msg_namelen = req->addrlen;
|
||||
h.msg_iov = (struct iovec*)req->bufs;
|
||||
h.msg_iovlen = req->bufcnt;
|
||||
|
||||
do {
|
||||
size = sendmsg(handle->fd, &h, 0);
|
||||
}
|
||||
while (size == -1 && errno == EINTR);
|
||||
|
||||
/* TODO try to write once or twice more in the
|
||||
* hope that the socket becomes readable again?
|
||||
*/
|
||||
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
break;
|
||||
|
||||
req->status = (size == -1 ? -errno : size);
|
||||
|
||||
#ifndef NDEBUG
|
||||
/* Sanity check. */
|
||||
if (size != -1) {
|
||||
ssize_t nbytes;
|
||||
int i;
|
||||
|
||||
for (nbytes = i = 0; i < req->bufcnt; i++)
|
||||
nbytes += req->bufs[i].len;
|
||||
|
||||
assert(size == nbytes);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Sending a datagram is an atomic operation: either all data
|
||||
* is written or nothing is (and EMSGSIZE is raised). That is
|
||||
* why we don't handle partial writes. Just pop the request
|
||||
* off the write queue and onto the completed queue, done.
|
||||
*/
|
||||
ngx_queue_remove(&req->queue);
|
||||
ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
|
||||
while (!ngx_queue_empty(&handle->write_completed_queue)) {
|
||||
q = ngx_queue_head(&handle->write_completed_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
ngx_queue_remove(q);
|
||||
|
||||
req = ngx_queue_data(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
if (req->bufs != req->bufsml)
|
||||
free(req->bufs);
|
||||
|
||||
if (req->send_cb == NULL)
|
||||
continue;
|
||||
|
||||
/* req->status >= 0 == bytes written
|
||||
* req->status < 0 == errno
|
||||
*/
|
||||
if (req->status >= 0) {
|
||||
req->send_cb(req, 0);
|
||||
}
|
||||
else {
|
||||
uv_err_new(handle->loop, -req->status);
|
||||
req->send_cb(req, -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle) {
|
||||
struct sockaddr_storage peer;
|
||||
struct msghdr h;
|
||||
ssize_t nread;
|
||||
uv_buf_t buf;
|
||||
int flags;
|
||||
|
||||
assert(handle->recv_cb != NULL);
|
||||
assert(handle->alloc_cb != NULL);
|
||||
|
||||
do {
|
||||
/* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */
|
||||
buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
|
||||
assert(buf.len > 0);
|
||||
assert(buf.base != NULL);
|
||||
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &peer;
|
||||
h.msg_namelen = sizeof peer;
|
||||
h.msg_iov = (struct iovec*)&buf;
|
||||
h.msg_iovlen = 1;
|
||||
|
||||
do {
|
||||
nread = recvmsg(handle->fd, &h, 0);
|
||||
}
|
||||
while (nread == -1 && errno == EINTR);
|
||||
|
||||
if (nread == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
uv_err_new(handle->loop, EAGAIN);
|
||||
handle->recv_cb(handle, 0, buf, NULL, 0);
|
||||
}
|
||||
else {
|
||||
uv_err_new(handle->loop, errno);
|
||||
handle->recv_cb(handle, -1, buf, NULL, 0);
|
||||
}
|
||||
}
|
||||
else {
|
||||
flags = 0;
|
||||
|
||||
if (h.msg_flags & MSG_TRUNC)
|
||||
flags |= UV_UDP_PARTIAL;
|
||||
|
||||
handle->recv_cb(handle,
|
||||
nread,
|
||||
buf,
|
||||
(struct sockaddr*)&peer,
|
||||
flags);
|
||||
}
|
||||
}
|
||||
/* recv_cb callback may decide to pause or close the handle */
|
||||
while (nread != -1
|
||||
&& handle->fd != -1
|
||||
&& handle->recv_cb != NULL);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle) {
|
||||
assert(!ngx_queue_empty(&handle->write_queue)
|
||||
|| !ngx_queue_empty(&handle->write_completed_queue));
|
||||
|
||||
/* Write out pending data first. */
|
||||
uv__udp_run_pending(handle);
|
||||
|
||||
/* Drain 'request completed' queue. */
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
if (!ngx_queue_empty(&handle->write_completed_queue)) {
|
||||
/* Schedule completion callbacks. */
|
||||
ev_feed_event(handle->loop->ev, &handle->write_watcher, EV_WRITE);
|
||||
}
|
||||
else if (ngx_queue_empty(&handle->write_queue)) {
|
||||
/* Pending queue and completion queue empty, stop watcher. */
|
||||
uv__udp_watcher_stop(handle, &handle->write_watcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_io(EV_P_ ev_io* w, int events) {
|
||||
uv_udp_t* handle;
|
||||
|
||||
handle = w->data;
|
||||
assert(handle != NULL);
|
||||
assert(handle->type == UV_UDP);
|
||||
assert(handle->fd >= 0);
|
||||
assert(!(events & ~(EV_READ|EV_WRITE)));
|
||||
|
||||
if (events & EV_READ)
|
||||
uv__udp_recvmsg(handle);
|
||||
|
||||
if (events & EV_WRITE)
|
||||
uv__udp_sendmsg(handle);
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_bind(uv_udp_t* handle,
|
||||
int domain,
|
||||
struct sockaddr* addr,
|
||||
socklen_t len,
|
||||
unsigned flags) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
int yes;
|
||||
int fd;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
/* Check for bad flags. */
|
||||
if (flags & ~UV_UDP_IPV6ONLY) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Cannot set IPv6-only mode on non-IPv6 socket. */
|
||||
if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Check for already active socket. */
|
||||
if (handle->fd != -1) {
|
||||
uv_err_new_artificial(handle->loop, UV_EALREADY);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (flags & UV_UDP_IPV6ONLY) {
|
||||
#ifdef IPV6_V6ONLY
|
||||
yes = 1;
|
||||
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
#else
|
||||
uv_err_new((uv_handle_t*)handle, ENOTSUP);
|
||||
goto out;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (bind(fd, addr, len) == -1) {
|
||||
uv_err_new(handle->loop, errno);
|
||||
goto out;
|
||||
}
|
||||
|
||||
handle->fd = fd;
|
||||
status = 0;
|
||||
|
||||
out:
|
||||
if (status)
|
||||
uv__close(fd);
|
||||
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
|
||||
struct sockaddr_storage taddr;
|
||||
socklen_t addrlen;
|
||||
|
||||
assert(domain == AF_INET || domain == AF_INET6);
|
||||
|
||||
if (handle->fd != -1)
|
||||
return 0;
|
||||
|
||||
switch (domain) {
|
||||
case AF_INET:
|
||||
{
|
||||
struct sockaddr_in* addr = (void*)&taddr;
|
||||
memset(addr, 0, sizeof *addr);
|
||||
addr->sin_family = AF_INET;
|
||||
addr->sin_addr.s_addr = INADDR_ANY;
|
||||
addrlen = sizeof *addr;
|
||||
break;
|
||||
}
|
||||
case AF_INET6:
|
||||
{
|
||||
struct sockaddr_in6* addr = (void*)&taddr;
|
||||
memset(addr, 0, sizeof *addr);
|
||||
addr->sin6_family = AF_INET6;
|
||||
addr->sin6_addr = in6addr_any;
|
||||
addrlen = sizeof *addr;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(0 && "unsupported address family");
|
||||
abort();
|
||||
}
|
||||
|
||||
return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
|
||||
}
|
||||
|
||||
|
||||
static int uv__udp_send(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr* addr,
|
||||
socklen_t addrlen,
|
||||
uv_udp_send_cb send_cb) {
|
||||
if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
|
||||
return -1;
|
||||
|
||||
/* Don't use uv__req_init(), it zeroes the data field. */
|
||||
handle->loop->counters.req_init++;
|
||||
|
||||
memcpy(&req->addr, addr, addrlen);
|
||||
req->addrlen = addrlen;
|
||||
req->send_cb = send_cb;
|
||||
req->handle = handle;
|
||||
req->bufcnt = bufcnt;
|
||||
req->type = UV_UDP_SEND;
|
||||
|
||||
if (bufcnt <= UV_REQ_BUFSML_SIZE) {
|
||||
req->bufs = req->bufsml;
|
||||
}
|
||||
else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
|
||||
uv_err_new(handle->loop, ENOMEM);
|
||||
return -1;
|
||||
}
|
||||
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
|
||||
|
||||
ngx_queue_insert_tail(&handle->write_queue, &req->queue);
|
||||
uv__udp_watcher_start(handle, &handle->write_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
|
||||
memset(handle, 0, sizeof *handle);
|
||||
|
||||
uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
|
||||
loop->counters.udp_init++;
|
||||
|
||||
handle->fd = -1;
|
||||
ngx_queue_init(&handle->write_queue);
|
||||
ngx_queue_init(&handle->write_completed_queue);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
|
||||
return uv__udp_bind(handle,
|
||||
AF_INET,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
flags);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
|
||||
return uv__udp_bind(handle,
|
||||
AF_INET6,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
flags);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_send(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr_in addr,
|
||||
uv_udp_send_cb send_cb) {
|
||||
return uv__udp_send(req,
|
||||
handle,
|
||||
bufs,
|
||||
bufcnt,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
send_cb);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_send6(uv_udp_send_t* req,
|
||||
uv_udp_t* handle,
|
||||
uv_buf_t bufs[],
|
||||
int bufcnt,
|
||||
struct sockaddr_in6 addr,
|
||||
uv_udp_send_cb send_cb) {
|
||||
return uv__udp_send(req,
|
||||
handle,
|
||||
bufs,
|
||||
bufcnt,
|
||||
(struct sockaddr*)&addr,
|
||||
sizeof addr,
|
||||
send_cb);
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_recv_start(uv_udp_t* handle,
|
||||
uv_alloc_cb alloc_cb,
|
||||
uv_udp_recv_cb recv_cb) {
|
||||
if (alloc_cb == NULL || recv_cb == NULL) {
|
||||
uv_err_new_artificial(handle->loop, UV_EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ev_is_active(&handle->read_watcher)) {
|
||||
uv_err_new_artificial(handle->loop, UV_EALREADY);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (uv__udp_maybe_deferred_bind(handle, AF_INET))
|
||||
return -1;
|
||||
|
||||
handle->alloc_cb = alloc_cb;
|
||||
handle->recv_cb = recv_cb;
|
||||
uv__udp_watcher_start(handle, &handle->read_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_udp_recv_stop(uv_udp_t* handle) {
|
||||
uv__udp_watcher_stop(handle, &handle->read_watcher);
|
||||
handle->alloc_cb = NULL;
|
||||
handle->recv_cb = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
|
||||
uv__handle_init(loop, (uv_handle_t*)tcp, UV_TCP);
|
||||
loop->counters.tcp_init++;
|
||||
@ -2609,7 +2104,7 @@ static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
|
||||
|
||||
|
||||
/* Open a socket in non-blocking close-on-exec mode, atomically if possible. */
|
||||
static int uv__socket(int domain, int type, int protocol) {
|
||||
int uv__socket(int domain, int type, int protocol) {
|
||||
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
|
||||
return socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
|
||||
#else
|
||||
|
||||
Loading…
Reference in New Issue
Block a user