libuv/ol-unix.c

416 lines
8.4 KiB
C
Raw Normal View History

2011-03-24 00:56:16 +00:00
#include "ol.h"
2011-03-23 12:56:06 +00:00
2011-03-28 10:26:00 +00:00
#include <stdlib.h>
#include <errno.h>
#include <assert.h>
2011-03-28 10:54:18 +00:00
#include <string.h> /* strnlen */
2011-03-29 23:40:27 +00:00
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
2011-03-23 12:56:06 +00:00
2011-03-28 10:17:52 +00:00
void ol_tcp_io(EV_P_ ev_io* watcher, int revents);
void ol_tcp_connect(ol_handle* handle, ol_req* req);
2011-03-30 02:53:29 +00:00
ol_handle* ol_tcp_open(ol_handle* parent, int fd);
2011-03-28 10:54:18 +00:00
int ol_close_error(ol_handle* handle, ol_err err);
2011-03-23 12:56:06 +00:00
2011-03-28 10:17:52 +00:00
2011-03-29 17:08:45 +00:00
static ol_err ol_err_new(ol_handle* handle, int e) {
handle->_.err = e;
2011-03-28 10:54:18 +00:00
return e;
2011-03-23 12:56:06 +00:00
}
2011-03-29 17:08:45 +00:00
ol_err ol_err_last(ol_handle *handle) {
return handle->_.err;
}
2011-03-23 12:56:06 +00:00
2011-03-28 10:17:52 +00:00
struct sockaddr_in ol_ip4_addr(char *ip, int port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
return addr;
}
2011-03-28 10:54:18 +00:00
int ol_close(ol_handle* handle) {
return ol_close_error(handle, 0);
}
2011-03-29 17:08:45 +00:00
void ol_init() {
ev_default_loop(0);
}
2011-03-28 10:54:18 +00:00
int ol_run() {
2011-03-29 17:08:45 +00:00
ev_run(EV_DEFAULT_ 0);
2011-03-28 10:54:18 +00:00
}
2011-03-30 02:57:11 +00:00
ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data) {
2011-03-28 08:55:29 +00:00
ol_handle *handle = calloc(sizeof(ol_handle), 1);
handle->close_cb = close_cb;
handle->data = data;
2011-03-23 12:56:06 +00:00
2011-03-29 17:08:45 +00:00
handle->_.fd = -1;
2011-03-30 02:53:29 +00:00
ngx_queue_init(&handle->_.read_reqs);
2011-03-28 10:17:52 +00:00
ev_init(&handle->_.read_watcher, ol_tcp_io);
ev_init(&handle->_.write_watcher, ol_tcp_io);
2011-03-23 23:31:29 +00:00
2011-03-28 10:17:52 +00:00
return handle;
2011-03-23 12:56:06 +00:00
}
2011-03-29 17:08:45 +00:00
int ol_tcp_lazy_open(ol_handle* handle, int domain) {
assert(handle->_.fd < 0);
/* Lazily allocate a file descriptor for this handle */
2011-03-30 02:53:29 +00:00
int fd = socket(domain, SOCK_STREAM, 0);
/* Set non-blocking, etc */
ol_tcp_init_fd(fd);
handle->_.fd = fd;
2011-03-29 17:08:45 +00:00
return 0;
}
2011-03-28 10:17:52 +00:00
int ol_bind(ol_handle* handle, struct sockaddr* addr) {
2011-03-28 08:55:29 +00:00
int addrsize;
2011-03-29 17:08:45 +00:00
int domain;
2011-03-27 21:50:51 +00:00
2011-03-28 08:55:29 +00:00
if (addr->sa_family == AF_INET) {
2011-03-28 10:17:52 +00:00
addrsize = sizeof(struct sockaddr_in);
2011-03-29 17:08:45 +00:00
domain = AF_INET;
2011-03-28 08:55:29 +00:00
} else if (addr->sa_family == AF_INET6) {
2011-03-28 10:17:52 +00:00
addrsize = sizeof(struct sockaddr_in6);
2011-03-29 17:08:45 +00:00
domain = AF_INET6;
2011-03-28 08:55:29 +00:00
} else {
assert(0);
2011-03-28 10:17:52 +00:00
return -1;
2011-03-23 12:56:06 +00:00
}
2011-03-29 17:08:45 +00:00
int r = 0;
if (handle->_.fd < 0) {
r = ol_tcp_lazy_open(handle, domain);
if (r) {
return ol_err_new(handle, r);
}
}
r = bind(handle->_.fd, addr, addrsize);
2011-03-27 21:50:51 +00:00
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, r);
}
2011-03-30 02:53:29 +00:00
int ol_tcp_init_fd(int fd) {
int r;
int yes = 1;
r = fcntl(fd, F_SETFL, O_NONBLOCK);
assert(r == 0);
r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
assert(r == 0);
return 0;
}
2011-03-29 17:08:45 +00:00
ol_handle* ol_tcp_open(ol_handle* parent, int fd) {
2011-03-30 02:57:11 +00:00
ol_handle* h = ol_tcp_handle_new(NULL, NULL);
2011-03-30 02:53:29 +00:00
if (!h) {
return NULL;
}
2011-03-29 17:08:45 +00:00
h->_.fd = fd;
2011-03-30 02:53:29 +00:00
/* Set non-blocking, etc */
ol_tcp_init_fd(fd);
2011-03-29 17:08:45 +00:00
return h;
}
void ol_server_io(EV_P_ ev_io* watcher, int revents) {
ol_handle* handle = watcher->data;
assert(revents == EV_READ);
while (1) {
struct sockaddr addr;
socklen_t addrlen;
int fd = accept(handle->_.fd, &addr, &addrlen);
if (fd < 0) {
if (errno == EAGAIN) {
2011-03-29 23:40:27 +00:00
return; /* No problem. */
2011-03-29 17:08:45 +00:00
} else if (errno == EMFILE) {
2011-03-29 23:40:27 +00:00
/* TODO special trick. unlock reserved socket, accept, close. */
2011-03-29 17:08:45 +00:00
return;
} else {
ol_close_error(handle, ol_err_new(handle, errno));
}
2011-03-30 02:53:29 +00:00
2011-03-29 17:08:45 +00:00
} else {
2011-03-30 02:53:29 +00:00
if (!handle->accept_cb) {
close(fd);
} else {
2011-03-29 17:08:45 +00:00
ol_handle* new_client = ol_tcp_open(handle, fd);
if (!new_client) {
ol_close_error(handle, ol_err_last(handle));
return;
}
handle->accept_cb(handle, new_client);
}
}
}
2011-03-28 08:55:29 +00:00
}
2011-03-23 12:56:06 +00:00
2011-03-27 21:50:51 +00:00
2011-03-28 08:55:29 +00:00
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) {
2011-03-29 17:08:45 +00:00
if (handle->_.fd < 0) {
/* Lazily allocate a file descriptor for this handle */
handle->_.fd = socket(AF_INET, SOCK_STREAM, 0);
}
2011-03-28 08:55:29 +00:00
int r = listen(handle->_.fd, backlog);
2011-03-29 17:08:45 +00:00
if (r < 0) {
return ol_err_new(handle, errno);
}
2011-03-28 08:55:29 +00:00
handle->accept_cb = cb;
2011-03-29 17:08:45 +00:00
ev_io_init(&handle->_.read_watcher, ol_server_io, handle->_.fd, EV_READ);
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
handle->_.read_watcher.data = handle;
return 0;
2011-03-23 12:56:06 +00:00
}
2011-03-28 10:54:18 +00:00
int ol_close_error(ol_handle* handle, ol_err err) {
2011-03-29 17:08:45 +00:00
ev_io_stop(EV_DEFAULT_ &handle->_.read_watcher);
2011-03-28 10:17:52 +00:00
close(handle->_.fd);
handle->_.fd = -1;
2011-03-23 23:31:29 +00:00
2011-03-28 08:55:29 +00:00
if (handle->close_cb) {
2011-03-28 10:17:52 +00:00
handle->close_cb(handle, err);
2011-03-24 03:40:55 +00:00
}
2011-03-28 10:54:18 +00:00
return err;
2011-03-23 23:31:29 +00:00
}
2011-03-28 08:55:29 +00:00
void ol_tcp_io(EV_P_ ev_io* watcher, int revents) {
2011-03-28 10:17:52 +00:00
ol_handle* handle = watcher->data;
2011-03-23 12:56:06 +00:00
2011-03-28 10:17:52 +00:00
if (handle->_.connect_req) {
ol_tcp_connect(handle, handle->_.connect_req);
2011-03-28 08:55:29 +00:00
} else {
2011-03-23 12:56:06 +00:00
}
2011-03-28 08:55:29 +00:00
assert(handle->_.fd >= 0);
2011-03-24 03:40:55 +00:00
}
2011-03-28 08:55:29 +00:00
/**
* We get called here from directly following a call to connect(2).
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
2011-03-28 10:17:52 +00:00
void ol_tcp_connect(ol_handle* handle, ol_req* req) {
2011-03-28 08:55:29 +00:00
assert(handle->_.fd >= 0);
assert(req);
assert(req->type == OL_CONNECT);
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
int error;
2011-03-28 10:17:52 +00:00
int errorsize = sizeof(int);
getsockopt(handle->_.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
if (!error) {
2011-03-28 10:17:52 +00:00
ev_io_init(&handle->_.write_watcher, ol_tcp_io, handle->_.fd, EV_WRITE);
ev_set_cb(&handle->_.read_watcher, ol_tcp_io);
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
/* Successful connection */
2011-03-29 23:40:27 +00:00
ol_connect_cb connect_cb = req->cb;
2011-03-28 08:55:29 +00:00
if (connect_cb) {
if (req->_.local) {
2011-03-29 17:08:45 +00:00
connect_cb(NULL, ol_err_new(handle, 0));
2011-03-24 08:50:23 +00:00
} else {
2011-03-29 17:08:45 +00:00
connect_cb(req, ol_err_new(handle, 0));
2011-03-24 08:50:23 +00:00
}
}
2011-03-28 08:55:29 +00:00
/* Free up connect_req if we own it. */
if (req->_.local) {
free(req);
}
2011-03-23 12:56:06 +00:00
2011-03-28 08:55:29 +00:00
req = NULL;
2011-03-23 12:56:06 +00:00
2011-03-28 08:55:29 +00:00
} else if (error == EINPROGRESS) {
/* Still connecting. */
return;
2011-03-23 12:56:06 +00:00
2011-03-23 23:31:29 +00:00
} else {
2011-03-29 17:08:45 +00:00
ol_err err = ol_err_new(handle, error);
2011-03-28 10:17:52 +00:00
if (req->_.connect_cb) {
2011-03-29 17:08:45 +00:00
req->_.connect_cb(req, err);
2011-03-24 03:40:55 +00:00
}
2011-03-24 08:50:23 +00:00
2011-03-29 17:08:45 +00:00
ol_close_error(handle, err);
2011-03-24 03:40:55 +00:00
}
2011-03-23 23:31:29 +00:00
}
2011-03-23 12:56:06 +00:00
2011-03-28 10:17:52 +00:00
ol_req* ol_req_maybe_alloc(ol_handle* handle, ol_req* in_req) {
if (in_req) {
in_req->handle = handle;
in_req->_.local = 0;
return in_req;
} else {
ol_req *req = calloc(sizeof(ol_req), 1);
req->handle = handle;
req->_.local = 1;
return req;
}
}
2011-03-24 08:50:23 +00:00
2011-03-28 10:17:52 +00:00
int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) {
2011-03-28 08:55:29 +00:00
if (handle->_.connect_req) {
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, EALREADY);
2011-03-24 08:50:23 +00:00
}
2011-03-28 08:55:29 +00:00
if (handle->type != OL_TCP) {
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, ENOTSOCK);
2011-03-24 08:50:23 +00:00
}
2011-03-28 08:55:29 +00:00
ol_req *req = ol_req_maybe_alloc(handle, req_in);
if (!req) {
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, ENOMEM);
2011-03-24 08:50:23 +00:00
}
2011-03-28 08:55:29 +00:00
handle->_.connect_req = req;
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
int addrsize;
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
if (addr->sa_family == AF_INET) {
2011-03-28 10:17:52 +00:00
addrsize = sizeof(struct sockaddr_in);
2011-03-28 08:55:29 +00:00
handle->_.fd = socket(AF_INET, SOCK_STREAM, 0);
} else if (addr->sa_family == AF_INET6) {
2011-03-28 10:17:52 +00:00
addrsize = sizeof(struct sockaddr_in6);
2011-03-28 08:55:29 +00:00
handle->_.fd = socket(AF_INET6, SOCK_STREAM, 0);
} else {
assert(0);
2011-03-28 10:17:52 +00:00
return -1;
2011-03-27 21:50:51 +00:00
}
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
/* socket(2) failed */
if (handle->_.fd < 0) {
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, errno);
2011-03-28 08:55:29 +00:00
}
2011-03-24 08:50:23 +00:00
2011-03-28 08:55:29 +00:00
int r = connect(handle->_.fd, addr, addrsize);
2011-03-27 21:50:51 +00:00
2011-03-28 10:17:52 +00:00
ev_io_init(&handle->_.read_watcher, ol_tcp_io, handle->_.fd, EV_READ);
ev_io_init(&handle->_.write_watcher, ol_tcp_io, handle->_.fd, EV_WRITE);
2011-03-29 17:08:45 +00:00
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
2011-03-27 21:50:51 +00:00
2011-03-29 17:08:45 +00:00
return ol_err_new(handle, r);
2011-03-27 21:50:51 +00:00
}
2011-03-28 10:54:18 +00:00
int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
2011-03-30 02:53:29 +00:00
ssize_t r;
r = writev(handle->_.fd, (struct iovec*)bufs, bufcnt);
if (r < 0) {
return ol_err_new(handle, r);
} else {
return 0;
}
2011-03-28 10:54:18 +00:00
}
int ol_write2(ol_handle* handle, const char* msg) {
size_t len = strnlen(msg, 1024 * 1024);
ol_buf b;
b.base = (char*)msg;
b.len = len;
return ol_write(handle, NULL, &b, 1);
}
2011-03-29 17:08:45 +00:00
void ol_req_append(ol_handle* handle, ol_req *req) {
2011-03-29 23:40:27 +00:00
ngx_queue_insert_tail(&handle->_.read_reqs, &req->_.read_reqs);
2011-03-29 17:08:45 +00:00
}
int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) {
assert(handle->_.fd >= 0);
2011-03-29 23:40:27 +00:00
if (!ngx_queue_empty(&handle->_.read_reqs)) {
2011-03-29 17:08:45 +00:00
/* There are already pending read_reqs. We must get in line. */
2011-03-29 23:40:27 +00:00
assert(ev_is_active(&handle->_.read_watcher));
2011-03-29 17:08:45 +00:00
2011-03-29 23:40:27 +00:00
ol_req* req = ol_req_maybe_alloc(handle, req_in);
2011-03-29 17:08:45 +00:00
if (!req) {
return ol_err_new(handle, ENOMEM);
}
ol_req_append(handle, req);
return ol_err_new(handle, EINPROGRESS);
} else {
/* Attempt to read immediately */
2011-03-29 23:40:27 +00:00
ssize_t nread = readv(handle->_.fd, (struct iovec*) bufs, bufcnt);
2011-03-29 17:08:45 +00:00
2011-03-30 02:53:29 +00:00
ol_read_cb cb = req_in->cb;
2011-03-29 17:08:45 +00:00
2011-03-30 02:53:29 +00:00
if (nread < 0) {
if (errno == EAGAIN) {
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
return 0;
} else {
ol_err err = ol_err_new(handle, errno);
2011-03-29 17:08:45 +00:00
2011-03-29 23:40:27 +00:00
if (cb) {
2011-03-30 02:53:29 +00:00
cb(req_in, nread, err);
2011-03-29 17:08:45 +00:00
}
2011-03-30 02:53:29 +00:00
return err;
}
2011-03-29 17:08:45 +00:00
2011-03-30 02:53:29 +00:00
} else {
if (cb) {
cb(req_in, nread, 0);
}
return 0;
2011-03-29 17:08:45 +00:00
}
}
2011-03-30 02:53:29 +00:00
assert(0 && "Unreachable");
2011-03-28 10:54:18 +00:00
return 0;
}
void ol_free(ol_handle* handle) {
free(handle);
2011-03-29 23:40:27 +00:00
/* lists? */
2011-03-28 10:54:18 +00:00
return;
}