diff --git a/CMakeLists.txt b/CMakeLists.txt index b391e7c3c..bf3b87803 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -647,6 +647,7 @@ if(LIBUV_BUILD_TESTS) test/test-tcp-connect-timeout.c test/test-tcp-connect6-error.c test/test-tcp-create-socket-early.c + test/test-tcp-exportimport.c test/test-tcp-flags.c test/test-tcp-oob.c test/test-tcp-open.c diff --git a/Makefile.am b/Makefile.am index 9b9e6be71..242046e20 100644 --- a/Makefile.am +++ b/Makefile.am @@ -273,6 +273,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-connect-error.c \ test/test-tcp-connect-timeout.c \ test/test-tcp-connect6-error.c \ + test/test-tcp-exportimport.c \ test/test-tcp-flags.c \ test/test-tcp-open.c \ test/test-tcp-read-stop.c \ diff --git a/include/uv.h b/include/uv.h index 938e998fd..34a16bc86 100644 --- a/include/uv.h +++ b/include/uv.h @@ -637,6 +637,26 @@ UV_EXTERN int uv_tcp_connect(uv_connect_t* req, const struct sockaddr* addr, uv_connect_cb cb); + +/* + * Exports a uv_tcp_t handle by duplicating its underlying file descriptor. + * + * This allows the socket to be safely imported and used by another + * libuv event loop or thread using `uv_tcp_import()`. + */ +UV_EXTERN int uv_tcp_export(uv_tcp_t* stream, int* fd); + +/* + * Imports a TCP socket file descriptor into a libuv TCP handle. + * + * This function initializes a user-provided `uv_tcp_t` structure and binds it + * to an existing, valid file descriptor (mostly obtained via `uv_tcp_export`) + */ +UV_EXTERN int uv_tcp_import(uv_loop_t* loop, + int fd, + uv_tcp_t* out, + unsigned int flags); + /* uv_connect_t is a subclass of uv_req_t. */ struct uv_connect_s { UV_REQ_FIELDS diff --git a/src/unix/stream.c b/src/unix/stream.c index 18763b474..2594c31aa 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -1477,6 +1478,50 @@ int uv_read_stop(uv_stream_t* stream) { } +int uv_tcp_export(uv_tcp_t* stream, int* fd) { +#ifndef F_DUPFD_CLOEXEC /* POSIX 2008 */ + int err; +#endif + + if (stream->type != UV_TCP) + return UV_EINVAL; + + /* Clone the inner fd. Start from a safe number (3). */ +#ifdef F_DUPFD_CLOEXEC /* POSIX 2008 */ + *fd = fcntl(stream->io_watcher.fd, F_DUPFD_CLOEXEC, 3); +#else + *fd = fcntl(stream->io_watcher.fd, F_DUPFD, 3); +#endif + if (*fd == -1) + return UV__ERR(errno); + +#ifndef F_DUPFD_CLOEXEC /* POSIX 2008 */ + err = uv__cloexec(fd, 1); + if (err != 0) { + uv__close(fd); + return err; + } +#endif + return 0; +} + + +int uv_tcp_import(uv_loop_t* loop, int fd, uv_tcp_t* out, unsigned int flags) { + int err; + + err = uv_tcp_init_ex(loop, out, flags); + if (err) + return err; + + err = uv_tcp_open(out, fd); + if (err) { + uv_close((uv_handle_t*)out, NULL); + return err; + } + + return 0; +} + int uv_is_readable(const uv_stream_t* stream) { return !!(stream->flags & UV_HANDLE_READABLE); } diff --git a/src/win/stream.c b/src/win/stream.c index a53a10b03..ae2f18e0f 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -250,3 +250,13 @@ int uv_stream_set_blocking(uv_stream_t* handle, int blocking) { return 0; } + + +int uv_tcp_export(uv_tcp_t* stream, int* fd) { + return UV_ENOSYS; +} + + +int uv_tcp_import(uv_loop_t* loop, int fd, uv_tcp_t* out, unsigned int flags) { + return UV_ENOSYS; +} diff --git a/test/test-list.h b/test/test-list.h index 24dbcdd71..ad8e2695b 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -148,6 +148,8 @@ TEST_DECLARE (tcp_create_early_accept) #ifndef _WIN32 TEST_DECLARE (tcp_close_accept) TEST_DECLARE (tcp_oob) +TEST_DECLARE (tcp_exportimport_listen_after_write) +TEST_DECLARE (tcp_exportimport_listen_before_write) #endif TEST_DECLARE (tcp_flags) TEST_DECLARE (tcp_write_to_half_open_connection) @@ -770,6 +772,8 @@ TASK_LIST_START #ifndef _WIN32 TEST_ENTRY (tcp_close_accept) TEST_ENTRY (tcp_oob) + TEST_ENTRY (tcp_exportimport_listen_after_write) + TEST_ENTRY (tcp_exportimport_listen_before_write) #endif TEST_ENTRY (tcp_flags) TEST_ENTRY (tcp_write_to_half_open_connection) diff --git a/test/test-tcp-exportimport.c b/test/test-tcp-exportimport.c new file mode 100644 index 000000000..7d6c45d30 --- /dev/null +++ b/test/test-tcp-exportimport.c @@ -0,0 +1,201 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +/* Win implementation is not done yet. */ +#ifndef _WIN32 + +#include "uv.h" +#include "runner.h" +#include "task.h" + +typedef struct { + uv_loop_t* loop; + uv_thread_t thread; + uv_async_t* recv_channel; + uv_async_t* send_channel; + uv_tcp_t server; + uv_tcp_t conn; + int connection_accepted; + int close_cb_called; +} worker_t; + +static uv_async_t send_channel; +static uv_async_t recv_channel; +static worker_t parent; +static worker_t child; + +static int dup_fd_handle = -1; + +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 +static tcp_conn conns[CONN_COUNT]; + +static void close_cb(uv_handle_t* handle) { + worker_t* worker = handle->data; + ASSERT_NOT_NULL(worker); + worker->close_cb_called++; +} + + +static void on_connection(uv_stream_t* server, int status) { + worker_t* worker = container_of(server, worker_t, server); + ASSERT_NOT_NULL(worker); + ASSERT(worker == &parent || worker == &child); + + if (!worker->connection_accepted) { + /* Accept the connection and close it. */ + ASSERT_OK(status); + + ASSERT_OK(uv_tcp_init(server->loop, &worker->conn)); + + worker->conn.data = worker; + + ASSERT_OK(uv_accept(server, (uv_stream_t*)&worker->conn)); + + worker->connection_accepted = 1; + + uv_close((uv_handle_t*)worker->recv_channel, close_cb); + uv_close((uv_handle_t*)&worker->conn, close_cb); + uv_close((uv_handle_t*)server, close_cb); + } +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, NULL); +} + + +static void make_many_connections(void) { + tcp_conn* conn; + struct sockaddr_in addr; + int i; + + for (i = 0; i < (int)ARRAY_SIZE(conns); i++) { + conn = &conns[i]; + + ASSERT_OK(uv_tcp_init(uv_default_loop(), &conn->conn)); + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + ASSERT_OK(uv_tcp_connect(&conn->conn_req, + (uv_tcp_t*)&conn->conn, + (struct sockaddr*) &addr, + connect_cb)); + conn->conn.data = conn; + } +} + + +void on_parent_msg(uv_async_t* handle) { + parent.server.data = &parent; + + /* Import the shared TCP server, and start listening on it. */ + ASSERT_OK(uv_tcp_import(parent.loop, dup_fd_handle, &parent.server, 0)); + + ASSERT_OK(uv_listen((uv_stream_t*)&parent.server, 12, on_connection)); + ASSERT_EQ(parent.loop, parent.server.loop); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); +} + + +void on_child_msg(uv_async_t* handle) { + ASSERT(!"no"); +} + + +static void child_thread_entry(void* arg) { + int listen_after_write = *(int*) arg; + struct sockaddr_in addr; + + ASSERT_OK(uv_tcp_init(child.loop, &child.server)); + child.server.data = &child; + + ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + + ASSERT_OK(uv_tcp_bind(&child.server, (struct sockaddr*) &addr, 0)); + + if (!listen_after_write) + ASSERT_OK(uv_listen((uv_stream_t*)&child.server, 12, on_connection)); + + ASSERT_OK(uv_tcp_export(&child.server, &dup_fd_handle)); + ASSERT_GT(dup_fd_handle, -1); + + ASSERT_OK(uv_async_send(child.send_channel)); + + if (listen_after_write) + ASSERT_OK(uv_listen((uv_stream_t*)&child.server, 12, on_connection)); + + ASSERT_OK(uv_run(child.loop, UV_RUN_DEFAULT)); + + ASSERT(child.connection_accepted == 1); + ASSERT(child.close_cb_called == 3); +} + + +static void run_tcp_exportimport_test(int listen_after_write) { + parent.send_channel = &send_channel; + parent.recv_channel = &recv_channel; + child.send_channel = &recv_channel; + child.recv_channel = &send_channel; + + parent.loop = uv_default_loop(); + child.loop = uv_loop_new(); + ASSERT(child.loop); + + ASSERT_OK(uv_async_init(parent.loop, parent.recv_channel, on_parent_msg)); + parent.recv_channel->data = &parent; + + ASSERT_OK(uv_async_init(child.loop, child.recv_channel, on_child_msg)); + child.recv_channel->data = &child; + + ASSERT_OK(uv_thread_create(&child.thread, + child_thread_entry, + &listen_after_write)); + + ASSERT_OK(uv_run(parent.loop, UV_RUN_DEFAULT)); + MAKE_VALGRIND_HAPPY(parent.loop); + + ASSERT_EQ(parent.connection_accepted, 1); + ASSERT_EQ(parent.close_cb_called, 3); + + ASSERT_OK(uv_thread_join(&child.thread)); + + MAKE_VALGRIND_HAPPY(child.loop); +} + + +TEST_IMPL(tcp_exportimport_listen_after_write) { + run_tcp_exportimport_test(1); + return 0; +} + + +TEST_IMPL(tcp_exportimport_listen_before_write) { + run_tcp_exportimport_test(0); + return 0; +} + +#endif /* !_WIN32 */