Named pipes implementation for Windows

This commit is contained in:
Igor Zinkovsky 2011-07-01 17:54:17 -07:00 committed by Bert Belder
parent 5aa8c005ec
commit b6a6dae34f
12 changed files with 1155 additions and 165 deletions

View File

@ -73,6 +73,11 @@ typedef struct {
ev_io write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue;
/* UV_NAMED_PIPE */
#define UV_PIPE_PRIVATE_TYPEDEF
#define UV_PIPE_PRIVATE_FIELDS
/* UV_PREPARE */ \

View File

@ -31,6 +31,8 @@
#include "tree.h"
#define MAX_PIPENAME_LEN 256
/**
* It should be possible to cast uv_buf_t[] to WSABUF[]
* see http://msdn.microsoft.com/en-us/library/ms741542(v=vs.85).aspx
@ -40,6 +42,17 @@ typedef struct uv_buf_t {
char* base;
} uv_buf_t;
/*
 * Private uv_pipe_instance state.
 *
 * Lifecycle of a pipe instance:
 *   DISCONNECTED - the OS handle has been closed (see close_pipe).
 *   PENDING      - instance created; waiting for uv_pipe_queue_accept to
 *                  issue ConnectNamedPipe on it.
 *   WAITING      - an overlapped ConnectNamedPipe has been issued and has
 *                  not completed yet.
 *   ACCEPTED     - a client connected; waiting for uv_accept to claim it.
 *   ACTIVE       - the instance is in use by an established connection.
 */
typedef enum {
  UV_PIPEINSTANCE_DISCONNECTED = 0,
  UV_PIPEINSTANCE_PENDING,
  UV_PIPEINSTANCE_WAITING,
  UV_PIPEINSTANCE_ACCEPTED,
  UV_PIPEINSTANCE_ACTIVE
} uv_pipeinstance_state;
#define UV_REQ_PRIVATE_FIELDS \
union { \
/* Used by I/O operations */ \
@ -51,31 +64,56 @@ typedef struct uv_buf_t {
int flags; \
uv_err_t error; \
struct uv_req_s* next_req;
#define UV_STREAM_PRIVATE_FIELDS \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
struct uv_req_s read_req; \
#define uv_tcp_connection_fields \
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
uv_req_t* shutdown_req;
#define uv_tcp_server_fields \
uv_connection_cb connection_cb; \
SOCKET accept_socket; \
struct uv_req_s accept_req; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
#define uv_stream_server_fields \
uv_connection_cb connection_cb;
#define UV_STREAM_PRIVATE_FIELDS \
unsigned int reqs_pending; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
struct uv_req_s read_req; \
union { \
struct { uv_stream_connection_fields }; \
struct { uv_stream_server_fields }; \
};
#define UV_TCP_PRIVATE_FIELDS \
unsigned int reqs_pending; \
union { \
SOCKET socket; \
HANDLE handle; \
}; \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_req_s accept_req;
#define uv_pipe_server_fields \
char* name; \
int connectionCount; \
uv_pipe_instance_t* connections; \
uv_pipe_instance_t* acceptConnection; \
uv_pipe_instance_t connectionsBuffer[4];
#define uv_pipe_connection_fields \
uv_pipe_t* server; \
uv_pipe_instance_t* connection; \
uv_pipe_instance_t clientConnection;
#define UV_PIPE_PRIVATE_TYPEDEF \
typedef struct uv_pipe_instance_s { \
HANDLE handle; \
uv_pipeinstance_state state; \
uv_req_t accept_req; \
} uv_pipe_instance_t;
#define UV_PIPE_PRIVATE_FIELDS \
union { \
struct { uv_tcp_connection_fields }; \
struct { uv_tcp_server_fields }; \
struct { uv_pipe_server_fields }; \
struct { uv_pipe_connection_fields }; \
};
#define UV_TIMER_PRIVATE_FIELDS \

View File

@ -43,6 +43,7 @@ typedef struct uv_err_s uv_err_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
@ -124,7 +125,8 @@ typedef enum {
UV_EAIFAMNOSUPPORT,
UV_EAINONAME,
UV_EAISERVICE,
UV_EAISOCKTYPE
UV_EAISOCKTYPE,
UV_ESHUTDOWN
} uv_err_code;
typedef enum {
@ -287,6 +289,26 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6);
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
/*
* A subclass of uv_stream_t representing a pipe stream or pipe server.
*/
UV_PIPE_PRIVATE_TYPEDEF
struct uv_pipe_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_PIPE_PRIVATE_FIELDS
};
int uv_pipe_init(uv_pipe_t* handle);
int uv_pipe_create(uv_pipe_t* handle, char* name);
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb);
int uv_pipe_connect(uv_req_t* req, char* name);
/*
* Subclass of uv_handle_t. libev wrapper. Every active prepare handle gets
* its callback called exactly once per loop iteration, just before the
@ -478,7 +500,9 @@ union uv_any_handle {
typedef struct {
uint64_t req_init;
uint64_t handle_init;
uint64_t stream_init;
uint64_t tcp_init;
uint64_t pipe_init;
uint64_t prepare_init;
uint64_t check_init;
uint64_t idle_init;

View File

@ -1594,3 +1594,22 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
return 0;
}
/* Unix placeholder: named pipes are not implemented on this platform yet.
 * Aborts in debug builds; returns an error instead of falling off the end
 * of a non-void function (which is undefined behavior under NDEBUG). */
int uv_pipe_init(uv_pipe_t* handle) {
  assert(0 && "implement me");
  return -1;
}
/* Unix placeholder: named pipes are not implemented on this platform yet.
 * Aborts in debug builds; returns an error instead of falling off the end
 * of a non-void function (which is undefined behavior under NDEBUG). */
int uv_pipe_create(uv_pipe_t* handle, char* name) {
  assert(0 && "implement me");
  return -1;
}
/* Unix placeholder: named pipes are not implemented on this platform yet.
 * Aborts in debug builds; returns an error instead of falling off the end
 * of a non-void function (which is undefined behavior under NDEBUG). */
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
  assert(0 && "implement me");
  return -1;
}
/* Unix placeholder: named pipes are not implemented on this platform yet.
 * Aborts in debug builds; returns an error instead of falling off the end
 * of a non-void function (which is undefined behavior under NDEBUG). */
int uv_pipe_connect(uv_req_t* req, char* name) {
  assert(0 && "implement me");
  return -1;
}

View File

@ -164,6 +164,7 @@ static LPFN_TRANSMITFILE pTransmitFile6;
#define UV_HANDLE_ENDGAME_QUEUED 0x0400
#define UV_HANDLE_BIND_ERROR 0x1000
#define UV_HANDLE_IPV6 0x2000
#define UV_HANDLE_PIPESERVER 0x4000
/*
* Private uv_req flags.
@ -242,6 +243,8 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req);
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req);
void uv_ares_poll(uv_timer_t* handle, int status);
static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err);
/* memory used per ares_channel */
struct uv_ares_channel_s {
ares_channel channel;
@ -373,6 +376,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ERROR_INVALID_FLAGS: return UV_EBADF;
case ERROR_INVALID_PARAMETER: return UV_EINVAL;
case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET;
case ERROR_BROKEN_PIPE: return UV_EOF;
default: return UV_UNKNOWN;
}
}
@ -527,6 +531,11 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) {
}
/* Recover the uv_pipe_instance_t that embeds the given accept request.
 * Valid only for requests that live inside a uv_pipe_instance_t
 * (i.e. the per-instance accept_req field). */
