diff --git a/include/uv/win.h b/include/uv/win.h index ce1e58c4f..c96d7ae31 100644 --- a/include/uv/win.h +++ b/include/uv/win.h @@ -260,6 +260,7 @@ typedef struct { struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ + struct uv__queue queue; \ int coalesced; \ uv_buf_t write_buffer; \ HANDLE event_handle; \ @@ -297,7 +298,7 @@ typedef struct { } uv_read_t; #define uv_stream_connection_fields \ - unsigned int write_reqs_pending; \ + struct uv__queue write_queue; \ uv_shutdown_t* shutdown_req; #define uv_stream_server_fields \ diff --git a/src/win/pipe.c b/src/win/pipe.c index 362d8d776..624f5346c 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -560,7 +560,7 @@ void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) { assert(handle->flags & UV_HANDLE_CONNECTION); assert(req != NULL); - assert(handle->stream.conn.write_reqs_pending == 0); + assert(uv__queue_empty(&handle->stream.conn.write_queue)); SET_REQ_SUCCESS(req); if (handle->flags & UV_HANDLE_CLOSING) { @@ -1600,13 +1600,13 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); POST_COMPLETION_FOR_REQ(loop, req); return 0; } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { req->write_buffer = write_buf; uv__insert_non_overlapped_write_req(handle, req); - if (handle->stream.conn.write_reqs_pending == 0) { + if (uv__queue_empty(&handle->stream.conn.write_queue)) { uv__queue_non_overlapped_write(handle); } @@ -1648,7 +1648,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); return 0; } else { result = WriteFile(handle->handle, @@ -1681,7 +1681,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); return 0; } @@ -2158,6 +2158,7 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, /* If this was a coalesced write, extract pointer to the user_provided * uv_write_t structure so we can pass the expected pointer to the callback, * then free the heap-allocated write req. */ + uv__queue_remove(&req->queue); if (req->coalesced) { uv__coalesced_write_t* coalesced_write = container_of(req, uv__coalesced_write_t, req); @@ -2168,15 +2169,13 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, req->cb(req, uv_translate_sys_error(err)); } - handle->stream.conn.write_reqs_pending--; - if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && handle->pipe.conn.non_overlapped_writes_tail) { - assert(handle->stream.conn.write_reqs_pending > 0); + assert(!uv__queue_empty(&handle->stream.conn.write_queue)); uv__queue_non_overlapped_write(handle); } - if (handle->stream.conn.write_reqs_pending == 0 && + if (uv__queue_empty(&handle->stream.conn.write_queue) && uv__is_stream_shutting(handle)) uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req); diff --git a/src/win/stream-inl.h b/src/win/stream-inl.h index a0d5532e5..06ad4eca4 100644 --- a/src/win/stream-inl.h +++ b/src/win/stream-inl.h @@ -37,7 +37,7 @@ INLINE static void uv__stream_init(uv_loop_t* loop, handle->write_queue_size = 0; handle->activecnt = 0; handle->stream.conn.shutdown_req = NULL; - handle->stream.conn.write_reqs_pending = 0; + uv__queue_init(&handle->stream.conn.write_queue); UV_REQ_INIT(loop, &handle->read_req, UV_READ); handle->read_req.event_handle = NULL; diff --git a/src/win/stream.c b/src/win/stream.c index 5d55f721d..180124d5f 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -218,7 +218,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { handle->reqs_pending++; REGISTER_HANDLE_REQ(loop, handle); - if (handle->stream.conn.write_reqs_pending == 0) { + if (uv__queue_empty(&handle->stream.conn.write_queue)) { if (handle->type == UV_NAMED_PIPE) uv__pipe_shutdown(loop, (uv_pipe_t*) handle, req); else diff --git a/src/win/tcp.c b/src/win/tcp.c index db1354743..ed92881df 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -205,7 +205,7 @@ void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown int err; assert(req); - assert(stream->stream.conn.write_reqs_pending == 0); + assert(uv__queue_empty(&stream->stream.conn.write_queue)); assert(!(stream->flags & UV_HANDLE_SHUT)); assert(stream->flags & UV_HANDLE_CONNECTION); @@ -919,14 +919,14 @@ int uv__tcp_write(uv_loop_t* loop, /* Request completed immediately. */ req->u.io.queued_bytes = 0; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); uv__insert_pending_req(loop, (uv_req_t*) req); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { /* Request queued by the kernel. */ req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); handle->write_queue_size += req->u.io.queued_bytes; if (handle->flags & UV_HANDLE_EMULATE_IOCP && @@ -940,7 +940,7 @@ int uv__tcp_write(uv_loop_t* loop, /* Send failed due to an error, report it later */ req->u.io.queued_bytes = 0; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); SET_REQ_ERROR(req, WSAGetLastError()); uv__insert_pending_req(loop, (uv_req_t*) req); @@ -956,7 +956,7 @@ int uv__tcp_try_write(uv_tcp_t* handle, int result; DWORD bytes; - if (handle->stream.conn.write_reqs_pending > 0) + if (!uv__queue_empty(&handle->stream.conn.write_queue)) return UV_EAGAIN; result = WSASend(handle->socket, @@ -1106,8 +1106,8 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle, req->cb(req, err); } - handle->stream.conn.write_reqs_pending--; - if (handle->stream.conn.write_reqs_pending == 0) { + uv__queue_remove(&req->queue); + if (uv__queue_empty(&handle->stream.conn.write_queue)) { if (handle->flags & UV_HANDLE_CLOSING) { closesocket(handle->socket); handle->socket = INVALID_SOCKET; @@ -1360,19 +1360,22 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { int non_ifs_lsp; int reading; int writing; + struct uv__queue* q; socket = tcp->socket; reading = tcp->flags & UV_HANDLE_READ_PENDING; - writing = tcp->stream.conn.write_reqs_pending > 0; + writing = !uv__queue_empty(&tcp->stream.conn.write_queue); if (!reading && !writing) return; - /* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel - * them each explicitly with CancelIoEx (like unix). */ if (reading) CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped); - if (writing) - CancelIo((HANDLE) socket); + if (writing) { + uv__queue_foreach(q, &tcp->stream.conn.write_queue) { + uv_write_t* wr = uv__queue_data(q, uv_write_t, queue); + CancelIoEx((HANDLE) socket, &wr->u.io.overlapped); + } + } /* Check if we have any non-IFS LSPs stacked on top of TCP */ non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 : @@ -1391,7 +1394,7 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { &bytes, NULL, NULL) != 0) { - /* Failed. We can't do CancelIo. */ + /* Failed. We can't do CancelIoEx. */ return; } } @@ -1401,8 +1404,12 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { if (socket != tcp->socket) { if (reading) CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped); - if (writing) - CancelIo((HANDLE) socket); + if (writing) { + uv__queue_foreach(q, &tcp->stream.conn.write_queue) { + uv_write_t* wr = uv__queue_data(q, uv_write_t, queue); + CancelIoEx((HANDLE) socket, &wr->u.io.overlapped); + } + } } } @@ -1443,7 +1450,7 @@ void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) { * first (which typically should be cancellations). There's not much we can * do about canceled reads, which also will generate an RST packet. */ if (!(tcp->flags & UV_HANDLE_CONNECTION) || - tcp->stream.conn.write_reqs_pending == 0) { + uv__queue_empty(&tcp->stream.conn.write_queue)) { closesocket(tcp->socket); tcp->socket = INVALID_SOCKET; } diff --git a/src/win/tty.c b/src/win/tty.c index 0917c2e2a..6ad80e6bc 100644 --- a/src/win/tty.c +++ b/src/win/tty.c @@ -2166,7 +2166,7 @@ int uv__tty_write(uv_loop_t* loop, req->cb = cb; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); req->u.io.queued_bytes = 0; @@ -2188,7 +2188,7 @@ int uv__tty_try_write(uv_tty_t* handle, unsigned int nbufs) { DWORD error; - if (handle->stream.conn.write_reqs_pending > 0) + if (!uv__queue_empty(&handle->stream.conn.write_queue)) return UV_EAGAIN; if (uv__tty_write_bufs(handle, bufs, nbufs, &error)) @@ -2211,8 +2211,8 @@ void uv__process_tty_write_req(uv_loop_t* loop, uv_tty_t* handle, } - handle->stream.conn.write_reqs_pending--; - if (handle->stream.conn.write_reqs_pending == 0 && + uv__queue_remove(&req->queue); + if (uv__queue_empty(&handle->stream.conn.write_queue) && uv__is_stream_shutting(handle)) uv__process_tty_shutdown_req(loop, handle, @@ -2237,7 +2237,7 @@ void uv__tty_close(uv_tty_t* handle) { void uv__process_tty_shutdown_req(uv_loop_t* loop, uv_tty_t* stream, uv_shutdown_t* req) { - assert(stream->stream.conn.write_reqs_pending == 0); + assert(uv__queue_empty(&stream->stream.conn.write_queue)); assert(req); stream->stream.conn.shutdown_req = NULL;