diff --git a/include/uv/win.h b/include/uv/win.h index e3017aaf2..485de783a 100644 --- a/include/uv/win.h +++ b/include/uv/win.h @@ -260,6 +260,7 @@ typedef struct { struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ + struct uv__queue queue; \ int coalesced; \ uv_buf_t write_buffer; \ HANDLE event_handle; \ @@ -297,7 +298,7 @@ typedef struct { } uv_read_t; #define uv_stream_connection_fields \ - unsigned int write_reqs_pending; \ + struct uv__queue write_queue; \ uv_shutdown_t* shutdown_req; #define uv_stream_server_fields \ diff --git a/src/uv-common.h b/src/uv-common.h index 88a4d7d47..95a603aea 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -95,7 +95,7 @@ enum { UV_HANDLE_WRITABLE = 0x00008000, UV_HANDLE_READ_PENDING = 0x00010000, UV_HANDLE_SYNC_BYPASS_IOCP = 0x00020000, - /*UV_HANDLE_FLAG_UNUSED = 0x00040000,*/ + UV_HANDLE_IN_WRITE_CB = 0x00040000, UV_HANDLE_EMULATE_IOCP = 0x00080000, UV_HANDLE_BLOCKING_WRITES = 0x00100000, UV_HANDLE_CANCELLATION_PENDING = 0x00200000, diff --git a/src/win/pipe.c b/src/win/pipe.c index 380a92d00..0697bad73 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -1512,13 +1512,13 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); POST_COMPLETION_FOR_REQ(loop, req); return 0; } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { req->write_buffer = write_buf; uv__insert_non_overlapped_write_req(handle, req); - if (handle->stream.conn.write_reqs_pending == 0) { + if (uv__queue_empty(&handle->stream.conn.write_queue)) { uv__queue_non_overlapped_write(handle); } @@ -1560,7 +1560,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); return 0; } else { result = WriteFile(handle->handle, @@ -1593,7 +1593,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); return 0; } @@ -2061,6 +2061,7 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, /* If this was a coalesced write, extract pointer to the user_provided * uv_write_t structure so we can pass the expected pointer to the callback, * then free the heap-allocated write req. */ + uv__queue_remove(&req->queue); if (req->coalesced) { uv__coalesced_write_t* coalesced_write = container_of(req, uv__coalesced_write_t, req); @@ -2068,14 +2069,14 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, uv__free(coalesced_write); } if (req->cb) { + handle->flags |= UV_HANDLE_IN_WRITE_CB; req->cb(req, uv_translate_sys_error(err)); + handle->flags &= ~UV_HANDLE_IN_WRITE_CB; } - handle->stream.conn.write_reqs_pending--; - if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && handle->pipe.conn.non_overlapped_writes_tail) { - assert(handle->stream.conn.write_reqs_pending > 0); + assert(!uv__queue_empty(&handle->stream.conn.write_queue)); uv__queue_non_overlapped_write(handle); } diff --git a/src/win/stream-inl.h b/src/win/stream-inl.h index a0d5532e5..06ad4eca4 100644 --- a/src/win/stream-inl.h +++ b/src/win/stream-inl.h @@ -37,7 +37,7 @@ INLINE static void uv__stream_init(uv_loop_t* loop, handle->write_queue_size = 0; handle->activecnt = 0; handle->stream.conn.shutdown_req = NULL; - handle->stream.conn.write_reqs_pending = 0; + uv__queue_init(&handle->stream.conn.write_queue); UV_REQ_INIT(loop, &handle->read_req, UV_READ); handle->read_req.event_handle = NULL; diff --git a/src/win/stream.c b/src/win/stream.c index 534b902a3..8b864091a 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -221,7 +221,8 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { handle->reqs_pending++; REGISTER_HANDLE_REQ(loop, handle); - if (handle->stream.conn.write_reqs_pending == 0) + if (!(handle->flags & UV_HANDLE_IN_WRITE_CB) && + uv__queue_empty(&handle->stream.conn.write_queue)) uv__insert_pending_req(loop, (uv_req_t*) req); return 0; diff --git a/src/win/tcp.c b/src/win/tcp.c index db1354743..8d7d9b471 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -205,7 +205,7 @@ void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown int err; assert(req); - assert(stream->stream.conn.write_reqs_pending == 0); + assert(uv__queue_empty(&stream->stream.conn.write_queue)); assert(!(stream->flags & UV_HANDLE_SHUT)); assert(stream->flags & UV_HANDLE_CONNECTION); @@ -919,14 +919,14 @@ int uv__tcp_write(uv_loop_t* loop, /* Request completed immediately. */ req->u.io.queued_bytes = 0; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); uv__insert_pending_req(loop, (uv_req_t*) req); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { /* Request queued by the kernel. */ req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs); handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); handle->write_queue_size += req->u.io.queued_bytes; if (handle->flags & UV_HANDLE_EMULATE_IOCP && @@ -940,7 +940,7 @@ int uv__tcp_write(uv_loop_t* loop, /* Send failed due to an error, report it later */ req->u.io.queued_bytes = 0; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); SET_REQ_ERROR(req, WSAGetLastError()); uv__insert_pending_req(loop, (uv_req_t*) req); @@ -956,7 +956,7 @@ int uv__tcp_try_write(uv_tcp_t* handle, int result; DWORD bytes; - if (handle->stream.conn.write_reqs_pending > 0) + if (!uv__queue_empty(&handle->stream.conn.write_queue)) return UV_EAGAIN; result = WSASend(handle->socket, @@ -1083,7 +1083,7 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle, assert(handle->write_queue_size >= req->u.io.queued_bytes); handle->write_queue_size -= req->u.io.queued_bytes; - + uv__queue_remove(&req->queue); UNREGISTER_HANDLE_REQ(loop, handle); if (handle->flags & UV_HANDLE_EMULATE_IOCP) { @@ -1103,11 +1103,12 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle, /* use UV_ECANCELED for consistency with Unix */ err = UV_ECANCELED; } + handle->flags |= UV_HANDLE_IN_WRITE_CB; req->cb(req, err); + handle->flags &= ~UV_HANDLE_IN_WRITE_CB; } - handle->stream.conn.write_reqs_pending--; - if (handle->stream.conn.write_reqs_pending == 0) { + if (uv__queue_empty(&handle->stream.conn.write_queue)) { if (handle->flags & UV_HANDLE_CLOSING) { closesocket(handle->socket); handle->socket = INVALID_SOCKET; @@ -1360,19 +1361,22 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { int non_ifs_lsp; int reading; int writing; + struct uv__queue* q; socket = tcp->socket; reading = tcp->flags & UV_HANDLE_READ_PENDING; - writing = tcp->stream.conn.write_reqs_pending > 0; + writing = !uv__queue_empty(&tcp->stream.conn.write_queue); if (!reading && !writing) return; - /* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel - * them each explicitly with CancelIoEx (like unix). */ if (reading) CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped); - if (writing) - CancelIo((HANDLE) socket); + if (writing) { + uv__queue_foreach(q, &tcp->stream.conn.write_queue) { + uv_write_t* wr = uv__queue_data(q, uv_write_t, queue); + CancelIoEx((HANDLE) socket, &wr->u.io.overlapped); + } + } /* Check if we have any non-IFS LSPs stacked on top of TCP */ non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 : @@ -1391,7 +1395,7 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { &bytes, NULL, NULL) != 0) { - /* Failed. We can't do CancelIo. */ + /* Failed. We can't do CancelIoEx. */ return; } } @@ -1401,8 +1405,12 @@ static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) { if (socket != tcp->socket) { if (reading) CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped); - if (writing) - CancelIo((HANDLE) socket); + if (writing) { + uv__queue_foreach(q, &tcp->stream.conn.write_queue) { + uv_write_t* wr = uv__queue_data(q, uv_write_t, queue); + CancelIoEx((HANDLE) socket, &wr->u.io.overlapped); + } + } } } @@ -1443,7 +1451,7 @@ void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) { * first (which typically should be cancellations). There's not much we can * do about canceled reads, which also will generate an RST packet. */ if (!(tcp->flags & UV_HANDLE_CONNECTION) || - tcp->stream.conn.write_reqs_pending == 0) { + uv__queue_empty(&tcp->stream.conn.write_queue)) { closesocket(tcp->socket); tcp->socket = INVALID_SOCKET; } diff --git a/src/win/tty.c b/src/win/tty.c index 0917c2e2a..6da2090a7 100644 --- a/src/win/tty.c +++ b/src/win/tty.c @@ -2166,7 +2166,7 @@ int uv__tty_write(uv_loop_t* loop, req->cb = cb; handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; + uv__queue_insert_tail(&handle->stream.conn.write_queue, &req->queue); REGISTER_HANDLE_REQ(loop, handle); req->u.io.queued_bytes = 0; @@ -2188,7 +2188,7 @@ int uv__tty_try_write(uv_tty_t* handle, unsigned int nbufs) { DWORD error; - if (handle->stream.conn.write_reqs_pending > 0) + if (!uv__queue_empty(&handle->stream.conn.write_queue)) return UV_EAGAIN; if (uv__tty_write_bufs(handle, bufs, nbufs, &error)) @@ -2204,15 +2204,17 @@ void uv__process_tty_write_req(uv_loop_t* loop, uv_tty_t* handle, handle->write_queue_size -= req->u.io.queued_bytes; UNREGISTER_HANDLE_REQ(loop, handle); + uv__queue_remove(&req->queue); if (req->cb) { err = GET_REQ_ERROR(req); + handle->flags |= UV_HANDLE_IN_WRITE_CB; req->cb(req, uv_translate_sys_error(err)); + handle->flags &= ~UV_HANDLE_IN_WRITE_CB; } - handle->stream.conn.write_reqs_pending--; - if (handle->stream.conn.write_reqs_pending == 0 && + if (uv__queue_empty(&handle->stream.conn.write_queue) && uv__is_stream_shutting(handle)) uv__process_tty_shutdown_req(loop, handle, @@ -2237,7 +2239,7 @@ void uv__tty_close(uv_tty_t* handle) { void uv__process_tty_shutdown_req(uv_loop_t* loop, uv_tty_t* stream, uv_shutdown_t* req) { - assert(stream->stream.conn.write_reqs_pending == 0); + assert(uv__queue_empty(&stream->stream.conn.write_queue)); assert(req); stream->stream.conn.shutdown_req = NULL; diff --git a/test/echo-server.c b/test/echo-server.c index 7d6985d1d..f810bff70 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -143,9 +143,7 @@ static void after_read(uv_stream_t* handle, ASSERT_NOT_NULL(wr); wr->buf = uv_buf_init(buf->base, nread); - if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) { - FATAL("uv_write failed"); - } + ASSERT_OK(uv_write(&wr->req, handle, &wr->buf, 1, after_write)); if (shutdown) { sreq = malloc(sizeof* sreq);