Write to UV_HANDLE_BLOCKING_WRITES streams in the threadpool

On Unix, if a uv_stream_t has UV_HANDLE_BLOCKING_WRITES set, either
because we were unable to put the file in O_NONBLOCK mode when the
stream was initialized, or because the stream has been shared with a
subprocess via UV_INHERIT_STREAM, uv_write can block.

The nicest solution would be to use platform-specific features to do
non-blocking writes to files regardless of the O_NONBLOCK state, but a
fallback is needed either way.  This change implements the fallback by
doing all writes to blocking streams in the threadpool.

Note that this also tweaks uv_spawn slightly: we clear O_NONBLOCK on
libuv-owned streams shared using UV_INHERIT_STREAM, but not on files
shared with UV_INHERIT_FD.
This commit is contained in:
Sam Schweigel 2025-12-12 15:01:40 -08:00
parent ec7ec98b70
commit a3c9a98e95
9 changed files with 217 additions and 35 deletions

View File

@ -596,6 +596,7 @@ if(LIBUV_BUILD_TESTS)
test/test-pass-always.c
test/test-ping-pong.c
test/test-pipe-bind-error.c
test/test-pipe-blocking-subprocess.c
test/test-pipe-close-stdout-read-stdin.c
test/test-pipe-connect-error.c
test/test-pipe-connect-multiple.c

View File

@ -221,6 +221,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-pass-always.c \
test/test-ping-pong.c \
test/test-pipe-bind-error.c \
test/test-pipe-blocking-subprocess.c \
test/test-pipe-connect-error.c \
test/test-pipe-connect-multiple.c \
test/test-pipe-connect-prepare.c \

View File

