unix: implement async handles in libuv

Replace libev backed async handles with a pure libuv implementation.
This commit is contained in:
Ben Noordhuis 2012-06-11 04:13:57 +02:00
parent ddb5f55922
commit 78bc0d6134
3 changed files with 89 additions and 21 deletions

View File

@ -38,6 +38,7 @@
#include <semaphore.h>
#include <pthread.h>
#include <signal.h>
#if __sun
# include <sys/port.h>
@ -113,6 +114,9 @@ struct uv__io_s {
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
ngx_queue_t async_handles; \
uv__io_t async_watcher; \
int async_pipefd[2]; \
/* RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
uint64_t time; \
@ -211,9 +215,10 @@ struct uv__io_s {
/* UV_ASYNC */
#define UV_ASYNC_PRIVATE_FIELDS \
ev_async async_watcher; \
uv_async_cb async_cb;
#define UV_ASYNC_PRIVATE_FIELDS \
volatile sig_atomic_t pending; \
uv_async_cb async_cb; \
ngx_queue_t queue;
/* UV_TIMER */

View File

@ -21,39 +21,99 @@
#include "uv.h"
#include "internal.h"
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
static void uv__async(EV_P_ ev_async* w, int revents) {
uv_async_t* async = container_of(w, uv_async_t, async_watcher);
if (async->async_cb) {
async->async_cb(async, 0);
}
}
static int uv__async_init(uv_loop_t* loop);
static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events);
int uv_async_init(uv_loop_t* loop, uv_async_t* async, uv_async_cb async_cb) {
uv__handle_init(loop, (uv_handle_t*)async, UV_ASYNC);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
if (uv__async_init(loop))
return uv__set_sys_error(loop, errno);
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
loop->counters.async_init++;
ev_async_init(&async->async_watcher, uv__async);
async->async_cb = async_cb;
handle->async_cb = async_cb;
handle->pending = 0;
/* Note: This does not have symmetry with the other libev wrappers. */
ev_async_start(loop->ev, &async->async_watcher);
uv__handle_start(async);
ngx_queue_insert_tail(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
int uv_async_send(uv_async_t* async) {
ev_async_send(async->loop->ev, &async->async_watcher);
int uv_async_send(uv_async_t* handle) {
int r;
handle->pending = 1; /* XXX needs a memory barrier? */
do
r = write(handle->loop->async_pipefd[1], "x", 1);
while (r == -1 && errno == EINTR);
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
return uv__set_sys_error(handle->loop, errno);
return 0;
}
void uv__async_close(uv_async_t* handle) {
ev_async_stop(handle->loop->ev, &handle->async_watcher);
uv__handle_ref(handle);
ngx_queue_remove(&handle->queue);
uv__handle_stop(handle);
}
static int uv__async_init(uv_loop_t* loop) {
if (loop->async_pipefd[0] != -1)
return 0;
if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK))
return -1;
uv__io_init(&loop->async_watcher,
uv__async_io,
loop->async_pipefd[0],
UV__IO_READ);
uv__io_start(loop, &loop->async_watcher);
return 0;
}
static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) {
char buf[1024];
ngx_queue_t* q;
uv_async_t* h;
ssize_t r;
while (1) {
r = read(loop->async_pipefd[0], buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
ngx_queue_foreach(q, &loop->async_handles) {
h = ngx_queue_data(q, uv_async_t, queue);
if (!h->pending) continue;
h->pending = 0;
h->async_cb(h, 0);
}
}

View File

@ -40,12 +40,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->async_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
ngx_queue_init(&loop->handle_queue);
loop->closing_handles = NULL;
loop->channel = NULL;
loop->time = uv_hrtime() / 1000000;
loop->async_pipefd[0] = -1;
loop->async_pipefd[1] = -1;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
eio_channel_init(&loop->uv_eio_channel, loop);