Add pipe_blocking_cancel to test cancelling blocking write on close

This commit is contained in:
Sam Schweigel 2025-12-16 11:02:21 -08:00
parent a3c9a98e95
commit d442f47919
2 changed files with 80 additions and 20 deletions

View File

@ -128,6 +128,7 @@ TEST_DECLARE (tcp_bind_invalid_flags)
TEST_DECLARE (tcp_bind_writable_flags)
TEST_DECLARE (tcp_bind_or_listen_error_after_close)
TEST_DECLARE (tcp_listen_without_bind)
TEST_DECLARE (pipe_blocking_cancel)
TEST_DECLARE (pipe_blocking_subprocess)
TEST_DECLARE (tcp_connect_error_fault)
TEST_DECLARE (tcp_connect6_error_fault)
@ -754,6 +755,7 @@ TASK_LIST_START
TEST_ENTRY (tcp_bind_writable_flags)
TEST_ENTRY (tcp_bind_or_listen_error_after_close)
TEST_ENTRY (tcp_listen_without_bind)
TEST_ENTRY_CUSTOM (pipe_blocking_cancel, 0, 0, 20000)
TEST_ENTRY_CUSTOM (pipe_blocking_subprocess, 0, 0, 20000)
TEST_ENTRY (tcp_connect_error_fault)
TEST_ENTRY (tcp_connect6_error_fault)

View File

@ -38,19 +38,16 @@ static size_t total_read;
static int write_complete;
static uv_write_t req;
static uv_buf_t buf;
static uv_timer_t timer;
static int closed_streams;
static void write_cb(uv_write_t* req, int status) {
ASSERT_OK(status);
++write_complete;
static void close_cb(uv_handle_t *handle) {
++closed_streams;
}
static void exit_cb(uv_process_t* process,
int64_t exit_status,
int term_signal) {
ASSERT_EQ(1, exit_status);
ASSERT_OK(term_signal);
uv_close((uv_handle_t*) process, NULL);
ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb));
static void write_cb_ok(uv_write_t* req, int status) {
ASSERT_OK(status);
++write_complete;
}
static void read_cb(uv_stream_t* stream,
@ -61,8 +58,8 @@ static void read_cb(uv_stream_t* stream,
free(buf->base);
if (total_read == 12 + FILL_PIPE_NUM) {
uv_read_stop(stream);
uv_close((uv_handle_t*)&pipe_in, NULL);
uv_close((uv_handle_t*)&pipe_out, NULL);
uv_close((uv_handle_t*)&pipe_in, close_cb);
uv_close((uv_handle_t*)&pipe_out, close_cb);
}
}
@ -71,8 +68,7 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
buf->len = size;
}
TEST_IMPL(pipe_blocking_subprocess) {
static void init_common(void) {
loop = uv_default_loop();
ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0));
@ -81,8 +77,6 @@ TEST_IMPL(pipe_blocking_subprocess) {
ASSERT_OK(uv_pipe_open(&pipe_out, fds[0]));
ASSERT_OK(uv_pipe_open(&pipe_in, fds[1]));
ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb));
ASSERT_OK(uv_exepath(exepath, &exepath_size));
exepath[exepath_size] = '\0';
args[0] = exepath;
@ -90,7 +84,6 @@ TEST_IMPL(pipe_blocking_subprocess) {
args[2] = NULL;
options.file = exepath;
options.args = args;
options.exit_cb = exit_cb;
options.flags = 0;
options.stdio_count = ARRAY_SIZE(stdios);
options.stdio = stdios;
@ -98,18 +91,83 @@ TEST_IMPL(pipe_blocking_subprocess) {
stdios[1].flags = UV_INHERIT_STREAM;
stdios[1].data.stream = (uv_stream_t*)&pipe_in;
stdios[2].flags = UV_IGNORE;
}
/* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */
ASSERT_OK(uv_spawn(loop, &process, &options));
/* After the subprocess exits, fill the pipe buffer. */
static void exit_cb_write(uv_process_t* process,
int64_t exit_status,
int term_signal) {
ASSERT_EQ(1, exit_status);
ASSERT_OK(term_signal);
uv_close((uv_handle_t*) process, NULL);
ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb_ok));
}
/* Now write enough that the pipe buffer fills. */
TEST_IMPL(pipe_blocking_subprocess) {
init_common();
options.exit_cb = exit_cb_write;
ASSERT_OK(uv_read_start((uv_stream_t*)&pipe_out, alloc_cb, read_cb));
/* Write enough that the pipe buffer fills. */
buf.len = FILL_PIPE_NUM;
buf.base = malloc(buf.len);
memset(buf.base, 'A', buf.len);
/* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */
ASSERT_OK(uv_spawn(loop, &process, &options));
ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
ASSERT_EQ(write_complete, 1);
ASSERT_EQ(total_read, buf.len + 12);
ASSERT_EQ(closed_streams, 2);
free(buf.base);
return 0;
}
static void timer_cb(uv_timer_t *handle) {
/* The write has begun by now. We'll close the write side of the pipe first,
* since closing the read side will trigger SIGPIPE. */
uv_close((uv_handle_t*) &pipe_in, close_cb);
uv_close((uv_handle_t*) &timer, NULL);
}
static void write_cb_cancel(uv_write_t* req, int status) {
ASSERT(status == UV_ECANCELED || status == UV_EPIPE);
++write_complete;
uv_close((uv_handle_t*) &pipe_out, close_cb);
}
/* After the subprocess exits, fill the pipe buffer and to cancel the write. */
static void exit_cb_cancel(uv_process_t* process,
int64_t exit_status,
int term_signal) {
ASSERT_EQ(1, exit_status);
ASSERT_OK(term_signal);
uv_close((uv_handle_t*) process, NULL);
ASSERT_OK(uv_write(&req, (uv_stream_t*)&pipe_in, &buf, 1, write_cb_cancel));
ASSERT_OK(uv_timer_start(&timer, timer_cb, 100, 0));
}
TEST_IMPL(pipe_blocking_cancel) {
init_common();
options.exit_cb = exit_cb_cancel;
uv_timer_init(loop, &timer);
/* Write enough that the pipe buffer fills. */
buf.len = FILL_PIPE_NUM;
buf.base = malloc(buf.len);
memset(buf.base, 'A', buf.len);
/* The subprocess forces fds[0] into blocking mode and writes 12 bytes. */
ASSERT_OK(uv_spawn(loop, &process, &options));
ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
ASSERT_EQ(write_complete, 1);
ASSERT_EQ(closed_streams, 2);
free(buf.base);