unix: add custom thread pool

This commit is contained in:
Ben Noordhuis 2012-10-01 00:38:39 +02:00
parent 1020a1a82e
commit f35a4b628a
6 changed files with 182 additions and 1 deletions

View File

@ -42,6 +42,7 @@ OBJS += src/unix/signal.o
OBJS += src/unix/stream.o
OBJS += src/unix/tcp.o
OBJS += src/unix/thread.o
OBJS += src/unix/threadpool.o
OBJS += src/unix/timer.o
OBJS += src/unix/tty.o
OBJS += src/unix/udp.o

View File

@ -54,6 +54,13 @@ struct uv__io_s {
ev_io io_watcher;
};
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w);
struct uv_loop_s* loop;
ngx_queue_t wq;
};
#if defined(__linux__)
# include "uv-linux.h"
#elif defined(__sun)
@ -122,6 +129,9 @@ typedef struct {
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
ngx_queue_t wq; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async; \
uv_handle_t* closing_handles; \
ngx_queue_t process_handles[1]; \
ngx_queue_t prepare_handles; \

View File

@ -165,6 +165,13 @@ unsigned int uv__next_timeout(uv_loop_t* loop);
void uv__signal_close(uv_signal_t* handle);
void uv__signal_unregister(uv_loop_t* loop);
/* thread pool */
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w));
void uv__work_done(uv_async_t* handle, int status);
/* platform specific */
int uv__platform_loop_init(uv_loop_t* loop, int default_loop);
void uv__platform_loop_delete(uv_loop_t* loop);

View File

@ -38,8 +38,8 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
#endif
memset(loop, 0, sizeof(*loop));
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->wq);
ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->async_handles);
@ -63,6 +63,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
for (i = 0; i < ARRAY_SIZE(loop->process_handles); i++)
ngx_queue_init(loop->process_handles + i);
if (uv_mutex_init(&loop->wq_mutex))
abort();
if (uv_async_init(loop, &loop->wq_async, uv__work_done))
abort();
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV__HANDLE_INTERNAL;
if (uv__platform_loop_init(loop, default_loop))
return -1;
@ -89,4 +98,9 @@ void uv__loop_delete(uv_loop_t* loop) {
close(loop->emfile_fd);
loop->emfile_fd = -1;
}
uv_mutex_lock(&loop->wq_mutex);
assert(ngx_queue_empty(&loop->wq) && "thread pool work queue not empty!");
uv_mutex_unlock(&loop->wq_mutex);
uv_mutex_destroy(&loop->wq_mutex);
}

148
src/unix/threadpool.c Normal file
View File

@ -0,0 +1,148 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "internal.h"
#include <errno.h>
#include <pthread.h>
/* TODO add condvar support to libuv */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_once_t once = PTHREAD_ONCE_INIT;
static pthread_t threads[4];
static ngx_queue_t exit_message;
static ngx_queue_t wq = { &wq, &wq };
static void* worker(void* arg) {
struct uv__work* w;
ngx_queue_t* q;
(void) arg;
for (;;) {
if (pthread_mutex_lock(&mutex))
abort();
while (ngx_queue_empty(&wq))
if (pthread_cond_wait(&cond, &mutex))
abort();
q = ngx_queue_head(&wq);
if (q == &exit_message)
pthread_cond_signal(&cond);
else
ngx_queue_remove(q);
if (pthread_mutex_unlock(&mutex))
abort();
if (q == &exit_message)
break;
w = ngx_queue_data(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
ngx_queue_insert_tail(&w->loop->wq, &w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_async_send(&w->loop->wq_async);
}
return NULL;
}
static void post(ngx_queue_t* q) {
pthread_mutex_lock(&mutex);
ngx_queue_insert_tail(&wq, q);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
static void init_once(void) {
unsigned int i;
ngx_queue_init(&wq);
for (i = 0; i < ARRAY_SIZE(threads); i++)
if (pthread_create(threads + i, NULL, worker, NULL))
abort();
}
__attribute__((destructor))
static void cleanup(void) {
unsigned int i;
int err;
post(&exit_message);
for (i = 0; i < ARRAY_SIZE(threads); i++) {
err = pthread_join(threads[i], NULL);
if (err == 0 || err == ESRCH)
continue;
abort();
}
}
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w)) {
pthread_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);
}
void uv__work_done(uv_async_t* handle, int status) {
struct uv__work* w;
uv_loop_t* loop;
ngx_queue_t* q;
ngx_queue_t wq;
loop = container_of(handle, uv_loop_t, wq_async);
ngx_queue_init(&wq);
uv_mutex_lock(&loop->wq_mutex);
if (!ngx_queue_empty(&loop->wq)) {
q = ngx_queue_head(&loop->wq);
ngx_queue_split(&loop->wq, q, &wq);
}
uv_mutex_unlock(&loop->wq_mutex);
while (!ngx_queue_empty(&wq)) {
q = ngx_queue_head(&wq);
ngx_queue_remove(q);
w = container_of(q, struct uv__work, wq);
w->done(w);
}
}

1
uv.gyp
View File

@ -138,6 +138,7 @@
'src/unix/stream.c',
'src/unix/tcp.c',
'src/unix/thread.c',
'src/unix/threadpool.c',
'src/unix/timer.c',
'src/unix/tty.c',
'src/unix/udp.c',