static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) {
  return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req);
}
static void uv_insert_pending_req(uv_req_t* req) {
req->next_req = NULL;
if (uv_pending_reqs_tail_) {
@ -594,24 +603,20 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
}
static void uv_tcp_init_connection(uv_tcp_t* handle) {
/* Transition a stream handle into the connected state: mark it as a
 * connection, reset the pending-write counter and (re)initialize the
 * read request that is reused for every queued 0-read. */
static void uv_init_connection(uv_stream_t* handle) {
  handle->flags |= UV_HANDLE_CONNECTION;
  handle->write_reqs_pending = 0;
  uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL);
}
int uv_tcp_init(uv_tcp_t* handle) {
handle->socket = INVALID_SOCKET;
int uv_stream_init(uv_stream_t* handle) {
handle->write_queue_size = 0;
handle->type = UV_TCP;
handle->flags = 0;
handle->reqs_pending = 0;
handle->error = uv_ok_;
handle->accept_socket = INVALID_SOCKET;
uv_counters()->handle_init++;
uv_counters()->tcp_init++;
uv_counters()->stream_init++;
uv_refs_++;
@ -619,6 +624,20 @@ int uv_tcp_init(uv_tcp_t* handle) {
}
/* Initialize a TCP handle: set up the generic stream state first, then
 * the TCP-specific fields. Always succeeds. */
int uv_tcp_init(uv_tcp_t* handle) {
  uv_stream_init((uv_stream_t*)handle);

  handle->type = UV_TCP;
  handle->reqs_pending = 0;

  /* No sockets exist for this handle yet. */
  handle->accept_socket = INVALID_SOCKET;
  handle->socket = INVALID_SOCKET;

  uv_counters()->tcp_init++;

  return 0;
}
static void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
@ -658,6 +677,39 @@ static void uv_tcp_endgame(uv_tcp_t* handle) {
}
/* Run deferred termination work for a pipe handle.
 * Called from the endgame queue. First completes a pending shutdown once
 * every queued write has drained, then - when the handle is closing and no
 * requests remain - fires the close callback exactly once. */
static void uv_pipe_endgame(uv_pipe_t* handle) {
  uv_err_t err;
  int status;

  /* Finish a pending uv_shutdown() once all writes have completed. */
  if (handle->flags & UV_HANDLE_SHUTTING &&
      !(handle->flags & UV_HANDLE_SHUT) &&
      handle->write_reqs_pending == 0) {
    close_pipe(handle, &status, &err);

    if (handle->shutdown_req->cb) {
      handle->shutdown_req->flags &= ~UV_REQ_PENDING;
      if (status == -1) {
        uv_last_error_ = err;
      }
      ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status);
    }
    /* The shutdown request no longer keeps the handle alive. */
    handle->reqs_pending--;
  }

  /* Once closing and fully drained, report the close. */
  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));
    handle->flags |= UV_HANDLE_CLOSED;

    if (handle->close_cb) {
      handle->close_cb((uv_handle_t*)handle);
    }

    uv_refs_--;
  }
}
static void uv_timer_endgame(uv_timer_t* handle) {
if (handle->flags & UV_HANDLE_CLOSING) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
@ -714,6 +766,10 @@ static void uv_process_endgames() {
case UV_TCP:
uv_tcp_endgame((uv_tcp_t*)handle);
break;
case UV_NAMED_PIPE:
uv_pipe_endgame((uv_pipe_t*)handle);
break;
case UV_TIMER:
uv_timer_endgame((uv_timer_t*)handle);
@ -749,6 +805,7 @@ static void uv_want_endgame(uv_handle_t* handle) {
static int uv_close_error(uv_handle_t* handle, uv_err_t e) {
uv_tcp_t* tcp;
uv_pipe_t* pipe;
if (handle->flags & UV_HANDLE_CLOSING) {
return 0;
@ -773,6 +830,15 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) {
uv_want_endgame(handle);
}
return 0;
case UV_NAMED_PIPE:
pipe = (uv_pipe_t*)handle;
pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
close_pipe(pipe, NULL, NULL);
if (pipe->reqs_pending == 0) {
uv_want_endgame(handle);
}
return 0;
case UV_TIMER:
uv_timer_stop((uv_timer_t*)handle);
@ -877,7 +943,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
static void uv_queue_accept(uv_tcp_t* handle) {
static void uv_tcp_queue_accept(uv_tcp_t* handle) {
uv_req_t* req;
BOOL success;
DWORD bytes;
@ -908,6 +974,7 @@ static void uv_queue_accept(uv_tcp_t* handle) {
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
handle->reqs_pending++;
return;
}
@ -927,6 +994,7 @@ static void uv_queue_accept(uv_tcp_t* handle) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
return;
@ -939,7 +1007,48 @@ static void uv_queue_accept(uv_tcp_t* handle) {
}
static void uv_queue_read(uv_tcp_t* handle) {
/* Issue an overlapped ConnectNamedPipe on every pipe instance that is
 * still in the PENDING state so the listening server can accept clients.
 * On a ConnectNamedPipe failure the accept request is queued with the
 * error so the completion path reports it. */
static void uv_pipe_queue_accept(uv_pipe_t* handle) {
  uv_req_t* req;
  uv_pipe_instance_t* instance;
  int i;

  assert(handle->flags & UV_HANDLE_LISTENING);

  /* This loop goes through every pipe instance and calls ConnectNamedPipe
   * for every pending instance.
   * TODO: Make this faster (we could maintain a linked list of pending
   * instances). */
  for (i = 0; i < handle->connectionCount; i++) {
    instance = &handle->connections[i];

    if (instance->state == UV_PIPEINSTANCE_PENDING) {
      /* Prepare the uv_req structure. */
      req = &instance->accept_req;
      uv_req_init(req, (uv_handle_t*)handle, NULL);
      assert(!(req->flags & UV_REQ_PENDING));
      req->type = UV_ACCEPT;
      req->flags |= UV_REQ_PENDING;

      /* Prepare the overlapped structure. */
      memset(&(req->overlapped), 0, sizeof(req->overlapped));

      /* NOTE(review): when ConnectNamedPipe fails with ERROR_PIPE_CONNECTED
       * the client is already connected and no completion packet is queued
       * to the IOCP; verify this does not leave the instance stuck in the
       * WAITING state. */
      if (!ConnectNamedPipe(instance->handle, &req->overlapped) &&
          GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
        /* Make this req pending reporting an error. */
        req->error = uv_new_sys_error(GetLastError());
        uv_insert_pending_req(req);
        handle->reqs_pending++;
        continue;
      }

      instance->state = UV_PIPEINSTANCE_WAITING;
      handle->reqs_pending++;
      /* (UV_REQ_PENDING was already set above; the second, redundant set
       * that used to live here has been removed.) */
    }
  }
}
static void uv_tcp_queue_read(uv_tcp_t* handle) {
uv_req_t* req;
uv_buf_t buf;
int result;
@ -967,6 +1076,40 @@ static void uv_queue_read(uv_tcp_t* handle) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
handle->reqs_pending++;
return;
}
req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
}
static void uv_pipe_queue_read(uv_pipe_t* handle) {
uv_req_t* req;
int result;
assert(handle->flags & UV_HANDLE_READING);
assert(handle->connection);
assert(handle->connection->handle != INVALID_HANDLE_VALUE);
req = &handle->read_req;
assert(!(req->flags & UV_REQ_PENDING));
memset(&req->overlapped, 0, sizeof(req->overlapped));
req->type = UV_READ;
/* Do 0-read */
result = ReadFile(handle->connection->handle,
&uv_zero_,
0,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
handle->reqs_pending++;
return;
}
@ -999,40 +1142,76 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
handle->connection_cb = cb;
uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL);
uv_queue_accept(handle);
uv_tcp_queue_accept(handle);
return 0;
}
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0;
uv_tcp_t* tcpServer = (uv_tcp_t*)server;
uv_tcp_t* tcpClient = (uv_tcp_t*)client;
if (tcpServer->accept_socket == INVALID_SOCKET) {
if (server->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}
if (uv_tcp_set_socket(tcpClient, tcpServer->accept_socket) == -1) {
closesocket(tcpServer->accept_socket);
if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
closesocket(server->accept_socket);
rv = -1;
} else {
uv_tcp_init_connection(tcpClient);
uv_init_connection((uv_stream_t*)client);
}
tcpServer->accept_socket = INVALID_SOCKET;
server->accept_socket = INVALID_SOCKET;
if (!(tcpServer->flags & UV_HANDLE_CLOSING)) {
uv_queue_accept(tcpServer);
if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_tcp_queue_accept(server);
}
return rv;
}
int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
/* Hand the connection that completed an accept on 'server' over to the
 * 'client' handle, then re-arm the server so it keeps accepting.
 * Must only be called after an accept completed (acceptConnection set by
 * uv_pipe_return_req). Always returns 0. */
static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
  assert(server->acceptConnection);

  /* Make the connection instance active */
  server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE;

  /* Move the connection instance from server to client */
  client->connection = server->acceptConnection;
  server->acceptConnection = NULL;

  /* Remember the server */
  client->server = server;

  uv_init_connection((uv_stream_t*)client);
  /* Mark the client handle as server-side, so close_pipe returns the
   * instance to the pending pool instead of closing the OS handle. */
  client->flags |= UV_HANDLE_PIPESERVER;
  uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL);

  if (!(server->flags & UV_HANDLE_CLOSING)) {
    uv_pipe_queue_accept(server);
  }

  return 0;
}
/* Dispatch uv_accept() to the transport-specific implementation.
 * The client handle must be of the same type as the server. */
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
  assert(client->type == server->type);

  switch (server->type) {
    case UV_TCP:
      return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client);

    case UV_NAMED_PIPE:
      return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client);

    default:
      /* Unsupported handle type. */
      return -1;
  }
}
static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(WSAEINVAL);
return -1;
@ -1055,12 +1234,52 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb)
/* If reading was stopped and then started again, there could still be a */
/* read request pending. */
if (!(handle->read_req.flags & UV_REQ_PENDING))
uv_queue_read((uv_tcp_t*)handle);
uv_tcp_queue_read(handle);
return 0;
}
/* Start reading on a connected pipe handle: remember the callbacks and
 * queue the first 0-read. Fails if the handle is not a connection, is
 * already reading, or has hit end-of-file. */
