win: track each write_req in a queue instead of just a count (#5059)

Replace `write_reqs_pending` (unsigned int counter) in
`uv_stream_connection_fields` with `write_queue` (struct uv__queue),
mirroring the Unix stream implementation. Add `struct uv__queue queue`
to `UV_WRITE_PRIVATE_FIELDS` so each `uv_write_t` can be linked.

This enables iterating over in-flight write requests individually, which
is used in `uv__tcp_try_cancel_reqs` to replace the blanket `CancelIo`
call with per-request `CancelIoEx` calls (resolving the TODO comment
that anticipated this change).

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jameson Nash 2026-03-16 17:36:41 -04:00 committed by GitHub
parent 5ecd86c531
commit e1d6e62a4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 47 additions and 36 deletions

View File

@ -260,6 +260,7 @@ typedef struct {
struct uv_req_s* next_req;
#define UV_WRITE_PRIVATE_FIELDS \
struct uv__queue queue; \
int coalesced; \
uv_buf_t write_buffer; \
HANDLE event_handle; \
@ -297,7 +298,7 @@ typedef struct {
} uv_read_t;
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
struct uv__queue write_queue; \
uv_shutdown_t* shutdown_req;
#define uv_stream_server_fields \

View File

@ -95,7 +95,7 @@ enum {
UV_HANDLE_WRITABLE = 0x00008000,
UV_HANDLE_READ_PENDING = 0x00010000,
UV_HANDLE_SYNC_BYPASS_IOCP = 0x00020000,
/*UV_HANDLE_FLAG_UNUSED = 0x00040000,*/
UV_HANDLE_IN_WRITE_CB = 0x00040000,
UV_HANDLE_EMULATE_IOCP = 0x00080000,
UV_HANDLE_BLOCKING_WRITES = 0x00100000,
UV_HANDLE_CANCELLATION_PENDING = 0x00200000,

View File

@ -1512,13 +1512,13 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = write_buf;
uv__insert_non_overlapped_write_req(handle, req);
if (handle->stream.conn.write_reqs_pending == 0) {
if (uv__queue_empty(&handle->stream.conn.write_queue)) {
uv__queue_non_overlapped_write(handle);
}
@ -1560,7 +1560,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
return 0;
} else {
result = WriteFile(handle->handle,
@ -1593,7 +1593,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
return 0;
}
@ -2061,6 +2061,7 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
/* If this was a coalesced write, extract pointer to the user_provided
* uv_write_t structure so we can pass the expected pointer to the callback,
* then free the heap-allocated write req. */
uv__queue_remove(&req->queue);
if (req->coalesced) {
uv__coalesced_write_t* coalesced_write =
container_of(req, uv__coalesced_write_t, req);
@ -2068,14 +2069,14 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv__free(coalesced_write);
}
if (req->cb) {
handle->flags |= UV_HANDLE_IN_WRITE_CB;
req->cb(req, uv_translate_sys_error(err));
handle->flags &= ~UV_HANDLE_IN_WRITE_CB;
}
handle->stream.conn.write_reqs_pending--;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->pipe.conn.non_overlapped_writes_tail) {
assert(handle->stream.conn.write_reqs_pending > 0);
assert(!uv__queue_empty(&handle->stream.conn.write_queue));
uv__queue_non_overlapped_write(handle);
}

View File

@ -37,7 +37,7 @@ INLINE static void uv__stream_init(uv_loop_t* loop,
handle->write_queue_size = 0;
handle->activecnt = 0;
handle->stream.conn.shutdown_req = NULL;
handle->stream.conn.write_reqs_pending = 0;
uv__queue_init(&handle->stream.conn.write_queue);
UV_REQ_INIT(loop, &handle->read_req, UV_READ);
handle->read_req.event_handle = NULL;

View File

@ -221,7 +221,8 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle);
if (handle->stream.conn.write_reqs_pending == 0)
if (!(handle->flags & UV_HANDLE_IN_WRITE_CB) &&
uv__queue_empty(&handle->stream.conn.write_queue))
uv__insert_pending_req(loop, (uv_req_t*) req);
return 0;

View File

@ -205,7 +205,7 @@ void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown
int err;
assert(req);
assert(stream->stream.conn.write_reqs_pending == 0);
assert(uv__queue_empty(&stream->stream.conn.write_queue));
assert(!(stream->flags & UV_HANDLE_SHUT));
assert(stream->flags & UV_HANDLE_CONNECTION);
@ -919,14 +919,14 @@ int uv__tcp_write(uv_loop_t* loop,
/* Request completed immediately. */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
uv__insert_pending_req(loop, (uv_req_t*) req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* Request queued by the kernel. */
req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
handle->write_queue_size += req->u.io.queued_bytes;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
@ -940,7 +940,7 @@ int uv__tcp_write(uv_loop_t* loop,
/* Send failed due to an error, report it later */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
SET_REQ_ERROR(req, WSAGetLastError());
uv__insert_pending_req(loop, (uv_req_t*) req);
@ -956,7 +956,7 @@ int uv__tcp_try_write(uv_tcp_t* handle,
int result;
DWORD bytes;
if (handle->stream.conn.write_reqs_pending > 0)
if (!uv__queue_empty(&handle->stream.conn.write_queue))
return UV_EAGAIN;
result = WSASend(handle->socket,
@ -1083,7 +1083,7 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
uv__queue_remove(&req->queue);
UNREGISTER_HANDLE_REQ(loop, handle);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
@ -1103,11 +1103,12 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
/* use UV_ECANCELED for consistency with Unix */
err = UV_ECANCELED;
}
handle->flags |= UV_HANDLE_IN_WRITE_CB;
req->cb(req, err);
handle->flags &= ~UV_HANDLE_IN_WRITE_CB;
}
handle->stream.conn.write_reqs_pending--;
if (handle->stream.conn.write_reqs_pending == 0) {
if (uv__queue_empty(&handle->stream.conn.write_queue)) {
if (handle->flags & UV_HANDLE_CLOSING) {
closesocket(handle->socket);
handle->socket = INVALID_SOCKET;
@ -1360,19 +1361,22 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
int non_ifs_lsp;
int reading;
int writing;
struct uv__queue* q;
socket = tcp->socket;
reading = tcp->flags & UV_HANDLE_READ_PENDING;
writing = tcp->stream.conn.write_reqs_pending > 0;
writing = !uv__queue_empty(&tcp->stream.conn.write_queue);
if (!reading && !writing)
return;
/* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
* them each explicitly with CancelIoEx (like unix). */
if (reading)
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
if (writing)
CancelIo((HANDLE) socket);
if (writing) {
uv__queue_foreach(q, &tcp->stream.conn.write_queue) {
uv_write_t* wr = uv__queue_data(q, uv_write_t, queue);
CancelIoEx((HANDLE) socket, &wr->u.io.overlapped);
}
}
/* Check if we have any non-IFS LSPs stacked on top of TCP */
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
@ -1391,7 +1395,7 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
&bytes,
NULL,
NULL) != 0) {
/* Failed. We can't do CancelIo. */
/* Failed. We can't do CancelIoEx. */
return;
}
}
@ -1401,8 +1405,12 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
if (socket != tcp->socket) {
if (reading)
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
if (writing)
CancelIo((HANDLE) socket);
if (writing) {
uv__queue_foreach(q, &tcp->stream.conn.write_queue) {
uv_write_t* wr = uv__queue_data(q, uv_write_t, queue);
CancelIoEx((HANDLE) socket, &wr->u.io.overlapped);
}
}
}
}
@ -1443,7 +1451,7 @@ void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
* first (which typically should be cancellations). There's not much we can
* do about canceled reads, which also will generate an RST packet. */
if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
tcp->stream.conn.write_reqs_pending == 0) {
uv__queue_empty(&tcp->stream.conn.write_queue)) {
closesocket(tcp->socket);
tcp->socket = INVALID_SOCKET;
}

View File

@ -2166,7 +2166,7 @@ int uv__tty_write(uv_loop_t* loop,
req->cb = cb;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
req->u.io.queued_bytes = 0;
@ -2188,7 +2188,7 @@ int uv__tty_try_write(uv_tty_t* handle,
unsigned int nbufs) {
DWORD error;
if (handle->stream.conn.write_reqs_pending > 0)
if (!uv__queue_empty(&handle->stream.conn.write_queue))
return UV_EAGAIN;
if (uv__tty_write_bufs(handle, bufs, nbufs, &error))
@ -2204,15 +2204,17 @@ void uv__process_tty_write_req(uv_loop_t* loop, uv_tty_t* handle,
handle->write_queue_size -= req->u.io.queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle);
uv__queue_remove(&req->queue);
if (req->cb) {
err = GET_REQ_ERROR(req);
handle->flags |= UV_HANDLE_IN_WRITE_CB;
req->cb(req, uv_translate_sys_error(err));
handle->flags &= ~UV_HANDLE_IN_WRITE_CB;
}
handle->stream.conn.write_reqs_pending--;
if (handle->stream.conn.write_reqs_pending == 0 &&
if (uv__queue_empty(&handle->stream.conn.write_queue) &&
uv__is_stream_shutting(handle))
uv__process_tty_shutdown_req(loop,
handle,
@ -2237,7 +2239,7 @@ void uv__tty_close(uv_tty_t* handle) {
void uv__process_tty_shutdown_req(uv_loop_t* loop, uv_tty_t* stream, uv_shutdown_t* req) {
assert(stream->stream.conn.write_reqs_pending == 0);
assert(uv__queue_empty(&stream->stream.conn.write_queue));
assert(req);
stream->stream.conn.shutdown_req = NULL;

View File

@ -143,9 +143,7 @@ static void after_read(uv_stream_t* handle,
ASSERT_NOT_NULL(wr);
wr->buf = uv_buf_init(buf->base, nread);
if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
FATAL("uv_write failed");
}
ASSERT_OK(uv_write(&wr->req, handle, &wr->buf, 1, after_write));
if (shutdown) {
sreq = malloc(sizeof* sreq);