From d718a22ace96117dc520f8b1bccce7f8a488c6e8 Mon Sep 17 00:00:00 2001 From: Keno Fischer Date: Thu, 11 Dec 2025 06:22:40 +0000 Subject: [PATCH] stream: Implement cancellation support for uv_write_t This adds a new function `uv_write_cancel` which requests the cancellation of an in-progress write as soon as possible, possibly causing a short write. The exact timing of the cancellation is stream type and kernel dependent. A cancelled request will receive the ordinary write callback with `UV_ECANCELED` and the new function `uv_write_nwritten` can be used to determine if the write request was partially fulfilled. --- CMakeLists.txt | 1 + Makefile.am | 1 + docs/src/stream.rst | 32 ++++- include/uv.h | 30 ++++- include/uv/unix.h | 8 +- include/uv/win.h | 8 +- src/queue.h | 9 ++ src/unix/stream.c | 30 +++++ src/win/pipe.c | 10 ++ src/win/req-inl.h | 3 + src/win/stream.c | 51 ++++++++ src/win/tcp.c | 2 + src/win/tty.c | 2 + test/test-list.h | 6 + test/test-write-cancel.c | 272 +++++++++++++++++++++++++++++++++++++++ 15 files changed, 461 insertions(+), 4 deletions(-) create mode 100644 test/test-write-cancel.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 449dc8322..2b1273d4d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -662,6 +662,7 @@ if(LIBUV_BUILD_TESTS) test/test-tcp-unexpected-read.c test/test-tcp-write-after-connect.c test/test-tcp-write-fail.c + test/test-write-cancel.c test/test-tcp-write-queue-order.c test/test-tcp-write-to-half-open-connection.c test/test-tcp-writealot.c diff --git a/Makefile.am b/Makefile.am index 797efc83e..99b7d1e19 100644 --- a/Makefile.am +++ b/Makefile.am @@ -289,6 +289,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-try-write.c \ test/test-tcp-write-in-a-row.c \ test/test-tcp-try-write-error.c \ + test/test-write-cancel.c \ test/test-tcp-write-queue-order.c \ test/test-test-macros.c \ test/test-thread-equal.c \ diff --git a/docs/src/stream.rst b/docs/src/stream.rst index 0b42c4b3f..938ce893f 100644 --- a/docs/src/stream.rst +++ b/docs/src/stream.rst @@ -217,7 +217,37 @@ API where it returns ``UV_EAGAIN``. .. versionadded:: 1.42.0 - + +.. c:function:: int uv_write_cancel(uv_write_t* req) + + Cancel a pending write request. The write callback will still be called, + with ``UV_ECANCELED`` status, or with the normal result if the write + completed before cancellation took effect. + + Fully cancelled writes (where no bytes were written) may have their + callbacks called out of order with respect to other writes on the same + stream. Partial writes are completed in order. + + Returns 0 on success. Currently expected to succeed for all valid write + requests, but may return an error code in future libuv versions. + + Use :c:func:`uv_write_nwritten` from the write callback to determine how + many bytes were written before cancellation. + + .. versionadded:: 1.52.0 + +.. c:function:: size_t uv_write_nwritten(const uv_write_t* req) + + Returns the number of bytes written by a write request. Only valid when + called from within the write callback (:c:type:`uv_write_cb`). + + This is primarily useful when a write has been cancelled via + :c:func:`uv_write_cancel` and the callback receives ``UV_ECANCELED`` + status, to determine how many bytes were actually written before + cancellation. + + .. versionadded:: 1.52.0 + .. c:function:: int uv_is_readable(const uv_stream_t* handle) Returns 1 if the stream is readable, 0 otherwise. diff --git a/include/uv.h b/include/uv.h index d1a77c749..82652a891 100644 --- a/include/uv.h +++ b/include/uv.h @@ -437,7 +437,7 @@ UV_EXTERN char* uv_err_name_r(int err, char* buf, size_t buflen); /* read-only */ \ uv_req_type type; \ /* private */ \ - void* reserved[6]; \ + void* reserved[5]; \ UV_REQ_PRIVATE_FIELDS \ /* Abstract base class of all requests. */ @@ -568,6 +568,34 @@ UV_EXTERN int uv_try_write2(uv_stream_t* handle, unsigned int nbufs, uv_stream_t* send_handle); +/* + * Returns the number of bytes written by a write request. + * + * NOTE: The result is only well-defined when called from within the + * write callback (uv_write_cb). The value is undefined if called at + * other times. + * + * This is primarily useful when a write has been cancelled via uv_write_cancel() + * and the callback receives UV_ECANCELED status, to determine how many + * bytes were actually written before cancellation. + */ +UV_EXTERN size_t uv_write_nwritten(const uv_write_t* req); + +/* + * Cancel a pending write request. + * + * The write callback will still be called, with UV_ECANCELED status, or with + * the normal result if the write completed before cancellation took effect. + * + * Fully cancelled writes (where no bytes were written) may have their + * callbacks called out of order with respect to other writes on the same + * stream. Partial writes are completed in order. + * + * Returns 0 on success. Currently expected to succeed for all valid write + * requests, but may return an error code in future libuv versions. + */ +UV_EXTERN int uv_write_cancel(uv_write_t* req); + /* uv_write_t is a subclass of uv_req_t. */ struct uv_write_s { UV_REQ_FIELDS diff --git a/include/uv/unix.h b/include/uv/unix.h index c6ba419da..5150632be 100644 --- a/include/uv/unix.h +++ b/include/uv/unix.h @@ -249,7 +249,13 @@ typedef struct { #define UV_REQ_TYPE_PRIVATE /* empty */ -#define UV_REQ_PRIVATE_FIELDS /* empty */ +#define UV_REQ_PRIVATE_FIELDS \ + union { \ + void* reserved2[1]; \ + struct { \ + size_t nwritten; \ + } write_extra; \ + }; #define UV_PRIVATE_REQ_TYPES /* empty */ diff --git a/include/uv/win.h b/include/uv/win.h index 7b4ebd4b7..4e1575211 100644 --- a/include/uv/win.h +++ b/include/uv/win.h @@ -379,7 +379,13 @@ typedef struct { WCHAR* name; \ } connect; \ } u; \ - struct uv_req_s* next_req; + struct uv_req_s* next_req; \ + union { \ + void* reserved2[1]; \ + struct { \ + size_t nwritten; \ + } write_extra; \ + }; #define UV_WRITE_PRIVATE_FIELDS \ int coalesced; \ diff --git a/src/queue.h b/src/queue.h index 5f8489e9b..08cbb9f15 100644 --- a/src/queue.h +++ b/src/queue.h @@ -87,4 +87,13 @@ static inline void uv__queue_remove(struct uv__queue* q) { q->next->prev = q->prev; } +static inline int uv__queue_contains(const struct uv__queue* h, + const struct uv__queue* q) { + const struct uv__queue* p; + uv__queue_foreach(p, h) + if (p == q) + return 1; + return 0; +} + #endif /* QUEUE_H_ */ diff --git a/src/unix/stream.c b/src/unix/stream.c index 725558662..980a52ef9 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -692,6 +692,7 @@ static int uv__write_req_update(uv_stream_t* stream, assert(n <= stream->write_queue_size); stream->write_queue_size -= n; + req->write_extra.nwritten += n; buf = req->bufs + req->write_index; @@ -915,6 +916,9 @@ static void uv__write_callbacks(uv_stream_t* stream) { uv__req_unregister(stream->loop); if (req->bufs != NULL) { + /* bufs are non-NULL on errors (including cancel and stream close). + * Success path sets bufs to NULL after adjusting size in uv__write. */ + assert(req->error != 0); stream->write_queue_size -= uv__write_req_size(req); if (req->bufs != req->bufsml) uv__free(req->bufs); @@ -1356,6 +1360,7 @@ int uv_write2(uv_write_t* req, req->handle = stream; req->error = 0; req->send_handle = send_handle; + req->write_extra.nwritten = 0; uv__queue_init(&req->queue); req->bufs = req->bufsml; @@ -1398,6 +1403,31 @@ int uv_write2(uv_write_t* req, } +size_t uv_write_nwritten(const uv_write_t* req) { + return req->write_extra.nwritten; +} + + +int uv_write_cancel(uv_write_t* req) { + uv_stream_t* stream; + + stream = req->handle; + + /* Already completed, return success, callback will return success as well */ + if (!uv__queue_contains(&stream->write_queue, &req->queue)) + return 0; + + uv__queue_remove(&req->queue); + req->error = UV_ECANCELED; + + /* uv__write_callbacks will handle write_queue_size and freeing bufs. */ + uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); + uv__io_feed(stream->loop, &stream->io_watcher); + + return 0; +} + + /* The buffers to be written must remain valid until the callback is called. * This is not required for the uv_buf_t array. */ diff --git a/src/win/pipe.c b/src/win/pipe.c index 8f86a1fee..c7e45e866 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -1332,6 +1332,7 @@ static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { if (!result) { SET_REQ_ERROR(req, GetLastError()); } + SET_REQ_NWRITTEN(req, bytes); POST_COMPLETION_FOR_REQ(loop, req); return 0; @@ -1592,6 +1593,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, req->handle = (uv_stream_t*) handle; req->send_handle = NULL; req->cb = cb; + req->write_extra.nwritten = 0; /* Private fields. */ req->coalesced = 0; req->event_handle = NULL; @@ -1637,6 +1639,7 @@ static int uv__pipe_write_data(uv_loop_t* loop, req->u.io.queued_bytes = 0; } + SET_REQ_NWRITTEN(req, bytes); REGISTER_HANDLE_REQ(loop, handle); handle->reqs_pending++; handle->stream.conn.write_reqs_pending++; @@ -2176,11 +2179,15 @@ void uv__process_pipe_read_req(uv_loop_t* loop, void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, uv_write_t* req) { int err; + size_t bytes_written; assert(handle->type == UV_NAMED_PIPE); assert(handle->write_queue_size >= req->u.io.queued_bytes); handle->write_queue_size -= req->u.io.queued_bytes; + /* Ask the kernel how many bytes were actually written. + * N.B.: If the write was partially cancelled, this could differ from queued_bytes. */ + bytes_written = req->u.io.overlapped.InternalHigh; UNREGISTER_HANDLE_REQ(loop, handle); @@ -2204,6 +2211,9 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, req = coalesced_write->user_req; uv__free(coalesced_write); } + + req->write_extra.nwritten += bytes_written; + if (req->cb) { req->cb(req, uv_translate_sys_error(err)); } diff --git a/src/win/req-inl.h b/src/win/req-inl.h index af6fb7522..0c2a04522 100644 --- a/src/win/req-inl.h +++ b/src/win/req-inl.h @@ -34,6 +34,9 @@ #define SET_REQ_ERROR(req, error) \ SET_REQ_STATUS((req), NTSTATUS_FROM_WIN32((error))) +#define SET_REQ_NWRITTEN(req, nwritten) \ + (req)->u.io.overlapped.InternalHigh = (ULONG_PTR) (nwritten); + /* Note: used open-coded in UV_REQ_INIT() because of a circular dependency * between src/uv-common.h and src/win/internal.h. */ diff --git a/src/win/stream.c b/src/win/stream.c index a53a10b03..95659baef 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -250,3 +250,54 @@ int uv_stream_set_blocking(uv_stream_t* handle, int blocking) { return 0; } + + +size_t uv_write_nwritten(const uv_write_t* req) { + return req->write_extra.nwritten; +} + + +int uv_write_cancel(uv_write_t* req) { + uv_stream_t* stream; + HANDLE handle; + BOOL result; + + stream = req->handle; + + switch (stream->type) { + case UV_TCP: + handle = (HANDLE)((uv_tcp_t*)stream)->socket; + break; + case UV_NAMED_PIPE: + handle = ((uv_pipe_t*)stream)->handle; + + if ((stream->flags & (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) == + UV_HANDLE_NON_OVERLAPPED_PIPE) { + /* Non-overlapped, non-blocking writes run on the threadpool and we + do not have a good way to cancel them. */ + return UV_ENOTSUP; + } + + break; + case UV_TTY: + /* TTY writes complete synchronously on Windows, so cancellation + * is not applicable - the callback has already been queued. */ + return 0; + default: + assert(0); + return UV_EINVAL; + } + + result = CancelIoEx(handle, &req->u.io.overlapped); + + if (!result) { + DWORD err = GetLastError(); + if (err == ERROR_NOT_FOUND) { + /* The operation has already completed. */ + return 0; + } + return uv_translate_sys_error(err); + } + + return 0; +} diff --git a/src/win/tcp.c b/src/win/tcp.c index 66e005ff0..edf5240ad 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -956,6 +956,7 @@ int uv__tcp_write(uv_loop_t* loop, UV_REQ_INIT(req, UV_WRITE); req->handle = (uv_stream_t*) handle; req->cb = cb; + req->write_extra.nwritten = 0; /* Prepare the overlapped structure. */ memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); @@ -1172,6 +1173,7 @@ void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle, assert(handle->write_queue_size >= req->u.io.queued_bytes); handle->write_queue_size -= req->u.io.queued_bytes; + req->write_extra.nwritten += req->u.io.overlapped.InternalHigh; UNREGISTER_HANDLE_REQ(loop, handle); diff --git a/src/win/tty.c b/src/win/tty.c index 66ca99cda..7b77af2ec 100644 --- a/src/win/tty.c +++ b/src/win/tty.c @@ -2208,6 +2208,7 @@ int uv__tty_write(uv_loop_t* loop, UV_REQ_INIT(req, UV_WRITE); req->handle = (uv_stream_t*) handle; req->cb = cb; + req->write_extra.nwritten = 0; handle->reqs_pending++; handle->stream.conn.write_reqs_pending++; @@ -2217,6 +2218,7 @@ int uv__tty_write(uv_loop_t* loop, if (!uv__tty_write_bufs(handle, bufs, nbufs, &error)) { SET_REQ_SUCCESS(req); + req->write_extra.nwritten = uv__count_bufs(bufs, nbufs); } else { SET_REQ_ERROR(req, error); } diff --git a/test/test-list.h b/test/test-list.h index 5d44fefb4..e197b2b9f 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -111,6 +111,9 @@ TEST_DECLARE (tcp_try_write) TEST_DECLARE (tcp_write_in_a_row) TEST_DECLARE (tcp_try_write_error) TEST_DECLARE (tcp_write_queue_order) +TEST_DECLARE (tcp_write_cancel) +TEST_DECLARE (tcp_write_nwritten) +TEST_DECLARE (pipe_write_nwritten) TEST_DECLARE (tcp_open) TEST_DECLARE (tcp_open_twice) TEST_DECLARE (tcp_open_bound) @@ -723,6 +726,9 @@ TASK_LIST_START TEST_ENTRY (tcp_try_write_error) TEST_ENTRY (tcp_write_queue_order) + TEST_ENTRY (tcp_write_cancel) + TEST_ENTRY (tcp_write_nwritten) + TEST_ENTRY (pipe_write_nwritten) TEST_ENTRY (tcp_open) TEST_HELPER (tcp_open, tcp4_echo_server) diff --git a/test/test-write-cancel.c b/test/test-write-cancel.c new file mode 100644 index 000000000..818d2e03a --- /dev/null +++ b/test/test-write-cancel.c @@ -0,0 +1,272 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include +#include +#include + +#include "uv.h" +#include "task.h" + +#define REQ_COUNT 100 + +static uv_tcp_t server; +static uv_tcp_t client; +static uv_tcp_t incoming; +static int close_cb_called; +static int write_cb_called; +static int cancelled_count; +static int connected; +static int closing; + +static uv_write_t write_reqs[REQ_COUNT]; +static char buf_data[16 * 1024]; + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void connection_cb(uv_stream_t* tcp, int status) { + ASSERT_OK(status); + ASSERT_OK(uv_tcp_init(tcp->loop, &incoming)); + ASSERT_OK(uv_accept(tcp, (uv_stream_t*) &incoming)); + connected = 1; +} + +static void write_cb(uv_write_t* req, int status) { + write_cb_called++; + if (status == UV_ECANCELED && !closing) + cancelled_count++; + + if (cancelled_count >= 5 && !closing) { + closing = 1; + uv_close((uv_handle_t*) &client, close_cb); + uv_close((uv_handle_t*) &server, close_cb); + if (connected) + uv_close((uv_handle_t*) &incoming, close_cb); + } +} + +static void connect_cb(uv_connect_t* req, int status) { + uv_buf_t buf; + int r; + int i; + int cancel_count; + + ASSERT_OK(status); + + buf = uv_buf_init(buf_data, sizeof(buf_data)); + + /* Queue many writes to fill the socket buffer */ + for (i = 0; i < REQ_COUNT; i++) { + r = uv_write(&write_reqs[i], + req->handle, + &buf, + 1, + write_cb); + ASSERT_OK(r); + } + + /* Cancel the trailing writes which should be queued */ + cancel_count = 0; + for (i = REQ_COUNT - 5; i < REQ_COUNT; i++) { + r = uv_write_cancel(&write_reqs[i]); + ASSERT_OK(r); + cancel_count++; + } + + ASSERT_EQ(5, cancel_count); +} + +TEST_IMPL(tcp_write_cancel) { + uv_connect_t connect_req; + struct sockaddr_in addr; + uv_loop_t* loop; + int buffer_size; + + loop = uv_default_loop(); + + close_cb_called = 0; + write_cb_called = 0; + cancelled_count = 0; + connected = 0; + closing = 0; + buffer_size = sizeof(buf_data); + memset(buf_data, 'A', sizeof(buf_data)); + + ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + ASSERT_OK(uv_tcp_init(loop, &server)); + ASSERT_OK(uv_tcp_bind(&server, (struct sockaddr*) &addr, 0)); + ASSERT_OK(uv_listen((uv_stream_t*) &server, 128, connection_cb)); + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + ASSERT_OK(uv_tcp_init(loop, &client)); + ASSERT_OK(uv_tcp_connect(&connect_req, + &client, + (struct sockaddr*) &addr, + connect_cb)); + ASSERT_OK(uv_send_buffer_size((uv_handle_t*) &client, &buffer_size)); + + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); + + /* The writes we cancelled should have gotten UV_ECANCELED callbacks */ + ASSERT_EQ(5, cancelled_count); + ASSERT_EQ(2 + connected, close_cb_called); + + MAKE_VALGRIND_HAPPY(loop); + return 0; +} + + +/* + * Test that uv_write_nwritten returns correct byte count on success. + */ +static uv_tcp_t nwritten_server; +static uv_tcp_t nwritten_client; +static uv_tcp_t nwritten_incoming; +static uv_write_t nwritten_req; +static int nwritten_cb_called; +static size_t nwritten_value; +static char nwritten_buf_data[1024]; + +static void nwritten_close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void nwritten_write_cb(uv_write_t* req, int status) { + ASSERT_OK(status); + nwritten_cb_called++; + nwritten_value = uv_write_nwritten(req); + + uv_close((uv_handle_t*) &nwritten_client, nwritten_close_cb); + uv_close((uv_handle_t*) &nwritten_server, nwritten_close_cb); + uv_close((uv_handle_t*) &nwritten_incoming, nwritten_close_cb); +} + +static void nwritten_connection_cb(uv_stream_t* tcp, int status) { + ASSERT_OK(status); + ASSERT_OK(uv_tcp_init(tcp->loop, &nwritten_incoming)); + ASSERT_OK(uv_accept(tcp, (uv_stream_t*) &nwritten_incoming)); +} + +static void nwritten_connect_cb(uv_connect_t* req, int status) { + uv_buf_t buf; + + ASSERT_OK(status); + + buf = uv_buf_init(nwritten_buf_data, sizeof(nwritten_buf_data)); + ASSERT_OK(uv_write(&nwritten_req, req->handle, &buf, 1, nwritten_write_cb)); +} + +TEST_IMPL(tcp_write_nwritten) { + uv_connect_t connect_req; + struct sockaddr_in addr; + uv_loop_t* loop; + + loop = uv_default_loop(); + + close_cb_called = 0; + nwritten_cb_called = 0; + nwritten_value = 0; + memset(nwritten_buf_data, 'B', sizeof(nwritten_buf_data)); + + ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + ASSERT_OK(uv_tcp_init(loop, &nwritten_server)); + ASSERT_OK(uv_tcp_bind(&nwritten_server, (struct sockaddr*) &addr, 0)); + ASSERT_OK(uv_listen((uv_stream_t*) &nwritten_server, 128, nwritten_connection_cb)); + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + ASSERT_OK(uv_tcp_init(loop, &nwritten_client)); + ASSERT_OK(uv_tcp_connect(&connect_req, + &nwritten_client, + (struct sockaddr*) &addr, + nwritten_connect_cb)); + + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); + + ASSERT_EQ(1, nwritten_cb_called); + ASSERT_EQ(sizeof(nwritten_buf_data), nwritten_value); + ASSERT_EQ(3, close_cb_called); + + MAKE_VALGRIND_HAPPY(loop); + return 0; +} + + +/* + * Test that uv_write_nwritten returns correct byte count for pipes. + */ +static uv_pipe_t pipe_client; +static uv_pipe_t pipe_server; +static uv_write_t pipe_write_req; +static int pipe_cb_called; +static size_t pipe_nwritten_value; +static char pipe_buf_data[1024]; + +static void pipe_close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void pipe_write_cb(uv_write_t* req, int status) { + ASSERT_OK(status); + pipe_cb_called++; + pipe_nwritten_value = uv_write_nwritten(req); + + uv_close((uv_handle_t*) &pipe_client, pipe_close_cb); + uv_close((uv_handle_t*) &pipe_server, pipe_close_cb); +} + +TEST_IMPL(pipe_write_nwritten) { + uv_loop_t* loop; + uv_buf_t buf; + int fds[2]; + + loop = uv_default_loop(); + + close_cb_called = 0; + pipe_cb_called = 0; + pipe_nwritten_value = 0; + memset(pipe_buf_data, 'C', sizeof(pipe_buf_data)); + + ASSERT_OK(uv_pipe(fds, 0, 0)); + + ASSERT_OK(uv_pipe_init(loop, &pipe_client, 0)); + ASSERT_OK(uv_pipe_init(loop, &pipe_server, 0)); + + ASSERT_OK(uv_pipe_open(&pipe_client, fds[1])); + ASSERT_OK(uv_pipe_open(&pipe_server, fds[0])); + + buf = uv_buf_init(pipe_buf_data, sizeof(pipe_buf_data)); + ASSERT_OK(uv_write(&pipe_write_req, + (uv_stream_t*) &pipe_client, + &buf, + 1, + pipe_write_cb)); + + ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); + + ASSERT_EQ(1, pipe_cb_called); + ASSERT_EQ(sizeof(pipe_buf_data), pipe_nwritten_value); + ASSERT_EQ(2, close_cb_called); + + MAKE_VALGRIND_HAPPY(loop); + return 0; +}