static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
  if (!(handle->flags & UV_HANDLE_CONNECTION)) {
    /* NOTE(review): uv_set_sys_error is given UV_* error codes here while
     * the TCP counterpart passes WSA* system error codes - confirm the
     * error translation copes with both families. */
    uv_set_sys_error(UV_EINVAL);
    return -1;
  }

  if (handle->flags & UV_HANDLE_READING) {
    uv_set_sys_error(UV_EALREADY);
    return -1;
  }

  if (handle->flags & UV_HANDLE_EOF) {
    uv_set_sys_error(UV_EOF);
    return -1;
  }

  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;

  /* If reading was stopped and then started again, there could still be a */
  /* read request pending. */
  if (!(handle->read_req.flags & UV_REQ_PENDING))
    uv_pipe_queue_read(handle);

  return 0;
}
/* Dispatch uv_read_start() to the transport-specific implementation. */
int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
  switch (handle->type) {
    case UV_TCP:
      return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb);

    case UV_NAMED_PIPE:
      return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb);

    default:
      /* Unsupported handle type. */
      return -1;
  }
}
int uv_read_stop(uv_stream_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
@ -1175,7 +1394,7 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) {
}
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
int result;
DWORD bytes, err;
uv_tcp_t* handle = (uv_tcp_t*) req->handle;
@ -1228,6 +1447,72 @@ int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
}
/* Write one buffer to a connected pipe using an overlapped WriteFile.
 * Only a single buffer per request is supported for pipes; multi-buffer
 * writes fail with UV_ENOTSUP. On success the request is marked pending
 * and completed later through the IOCP. */
int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
  int result;
  uv_pipe_t* handle = (uv_pipe_t*) req->handle;

  assert(!(req->flags & UV_REQ_PENDING));

  if (bufcnt != 1) {
    uv_set_sys_error(UV_ENOTSUP);
    return -1;
  }

  assert(handle->connection);
  assert(handle->connection->handle != INVALID_HANDLE_VALUE);

  if (!(req->handle->flags & UV_HANDLE_CONNECTION)) {
    uv_set_sys_error(UV_EINVAL);
    return -1;
  }

  if (req->handle->flags & UV_HANDLE_SHUTTING) {
    uv_set_sys_error(UV_EOF);
    return -1;
  }

  memset(&req->overlapped, 0, sizeof(req->overlapped));
  req->type = UV_WRITE;

  result = WriteFile(handle->connection->handle,
                     bufs[0].base,
                     bufs[0].len,
                     NULL,
                     &req->overlapped);

  /* WriteFile is a Win32 call, not a Winsock call, so compare against
   * ERROR_IO_PENDING rather than WSA_IO_PENDING (they happen to share a
   * value, but the Win32 constant is the correct one here). */
  if (!result && GetLastError() != ERROR_IO_PENDING) {
    uv_set_sys_error(GetLastError());
    return -1;
  }

  if (result) {
    /* Request completed immediately. */
    req->queued_bytes = 0;
  } else {
    /* Request queued by the kernel. */
    req->queued_bytes = uv_count_bufs(bufs, bufcnt);
    handle->write_queue_size += req->queued_bytes;
  }

  req->flags |= UV_REQ_PENDING;
  handle->reqs_pending++;
  handle->write_reqs_pending++;

  return 0;
}
/* Dispatch uv_write() to the transport-specific writer, based on the type
 * of the handle the request is attached to. */
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
  switch (req->handle->type) {
    case UV_TCP:
      return uv_tcp_write(req, bufs, bufcnt);

    case UV_NAMED_PIPE:
      return uv_pipe_write(req, bufs, bufcnt);

    default:
      /* Unsupported handle type. */
      return -1;
  }
}
int uv_shutdown(uv_req_t* req) {
uv_tcp_t* handle = (uv_tcp_t*) req->handle;
int status = 0;
@ -1246,7 +1531,7 @@ int uv_shutdown(uv_req_t* req) {
req->flags |= UV_REQ_PENDING;
handle->flags |= UV_HANDLE_SHUTTING;
handle->shutdown_req = req;
handle->shutdown_req = req;
handle->reqs_pending++;
uv_want_endgame((uv_handle_t*)handle);
@ -1338,7 +1623,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
}
/* Post another 0-read if still reading and not closing. */
if (handle->flags & UV_HANDLE_READING) {
uv_queue_read(handle);
uv_tcp_queue_read(handle);
}
break;
@ -1375,7 +1660,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
if (handle->flags & UV_HANDLE_LISTENING) {
uv_queue_accept(handle);
uv_tcp_queue_accept(handle);
}
}
break;
@ -1388,7 +1673,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
uv_tcp_init_connection(handle);
uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_set_sys_error(WSAGetLastError());
@ -1417,6 +1702,161 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
}
/* Process a completed overlapped request for a pipe handle and invoke the
 * corresponding user callbacks. Runs on the loop thread after the IOCP
 * dequeues the request. Fixes: 'buf' was passed to read_cb uninitialized
 * in the first SetNamedPipeHandleState failure path; the unused local
 * 'acceptingConn' has been removed. */
