Submit with uv__work_submit directly (no uv_fs_write)

This commit is contained in:
Sam Schweigel 2025-12-16 13:56:44 -08:00
parent d442f47919
commit 8db48e5336
2 changed files with 72 additions and 56 deletions

View File

@ -258,7 +258,7 @@ typedef struct {
unsigned int write_index; \
uv_buf_t* bufs; \
unsigned int nbufs; \
int error; \
ssize_t result; \
uv_buf_t bufsml[4]; \
#define UV_CONNECT_PRIVATE_FIELDS \
@ -292,7 +292,7 @@ typedef struct {
int delayed_error; \
int accepted_fd; \
void* queued_fds; \
uv_fs_t* blocked_write; \
struct uv__work blocked_write; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
#define UV_TCP_PRIVATE_FIELDS /* empty */

View File

@ -92,7 +92,6 @@ void uv__stream_init(uv_loop_t* loop,
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->blocked_write = NULL;
stream->delayed_error = 0;
uv__queue_init(&stream->write_queue);
uv__queue_init(&stream->write_completed_queue);
@ -445,7 +444,7 @@ void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
uv__queue_remove(q);
req = uv__queue_data(q, uv_write_t, queue);
req->error = error;
req->result = error;
uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
}
@ -473,11 +472,6 @@ void uv__stream_destroy(uv_stream_t* stream) {
uv__write_callbacks(stream);
uv__drain(stream);
if (stream->blocked_write) {
uv__free(stream->blocked_write);
stream->blocked_write = NULL;
}
assert(stream->write_queue_size == 0);
}
@ -735,10 +729,11 @@ static void uv__write_req_finish(uv_write_t* req) {
* they should stop writing - which they should if we got an error. Something
* to revisit in future revisions of the libuv API.
*/
if (req->error == 0) {
if (req->result >= 0) {
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
req->result = 0;
}
/* Add it to the write_completed_queue where it will have its
@ -850,57 +845,66 @@ static int uv__try_write(uv_stream_t* stream,
return UV__ERR(errno);
}
static void uv__write_done(uv_fs_t* req) {
/* A note about blocking writes: The UV_HANDLE_WRITE_PENDING flag is toggled
* only on the loop thread (either by uv__write or uv__write_done). While we're
* doing work from the thread pool, we touch only the result field of
* uv_write_t; we'll never read it from the loop thread while a blocked write is
* pending.
*/
static void uv__write_work(struct uv__work* w) {
uv_stream_t* stream;
struct uv__queue* q;
uv_write_t* wreq;
uv_write_t *req;
stream = (uv_stream_t*)req->data;
stream->flags &= ~UV_HANDLE_WRITE_PENDING;
assert(!(uv__queue_empty(&stream->write_queue)));
stream = container_of(w, uv_stream_t, blocked_write);
assert(!uv__queue_empty(&stream->write_queue));
q = uv__queue_head(&stream->write_queue);
wreq = uv__queue_data(q, uv_write_t, queue);
req = uv__queue_data(q, uv_write_t, queue);
assert(req->handle == stream);
if (req->result >= 0) {
if (uv__write_req_update(stream, wreq, req->result))
uv__write_req_finish(wreq);
} else {
wreq->error = req->result;
uv__write_req_finish(wreq);
}
req->result = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
}
static int uv__write_submit(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle) {
uv_os_fd_t fd;
uv_fs_t* req;
ssize_t r;
static void uv__write_done(struct uv__work* w, int status) {
uv_stream_t* stream;
struct uv__queue* q;
uv_write_t *req;
if (send_handle || !(stream->flags & UV_HANDLE_BLOCKING_WRITES))
return uv__try_write(stream, bufs, nbufs, send_handle);
stream = container_of(w, uv_stream_t, blocked_write);
stream->flags &= ~UV_HANDLE_WRITE_PENDING;
/* Is a blocking write already underway? */
if (stream->flags & UV_HANDLE_WRITE_PENDING)
return UV_EAGAIN;
assert(!uv__queue_empty(&stream->write_queue));
q = uv__queue_head(&stream->write_queue);
req = uv__queue_data(q, uv_write_t, queue);
stream->flags |= UV_HANDLE_WRITE_PENDING;
req = stream->blocked_write;
if (req == NULL) {
req = uv__malloc(sizeof *stream->blocked_write);
req->data = stream;
stream->blocked_write = req;
/* This happens when we're cancelled in the thread pool work queue. */
if (status != 0) {
req->result = status;
goto error;
}
fd = uv__stream_fd(stream);
r = uv_fs_write(stream->loop, req, fd, bufs, nbufs, -1, uv__write_done);
if (r != 0) {
stream->flags &= ~UV_HANDLE_WRITE_PENDING;
return r;
if (req->result >= 0) {
if (uv__write_req_update(stream, req, req->result))
uv__write_req_finish(req);
} else if (req->result != UV_EAGAIN)
goto error;
if (!uv__queue_empty(&stream->write_queue)) {
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return UV_EAGAIN;
return;
error:
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
static void uv__write(uv_stream_t* stream) {
@ -925,10 +929,22 @@ static void uv__write(uv_stream_t* stream) {
req = uv__queue_data(q, uv_write_t, queue);
assert(req->handle == stream);
n = uv__write_submit(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
if (!(stream->flags & UV_HANDLE_BLOCKING_WRITES)) {
n = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
} else {
n = UV_EAGAIN;
if (!(stream->flags & UV_HANDLE_WRITE_PENDING)) {
stream->flags |= UV_HANDLE_WRITE_PENDING;
uv__work_submit(stream->loop,
&stream->blocked_write,
UV__WORK_FAST_IO,
uv__write_work,
uv__write_done);
}
}
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0) {
@ -953,7 +969,7 @@ static void uv__write(uv_stream_t* stream) {
}
error:
req->error = n;
req->result = n;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
@ -986,7 +1002,7 @@ static void uv__write_callbacks(uv_stream_t* stream) {
/* NOTE: call callback AFTER freeing the request data. */
if (req->cb)
req->cb(req, req->error);
req->cb(req, req->result);
}
}
@ -1417,7 +1433,7 @@ int uv_write2(uv_write_t* req,
uv__req_init(stream->loop, req, UV_WRITE);
req->cb = cb;
req->handle = stream;
req->error = 0;
req->result = 0;
req->send_handle = send_handle;
uv__queue_init(&req->queue);
@ -1604,8 +1620,8 @@ void uv__stream_close(uv_stream_t* handle) {
handle->queued_fds = NULL;
}
if (handle->flags & UV_HANDLE_WRITE_PENDING)
uv_cancel((uv_req_t*)handle->blocked_write);
/* if (handle->flags & UV_HANDLE_WRITE_PENDING)
* uv_cancel((uv_req_t*)handle->blocked_write); */
assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}