From b6a6dae34f01814ad1d19f8ffcc3f77c234432f5 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Fri, 1 Jul 2011 17:54:17 -0700 Subject: [PATCH] Named pipes implementation for Windows --- include/uv-unix.h | 5 + include/uv-win.h | 66 +++- include/uv.h | 26 +- src/uv-unix.c | 19 ++ src/uv-win.c | 748 +++++++++++++++++++++++++++++++++++++++-- test/benchmark-list.h | 28 +- test/benchmark-pump.c | 147 ++++++-- test/benchmark-sizes.c | 1 + test/echo-server.c | 119 +++++-- test/task.h | 11 + test/test-list.h | 28 +- test/test-ping-pong.c | 122 ++++--- 12 files changed, 1155 insertions(+), 165 deletions(-) diff --git a/include/uv-unix.h b/include/uv-unix.h index f68a7fd6f..d17ebc3b2 100644 --- a/include/uv-unix.h +++ b/include/uv-unix.h @@ -73,6 +73,11 @@ typedef struct { ev_io write_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t write_completed_queue; + + +/* UV_NAMED_PIPE */ +#define UV_PIPE_PRIVATE_TYPEDEF +#define UV_PIPE_PRIVATE_FIELDS /* UV_PREPARE */ \ diff --git a/include/uv-win.h b/include/uv-win.h index e6254fee0..ee29e8c96 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -31,6 +31,8 @@ #include "tree.h" +#define MAX_PIPENAME_LEN 256 + /** * It should be possible to cast uv_buf_t[] to WSABUF[] * see http://msdn.microsoft.com/en-us/library/ms741542(v=vs.85).aspx @@ -40,6 +42,17 @@ typedef struct uv_buf_t { char* base; } uv_buf_t; +/* + * Private uv_pipe_instance state. + */ +typedef enum { + UV_PIPEINSTANCE_DISCONNECTED = 0, + UV_PIPEINSTANCE_PENDING, + UV_PIPEINSTANCE_WAITING, + UV_PIPEINSTANCE_ACCEPTED, + UV_PIPEINSTANCE_ACTIVE +} uv_pipeinstance_state; + #define UV_REQ_PRIVATE_FIELDS \ union { \ /* Used by I/O operations */ \ @@ -51,31 +64,56 @@ typedef struct uv_buf_t { int flags; \ uv_err_t error; \ struct uv_req_s* next_req; - -#define UV_STREAM_PRIVATE_FIELDS \ - uv_alloc_cb alloc_cb; \ - uv_read_cb read_cb; \ - struct uv_req_s read_req; \ -#define uv_tcp_connection_fields \ +#define uv_stream_connection_fields \ unsigned int write_reqs_pending; \ uv_req_t* shutdown_req; -#define uv_tcp_server_fields \ - uv_connection_cb connection_cb; \ - SOCKET accept_socket; \ - struct uv_req_s accept_req; \ - char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; +#define uv_stream_server_fields \ + uv_connection_cb connection_cb; + +#define UV_STREAM_PRIVATE_FIELDS \ + unsigned int reqs_pending; \ + uv_alloc_cb alloc_cb; \ + uv_read_cb read_cb; \ + struct uv_req_s read_req; \ + union { \ + struct { uv_stream_connection_fields }; \ + struct { uv_stream_server_fields }; \ + }; #define UV_TCP_PRIVATE_FIELDS \ - unsigned int reqs_pending; \ union { \ SOCKET socket; \ HANDLE handle; \ }; \ + SOCKET accept_socket; \ + char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + struct uv_req_s accept_req; + +#define uv_pipe_server_fields \ + char* name; \ + int connectionCount; \ + uv_pipe_instance_t* connections; \ + uv_pipe_instance_t* acceptConnection; \ + uv_pipe_instance_t connectionsBuffer[4]; + +#define uv_pipe_connection_fields \ + uv_pipe_t* server; \ + uv_pipe_instance_t* connection; \ + uv_pipe_instance_t clientConnection; + +#define UV_PIPE_PRIVATE_TYPEDEF \ + typedef struct uv_pipe_instance_s { \ + HANDLE handle; \ + uv_pipeinstance_state state; \ + uv_req_t accept_req; \ + } uv_pipe_instance_t; + +#define UV_PIPE_PRIVATE_FIELDS \ union { \ - struct { uv_tcp_connection_fields }; \ - struct { uv_tcp_server_fields }; \ + struct { uv_pipe_server_fields }; \ + struct { uv_pipe_connection_fields }; \ }; #define UV_TIMER_PRIVATE_FIELDS \ diff --git a/include/uv.h b/include/uv.h index 6aecb2820..5e0c346e5 100644 --- a/include/uv.h +++ b/include/uv.h @@ -43,6 +43,7 @@ typedef struct uv_err_s uv_err_t; typedef struct uv_handle_s uv_handle_t; typedef struct uv_stream_s uv_stream_t; typedef struct uv_tcp_s uv_tcp_t; +typedef struct uv_pipe_s uv_pipe_t; typedef struct uv_timer_s uv_timer_t; typedef struct uv_prepare_s uv_prepare_t; typedef struct uv_check_s uv_check_t; @@ -124,7 +125,8 @@ typedef enum { UV_EAIFAMNOSUPPORT, UV_EAINONAME, UV_EAISERVICE, - UV_EAISOCKTYPE + UV_EAISOCKTYPE, + UV_ESHUTDOWN } uv_err_code; typedef enum { @@ -287,6 +289,26 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6); int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); +/* + * A subclass of uv_stream_t representing a pipe stream or pipe server. + */ +UV_PIPE_PRIVATE_TYPEDEF + +struct uv_pipe_s { + UV_HANDLE_FIELDS + UV_STREAM_FIELDS + UV_PIPE_PRIVATE_FIELDS +}; + +int uv_pipe_init(uv_pipe_t* handle); + +int uv_pipe_create(uv_pipe_t* handle, char* name); + +int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb); + +int uv_pipe_connect(uv_req_t* req, char* name); + + /* * Subclass of uv_handle_t. libev wrapper. Every active prepare handle gets * its callback called exactly once per loop iteration, just before the @@ -478,7 +500,9 @@ union uv_any_handle { typedef struct { uint64_t req_init; uint64_t handle_init; + uint64_t stream_init; uint64_t tcp_init; + uint64_t pipe_init; uint64_t prepare_init; uint64_t check_init; uint64_t idle_init; diff --git a/src/uv-unix.c b/src/uv-unix.c index 786657b94..6bffaf27c 100644 --- a/src/uv-unix.c +++ b/src/uv-unix.c @@ -1594,3 +1594,22 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, return 0; } + +int uv_pipe_init(uv_pipe_t* handle) { + assert(0 && "implement me"); +} + + +int uv_pipe_create(uv_pipe_t* handle, char* name) { + assert(0 && "implement me"); +} + + +int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { + assert(0 && "implement me"); +} + + +int uv_pipe_connect(uv_req_t* req, char* name) { + assert(0 && "implement me"); +} diff --git a/src/uv-win.c b/src/uv-win.c index 704b9cf21..7edd1e879 100644 --- a/src/uv-win.c +++ b/src/uv-win.c @@ -164,6 +164,7 @@ static LPFN_TRANSMITFILE pTransmitFile6; #define UV_HANDLE_ENDGAME_QUEUED 0x0400 #define UV_HANDLE_BIND_ERROR 0x1000 #define UV_HANDLE_IPV6 0x2000 +#define UV_HANDLE_PIPESERVER 0x4000 /* * Private uv_req flags. @@ -242,6 +243,8 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req); void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req); void uv_ares_poll(uv_timer_t* handle, int status); +static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); + /* memory used per ares_channel */ struct uv_ares_channel_s { ares_channel channel; @@ -373,6 +376,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ERROR_INVALID_FLAGS: return UV_EBADF; case ERROR_INVALID_PARAMETER: return UV_EINVAL; case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET; + case ERROR_BROKEN_PIPE: return UV_EOF; default: return UV_UNKNOWN; } } @@ -527,6 +531,11 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) { } +static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) { + return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req); +} + + static void uv_insert_pending_req(uv_req_t* req) { req->next_req = NULL; if (uv_pending_reqs_tail_) { @@ -594,24 +603,20 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) { } -static void uv_tcp_init_connection(uv_tcp_t* handle) { +static void uv_init_connection(uv_stream_t* handle) { handle->flags |= UV_HANDLE_CONNECTION; handle->write_reqs_pending = 0; uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL); } -int uv_tcp_init(uv_tcp_t* handle) { - handle->socket = INVALID_SOCKET; +int uv_stream_init(uv_stream_t* handle) { handle->write_queue_size = 0; - handle->type = UV_TCP; handle->flags = 0; - handle->reqs_pending = 0; handle->error = uv_ok_; - handle->accept_socket = INVALID_SOCKET; uv_counters()->handle_init++; - uv_counters()->tcp_init++; + uv_counters()->stream_init++; uv_refs_++; @@ -619,6 +624,20 @@ int uv_tcp_init(uv_tcp_t* handle) { } +int uv_tcp_init(uv_tcp_t* handle) { + uv_stream_init((uv_stream_t*)handle); + + handle->socket = INVALID_SOCKET; + handle->type = UV_TCP; + handle->reqs_pending = 0; + handle->accept_socket = INVALID_SOCKET; + + uv_counters()->tcp_init++; + + return 0; +} + + static void uv_tcp_endgame(uv_tcp_t* handle) { uv_err_t err; int status; @@ -658,6 +677,39 @@ static void uv_tcp_endgame(uv_tcp_t* handle) { } +static void uv_pipe_endgame(uv_pipe_t* handle) { + uv_err_t err; + int status; + + if (handle->flags & UV_HANDLE_SHUTTING && + !(handle->flags & UV_HANDLE_SHUT) && + handle->write_reqs_pending == 0) { + close_pipe(handle, &status, &err); + + if (handle->shutdown_req->cb) { + handle->shutdown_req->flags &= ~UV_REQ_PENDING; + if (status == -1) { + uv_last_error_ = err; + } + ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); + } + handle->reqs_pending--; + } + + if (handle->flags & UV_HANDLE_CLOSING && + handle->reqs_pending == 0) { + assert(!(handle->flags & UV_HANDLE_CLOSED)); + handle->flags |= UV_HANDLE_CLOSED; + + if (handle->close_cb) { + handle->close_cb((uv_handle_t*)handle); + } + + uv_refs_--; + } +} + + static void uv_timer_endgame(uv_timer_t* handle) { if (handle->flags & UV_HANDLE_CLOSING) { assert(!(handle->flags & UV_HANDLE_CLOSED)); @@ -714,6 +766,10 @@ static void uv_process_endgames() { case UV_TCP: uv_tcp_endgame((uv_tcp_t*)handle); break; + + case UV_NAMED_PIPE: + uv_pipe_endgame((uv_pipe_t*)handle); + break; case UV_TIMER: uv_timer_endgame((uv_timer_t*)handle); @@ -749,6 +805,7 @@ static void uv_want_endgame(uv_handle_t* handle) { static int uv_close_error(uv_handle_t* handle, uv_err_t e) { uv_tcp_t* tcp; + uv_pipe_t* pipe; if (handle->flags & UV_HANDLE_CLOSING) { return 0; @@ -773,6 +830,15 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) { uv_want_endgame(handle); } return 0; + + case UV_NAMED_PIPE: + pipe = (uv_pipe_t*)handle; + pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); + close_pipe(pipe, NULL, NULL); + if (pipe->reqs_pending == 0) { + uv_want_endgame(handle); + } + return 0; case UV_TIMER: uv_timer_stop((uv_timer_t*)handle); @@ -877,7 +943,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } -static void uv_queue_accept(uv_tcp_t* handle) { +static void uv_tcp_queue_accept(uv_tcp_t* handle) { uv_req_t* req; BOOL success; DWORD bytes; @@ -908,6 +974,7 @@ static void uv_queue_accept(uv_tcp_t* handle) { if (accept_socket == INVALID_SOCKET) { req->error = uv_new_sys_error(WSAGetLastError()); uv_insert_pending_req(req); + handle->reqs_pending++; return; } @@ -927,6 +994,7 @@ static void uv_queue_accept(uv_tcp_t* handle) { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(WSAGetLastError()); uv_insert_pending_req(req); + handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); return; @@ -939,7 +1007,48 @@ static void uv_queue_accept(uv_tcp_t* handle) { } -static void uv_queue_read(uv_tcp_t* handle) { +static void uv_pipe_queue_accept(uv_pipe_t* handle) { + uv_req_t* req; + uv_pipe_instance_t* instance; + int i; + + assert(handle->flags & UV_HANDLE_LISTENING); + + /* This loop goes through every pipe instance and calls ConnectNamedPipe for every pending instance. + * TODO: Make this faster (we could maintain a linked list of pending instances). + */ + for (i = 0; i < handle->connectionCount; i++) { + instance = &handle->connections[i]; + + if (instance->state == UV_PIPEINSTANCE_PENDING) { + /* Prepare the uv_req structure. */ + req = &instance->accept_req; + uv_req_init(req, (uv_handle_t*)handle, NULL); + assert(!(req->flags & UV_REQ_PENDING)); + req->type = UV_ACCEPT; + req->flags |= UV_REQ_PENDING; + + /* Prepare the overlapped structure. */ + memset(&(req->overlapped), 0, sizeof(req->overlapped)); + + if (!ConnectNamedPipe(instance->handle, &req->overlapped) && + GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req(req); + handle->reqs_pending++; + continue; + } + + instance->state = UV_PIPEINSTANCE_WAITING; + handle->reqs_pending++; + req->flags |= UV_REQ_PENDING; + } + } +} + + +static void uv_tcp_queue_read(uv_tcp_t* handle) { uv_req_t* req; uv_buf_t buf; int result; @@ -967,6 +1076,40 @@ static void uv_queue_read(uv_tcp_t* handle) { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(WSAGetLastError()); uv_insert_pending_req(req); + handle->reqs_pending++; + return; + } + + req->flags |= UV_REQ_PENDING; + handle->reqs_pending++; +} + + +static void uv_pipe_queue_read(uv_pipe_t* handle) { + uv_req_t* req; + int result; + + assert(handle->flags & UV_HANDLE_READING); + assert(handle->connection); + assert(handle->connection->handle != INVALID_HANDLE_VALUE); + + req = &handle->read_req; + assert(!(req->flags & UV_REQ_PENDING)); + memset(&req->overlapped, 0, sizeof(req->overlapped)); + req->type = UV_READ; + + /* Do 0-read */ + result = ReadFile(handle->connection->handle, + &uv_zero_, + 0, + NULL, + &req->overlapped); + + if (!result && GetLastError() != ERROR_IO_PENDING) { + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(WSAGetLastError()); + uv_insert_pending_req(req); + handle->reqs_pending++; return; } @@ -999,40 +1142,76 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { handle->connection_cb = cb; uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL); - uv_queue_accept(handle); + uv_tcp_queue_accept(handle); return 0; } -int uv_accept(uv_handle_t* server, uv_stream_t* client) { +static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { int rv = 0; - uv_tcp_t* tcpServer = (uv_tcp_t*)server; - uv_tcp_t* tcpClient = (uv_tcp_t*)client; - if (tcpServer->accept_socket == INVALID_SOCKET) { + if (server->accept_socket == INVALID_SOCKET) { uv_set_sys_error(WSAENOTCONN); return -1; } - if (uv_tcp_set_socket(tcpClient, tcpServer->accept_socket) == -1) { - closesocket(tcpServer->accept_socket); + if (uv_tcp_set_socket(client, server->accept_socket) == -1) { + closesocket(server->accept_socket); rv = -1; } else { - uv_tcp_init_connection(tcpClient); + uv_init_connection((uv_stream_t*)client); } - tcpServer->accept_socket = INVALID_SOCKET; + server->accept_socket = INVALID_SOCKET; - if (!(tcpServer->flags & UV_HANDLE_CLOSING)) { - uv_queue_accept(tcpServer); + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_tcp_queue_accept(server); } return rv; } -int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { +static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { + assert(server->acceptConnection); + + /* Make the connection instance active */ + server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE; + + /* Move the connection instance from server to client */ + client->connection = server->acceptConnection; + server->acceptConnection = NULL; + + /* Remember the server */ + client->server = server; + + uv_init_connection((uv_stream_t*)client); + client->flags |= UV_HANDLE_PIPESERVER; + uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); + + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(server); + } + + return 0; +} + + +int uv_accept(uv_handle_t* server, uv_stream_t* client) { + assert(client->type == server->type); + + if (server->type == UV_TCP) { + return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); + } else if (server->type == UV_NAMED_PIPE) { + return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + } + + return -1; +} + + +static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(WSAEINVAL); return -1; @@ -1055,12 +1234,52 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) /* If reading was stopped and then started again, there could stell be a */ /* read request pending. */ if (!(handle->read_req.flags & UV_REQ_PENDING)) - uv_queue_read((uv_tcp_t*)handle); + uv_tcp_queue_read(handle); return 0; } +static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { + uv_set_sys_error(UV_EINVAL); + return -1; + } + + if (handle->flags & UV_HANDLE_READING) { + uv_set_sys_error(UV_EALREADY); + return -1; + } + + if (handle->flags & UV_HANDLE_EOF) { + uv_set_sys_error(UV_EOF); + return -1; + } + + handle->flags |= UV_HANDLE_READING; + handle->read_cb = read_cb; + handle->alloc_cb = alloc_cb; + + /* If reading was stopped and then started again, there could stell be a */ + /* read request pending. */ + if (!(handle->read_req.flags & UV_REQ_PENDING)) + uv_pipe_queue_read(handle); + + return 0; +} + + +int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { + if (handle->type == UV_TCP) { + return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb); + } else if (handle->type == UV_NAMED_PIPE) { + return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb); + } + + return -1; +} + + int uv_read_stop(uv_stream_t* handle) { handle->flags &= ~UV_HANDLE_READING; @@ -1175,7 +1394,7 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) { } -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { +int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { int result; DWORD bytes, err; uv_tcp_t* handle = (uv_tcp_t*) req->handle; @@ -1228,6 +1447,72 @@ int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { } +int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { + int result; + uv_pipe_t* handle = (uv_pipe_t*) req->handle; + + assert(!(req->flags & UV_REQ_PENDING)); + + if (bufcnt != 1) { + uv_set_sys_error(UV_ENOTSUP); + return -1; + } + + assert(handle->connection); + assert(handle->connection->handle != INVALID_HANDLE_VALUE); + + if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { + uv_set_sys_error(UV_EINVAL); + return -1; + } + + if (req->handle->flags & UV_HANDLE_SHUTTING) { + uv_set_sys_error(UV_EOF); + return -1; + } + + memset(&req->overlapped, 0, sizeof(req->overlapped)); + req->type = UV_WRITE; + + result = WriteFile(handle->connection->handle, + bufs[0].base, + bufs[0].len, + NULL, + &req->overlapped); + + if (!result && GetLastError() != WSA_IO_PENDING) { + uv_set_sys_error(GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = uv_count_bufs(bufs, bufcnt); + handle->write_queue_size += req->queued_bytes; + } + + req->flags |= UV_REQ_PENDING; + handle->reqs_pending++; + handle->write_reqs_pending++; + + return 0; +} + + +int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { + if (req->handle->type == UV_TCP) { + return uv_tcp_write(req, bufs, bufcnt); + } else if (req->handle->type == UV_NAMED_PIPE) { + return uv_pipe_write(req, bufs, bufcnt); + } + + return -1; +} + + int uv_shutdown(uv_req_t* req) { uv_tcp_t* handle = (uv_tcp_t*) req->handle; int status = 0; @@ -1246,7 +1531,7 @@ int uv_shutdown(uv_req_t* req) { req->flags |= UV_REQ_PENDING; handle->flags |= UV_HANDLE_SHUTTING; - handle->shutdown_req = req; + handle->shutdown_req = req; handle->reqs_pending++; uv_want_endgame((uv_handle_t*)handle); @@ -1338,7 +1623,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { } /* Post another 0-read if still reading and not closing. */ if (handle->flags & UV_HANDLE_READING) { - uv_queue_read(handle); + uv_tcp_queue_read(handle); } break; @@ -1375,7 +1660,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { /* uv_queue_accept will detect it. */ closesocket(handle->accept_socket); if (handle->flags & UV_HANDLE_LISTENING) { - uv_queue_accept(handle); + uv_tcp_queue_accept(handle); } } break; @@ -1388,7 +1673,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { SO_UPDATE_CONNECT_CONTEXT, NULL, 0) == 0) { - uv_tcp_init_connection(handle); + uv_init_connection((uv_stream_t*)handle); ((uv_connect_cb)req->cb)(req, 0); } else { uv_set_sys_error(WSAGetLastError()); @@ -1417,6 +1702,161 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { } +static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) { + DWORD bytes, err, mode; + uv_buf_t buf; + uv_pipe_instance_t* acceptingConn; + + assert(handle->type == UV_NAMED_PIPE); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + switch (req->type) { + case UV_WRITE: + handle->write_queue_size -= req->queued_bytes; + if (req->cb) { + uv_last_error_ = req->error; + ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); + } + handle->write_reqs_pending--; + if (handle->write_reqs_pending == 0 && + handle->flags & UV_HANDLE_SHUTTING) { + uv_want_endgame((uv_handle_t*)handle); + } + break; + + case UV_READ: + if (req->error.code != UV_OK) { + /* An error occurred doing the 0-read. */ + if (!(handle->flags & UV_HANDLE_READING)) { + break; + } + + /* Stop reading and report error. */ + handle->flags &= ~UV_HANDLE_READING; + uv_last_error_ = req->error; + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, buf); + break; + } + + /* Temporarily switch to non-blocking mode. + * This is so that ReadFile doesn't block if the read buffer is empty. + */ + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; + if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + /* We can't continue processing this read. */ + err = GetLastError(); + uv_set_sys_error(err); + handle->read_cb((uv_stream_t*)handle, -1, buf); + break; + } + + /* Do non-blocking reads until the buffer is empty */ + while (handle->flags & UV_HANDLE_READING) { + buf = handle->alloc_cb((uv_stream_t*)handle, 65536); + assert(buf.len > 0); + + if (ReadFile(handle->connection->handle, + buf.base, + buf.len, + &bytes, + NULL)) { + if (bytes > 0) { + /* Successful read */ + handle->read_cb((uv_stream_t*)handle, bytes, buf); + /* Read again only if bytes == buf.len */ + if (bytes < buf.len) { + break; + } + } else { + /* Connection closed */ + handle->flags &= ~UV_HANDLE_READING; + handle->flags |= UV_HANDLE_EOF; + uv_last_error_.code = UV_EOF; + uv_last_error_.sys_errno_ = ERROR_SUCCESS; + handle->read_cb((uv_stream_t*)handle, -1, buf); + break; + } + } else { + err = GetLastError(); + if (err == ERROR_NO_DATA) { + /* Read buffer was completely empty, report a 0-byte read. */ + uv_set_sys_error(UV_EAGAIN); + handle->read_cb((uv_stream_t*)handle, 0, buf); + } else { + /* Ouch! serious error. */ + uv_set_sys_error(err); + handle->read_cb((uv_stream_t*)handle, -1, buf); + } + break; + } + } + + if (handle->flags & UV_HANDLE_READING) { + /* Switch back to blocking mode so that we can use IOCP for 0-reads */ + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + /* Report and continue. */ + err = GetLastError(); + uv_set_sys_error(err); + handle->read_cb((uv_stream_t*)handle, -1, buf); + break; + } + + /* Post another 0-read if still reading and not closing. */ + uv_pipe_queue_read(handle); + } + + break; + + case UV_ACCEPT: + if (req->error.code == UV_OK) { + /* Put the connection instance into accept state */ + handle->acceptConnection = uv_req_to_pipeinstance(req); + handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED; + + if (handle->connection_cb) { + handle->connection_cb((uv_handle_t*)handle, 0); + } + } else { + /* Ignore errors and continue listening */ + if (handle->flags & UV_HANDLE_LISTENING) { + uv_pipe_queue_accept(handle); + } + } + break; + + case UV_CONNECT: + if (req->cb) { + if (req->error.code == UV_OK) { + uv_init_connection((uv_stream_t*)handle); + ((uv_connect_cb)req->cb)(req, 0); + } else { + uv_last_error_ = req->error; + ((uv_connect_cb)req->cb)(req, -1); + } + } + break; + + default: + assert(0); + } + + /* The number of pending requests is now down by one */ + handle->reqs_pending--; + + /* Queue the handle's close callback if it is closing and there are no */ + /* more pending requests. */ + if (handle->flags & UV_HANDLE_CLOSING && + handle->reqs_pending == 0) { + uv_want_endgame((uv_handle_t*)handle); + } +} + + static int uv_timer_compare(uv_timer_t* a, uv_timer_t* b) { if (a->due < b->due) return -1; @@ -1714,6 +2154,10 @@ static void uv_process_reqs() { case UV_TCP: uv_tcp_return_req((uv_tcp_t*)handle, req); break; + + case UV_NAMED_PIPE: + uv_pipe_return_req((uv_pipe_t*)handle, req); + break; case UV_ASYNC: uv_async_return_req((uv_async_t*)handle, req); @@ -2460,3 +2904,253 @@ error: return -1; } + +int uv_pipe_init(uv_pipe_t* handle) { + uv_stream_init((uv_stream_t*)handle); + + handle->type = UV_NAMED_PIPE; + handle->reqs_pending = 0; + + uv_counters()->pipe_init++; + + return 0; +} + + +/* Creates a pipe server. */ +/* TODO: make this work with UTF8 name */ +int uv_pipe_create(uv_pipe_t* handle, char* name) { + if (!name) { + return -1; + } + + handle->connections = NULL; + handle->acceptConnection = NULL; + handle->connectionCount = 0; + + /* Make our own copy of the pipe name */ + handle->name = (char*)malloc(MAX_PIPENAME_LEN); + if (!handle->name) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + strcpy(handle->name, name); + handle->name[255] = '\0'; + + handle->flags |= UV_HANDLE_PIPESERVER; + return 0; +} + + +/* Starts listening for connections for the given pipe. */ +int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { + int i, maxInstances, errno; + HANDLE pipeHandle; + uv_pipe_instance_t* pipeInstance; + + if (handle->flags & UV_HANDLE_LISTENING || + handle->flags & UV_HANDLE_READING) { + uv_set_sys_error(UV_EALREADY); + return -1; + } + + if (!(handle->flags & UV_HANDLE_PIPESERVER)) { + uv_set_sys_error(UV_ENOTSUP); + return -1; + } + + if (instanceCount <= sizeof(handle->connectionsBuffer)) { + /* Use preallocated connections buffer */ + handle->connections = handle->connectionsBuffer; + } else { + handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t)); + if (!handle->connections) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + } + + maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount; + + for (i = 0; i < instanceCount; i++) { + pipeHandle = CreateNamedPipe(handle->name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + maxInstances, + 65536, + 65536, + 0, + NULL); + + if (pipeHandle == INVALID_HANDLE_VALUE) { + errno = GetLastError(); + goto error; + } + + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + errno = GetLastError(); + goto error; + } + + pipeInstance = &handle->connections[i]; + pipeInstance->handle = pipeHandle; + pipeInstance->state = UV_PIPEINSTANCE_PENDING; + } + + /* We don't need the pipe name anymore. */ + free(handle->name); + handle->name = NULL; + + handle->connectionCount = instanceCount; + handle->flags |= UV_HANDLE_LISTENING; + handle->connection_cb = cb; + + uv_pipe_queue_accept(handle); + return 0; + +error: + close_pipe(handle, NULL, NULL); + uv_set_sys_error(errno); + return -1; +} + +/* TODO: make this work with UTF8 name */ +int uv_pipe_connect(uv_req_t* req, char* name) { + int errno; + DWORD mode; + uv_pipe_t* handle = (uv_pipe_t*)req->handle; + + assert(!(req->flags & UV_REQ_PENDING)); + + req->type = UV_CONNECT; + handle->connection = &handle->clientConnection; + handle->server = NULL; + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + handle->clientConnection.handle = CreateFile(name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); + + if (handle->clientConnection.handle == INVALID_HANDLE_VALUE && + GetLastError() != ERROR_IO_PENDING) { + errno = GetLastError(); + goto error; + } + + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + + if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { + errno = GetLastError(); + goto error; + } + + if (CreateIoCompletionPort(handle->clientConnection.handle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + errno = GetLastError(); + goto error; + } + + req->error = uv_ok_; + req->flags |= UV_REQ_PENDING; + handle->connection->state = UV_PIPEINSTANCE_ACTIVE; + uv_insert_pending_req(req); + handle->reqs_pending++; + return 0; + +error: + close_pipe(handle, NULL, NULL); + req->error = uv_new_sys_error(errno); + uv_insert_pending_req(req); + handle->reqs_pending++; + return 0; +} + + +/* Cleans up uv_pipe_t (server or connection) and all resources associated with it */ +static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { + uv_pipe_instance_t* connection; + int i; + + if (handle->flags & UV_HANDLE_PIPESERVER) { + if (handle->flags & UV_HANDLE_CONNECTION) { + /* + * The handle is for a connection instance on the pipe server. + * To clean-up, we call DisconnectNamedPipe, and return the instance to pending state, + * which will be ready to accept another pipe connection in uv_pipe_queue_accept. + */ + connection = handle->connection; + if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) { + /* Disconnect the connection intance and return it to pending state */ + if (DisconnectNamedPipe(connection->handle)) { + connection->state = UV_PIPEINSTANCE_PENDING; + handle->connection = NULL; + if (status) *status = 0; + } else { + if (status) *status = -1; + if (err) *err = uv_new_sys_error(GetLastError()); + } + + /* Queue accept now that the instance is in pending state. */ + if (!(handle->server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(handle->server); + } + } + } else { + /* + * The handle is for the pipe server. + * To clean-up we close every connection instance that was made in uv_pipe_listen. + */ + + if (handle->name) { + free(handle->name); + handle->name = NULL; + } + + if (handle->connections) { + /* Go through the list of connections, and close each one with CloseHandle. */ + for (i = 0; i < handle->connectionCount; i++) { + connection = &handle->connections[i]; + if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) { + CloseHandle(connection->handle); + connection->state = UV_PIPEINSTANCE_DISCONNECTED; + connection->handle = INVALID_HANDLE_VALUE; + } + } + + /* Free the connections buffer. */ + if (handle->connections != handle->connectionsBuffer) { + free(handle->connections); + } + + handle->connections = NULL; + } + + if (status) *status = 0; + } + } else { + /* + * The handle is for a connection instance on the pipe client. + * To clean-up + */ + connection = handle->connection; + if (connection && connection->handle != INVALID_HANDLE_VALUE) { + if (CloseHandle(connection->handle)) { + connection->state = UV_PIPEINSTANCE_DISCONNECTED; + handle->connection = NULL; + if (status) *status = 0; + } else { + if (status) *status = -1; + if (err) *err = uv_new_sys_error(GetLastError()); + } + } + } + + handle->flags |= UV_HANDLE_SHUT; +} diff --git a/test/benchmark-list.h b/test/benchmark-list.h index 6040e90a3..5e7987802 100644 --- a/test/benchmark-list.h +++ b/test/benchmark-list.h @@ -21,25 +21,35 @@ BENCHMARK_DECLARE (sizes) BENCHMARK_DECLARE (ping_pongs) -BENCHMARK_DECLARE (pump100_client) -BENCHMARK_DECLARE (pump1_client) +BENCHMARK_DECLARE (tcp_pump100_client) +BENCHMARK_DECLARE (tcp_pump1_client) +BENCHMARK_DECLARE (pipe_pump100_client) +BENCHMARK_DECLARE (pipe_pump1_client) BENCHMARK_DECLARE (gethostbyname) BENCHMARK_DECLARE (getaddrinfo) -HELPER_DECLARE (pump_server) -HELPER_DECLARE (echo_server) +HELPER_DECLARE (tcp_pump_server) +HELPER_DECLARE (pipe_pump_server) +HELPER_DECLARE (tcp4_echo_server) +HELPER_DECLARE (pipe_echo_server) HELPER_DECLARE (dns_server) TASK_LIST_START BENCHMARK_ENTRY (sizes) BENCHMARK_ENTRY (ping_pongs) - BENCHMARK_HELPER (ping_pongs, echo_server) + BENCHMARK_HELPER (ping_pongs, tcp4_echo_server) - BENCHMARK_ENTRY (pump100_client) - BENCHMARK_HELPER (pump100_client, pump_server) + BENCHMARK_ENTRY (tcp_pump100_client) + BENCHMARK_HELPER (tcp_pump100_client, tcp_pump_server) - BENCHMARK_ENTRY (pump1_client) - BENCHMARK_HELPER (pump1_client, pump_server) + BENCHMARK_ENTRY (tcp_pump1_client) + BENCHMARK_HELPER (tcp_pump1_client, tcp_pump_server) + + BENCHMARK_ENTRY (pipe_pump100_client) + BENCHMARK_HELPER (pipe_pump100_client, pipe_pump_server) + + BENCHMARK_ENTRY (pipe_pump1_client) + BENCHMARK_HELPER (pipe_pump1_client, pipe_pump_server) BENCHMARK_ENTRY (gethostbyname) BENCHMARK_HELPER (gethostbyname, dns_server) diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index cd9c7d996..591dbb0be 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -45,7 +45,9 @@ static uv_buf_t buf_alloc(uv_stream_t*, size_t size); static void buf_free(uv_buf_t uv_buf_t); -static uv_tcp_t server; +static uv_tcp_t tcpServer; +static uv_pipe_t pipeServer; +static uv_handle_t* server; static struct sockaddr_in listen_addr; static struct sockaddr_in connect_addr; @@ -68,7 +70,10 @@ static char write_buffer[WRITE_BUFFER_SIZE]; /* Make this as large as you need. */ #define MAX_WRITE_HANDLES 1000 -static uv_tcp_t write_handles[MAX_WRITE_HANDLES]; +static stream_type type; + +static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES]; +static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES]; static uv_timer_t timer_handle; @@ -81,6 +86,7 @@ static double gbit(int64_t bytes, int64_t passed_ms) { static void show_stats(uv_timer_t* handle, int status) { int64_t diff; + int i; #if PRINT_STATS LOGF("connections: %d, write: %.1f gbit/s\n", @@ -94,9 +100,13 @@ static void show_stats(uv_timer_t* handle, int status) { uv_update_time(); diff = uv_now() - start_time; - LOGF("pump%d_client: %.1f gbit/s\n", write_sockets, + LOGF("%s_pump%d_client: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", write_sockets, gbit(nsent_total, diff)); + for (i = 0; i < write_sockets; i++) { + uv_close(type == TCP ? (uv_handle_t*)&tcp_write_handles[i] : (uv_handle_t*)&pipe_write_handles[i], NULL); + } + exit(0); } @@ -112,7 +122,7 @@ static void read_show_stats() { uv_update_time(); diff = uv_now() - start_time; - LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets, + LOGF("%s_pump%d_server: %.1f gbit/s\n", type == TCP ? "tcp" : "pipe", max_read_sockets, gbit(nrecv_total, diff)); } @@ -133,7 +143,7 @@ void read_sockets_close_cb(uv_handle_t* handle) { */ if (uv_now() - start_time > 1000 && read_sockets == 0) { read_show_stats(); - uv_close((uv_handle_t*)&server, NULL); + uv_close(server, NULL); } } @@ -154,7 +164,7 @@ static void start_stats_collection() { } -static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) { +static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) { if (nrecv_total == 0) { ASSERT(start_time == 0); uv_update_time(); @@ -162,7 +172,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) { } if (bytes < 0) { - uv_close((uv_handle_t*)tcp, read_sockets_close_cb); + uv_close((uv_handle_t*)stream, read_sockets_close_cb); return; } @@ -187,7 +197,7 @@ static void write_cb(uv_req_t *req, int status) { } -static void do_write(uv_stream_t* tcp) { +static void do_write(uv_stream_t* stream) { uv_req_t* req; uv_buf_t buf; int r; @@ -195,9 +205,9 @@ static void do_write(uv_stream_t* tcp) { buf.base = (char*) &write_buffer; buf.len = sizeof write_buffer; - while (tcp->write_queue_size == 0) { + while (stream->write_queue_size == 0) { req = req_alloc(); - uv_req_init(req, (uv_handle_t*)tcp, write_cb); + uv_req_init(req, (uv_handle_t*)stream, write_cb); r = uv_write(req, &buf, 1); ASSERT(r == 0); @@ -221,7 +231,7 @@ static void connect_cb(uv_req_t* req, int status) { /* Yay! start writing */ for (i = 0; i < write_sockets; i++) { - do_write((uv_stream_t*)&write_handles[i]); + do_write(type == TCP ? (uv_stream_t*)&tcp_write_handles[i] : (uv_stream_t*)&pipe_write_handles[i]); } } } @@ -230,38 +240,55 @@ static void connect_cb(uv_req_t* req, int status) { static void maybe_connect_some() { uv_req_t* req; uv_tcp_t* tcp; + uv_pipe_t* pipe; int r; while (max_connect_socket < TARGET_CONNECTIONS && max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) { - tcp = &write_handles[max_connect_socket++]; + if (type == TCP) { + tcp = &tcp_write_handles[max_connect_socket++]; - r = uv_tcp_init(tcp); - ASSERT(r == 0); + r = uv_tcp_init(tcp); + ASSERT(r == 0); - req = req_alloc(); - uv_req_init(req, (uv_handle_t*)tcp, connect_cb); - r = uv_tcp_connect(req, connect_addr); - ASSERT(r == 0); + req = req_alloc(); + uv_req_init(req, (uv_handle_t*)tcp, connect_cb); + r = uv_tcp_connect(req, connect_addr); + ASSERT(r == 0); + } else { + pipe = &pipe_write_handles[max_connect_socket++]; + + r = uv_pipe_init(pipe); + ASSERT(r == 0); + + req = req_alloc(); + uv_req_init(req, (uv_handle_t*)pipe, connect_cb); + r = uv_pipe_connect(req, TEST_PIPENAME); + ASSERT(r == 0); + } } } static void connection_cb(uv_handle_t* s, int status) { - uv_tcp_t* tcp; + uv_stream_t* stream; int r; - ASSERT(&server == (uv_tcp_t*)s); + ASSERT(server == s); ASSERT(status == 0); - tcp = malloc(sizeof(uv_tcp_t)); + if (type == TCP) { + stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init((uv_tcp_t*)stream); + } else { + stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); + uv_pipe_init((uv_pipe_t*)stream); + } - uv_tcp_init(tcp); - - r = uv_accept(s, (uv_stream_t*)tcp); + r = uv_accept(s, stream); ASSERT(r == 0); - r = uv_read_start((uv_stream_t*)tcp, buf_alloc, read_cb); + r = uv_read_start(stream, buf_alloc, read_cb); ASSERT(r == 0); read_sockets++; @@ -317,7 +344,7 @@ typedef struct buf_list_s { static buf_list_t* buf_freelist = NULL; -static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) { +static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) { buf_list_t* buf; buf = buf_freelist; @@ -342,18 +369,20 @@ static void buf_free(uv_buf_t uv_buf_t) { } -HELPER_IMPL(pump_server) { +HELPER_IMPL(tcp_pump_server) { int r; + type = TCP; uv_init(); listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT); /* Server */ - r = uv_tcp_init(&server); + server = (uv_handle_t*)&tcpServer; + r = uv_tcp_init(&tcpServer); ASSERT(r == 0); - r = uv_tcp_bind(&server, listen_addr); + r = uv_tcp_bind(&tcpServer, listen_addr); ASSERT(r == 0); - r = uv_tcp_listen(&server, MAX_WRITE_HANDLES, connection_cb); + r = uv_tcp_listen(&tcpServer, MAX_WRITE_HANDLES, connection_cb); ASSERT(r == 0); uv_run(); @@ -362,9 +391,31 @@ HELPER_IMPL(pump_server) { } -void pump(int n) { +HELPER_IMPL(pipe_pump_server) { + int r; + type = PIPE; + + uv_init(); + + /* Server */ + server = (uv_handle_t*)&pipeServer; + r = uv_pipe_init(&pipeServer); + ASSERT(r == 0); + r = uv_pipe_create(&pipeServer, TEST_PIPENAME); + ASSERT(r == 0); + r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb); + ASSERT(r == 0); + + uv_run(); + + return 0; +} + + +void tcp_pump(int n) { ASSERT(n <= MAX_WRITE_HANDLES); TARGET_CONNECTIONS = n; + type = TCP; uv_init(); @@ -377,13 +428,39 @@ void pump(int n) { } -BENCHMARK_IMPL(pump100_client) { - pump(100); +void pipe_pump(int n) { + ASSERT(n <= MAX_WRITE_HANDLES); + TARGET_CONNECTIONS = n; + type = PIPE; + + uv_init(); + + /* Start making connections */ + maybe_connect_some(); + + uv_run(); +} + + +BENCHMARK_IMPL(tcp_pump100_client) { + tcp_pump(100); return 0; } -BENCHMARK_IMPL(pump1_client) { - pump(1); +BENCHMARK_IMPL(tcp_pump1_client) { + tcp_pump(1); + return 0; +} + + +BENCHMARK_IMPL(pipe_pump100_client) { + pipe_pump(100); + return 0; +} + + +BENCHMARK_IMPL(pipe_pump1_client) { + pipe_pump(1); return 0; } diff --git a/test/benchmark-sizes.c b/test/benchmark-sizes.c index 830de3a00..903864525 100644 --- a/test/benchmark-sizes.c +++ b/test/benchmark-sizes.c @@ -26,6 +26,7 @@ BENCHMARK_IMPL(sizes) { LOGF("uv_req_t: %u bytes\n", (unsigned int) sizeof(uv_req_t)); LOGF("uv_tcp_t: %u bytes\n", (unsigned int) sizeof(uv_tcp_t)); + LOGF("uv_pipe_t: %u bytes\n", (unsigned int) sizeof(uv_pipe_t)); LOGF("uv_prepare_t: %u bytes\n", (unsigned int) sizeof(uv_prepare_t)); LOGF("uv_check_t: %u bytes\n", (unsigned int) sizeof(uv_check_t)); LOGF("uv_idle_t: %u bytes\n", (unsigned int) sizeof(uv_idle_t)); diff --git a/test/echo-server.c b/test/echo-server.c index 9addc546c..b8c3500b9 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -24,19 +24,16 @@ #include #include - typedef struct { uv_req_t req; uv_buf_t buf; } write_req_t; - static int server_closed; -static uv_tcp_t server; - -static int server6_closed; -static uv_tcp_t server6; - +static stream_type serverType; +static uv_tcp_t tcpServer; +static uv_pipe_t pipeServer; +static uv_handle_t* server; static void after_write(uv_req_t* req, int status); static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); @@ -98,10 +95,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { if (!server_closed) { for (i = 0; i < nread; i++) { if (buf.base[i] == 'Q') { - uv_close((uv_handle_t*)&server, on_server_close); + uv_close(server, on_server_close); server_closed = 1; - uv_close((uv_handle_t*)&server6, on_server_close); - server6_closed = 1; } } } @@ -131,7 +126,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) { static void on_connection(uv_handle_t* server, int status) { - uv_tcp_t* handle; + uv_handle_t* handle; int r; if (status != 0) { @@ -139,10 +134,17 @@ static void on_connection(uv_handle_t* server, int status) { } ASSERT(status == 0); - handle = (uv_tcp_t*) malloc(sizeof *handle); - ASSERT(handle != NULL); + if (serverType == TCP) { + handle = (uv_handle_t*) malloc(sizeof(uv_tcp_t)); + ASSERT(handle != NULL); - uv_tcp_init(handle); + uv_tcp_init((uv_tcp_t*)handle); + } else { + handle = (uv_handle_t*) malloc(sizeof(uv_pipe_t)); + ASSERT(handle != NULL); + + uv_pipe_init((uv_pipe_t*)handle); + } /* associate server with stream */ handle->data = server; @@ -156,37 +158,50 @@ static void on_connection(uv_handle_t* server, int status) { static void on_server_close(uv_handle_t* handle) { - ASSERT(handle == (uv_handle_t*)&server || handle == (uv_handle_t*)&server6); + ASSERT(handle == server); } -static int echo_start(int port) { +static int tcp4_echo_start(int port) { struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port); - struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port); int r; - - r = uv_tcp_init(&server); + + server = (uv_handle_t*)&tcpServer; + serverType = TCP; + + r = uv_tcp_init(&tcpServer); if (r) { /* TODO: Error codes */ fprintf(stderr, "Socket creation error\n"); return 1; } - r = uv_tcp_bind(&server, addr); + r = uv_tcp_bind(&tcpServer, addr); if (r) { /* TODO: Error codes */ fprintf(stderr, "Bind error\n"); return 1; } - r = uv_tcp_listen(&server, 128, on_connection); + r = uv_tcp_listen(&tcpServer, 128, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error\n"); return 1; } - r = uv_tcp_init(&server6); + return 0; +} + + +static int tcp6_echo_start(int port) { + struct sockaddr_in6 addr6 = uv_ip6_addr("::1", port); + int r; + + server = (uv_handle_t*)&tcpServer; + serverType = TCP; + + r = uv_tcp_init(&tcpServer); if (r) { /* TODO: Error codes */ fprintf(stderr, "Socket creation error\n"); @@ -194,14 +209,45 @@ static int echo_start(int port) { } /* IPv6 is optional as not all platforms support it */ - r = uv_tcp_bind6(&server6, addr6); + r = uv_tcp_bind6(&tcpServer, addr6); if (r) { /* show message but return OK */ fprintf(stderr, "IPv6 not supported\n"); return 0; } - r = uv_tcp_listen(&server6, 128, on_connection); + r = uv_tcp_listen(&tcpServer, 128, on_connection); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Listen error\n"); + return 1; + } + + return 0; +} + + +static int pipe_echo_start(char* pipeName) { + int r; + + server = (uv_handle_t*)&pipeServer; + serverType = PIPE; + + r = uv_pipe_init(&pipeServer); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Pipe creation error\n"); + return 1; + } + + r = uv_pipe_create(&pipeServer, pipeName); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "create error\n"); + return 1; + } + + r = uv_pipe_listen(&pipeServer, 1, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error on IPv6\n"); @@ -212,9 +258,30 @@ static int echo_start(int port) { } -HELPER_IMPL(echo_server) { +HELPER_IMPL(tcp4_echo_server) { uv_init(); - if (echo_start(TEST_PORT)) + if (tcp4_echo_start(TEST_PORT)) + return 1; + + uv_run(); + return 0; +} + + +HELPER_IMPL(tcp6_echo_server) { + uv_init(); + if (tcp6_echo_start(TEST_PORT)) + return 1; + + uv_run(); + return 0; +} + + +HELPER_IMPL(pipe_echo_server) { + uv_init(); + + if (pipe_echo_start(TEST_PIPENAME)) return 1; uv_run(); diff --git a/test/task.h b/test/task.h index 8d9a1e8e0..d47c20948 100644 --- a/test/task.h +++ b/test/task.h @@ -30,6 +30,17 @@ #define TEST_PORT 9123 #define TEST_PORT_2 9124 +#ifdef _WIN32 +# define TEST_PIPENAME "\\\\.\\pipe\\uv-test" +#else +# /* TODO: define unix pipe name */ +# define TEST_PIPENAME "" +#endif + +typedef enum { + TCP = 0, + PIPE +} stream_type; /* Log to stderr. */ #define LOG(...) fprintf(stderr, "%s", __VA_ARGS__) diff --git a/test/test-list.h b/test/test-list.h index 190574aca..0cee34f3b 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -19,8 +19,9 @@ * IN THE SOFTWARE. */ -TEST_DECLARE (ping_pong) -TEST_DECLARE (ping_pong_v6) +TEST_DECLARE (tcp_ping_pong) +TEST_DECLARE (tcp_ping_pong_v6) +TEST_DECLARE (pipe_ping_pong) TEST_DECLARE (delayed_accept) TEST_DECLARE (tcp_writealot) TEST_DECLARE (bind_error_addrinuse) @@ -54,20 +55,25 @@ TEST_DECLARE (getaddrinfo_concurrent) TEST_DECLARE (gethostbyname) TEST_DECLARE (fail_always) TEST_DECLARE (pass_always) -HELPER_DECLARE (echo_server) +HELPER_DECLARE (tcp4_echo_server) +HELPER_DECLARE (tcp6_echo_server) +HELPER_DECLARE (pipe_echo_server) TASK_LIST_START - TEST_ENTRY (ping_pong) - TEST_HELPER (ping_pong, echo_server) + TEST_ENTRY (tcp_ping_pong) + TEST_HELPER (tcp_ping_pong, tcp4_echo_server) - TEST_ENTRY (ping_pong_v6) - TEST_HELPER (ping_pong_v6, echo_server) + TEST_ENTRY (tcp_ping_pong_v6) + TEST_HELPER (tcp_ping_pong_v6, tcp6_echo_server) + + TEST_ENTRY (pipe_ping_pong) + TEST_HELPER (pipe_ping_pong, pipe_echo_server) TEST_ENTRY (delayed_accept) TEST_ENTRY (tcp_writealot) - TEST_HELPER (tcp_writealot, echo_server) + TEST_HELPER (tcp_writealot, tcp4_echo_server) TEST_ENTRY (bind_error_addrinuse) TEST_ENTRY (bind_error_addrnotavail_1) @@ -86,10 +92,10 @@ TASK_LIST_START TEST_ENTRY (connection_fail_doesnt_auto_close) TEST_ENTRY (shutdown_eof) - TEST_HELPER (shutdown_eof, echo_server) + TEST_HELPER (shutdown_eof, tcp4_echo_server) TEST_ENTRY (callback_stack) - TEST_HELPER (callback_stack, echo_server) + TEST_HELPER (callback_stack, tcp4_echo_server) TEST_ENTRY (timer) @@ -113,7 +119,7 @@ TASK_LIST_START TEST_ENTRY (getaddrinfo_concurrent) TEST_ENTRY (gethostbyname) - TEST_HELPER (gethostbyname, echo_server) + TEST_HELPER (gethostbyname, tcp4_echo_server) #if 0 /* These are for testing the test runner. */ diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index 34de78b5b..97fe79474 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -39,7 +39,10 @@ static char PING[] = "PING\n"; typedef struct { int pongs; int state; - uv_tcp_t tcp; + union { + uv_tcp_t tcp; + uv_pipe_t pipe; + }; uv_req_t connect_req; uv_req_t read_req; char read_buffer[BUFSIZE]; @@ -93,11 +96,11 @@ static void pinger_write_ping(pinger_t* pinger) { } -static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { +static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) { unsigned int i; pinger_t* pinger; - pinger = (pinger_t*)tcp->data; + pinger = (pinger_t*)stream->data; if (nread < 0) { ASSERT(uv_last_error().code == UV_EOF); @@ -142,44 +145,8 @@ static void pinger_on_connect(uv_req_t *req, int status) { } -static void pinger_new() { - int r; - struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); - pinger_t *pinger; - - pinger = (pinger_t*)malloc(sizeof(*pinger)); - pinger->state = 0; - pinger->pongs = 0; - - /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - r = uv_tcp_init(&pinger->tcp); - pinger->tcp.data = pinger; - ASSERT(!r); - - /* We are never doing multiple reads/connects at a time anyway. */ - /* so these handles can be pre-initialized. */ - uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), - (void *(*)(void *))pinger_on_connect); - - r = uv_tcp_connect(&pinger->connect_req, server_addr); - ASSERT(!r); -} - - -TEST_IMPL(ping_pong) { - uv_init(); - - pinger_new(); - uv_run(); - - ASSERT(completed_pingers == 1); - - return 0; -} - - /* same ping-pong test, but using IPv6 connection */ -static void pinger_v6_new() { +static void tcp_pinger_v6_new() { int r; struct sockaddr_in6 server_addr = uv_ip6_addr("::1", TEST_PORT); pinger_t *pinger; @@ -203,10 +170,81 @@ static void pinger_v6_new() { } -TEST_IMPL(ping_pong_v6) { +static void tcp_pinger_new() { + int r; + struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + pinger_t *pinger; + + pinger = (pinger_t*)malloc(sizeof(*pinger)); + pinger->state = 0; + pinger->pongs = 0; + + /* Try to connec to the server and do NUM_PINGS ping-pongs. */ + r = uv_tcp_init(&pinger->tcp); + pinger->tcp.data = pinger; + ASSERT(!r); + + /* We are never doing multiple reads/connects at a time anyway. */ + /* so these handles can be pre-initialized. */ + uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), + pinger_on_connect); + + r = uv_tcp_connect(&pinger->connect_req, server_addr); + ASSERT(!r); +} + + +static void pipe_pinger_new() { + int r; + pinger_t *pinger; + + pinger = (pinger_t*)malloc(sizeof(*pinger)); + pinger->state = 0; + pinger->pongs = 0; + + /* Try to connec to the server and do NUM_PINGS ping-pongs. */ + r = uv_pipe_init(&pinger->pipe); + pinger->pipe.data = pinger; + ASSERT(!r); + + /* We are never doing multiple reads/connects at a time anyway. */ + /* so these handles can be pre-initialized. */ + uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe), + (void *(*)(void *))pinger_on_connect); + + r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME); + ASSERT(!r); +} + + +TEST_IMPL(tcp_ping_pong) { uv_init(); - pinger_v6_new(); + tcp_pinger_new(); + uv_run(); + + ASSERT(completed_pingers == 1); + + return 0; +} + + +TEST_IMPL(tcp_ping_pong_v6) { + uv_init(); + + tcp_pinger_v6_new(); + uv_run(); + + ASSERT(completed_pingers == 1); + + return 0; +} + + +TEST_IMPL(pipe_ping_pong) { + uv_init(); + + pipe_pinger_new(); uv_run(); ASSERT(completed_pingers == 1);