From d442f479198f075c230cc19f42bb1335dce52a93 Mon Sep 17 00:00:00 2001 From: Sam Schweigel Date: Tue, 16 Dec 2025 11:02:21 -0800 Subject: [PATCH] Add pipe_blocking_cancel to test cancelling blocking write on close --- test/test-list.h | 2 + test/test-pipe-blocking-subprocess.c | 98 ++++++++++++++++++++++------ 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/test/test-list.h b/test/test-list.h index f22f8cdf8..c3915248d 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -128,6 +128,7 @@ TEST_DECLARE (tcp_bind_invalid_flags) TEST_DECLARE (tcp_bind_writable_flags) TEST_DECLARE (tcp_bind_or_listen_error_after_close) TEST_DECLARE (tcp_listen_without_bind) +TEST_DECLARE (pipe_blocking_cancel) TEST_DECLARE (pipe_blocking_subprocess) TEST_DECLARE (tcp_connect_error_fault) TEST_DECLARE (tcp_connect6_error_fault) @@ -754,6 +755,7 @@ TASK_LIST_START TEST_ENTRY (tcp_bind_writable_flags) TEST_ENTRY (tcp_bind_or_listen_error_after_close) TEST_ENTRY (tcp_listen_without_bind) + TEST_ENTRY_CUSTOM (pipe_blocking_cancel, 0, 0, 20000) TEST_ENTRY_CUSTOM (pipe_blocking_subprocess, 0, 0, 20000) TEST_ENTRY (tcp_connect_error_fault) TEST_ENTRY (tcp_connect6_error_fault) diff --git a/test/test-pipe-blocking-subprocess.c b/test/test-pipe-blocking-subprocess.c index d89933f8d..9a181d018 100644 --- a/test/test-pipe-blocking-subprocess.c +++ b/test/test-pipe-blocking-subprocess.c @@ -38,19 +38,16 @@ static size_t total_read; static int write_complete; static uv_write_t req; static uv_buf_t buf; +static uv_timer_t timer; +static int closed_streams; -static void write_cb(uv_write_t* req, int status) { - ASSERT_OK(status); - ++write_complete; +static void close_cb(uv_handle_t *handle) { + ++closed_streams; } -static void exit_cb(uv_process_t* process, - int64_t exit_status, - int term_signal) { - ASSERT_EQ(1, exit_status); - ASSERT_OK(term_signal); - uv_close((uv_handle_t*) process, NULL); - ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb)); +static void write_cb_ok(uv_write_t* req, int status) { + ASSERT_OK(status); + ++write_complete; } static void read_cb(uv_stream_t* stream, @@ -61,8 +58,8 @@ static void read_cb(uv_stream_t* stream, free(buf->base); if (total_read == 12 + FILL_PIPE_NUM) { uv_read_stop(stream); - uv_close((uv_handle_t*)&pipe_in, NULL); - uv_close((uv_handle_t*)&pipe_out, NULL); + uv_close((uv_handle_t*)&pipe_in, close_cb); + uv_close((uv_handle_t*)&pipe_out, close_cb); } } @@ -71,8 +68,7 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { buf->len = size; } -TEST_IMPL(pipe_blocking_subprocess) { - +static void init_common(void) { loop = uv_default_loop(); ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0)); @@ -81,8 +77,6 @@ TEST_IMPL(pipe_blocking_subprocess) { ASSERT_OK(uv_pipe_open(&pipe_out, fds[0])); ASSERT_OK(uv_pipe_open(&pipe_in, fds[1])); - ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb)); - ASSERT_OK(uv_exepath(exepath, &exepath_size)); exepath[exepath_size] = '\0'; args[0] = exepath; @@ -90,7 +84,6 @@ TEST_IMPL(pipe_blocking_subprocess) { args[2] = NULL; options.file = exepath; options.args = args; - options.exit_cb = exit_cb; options.flags = 0; options.stdio_count = ARRAY_SIZE(stdios); options.stdio = stdios; @@ -98,18 +91,83 @@ TEST_IMPL(pipe_blocking_subprocess) { stdios[1].flags = UV_INHERIT_STREAM; stdios[1].data.stream = (uv_stream_t*)&pipe_in; stdios[2].flags = UV_IGNORE; +} - /* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */ - ASSERT_OK(uv_spawn(loop, &process, &options)); +/* After the subprocess exits, fill the pipe buffer. */ +static void exit_cb_write(uv_process_t* process, + int64_t exit_status, + int term_signal) { + ASSERT_EQ(1, exit_status); + ASSERT_OK(term_signal); + uv_close((uv_handle_t*) process, NULL); + ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb_ok)); +} - /* Now write enough that the pipe buffer fills. */ +TEST_IMPL(pipe_blocking_subprocess) { + init_common(); + options.exit_cb = exit_cb_write; + + ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb)); + + /* Write enough that the pipe buffer fills. */ buf.len = FILL_PIPE_NUM; buf.base = malloc(buf.len); memset(buf.base, 'A', buf.len); + /* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */ + ASSERT_OK(uv_spawn(loop, &process, &options)); + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); ASSERT_EQ(write_complete, 1); ASSERT_EQ(total_read, buf.len + 12); + ASSERT_EQ(closed_streams, 2); + + free(buf.base); + + return 0; +} + +static void timer_cb(uv_timer_t *handle) { + /* The write has begun by now. We'll close the write side of the pipe first, + * since closing the read side will trigger SIGPIPE. */ + uv_close((uv_handle_t*) &pipe_in, close_cb); + uv_close((uv_handle_t*) &timer, NULL); +} + +static void write_cb_cancel(uv_write_t* req, int status) { + ASSERT(status == UV_ECANCELED || status == UV_EPIPE); + ++write_complete; + uv_close((uv_handle_t*) &pipe_out, close_cb); +} + +/* After the subprocess exits, fill the pipe buffer and to cancel the write. */ +static void exit_cb_cancel(uv_process_t* process, + int64_t exit_status, + int term_signal) { + ASSERT_EQ(1, exit_status); + ASSERT_OK(term_signal); + uv_close((uv_handle_t*) process, NULL); + ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb_cancel)); + ASSERT_OK(uv_timer_start(&timer, timer_cb, 100, 0)); +} + +TEST_IMPL(pipe_blocking_cancel) { + init_common(); + options.exit_cb = exit_cb_cancel; + + uv_timer_init(loop, &timer); + + /* Write enough that the pipe buffer fills. */ + buf.len = FILL_PIPE_NUM; + buf.base = malloc(buf.len); + memset(buf.base, 'A', buf.len); + + /* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */ + ASSERT_OK(uv_spawn(loop, &process, &options)); + + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); + ASSERT_EQ(write_complete, 1); + ASSERT_EQ(closed_streams, 2); free(buf.base);