From a3c9a98e95aa423270b382ad5e3c457533b258e5 Mon Sep 17 00:00:00 2001 From: Sam Schweigel Date: Fri, 12 Dec 2025 15:01:40 -0800 Subject: [PATCH] Write to UV_HANDLE_BLOCKING_WRITES streams in the threadpool On Unix, if a uv_stream_t has UV_HANDLE_BLOCKING_WRITES set, either because we were unable to put the file in O_NONBLOCK mode when the stream was initialized, or because the stream has been shared with a subprocess via UV_INHERIT_STREAM, uv_write can block. The nicest solution would be to use platform-specific features to do non-blocking writes to files regardless of the O_NONBLOCK state, but a fallback is needed regardless. This change implements the fallback by doing all writes to blocking streams in the threadpool. Note that this also tweaks uv_spawn slightly: we clear O_NONBLOCK on libuv-owned streams shared using UV_INHERIT_STREAM, but not on files shared with UV_INHERIT_FD. --- CMakeLists.txt | 1 + Makefile.am | 1 + include/uv/unix.h | 1 + src/unix/core.c | 7 +- src/unix/process.c | 25 +++--- src/unix/stream.c | 97 +++++++++++++++++----- src/uv-common.h | 1 + test/test-list.h | 2 + test/test-pipe-blocking-subprocess.c | 117 +++++++++++++++++++++++++++ 9 files changed, 217 insertions(+), 35 deletions(-) create mode 100644 test/test-pipe-blocking-subprocess.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 449dc8322..4fc4e26d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -596,6 +596,7 @@ if(LIBUV_BUILD_TESTS) test/test-pass-always.c test/test-ping-pong.c test/test-pipe-bind-error.c + test/test-pipe-blocking-subprocess.c test/test-pipe-close-stdout-read-stdin.c test/test-pipe-connect-error.c test/test-pipe-connect-multiple.c diff --git a/Makefile.am b/Makefile.am index 797efc83e..65ed7a262 100644 --- a/Makefile.am +++ b/Makefile.am @@ -221,6 +221,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-pass-always.c \ test/test-ping-pong.c \ test/test-pipe-bind-error.c \ + test/test-pipe-blocking-subprocess.c \ test/test-pipe-connect-error.c \ test/test-pipe-connect-multiple.c \ test/test-pipe-connect-prepare.c \ diff --git a/include/uv/unix.h b/include/uv/unix.h index c6ba419da..458836c3f 100644 --- a/include/uv/unix.h +++ b/include/uv/unix.h @@ -292,6 +292,7 @@ typedef struct { int delayed_error; \ int accepted_fd; \ void* queued_fds; \ + uv_fs_t* blocked_write; \ UV_STREAM_PRIVATE_PLATFORM_FIELDS \ #define UV_TCP_PRIVATE_FIELDS /* empty */ diff --git a/src/unix/core.c b/src/unix/core.c index cde4dc444..093735bc6 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -344,7 +344,12 @@ static void uv__finish_close(uv_handle_t* handle) { case UV_NAMED_PIPE: case UV_TCP: case UV_TTY: - uv__stream_destroy((uv_stream_t*)handle); + if (handle->flags & UV_HANDLE_WRITE_PENDING) { + handle->flags ^= UV_HANDLE_CLOSED; + uv__make_close_pending(handle); /* Back into the queue. */ + return; + } + uv__stream_destroy((uv_stream_t *)handle); break; case UV_UDP: diff --git a/src/unix/process.c b/src/unix/process.c index 43e6b7984..db08d6f7a 100644 --- a/src/unix/process.c +++ b/src/unix/process.c @@ -216,16 +216,24 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) { return ret; case UV_INHERIT_FD: + fd = container->data.fd; + if (fd == -1) + return UV_EINVAL; + fds[1] = fd; + return 0; + case UV_INHERIT_STREAM: - if (container->flags & UV_INHERIT_FD) - fd = container->data.fd; - else - fd = uv__stream_fd(container->data.stream); + fd = uv__stream_fd(container->data.stream); if (fd == -1) return UV_EINVAL; - fds[1] = fd; + + ret = uv__nonblock(fd, 0); + if (ret != 0) + return ret; + container->data.stream->flags |= UV_HANDLE_BLOCKING_WRITES; + return 0; default: @@ -372,9 +380,6 @@ static void uv__process_child_init(const uv_process_options_t* options, if (fd == -1) uv__write_errno(error_fd); - if (fd <= 2 && close_fd == -1) - uv__nonblock_fcntl(fd, 0); - if (close_fd >= stdio_count) uv__close(close_fd); } @@ -625,10 +630,6 @@ static int uv__spawn_set_posix_spawn_file_actions( assert(err != ENOSYS); if (err != 0) goto error; - - /* Make sure the fd is marked as non-blocking (state shared between child - * and parent). */ - uv__nonblock_fcntl(use_fd, 0); } /* Finally, close all the superfluous descriptors */ diff --git a/src/unix/stream.c b/src/unix/stream.c index 725558662..26c8ffe0d 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -92,6 +92,7 @@ void uv__stream_init(uv_loop_t* loop, stream->shutdown_req = NULL; stream->accepted_fd = -1; stream->queued_fds = NULL; + stream->blocked_write = NULL; stream->delayed_error = 0; uv__queue_init(&stream->write_queue); uv__queue_init(&stream->write_completed_queue); @@ -455,6 +456,13 @@ void uv__stream_destroy(uv_stream_t* stream) { assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); assert(stream->flags & UV_HANDLE_CLOSED); + if (stream->io_watcher.fd != -1) { + /* Don't close stdio file descriptors. Nothing good comes from it. */ + if (stream->io_watcher.fd > STDERR_FILENO) + uv__close(stream->io_watcher.fd); + stream->io_watcher.fd = -1; + } + if (stream->connect_req) { uv__req_unregister(stream->loop); stream->connect_req->cb(stream->connect_req, UV_ECANCELED); @@ -465,6 +473,11 @@ void uv__stream_destroy(uv_stream_t* stream) { uv__write_callbacks(stream); uv__drain(stream); + if (stream->blocked_write) { + uv__free(stream->blocked_write); + stream->blocked_write = NULL; + } + assert(stream->write_queue_size == 0); } @@ -732,7 +745,8 @@ static void uv__write_req_finish(uv_write_t* req) { * callback called in the near future. */ uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); - uv__io_feed(stream->loop, &stream->io_watcher); + if (!(stream->flags & UV_HANDLE_CLOSING)) + uv__io_feed(stream->loop, &stream->io_watcher); } @@ -836,6 +850,59 @@ static int uv__try_write(uv_stream_t* stream, return UV__ERR(errno); } +static void uv__write_done(uv_fs_t* req) { + uv_stream_t* stream; + struct uv__queue* q; + uv_write_t* wreq; + + stream = (uv_stream_t*)req->data; + stream->flags &= ~UV_HANDLE_WRITE_PENDING; + assert(!(uv__queue_empty(&stream->write_queue))); + q = uv__queue_head(&stream->write_queue); + wreq = uv__queue_data(q, uv_write_t, queue); + + if (req->result >= 0) { + if (uv__write_req_update(stream, wreq, req->result)) + uv__write_req_finish(wreq); + } else { + wreq->error = req->result; + uv__write_req_finish(wreq); + } +} + +static int uv__write_submit(uv_stream_t* stream, + const uv_buf_t bufs[], + unsigned int nbufs, + uv_stream_t* send_handle) { + uv_os_fd_t fd; + uv_fs_t* req; + ssize_t r; + + if (send_handle || !(stream->flags & UV_HANDLE_BLOCKING_WRITES)) + return uv__try_write(stream, bufs, nbufs, send_handle); + + /* Is a blocking write already underway? */ + if (stream->flags & UV_HANDLE_WRITE_PENDING) + return UV_EAGAIN; + + stream->flags |= UV_HANDLE_WRITE_PENDING; + req = stream->blocked_write; + if (req == NULL) { + req = uv__malloc(sizeof *stream->blocked_write); + req->data = stream; + stream->blocked_write = req; + } + + fd = uv__stream_fd(stream); + r = uv_fs_write(stream->loop, req, fd, bufs, nbufs, -1, uv__write_done); + if (r != 0) { + stream->flags &= ~UV_HANDLE_WRITE_PENDING; + return r; + } + + return UV_EAGAIN; +} + static void uv__write(uv_stream_t* stream) { struct uv__queue* q; uv_write_t* req; @@ -858,10 +925,10 @@ static void uv__write(uv_stream_t* stream) { req = uv__queue_data(q, uv_write_t, queue); assert(req->handle == stream); - n = uv__try_write(stream, - &(req->bufs[req->write_index]), - req->nbufs - req->write_index, - req->send_handle); + n = uv__write_submit(stream, + &(req->bufs[req->write_index]), + req->nbufs - req->write_index, + req->send_handle); /* Ensure the handle isn't sent again in case this is a partial write. */ if (n >= 0) { @@ -876,10 +943,6 @@ static void uv__write(uv_stream_t* stream) { } else if (n != UV_EAGAIN) goto error; - /* If this is a blocking stream, try again. */ - if (stream->flags & UV_HANDLE_BLOCKING_WRITES) - continue; - /* We're not done. */ uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); @@ -1384,12 +1447,6 @@ int uv_write2(uv_write_t* req, uv__write(stream); } else { - /* - * blocking streams should never have anything in the queue. - * if this assert fires then somehow the blocking stream isn't being - * sufficiently flushed in uv__write. - */ - assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); } @@ -1533,13 +1590,6 @@ void uv__stream_close(uv_stream_t* handle) { uv__handle_stop(handle); handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); - if (handle->io_watcher.fd != -1) { - /* Don't close stdio file descriptors. Nothing good comes from it. */ - if (handle->io_watcher.fd > STDERR_FILENO) - uv__close(handle->io_watcher.fd); - handle->io_watcher.fd = -1; - } - if (handle->accepted_fd != -1) { uv__close(handle->accepted_fd); handle->accepted_fd = -1; @@ -1554,6 +1604,9 @@ void uv__stream_close(uv_stream_t* handle) { handle->queued_fds = NULL; } + if (handle->flags & UV_HANDLE_WRITE_PENDING) + uv_cancel((uv_req_t*)handle->blocked_write); + assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); } diff --git a/src/uv-common.h b/src/uv-common.h index b9a8e976e..639df6ddf 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -101,6 +101,7 @@ enum { UV_HANDLE_READ_PENDING = 0x00010000, UV_HANDLE_SYNC_BYPASS_IOCP = 0x00020000, UV_HANDLE_ZERO_READ = 0x00040000, + UV_HANDLE_WRITE_PENDING = 0x00080000, /* UNIX only */ UV_HANDLE_EMULATE_IOCP = 0x00080000, UV_HANDLE_BLOCKING_WRITES = 0x00100000, UV_HANDLE_CANCELLATION_PENDING = 0x00200000, diff --git a/test/test-list.h b/test/test-list.h index 5d44fefb4..f22f8cdf8 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -128,6 +128,7 @@ TEST_DECLARE (tcp_bind_invalid_flags) TEST_DECLARE (tcp_bind_writable_flags) TEST_DECLARE (tcp_bind_or_listen_error_after_close) TEST_DECLARE (tcp_listen_without_bind) +TEST_DECLARE (pipe_blocking_subprocess) TEST_DECLARE (tcp_connect_error_fault) TEST_DECLARE (tcp_connect6_error_fault) TEST_DECLARE (tcp_connect6_link_local) @@ -753,6 +754,7 @@ TASK_LIST_START TEST_ENTRY (tcp_bind_writable_flags) TEST_ENTRY (tcp_bind_or_listen_error_after_close) TEST_ENTRY (tcp_listen_without_bind) + TEST_ENTRY_CUSTOM (pipe_blocking_subprocess, 0, 0, 20000) TEST_ENTRY (tcp_connect_error_fault) TEST_ENTRY (tcp_connect6_error_fault) TEST_ENTRY (tcp_connect6_link_local) diff --git a/test/test-pipe-blocking-subprocess.c b/test/test-pipe-blocking-subprocess.c new file mode 100644 index 000000000..d89933f8d --- /dev/null +++ b/test/test-pipe-blocking-subprocess.c @@ -0,0 +1,117 @@ +/* Copyright libuv contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#define FILL_PIPE_NUM 0x40000 + +static uv_loop_t* loop; +static char exepath[1024]; +static size_t exepath_size = sizeof exepath; +static char* args[3]; +static uv_process_options_t options; +static uv_process_t process; +static uv_stdio_container_t stdios[3]; +static uv_pipe_t pipe_in; +static uv_pipe_t pipe_out; +static uv_file fds[2]; +static size_t total_read; +static int write_complete; +static uv_write_t req; +static uv_buf_t buf; + +static void write_cb(uv_write_t* req, int status) { + ASSERT_OK(status); + ++write_complete; +} + +static void exit_cb(uv_process_t* process, + int64_t exit_status, + int term_signal) { + ASSERT_EQ(1, exit_status); + ASSERT_OK(term_signal); + uv_close((uv_handle_t*) process, NULL); + ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb)); +} + +static void read_cb(uv_stream_t* stream, + ssize_t nread, + const uv_buf_t* buf) { + ASSERT_GE(nread, 0); + total_read += nread; + free(buf->base); + if (total_read == 12 + FILL_PIPE_NUM) { + uv_read_stop(stream); + uv_close((uv_handle_t*)&pipe_in, NULL); + uv_close((uv_handle_t*)&pipe_out, NULL); + } +} + +static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { + buf->base = malloc(size); + buf->len = size; +} + +TEST_IMPL(pipe_blocking_subprocess) { + + loop = uv_default_loop(); + + ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0)); + ASSERT_OK(uv_pipe_init(loop, &pipe_out, 0)); + ASSERT_OK(uv_pipe(fds, 0, 0)); + ASSERT_OK(uv_pipe_open(&pipe_out, fds[0])); + ASSERT_OK(uv_pipe_open(&pipe_in, fds[1])); + + ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb)); + + ASSERT_OK(uv_exepath(exepath, &exepath_size)); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = "spawn_helper2"; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; + options.flags = 0; + options.stdio_count = ARRAY_SIZE(stdios); + options.stdio = stdios; + stdios[0].flags = UV_IGNORE; + stdios[1].flags = UV_INHERIT_STREAM; + stdios[1].data.stream = (uv_stream_t*)&pipe_in; + stdios[2].flags = UV_IGNORE; + + /* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */ + ASSERT_OK(uv_spawn(loop, &process, &options)); + + /* Now write enough that the pipe buffer fills. */ + buf.len = FILL_PIPE_NUM; + buf.base = malloc(buf.len); + memset(buf.base, 'A', buf.len); + + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); + ASSERT_EQ(write_complete, 1); + ASSERT_EQ(total_read, buf.len + 12); + + free(buf.base); + + return 0; +}