win: remove busy loop from uv_async_send (#3879)

See #2231 (37042a5bca)
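
Both backends now share the lock-free protocol in src/uv-common.c: handle->pending
keeps a pending flag in bit 0 and a busy counter in bits 1+, and each platform only
supplies uv__async_notify() to wake its own loop. The following standalone trace
(illustrative only, not part of the patch) walks the transitions the field goes
through; it compiles as plain C11:

    #include <assert.h>
    #include <stdatomic.h>

    int main(void) {
      _Atomic int pending = 0;
      int cur, old;

      /* uv_async_send: cheap read, then CAS(cur -> cur + 3). */
      cur = atomic_load_explicit(&pending, memory_order_relaxed);
      assert(!(cur & 1));
      atomic_compare_exchange_strong(&pending, &cur, cur + 3);
      assert(atomic_load(&pending) == 3);   /* bit 0 set, one sender busy */

      /* ... uv__async_notify(handle) wakes the loop here ... */

      atomic_fetch_add(&pending, -2);       /* sender done */
      assert(atomic_load(&pending) == 1);   /* only the pending bit left */

      /* A second send now returns 0 immediately: bit 0 is already set. */

      /* Loop thread handles the wakeup: clear bit 0, keep the counter. */
      atomic_fetch_and(&pending, ~1);
      assert(atomic_load(&pending) == 0);

      /* uv__async_spin at close: set bit 0, wait for the counter to drain. */
      old = atomic_fetch_or(&pending, 1);
      assert(old == 0);                     /* no notification in flight */
      assert((atomic_load(&pending) & ~1) == 0);
      return 0;
    }
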
Jameson Nash 2026-03-12 15:02:26 +00:00
parent 58418d5310
commit 307fe29a1a
7 changed files with 174 additions and 131 deletions

View File

@@ -558,8 +558,7 @@ typedef struct {
#define UV_ASYNC_PRIVATE_FIELDS \
struct uv_req_s async_req; \
uv_async_cb async_cb; \
/* char to avoid alignment issues */ \
char volatile async_sent;
int pending;
#define UV_PREPARE_PRIVATE_FIELDS \
uv_prepare_t* prepare_prev; \

View File

@@ -32,7 +32,6 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sched.h> /* sched_yield() */
#ifdef __linux__
#include <sys/eventfd.h>
@@ -68,7 +67,6 @@ static void uv__kqueue_runtime_detection(void) {
static void uv__async_send(uv_loop_t* loop);
static int uv__async_start(uv_loop_t* loop);
static void uv__cpu_relax(void);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
@@ -89,70 +87,8 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
}
int uv_async_send(uv_async_t* handle) {
_Atomic int* pending;
int current;
pending = (_Atomic int*) &handle->pending;
/* Do a cheap read first. */
current = atomic_load_explicit(pending, memory_order_relaxed);
if (current & 1)
return 0;
/* Atomically set the pending flag (bit 0) and increment the busy counter
* (bits 1+). Adding 3 sets bit 0 and adds 2 to the busy counter at once,
* so both operations appear atomic to other threads. */
while (!atomic_compare_exchange_weak_explicit(pending,
&current,
current + 3,
memory_order_relaxed,
memory_order_relaxed))
if (current & 1)
return 0;
/* Wake up the other thread's event loop. The write establishes a
* happens-before relationship with the reader via the kernel. */
void uv__async_notify(uv_async_t* handle) {
uv__async_send(handle->loop);
/* Decrement the busy counter (bits 1+). */
atomic_fetch_add_explicit(pending, -2, memory_order_relaxed);
return 0;
}
/* Wait for the busy counter to clear before closing.
* Only call this from the event loop thread. */
static void uv__async_spin(uv_async_t* handle) {
_Atomic int* pending;
int i;
pending = (_Atomic int*) &handle->pending;
/* Set the pending flag (bit 0) so no new events will be added by other
* threads after this function returns. */
atomic_fetch_or_explicit(pending, 1, memory_order_relaxed);
for (;;) {
/* 997 is not completely chosen at random. It's a prime number, acyclic by
* nature, and should therefore hopefully dampen sympathetic resonance.
*/
for (i = 0; i < 997; i++) {
/* Wait until the busy counter (bits 1+) is zero. */
if ((atomic_load(pending) & ~1) == 0)
return;
/* Other thread is busy with this handle, spin until it's done. */
uv__cpu_relax();
}
/* Yield the CPU. We may have preempted the other thread while it's
* inside the critical section and if it's running on the same CPU
* as us, we'll just burn CPU cycles until the end of our time slice.
*/
sched_yield();
}
}
@@ -407,18 +343,3 @@ int uv__async_fork(uv_loop_t* loop) {
return uv__async_start(loop);
}
static void uv__cpu_relax(void) {
#if defined(__i386__) || defined(__x86_64__)
__asm__ __volatile__ ("rep; nop" ::: "memory"); /* a.k.a. PAUSE */
#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
__asm__ __volatile__ ("isb" ::: "memory");
#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__)
__asm volatile ("" : : : "memory");
#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
__asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory");
#elif defined(__riscv) && __riscv_xlen == 64
__asm__ volatile(".insn 0x0100000f" ::: "memory"); /* FENCE */
#endif
}
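
On Unix the new uv__async_notify() hook simply forwards to uv__async_send(loop),
whose write to an eventfd or pipe is the kernel-mediated wakeup that gives the
loop thread its happens-before edge with the sender. A minimal sketch of that
kind of wakeup on the Linux eventfd path (names are illustrative, not libuv's):

    #include <stdint.h>
    #include <unistd.h>
    #include <sys/eventfd.h>

    static int wakeup_fd;

    static void wakeup_init(void) {
      wakeup_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    }

    static void wakeup_notify(void) {          /* any thread */
      uint64_t one = 1;
      (void) write(wakeup_fd, &one, sizeof(one));
    }

    static void wakeup_drain(void) {           /* loop thread, fd readable */
      uint64_t n;
      (void) read(wakeup_fd, &n, sizeof(n));   /* resets the count to zero */
    }
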

View File

@@ -34,6 +34,7 @@
# include <malloc.h> /* malloc */
#else
# include <net/if.h> /* if_nametoindex */
# include <sched.h> /* sched_yield() */
# include <sys/un.h> /* AF_UNIX, sockaddr_un */
#endif
@@ -887,6 +888,128 @@ uv_loop_t* uv_loop_new(void) {
}
/* Pause the CPU briefly to avoid burning power in spin-wait loops. */
static void uv__cpu_relax(void) {
#if defined(_WIN32)
YieldProcessor();
#elif defined(__i386__) || defined(__x86_64__)
__asm__ __volatile__ ("rep; nop" ::: "memory"); /* a.k.a. PAUSE */
#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
__asm__ __volatile__ ("isb" ::: "memory");
#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
__asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory");
#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__)
__asm volatile ("" : : : "memory");
#elif defined(__riscv) && __riscv_xlen == 64
__asm__ volatile(".insn 0x0100000f" ::: "memory"); /* FENCE */
#endif
}
/* Atomic helpers for the async pending field.
* Bit 0: pending flag (notification sent or handle closing).
* Bits 1+: busy counter (2 per in-flight uv_async_send call). */
#ifdef _MSC_VER
static int uv__pending_cas(int* p, int* expected, int desired) {
LONG old;
old = InterlockedCompareExchange((LONG volatile*) p, (LONG) desired, (LONG) *expected);
if (old == (LONG) *expected) return 1;
*expected = (int) old;
return 0;
}
#define uv__pending_load(p) ((int) *(volatile int*)(p))
#define uv__pending_fetch_add(p, v) \
((void) InterlockedExchangeAdd((LONG volatile*)(p), (LONG)(v)))
#define uv__pending_fetch_or(p, v) \
((int) InterlockedOr((LONG volatile*)(p), (LONG)(v)))
#define uv__pending_fetch_and(p, v) \
((int) InterlockedAnd((LONG volatile*)(p), (LONG)(v)))
#else /* GCC / Clang / MinGW: use C11 stdatomic */
static int uv__pending_cas(int* p, int* expected, int desired) {
return atomic_compare_exchange_weak_explicit((_Atomic int*) p,
expected,
desired,
memory_order_relaxed,
memory_order_relaxed);
}
#define uv__pending_load(p) \
atomic_load_explicit((_Atomic int*)(p), memory_order_relaxed)
#define uv__pending_fetch_add(p, v) \
((void) atomic_fetch_add_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
#define uv__pending_fetch_or(p, v) \
((int) atomic_fetch_or_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
#define uv__pending_fetch_and(p, v) \
((int) atomic_fetch_and_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
#endif /* _MSC_VER */
int uv_async_send(uv_async_t* handle) {
int current;
/* Do a cheap read first. */
current = uv__pending_load(&handle->pending);
if (current & 1)
return 0;
/* Atomically set the pending flag (bit 0) and increment the busy counter
* (bits 1+). Adding 3 sets bit 0 and adds 2 to the busy counter at once,
* so both operations appear atomic to other threads. */
while (!uv__pending_cas(&handle->pending, &current, current + 3))
if (current & 1)
return 0;
/* Wake up the event loop. The notification write establishes a
* happens-before relationship with the reader via the kernel. */
uv__async_notify(handle);
/* Decrement the busy counter (bits 1+). */
uv__pending_fetch_add(&handle->pending, -2);
return 0;
}
int uv__async_spin(uv_async_t* handle) {
int old;
int i;
/* Atomically set the pending flag (bit 0) so no new notifications will be
* sent after this function returns. Save whether the flag was already set
* so callers can determine whether a notification is in flight. */
old = uv__pending_fetch_or(&handle->pending, 1);
for (;;) {
/* 997 is not completely chosen at random. It's a prime number, acyclic by
* nature, and should therefore hopefully dampen sympathetic resonance.
*/
for (i = 0; i < 997; i++) {
/* Wait until the busy counter (bits 1+) is zero. */
if ((uv__pending_load(&handle->pending) & ~1) == 0)
return old & 1;
/* Another thread is busy with this handle; spin until it's done. */
uv__cpu_relax();
}
/* Yield the CPU. We may have preempted the other thread while it's
* inside the critical section and if it's running on the same CPU
* as us, we'll just burn CPU cycles until the end of our time slice.
*/
#ifdef _WIN32
SwitchToThread();
#else
sched_yield();
#endif
}
}
int uv_loop_close(uv_loop_t* loop) {
struct uv__queue* q;
uv_handle_t* h;
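
The guarantee the shared code leans on is that once uv__async_spin() has set
bit 0 and then observed the busy counter at zero, no sender can call
uv__async_notify() anymore. A standalone model of that property using C11
atomics and POSIX threads (sequentially consistent operations for simplicity;
the patch itself can use relaxed ordering because the platform notification
supplies the cross-thread ordering). This is an illustration, not libuv code:

    #include <assert.h>
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    static _Atomic int pending;        /* bit 0: flag, bits 1+: busy counter */
    static _Atomic int notifications;  /* stands in for uv__async_notify()   */
    static _Atomic int stop;

    static void fake_send(void) {
      int cur = atomic_load(&pending);
      if (cur & 1)
        return;
      while (!atomic_compare_exchange_weak(&pending, &cur, cur + 3))
        if (cur & 1)
          return;
      atomic_fetch_add(&notifications, 1);   /* "wake the loop" */
      atomic_fetch_add(&pending, -2);
    }

    static void* sender(void* arg) {
      (void) arg;
      while (!atomic_load(&stop))
        fake_send();
      return NULL;
    }

    int main(void) {
      pthread_t tids[4];
      int i, before, after;

      for (i = 0; i < 4; i++)
        pthread_create(&tids[i], NULL, sender, NULL);

      /* The loop thread keeps clearing bit 0, as the wakeup handler does. */
      for (i = 0; i < 100000; i++)
        atomic_fetch_and(&pending, ~1);

      /* Close path: block new notifications, then drain in-flight senders. */
      atomic_fetch_or(&pending, 1);
      while (atomic_load(&pending) & ~1)
        ;                                    /* uv__cpu_relax() would go here */
      before = atomic_load(&notifications);

      atomic_store(&stop, 1);
      for (i = 0; i < 4; i++)
        pthread_join(tids[i], NULL);
      after = atomic_load(&notifications);

      assert(before == after);               /* nobody notified after the spin */
      printf("notifications: %d\n", after);
      return 0;
    }
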

View File

@@ -149,6 +149,13 @@ int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap);
void uv__loop_close(uv_loop_t* loop);
/* Sets the pending flag (bit 0) and waits for the busy counter (bits 1+) to
* drain. Returns the previous value of bit 0. */
int uv__async_spin(uv_async_t* handle);
/* Platform hook: post a wakeup notification for the given async handle. */
void uv__async_notify(uv_async_t* handle);
int uv__read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb);

View File

@@ -26,29 +26,12 @@
#include "handle-inl.h"
#include "req-inl.h"
#ifdef _MSC_VER /* MSVC */
/* _InterlockedOr8 is supported by MSVC on x32 and x64. It is slightly less
* efficient than InterlockedExchange, but InterlockedExchange8 does not exist,
* and interlocked operations on larger targets might require the target to be
* aligned. */
#pragma intrinsic(_InterlockedOr8)
static char uv__atomic_exchange_set(char volatile* target) {
return _InterlockedOr8(target, 1);
}
#else /* GCC, Clang in mingw mode */
static char uv__atomic_exchange_set(char volatile* target) {
return __sync_fetch_and_or(target, 1);
}
#endif /* _MSC_VER */
void uv__async_endgame(uv_loop_t* loop, uv_async_t* handle) {
if (handle->flags & UV_HANDLE_CLOSING &&
!handle->async_sent) {
/* uv__async_close guarantees uv__want_endgame is called exactly once: the
* spin drains all in-flight senders and the return value selects which path
* schedules the endgame, so no double-close guard is needed here. */
if (handle->flags & UV_HANDLE_CLOSING) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
uv__handle_close(handle);
}
@@ -59,7 +42,7 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
uv_req_t* req;
uv__handle_init(loop, (uv_handle_t*) handle, UV_ASYNC);
handle->async_sent = 0;
handle->pending = 0;
handle->async_cb = async_cb;
req = &handle->async_req;
@@ -73,31 +56,44 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
void uv__async_close(uv_loop_t* loop, uv_async_t* handle) {
if (!((uv_async_t*)handle)->async_sent) {
/* Block new senders and wait for any in-flight send to finish.
* uv__async_spin returns the previous value of the pending flag (bit 0):
* if it was already set, an IOCP notification is in flight and will trigger
* the endgame via uv__process_async_wakeup_req; otherwise we schedule the
* endgame immediately because no further IOCP completion will arrive. */
if (!uv__async_spin(handle))
uv__want_endgame(loop, (uv_handle_t*) handle);
}
uv__handle_closing(handle);
}
int uv_async_send(uv_async_t* handle) {
/* Platform hook called by uv_async_send (in uv-common.c) after the CAS
* succeeds. Posts an IOCP completion so the event loop wakes up. */
void uv__async_notify(uv_async_t* handle) {
uv_loop_t* loop = handle->loop;
POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
}
if (handle->type != UV_ASYNC) {
/* Can't set errno because that's not thread-safe. */
return -1;
void uv__async_stop(uv_loop_t* loop) {
struct uv__queue* q;
uv_handle_t* h;
/* Spin all UV_ASYNC handles to drain any thread that has passed the CAS in
* uv_async_send but has not yet called PostQueuedCompletionStatus. Without
* this, such a thread could post to loop->iocp after it is closed. */
uv__queue_foreach(q, &loop->handle_queue) {
h = uv__queue_data(q, uv_handle_t, handle_queue);
if (h->type == UV_ASYNC)
uv__async_spin((uv_async_t*) h);
}
/* The user should make sure never to call uv_async_send to a closing or
* closed handle. */
assert(!(handle->flags & UV_HANDLE_CLOSING));
if (!uv__atomic_exchange_set(&handle->async_sent)) {
POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
}
return 0;
/* Close the internal wq_async handle directly, bypassing the normal endgame:
 * any pending IOCP message is discarded when loop->iocp is closed. */
loop->wq_async.close_cb = NULL;
uv__handle_closing(&loop->wq_async);
uv__handle_close(&loop->wq_async);
}
@@ -106,10 +102,12 @@ void uv__process_async_wakeup_req(uv_loop_t* loop, uv_async_t* handle,
assert(handle->type == UV_ASYNC);
assert(req->type == UV_WAKEUP);
handle->async_sent = 0;
/* Atomically clear the pending flag (bit 0) while preserving the busy
* counter (bits 1+), allowing new senders to post again if needed. */
InterlockedAnd((LONG volatile*) &handle->pending, ~1);
if (handle->flags & UV_HANDLE_CLOSING) {
uv__want_endgame(loop, (uv_handle_t*)handle);
uv__want_endgame(loop, (uv_handle_t*) handle);
} else if (handle->async_cb != NULL) {
handle->async_cb(handle);
}
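
POST_COMPLETION_FOR_REQ ultimately posts a completion packet to loop->iocp,
which the loop thread dequeues in its poll and routes back to the handle,
ending up in uv__process_async_wakeup_req above. A hedged sketch of the
underlying Win32 pair (plain Win32 usage, not the libuv macro's exact body);
it also shows why uv__async_stop() has to drain senders before the port goes
away:

    #include <assert.h>
    #include <windows.h>

    int main(void) {
      HANDLE iocp;
      OVERLAPPED ovl = {0};
      OVERLAPPED* povl = NULL;
      DWORD nbytes = 0;
      ULONG_PTR key = 0;

      iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
      assert(iocp != NULL);

      /* "uv__async_notify": wake the loop by posting a packet for the req. */
      PostQueuedCompletionStatus(iocp, 0, 0, &ovl);

      /* Loop thread: dequeue the packet and dispatch it to its handle. */
      GetQueuedCompletionStatus(iocp, &nbytes, &key, &povl, INFINITE);
      assert(povl == &ovl);

      /* Once the port is closed, a late PostQueuedCompletionStatus would
       * target a dead handle, which is the race uv__async_stop() prevents. */
      CloseHandle(iocp);
      return 0;
    }
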

View File

@@ -340,16 +340,10 @@ void uv__loop_close(uv_loop_t* loop) {
uv__loops_remove(loop);
/* Close the async handle without needing an extra loop iteration.
* We might have a pending message, but we're just going to destroy the IOCP
soon, so we can just discard it now without the usual risk of getting
* another notification from GetQueuedCompletionStatusEx after calling the
* close_cb (which we also skip defining). We'll assert later that queue was
* actually empty and all reqs handled. */
loop->wq_async.async_sent = 0;
loop->wq_async.close_cb = NULL;
uv__handle_closing(&loop->wq_async);
uv__handle_close(&loop->wq_async);
/* Any pending IOCP message for wq_async is discarded when loop->iocp is
* closed below; uv__async_stop spins to ensure no thread is still inside
* PostQueuedCompletionStatus before that happens. */
uv__async_stop(loop);
for (i = 0; i < ARRAY_SIZE(loop->poll_peer_sockets); i++) {
SOCKET sock = loop->poll_peer_sockets[i];

View File

@@ -201,6 +201,7 @@ void uv__once_init(void);
/*
* Async watcher
*/
void uv__async_stop(uv_loop_t* loop);
void uv__async_close(uv_loop_t* loop, uv_async_t* handle);
void uv__async_endgame(uv_loop_t* loop, uv_async_t* handle);
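
For context, the public API this protocol backs is unchanged; a minimal usage
sketch (ordinary libuv usage, not part of the patch) of the pattern
uv_async_send() exists for, a worker thread waking the loop thread:

    #include <stdio.h>
    #include <uv.h>

    static uv_async_t async;

    static void async_cb(uv_async_t* handle) {
      /* Calls are coalesced: several sends may yield a single callback. */
      printf("woken up\n");
      uv_close((uv_handle_t*) handle, NULL);
    }

    static void worker(void* arg) {
      (void) arg;
      /* Safe from any thread; returns as soon as the wakeup is posted. */
      uv_async_send(&async);
    }

    int main(void) {
      uv_thread_t tid;

      uv_async_init(uv_default_loop(), &async, async_cb);
      uv_thread_create(&tid, worker, NULL);

      uv_run(uv_default_loop(), UV_RUN_DEFAULT);
      uv_thread_join(&tid);
      return 0;
    }
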