diff --git a/config-unix.mk b/config-unix.mk index 2deb3cdbd..28478c6c8 100644 --- a/config-unix.mk +++ b/config-unix.mk @@ -42,6 +42,7 @@ OBJS += src/unix/signal.o OBJS += src/unix/stream.o OBJS += src/unix/tcp.o OBJS += src/unix/thread.o +OBJS += src/unix/threadpool.o OBJS += src/unix/timer.o OBJS += src/unix/tty.o OBJS += src/unix/udp.o diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 71aee366c..4daf20e27 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -54,6 +54,13 @@ struct uv__io_s { ev_io io_watcher; }; +struct uv__work { + void (*work)(struct uv__work *w); + void (*done)(struct uv__work *w); + struct uv_loop_s* loop; + ngx_queue_t wq; +}; + #if defined(__linux__) # include "uv-linux.h" #elif defined(__sun) @@ -122,6 +129,9 @@ typedef struct { uv_async_t uv_eio_want_poll_notifier; \ uv_async_t uv_eio_done_poll_notifier; \ uv_idle_t uv_eio_poller; \ + ngx_queue_t wq; \ + uv_mutex_t wq_mutex; \ + uv_async_t wq_async; \ uv_handle_t* closing_handles; \ ngx_queue_t process_handles[1]; \ ngx_queue_t prepare_handles; \ diff --git a/src/unix/internal.h b/src/unix/internal.h index 20dd23c71..20f12bcd1 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -165,6 +165,13 @@ unsigned int uv__next_timeout(uv_loop_t* loop); void uv__signal_close(uv_signal_t* handle); void uv__signal_unregister(uv_loop_t* loop); +/* thread pool */ +void uv__work_submit(uv_loop_t* loop, + struct uv__work *w, + void (*work)(struct uv__work *w), + void (*done)(struct uv__work *w)); +void uv__work_done(uv_async_t* handle, int status); + /* platform specific */ int uv__platform_loop_init(uv_loop_t* loop, int default_loop); void uv__platform_loop_delete(uv_loop_t* loop); diff --git a/src/unix/loop.c b/src/unix/loop.c index db85d2871..d757a7187 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -38,8 +38,8 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { #endif memset(loop, 0, sizeof(*loop)); - RB_INIT(&loop->timer_handles); + ngx_queue_init(&loop->wq); ngx_queue_init(&loop->active_reqs); ngx_queue_init(&loop->idle_handles); ngx_queue_init(&loop->async_handles); @@ -63,6 +63,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { for (i = 0; i < ARRAY_SIZE(loop->process_handles); i++) ngx_queue_init(loop->process_handles + i); + if (uv_mutex_init(&loop->wq_mutex)) + abort(); + + if (uv_async_init(loop, &loop->wq_async, uv__work_done)) + abort(); + + uv__handle_unref(&loop->wq_async); + loop->wq_async.flags |= UV__HANDLE_INTERNAL; + if (uv__platform_loop_init(loop, default_loop)) return -1; @@ -89,4 +98,9 @@ void uv__loop_delete(uv_loop_t* loop) { close(loop->emfile_fd); loop->emfile_fd = -1; } + + uv_mutex_lock(&loop->wq_mutex); + assert(ngx_queue_empty(&loop->wq) && "thread pool work queue not empty!"); + uv_mutex_unlock(&loop->wq_mutex); + uv_mutex_destroy(&loop->wq_mutex); } diff --git a/src/unix/threadpool.c b/src/unix/threadpool.c new file mode 100644 index 000000000..0e69376cc --- /dev/null +++ b/src/unix/threadpool.c @@ -0,0 +1,148 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "internal.h" + +#include +#include + +/* TODO add condvar support to libuv */ +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +static pthread_once_t once = PTHREAD_ONCE_INIT; +static pthread_t threads[4]; +static ngx_queue_t exit_message; +static ngx_queue_t wq = { &wq, &wq }; + + +static void* worker(void* arg) { + struct uv__work* w; + ngx_queue_t* q; + + (void) arg; + + for (;;) { + if (pthread_mutex_lock(&mutex)) + abort(); + + while (ngx_queue_empty(&wq)) + if (pthread_cond_wait(&cond, &mutex)) + abort(); + + q = ngx_queue_head(&wq); + + if (q == &exit_message) + pthread_cond_signal(&cond); + else + ngx_queue_remove(q); + + if (pthread_mutex_unlock(&mutex)) + abort(); + + if (q == &exit_message) + break; + + w = ngx_queue_data(q, struct uv__work, wq); + w->work(w); + + uv_mutex_lock(&w->loop->wq_mutex); + ngx_queue_insert_tail(&w->loop->wq, &w->wq); + uv_mutex_unlock(&w->loop->wq_mutex); + uv_async_send(&w->loop->wq_async); + } + + return NULL; +} + + +static void post(ngx_queue_t* q) { + pthread_mutex_lock(&mutex); + ngx_queue_insert_tail(&wq, q); + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); +} + + +static void init_once(void) { + unsigned int i; + + ngx_queue_init(&wq); + + for (i = 0; i < ARRAY_SIZE(threads); i++) + if (pthread_create(threads + i, NULL, worker, NULL)) + abort(); +} + + +__attribute__((destructor)) +static void cleanup(void) { + unsigned int i; + int err; + + post(&exit_message); + + for (i = 0; i < ARRAY_SIZE(threads); i++) { + err = pthread_join(threads[i], NULL); + + if (err == 0 || err == ESRCH) + continue; + + abort(); + } +} + + +void uv__work_submit(uv_loop_t* loop, + struct uv__work* w, + void (*work)(struct uv__work* w), + void (*done)(struct uv__work* w)) { + pthread_once(&once, init_once); + w->loop = loop; + w->work = work; + w->done = done; + post(&w->wq); +} + + +void uv__work_done(uv_async_t* handle, int status) { + struct uv__work* w; + uv_loop_t* loop; + ngx_queue_t* q; + ngx_queue_t wq; + + loop = container_of(handle, uv_loop_t, wq_async); + ngx_queue_init(&wq); + + uv_mutex_lock(&loop->wq_mutex); + if (!ngx_queue_empty(&loop->wq)) { + q = ngx_queue_head(&loop->wq); + ngx_queue_split(&loop->wq, q, &wq); + } + uv_mutex_unlock(&loop->wq_mutex); + + while (!ngx_queue_empty(&wq)) { + q = ngx_queue_head(&wq); + ngx_queue_remove(q); + + w = container_of(q, struct uv__work, wq); + w->done(w); + } +} diff --git a/uv.gyp b/uv.gyp index 15fa7d5a9..4d7f54948 100644 --- a/uv.gyp +++ b/uv.gyp @@ -138,6 +138,7 @@ 'src/unix/stream.c', 'src/unix/tcp.c', 'src/unix/thread.c', + 'src/unix/threadpool.c', 'src/unix/timer.c', 'src/unix/tty.c', 'src/unix/udp.c',