diff --git a/docs/src/loop.rst b/docs/src/loop.rst index 3d1973ba4..466ec27ee 100644 --- a/docs/src/loop.rst +++ b/docs/src/loop.rst @@ -89,10 +89,18 @@ API - UV_LOOP_USE_IO_URING_SQPOLL: Enable SQPOLL io_uring instance to handle asynchronous file system operations. + - UV_LOOP_CANCEL_SIGNAL: Allow blocking IO to be cancelled. The argument is + the signal number to use for cancellation. + + If this is not configured, a :c:type:`uv_stream_t` in blocking mode can + cause a thread in the thread pool to become permanently blocked. + .. versionchanged:: 1.39.0 added the UV_METRICS_IDLE_TIME option. .. versionchanged:: 1.49.0 added the UV_LOOP_USE_IO_URING_SQPOLL option. + .. versionchanged:: 1.52.0 added the UV_LOOP_CANCEL_SIGNAL option. + .. c:function:: int uv_loop_close(uv_loop_t* loop) Releases all internal loop resources. Call this function only when the loop diff --git a/include/uv.h b/include/uv.h index d1a77c749..5961e5a5b 100644 --- a/include/uv.h +++ b/include/uv.h @@ -263,8 +263,9 @@ typedef struct uv_metrics_s uv_metrics_t; typedef enum { UV_LOOP_BLOCK_SIGNAL = 0, UV_METRICS_IDLE_TIME, - UV_LOOP_USE_IO_URING_SQPOLL + UV_LOOP_USE_IO_URING_SQPOLL, #define UV_LOOP_USE_IO_URING_SQPOLL UV_LOOP_USE_IO_URING_SQPOLL + UV_LOOP_CANCEL_SIGNAL } uv_loop_option; typedef enum { diff --git a/include/uv/threadpool.h b/include/uv/threadpool.h index 24ce916fd..2692fc6c4 100644 --- a/include/uv/threadpool.h +++ b/include/uv/threadpool.h @@ -27,11 +27,23 @@ #ifndef UV_THREADPOOL_H_ #define UV_THREADPOOL_H_ +enum { + UV__WORK_BUSY = 0, + UV__WORK_CANCELLABLE, + UV__WORK_CANCEL_PENDING, + UV__WORK_CANCELLED, + UV__WORK_DONE +}; + struct uv__work { void (*work)(struct uv__work *w); void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; struct uv__queue wq; +#ifndef _WIN32 + pthread_t thread; + _Atomic char state; +#endif }; #endif /* UV_THREADPOOL_H_ */ diff --git a/include/uv/unix.h b/include/uv/unix.h index 2cb6e043a..a86c3e79c 100644 --- a/include/uv/unix.h +++ 
b/include/uv/unix.h @@ -245,6 +245,7 @@ typedef struct { uv__io_t signal_io_watcher; \ uv_signal_t child_watcher; \ int emfile_fd; \ + int cancel_signum; \ UV_PLATFORM_LOOP_FIELDS \ #define UV_REQ_TYPE_PRIVATE /* empty */ diff --git a/src/threadpool.c b/src/threadpool.c index 2a129a5d4..8b6383931 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -58,10 +58,17 @@ static void worker(void* arg) { struct uv__work* w; struct uv__queue* q; int is_slow_work; +#ifndef _WIN32 + pthread_t self; + char expected; +#endif uv_thread_setname("libuv-worker"); uv_sem_post((uv_sem_t*) arg); arg = NULL; +#ifndef _WIN32 + self = pthread_self(); +#endif uv_mutex_lock(&mutex); for (;;) { @@ -117,14 +124,31 @@ static void worker(void* arg) { } } + w = uv__queue_data(q, struct uv__work, wq); +#ifndef _WIN32 + w->thread = self; +#endif + uv_mutex_unlock(&mutex); - w = uv__queue_data(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ +#ifndef _WIN32 + expected = UV__WORK_CANCELLABLE; + if (!atomic_compare_exchange_strong_explicit(&w->state, + &expected, + UV__WORK_DONE, + memory_order_relaxed, + memory_order_relaxed)) { + if (expected == UV__WORK_CANCEL_PENDING) + atomic_store_explicit(&w->state, + UV__WORK_CANCELLED, + memory_order_relaxed); + } +#endif uv__queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -273,15 +297,31 @@ static void init_once(void) { } +#ifndef _WIN32 +void uv__cancel_signal_handler(int signo) { + /* We just want to trigger EINTR. */ +} +#endif + + void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { +#ifndef _WIN32 + char state; +#endif + uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; +#ifndef _WIN32 + state = kind == UV__WORK_FAST_IO_CANCELLABLE ? 
UV__WORK_CANCELLABLE + : UV__WORK_BUSY; + atomic_store_explicit(&w->state, state, memory_order_relaxed); +#endif post(&w->wq, kind); } @@ -289,8 +329,13 @@ void uv__work_submit(uv_loop_t* loop, /* TODO(bnoordhuis) teach libuv how to cancel file operations * that go through io_uring instead of the thread pool. */ -static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { +int uv__work_cancel(uv_loop_t* loop, struct uv__work* w) { int cancelled; +#ifndef _WIN32 + char expected; + pthread_t thread; + int i; +#endif uv_once(&once, init_once); /* Ensure |mutex| is initialized. */ uv_mutex_lock(&mutex); @@ -303,8 +348,39 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_unlock(&mutex); +#ifdef _WIN32 if (!cancelled) return UV_EBUSY; +#else + if (!cancelled) { + if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_BUSY) + return UV_EBUSY; + + if (loop->cancel_signum == -1) + return UV_EBUSY; + + expected = UV__WORK_CANCELLABLE; + if (atomic_compare_exchange_strong_explicit(&w->state, + &expected, + UV__WORK_CANCEL_PENDING, + memory_order_relaxed, + memory_order_relaxed)) { + thread = w->thread; + i = 0; + do { + if (i >= 10) + return UV_EBUSY; + if (i > 0) + uv_sleep(1 << i++); + if (pthread_kill(thread, loop->cancel_signum) != 0) + abort(); + } while (atomic_load_explicit(&w->state, memory_order_relaxed) == + UV__WORK_CANCEL_PENDING); + } + + return 0; + } +#endif w->work = uv__cancelled; uv_mutex_lock(&loop->wq_mutex); @@ -337,6 +413,10 @@ void uv__work_done(uv_async_t* handle) { w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? 
UV_ECANCELED : 0; +#ifndef _WIN32 + if (atomic_load_explicit(&w->state, memory_order_relaxed) == UV__WORK_CANCELLED) + err = UV_ECANCELED; +#endif w->done(w, err); nevents++; } @@ -425,5 +505,5 @@ int uv_cancel(uv_req_t* req) { return UV_EINVAL; } - return uv__work_cancel(loop, req, wreq); + return uv__work_cancel(loop, wreq); } diff --git a/src/unix/internal.h b/src/unix/internal.h index c9704a8bd..006b21d09 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -589,4 +589,16 @@ int uv__get_constrained_cpu(long long* quota); #define UV__KQUEUE_EVFILT_USER 0 #endif +static inline int uv__work_check_cancelled(struct uv__work* w) { + if (w == NULL) + return 0; + + if (atomic_load_explicit(&w->state, memory_order_relaxed) != + UV__WORK_CANCEL_PENDING) + return 0; + + atomic_store_explicit(&w->state, UV__WORK_CANCELLED, memory_order_relaxed); + return 1; +} + #endif /* UV_UNIX_INTERNAL_H_ */ diff --git a/src/unix/loop.c b/src/unix/loop.c index da9fa3c8b..bcc355087 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -23,6 +23,7 @@ #include "uv/tree.h" #include "internal.h" #include "heap-inl.h" +#include <signal.h> #include <stdlib.h> #include <string.h> #include <unistd.h> @@ -72,6 +73,7 @@ int uv_loop_init(uv_loop_t* loop) { loop->signal_pipefd[1] = -1; loop->backend_fd = -1; loop->emfile_fd = -1; + loop->cancel_signum = -1; loop->timer_counter = 0; loop->stop_flag = 0; @@ -213,6 +215,7 @@ void uv__loop_close(uv_loop_t* loop) { int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) { uv__loop_internal_fields_t* lfields; + struct sigaction sa; lfields = uv__get_internal_fields(loop); if (option == UV_METRICS_IDLE_TIME) { @@ -227,13 +230,23 @@ int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) { } #endif + if (option == UV_LOOP_BLOCK_SIGNAL) { + if (va_arg(ap, int) != SIGPROF) + return UV_EINVAL; + loop->flags |= UV_LOOP_BLOCK_SIGPROF; + return 0; + } - if (option != UV_LOOP_BLOCK_SIGNAL) - return UV_ENOSYS; + if (option == UV_LOOP_CANCEL_SIGNAL) { + 
loop->cancel_signum = va_arg(ap, int); + memset(&sa, 0, sizeof sa); + sa.sa_handler = &uv__cancel_signal_handler; + if (sigaction(loop->cancel_signum, &sa, NULL) == -1) { + loop->cancel_signum = -1; + return UV__ERR(errno); + } + return 0; + } - if (va_arg(ap, int) != SIGPROF) - return UV_EINVAL; - - loop->flags |= UV_LOOP_BLOCK_SIGPROF; - return 0; + return UV_ENOSYS; } diff --git a/src/unix/stream.c b/src/unix/stream.c index 98d3e2bf5..25d91cd4a 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -759,10 +759,12 @@ static int uv__handle_fd(uv_handle_t* handle) { } } + static int uv__try_write(uv_stream_t* stream, const uv_buf_t bufs[], unsigned int nbufs, - uv_stream_t* send_handle) { + uv_stream_t* send_handle, + struct uv__work* w) { struct iovec* iov; int iovmax; int iovcnt; @@ -774,6 +776,7 @@ static int uv__try_write(uv_stream_t* stream, */ iov = (struct iovec*) bufs; iovcnt = nbufs; + n = 0; iovmax = uv__getiovmax(); @@ -813,13 +816,17 @@ static int uv__try_write(uv_stream_t* stream, cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send)); memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send)); - do + do { + if (uv__work_check_cancelled(w)) + break; n = sendmsg(uv__stream_fd(stream), &msg, 0); - while (n == -1 && errno == EINTR); + } while (n == -1 && errno == EINTR); } else { - do + do { + if (uv__work_check_cancelled(w)) + break; n = uv__writev(uv__stream_fd(stream), iov, iovcnt); - while (n == -1 && errno == EINTR); + } while (n == -1 && errno == EINTR); } if (n >= 0) @@ -845,13 +852,13 @@ static int uv__try_write(uv_stream_t* stream, return UV__ERR(errno); } + /* A note about blocking writes: The UV_HANDLE_WRITE_PENDING flag is toggled * only on the loop thread (either by uv__write or uv__write_done). While we're * doing work from the thread pool, we touch only the result field of * uv_write_t; we'll never read it from the loop thread while a blocked write is * pending. 
*/ - static void uv__write_work(struct uv__work* w) { uv_stream_t* stream; struct uv__queue* q; @@ -867,9 +874,11 @@ static void uv__write_work(struct uv__work* w) { req->result = uv__try_write(stream, &(req->bufs[req->write_index]), req->nbufs - req->write_index, - req->send_handle); + req->send_handle, + w); } + static void uv__write_done(struct uv__work* w, int status) { uv_stream_t* stream; struct uv__queue* q; @@ -907,6 +916,7 @@ error: uv__stream_osx_interrupt_select(stream); } + static void uv__write(uv_stream_t* stream) { struct uv__queue* q; uv_write_t* req; @@ -933,14 +943,15 @@ static void uv__write(uv_stream_t* stream) { n = uv__try_write(stream, &(req->bufs[req->write_index]), req->nbufs - req->write_index, - req->send_handle); + req->send_handle, + NULL); } else { n = UV_EAGAIN; if (!(stream->flags & UV_HANDLE_WRITE_PENDING)) { stream->flags |= UV_HANDLE_WRITE_PENDING; uv__work_submit(stream->loop, &stream->blocked_write, - UV__WORK_FAST_IO, + UV__WORK_FAST_IO_CANCELLABLE, uv__write_work, uv__write_done); } @@ -1504,7 +1515,7 @@ int uv_try_write2(uv_stream_t* stream, if (err < 0) return err; - return uv__try_write(stream, bufs, nbufs, send_handle); + return uv__try_write(stream, bufs, nbufs, send_handle, NULL); } @@ -1620,8 +1631,8 @@ void uv__stream_close(uv_stream_t* handle) { handle->queued_fds = NULL; } - /* if (handle->flags & UV_HANDLE_WRITE_PENDING) - * uv_cancel((uv_req_t*)handle->blocked_write); */ + if (handle->flags & UV_HANDLE_WRITE_PENDING) + uv__work_cancel(handle->loop, &handle->blocked_write); assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); } diff --git a/src/uv-common.h b/src/uv-common.h index 639df6ddf..07ec6a5ca 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -215,15 +215,20 @@ int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. 
*/ enum uv__work_kind { UV__WORK_CPU, UV__WORK_FAST_IO, + UV__WORK_FAST_IO_CANCELLABLE, UV__WORK_SLOW_IO }; +void uv__cancel_signal_handler(int signo); + void uv__work_submit(uv_loop_t* loop, struct uv__work *w, enum uv__work_kind kind, void (*work)(struct uv__work *w), void (*done)(struct uv__work *w, int status)); +int uv__work_cancel(uv_loop_t* loop, struct uv__work* w); + void uv__work_done(uv_async_t* handle); size_t uv__count_bufs(const uv_buf_t bufs[], unsigned int nbufs); diff --git a/test/test-pipe-blocking-subprocess.c b/test/test-pipe-blocking-subprocess.c index 24f241f35..01444ddb7 100644 --- a/test/test-pipe-blocking-subprocess.c +++ b/test/test-pipe-blocking-subprocess.c @@ -72,6 +72,9 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { static void init_common(void) { loop = uv_default_loop(); +#ifndef _WIN32 + uv_loop_configure(loop, UV_LOOP_CANCEL_SIGNAL, SIGUSR1); +#endif ASSERT_OK(uv_pipe_init(loop, &pipe_in, 0)); ASSERT_OK(uv_pipe_init(loop, &pipe_out, 0));