From 0d43992c3b268b756d3018b7e55663f2b056daf3 Mon Sep 17 00:00:00 2001 From: Andrius Bentkus Date: Mon, 30 Jun 2014 00:01:10 +0200 Subject: [PATCH] unix, win: add uv_udp_try_send The function returns UV_EAGAIN if the queue is not empty and the message couldn't be sent immediately. --- Makefile.am | 1 + include/uv.h | 21 +++++-- src/unix/udp.c | 40 ++++++++++++ src/uv-common.c | 20 ++++++ src/uv-common.h | 6 ++ src/win/udp.c | 9 +++ test/test-list.h | 2 + test/test-udp-try-send.c | 133 +++++++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 9 files changed, 229 insertions(+), 4 deletions(-) create mode 100644 test/test-udp-try-send.c diff --git a/Makefile.am b/Makefile.am index 25c87145f..e62fbac32 100644 --- a/Makefile.am +++ b/Makefile.am @@ -214,6 +214,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-udp-open.c \ test/test-udp-options.c \ test/test-udp-send-and-recv.c \ + test/test-udp-try-send.c \ test/test-walk-handles.c \ test/test-watcher-cross-stop.c test_run_tests_LDADD = libuv.la diff --git a/include/uv.h b/include/uv.h index 99b9037ae..c21a51a58 100644 --- a/include/uv.h +++ b/include/uv.h @@ -901,12 +901,12 @@ typedef void (*uv_udp_recv_cb)(uv_udp_t* handle, struct uv_udp_s { UV_HANDLE_FIELDS /* read-only */ - /* Total size of buffers queued for sending. May send - * actually less since udp packets are truncated to the MTU size. + /* number of bytes queued for sending, ay send + * actually less since udp packets are truncated to the MTU size */ size_t send_queue_size; - /* Total count of sends currently in the queue awaiting to - * be processed. + /* number of send requests currently in the queue awaiting to + * be processed */ size_t send_queue_count; UV_UDP_PRIVATE_FIELDS @@ -1076,6 +1076,19 @@ UV_EXTERN int uv_udp_send(uv_udp_send_t* req, const struct sockaddr* addr, uv_udp_send_cb send_cb); +/* + * Same as `uv_udp_send()`, but won't queue a send request if it can't be completed + * immediately. + * Will return either: + * - >= 0: number of bytes written (can be less than the supplied buffer size if the + * packet is truncated) + * - < 0: negative error code (UV_EAGAIN is returned when the message can't be sent + * immediately) + */ +UV_EXTERN int uv_udp_try_send(uv_udp_t* handle, + const uv_buf_t bufs[], + unsigned int nbufs, + const struct sockaddr* addr); /* * Receive data. If the socket has not previously been bound with `uv_udp_bind` * it is bound to 0.0.0.0 (the "all interfaces" address) and a random diff --git a/src/unix/udp.c b/src/unix/udp.c index 4ca811d64..d99fe104b 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -448,6 +448,46 @@ int uv__udp_send(uv_udp_send_t* req, } +int uv__udp_try_send(uv_udp_t* handle, + const uv_buf_t bufs[], + unsigned int nbufs, + const struct sockaddr* addr, + unsigned int addrlen) { + int err; + struct msghdr h; + ssize_t size; + + assert(nbufs > 0); + + /* already sending a message */ + if (handle->send_queue_count != 0) + return -EAGAIN; + + err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); + if (err) + return err; + + memset(&h, 0, sizeof h); + h.msg_name = (struct sockaddr*) addr; + h.msg_namelen = addrlen; + h.msg_iov = (struct iovec*) bufs; + h.msg_iovlen = nbufs; + + do { + size = sendmsg(handle->io_watcher.fd, &h, 0); + } while (size == -1 && errno == EINTR); + + if (size == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return -EAGAIN; + else + return -errno; + } + + return size; +} + + static int uv__udp_set_membership4(uv_udp_t* handle, const struct sockaddr_in* multicast_addr, const char* interface_addr, diff --git a/src/uv-common.c b/src/uv-common.c index e3255031a..4e3968cb4 100644 --- a/src/uv-common.c +++ b/src/uv-common.c @@ -233,6 +233,26 @@ int uv_udp_send(uv_udp_send_t* req, } +int uv_udp_try_send(uv_udp_t* handle, + const uv_buf_t bufs[], + unsigned int nbufs, + const struct sockaddr* addr) { + unsigned int addrlen; + + if (handle->type != UV_UDP) + return UV_EINVAL; + + if (addr->sa_family == AF_INET) + addrlen = sizeof(struct sockaddr_in); + else if (addr->sa_family == AF_INET6) + addrlen = sizeof(struct sockaddr_in6); + else + return UV_EINVAL; + + return uv__udp_try_send(handle, bufs, nbufs, addr, addrlen); +} + + int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, uv_udp_recv_cb recv_cb) { diff --git a/src/uv-common.h b/src/uv-common.h index be8991d6a..34c287898 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -83,6 +83,12 @@ int uv__udp_send(uv_udp_send_t* req, unsigned int addrlen, uv_udp_send_cb send_cb); +int uv__udp_try_send(uv_udp_t* handle, + const uv_buf_t bufs[], + unsigned int nbufs, + const struct sockaddr* addr, + unsigned int addrlen); + int uv__udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloccb, uv_udp_recv_cb recv_cb); diff --git a/src/win/udp.c b/src/win/udp.c index 638b331ec..e98a7d79b 100644 --- a/src/win/udp.c +++ b/src/win/udp.c @@ -869,3 +869,12 @@ int uv__udp_send(uv_udp_send_t* req, return 0; } + + +int uv__udp_try_send(uv_udp_t* handle, + const uv_buf_t bufs[], + unsigned int nbufs, + const struct sockaddr* addr, + unsigned int addrlen) { + return UV_ENOSYS; +} diff --git a/test/test-list.h b/test/test-list.h index 15c2e4ed3..6b26da156 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -95,6 +95,7 @@ TEST_DECLARE (udp_ipv6_only) TEST_DECLARE (udp_options) TEST_DECLARE (udp_no_autobind) TEST_DECLARE (udp_open) +TEST_DECLARE (udp_try_send) TEST_DECLARE (pipe_bind_error_addrinuse) TEST_DECLARE (pipe_bind_error_addrnotavail) TEST_DECLARE (pipe_bind_error_inval) @@ -376,6 +377,7 @@ TASK_LIST_START TEST_ENTRY (udp_multicast_join) TEST_ENTRY (udp_multicast_join6) TEST_ENTRY (udp_multicast_ttl) + TEST_ENTRY (udp_try_send) TEST_ENTRY (udp_open) TEST_HELPER (udp_open, udp4_echo_server) diff --git a/test/test-udp-try-send.c b/test/test-udp-try-send.c new file mode 100644 index 000000000..7b6de3654 --- /dev/null +++ b/test/test-udp-try-send.c @@ -0,0 +1,133 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#ifdef _WIN32 + +TEST_IMPL(udp_try_send) { + + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#else /* !_WIN32 */ + +#define CHECK_HANDLE(handle) \ + ASSERT((uv_udp_t*)(handle) == &server || (uv_udp_t*)(handle) == &client) + +static uv_udp_t server; +static uv_udp_t client; + +static int sv_recv_cb_called; + +static int close_cb_called; + + +static void alloc_cb(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + static char slab[65536]; + CHECK_HANDLE(handle); + ASSERT(suggested_size <= sizeof(slab)); + buf->base = slab; + buf->len = sizeof(slab); +} + + +static void close_cb(uv_handle_t* handle) { + CHECK_HANDLE(handle); + ASSERT(uv_is_closing(handle)); + close_cb_called++; +} + + +static void sv_recv_cb(uv_udp_t* handle, + ssize_t nread, + const uv_buf_t* rcvbuf, + const struct sockaddr* addr, + unsigned flags) { + ASSERT(nread > 0); + + if (nread == 0) { + ASSERT(addr == NULL); + return; + } + + ASSERT(nread == 4); + ASSERT(addr != NULL); + + ASSERT(memcmp("EXIT", rcvbuf->base, nread) == 0); + uv_close((uv_handle_t*) handle, close_cb); + uv_close((uv_handle_t*) &client, close_cb); + + sv_recv_cb_called++; +} + + +TEST_IMPL(udp_try_send) { + struct sockaddr_in addr; + static char buffer[64 * 1024]; + uv_buf_t buf; + int r; + + ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + + r = uv_udp_init(uv_default_loop(), &server); + ASSERT(r == 0); + + r = uv_udp_bind(&server, (const struct sockaddr*) &addr, 0); + ASSERT(r == 0); + + r = uv_udp_recv_start(&server, alloc_cb, sv_recv_cb); + ASSERT(r == 0); + + ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + r = uv_udp_init(uv_default_loop(), &client); + ASSERT(r == 0); + + buf = uv_buf_init(buffer, sizeof(buffer)); + r = uv_udp_try_send(&client, &buf, 1, (const struct sockaddr*) &addr); + ASSERT(r == UV_EMSGSIZE); + + buf = uv_buf_init("EXIT", 4); + r = uv_udp_try_send(&client, &buf, 1, (const struct sockaddr*) &addr); + ASSERT(r == 4); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + + ASSERT(close_cb_called == 2); + ASSERT(sv_recv_cb_called == 1); + + ASSERT(client.send_queue_size == 0); + ASSERT(server.send_queue_size == 0); + + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#endif /* !_WIN32 */ diff --git a/uv.gyp b/uv.gyp index d9772b4ea..9d68fc961 100644 --- a/uv.gyp +++ b/uv.gyp @@ -402,6 +402,7 @@ 'test/test-ip6-addr.c', 'test/test-udp-multicast-interface.c', 'test/test-udp-multicast-interface6.c', + 'test/test-udp-try-send.c', ], 'conditions': [ [ 'OS=="win"', {