unix: Add the ability to cancel thread pool work with a signal

This commit adds a new uv__work_kind, UV__WORK_FAST_IO_CANCELLABLE,
which allows us to uv_cancel work in the thread pool that has already
started executing.  It does this by signaling the thread with
pthread_kill, using a signal configurable with the UV_LOOP_CANCEL_SIGNAL
option.  Since we need to install a no-op signal handler, this signal
must be owned by libuv.  This could be relaxed, provided we document
that the signal must not have SA_RESTART set, and running the signal
handler on libuv worker threads must be harmless.

The uv__work struct gets two new fields on Unix: the thread that is
currently performing the work, and an atomic state field.  If the work
is not cancellable, it has state UV__WORK_BUSY the entire time.

If the work is cancellable, it is inserted into the work queue with
state UV__WORK_CANCELLABLE.  When we want to cancel the work, we first
check if the work is still in the queue (holding the queue lock), and if
it is not, we atomically check for UV__WORK_CANCELLABLE and swap in
UV__WORK_CANCEL_PENDING.  If that check succeeded, we signal the thread
with pthread_kill (the signal handler does nothing; we need only kick
the thread out of an interruptible syscall with EINTR).

In the worker thread, we check for UV__WORK_CANCEL_PENDING before and
after the syscall, setting it to UV__WORK_CANCELLED if we avoided
running the syscall, or if we got EINTR as the return value.  Since
there is a race between checking the flag and entering the syscall, we
prevent deadlocks by spinning on pthread_kill in uv__work_cancel, with
exponential backoff.  We wait a maximum of 1024 ms before returning
UV_EBUSY.
This commit is contained in:
Sam Schweigel 2026-01-02 14:10:43 -08:00
parent 92f5f1ce5c
commit 6d0459bccf
10 changed files with 169 additions and 23 deletions

View File

@ -89,10 +89,18 @@ API
- UV_LOOP_USE_IO_URING_SQPOLL: Enable SQPOLL io_uring instance to handle
asynchronous file system operations.
- UV_LOOP_CANCEL_SIGNAL: Allow blocking I/O to be cancelled. The argument is
the signal number to use for cancellation.
If this is not configured, a :c:type:`uv_stream_t` in blocking mode can
cause a thread in the thread pool to become permanently blocked.
.. versionchanged:: 1.39.0 added the UV_METRICS_IDLE_TIME option.
.. versionchanged:: 1.49.0 added the UV_LOOP_USE_IO_URING_SQPOLL option.
.. versionchanged:: 1.52.0 added the UV_LOOP_CANCEL_SIGNAL option.
.. c:function:: int uv_loop_close(uv_loop_t* loop)
Releases all internal loop resources. Call this function only when the loop

View File

