win,pipe: skip IOCP for pipe handles

Copy the optimization from the tcp/udp stack: when reads and writes don't
require IOCP to complete, put them directly into the pending queue, so
they can be processed without a full loop through IOCP each time to
drain them asynchronously.
This commit is contained in:
Jameson Nash 2026-03-17 00:08:58 -04:00
parent 919b92d944
commit a9334abdb0

View File

@ -391,6 +391,32 @@ int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
}
/* Associate pipeHandle with the given I/O completion port and configure
 * completion-notification skipping.
 *
 * On a successful IOCP association, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS is
 * requested as well, so operations that complete synchronously bypass the
 * completion port entirely (UV_HANDLE_SYNC_BYPASS_IOCP is set to record
 * this). If the association fails, UV_HANDLE_EMULATE_IOCP is set so that
 * completion is emulated with wait threads instead.
 *
 * Returns 0 on full success, otherwise the Win32 error code of the last
 * failing call. Note: a SetFileCompletionNotificationModes failure
 * overwrites any earlier CreateIoCompletionPort error. */
static DWORD uv__pipe_attach_iocp(HANDLE pipeHandle,
                                  HANDLE iocp,
                                  uv_pipe_t* handle) {
  DWORD err;
  UCHAR mode_flags;

  err = 0;
  if (CreateIoCompletionPort(pipeHandle, iocp, (ULONG_PTR) handle, 0) == NULL) {
    /* Could not attach to the IOCP; fall back to emulated completions. */
    err = GetLastError();
    handle->flags |= UV_HANDLE_EMULATE_IOCP;
  }

  /* Never signal the file object's event on completion; only request the
   * completion-port bypass when the IOCP association actually succeeded. */
  mode_flags = FILE_SKIP_SET_EVENT_ON_HANDLE;
  if (err == 0)
    mode_flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;

  if (!SetFileCompletionNotificationModes(pipeHandle, mode_flags))
    return GetLastError();

  if (mode_flags & FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)
    handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;

  return err;
}
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
/* The parent_pipe is always the server_pipe and kept by libuv.
@ -430,13 +456,9 @@ int uv__create_stdio_pipe_pair(uv_loop_t* loop,
if (err)
goto error;
if (CreateIoCompletionPort(server_pipe,
loop->iocp,
(ULONG_PTR) parent_pipe,
0) == NULL) {
err = GetLastError();
goto error;
}
err = uv__pipe_attach_iocp(server_pipe, loop->iocp, parent_pipe);
if (err)
uv_fatal_error(err, "uv__pipe_attach_iocp");
parent_pipe->handle = server_pipe;
*child_pipe_ptr = client_pipe;
@ -522,12 +544,7 @@ static int uv__set_pipe_handle(uv_loop_t* loop,
InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
} else {
/* Overlapped pipe. Try to associate with IOCP. */
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR) handle,
0) == NULL) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
uv__pipe_attach_iocp(pipeHandle, loop->iocp, handle);
}
handle->handle = pipeHandle;
@ -540,6 +557,8 @@ static int uv__set_pipe_handle(uv_loop_t* loop,
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
DWORD err;
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
req->pipeHandle =
@ -554,12 +573,9 @@ static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
}
/* Associate it with IOCP so we can get events. */
if (CreateIoCompletionPort(req->pipeHandle,
loop->iocp,
(ULONG_PTR) handle,
0) == NULL) {
uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
}
err = uv__pipe_attach_iocp(req->pipeHandle, loop->iocp, handle);
if (err)
uv_fatal_error(err, "uv__pipe_attach_iocp");
/* Stash a handle in the server object for use from places such as
* getsockname and chmod. As we transfer ownership of these to client
@ -1106,6 +1122,8 @@ void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
BOOL success;
assert(handle->flags & UV_HANDLE_LISTENING);
if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
@ -1120,23 +1138,27 @@ static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
/* Prepare the overlapped structure. */
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
GetLastError() != ERROR_IO_PENDING) {
success = ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
SET_REQ_SUCCESS(req);
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
handle->reqs_pending++;
} else {
if (GetLastError() == ERROR_PIPE_CONNECTED) {
SET_REQ_SUCCESS(req);
} else {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, GetLastError());
}
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
/* Wait for completion via IOCP */
handle->reqs_pending++;
}
@ -1183,6 +1205,7 @@ int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
/* Initialize the client handle and copy the pipeHandle to the client */
pipe_client->handle = req->pipeHandle;
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
pipe_client->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
/* Prepare the req to pick up a new connection */
server->pipe.serv.pending_accepts = req->next_pending;
@ -1410,7 +1433,14 @@ static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
if (UV_SUCCEEDED_WITHOUT_IOCP(result)) {
/* Process the req without IOCP. */
eof_timer_start(handle);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
uv__insert_pending_req(loop, (uv_req_t*) req);
return;
} else if (!UV_SUCCEEDED_WITH_IOCP(result)) {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, GetLastError());
goto error;
@ -1695,21 +1725,23 @@ static int uv__pipe_write_data(uv_loop_t* loop,
write_buf.len,
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
return GetLastError();
}
if (result) {
/* Request completed immediately. */
req->u.io.queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (UV_SUCCEEDED_WITHOUT_IOCP(result)) {
/* Request completed immediately without IOCP packet. */
REGISTER_HANDLE_REQ(loop, handle);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
uv__insert_pending_req(loop, (uv_req_t*) req);
return 0;
} else if (!UV_SUCCEEDED_WITH_IOCP(result)) {
return GetLastError();
} else if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {