diff --git a/include/uv-win.h b/include/uv-win.h index 49d6f5117..c4871d95f 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -94,14 +94,20 @@ typedef struct uv_buf_t { struct { uv_stream_server_fields }; \ }; -#define UV_TCP_PRIVATE_FIELDS \ - union { \ - SOCKET socket; \ - HANDLE handle; \ - }; \ +#define uv_tcp_server_fields \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ - struct uv_req_s accept_req; \ + struct uv_req_s accept_req; + +#define uv_tcp_connection_fields \ + uv_buf_t read_buffer; + +#define UV_TCP_PRIVATE_FIELDS \ + SOCKET socket; \ + union { \ + struct { uv_tcp_server_fields }; \ + struct { uv_tcp_connection_fields }; \ + }; #define uv_pipe_server_fields \ uv_pipe_accept_t accept_reqs[4]; \ diff --git a/src/win/internal.h b/src/win/internal.h index de41bfd49..fe435f522 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -108,6 +108,7 @@ extern uv_loop_t uv_main_loop_; #define UV_HANDLE_GIVEN_OS_HANDLE 0x10000 #define UV_HANDLE_UV_ALLOCED 0x20000 #define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000 +#define UV_HANDLE_TCP_ZERO_READ 0x80000 void uv_want_endgame(uv_handle_t* handle); void uv_process_endgames(); diff --git a/src/win/tcp.c b/src/win/tcp.c index 933ce5bc7..49291072f 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -103,6 +103,12 @@ #endif +/* + * Threshold of active tcp streams for which to preallocate tcp read buffers. + */ +const unsigned int uv_active_tcp_streams_threshold = 50; + + /* Pointers to winsock extension functions to be retrieved dynamically */ static LPFN_CONNECTEX pConnectEx; static LPFN_ACCEPTEX pAcceptEx; @@ -127,6 +133,9 @@ static char uv_zero_[] = ""; /* mark if IPv6 sockets are supported */ static BOOL uv_allow_ipv6 = FALSE; +/* Counter to keep track of active tcp streams */ +static uint64_t active_tcp_streams; + /* * Retrieves the pointer to a winsock extension function. @@ -328,6 +337,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) { handle->close_cb((uv_handle_t*)handle); } + active_tcp_streams--; + uv_unref(); } } @@ -472,8 +483,20 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { req = &handle->read_req; memset(&req->overlapped, 0, sizeof(req->overlapped)); - buf.base = (char*) &uv_zero_; - buf.len = 0; + /* + * Preallocate a read buffer if the number of active streams is below + * the threshold. + */ + if (active_tcp_streams < uv_active_tcp_streams_threshold) { + handle->flags &= ~UV_HANDLE_TCP_ZERO_READ; + handle->read_buffer = handle->alloc_cb((uv_stream_t*)handle, 65536); + assert(handle->read_buffer.len > 0); + buf = handle->read_buffer; + } else { + handle->flags |= UV_HANDLE_TCP_ZERO_READ; + buf.base = (char*) &uv_zero_; + buf.len = 0; + } flags = 0; result = WSARecv(handle->socket, @@ -487,6 +510,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { /* Process the req without IOCP. */ handle->flags |= UV_HANDLE_READ_PENDING; + req->overlapped.InternalHigh = bytes; handle->reqs_pending++; uv_insert_pending_req(req); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { @@ -559,6 +583,8 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { uv_tcp_queue_accept(server); } + active_tcp_streams++; + return rv; } @@ -773,7 +799,7 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { handle->flags &= ~UV_HANDLE_READ_PENDING; if (req->error.code != UV_OK) { - /* An error occurred doing the 0-read. */ + /* An error occurred doing the read. */ if ((handle->flags & UV_HANDLE_READING)) { handle->flags &= ~UV_HANDLE_READING; LOOP->last_error = req->error; @@ -782,6 +808,28 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { handle->read_cb((uv_stream_t*)handle, -1, buf); } } else { + if (!(handle->flags & UV_HANDLE_TCP_ZERO_READ)) { + /* The read was done with a non-zero buffer length. */ + if (req->overlapped.InternalHigh > 0) { + /* Successful read */ + handle->read_cb((uv_stream_t*)handle, req->overlapped.InternalHigh, handle->read_buffer); + /* Read again only if bytes == buf.len */ + if (req->overlapped.InternalHigh < handle->read_buffer.len) { + goto done; + } + } else { + /* Connection closed */ + handle->flags &= ~UV_HANDLE_READING; + handle->flags |= UV_HANDLE_EOF; + LOOP->last_error.code = UV_EOF; + LOOP->last_error.sys_errno_ = ERROR_SUCCESS; + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, handle->read_buffer); + goto done; + } + } + /* Do nonblocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { buf = handle->alloc_cb((uv_stream_t*)handle, 65536); @@ -825,7 +873,8 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { } } - /* Post another 0-read if still reading and not closing. */ +done: + /* Post another read if still reading and not closing. */ if ((handle->flags & UV_HANDLE_READING) && !(handle->flags & UV_HANDLE_READ_PENDING)) { uv_tcp_queue_read(handle); @@ -906,6 +955,7 @@ void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) { NULL, 0) == 0) { uv_connection_init((uv_stream_t*)handle); + active_tcp_streams++; ((uv_connect_cb)req->cb)(req, 0); } else { uv_set_sys_error(WSAGetLastError());