@ -263,8 +263,9 @@ typedef struct uv_metrics_s uv_metrics_t;
typedef enum {
UV_LOOP_BLOCK_SIGNAL = 0,
UV_METRICS_IDLE_TIME,
UV_LOOP_USE_IO_URING_SQPOLL
UV_LOOP_USE_IO_URING_SQPOLL,
#define UV_LOOP_USE_IO_URING_SQPOLL UV_LOOP_USE_IO_URING_SQPOLL
UV_LOOP_CANCEL_SIGNAL
} uv_loop_option;
typedef enum {

View File

@ -27,11 +27,23 @@
#ifndef UV_THREADPOOL_H_
#define UV_THREADPOOL_H_
/* Cancellation states for a uv__work item.  Non-cancellable work stays in
 * UV__WORK_BUSY for its entire lifetime; work submitted as
 * UV__WORK_FAST_IO_CANCELLABLE moves through the other states. */
enum {
UV__WORK_BUSY = 0,        /* Not cancellable once running. */
UV__WORK_CANCELLABLE,     /* Running (or queued) and may still be cancelled. */
UV__WORK_CANCEL_PENDING,  /* uv__work_cancel() requested cancellation. */
UV__WORK_CANCELLED,       /* Worker observed the request (or got EINTR). */
UV__WORK_DONE             /* Work completed without being cancelled. */
};
/* A unit of work queued on the libuv thread pool. */
struct uv__work {
void (*work)(struct uv__work *w);              /* Runs on a worker thread. */
void (*done)(struct uv__work *w, int status);  /* Runs on the loop thread. */
struct uv_loop_s* loop;
struct uv__queue wq;                           /* Linkage in the work queue. */
#ifndef _WIN32
/* Thread currently executing this item; only meaningful after a worker has
 * dequeued the item (used as the pthread_kill() target for cancellation). */
pthread_t thread;
/* One of the UV__WORK_* states above; UV__WORK_BUSY for non-cancellable
 * work.  Accessed with relaxed atomics from both the loop and worker
 * threads. */
_Atomic char state;
#endif
};
#endif /* UV_THREADPOOL_H_ */

View File

@ -245,6 +245,7 @@ typedef struct {
uv__io_t signal_io_watcher; \
uv_signal_t child_watcher; \
int emfile_fd; \
int cancel_signum; \
UV_PLATFORM_LOOP_FIELDS \
#define UV_REQ_TYPE_PRIVATE /* empty */

View File

@ -58,10 +58,17 @@ static void worker(void* arg) {
struct uv__work* w;
struct uv__queue* q;
int is_slow_work;
#ifndef _WIN32
pthread_t self;
char expected;
#endif
uv_thread_setname("libuv-worker");
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
#ifndef _WIN32
self = pthread_self();
#endif
uv_mutex_lock(&mutex);
for (;;) {
@ -117,14 +124,31 @@ static void worker(void* arg) {
}
}
w = uv__queue_data(q, struct uv__work, wq);
#ifndef _WIN32
w->thread = self;
#endif
uv_mutex_unlock(&mutex);
w = uv__queue_data(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
#ifndef _WIN32
expected = UV__WORK_CANCELLABLE;
if (!atomic_compare_exchange_strong_explicit(&w->state,
&expected,
UV__WORK_DONE,
memory_order_relaxed,
memory_order_relaxed)) {
if (expected == UV__WORK_CANCEL_PENDING)
atomic_store_explicit(&w->state,
UV__WORK_CANCELLED,
memory_order_relaxed);
}
#endif
uv__queue_insert_tail(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
@ -273,15 +297,31 @@ static void init_once(void) {
}
#ifndef _WIN32
/* Signal handler installed for the UV_LOOP_CANCEL_SIGNAL signal.
 *
 * Intentionally a no-op: delivering the signal (installed without SA_RESTART)
 * is enough to kick a worker thread out of a blocking syscall with EINTR.
 */
void uv__cancel_signal_handler(int signo) {
  (void) signo;  /* Unused; suppress -Wunused-parameter. */
}
#endif
/* Queue |w| on the global thread pool.
 *
 * |kind| selects the scheduling class; work submitted as
 * UV__WORK_FAST_IO_CANCELLABLE can later be interrupted by uv__work_cancel()
 * even after a worker has started executing it.  |work| runs on a worker
 * thread, |done| on the loop thread.
 */
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
#ifndef _WIN32
char state;
#endif
uv_once(&once, init_once);  /* One-time thread pool initialization. */
w->loop = loop;
w->work = work;
w->done = done;
#ifndef _WIN32
/* Initialize the cancellation state before publishing the item via post();
 * cancellation only acts on items observed in UV__WORK_CANCELLABLE. */
state = kind == UV__WORK_FAST_IO_CANCELLABLE ? UV__WORK_CANCELLABLE
: UV__WORK_BUSY;
atomic_store_explicit(&w->state, state, memory_order_relaxed);
#endif
post(&w->wq, kind);
}
@ -289,8 +329,13 @@ void uv__work_submit(uv_loop_t* loop,
/* TODO(bnoordhuis) teach libuv how to cancel file operations
* that go through io_uring instead of the thread pool.
*/
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
int uv__work_cancel(uv_loop_t* loop, struct uv__work* w) {
int cancelled;
#ifndef _WIN32
char expected;
pthread_t thread;
int i;
#endif
uv_once(&once, init_once); /* Ensure |mutex| is initialized. */
uv_mutex_lock(&mutex);
@ -303,8 +348,39 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);
#ifdef _WIN32
if (!cancelled)
return UV_EBUSY;
#else
if (!cancelled) {
if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_BUSY)
return UV_EBUSY;
if (loop->cancel_signum == -1)
return UV_EBUSY;
expected = UV__WORK_CANCELLABLE;
if (atomic_compare_exchange_strong_explicit(&w->state,
&expected,
UV__WORK_CANCEL_PENDING,
memory_order_relaxed,
memory_order_relaxed)) {
thread = w->thread;
i = 0;
do {
if (i >= 10)
return UV_EBUSY;
if (i > 0)
uv_sleep(1 << i++);
if (pthread_kill(thread, loop->cancel_signum) != 0)
abort();
} while (atomic_load_explicit(&w->state, memory_order_relaxed) ==
UV__WORK_CANCEL_PENDING);
}
return 0;
}
#endif
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
@ -337,6 +413,10 @@ void uv__work_done(uv_async_t* handle) {
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
#ifndef _WIN32
if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_CANCELLED)
err = UV_ECANCELED;
#endif
w->done(w, err);
nevents++;
}
@ -425,5 +505,5 @@ int uv_cancel(uv_req_t* req) {
return UV_EINVAL;
}
return uv__work_cancel(loop, req, wreq);
return uv__work_cancel(loop, wreq);
}

View File

@ -589,4 +589,16 @@ int uv__get_constrained_cpu(long long* quota);
#define UV__KQUEUE_EVFILT_USER 0
#endif
/* Consume a pending cancellation request for |w|, if any.
 *
 * Called on the worker thread before and after interruptible syscalls.
 * Returns 1 and advances the item to UV__WORK_CANCELLED when cancellation
 * was pending; returns 0 otherwise (including for a NULL |w|).
 */
static inline int uv__work_check_cancelled(struct uv__work* w) {
  char state;

  if (w == NULL)
    return 0;

  state = atomic_load_explicit(&w->state, memory_order_relaxed);
  if (state != UV__WORK_CANCEL_PENDING)
    return 0;

  /* Only this thread moves PENDING -> CANCELLED, so a plain store suffices. */
  atomic_store_explicit(&w->state, UV__WORK_CANCELLED, memory_order_relaxed);
  return 1;
}
#endif /* UV_UNIX_INTERNAL_H_ */

View File

@ -23,6 +23,7 @@
#include "uv/tree.h"
#include "internal.h"
#include "heap-inl.h"
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@ -72,6 +73,7 @@ int uv_loop_init(uv_loop_t* loop) {
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
loop->emfile_fd = -1;
loop->cancel_signum = -1;
loop->timer_counter = 0;
loop->stop_flag = 0;
@ -213,6 +215,7 @@ void uv__loop_close(uv_loop_t* loop) {
int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) {
uv__loop_internal_fields_t* lfields;
struct sigaction sa;
lfields = uv__get_internal_fields(loop);
if (option == UV_METRICS_IDLE_TIME) {
@ -227,13 +230,23 @@ int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) {
}
#endif
if (option == UV_LOOP_BLOCK_SIGNAL) {
if (va_arg(ap, int) != SIGPROF)
return UV_EINVAL;
loop->flags |= UV_LOOP_BLOCK_SIGPROF;
return 0;
}
if (option != UV_LOOP_BLOCK_SIGNAL)
return UV_ENOSYS;
if (option == UV_LOOP_CANCEL_SIGNAL) {
loop->cancel_signum = va_arg(ap, int);
memset(&sa, 0, sizeof sa);
sa.sa_handler = &uv__cancel_signal_handler;
if (sigaction(loop->cancel_signum, &sa, NULL) == -1) {
loop->cancel_signum = -1;
return UV__ERR(errno);
}
return 0;
}
if (va_arg(ap, int) != SIGPROF)
return UV_EINVAL;
loop->flags |= UV_LOOP_BLOCK_SIGPROF;
return 0;
return UV_ENOSYS;
}

View File

@ -759,10 +759,12 @@ static int uv__handle_fd(uv_handle_t* handle) {
}
}
static int uv__try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle) {
uv_stream_t* send_handle,
struct uv__work* w) {
struct iovec* iov;
int iovmax;
int iovcnt;
@ -774,6 +776,7 @@ static int uv__try_write(uv_stream_t* stream,
*/
iov = (struct iovec*) bufs;
iovcnt = nbufs;
n = 0;
iovmax = uv__getiovmax();
@ -813,13 +816,17 @@ static int uv__try_write(uv_stream_t* stream,
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));
do
do {
if (uv__work_check_cancelled(w))
break;
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && errno == EINTR);
} while (n == -1 && errno == EINTR);
} else {
do
do {
if (uv__work_check_cancelled(w))
break;
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && errno == EINTR);
} while (n == -1 && errno == EINTR);
}
if (n >= 0)
@ -845,13 +852,13 @@ static int uv__try_write(uv_stream_t* stream,
return UV__ERR(errno);
}
/* A note about blocking writes: The UV_HANDLE_WRITE_PENDING flag is toggled
* only on the loop thread (either by uv__write or uv__write_done). While we're
* doing work from the thread pool, we touch only the result field of
* uv_write_t; we'll never read it from the loop thread while a blocked write is
* pending.
*/
static void uv__write_work(struct uv__work* w) {
uv_stream_t* stream;
struct uv__queue* q;
@ -867,9 +874,11 @@ static void uv__write_work(struct uv__work* w) {
req->result = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
req->send_handle,
w);
}
static void uv__write_done(struct uv__work* w, int status) {
uv_stream_t* stream;
struct uv__queue* q;
@ -907,6 +916,7 @@ error:
uv__stream_osx_interrupt_select(stream);
}
static void uv__write(uv_stream_t* stream) {
struct uv__queue* q;
uv_write_t* req;
@ -933,14 +943,15 @@ static void uv__write(uv_stream_t* stream) {
n = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
req->send_handle,
NULL);
} else {
n = UV_EAGAIN;
if (!(stream->flags & UV_HANDLE_WRITE_PENDING)) {
stream->flags |= UV_HANDLE_WRITE_PENDING;
uv__work_submit(stream->loop,
&stream->blocked_write,
UV__WORK_FAST_IO,
UV__WORK_FAST_IO_CANCELLABLE,
uv__write_work,
uv__write_done);
}
@ -1504,7 +1515,7 @@ int uv_try_write2(uv_stream_t* stream,
if (err < 0)
return err;
return uv__try_write(stream, bufs, nbufs, send_handle);
return uv__try_write(stream, bufs, nbufs, send_handle, NULL);
}
@ -1620,8 +1631,8 @@ void uv__stream_close(uv_stream_t* handle) {
handle->queued_fds = NULL;
}
/* if (handle->flags & UV_HANDLE_WRITE_PENDING)
* uv_cancel((uv_req_t*)handle->blocked_write); */
if (handle->flags & UV_HANDLE_WRITE_PENDING)
uv__work_cancel(handle->loop, &handle->blocked_write);
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}

View File

@ -215,15 +215,20 @@ int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. */
/* Scheduling classes for thread pool work. */
enum uv__work_kind {
UV__WORK_CPU,                  /* CPU-bound work. */
UV__WORK_FAST_IO,              /* I/O expected to complete quickly. */
UV__WORK_FAST_IO_CANCELLABLE,  /* Like UV__WORK_FAST_IO, but may be cancelled
                                * with a signal after it starts running. */
UV__WORK_SLOW_IO               /* Potentially long-running I/O. */
};
void uv__cancel_signal_handler(int signo);
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
enum uv__work_kind kind,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w, int status));
int uv__work_cancel(uv_loop_t* loop, struct uv__work* w);
void uv__work_done(uv_async_t* handle);
size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs);

View File

@ -72,6 +72,9 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
static void init_common(void) {
loop = uv_default_loop();
#ifndef _WIN32
uv_loop_configure(loop, UV_LOOP_CANCEL_SIGNAL, SIGUSR1);
#endif
ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0));
ASSERT_OK(uv_pipe_init(loop, &pipe_out, 0));