win: track each write_req in a queue instead of just a count

Replace `write_reqs_pending` (unsigned int counter) in
`uv_stream_connection_fields` with `write_queue` (struct uv__queue),
mirroring the Unix stream implementation. Add `struct uv__queue queue`
to `UV_WRITE_PRIVATE_FIELDS` so each `uv_write_t` can be linked.

This enables iterating over in-flight write requests individually, which
is used in `uv__tcp_try_cancel_reqs` to replace the blanket `CancelIo`
call with per-request `CancelIoEx` calls (resolving the TODO comment
that anticipated this change).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jameson Nash 2026-03-12 18:33:42 +00:00
parent e5f79e487b
commit 695043fa18
6 changed files with 40 additions and 33 deletions

View File

@ -260,6 +260,7 @@ typedef struct {
struct uv_req_s* next_req;
#define UV_WRITE_PRIVATE_FIELDS \
struct uv__queue queue; \
int coalesced; \
uv_buf_t write_buffer; \
HANDLE event_handle; \
@ -297,7 +298,7 @@ typedef struct {
} uv_read_t;
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
struct uv__queue write_queue; \
uv_shutdown_t* shutdown_req;
#define uv_stream_server_fields \

View File

@ -560,7 +560,7 @@ void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
assert(handle->flags & UV_HANDLE_CONNECTION);
assert(req != NULL);
assert(handle->stream.conn.write_reqs_pending == 0);
assert(uv__queue_empty(&handle->stream.conn.write_queue));
SET_REQ_SUCCESS(req);
if (handle->flags & UV_HANDLE_CLOSING) {
@ -1600,13 +1600,13 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = write_buf;
uv__insert_non_overlapped_write_req(handle, req);
if (handle->stream.conn.write_reqs_pending == 0) {
if (uv__queue_empty(&handle->stream.conn.write_queue)) {
uv__queue_non_overlapped_write(handle);
}
@ -1648,7 +1648,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
return 0;
} else {
result = WriteFile(handle->handle,
@ -1681,7 +1681,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
return 0;
}
@ -2158,6 +2158,7 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
/* If this was a coalesced write, extract pointer to the user_provided
* uv_write_t structure so we can pass the expected pointer to the callback,
* then free the heap-allocated write req. */
uv__queue_remove(&req->queue);
if (req->coalesced) {
uv__coalesced_write_t* coalesced_write =
container_of(req, uv__coalesced_write_t, req);
@ -2168,15 +2169,13 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
req->cb(req, uv_translate_sys_error(err));
}
handle->stream.conn.write_reqs_pending--;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->pipe.conn.non_overlapped_writes_tail) {
assert(handle->stream.conn.write_reqs_pending > 0);
assert(!uv__queue_empty(&handle->stream.conn.write_queue));
uv__queue_non_overlapped_write(handle);
}
if (handle->stream.conn.write_reqs_pending == 0 &&
if (uv__queue_empty(&handle->stream.conn.write_queue) &&
uv__is_stream_shutting(handle))
uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);

View File

@ -37,7 +37,7 @@ INLINE static void uv__stream_init(uv_loop_t* loop,
handle->write_queue_size = 0;
handle->activecnt = 0;
handle->stream.conn.shutdown_req = NULL;
handle->stream.conn.write_reqs_pending = 0;
uv__queue_init(&handle->stream.conn.write_queue);
UV_REQ_INIT(loop, &handle->read_req, UV_READ);
handle->read_req.event_handle = NULL;

View File

@ -218,7 +218,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle);
if (handle->stream.conn.write_reqs_pending == 0) {
if (uv__queue_empty(&handle->stream.conn.write_queue)) {
if (handle->type == UV_NAMED_PIPE)
uv__pipe_shutdown(loop, (uv_pipe_t*) handle, req);
else

View File

@ -205,7 +205,7 @@ void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown
int err;
assert(req);
assert(stream->stream.conn.write_reqs_pending == 0);
assert(uv__queue_empty(&stream->stream.conn.write_queue));
assert(!(stream->flags & UV_HANDLE_SHUT));
assert(stream->flags & UV_HANDLE_CONNECTION);
@ -919,14 +919,14 @@ int uv__tcp_write(uv_loop_t* loop,
/* Request completed immediately. */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
uv__insert_pending_req(loop, (uv_req_t*) req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* Request queued by the kernel. */
req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
handle->write_queue_size += req->u.io.queued_bytes;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
@ -940,7 +940,7 @@ int uv__tcp_write(uv_loop_t* loop,
/* Send failed due to an error, report it later */
req->u.io.queued_bytes = 0;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
SET_REQ_ERROR(req, WSAGetLastError());
uv__insert_pending_req(loop, (uv_req_t*) req);
@ -956,7 +956,7 @@ int uv__tcp_try_write(uv_tcp_t* handle,
int result;
DWORD bytes;
if (handle->stream.conn.write_reqs_pending > 0)
if (!uv__queue_empty(&handle->stream.conn.write_queue))
return UV_EAGAIN;
result = WSASend(handle->socket,
@ -1106,8 +1106,8 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
req->cb(req, err);
}
handle->stream.conn.write_reqs_pending--;
if (handle->stream.conn.write_reqs_pending == 0) {
uv__queue_remove(&req->queue);
if (uv__queue_empty(&handle->stream.conn.write_queue)) {
if (handle->flags & UV_HANDLE_CLOSING) {
closesocket(handle->socket);
handle->socket = INVALID_SOCKET;
@ -1360,19 +1360,22 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
int non_ifs_lsp;
int reading;
int writing;
struct uv__queue* q;
socket = tcp->socket;
reading = tcp->flags & UV_HANDLE_READ_PENDING;
writing = tcp->stream.conn.write_reqs_pending > 0;
writing = !uv__queue_empty(&tcp->stream.conn.write_queue);
if (!reading && !writing)
return;
/* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
* them each explicitly with CancelIoEx (like unix). */
if (reading)
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
if (writing)
CancelIo((HANDLE) socket);
if (writing) {
uv__queue_foreach(q, &tcp->stream.conn.write_queue) {
uv_write_t* wr = uv__queue_data(q, uv_write_t, queue);
CancelIoEx((HANDLE) socket, &wr->u.io.overlapped);
}
}
/* Check if we have any non-IFS LSPs stacked on top of TCP */
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
@ -1391,7 +1394,7 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
&bytes,
NULL,
NULL) != 0) {
/* Failed. We can't do CancelIo. */
/* Failed. We can't do CancelIoEx. */
return;
}
}
@ -1401,8 +1404,12 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
if (socket != tcp->socket) {
if (reading)
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
if (writing)
CancelIo((HANDLE) socket);
if (writing) {
uv__queue_foreach(q, &tcp->stream.conn.write_queue) {
uv_write_t* wr = uv__queue_data(q, uv_write_t, queue);
CancelIoEx((HANDLE) socket, &wr->u.io.overlapped);
}
}
}
}
@ -1443,7 +1450,7 @@ void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
* first (which typically should be cancellations). There's not much we can
* do about canceled reads, which also will generate an RST packet. */
if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
tcp->stream.conn.write_reqs_pending == 0) {
uv__queue_empty(&tcp->stream.conn.write_queue)) {
closesocket(tcp->socket);
tcp->socket = INVALID_SOCKET;
}

View File

@ -2166,7 +2166,7 @@ int uv__tty_write(uv_loop_t* loop,
req->cb = cb;
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue);
REGISTER_HANDLE_REQ(loop, handle);
req->u.io.queued_bytes = 0;
@ -2188,7 +2188,7 @@ int uv__tty_try_write(uv_tty_t* handle,
unsigned int nbufs) {
DWORD error;
if (handle->stream.conn.write_reqs_pending > 0)
if (!uv__queue_empty(&handle->stream.conn.write_queue))
return UV_EAGAIN;
if (uv__tty_write_bufs(handle, bufs, nbufs, &error))
@ -2211,8 +2211,8 @@ void uv__process_tty_write_req(uv_loop_t* loop, uv_tty_t* handle,
}
handle->stream.conn.write_reqs_pending--;
if (handle->stream.conn.write_reqs_pending == 0 &&
uv__queue_remove(&req->queue);
if (uv__queue_empty(&handle->stream.conn.write_queue) &&
uv__is_stream_shutting(handle))
uv__process_tty_shutdown_req(loop,
handle,
@ -2237,7 +2237,7 @@ void uv__tty_close(uv_tty_t* handle) {
void uv__process_tty_shutdown_req(uv_loop_t* loop, uv_tty_t* stream, uv_shutdown_t* req) {
assert(stream->stream.conn.write_reqs_pending == 0);
assert(uv__queue_empty(&stream->stream.conn.write_queue));
assert(req);
stream->stream.conn.shutdown_req = NULL;