static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) {
  DWORD bytes, err, mode;
  uv_buf_t buf;

  assert(handle->type == UV_NAMED_PIPE);

  /* Mark the request non-pending */
  req->flags &= ~UV_REQ_PENDING;

  switch (req->type) {
    case UV_WRITE:
      handle->write_queue_size -= req->queued_bytes;
      if (req->cb) {
        uv_last_error_ = req->error;
        ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1);
      }
      handle->write_reqs_pending--;
      /* A pending shutdown can only proceed after all writes drained. */
      if (handle->write_reqs_pending == 0 &&
          handle->flags & UV_HANDLE_SHUTTING) {
        uv_want_endgame((uv_handle_t*)handle);
      }
      break;

    case UV_READ:
      if (req->error.code != UV_OK) {
        /* An error occurred doing the 0-read. */
        if (!(handle->flags & UV_HANDLE_READING)) {
          break;
        }

        /* Stop reading and report error. */
        handle->flags &= ~UV_HANDLE_READING;
        uv_last_error_ = req->error;
        buf.base = 0;
        buf.len = 0;
        handle->read_cb((uv_stream_t*)handle, -1, buf);
        break;
      }

      /* Temporarily switch to non-blocking mode.
       * This is so that ReadFile doesn't block if the read buffer is empty.
       */
      mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
      if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
        /* We can't continue processing this read. */
        err = GetLastError();
        uv_set_sys_error(err);
        /* Report with an empty buffer; 'buf' has not been allocated yet at
         * this point (it was previously passed uninitialized). */
        buf.base = NULL;
        buf.len = 0;
        handle->read_cb((uv_stream_t*)handle, -1, buf);
        break;
      }

      /* Do non-blocking reads until the buffer is empty */
      while (handle->flags & UV_HANDLE_READING) {
        buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
        assert(buf.len > 0);

        if (ReadFile(handle->connection->handle,
                     buf.base,
                     buf.len,
                     &bytes,
                     NULL)) {
          if (bytes > 0) {
            /* Successful read */
            handle->read_cb((uv_stream_t*)handle, bytes, buf);
            /* Read again only if bytes == buf.len */
            if (bytes < buf.len) {
              break;
            }
          } else {
            /* Connection closed */
            handle->flags &= ~UV_HANDLE_READING;
            handle->flags |= UV_HANDLE_EOF;
            uv_last_error_.code = UV_EOF;
            uv_last_error_.sys_errno_ = ERROR_SUCCESS;
            handle->read_cb((uv_stream_t*)handle, -1, buf);
            break;
          }
        } else {
          err = GetLastError();
          if (err == ERROR_NO_DATA) {
            /* Read buffer was completely empty, report a 0-byte read. */
            uv_set_sys_error(UV_EAGAIN);
            handle->read_cb((uv_stream_t*)handle, 0, buf);
          } else {
            /* Ouch! serious error. */
            uv_set_sys_error(err);
            handle->read_cb((uv_stream_t*)handle, -1, buf);
          }
          break;
        }
      }

      if (handle->flags & UV_HANDLE_READING) {
        /* Switch back to blocking mode so that we can use IOCP for 0-reads */
        mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
        if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
          /* Report and continue. 'buf' holds the last buffer handed out by
           * alloc_cb inside the loop above. */
          err = GetLastError();
          uv_set_sys_error(err);
          handle->read_cb((uv_stream_t*)handle, -1, buf);
          break;
        }

        /* Post another 0-read if still reading and not closing. */
        uv_pipe_queue_read(handle);
      }
      break;

    case UV_ACCEPT:
      if (req->error.code == UV_OK) {
        /* Put the connection instance into accept state */
        handle->acceptConnection = uv_req_to_pipeinstance(req);
        handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED;

        if (handle->connection_cb) {
          handle->connection_cb((uv_handle_t*)handle, 0);
        }
      } else {
        /* Ignore errors and continue listening */
        if (handle->flags & UV_HANDLE_LISTENING) {
          uv_pipe_queue_accept(handle);
        }
      }
      break;

    case UV_CONNECT:
      if (req->cb) {
        if (req->error.code == UV_OK) {
          uv_init_connection((uv_stream_t*)handle);
          ((uv_connect_cb)req->cb)(req, 0);
        } else {
          uv_last_error_ = req->error;
          ((uv_connect_cb)req->cb)(req, -1);
        }
      }
      break;

    default:
      assert(0);
  }

  /* The number of pending requests is now down by one */
  handle->reqs_pending--;

  /* Queue the handle's close callback if it is closing and there are no */
  /* more pending requests. */
  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    uv_want_endgame((uv_handle_t*)handle);
  }
}
static int uv_timer_compare(uv_timer_t* a, uv_timer_t* b) {
if (a->due < b->due)
return -1;
@ -1714,6 +2154,10 @@ static void uv_process_reqs() {
case UV_TCP:
uv_tcp_return_req((uv_tcp_t*)handle, req);
break;
case UV_NAMED_PIPE:
uv_pipe_return_req((uv_pipe_t*)handle, req);
break;
case UV_ASYNC:
uv_async_return_req((uv_async_t*)handle, req);
@ -2460,3 +2904,253 @@ error:
return -1;
}
/* Set up the state shared by pipe servers and pipe connections.
 * Always succeeds. */
int uv_pipe_init(uv_pipe_t* handle) {
  uv_stream_init((uv_stream_t*)handle);

  handle->reqs_pending = 0;
  handle->type = UV_NAMED_PIPE;

  uv_counters()->pipe_init++;

  return 0;
}
/* Creates a pipe server. */
/* TODO: make this work with UTF8 name */
int uv_pipe_create(uv_pipe_t* handle, char* name) {
if (!name) {
return -1;
}
handle->connections = NULL;
handle->acceptConnection = NULL;
handle->connectionCount = 0;
/* Make our own copy of the pipe name */
handle->name = (char*)malloc(MAX_PIPENAME_LEN);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
strcpy(handle->name, name);
handle->name[255] = '\0';
handle->flags |= UV_HANDLE_PIPESERVER;
return 0;
}
/* Starts listening for connections for the given pipe. */
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
int i, maxInstances, errno;
HANDLE pipeHandle;
uv_pipe_instance_t* pipeInstance;
if (handle->flags & UV_HANDLE_LISTENING ||
handle->flags & UV_HANDLE_READING) {
uv_set_sys_error(UV_EALREADY);
return -1;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
uv_set_sys_error(UV_ENOTSUP);
return -1;
}
if (instanceCount <= sizeof(handle->connectionsBuffer)) {
/* Use preallocated connections buffer */
handle->connections = handle->connectionsBuffer;
} else {
handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t));
if (!handle->connections) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
}
maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount;
for (i = 0; i < instanceCount; i++) {
pipeHandle = CreateNamedPipe(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
maxInstances,
65536,
65536,
0,
NULL);
if (pipeHandle == INVALID_HANDLE_VALUE) {
errno = GetLastError();
goto error;
}
if (CreateIoCompletionPort(pipeHandle,
uv_iocp_,
(ULONG_PTR)handle,
0) == NULL) {
errno = GetLastError();
goto error;
}
pipeInstance = &handle->connections[i];
pipeInstance->handle = pipeHandle;
pipeInstance->state = UV_PIPEINSTANCE_PENDING;
}
/* We don't need the pipe name anymore. */
free(handle->name);
handle->name = NULL;
handle->connectionCount = instanceCount;
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
uv_pipe_queue_accept(handle);
return 0;
error:
close_pipe(handle, NULL, NULL);
uv_set_sys_error(errno);
return -1;
}
/* TODO: make this work with UTF8 name */
int uv_pipe_connect(uv_req_t* req, char* name) {
int errno;
DWORD mode;
uv_pipe_t* handle = (uv_pipe_t*)req->handle;
assert(!(req->flags & UV_REQ_PENDING));
req->type = UV_CONNECT;
handle->connection = &handle->clientConnection;
handle->server = NULL;
memset(&req->overlapped, 0, sizeof(req->overlapped));
handle->clientConnection.handle = CreateFile(name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (handle->clientConnection.handle == INVALID_HANDLE_VALUE &&
GetLastError() != ERROR_IO_PENDING) {
errno = GetLastError();
goto error;
}
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) {
errno = GetLastError();
goto error;
}
if (CreateIoCompletionPort(handle->clientConnection.handle,
uv_iocp_,
(ULONG_PTR)handle,
0) == NULL) {
errno = GetLastError();
goto error;
}
req->error = uv_ok_;
req->flags |= UV_REQ_PENDING;
handle->connection->state = UV_PIPEINSTANCE_ACTIVE;
uv_insert_pending_req(req);
handle->reqs_pending++;
return 0;
error:
close_pipe(handle, NULL, NULL);
req->error = uv_new_sys_error(errno);
uv_insert_pending_req(req);
handle->reqs_pending++;
return 0;
}
/* Cleans up uv_pipe_t (server or connection) and all resources associated with it */
static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
uv_pipe_instance_t* connection;
int i;
if (handle->flags & UV_HANDLE_PIPESERVER) {
if (handle->flags & UV_HANDLE_CONNECTION) {
/*
* The handle is for a connection instance on the pipe server.
* To clean-up, we call DisconnectNamedPipe, and return the instance to pending state,
* which will be ready to accept another pipe connection in uv_pipe_queue_accept.
*/
connection = handle->connection;
if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) {
/* Disconnect the connection intance and return it to pending state */
if (DisconnectNamedPipe(connection->handle)) {
connection->state = UV_PIPEINSTANCE_PENDING;
handle->connection = NULL;
if (status) *status = 0;
} else {
if (status) *status = -1;
if (err) *err = uv_new_sys_error(GetLastError());
}
/* Queue accept now that the instance is in pending state. */
if (!(handle->server->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(handle->server);
}
}
} else {
/*
* The handle is for the pipe server.
* To clean-up we close every connection instance that was made in uv_pipe_listen.
*/
if (handle->name) {
free(handle->name);
handle->name = NULL;
}
if (handle->connections) {
/* Go through the list of connections, and close each one with CloseHandle. */
for (i = 0; i < handle->connectionCount; i++) {
connection = &handle->connections[i];
if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) {
CloseHandle(connection->handle);
connection->state = UV_PIPEINSTANCE_DISCONNECTED;
connection->handle = INVALID_HANDLE_VALUE;
}
}
/* Free the connections buffer. */
if (handle->connections != handle->connectionsBuffer) {
free(handle->connections);
}
handle->connections = NULL;
}
if (status) *status = 0;
}
} else {
/*
* The handle is for a connection instance on the pipe client.
* To clean-up
*/
connection = handle->connection;
if (connection && connection->handle != INVALID_HANDLE_VALUE) {
if (CloseHandle(connection->handle)) {
connection->state = UV_PIPEINSTANCE_DISCONNECTED;
handle->connection = NULL;
if (status) *status = 0;
} else {
if (status) *status = -1;
if (err) *err = uv_new_sys_error(GetLastError());
}
}
}
handle->flags |= UV_HANDLE_SHUT;
}

