stream: Implement cancellation support for uv_write_t

This adds a new function `uv_write_cancel` which requests the
cancellation of an in-progress write as soon as possible, possibly
causing a short write. The exact timing of the cancellation is stream type
and kernel dependent. A cancelled request will receive the ordinary write
callback with `UV_ECANCELED` and the new function `uv_write_nwritten`
can be used to determine if the write request was partially fulfilled.
This commit is contained in:
Keno Fischer 2025-12-11 06:22:40 +00:00
parent ec7ec98b70
commit d718a22ace
15 changed files with 461 additions and 4 deletions

View File

@ -662,6 +662,7 @@ if(LIBUV_BUILD_TESTS)
test/test-tcp-unexpected-read.c
test/test-tcp-write-after-connect.c
test/test-tcp-write-fail.c
test/test-write-cancel.c
test/test-tcp-write-queue-order.c
test/test-tcp-write-to-half-open-connection.c
test/test-tcp-writealot.c

View File

@ -289,6 +289,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-tcp-try-write.c \
test/test-tcp-write-in-a-row.c \
test/test-tcp-try-write-error.c \
test/test-write-cancel.c \
test/test-tcp-write-queue-order.c \
test/test-test-macros.c \
test/test-thread-equal.c \

View File

@ -217,7 +217,37 @@ API
where it returns ``UV_EAGAIN``.
.. versionadded:: 1.42.0
.. c:function:: int uv_write_cancel(uv_write_t* req)
Cancel a pending write request. The write callback will still be called,
with ``UV_ECANCELED`` status, or with the normal result if the write
completed before cancellation took effect.
Fully cancelled writes (where no bytes were written) may have their
callbacks called out of order with respect to other writes on the same
stream. Partial writes are completed in order.
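Returns 0 on success, or an error code < 0 on failure. On Windows,
``UV_ENOTSUP`` is returned for non-overlapped pipes that are not in
blocking-write mode, because their writes run on the thread pool and cannot
be cancelled. All other valid write requests are currently expected to
succeed, but additional error codes may be returned in future libuv versions.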
Use :c:func:`uv_write_nwritten` from the write callback to determine how
many bytes were written before cancellation.
.. versionadded:: 1.52.0
.. c:function:: size_t uv_write_nwritten(const uv_write_t* req)
Returns the number of bytes written by a write request. Only valid when
called from within the write callback (:c:type:`uv_write_cb`).
This is primarily useful when a write has been cancelled via
:c:func:`uv_write_cancel` and the callback receives ``UV_ECANCELED``
status, to determine how many bytes were actually written before
cancellation.
.. versionadded:: 1.52.0
.. c:function:: int uv_is_readable(const uv_stream_t* handle)
Returns 1 if the stream is readable, 0 otherwise.

View File

@ -437,7 +437,7 @@ UV_EXTERN char* uv_err_name_r(int err, char* buf, size_t buflen);
/* read-only */ \
uv_req_type type; \
/* private */ \
void* reserved[6]; \
void* reserved[5]; \
UV_REQ_PRIVATE_FIELDS \
/* Abstract base class of all requests. */
@ -568,6 +568,34 @@ UV_EXTERN int uv_try_write2(uv_stream_t* handle,
unsigned int nbufs,
uv_stream_t* send_handle);
/*
* Returns the number of bytes written by a write request.
*
* NOTE: The result is only well-defined when called from within the
* write callback (uv_write_cb). The value is undefined if called at
* other times.
*
* This is primarily useful when a write has been cancelled via uv_write_cancel()
* and the callback receives UV_ECANCELED status, to determine how many
* bytes were actually written before cancellation.
*/
UV_EXTERN size_t uv_write_nwritten(const uv_write_t* req);
/*
 * Cancel a pending write request.
 *
 * The write callback will still be called, with UV_ECANCELED status, or with
 * the normal result if the write completed before cancellation took effect.
 *
 * Fully cancelled writes (where no bytes were written) may have their
 * callbacks called out of order with respect to other writes on the same
 * stream. Partial writes are completed in order.
 *
 * Returns 0 on success, or an error code < 0 on failure. On Windows,
 * UV_ENOTSUP is returned for non-overlapped pipes that are not in
 * blocking-write mode, because their writes run on the thread pool and
 * cannot be cancelled. All other valid write requests are currently expected
 * to succeed, but additional error codes may be returned in future libuv
 * versions.
 *
 * Use uv_write_nwritten() from the write callback to find out how many bytes
 * were written before the cancellation took effect.
 */
UV_EXTERN int uv_write_cancel(uv_write_t* req);
/* uv_write_t is a subclass of uv_req_t. */
struct uv_write_s {
UV_REQ_FIELDS

View File

@ -249,7 +249,13 @@ typedef struct {
#define UV_REQ_TYPE_PRIVATE /* empty */
#define UV_REQ_PRIVATE_FIELDS /* empty */
/* Platform-private tail appended to every request.
 *
 * The anonymous union overlays one reserved pointer slot (reserved2) with
 * per-request-type bookkeeping. For write requests, write_extra.nwritten
 * accumulates the number of bytes written so far and is what
 * uv_write_nwritten() reports.
 *
 * NOTE(review): presumably sized to take over the slot dropped from the
 * reserved[] array in UV_REQ_FIELDS so that sizeof(uv_req_t) is unchanged -
 * confirm on every supported ABI.
 */
#define UV_REQ_PRIVATE_FIELDS \
union { \
void* reserved2[1]; \
struct { \
size_t nwritten; \
} write_extra; \
};
#define UV_PRIVATE_REQ_TYPES /* empty */

View File

@ -379,7 +379,13 @@ typedef struct {
WCHAR* name; \
} connect; \
} u; \
struct uv_req_s* next_req;
struct uv_req_s* next_req; \
union { \
void* reserved2[1]; \
struct { \
size_t nwritten; \
} write_extra; \
};
#define UV_WRITE_PRIVATE_FIELDS \
int coalesced; \

View File

@ -87,4 +87,13 @@ static inline void uv__queue_remove(struct uv__queue* q) {
q->next->prev = q->prev;
}
static inline int uv__queue_contains(const struct uv__queue* h,
const struct uv__queue* q) {
const struct uv__queue* p;
uv__queue_foreach(p, h)
if (p == q)
return 1;
return 0;
}
#endif /* QUEUE_H_ */

View File

@ -692,6 +692,7 @@ static int uv__write_req_update(uv_stream_t* stream,
assert(n <= stream->write_queue_size);
stream->write_queue_size -= n;
req->write_extra.nwritten += n;
buf = req->bufs + req->write_index;
@ -915,6 +916,9 @@ static void uv__write_callbacks(uv_stream_t* stream) {
uv__req_unregister(stream->loop);
if (req->bufs != NULL) {
/* bufs are non-NULL on errors (including cancel and stream close).
* Success path sets bufs to NULL after adjusting size in uv__write. */
assert(req->error != 0);
stream->write_queue_size -= uv__write_req_size(req);
if (req->bufs != req->bufsml)
uv__free(req->bufs);
@ -1356,6 +1360,7 @@ int uv_write2(uv_write_t* req,
req->handle = stream;
req->error = 0;
req->send_handle = send_handle;
req->write_extra.nwritten = 0;
uv__queue_init(&req->queue);
req->bufs = req->bufsml;
@ -1398,6 +1403,31 @@ int uv_write2(uv_write_t* req,
}
/* Report the byte count accumulated for `req` in write_extra.nwritten.
 * The counter is reset when the request is queued and bumped as data is
 * consumed, so the value is only meaningful from the write callback.
 */
size_t uv_write_nwritten(const uv_write_t* req) {
  size_t written;

  written = req->write_extra.nwritten;

  return written;
}
/* Cancel a queued write request.
 *
 * A request that is still on the stream's write queue is unlinked, marked
 * with UV_ECANCELED and moved to the completed queue; the I/O watcher is then
 * fed so the write callback runs from the event loop. A request that is no
 * longer on the write queue is left untouched, and its callback reports
 * whatever status it already has.
 *
 * Always returns 0.
 */
int uv_write_cancel(uv_write_t* req) {
  uv_stream_t* owner;

  owner = req->handle;

  if (uv__queue_contains(&owner->write_queue, &req->queue)) {
    uv__queue_remove(&req->queue);
    req->error = UV_ECANCELED;

    /* Accounting of write_queue_size and release of the buffer array are
     * left to uv__write_callbacks, which treats this like any failed write.
     */
    uv__queue_insert_tail(&owner->write_completed_queue, &req->queue);
    uv__io_feed(owner->loop, &owner->io_watcher);
  }

  return 0;
}
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the uv_buf_t array.
*/

View File

@ -1332,6 +1332,7 @@ static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
SET_REQ_NWRITTEN(req, bytes);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
@ -1592,6 +1593,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
req->handle = (uv_stream_t*) handle;
req->send_handle = NULL;
req->cb = cb;
req->write_extra.nwritten = 0;
/* Private fields. */
req->coalesced = 0;
req->event_handle = NULL;
@ -1637,6 +1639,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
req->u.io.queued_bytes = 0;
}
SET_REQ_NWRITTEN(req, bytes);
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
@ -2176,11 +2179,15 @@ void uv__process_pipe_read_req(uv_loop_t* loop,
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
int err;
size_t bytes_written;
assert(handle->type == UV_NAMED_PIPE);
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
/* Ask the kernel how many bytes were actually written.
* N.B.: If the write was partially cancelled, this could differ from queued_bytes. */
bytes_written = req->u.io.overlapped.InternalHigh;
UNREGISTER_HANDLE_REQ(loop, handle);
@ -2204,6 +2211,9 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
req = coalesced_write->user_req;
uv__free(coalesced_write);
}
req->write_extra.nwritten += bytes_written;
if (req->cb) {
req->cb(req, uv_translate_sys_error(err));
}

View File

@ -34,6 +34,9 @@
#define SET_REQ_ERROR(req, error) \
SET_REQ_STATUS((req), NTSTATUS_FROM_WIN32((error)))
/* Store the number of bytes transferred for `req` in the InternalHigh field
 * of its OVERLAPPED structure, where the write-completion handlers read it
 * back.
 *
 * Expands to a single parenthesized expression WITHOUT a trailing semicolon
 * (like SET_REQ_ERROR above), so the caller supplies the `;` and the macro
 * can be used safely as the body of an unbraced if/else.
 */
#define SET_REQ_NWRITTEN(req, nwritten)                                 \
  ((req)->u.io.overlapped.InternalHigh = (ULONG_PTR) (nwritten))
/* Note: used open-coded in UV_REQ_INIT() because of a circular dependency
* between src/uv-common.h and src/win/internal.h.
*/

View File

@ -250,3 +250,54 @@ int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
return 0;
}
/* Report the byte count stored for `req` in write_extra.nwritten. The
 * completion handlers fill this in before invoking the write callback, so
 * the value is only meaningful from inside that callback.
 */
size_t uv_write_nwritten(const uv_write_t* req) {
  size_t written;

  written = req->write_extra.nwritten;

  return written;
}
/* Ask the kernel to cancel the overlapped write backing `req`.
 *
 * Returns:
 *   0           cancellation was requested, or there was nothing left to
 *               cancel (TTY stream, or the operation had already completed).
 *   UV_ENOTSUP  the stream is a non-overlapped pipe that is not in
 *               blocking-write mode.
 *   UV_EINVAL   the stream type is not one that supports writes.
 *   other < 0   translated system error reported by CancelIoEx.
 *
 * The write callback is still delivered through the normal completion path.
 */
int uv_write_cancel(uv_write_t* req) {
  uv_stream_t* stream;
  HANDLE os_handle;
  DWORD err;

  stream = req->handle;

  if (stream->type == UV_TTY) {
    /* TTY writes complete synchronously on Windows, so cancellation
     * is not applicable - the callback has already been queued. */
    return 0;
  }

  if (stream->type == UV_TCP) {
    os_handle = (HANDLE) ((uv_tcp_t*) stream)->socket;
  } else if (stream->type == UV_NAMED_PIPE) {
    os_handle = ((uv_pipe_t*) stream)->handle;
    if ((stream->flags &
         (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
        UV_HANDLE_NON_OVERLAPPED_PIPE) {
      /* Non-overlapped, non-blocking writes run on the threadpool and we
       * do not have a good way to cancel them. */
      return UV_ENOTSUP;
    }
  } else {
    assert(0);
    return UV_EINVAL;
  }

  if (CancelIoEx(os_handle, &req->u.io.overlapped))
    return 0;

  err = GetLastError();

  /* ERROR_NOT_FOUND: the operation has already completed. */
  if (err == ERROR_NOT_FOUND)
    return 0;

  return uv_translate_sys_error(err);
}

View File

@ -956,6 +956,7 @@ int uv__tcp_write(uv_loop_t* loop,
UV_REQ_INIT(req, UV_WRITE);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->write_extra.nwritten = 0;
/* Prepare the overlapped structure. */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
@ -1172,6 +1173,7 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
req->write_extra.nwritten += req->u.io.overlapped.InternalHigh;
UNREGISTER_HANDLE_REQ(loop, handle);

View File

@ -2208,6 +2208,7 @@ int uv__tty_write(uv_loop_t* loop,
UV_REQ_INIT(req, UV_WRITE);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->write_extra.nwritten = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
@ -2217,6 +2218,7 @@ int uv__tty_write(uv_loop_t* loop,
if (!uv__tty_write_bufs(handle, bufs, nbufs, &error)) {
SET_REQ_SUCCESS(req);
req->write_extra.nwritten = uv__count_bufs(bufs, nbufs);
} else {
SET_REQ_ERROR(req, error);
}

View File

@ -111,6 +111,9 @@ TEST_DECLARE (tcp_try_write)
TEST_DECLARE (tcp_write_in_a_row)
TEST_DECLARE (tcp_try_write_error)
TEST_DECLARE (tcp_write_queue_order)
TEST_DECLARE (tcp_write_cancel)
TEST_DECLARE (tcp_write_nwritten)
TEST_DECLARE (pipe_write_nwritten)
TEST_DECLARE (tcp_open)
TEST_DECLARE (tcp_open_twice)
TEST_DECLARE (tcp_open_bound)
@ -723,6 +726,9 @@ TASK_LIST_START
TEST_ENTRY (tcp_try_write_error)
TEST_ENTRY (tcp_write_queue_order)
TEST_ENTRY (tcp_write_cancel)
TEST_ENTRY (tcp_write_nwritten)
TEST_ENTRY (pipe_write_nwritten)
TEST_ENTRY (tcp_open)
TEST_HELPER (tcp_open, tcp4_echo_server)

272
test/test-write-cancel.c Normal file
View File

@ -0,0 +1,272 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uv.h"
#include "task.h"
/* Number of write requests queued back-to-back by connect_cb. */
#define REQ_COUNT 100
/* Listening socket, connecting client and the accepted server-side peer. */
static uv_tcp_t server;
static uv_tcp_t client;
static uv_tcp_t incoming;
/* Callback counters; close_cb_called is shared by every test in this file. */
static int close_cb_called;
static int write_cb_called;
/* Writes that finished with UV_ECANCELED before teardown started. */
static int cancelled_count;
/* Set once `incoming` has been initialised and accepted. */
static int connected;
/* Set once write_cb has started closing the handles. */
static int closing;
/* One request per queued write; all of them send the same payload buffer. */
static uv_write_t write_reqs[REQ_COUNT];
static char buf_data[16 * 1024];
/* Count closed handles; the handle itself is not inspected. */
static void close_cb(uv_handle_t* handle) {
  (void) handle;
  close_cb_called += 1;
}
/* Listener callback: accept the client into `incoming` and record that the
 * handle now exists, so that teardown knows it has to be closed.
 */
static void connection_cb(uv_stream_t* tcp, int status) {
  uv_stream_t* peer;

  ASSERT_OK(status);

  peer = (uv_stream_t*) &incoming;
  ASSERT_OK(uv_tcp_init(tcp->loop, &incoming));
  ASSERT_OK(uv_accept(tcp, peer));

  connected = 1;
}
/* Write completion callback for the cancellation test.
 *
 * Every invocation is counted. Until teardown has begun, UV_ECANCELED
 * completions are tallied; once five have been seen all handles are closed.
 * Completions that arrive after teardown started are only counted.
 */
static void write_cb(uv_write_t* req, int status) {
  (void) req;
  write_cb_called++;

  /* Teardown already in progress: nothing more to track. */
  if (closing)
    return;

  if (status == UV_ECANCELED)
    cancelled_count++;

  if (cancelled_count < 5)
    return;

  closing = 1;
  uv_close((uv_handle_t*) &client, close_cb);
  uv_close((uv_handle_t*) &server, close_cb);
  /* `incoming` only exists if the listener accepted the connection. */
  if (connected)
    uv_close((uv_handle_t*) &incoming, close_cb);
}
/* Client connect callback: queue REQ_COUNT writes of the shared payload and
 * then request cancellation of the last five of them.
 */
static void connect_cb(uv_connect_t* req, int status) {
  uv_buf_t payload;
  int first_cancelled;
  int requested;
  int idx;

  ASSERT_OK(status);

  payload = uv_buf_init(buf_data, sizeof(buf_data));

  /* Queue many writes to fill the socket buffer */
  for (idx = 0; idx < REQ_COUNT; idx++)
    ASSERT_OK(uv_write(&write_reqs[idx], req->handle, &payload, 1, write_cb));

  /* Cancel the trailing writes which should be queued */
  first_cancelled = REQ_COUNT - 5;
  requested = 0;
  for (idx = first_cancelled; idx < REQ_COUNT; idx++) {
    ASSERT_OK(uv_write_cancel(&write_reqs[idx]));
    requested++;
  }

  ASSERT_EQ(5, requested);
}
TEST_IMPL(tcp_write_cancel) {
uv_connect_t connect_req;
struct sockaddr_in addr;
uv_loop_t* loop;
int buffer_size;
loop = uv_default_loop();
close_cb_called = 0;
write_cb_called = 0;
cancelled_count = 0;
connected = 0;
closing = 0;
buffer_size = sizeof(buf_data);
memset(buf_data, 'A', sizeof(buf_data));
ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_init(loop, &server));
ASSERT_OK(uv_tcp_bind(&server, (struct sockaddr*) &addr, 0));
ASSERT_OK(uv_listen((uv_stream_t*) &server, 128, connection_cb));
ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_init(loop, &client));
ASSERT_OK(uv_tcp_connect(&connect_req,
&client,
(struct sockaddr*) &addr,
connect_cb));
ASSERT_OK(uv_send_buffer_size((uv_handle_t*) &client, &buffer_size));
ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
/* The writes we cancelled should have gotten UV_ECANCELED callbacks */
ASSERT_EQ(5, cancelled_count);
ASSERT_EQ(2 + connected, close_cb_called);
MAKE_VALGRIND_HAPPY(loop);
return 0;
}
/*
* Test that uv_write_nwritten returns correct byte count on success.
*/
static uv_tcp_t nwritten_server;
static uv_tcp_t nwritten_client;
static uv_tcp_t nwritten_incoming;
static uv_write_t nwritten_req;
static int nwritten_cb_called;
static size_t nwritten_value;
static char nwritten_buf_data[1024];
static void nwritten_close_cb(uv_handle_t* handle) {
close_cb_called++;
}
static void nwritten_write_cb(uv_write_t* req, int status) {
ASSERT_OK(status);
nwritten_cb_called++;
nwritten_value = uv_write_nwritten(req);
uv_close((uv_handle_t*) &nwritten_client, nwritten_close_cb);
uv_close((uv_handle_t*) &nwritten_server, nwritten_close_cb);
uv_close((uv_handle_t*) &nwritten_incoming, nwritten_close_cb);
}
static void nwritten_connection_cb(uv_stream_t* tcp, int status) {
ASSERT_OK(status);
ASSERT_OK(uv_tcp_init(tcp->loop, &nwritten_incoming));
ASSERT_OK(uv_accept(tcp, (uv_stream_t*) &nwritten_incoming));
}
static void nwritten_connect_cb(uv_connect_t* req, int status) {
uv_buf_t buf;
ASSERT_OK(status);
buf = uv_buf_init(nwritten_buf_data, sizeof(nwritten_buf_data));
ASSERT_OK(uv_write(&nwritten_req, req->handle, &buf, 1, nwritten_write_cb));
}
TEST_IMPL(tcp_write_nwritten) {
uv_connect_t connect_req;
struct sockaddr_in addr;
uv_loop_t* loop;
loop = uv_default_loop();
close_cb_called = 0;
nwritten_cb_called = 0;
nwritten_value = 0;
memset(nwritten_buf_data, 'B', sizeof(nwritten_buf_data));
ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_init(loop, &nwritten_server));
ASSERT_OK(uv_tcp_bind(&nwritten_server, (struct sockaddr*) &addr, 0));
ASSERT_OK(uv_listen((uv_stream_t*) &nwritten_server, 128, nwritten_connection_cb));
ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_init(loop, &nwritten_client));
ASSERT_OK(uv_tcp_connect(&connect_req,
&nwritten_client,
(struct sockaddr*) &addr,
nwritten_connect_cb));
ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
ASSERT_EQ(1, nwritten_cb_called);
ASSERT_EQ(sizeof(nwritten_buf_data), nwritten_value);
ASSERT_EQ(3, close_cb_called);
MAKE_VALGRIND_HAPPY(loop);
return 0;
}
/*
* Test that uv_write_nwritten returns correct byte count for pipes.
*/
/* pipe_client wraps the pipe end that is written to (fds[1]); pipe_server
 * wraps the other end (fds[0]) and is never read from in this test. */
static uv_pipe_t pipe_client;
static uv_pipe_t pipe_server;
static uv_write_t pipe_write_req;
static int pipe_cb_called;
/* Value reported by uv_write_nwritten() inside the write callback. */
static size_t pipe_nwritten_value;
static char pipe_buf_data[1024];
/* Count closed pipe handles in the file-wide close_cb_called counter. */
static void pipe_close_cb(uv_handle_t* handle) {
  (void) handle;
  close_cb_called += 1;
}
/* Write completion for the pipe test: require success, capture the byte
 * count reported for the request, then close both pipe ends.
 */
static void pipe_write_cb(uv_write_t* req, int status) {
  uv_handle_t* writer;
  uv_handle_t* reader;

  ASSERT_OK(status);

  pipe_cb_called++;
  pipe_nwritten_value = uv_write_nwritten(req);

  writer = (uv_handle_t*) &pipe_client;
  reader = (uv_handle_t*) &pipe_server;
  uv_close(writer, pipe_close_cb);
  uv_close(reader, pipe_close_cb);
}
/* A successful pipe write must report the full payload size through
 * uv_write_nwritten() when queried from the write callback.
 */
TEST_IMPL(pipe_write_nwritten) {
  uv_buf_t payload;
  uv_loop_t* loop;
  int fds[2];

  loop = uv_default_loop();

  /* Reset shared state so the test can run after others in this file. */
  pipe_nwritten_value = 0;
  pipe_cb_called = 0;
  close_cb_called = 0;
  memset(pipe_buf_data, 'C', sizeof(pipe_buf_data));

  ASSERT_OK(uv_pipe(fds, 0, 0));

  ASSERT_OK(uv_pipe_init(loop, &pipe_client, 0));
  ASSERT_OK(uv_pipe_init(loop, &pipe_server, 0));
  /* fds[1] is the end written to, fds[0] the opposite end. */
  ASSERT_OK(uv_pipe_open(&pipe_client, fds[1]));
  ASSERT_OK(uv_pipe_open(&pipe_server, fds[0]));

  payload = uv_buf_init(pipe_buf_data, sizeof(pipe_buf_data));
  ASSERT_OK(uv_write(&pipe_write_req,
                     (uv_stream_t*) &pipe_client,
                     &payload,
                     1,
                     pipe_write_cb));

  ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));

  ASSERT_EQ(1, pipe_cb_called);
  ASSERT_EQ(sizeof(pipe_buf_data), pipe_nwritten_value);
  ASSERT_EQ(2, close_cb_called);

  MAKE_VALGRIND_HAPPY(loop);
  return 0;
}