@ -292,6 +292,7 @@ typedef struct {
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
uv_fs_t* blocked_write; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
#define UV_TCP_PRIVATE_FIELDS /* empty */

View File

@ -344,7 +344,12 @@ static void uv__finish_close(uv_handle_t* handle) {
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
uv__stream_destroy((uv_stream_t*)handle);
if (handle->flags & UV_HANDLE_WRITE_PENDING) {
handle->flags ^= UV_HANDLE_CLOSED;
uv__make_close_pending(handle); /* Back into the queue. */
return;
}
uv__stream_destroy((uv_stream_t *)handle);
break;
case UV_UDP:

View File

@ -216,16 +216,24 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
return ret;
case UV_INHERIT_FD:
fd = container->data.fd;
if (fd == -1)
return UV_EINVAL;
fds[1] = fd;
return 0;
case UV_INHERIT_STREAM:
if (container->flags & UV_INHERIT_FD)
fd = container->data.fd;
else
fd = uv__stream_fd(container->data.stream);
fd = uv__stream_fd(container->data.stream);
if (fd == -1)
return UV_EINVAL;
fds[1] = fd;
ret = uv__nonblock(fd, 0);
if (ret != 0)
return ret;
container->data.stream->flags |= UV_HANDLE_BLOCKING_WRITES;
return 0;
default:
@ -372,9 +380,6 @@ static void uv__process_child_init(const uv_process_options_t* options,
if (fd == -1)
uv__write_errno(error_fd);
if (fd <= 2 && close_fd == -1)
uv__nonblock_fcntl(fd, 0);
if (close_fd >= stdio_count)
uv__close(close_fd);
}
@ -625,10 +630,6 @@ static int uv__spawn_set_posix_spawn_file_actions(
assert(err != ENOSYS);
if (err != 0)
goto error;
/* Make sure the fd is marked as non-blocking (state shared between child
* and parent). */
uv__nonblock_fcntl(use_fd, 0);
}
/* Finally, close all the superfluous descriptors */

View File

@ -92,6 +92,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->blocked_write = NULL;
stream->delayed_error = 0;
uv__queue_init(&stream->write_queue);
uv__queue_init(&stream->write_completed_queue);
@ -455,6 +456,13 @@ void uv__stream_destroy(uv_stream_t* stream) {
assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
assert(stream->flags & UV_HANDLE_CLOSED);
if (stream->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */
if (stream->io_watcher.fd > STDERR_FILENO)
uv__close(stream->io_watcher.fd);
stream->io_watcher.fd = -1;
}
if (stream->connect_req) {
uv__req_unregister(stream->loop);
stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
@ -465,6 +473,11 @@ void uv__stream_destroy(uv_stream_t* stream) {
uv__write_callbacks(stream);
uv__drain(stream);
if (stream->blocked_write) {
uv__free(stream->blocked_write);
stream->blocked_write = NULL;
}
assert(stream->write_queue_size == 0);
}
@ -732,7 +745,8 @@ static void uv__write_req_finish(uv_write_t* req) {
* callback called in the near future.
*/
uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
uv__io_feed(stream->loop, &stream->io_watcher);
if (!(stream->flags & UV_HANDLE_CLOSING))
uv__io_feed(stream->loop, &stream->io_watcher);
}
@ -836,6 +850,59 @@ static int uv__try_write(uv_stream_t* stream,
return UV__ERR(errno);
}
/* Completion callback for the threadpool write started by uv__write_submit().
 *
 * `req` is the stream's reusable uv_fs_t and `req->data` points back at the
 * owning stream.  The outcome is applied to the request at the head of the
 * stream's write queue:
 *   - on success the request is advanced by the number of bytes written and
 *     finished once uv__write_req_update() reports it complete;
 *   - on failure the error code is recorded on the request and it is
 *     finished immediately.
 */
static void uv__write_done(uv_fs_t* req) {
  uv_stream_t* stream;
  struct uv__queue* q;
  uv_write_t* wreq;
  ssize_t result;

  stream = (uv_stream_t*) req->data;
  result = req->result;

  /* The uv_fs_t is reused for every blocking write on this stream, so give
   * back anything uv_fs_write() allocated for it before the next submission
   * re-initializes the request. */
  uv_fs_req_cleanup(req);

  stream->flags &= ~UV_HANDLE_WRITE_PENDING;

  /* The write that just completed belongs to the head of the queue. */
  assert(!(uv__queue_empty(&stream->write_queue)));
  q = uv__queue_head(&stream->write_queue);
  wreq = uv__queue_data(q, uv_write_t, queue);

  if (result < 0) {
    /* libuv error codes fit in an int; make the narrowing explicit. */
    wreq->error = (int) result;
    uv__write_req_finish(wreq);
    return;
  }

  if (uv__write_req_update(stream, wreq, result))
    uv__write_req_finish(wreq);
}
/* Start a write on `stream`.
 *
 * Streams without UV_HANDLE_BLOCKING_WRITES, and any write that carries a
 * `send_handle`, are forwarded to uv__try_write() and its result is returned
 * unchanged.
 *
 * For blocking streams the write is handed to uv_fs_write() with
 * uv__write_done() as its completion callback, using a uv_fs_t that is
 * allocated on first use and cached in `stream->blocked_write`.
 *
 * Returns:
 *   - whatever uv__try_write() returns on the forwarded path;
 *   - UV_EAGAIN when a blocking write is already pending, or when a new one
 *     was submitted successfully (completion is reported through
 *     uv__write_done());
 *   - UV_ENOMEM when the uv_fs_t could not be allocated;
 *   - the uv_fs_write() error code when submission failed.
 */
static int uv__write_submit(uv_stream_t* stream,
                            const uv_buf_t bufs[],
                            unsigned int nbufs,
                            uv_stream_t* send_handle) {
  uv_fs_t* req;
  int err;

  if (send_handle || !(stream->flags & UV_HANDLE_BLOCKING_WRITES))
    return uv__try_write(stream, bufs, nbufs, send_handle);

  /* Is a blocking write already underway? */
  if (stream->flags & UV_HANDLE_WRITE_PENDING)
    return UV_EAGAIN;

  req = stream->blocked_write;
  if (req == NULL) {
    req = uv__malloc(sizeof(*req));
    if (req == NULL)
      return UV_ENOMEM;  /* Nothing was submitted; the flag is still clear. */
    req->data = stream;
    stream->blocked_write = req;
  }

  /* Mark the write pending only once we are certain to submit it. */
  stream->flags |= UV_HANDLE_WRITE_PENDING;

  err = uv_fs_write(stream->loop,
                    req,
                    uv__stream_fd(stream),
                    bufs,
                    nbufs,
                    -1,
                    uv__write_done);
  if (err != 0) {
    stream->flags &= ~UV_HANDLE_WRITE_PENDING;
    return err;
  }

  return UV_EAGAIN;
}
static void uv__write(uv_stream_t* stream) {
struct uv__queue* q;
uv_write_t* req;
@ -858,10 +925,10 @@ static void uv__write(uv_stream_t* stream) {
req = uv__queue_data(q, uv_write_t, queue);
assert(req->handle == stream);
n = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
n = uv__write_submit(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0) {
@ -876,10 +943,6 @@ static void uv__write(uv_stream_t* stream) {
} else if (n != UV_EAGAIN)
goto error;
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
continue;
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
@ -1384,12 +1447,6 @@ int uv_write2(uv_write_t* req,
uv__write(stream);
}
else {
/*
* blocking streams should never have anything in the queue.
* if this assert fires then somehow the blocking stream isn't being
* sufficiently flushed in uv__write.
*/
assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
@ -1533,13 +1590,6 @@ void uv__stream_close(uv_stream_t* handle) {
uv__handle_stop(handle);
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
if (handle->io_watcher.fd != -1) {
/* Don't close stdio file descriptors. Nothing good comes from it. */
if (handle->io_watcher.fd > STDERR_FILENO)
uv__close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
}
if (handle->accepted_fd != -1) {
uv__close(handle->accepted_fd);
handle->accepted_fd = -1;
@ -1554,6 +1604,9 @@ void uv__stream_close(uv_stream_t* handle) {
handle->queued_fds = NULL;
}
if (handle->flags & UV_HANDLE_WRITE_PENDING)
uv_cancel((uv_req_t*)handle->blocked_write);
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}

View File

@ -101,6 +101,7 @@ enum {
UV_HANDLE_READ_PENDING = 0x00010000,
UV_HANDLE_SYNC_BYPASS_IOCP = 0x00020000,
UV_HANDLE_ZERO_READ = 0x00040000,
UV_HANDLE_WRITE_PENDING = 0x00080000, /* UNIX only */
UV_HANDLE_EMULATE_IOCP = 0x00080000,
UV_HANDLE_BLOCKING_WRITES = 0x00100000,
UV_HANDLE_CANCELLATION_PENDING = 0x00200000,

View File

@ -128,6 +128,7 @@ TEST_DECLARE (tcp_bind_invalid_flags)
TEST_DECLARE (tcp_bind_writable_flags)
TEST_DECLARE (tcp_bind_or_listen_error_after_close)
TEST_DECLARE (tcp_listen_without_bind)
TEST_DECLARE (pipe_blocking_subprocess)
TEST_DECLARE (tcp_connect_error_fault)
TEST_DECLARE (tcp_connect6_error_fault)
TEST_DECLARE (tcp_connect6_link_local)
@ -753,6 +754,7 @@ TASK_LIST_START
TEST_ENTRY (tcp_bind_writable_flags)
TEST_ENTRY (tcp_bind_or_listen_error_after_close)
TEST_ENTRY (tcp_listen_without_bind)
TEST_ENTRY_CUSTOM (pipe_blocking_subprocess, 0, 0, 20000)
TEST_ENTRY (tcp_connect_error_fault)
TEST_ENTRY (tcp_connect6_error_fault)
TEST_ENTRY (tcp_connect6_link_local)

View File

@ -0,0 +1,117 @@
/* Copyright libuv contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
/* Number of bytes (0x40000 = 256 KiB) the parent writes to the pipe. */
#define FILL_PIPE_NUM 0x40000
/* Event loop used by the test. */
static uv_loop_t* loop;
/* Path of the running test executable, filled in by uv_exepath(). */
static char exepath[1024];
static size_t exepath_size = sizeof exepath;
/* argv of the spawned helper: executable path, helper name, NULL. */
static char* args[3];
static uv_process_options_t options;
static uv_process_t process;
/* stdio configuration of the child; only slot 1 is not ignored. */
static uv_stdio_container_t stdios[3];
/* Write end of the pipe (opened on fds[1]); shared with the child. */
static uv_pipe_t pipe_in;
/* Read end of the pipe (opened on fds[0]); read by the parent. */
static uv_pipe_t pipe_out;
/* Raw descriptors produced by uv_pipe(). */
static uv_file fds[2];
/* Running total of bytes delivered to read_cb(). */
static size_t total_read;
/* Number of times write_cb() has run. */
static int write_complete;
/* Request and payload for the uv_write() issued from exit_cb(). */
static uv_write_t req;
static uv_buf_t buf;
/* Completion callback for the uv_write() issued from exit_cb(): the write
 * must have succeeded, and is counted so the test body can check it ran. */
static void write_cb(uv_write_t* req, int status) {
  (void) req;

  ASSERT_OK(status);
  write_complete += 1;
}
/* Exit callback of the spawned helper.  Checks that the child exited with
 * status 1 and was not killed by a signal, releases the process handle, and
 * only then queues the parent's write of `buf` on pipe_in. */
static void exit_cb(uv_process_t* process,
int64_t exit_status,
int term_signal) {
ASSERT_EQ(1, exit_status);
ASSERT_OK(term_signal);
uv_close((uv_handle_t*) process, NULL);
/* pipe_in was shared with the child via UV_INHERIT_STREAM; completion of
 * this write is reported to write_cb(). */
ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb));
}
/* Read callback for pipe_out.  Every read must succeed; the bytes are added
 * to total_read and the buffer handed out by alloc_cb() is released.  Once
 * the 12 bytes from the helper plus the FILL_PIPE_NUM bytes from the parent
 * have all arrived, reading stops and both pipe handles are closed. */
static void read_cb(uv_stream_t* stream,
                    ssize_t nread,
                    const uv_buf_t* buf) {
  ASSERT_GE(nread, 0);
  total_read += nread;
  free(buf->base);

  /* Still waiting for more data. */
  if (total_read != 12 + FILL_PIPE_NUM)
    return;

  uv_read_stop(stream);
  uv_close((uv_handle_t*)&pipe_in, NULL);
  uv_close((uv_handle_t*)&pipe_out, NULL);
}
/* Allocation callback for reads on pipe_out: hands libuv a freshly
 * malloc()ed buffer of the suggested size.  read_cb() frees it. */
static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  char* storage;

  (void) handle;

  storage = malloc(size);
  buf->len = size;
  buf->base = storage;
}
/* Regression test for writes to a stream that has been shared with a child
 * process through UV_INHERIT_STREAM.
 *
 * The parent creates a pipe, reads from its read end (pipe_out) and shares
 * its write end (pipe_in) with the helper as the helper's stdout.  After the
 * helper exits, exit_cb() writes FILL_PIPE_NUM bytes to pipe_in.  The test
 * passes when that write completes exactly once and the reader has received
 * the helper's 12 bytes plus the parent's FILL_PIPE_NUM bytes.
 */
TEST_IMPL(pipe_blocking_subprocess) {
  loop = uv_default_loop();

  /* Wrap both ends of a fresh pipe in libuv pipe handles. */
  ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0));
  ASSERT_OK(uv_pipe_init(loop, &pipe_out, 0));
  ASSERT_OK(uv_pipe(fds, 0, 0));
  ASSERT_OK(uv_pipe_open(&pipe_out, fds[0]));
  ASSERT_OK(uv_pipe_open(&pipe_in, fds[1]));
  ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb));

  /* Re-run this executable as the "spawn_helper2" helper. */
  ASSERT_OK(uv_exepath(exepath, &exepath_size));
  exepath[exepath_size] = '\0';
  args[0] = exepath;
  args[1] = "spawn_helper2";
  args[2] = NULL;

  options.file = exepath;
  options.args = args;
  options.exit_cb = exit_cb;
  options.flags = 0;
  options.stdio_count = ARRAY_SIZE(stdios);
  options.stdio = stdios;

  stdios[0].flags = UV_IGNORE;
  stdios[1].flags = UV_INHERIT_STREAM;
  stdios[1].data.stream = (uv_stream_t*)&pipe_in;
  stdios[2].flags = UV_IGNORE;

  /* Sharing pipe_in (fds[1], the write end) with the child puts it into
   * blocking mode; the helper then writes 12 bytes to it. */
  ASSERT_OK(uv_spawn(loop, &process, &options));

  /* Prepare enough data that the pipe buffer fills.  The write itself is
   * issued from exit_cb() once the loop is running. */
  buf.len = FILL_PIPE_NUM;
  buf.base = malloc(buf.len);
  ASSERT_NOT_NULL(buf.base);
  memset(buf.base, 'A', buf.len);

  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));

  ASSERT_EQ(write_complete, 1);
  ASSERT_EQ(total_read, buf.len + 12);

  free(buf.base);

  MAKE_VALGRIND_HAPPY(loop);
  return 0;
}