View File

@ -21,25 +21,35 @@
BENCHMARK_DECLARE (sizes)
BENCHMARK_DECLARE (ping_pongs)
BENCHMARK_DECLARE (pump100_client)
BENCHMARK_DECLARE (pump1_client)
BENCHMARK_DECLARE (tcp_pump100_client)
BENCHMARK_DECLARE (tcp_pump1_client)
BENCHMARK_DECLARE (pipe_pump100_client)
BENCHMARK_DECLARE (pipe_pump1_client)
BENCHMARK_DECLARE (gethostbyname)
BENCHMARK_DECLARE (getaddrinfo)
HELPER_DECLARE (pump_server)
HELPER_DECLARE (echo_server)
HELPER_DECLARE (tcp_pump_server)
HELPER_DECLARE (pipe_pump_server)
HELPER_DECLARE (tcp4_echo_server)
HELPER_DECLARE (pipe_echo_server)
HELPER_DECLARE (dns_server)
TASK_LIST_START
BENCHMARK_ENTRY (sizes)
BENCHMARK_ENTRY (ping_pongs)
BENCHMARK_HELPER (ping_pongs, echo_server)
BENCHMARK_HELPER (ping_pongs, tcp4_echo_server)
BENCHMARK_ENTRY (pump100_client)
BENCHMARK_HELPER (pump100_client, pump_server)
BENCHMARK_ENTRY (tcp_pump100_client)
BENCHMARK_HELPER (tcp_pump100_client, tcp_pump_server)
BENCHMARK_ENTRY (pump1_client)
BENCHMARK_HELPER (pump1_client, pump_server)
BENCHMARK_ENTRY (tcp_pump1_client)
BENCHMARK_HELPER (tcp_pump1_client, tcp_pump_server)
BENCHMARK_ENTRY (pipe_pump100_client)
BENCHMARK_HELPER (pipe_pump100_client, pipe_pump_server)
BENCHMARK_ENTRY (pipe_pump1_client)
BENCHMARK_HELPER (pipe_pump1_client, pipe_pump_server)
BENCHMARK_ENTRY (gethostbyname)
BENCHMARK_HELPER (gethostbyname, dns_server)

