Windows: Pre-allocate buffers for overlapped WSARecv if the number of active tcp streams is below 50.

This commit is contained in:
Igor Zinkovsky 2011-08-15 16:49:57 -07:00
parent e5a938f1f4
commit 422c139306
3 changed files with 67 additions and 10 deletions

View File

@ -94,14 +94,20 @@ typedef struct uv_buf_t {
struct { uv_stream_server_fields }; \
};
#define UV_TCP_PRIVATE_FIELDS \
union { \
SOCKET socket; \
HANDLE handle; \
}; \
#define uv_tcp_server_fields \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_req_s accept_req; \
struct uv_req_s accept_req;
#define uv_tcp_connection_fields \
uv_buf_t read_buffer;
#define UV_TCP_PRIVATE_FIELDS \
SOCKET socket; \
union { \
struct { uv_tcp_server_fields }; \
struct { uv_tcp_connection_fields }; \
};
#define uv_pipe_server_fields \
uv_pipe_accept_t accept_reqs[4]; \

View File

@ -108,6 +108,7 @@ extern uv_loop_t uv_main_loop_;
#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000
#define UV_HANDLE_UV_ALLOCED 0x20000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000
#define UV_HANDLE_TCP_ZERO_READ 0x80000
void uv_want_endgame(uv_handle_t* handle);
void uv_process_endgames();

View File

@ -103,6 +103,12 @@
#endif
/*
* Threshold of active tcp streams for which to preallocate tcp read buffers.
*/
const unsigned int uv_active_tcp_streams_threshold = 50;
/* Pointers to winsock extension functions to be retrieved dynamically */
static LPFN_CONNECTEX pConnectEx;
static LPFN_ACCEPTEX pAcceptEx;
@ -127,6 +133,9 @@ static char uv_zero_[] = "";
/* mark if IPv6 sockets are supported */
static BOOL uv_allow_ipv6 = FALSE;
/* Counter to keep track of active tcp streams */
static uint64_t active_tcp_streams;
/*
* Retrieves the pointer to a winsock extension function.
@ -328,6 +337,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
handle->close_cb((uv_handle_t*)handle);
}
active_tcp_streams--;
uv_unref();
}
}
@ -472,8 +483,20 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
req = &handle->read_req;
memset(&req->overlapped, 0, sizeof(req->overlapped));
buf.base = (char*) &uv_zero_;
buf.len = 0;
/*
* Preallocate a read buffer if the number of active streams is below
* the threshold.
*/
if (active_tcp_streams < uv_active_tcp_streams_threshold) {
handle->flags &= ~UV_HANDLE_TCP_ZERO_READ;
handle->read_buffer = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(handle->read_buffer.len > 0);
buf = handle->read_buffer;
} else {
handle->flags |= UV_HANDLE_TCP_ZERO_READ;
buf.base = (char*) &uv_zero_;
buf.len = 0;
}
flags = 0;
result = WSARecv(handle->socket,
@ -487,6 +510,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
/* Process the req without IOCP. */
handle->flags |= UV_HANDLE_READ_PENDING;
req->overlapped.InternalHigh = bytes;
handle->reqs_pending++;
uv_insert_pending_req(req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
@ -559,6 +583,8 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
uv_tcp_queue_accept(server);
}
active_tcp_streams++;
return rv;
}
@ -773,7 +799,7 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
handle->flags &= ~UV_HANDLE_READ_PENDING;
if (req->error.code != UV_OK) {
/* An error occurred doing the 0-read. */
/* An error occurred doing the read. */
if ((handle->flags & UV_HANDLE_READING)) {
handle->flags &= ~UV_HANDLE_READING;
LOOP->last_error = req->error;
@ -782,6 +808,28 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
} else {
if (!(handle->flags & UV_HANDLE_TCP_ZERO_READ)) {
/* The read was done with a non-zero buffer length. */
if (req->overlapped.InternalHigh > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, req->overlapped.InternalHigh, handle->read_buffer);
/* Read again only if bytes == buf.len */
if (req->overlapped.InternalHigh < handle->read_buffer.len) {
goto done;
}
} else {
/* Connection closed */
handle->flags &= ~UV_HANDLE_READING;
handle->flags |= UV_HANDLE_EOF;
LOOP->last_error.code = UV_EOF;
LOOP->last_error.sys_errno_ = ERROR_SUCCESS;
buf.base = 0;
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, -1, handle->read_buffer);
goto done;
}
}
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
@ -825,7 +873,8 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
}
}
/* Post another 0-read if still reading and not closing. */
done:
/* Post another read if still reading and not closing. */
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
uv_tcp_queue_read(handle);
@ -906,6 +955,7 @@ void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) {
NULL,
0) == 0) {
uv_connection_init((uv_stream_t*)handle);
active_tcp_streams++;
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_set_sys_error(WSAGetLastError());