diff --git a/include/uv/win.h b/include/uv/win.h
index 5e20606c9..270f83a2d 100644
--- a/include/uv/win.h
+++ b/include/uv/win.h
@@ -558,8 +558,7 @@ typedef struct {
 #define UV_ASYNC_PRIVATE_FIELDS \
   struct uv_req_s async_req;    \
   uv_async_cb async_cb;         \
-  /* char to avoid alignment issues */ \
-  char volatile async_sent;
+  int pending;
 
 #define UV_PREPARE_PRIVATE_FIELDS \
   uv_prepare_t* prepare_prev;     \
diff --git a/src/unix/async.c b/src/unix/async.c
index a50864f5d..074e048df 100644
--- a/src/unix/async.c
+++ b/src/unix/async.c
@@ -32,7 +32,6 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <sched.h>  /* sched_yield() */
 
 #ifdef __linux__
 #include <sys/eventfd.h>
@@ -68,7 +67,6 @@ static void uv__kqueue_runtime_detection(void) {
 
 static void uv__async_send(uv_loop_t* loop);
 static int uv__async_start(uv_loop_t* loop);
-static void uv__cpu_relax(void);
 
 
 int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
@@ -89,70 +87,8 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
 }
 
 
-int uv_async_send(uv_async_t* handle) {
-  _Atomic int* pending;
-  int current;
-
-  pending = (_Atomic int*) &handle->pending;
-
-  /* Do a cheap read first. */
-  current = atomic_load_explicit(pending, memory_order_relaxed);
-  if (current & 1)
-    return 0;
-
-  /* Atomically set the pending flag (bit 0) and increment the busy counter
-   * (bits 1+). Adding 3 sets bit 0 and adds 2 to the busy counter at once,
-   * so both operations appear atomic to other threads. */
-  while (!atomic_compare_exchange_weak_explicit(pending,
-                                                &current,
-                                                current + 3,
-                                                memory_order_relaxed,
-                                                memory_order_relaxed))
-    if (current & 1)
-      return 0;
-
-  /* Wake up the other thread's event loop. The write establishes a
-   * happens-before relationship with the reader via the kernel. */
+void uv__async_notify(uv_async_t* handle) {
   uv__async_send(handle->loop);
-
-  /* Decrement the busy counter (bits 1+). */
-  atomic_fetch_add_explicit(pending, -2, memory_order_relaxed);
-
-  return 0;
-}
-
-
-/* Wait for the busy counter to clear before closing.
- * Only call this from the event loop thread. */
-static void uv__async_spin(uv_async_t* handle) {
-  _Atomic int* pending;
-  int i;
-
-  pending = (_Atomic int*) &handle->pending;
-
-  /* Set the pending flag (bit 0) so no new events will be added by other
-   * threads after this function returns. */
-  atomic_fetch_or_explicit(pending, 1, memory_order_relaxed);
-
-  for (;;) {
-    /* 997 is not completely chosen at random. It's a prime number, acyclic by
-     * nature, and should therefore hopefully dampen sympathetic resonance.
-     */
-    for (i = 0; i < 997; i++) {
-      /* Wait until the busy counter (bits 1+) is zero. */
-      if ((atomic_load(pending) & ~1) == 0)
-        return;
-
-      /* Other thread is busy with this handle, spin until it's done. */
-      uv__cpu_relax();
-    }
-
-    /* Yield the CPU. We may have preempted the other thread while it's
-     * inside the critical section and if it's running on the same CPU
-     * as us, we'll just burn CPU cycles until the end of our time slice.
-     */
-    sched_yield();
-  }
 }
 
 
@@ -407,18 +343,3 @@ int uv__async_fork(uv_loop_t* loop) {
 
   return uv__async_start(loop);
 }
-
-
-static void uv__cpu_relax(void) {
-#if defined(__i386__) || defined(__x86_64__)
-  __asm__ __volatile__ ("rep; nop" ::: "memory");  /* a.k.a. PAUSE */
-#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
-  __asm__ __volatile__ ("isb" ::: "memory");
-#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__)
-  __asm volatile ("" : : : "memory");
-#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
-  __asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory");
-#elif defined(__riscv) && __riscv_xlen == 64
-  __asm__ volatile(".insn 0x0100000f" ::: "memory");  /* FENCE */
-#endif
-}
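For orientation: on Unix, uv__async_notify simply delegates to the existing per-loop uv__async_send(loop), which this patch leaves untouched. That function boils down to a write on the loop's wakeup descriptor. A minimal sketch of that shape, assuming a Linux eventfd and a hypothetical wakeup_fd parameter (illustration only, not the libuv implementation, which also handles the pipe fallback and partial writes):

    #include <errno.h>
    #include <stdint.h>
    #include <unistd.h>

    /* Write one 8-byte tick to an eventfd so the poll loop wakes up. */
    static void wakeup_write(int wakeup_fd) {
      static const uint64_t one = 1;  /* eventfd requires an 8-byte counter */
      ssize_t r;

      do
        r = write(wakeup_fd, &one, sizeof(one));
      while (r == -1 && errno == EINTR);  /* retry if interrupted by a signal */
    }

It is this kernel-mediated write (eventfd, pipe, or IOCP post below) that gives uv_async_send its cross-thread happens-before edge; the pending word itself only uses relaxed atomics.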
diff --git a/src/uv-common.c b/src/uv-common.c
index ac738be1f..9234704a7 100644
--- a/src/uv-common.c
+++ b/src/uv-common.c
@@ -34,6 +34,7 @@
 # include <malloc.h> /* malloc */
 #else
 # include <net/if.h> /* if_nametoindex */
+# include <sched.h> /* sched_yield() */
 # include <sys/un.h> /* AF_UNIX, sockaddr_un */
 #endif
 
@@ -896,6 +897,128 @@ uv_loop_t* uv_loop_new(void) {
 }
 
 
+/* Pause the CPU briefly to avoid burning power in spin-wait loops. */
+static void uv__cpu_relax(void) {
+#if defined(_WIN32)
+  YieldProcessor();
+#elif defined(__i386__) || defined(__x86_64__)
+  __asm__ __volatile__ ("rep; nop" ::: "memory");  /* a.k.a. PAUSE */
+#elif (defined(__arm__) && __ARM_ARCH >= 7) || defined(__aarch64__)
+  __asm__ __volatile__ ("isb" ::: "memory");
+#elif !defined(__APPLE__) && (defined(__powerpc64__) || defined(__ppc64__) || defined(__PPC64__))
+  __asm__ __volatile__ ("or 1,1,1; or 2,2,2" ::: "memory");
+#elif (defined(__ppc__) || defined(__ppc64__)) && defined(__APPLE__)
+  __asm volatile ("" : : : "memory");
+#elif defined(__riscv) && __riscv_xlen == 64
+  __asm__ volatile(".insn 0x0100000f" ::: "memory");  /* PAUSE hint, encoded as a FENCE */
+#endif
+}
+
+
+/* Atomic helpers for the async pending field.
+ * Bit 0:   pending flag (notification sent or handle closing).
+ * Bits 1+: busy counter (2 per in-flight uv_async_send call). */
+#ifdef _MSC_VER
+
+static int uv__pending_cas(int* p, int* expected, int desired) {
+  LONG old;
+
+  old = InterlockedCompareExchange((LONG volatile*) p,
+                                   (LONG) desired,
+                                   (LONG) *expected);
+  if (old == (LONG) *expected)
+    return 1;
+
+  *expected = (int) old;
+  return 0;
+}
+
+/* Plain volatile read; aligned 32-bit loads are atomic on Windows targets. */
+#define uv__pending_load(p) ((int) *(volatile int*)(p))
+#define uv__pending_fetch_add(p, v)                                           \
+  ((void) InterlockedExchangeAdd((LONG volatile*)(p), (LONG)(v)))
+#define uv__pending_fetch_or(p, v)                                            \
+  ((int) InterlockedOr((LONG volatile*)(p), (LONG)(v)))
+#define uv__pending_fetch_and(p, v)                                           \
+  ((int) InterlockedAnd((LONG volatile*)(p), (LONG)(v)))
+
+#else  /* GCC / Clang / MinGW: use C11 stdatomic. */
+
+#include <stdatomic.h>
+
+static int uv__pending_cas(int* p, int* expected, int desired) {
+  return atomic_compare_exchange_weak_explicit((_Atomic int*) p,
+                                               expected,
+                                               desired,
+                                               memory_order_relaxed,
+                                               memory_order_relaxed);
+}
+
+#define uv__pending_load(p)                                                   \
+  atomic_load_explicit((_Atomic int*)(p), memory_order_relaxed)
+#define uv__pending_fetch_add(p, v)                                           \
+  ((void) atomic_fetch_add_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
+#define uv__pending_fetch_or(p, v)                                            \
+  ((int) atomic_fetch_or_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
+#define uv__pending_fetch_and(p, v)                                           \
+  ((int) atomic_fetch_and_explicit((_Atomic int*)(p), (v), memory_order_relaxed))
+
+#endif  /* _MSC_VER */
+
+
+int uv_async_send(uv_async_t* handle) {
+  int current;
+
+  /* Do a cheap read first. */
+  current = uv__pending_load(&handle->pending);
+  if (current & 1)
+    return 0;
+
+  /* Atomically set the pending flag (bit 0) and increment the busy counter
+   * (bits 1+). Adding 3 sets bit 0 and adds 2 to the busy counter at once,
+   * so both operations appear atomic to other threads. */
+  while (!uv__pending_cas(&handle->pending, &current, current + 3))
+    if (current & 1)
+      return 0;
+
+  /* Wake up the event loop. The notification write establishes a
+   * happens-before relationship with the reader via the kernel. */
+  uv__async_notify(handle);
+
+  /* Decrement the busy counter (bits 1+). */
+  uv__pending_fetch_add(&handle->pending, -2);
+
+  return 0;
+}
+
+
+int uv__async_spin(uv_async_t* handle) {
+  int old;
+  int i;
+
+  /* Atomically set the pending flag (bit 0) so no new notifications will be
+   * sent after this function returns. Save whether the flag was already set
+   * so callers can tell whether a notification is still in flight. */
+  old = uv__pending_fetch_or(&handle->pending, 1);
+
+  for (;;) {
+    /* 997 is not completely chosen at random. It's a prime number, acyclic by
+     * nature, and should therefore hopefully dampen sympathetic resonance.
+     */
+    for (i = 0; i < 997; i++) {
+      /* Wait until the busy counter (bits 1+) is zero. */
+      if ((uv__pending_load(&handle->pending) & ~1) == 0)
+        return old & 1;
+
+      /* Another thread is busy with this handle; spin until it's done. */
+      uv__cpu_relax();
+    }
+
+    /* Yield the CPU. We may have preempted the other thread while it's
+     * inside the critical section and if it's running on the same CPU
+     * as us, we'll just burn CPU cycles until the end of our time slice.
+     */
+#ifdef _WIN32
+    SwitchToThread();
+#else
+    sched_yield();
+#endif
+  }
+}
+
+
 int uv_loop_close(uv_loop_t* loop) {
   struct uv__queue* q;
   uv_handle_t* h;
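The +3/-2 arithmetic is easy to sanity-check in isolation. A standalone model of the pending-field encoding (illustration only, not part of the patch):

    #include <assert.h>

    int main(void) {
      int pending = 0;  /* bit 0: pending flag, bits 1+: busy counter */

      pending += 3;     /* uv_async_send's CAS: set flag, count one sender */
      assert((pending & 1) == 1);   /* flag is set */
      assert((pending & ~1) == 2);  /* one sender in flight (2 per sender) */

      pending += -2;    /* sender finished: drop its counter share */
      assert((pending & 1) == 1);   /* flag survives until the loop clears it */
      assert((pending & ~1) == 0);  /* counter drained; spin may return */

      return 0;
    }

Because the flag lives in the same word as the counter, a single CAS both publishes the wakeup and pins the handle against closing; no lock is needed.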
diff --git a/src/uv-common.h b/src/uv-common.h
index 0348a4f62..0328db877 100644
--- a/src/uv-common.h
+++ b/src/uv-common.h
@@ -149,6 +149,13 @@ int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap);
 
 void uv__loop_close(uv_loop_t* loop);
 
+/* Sets the pending flag (bit 0) and waits for the busy counter (bits 1+) to
+ * drain. Returns the previous value of bit 0. Event loop thread only. */
+int uv__async_spin(uv_async_t* handle);
+
+/* Platform hook: post a wakeup notification for the given async handle. */
+void uv__async_notify(uv_async_t* handle);
+
 int uv__read_start(uv_stream_t* stream,
                    uv_alloc_cb alloc_cb,
                    uv_read_cb read_cb);
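The contract these declarations describe can be exercised outside libuv. A minimal two-thread model of the send/spin handshake, assuming POSIX threads and C11 atomics (names are hypothetical, not libuv API):

    #include <assert.h>
    #include <pthread.h>
    #include <sched.h>
    #include <stdatomic.h>

    static _Atomic int pending;

    static void* sender(void* arg) {
      int cur;

      (void) arg;
      cur = atomic_load_explicit(&pending, memory_order_relaxed);
      while (!(cur & 1) &&
             !atomic_compare_exchange_weak_explicit(&pending, &cur, cur + 3,
                                                    memory_order_relaxed,
                                                    memory_order_relaxed))
        ;  /* failed CAS reloads cur; exit when flag is set or CAS succeeds */
      if (!(cur & 1))  /* our CAS won: we set the flag and took a counter share */
        atomic_fetch_add_explicit(&pending, -2, memory_order_relaxed);
      return NULL;
    }

    int main(void) {
      pthread_t t;

      pthread_create(&t, NULL, sender, NULL);

      /* uv__async_spin in miniature: block new senders, then drain. */
      atomic_fetch_or_explicit(&pending, 1, memory_order_relaxed);
      while ((atomic_load_explicit(&pending, memory_order_relaxed) & ~1) != 0)
        sched_yield();

      pthread_join(t, NULL);
      assert(atomic_load(&pending) == 1);  /* flag set, counter empty */
      return 0;
    }

Whichever of the two atomic operations lands first, the drain loop only exits once no sender still holds a counter share, which is exactly the guarantee the close paths below rely on.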
diff --git a/src/win/async.c b/src/win/async.c
index 4c2cd265e..ce4423185 100644
--- a/src/win/async.c
+++ b/src/win/async.c
@@ -26,29 +26,12 @@
 #include "handle-inl.h"
 #include "req-inl.h"
 
-#ifdef _MSC_VER /* MSVC */
-
-/* _InterlockedOr8 is supported by MSVC on x32 and x64. It is slightly less
- * efficient than InterlockedExchange, but InterlockedExchange8 does not exist,
- * and interlocked operations on larger targets might require the target to be
- * aligned. */
-#pragma intrinsic(_InterlockedOr8)
-
-static char uv__atomic_exchange_set(char volatile* target) {
-  return _InterlockedOr8(target, 1);
-}
-
-#else /* GCC, Clang in mingw mode */
-
-static char uv__atomic_exchange_set(char volatile* target) {
-  return __sync_fetch_and_or(target, 1);
-}
-
-#endif /* _MSC_VER */
-
 
 void uv__async_endgame(uv_loop_t* loop, uv_async_t* handle) {
-  if (handle->flags & UV_HANDLE_CLOSING &&
-      !handle->async_sent) {
+  /* uv__async_close guarantees that uv__want_endgame runs exactly once: the
+   * spin there drains all in-flight senders and its return value selects
+   * which path schedules the endgame, so no double-close guard is needed. */
+  if (handle->flags & UV_HANDLE_CLOSING) {
     assert(!(handle->flags & UV_HANDLE_CLOSED));
     uv__handle_close(handle);
   }
@@ -59,7 +42,7 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
   uv_req_t* req;
 
   uv__handle_init(loop, (uv_handle_t*) handle, UV_ASYNC);
-  handle->async_sent = 0;
+  handle->pending = 0;
   handle->async_cb = async_cb;
 
   req = &handle->async_req;
@@ -73,31 +56,44 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
 
 
 void uv__async_close(uv_loop_t* loop, uv_async_t* handle) {
-  if (!((uv_async_t*)handle)->async_sent) {
+  /* Block new senders and wait for in-flight sends to finish. uv__async_spin
+   * returns the previous pending flag (bit 0): if it was set, an IOCP
+   * notification is in flight and will trigger the endgame via
+   * uv__process_async_wakeup_req; otherwise we must schedule it here. */
+  if (!uv__async_spin(handle))
     uv__want_endgame(loop, (uv_handle_t*) handle);
-  }
 
   uv__handle_closing(handle);
 }
 
 
-int uv_async_send(uv_async_t* handle) {
+/* Platform hook called by uv_async_send (in uv-common.c) after its CAS
+ * succeeds. Posts an IOCP completion so the event loop wakes up. */
+void uv__async_notify(uv_async_t* handle) {
   uv_loop_t* loop = handle->loop;
+  POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
+}
 
-  if (handle->type != UV_ASYNC) {
-    /* Can't set errno because that's not thread-safe. */
-    return -1;
+
+void uv__async_stop(uv_loop_t* loop) {
+  struct uv__queue* q;
+  uv_handle_t* h;
+
+  /* Spin all UV_ASYNC handles to drain any thread that has passed the CAS in
+   * uv_async_send but has not yet called PostQueuedCompletionStatus. Without
+   * this, such a thread could post to loop->iocp after it is closed. */
+  uv__queue_foreach(q, &loop->handle_queue) {
+    h = uv__queue_data(q, uv_handle_t, handle_queue);
+    if (h->type == UV_ASYNC)
+      uv__async_spin((uv_async_t*) h);
   }
 
-  /* The user should make sure never to call uv_async_send to a closing or
-   * closed handle. */
-  assert(!(handle->flags & UV_HANDLE_CLOSING));
-
-  if (!uv__atomic_exchange_set(&handle->async_sent)) {
-    POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
-  }
-
-  return 0;
+  /* Close the internal wq_async handle directly, bypassing the normal
+   * endgame: any pending IOCP message is discarded when loop->iocp is
+   * closed. */
+  loop->wq_async.close_cb = NULL;
+  uv__handle_closing(&loop->wq_async);
+  uv__handle_close(&loop->wq_async);
 }
 
 
@@ -106,10 +102,12 @@ void uv__process_async_wakeup_req(uv_loop_t* loop, uv_async_t* handle,
   assert(handle->type == UV_ASYNC);
   assert(req->type == UV_WAKEUP);
 
-  handle->async_sent = 0;
+  /* Atomically clear the pending flag (bit 0) but preserve the busy counter
+   * (bits 1+), so senders can post a new wakeup. */
+  InterlockedAnd((LONG volatile*) &handle->pending, ~1);
 
   if (handle->flags & UV_HANDLE_CLOSING) {
-    uv__want_endgame(loop, (uv_handle_t*)handle);
+    uv__want_endgame(loop, (uv_handle_t*) handle);
   } else if (handle->async_cb != NULL) {
     handle->async_cb(handle);
   }
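The only new Windows primitive above is the bit-0 clear in uv__process_async_wakeup_req. Its effect in isolation (Windows-only illustration, not patch code):

    #include <assert.h>
    #include <windows.h>

    int main(void) {
      LONG volatile pending = 5;  /* flag set, two senders in flight */
      LONG old;

      old = InterlockedAnd(&pending, ~1);
      assert(old == 5);      /* InterlockedAnd returns the previous value */
      assert(pending == 4);  /* bit 0 cleared, busy counter untouched */
      return 0;
    }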
diff --git a/src/win/core.c b/src/win/core.c
index 317238fd2..93d60e7c2 100644
--- a/src/win/core.c
+++ b/src/win/core.c
@@ -340,16 +340,10 @@ void uv__loop_close(uv_loop_t* loop) {
 
   uv__loops_remove(loop);
 
-  /* Close the async handle without needing an extra loop iteration.
-   * We might have a pending message, but we're just going to destroy the IOCP
-   * soon, so we can just discard it now without the usual risk of a getting
-   * another notification from GetQueuedCompletionStatusEx after calling the
-   * close_cb (which we also skip defining). We'll assert later that queue was
-   * actually empty and all reqs handled. */
-  loop->wq_async.async_sent = 0;
-  loop->wq_async.close_cb = NULL;
-  uv__handle_closing(&loop->wq_async);
-  uv__handle_close(&loop->wq_async);
+  /* Any pending IOCP message for wq_async is discarded when loop->iocp is
+   * closed below; uv__async_stop spins to ensure no thread is still inside
+   * PostQueuedCompletionStatus before that happens. */
+  uv__async_stop(loop);
 
   for (i = 0; i < ARRAY_SIZE(loop->poll_peer_sockets); i++) {
     SOCKET sock = loop->poll_peer_sockets[i];
diff --git a/src/win/internal.h b/src/win/internal.h
index db488be77..b176cf3ea 100644
--- a/src/win/internal.h
+++ b/src/win/internal.h
@@ -201,6 +201,7 @@ void uv__once_init(void);
 
 /*
  * Async watcher
  */
+void uv__async_stop(uv_loop_t* loop);
 void uv__async_close(uv_loop_t* loop, uv_async_t* handle);
 void uv__async_endgame(uv_loop_t* loop, uv_async_t* handle);
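For reference, the user-visible contract all of this serves: uv_async_send may be called from any thread, several calls may coalesce into a single callback, and closing the handle from the loop thread must not race an in-flight send. A minimal end-to-end example using only the documented libuv API (error checks omitted for brevity):

    #include <stdio.h>
    #include <uv.h>

    static uv_async_t wakeup;

    static void on_wakeup(uv_async_t* h) {
      printf("woken from another thread\n");
      uv_close((uv_handle_t*) h, NULL);  /* exercises the spin/close path */
    }

    static void worker(void* arg) {
      (void) arg;
      uv_async_send(&wakeup);  /* safe from any thread; may coalesce */
    }

    int main(void) {
      uv_thread_t tid;

      uv_async_init(uv_default_loop(), &wakeup, on_wakeup);
      uv_thread_create(&tid, worker, NULL);
      uv_run(uv_default_loop(), UV_RUN_DEFAULT);
      uv_thread_join(&tid);
      return uv_loop_close(uv_default_loop());
    }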