View File

@ -45,7 +45,9 @@ static uv_buf_t buf_alloc(uv_stream_t*, size_t size);
static void buf_free(uv_buf_t uv_buf_t);
static uv_tcp_t server;
static uv_tcp_t tcpServer;
static uv_pipe_t pipeServer;
static uv_handle_t* server;
static struct sockaddr_in listen_addr;
static struct sockaddr_in connect_addr;
@ -68,7 +70,10 @@ static char write_buffer[WRITE_BUFFER_SIZE];
/* Make this as large as you need. */
#define MAX_WRITE_HANDLES 1000
static uv_tcp_t write_handles[MAX_WRITE_HANDLES];
static stream_type type;
static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
static uv_timer_t timer_handle;
@ -81,6 +86,7 @@ static double gbit(int64_t bytes, int64_t passed_ms) {
static void show_stats(uv_timer_t* handle, int status) {
int64_t diff;
int i;
#if PRINT_STATS
LOGF("connections: %d, write: %.1f gbit/s\n",
@ -94,9 +100,13 @@ static void show_stats(uv_timer_t* handle, int status) {
uv_update_time();
diff = uv_now() - start_time;
LOGF("pump%d_client: %.1f gbit/s\n", write_sockets,
LOGF("%s_pump%d_client: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", write_sockets,
gbit(nsent_total, diff));
for (i = 0; i < write_sockets; i++) {
uv_close(type == TCP ? (uv_handle_t*)&tcp_write_handles[i] : (uv_handle_t*)&pipe_write_handles[i], NULL);
}
exit(0);
}
@ -112,7 +122,7 @@ static void read_show_stats() {
uv_update_time();
diff = uv_now() - start_time;
LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets,
LOGF("%s_pump%d_server: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", max_read_sockets,
gbit(nrecv_total, diff));
}
@ -133,7 +143,7 @@ void read_sockets_close_cb(uv_handle_t* handle) {
*/
if (uv_now() - start_time > 1000 && read_sockets == 0) {
read_show_stats();
uv_close((uv_handle_t*)&server, NULL);
uv_close(server, NULL);
}
}
@ -154,7 +164,7 @@ static void start_stats_collection() {
}
static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) {
static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) {
if (nrecv_total == 0) {
ASSERT(start_time == 0);
uv_update_time();
@ -162,7 +172,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) {
}
if (bytes < 0) {
uv_close((uv_handle_t*)tcp, read_sockets_close_cb);
uv_close((uv_handle_t*)stream, read_sockets_close_cb);
return;
}
@ -187,7 +197,7 @@ static void write_cb(uv_req_t *req, int status) {
}
static void do_write(uv_stream_t* tcp) {
static void do_write(uv_stream_t* stream) {
uv_req_t* req;
uv_buf_t buf;
int r;
@ -195,9 +205,9 @@ static void do_write(uv_stream_t* tcp) {
buf.base = (char*) &write_buffer;
buf.len = sizeof write_buffer;
while (tcp->write_queue_size == 0) {
while (stream->write_queue_size == 0) {
req = req_alloc();
uv_req_init(req, (uv_handle_t*)tcp, write_cb);
uv_req_init(req, (uv_handle_t*)stream, write_cb);
r = uv_write(req, &buf, 1);
ASSERT(r == 0);
@ -221,7 +231,7 @@ static void connect_cb(uv_req_t* req, int status) {
/* Yay! start writing */
for (i = 0; i < write_sockets; i++) {
do_write((uv_stream_t*)&write_handles[i]);
do_write(type == TCP ? (uv_stream_t*)&tcp_write_handles[i] : (uv_stream_t*)&pipe_write_handles[i]);
}
}
}
@ -230,38 +240,55 @@ static void connect_cb(uv_req_t* req, int status) {
static void maybe_connect_some() {
uv_req_t* req;
uv_tcp_t* tcp;
uv_pipe_t* pipe;
int r;
while (max_connect_socket < TARGET_CONNECTIONS &&
max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
tcp = &write_handles[max_connect_socket++];
if (type == TCP) {
tcp = &tcp_write_handles[max_connect_socket++];
r = uv_tcp_init(tcp);
ASSERT(r == 0);
r = uv_tcp_init(tcp);
ASSERT(r == 0);
req = req_alloc();
uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
r = uv_tcp_connect(req, connect_addr);
ASSERT(r == 0);
req = req_alloc();
uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
r = uv_tcp_connect(req, connect_addr);
ASSERT(r == 0);
} else {
pipe = &pipe_write_handles[max_connect_socket++];
r = uv_pipe_init(pipe);
ASSERT(r == 0);
req = req_alloc();
uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
r = uv_pipe_connect(req, TEST_PIPENAME);
ASSERT(r == 0);
}
}
}
static void connection_cb(uv_handle_t* s, int status) {
uv_tcp_t* tcp;
uv_stream_t* stream;
int r;
ASSERT(&server == (uv_tcp_t*)s);
ASSERT(server == s);
ASSERT(status == 0);
tcp = malloc(sizeof(uv_tcp_t));
if (type == TCP) {
stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init((uv_tcp_t*)stream);
} else {
stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
uv_pipe_init((uv_pipe_t*)stream);
}
uv_tcp_init(tcp);
r = uv_accept(s, (uv_stream_t*)tcp);
r = uv_accept(s, stream);
ASSERT(r == 0);
r = uv_read_start((uv_stream_t*)tcp, buf_alloc, read_cb);
r = uv_read_start(stream, buf_alloc, read_cb);
ASSERT(r == 0);
read_sockets++;
@ -317,7 +344,7 @@ typedef struct buf_list_s {
static buf_list_t* buf_freelist = NULL;
static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) {
static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) {
buf_list_t* buf;
buf = buf_freelist;
@ -342,18 +369,20 @@ static void buf_free(uv_buf_t uv_buf_t) {
}
HELPER_IMPL(pump_server) {
HELPER_IMPL(tcp_pump_server) {
int r;
type = TCP;
uv_init();
listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT);
/* Server */
r = uv_tcp_init(&server);
server = (uv_handle_t*)&tcpServer;
r = uv_tcp_init(&tcpServer);
ASSERT(r == 0);
r = uv_tcp_bind(&server, listen_addr);
r = uv_tcp_bind(&tcpServer, listen_addr);
ASSERT(r == 0);
r = uv_tcp_listen(&server, MAX_WRITE_HANDLES, connection_cb);
r = uv_tcp_listen(&tcpServer, MAX_WRITE_HANDLES, connection_cb);
ASSERT(r == 0);
uv_run();
@ -362,9 +391,31 @@ HELPER_IMPL(pump_server) {
}
void pump(int n) {
/*
 * Helper-process entry point: named-pipe variant of the pump benchmark
 * server. Creates a pipe server on TEST_PIPENAME, accepts up to
 * MAX_WRITE_HANDLES connections via connection_cb, and runs the event loop.
 */
HELPER_IMPL(pipe_pump_server) {
int r;
/* Tell the callbacks shared with the TCP variant (connection_cb,
 * show_stats, ...) which stream type is being exercised. */
type = PIPE;
uv_init();
/* Server: point the generic server handle at the pipe-specific handle
 * so connection_cb / uv_close can operate on it polymorphically. */
server = (uv_handle_t*)&pipeServer;
r = uv_pipe_init(&pipeServer);
ASSERT(r == 0);
r = uv_pipe_create(&pipeServer, TEST_PIPENAME);
ASSERT(r == 0);
r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb);
ASSERT(r == 0);
uv_run();
return 0;
}
void tcp_pump(int n) {
ASSERT(n <= MAX_WRITE_HANDLES);
TARGET_CONNECTIONS = n;
type = TCP;
uv_init();
@ -377,13 +428,39 @@ void pump(int n) {
}
BENCHMARK_IMPL(pump100_client) {
pump(100);
/*
 * Client-side driver for the named-pipe pump benchmark: connects n pipe
 * clients (n <= MAX_WRITE_HANDLES) to TEST_PIPENAME and runs the event
 * loop; the stats timer eventually reports throughput and calls exit().
 */
void pipe_pump(int n) {
ASSERT(n <= MAX_WRITE_HANDLES);
TARGET_CONNECTIONS = n;
type = PIPE;
uv_init();
/* Start making connections */
maybe_connect_some();
uv_run();
}
/* Benchmark entry: TCP pump with 100 concurrent connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
tcp_pump(100);
return 0;
}
BENCHMARK_IMPL(pump1_client) {
pump(1);
/* Benchmark entry: TCP pump with a single connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
tcp_pump(1);
return 0;
}
/* Benchmark entry: named-pipe pump with 100 concurrent connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
pipe_pump(100);
return 0;
}
/* Benchmark entry: named-pipe pump with a single connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
pipe_pump(1);
return 0;
}

View File

@ -26,6 +26,7 @@
BENCHMARK_IMPL(sizes) {
LOGF("uv_req_t: %u bytes\n", (unsigned int) sizeof(uv_req_t));
LOGF("uv_tcp_t: %u bytes\n", (unsigned int) sizeof(uv_tcp_t));
LOGF("uv_pipe_t: %u bytes\n", (unsigned int) sizeof(uv_pipe_t));
LOGF("uv_prepare_t: %u bytes\n", (unsigned int) sizeof(uv_prepare_t));
LOGF("uv_check_t: %u bytes\n", (unsigned int) sizeof(uv_check_t));
LOGF("uv_idle_t: %u bytes\n", (unsigned int) sizeof(uv_idle_t));

View File

@ -24,19 +24,16 @@
#include <stdio.h>
#include <stdlib.h>
typedef struct {
uv_req_t req;
uv_buf_t buf;
} write_req_t;
static int server_closed;
static uv_tcp_t server;
static int server6_closed;
static uv_tcp_t server6;
static stream_type serverType;
static uv_tcp_t tcpServer;
static uv_pipe_t pipeServer;
static uv_handle_t* server;
static void after_write(uv_req_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
@ -98,10 +95,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
if (!server_closed) {
for (i = 0; i < nread; i++) {
if (buf.base[i] == 'Q') {
uv_close((uv_handle_t*)&server, on_server_close);
uv_close(server, on_server_close);
server_closed = 1;
uv_close((uv_handle_t*)&server6, on_server_close);
server6_closed = 1;
}
}
}
@ -131,7 +126,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
static void on_connection(uv_handle_t* server, int status) {
uv_tcp_t* handle;
uv_handle_t* handle;
int r;
if (status != 0) {
@ -139,10 +134,17 @@ static void on_connection(uv_handle_t* server, int status) {
}
ASSERT(status == 0);
handle = (uv_tcp_t*) malloc(sizeof *handle);
ASSERT(handle != NULL);
if (serverType == TCP) {
handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t));
ASSERT(handle != NULL);
uv_tcp_init(handle);
uv_tcp_init((uv_tcp_t*)handle);
} else {
handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t));
ASSERT(handle != NULL);
uv_pipe_init((uv_pipe_t*)handle);
}
/* associate server with stream */
handle->data = server;
@ -156,37 +158,50 @@ static void on_connection(uv_handle_t* server, int status) {
static void on_server_close(uv_handle_t* handle) {
ASSERT(handle == (uv_handle_t*)&server || handle == (uv_handle_t*)&server6);
ASSERT(handle == server);
}
static int echo_start(int port) {
static int tcp4_echo_start(int port) {
struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port);
struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port);
int r;
r = uv_tcp_init(&server);
server = (uv_handle_t*)&tcpServer;
serverType = TCP;
r = uv_tcp_init(&tcpServer);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
return 1;
}
r = uv_tcp_bind(&server, addr);
r = uv_tcp_bind(&tcpServer, addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
r = uv_tcp_listen(&server, 128, on_connection);
r = uv_tcp_listen(&tcpServer, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");
return 1;
}
r = uv_tcp_init(&server6);
return 0;
}
static int tcp6_echo_start(int port) {
struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port);
int r;
server = (uv_handle_t*)&tcpServer;
serverType = TCP;
r = uv_tcp_init(&tcpServer);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
@ -194,14 +209,45 @@ static int echo_start(int port) {
}
/* IPv6 is optional as not all platforms support it */
r = uv_tcp_bind6(&server6, addr6);
r = uv_tcp_bind6(&tcpServer, addr6);
if (r) {
/* show message but return OK */
fprintf(stderr, "IPv6 not supported\n");
return 0;
}
r = uv_tcp_listen(&server6, 128, on_connection);
r = uv_tcp_listen(&tcpServer, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");
return 1;
}
return 0;
}
static int pipe_echo_start(char* pipeName) {
int r;
server = (uv_handle_t*)&pipeServer;
serverType = PIPE;
r = uv_pipe_init(&pipeServer);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Pipe creation error\n");
return 1;
}
r = uv_pipe_create(&pipeServer, pipeName);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "create error\n");
return 1;
}
r = uv_pipe_listen(&pipeServer, 1, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error on IPv6\n");
@ -212,9 +258,30 @@ static int echo_start(int port) {
}
HELPER_IMPL(echo_server) {
HELPER_IMPL(tcp4_echo_server) {
uv_init();
if (echo_start(TEST_PORT))
if (tcp4_echo_start(TEST_PORT))
return 1;
uv_run();
return 0;
}
/*
 * Helper-process entry point: IPv6 TCP echo server on TEST_PORT.
 * Returns 1 if startup failed; otherwise enters the event loop.
 */
HELPER_IMPL(tcp6_echo_server) {
uv_init();
if (tcp6_echo_start(TEST_PORT))
return 1;
uv_run();
return 0;
}
HELPER_IMPL(pipe_echo_server) {
uv_init();
if (pipe_echo_start(TEST_PIPENAME))
return 1;
uv_run();

View File

@ -30,6 +30,17 @@
#define TEST_PORT 9123
#define TEST_PORT_2 9124
#ifdef _WIN32
# define TEST_PIPENAME "\\\\.\\pipe\\uv-test"
#else
# /* TODO: define unix pipe name */
# define TEST_PIPENAME ""
#endif
typedef enum {
TCP = 0,
PIPE
} stream_type;
/* Log to stderr. */
#define LOG(...) fprintf(stderr, "%s", __VA_ARGS__)

View File

@ -19,8 +19,9 @@
* IN THE SOFTWARE.
*/
TEST_DECLARE (ping_pong)
TEST_DECLARE (ping_pong_v6)
TEST_DECLARE (tcp_ping_pong)
TEST_DECLARE (tcp_ping_pong_v6)
TEST_DECLARE (pipe_ping_pong)
TEST_DECLARE (delayed_accept)
TEST_DECLARE (tcp_writealot)
TEST_DECLARE (bind_error_addrinuse)
@ -54,20 +55,25 @@ TEST_DECLARE (getaddrinfo_concurrent)
TEST_DECLARE (gethostbyname)
TEST_DECLARE (fail_always)
TEST_DECLARE (pass_always)
HELPER_DECLARE (echo_server)
HELPER_DECLARE (tcp4_echo_server)
HELPER_DECLARE (tcp6_echo_server)
HELPER_DECLARE (pipe_echo_server)
TASK_LIST_START
TEST_ENTRY (ping_pong)
TEST_HELPER (ping_pong, echo_server)
TEST_ENTRY (tcp_ping_pong)
TEST_HELPER (tcp_ping_pong, tcp4_echo_server)
TEST_ENTRY (ping_pong_v6)
TEST_HELPER (ping_pong_v6, echo_server)
TEST_ENTRY (tcp_ping_pong_v6)
TEST_HELPER (tcp_ping_pong_v6, tcp6_echo_server)
TEST_ENTRY (pipe_ping_pong)
TEST_HELPER (pipe_ping_pong, pipe_echo_server)
TEST_ENTRY (delayed_accept)
TEST_ENTRY (tcp_writealot)
TEST_HELPER (tcp_writealot, echo_server)
TEST_HELPER (tcp_writealot, tcp4_echo_server)
TEST_ENTRY (bind_error_addrinuse)
TEST_ENTRY (bind_error_addrnotavail_1)
@ -86,10 +92,10 @@ TASK_LIST_START
TEST_ENTRY (connection_fail_doesnt_auto_close)
TEST_ENTRY (shutdown_eof)
TEST_HELPER (shutdown_eof, echo_server)
TEST_HELPER (shutdown_eof, tcp4_echo_server)
TEST_ENTRY (callback_stack)
TEST_HELPER (callback_stack, echo_server)
TEST_HELPER (callback_stack, tcp4_echo_server)
TEST_ENTRY (timer)
@ -113,7 +119,7 @@ TASK_LIST_START
TEST_ENTRY (getaddrinfo_concurrent)
TEST_ENTRY (gethostbyname)
TEST_HELPER (gethostbyname, echo_server)
TEST_HELPER (gethostbyname, tcp4_echo_server)
#if 0
/* These are for testing the test runner. */

View File

@ -39,7 +39,10 @@ static char PING[] = "PING\n";
typedef struct {
int pongs;
int state;
uv_tcp_t tcp;
union {
uv_tcp_t tcp;
uv_pipe_t pipe;
};
uv_req_t connect_req;
uv_req_t read_req;
char read_buffer[BUFSIZE];
@ -93,11 +96,11 @@ static void pinger_write_ping(pinger_t* pinger) {
}
static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
unsigned int i;
pinger_t* pinger;
pinger = (pinger_t*)tcp->data;
pinger = (pinger_t*)stream->data;
if (nread < 0) {
ASSERT(uv_last_error().code == UV_EOF);
@ -142,44 +145,8 @@ static void pinger_on_connect(uv_req_t *req, int status) {
}
static void pinger_new() {
int r;
struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
pinger_t *pinger;
pinger = (pinger_t*)malloc(sizeof(*pinger));
pinger->state = 0;
pinger->pongs = 0;
/* Try to connec to the server and do NUM_PINGS ping-pongs. */
r = uv_tcp_init(&pinger->tcp);
pinger->tcp.data = pinger;
ASSERT(!r);
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
(void *(*)(void *))pinger_on_connect);
r = uv_tcp_connect(&pinger->connect_req, server_addr);
ASSERT(!r);
}
TEST_IMPL(ping_pong) {
uv_init();
pinger_new();
uv_run();
ASSERT(completed_pingers == 1);
return 0;
}
/* same ping-pong test, but using IPv6 connection */
static void pinger_v6_new() {
static void tcp_pinger_v6_new() {
int r;
struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT);
pinger_t *pinger;
@ -203,10 +170,81 @@ static void pinger_v6_new() {
}
TEST_IMPL(ping_pong_v6) {
/*
 * Allocate a pinger and start an asynchronous TCP connection to the echo
 * server at 127.0.0.1:TEST_PORT; pinger_on_connect drives the ping-pong
 * exchange from there.
 * NOTE(review): the malloc result is not checked before use.
 */
static void tcp_pinger_new() {
int r;
struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
pinger_t *pinger;
pinger = (pinger_t*)malloc(sizeof(*pinger));
pinger->state = 0;
pinger->pongs = 0;
/* Try to connect to the server and do NUM_PINGS ping-pongs. */
r = uv_tcp_init(&pinger->tcp);
pinger->tcp.data = pinger;  /* back-pointer used by the read callback */
ASSERT(!r);
/* We are never doing multiple reads/connects at a time anyway, */
/* so these handles can be pre-initialized. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
pinger_on_connect);
r = uv_tcp_connect(&pinger->connect_req, server_addr);
ASSERT(!r);
}
/*
 * Allocate a pinger and start an asynchronous named-pipe connection to the
 * echo server at TEST_PIPENAME; pinger_on_connect drives the ping-pong
 * exchange from there. Mirrors tcp_pinger_new, minus the socket address.
 */
static void pipe_pinger_new() {
int r;
pinger_t *pinger;
pinger = (pinger_t*)malloc(sizeof(*pinger));
ASSERT(pinger != NULL);  /* fail loudly instead of dereferencing NULL */
pinger->state = 0;
pinger->pongs = 0;
/* Try to connect to the server and do NUM_PINGS ping-pongs. */
r = uv_pipe_init(&pinger->pipe);
pinger->pipe.data = pinger;  /* back-pointer used by the read callback */
ASSERT(!r);
/* We are never doing multiple reads/connects at a time anyway, */
/* so these handles can be pre-initialized. */
/* Pass the callback directly, matching tcp_pinger_new: the old */
/* (void *(*)(void *)) cast invited an undefined-behavior call through */
/* an incompatible function-pointer type. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
pinger_on_connect);
r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
ASSERT(!r);
}
TEST_IMPL(tcp_ping_pong) {
uv_init();
pinger_v6_new();
tcp_pinger_new();
uv_run();
ASSERT(completed_pingers == 1);
return 0;
}
/* Same ping-pong test as tcp_ping_pong, but over an IPv6 connection. */
TEST_IMPL(tcp_ping_pong_v6) {
uv_init();
tcp_pinger_v6_new();
uv_run();
ASSERT(completed_pingers == 1);
return 0;
}
TEST_IMPL(pipe_ping_pong) {
uv_init();
pipe_pinger_new();
uv_run();
ASSERT(completed_pingers == 1);