From 6d0459bccfd829061bb396c7c66b82a871fef5f0 Mon Sep 17 00:00:00 2001 From: Sam Schweigel Date: Fri, 2 Jan 2026 14:10:43 -0800 Subject: [PATCH] unix: Add the ability to cancel thread pool work with a signal This commit adds a new uv__work_kind, UV__WORK_FAST_IO_CANCELLABLE, which allows us to uv_cancel work in the thread pool that has already started executing. It does this by signaling the thread with pthread_kill, using a signal configurable with the UV_LOOP_CANCEL_SIGNAL option. Since we need to install a no-op signal handler, this signal must be owned by libuv. This could be relaxed, provided we document that the signal must not have SA_RESTART set, and that running the signal handler on libuv worker threads must be harmless. The uv__work struct gets two new fields on Unix: the thread that is currently performing the work, and an atomic state field. If the work is not cancellable, it has state UV__WORK_BUSY the entire time. If the work is cancellable, it is inserted into the work queue with state UV__WORK_CANCELLABLE. When we want to cancel the work, we first check if the work is still in the queue (holding the queue lock), and if it is not, we atomically check for UV__WORK_CANCELLABLE and swap in UV__WORK_CANCEL_PENDING. If that check succeeded, we signal the thread with pthread_kill (the signal handler does nothing; we need only kick the thread out of an interruptible syscall with EINTR). In the worker thread, we check for UV__WORK_CANCEL_PENDING before and after the syscall, setting it to UV__WORK_CANCELLED if we avoided running the syscall, or if the syscall failed with EINTR. Since there is a race between checking the flag and entering the syscall, we prevent deadlocks by spinning on pthread_kill in uv__work_cancel, with exponential backoff. We wait a maximum of 1024 ms before returning UV_EBUSY. 
--- docs/src/loop.rst | 8 +++ include/uv.h | 3 +- include/uv/threadpool.h | 12 ++++ include/uv/unix.h | 1 + src/threadpool.c | 86 +++++++++++++++++++++++++++- src/unix/internal.h | 12 ++++ src/unix/loop.c | 27 ++++++--- src/unix/stream.c | 35 +++++++---- src/uv-common.h | 5 ++ test/test-pipe-blocking-subprocess.c | 3 + 10 files changed, 169 insertions(+), 23 deletions(-) diff --git a/docs/src/loop.rst b/docs/src/loop.rst index 3d1973ba4..466ec27ee 100644 --- a/docs/src/loop.rst +++ b/docs/src/loop.rst @@ -89,10 +89,18 @@ API - UV_LOOP_USE_IO_URING_SQPOLL: Enable SQPOLL io_uring instance to handle asynchronous file system operations. + - UV_LOOP_CANCEL_SIGNAL: Allow blocking IO to be cancelled. The argument is + the signal number to use for cancellation. + + If this is not configured, a :c:type:`uv_stream_t` in blocking mode can + cause a thread in the thread pool to become permanently blocked. + .. versionchanged:: 1.39.0 added the UV_METRICS_IDLE_TIME option. .. versionchanged:: 1.49.0 added the UV_LOOP_USE_IO_URING_SQPOLL option. + .. versionchanged:: 1.52.0 added the UV_LOOP_CANCEL_SIGNAL option. + .. c:function:: int uv_loop_close(uv_loop_t* loop) Releases all internal loop resources. 
Call this function only when the loop diff --git a/include/uv.h b/include/uv.h index d1a77c749..5961e5a5b 100644 --- a/include/uv.h +++ b/include/uv.h @@ -263,8 +263,9 @@ typedef struct uv_metrics_s uv_metrics_t; typedef enum { UV_LOOP_BLOCK_SIGNAL = 0, UV_METRICS_IDLE_TIME, - UV_LOOP_USE_IO_URING_SQPOLL + UV_LOOP_USE_IO_URING_SQPOLL, #define UV_LOOP_USE_IO_URING_SQPOLL UV_LOOP_USE_IO_URING_SQPOLL + UV_LOOP_CANCEL_SIGNAL } uv_loop_option; typedef enum { diff --git a/include/uv/threadpool.h b/include/uv/threadpool.h index 24ce916fd..2692fc6c4 100644 --- a/include/uv/threadpool.h +++ b/include/uv/threadpool.h @@ -27,11 +27,23 @@ #ifndef UV_THREADPOOL_H_ #define UV_THREADPOOL_H_ +enum { + UV__WORK_BUSY = 0, + UV__WORK_CANCELLABLE, + UV__WORK_CANCEL_PENDING, + UV__WORK_CANCELLED, + UV__WORK_DONE +}; + struct uv__work { void (*work)(struct uv__work *w); void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; struct uv__queue wq; +#ifndef _WIN32 + pthread_t thread; + _Atomic char state; +#endif }; #endif /* UV_THREADPOOL_H_ */ diff --git a/include/uv/unix.h b/include/uv/unix.h index 2cb6e043a..a86c3e79c 100644 --- a/include/uv/unix.h +++ b/include/uv/unix.h @@ -245,6 +245,7 @@ typedef struct { uv__io_t signal_io_watcher; \ uv_signal_t child_watcher; \ int emfile_fd; \ + int cancel_signum; \ UV_PLATFORM_LOOP_FIELDS \ #define UV_REQ_TYPE_PRIVATE /* empty */ diff --git a/src/threadpool.c b/src/threadpool.c index 2a129a5d4..8b6383931 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -58,10 +58,17 @@ static void worker(void* arg) { struct uv__work* w; struct uv__queue* q; int is_slow_work; +#ifndef _WIN32 + pthread_t self; + char expected; +#endif uv_thread_setname("libuv-worker"); uv_sem_post((uv_sem_t*) arg); arg = NULL; +#ifndef _WIN32 + self = pthread_self(); +#endif uv_mutex_lock(&mutex); for (;;) { @@ -117,14 +124,31 @@ static void worker(void* arg) { } } + w = uv__queue_data(q, struct uv__work, wq); +#ifndef _WIN32 + w->thread = self; +#endif + 
uv_mutex_unlock(&mutex); - w = uv__queue_data(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ +#ifndef _WIN32 + expected = UV__WORK_CANCELLABLE; + if (!atomic_compare_exchange_strong_explicit(&w->state, + &expected, + UV__WORK_DONE, + memory_order_relaxed, + memory_order_relaxed)) { + if (expected == UV__WORK_CANCEL_PENDING) + atomic_store_explicit(&w->state, + UV__WORK_CANCELLED, + memory_order_relaxed); + } +#endif uv__queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -273,15 +297,31 @@ static void init_once(void) { } +#ifndef _WIN32 +void uv__cancel_signal_handler(int signo) { + /* We just want to trigger EINTR. */ +} +#endif + + void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { +#ifndef _WIN32 + char state; +#endif + uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; +#ifndef _WIN32 + state = kind == UV__WORK_FAST_IO_CANCELLABLE ? UV__WORK_CANCELLABLE + : UV__WORK_BUSY; + atomic_store_explicit(&w->state, state, memory_order_relaxed); +#endif post(&w->wq, kind); } @@ -289,8 +329,13 @@ void uv__work_submit(uv_loop_t* loop, /* TODO(bnoordhuis) teach libuv how to cancel file operations * that go through io_uring instead of the thread pool. */ -static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { +int uv__work_cancel(uv_loop_t* loop, struct uv__work* w) { int cancelled; +#ifndef _WIN32 + char expected; + pthread_t thread; + int i; +#endif uv_once(&once, init_once); /* Ensure |mutex| is initialized. 
*/ uv_mutex_lock(&mutex); @@ -303,8 +348,39 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_unlock(&mutex); +#ifdef _WIN32 if (!cancelled) return UV_EBUSY; +#else + if (!cancelled) { + if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_BUSY) + return UV_EBUSY; + + if (loop->cancel_signum == -1) + return UV_EBUSY; + + expected = UV__WORK_CANCELLABLE; + if (atomic_compare_exchange_strong_explicit(&w->state, + &expected, + UV__WORK_CANCEL_PENDING, + memory_order_relaxed, + memory_order_relaxed)) { + thread = w->thread; + i = 0; + do { + if (i >= 10) + return UV_EBUSY; + if (i > 0) + uv_sleep(1 << i++); + if (pthread_kill(thread, loop->cancel_signum) != 0) + abort(); + } while (atomic_load_explicit(&w->state, memory_order_relaxed) == + UV__WORK_CANCEL_PENDING); + } + + return 0; + } +#endif w->work = uv__cancelled; uv_mutex_lock(&loop->wq_mutex); @@ -337,6 +413,10 @@ void uv__work_done(uv_async_t* handle) { w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? 
UV_ECANCELED : 0; +#ifndef _WIN32 + if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_CANCELLED) + err = UV_ECANCELED; +#endif w->done(w, err); nevents++; } @@ -425,5 +505,5 @@ int uv_cancel(uv_req_t* req) { return UV_EINVAL; } - return uv__work_cancel(loop, req, wreq); + return uv__work_cancel(loop, wreq); } diff --git a/src/unix/internal.h b/src/unix/internal.h index c9704a8bd..006b21d09 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -589,4 +589,16 @@ int uv__get_constrained_cpu(long long* quota); #define UV__KQUEUE_EVFILT_USER 0 #endif +static inline int uv__work_check_cancelled(struct uv__work* w) { + if (w == NULL) + return 0; + + if (atomic_load_explicit(&w->state, memory_order_relaxed) != + UV__WORK_CANCEL_PENDING) + return 0; + + atomic_store_explicit(&w->state, UV__WORK_CANCELLED, memory_order_relaxed); + return 1; +} + #endif /* UV_UNIX_INTERNAL_H_ */ diff --git a/src/unix/loop.c b/src/unix/loop.c index da9fa3c8b..bcc355087 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -23,6 +23,7 @@ #include "uv/tree.h" #include "internal.h" #include "heap-inl.h" +#include #include #include #include @@ -72,6 +73,7 @@ int uv_loop_init(uv_loop_t* loop) { loop->signal_pipefd[1] = -1; loop->backend_fd = -1; loop->emfile_fd = -1; + loop->cancel_signum = -1; loop->timer_counter = 0; loop->stop_flag = 0; @@ -213,6 +215,7 @@ void uv__loop_close(uv_loop_t* loop) { int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) { uv__loop_internal_fields_t* lfields; + struct sigaction sa; lfields = uv__get_internal_fields(loop); if (option == UV_METRICS_IDLE_TIME) { @@ -227,13 +230,23 @@ int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) { } #endif + if (option == UV_LOOP_BLOCK_SIGNAL) { + if (va_arg(ap, int) != SIGPROF) + return UV_EINVAL; + loop->flags |= UV_LOOP_BLOCK_SIGPROF; + return 0; + } - if (option != UV_LOOP_BLOCK_SIGNAL) - return UV_ENOSYS; + if (option == UV_LOOP_CANCEL_SIGNAL) { + 
loop->cancel_signum = va_arg(ap, int); + memset(&sa, 0, sizeof sa); + sa.sa_handler = &uv__cancel_signal_handler; + if (sigaction(loop->cancel_signum, &sa, NULL) == -1) { + loop->cancel_signum = -1; + return UV__ERR(errno); + } + return 0; + } - if (va_arg(ap, int) != SIGPROF) - return UV_EINVAL; - - loop->flags |= UV_LOOP_BLOCK_SIGPROF; - return 0; + return UV_ENOSYS; } diff --git a/src/unix/stream.c b/src/unix/stream.c index 98d3e2bf5..25d91cd4a 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -759,10 +759,12 @@ static int uv__handle_fd(uv_handle_t* handle) { } } + static int uv__try_write(uv_stream_t* stream, const uv_buf_t bufs[], unsigned int nbufs, - uv_stream_t* send_handle) { + uv_stream_t* send_handle, + struct uv__work* w) { struct iovec* iov; int iovmax; int iovcnt; @@ -774,6 +776,7 @@ static int uv__try_write(uv_stream_t* stream, */ iov = (struct iovec*) bufs; iovcnt = nbufs; + n = 0; iovmax = uv__getiovmax(); @@ -813,13 +816,17 @@ static int uv__try_write(uv_stream_t* stream, cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send)); memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send)); - do + do { + if (uv__work_check_cancelled(w)) + break; n = sendmsg(uv__stream_fd(stream), &msg, 0); - while (n == -1 && errno == EINTR); + } while (n == -1 && errno == EINTR); } else { - do + do { + if (uv__work_check_cancelled(w)) + break; n = uv__writev(uv__stream_fd(stream), iov, iovcnt); - while (n == -1 && errno == EINTR); + } while (n == -1 && errno == EINTR); } if (n >= 0) @@ -845,13 +852,13 @@ static int uv__try_write(uv_stream_t* stream, return UV__ERR(errno); } + /* A note about blocking writes: The UV_HANDLE_WRITE_PENDING flag is toggled * only on the loop thread (either by uv__write or uv__write_done). While we're * doing work from the thread pool, we touch only the result field of * uv_write_t; we'll never read it from the loop thread while a blocked write is * pending. 
*/ - static void uv__write_work(struct uv__work* w) { uv_stream_t* stream; struct uv__queue* q; @@ -867,9 +874,11 @@ static void uv__write_work(struct uv__work* w) { req->result = uv__try_write(stream, &(req->bufs[req->write_index]), req->nbufs - req->write_index, - req->send_handle); + req->send_handle, + w); } + static void uv__write_done(struct uv__work* w, int status) { uv_stream_t* stream; struct uv__queue* q; @@ -907,6 +916,7 @@ error: uv__stream_osx_interrupt_select(stream); } + static void uv__write(uv_stream_t* stream) { struct uv__queue* q; uv_write_t* req; @@ -933,14 +943,15 @@ static void uv__write(uv_stream_t* stream) { n = uv__try_write(stream, &(req->bufs[req->write_index]), req->nbufs - req->write_index, - req->send_handle); + req->send_handle, + NULL); } else { n = UV_EAGAIN; if (!(stream->flags & UV_HANDLE_WRITE_PENDING)) { stream->flags |= UV_HANDLE_WRITE_PENDING; uv__work_submit(stream->loop, &stream->blocked_write, - UV__WORK_FAST_IO, + UV__WORK_FAST_IO_CANCELLABLE, uv__write_work, uv__write_done); } @@ -1504,7 +1515,7 @@ int uv_try_write2(uv_stream_t* stream, if (err < 0) return err; - return uv__try_write(stream, bufs, nbufs, send_handle); + return uv__try_write(stream, bufs, nbufs, send_handle, NULL); } @@ -1620,8 +1631,8 @@ void uv__stream_close(uv_stream_t* handle) { handle->queued_fds = NULL; } - /* if (handle->flags & UV_HANDLE_WRITE_PENDING) - * uv_cancel((uv_req_t*)handle->blocked_write); */ + if (handle->flags & UV_HANDLE_WRITE_PENDING) + uv__work_cancel(handle->loop, &handle->blocked_write); assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); } diff --git a/src/uv-common.h b/src/uv-common.h index 639df6ddf..07ec6a5ca 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -215,15 +215,20 @@ int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. 
*/ enum uv__work_kind { UV__WORK_CPU, UV__WORK_FAST_IO, + UV__WORK_FAST_IO_CANCELLABLE, UV__WORK_SLOW_IO }; +void uv__cancel_signal_handler(int signo); + void uv__work_submit(uv_loop_t* loop, struct uv__work *w, enum uv__work_kind kind, void (*work)(struct uv__work *w), void (*done)(struct uv__work *w, int status)); +int uv__work_cancel(uv_loop_t* loop, struct uv__work* w); + void uv__work_done(uv_async_t* handle); size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs); diff --git a/test/test-pipe-blocking-subprocess.c b/test/test-pipe-blocking-subprocess.c index 24f241f35..01444ddb7 100644 --- a/test/test-pipe-blocking-subprocess.c +++ b/test/test-pipe-blocking-subprocess.c @@ -72,6 +72,9 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { static void init_common(void) { loop = uv_default_loop(); +#ifndef _WIN32 + uv_loop_configure(loop, UV_LOOP_CANCEL_SIGNAL, SIGUSR1); +#endif ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0)); ASSERT_OK(uv_pipe_init(loop, &pipe_out, 0));