unix, win: add send_queue_size and send_queue_count to uv_udp_t
This functionality is present in stream and uv_udp_t has a queue as well so it makes sense for udp to have a send_write_size. Since udp sends entire messages atomically, the send_queue_count field lets the user determine how many messages are there left to send.
This commit is contained in:
parent
f63e073be2
commit
b769484ca3
@ -900,6 +900,15 @@ typedef void (*uv_udp_recv_cb)(uv_udp_t* handle,
|
||||
/* uv_udp_t is a subclass of uv_handle_t */
|
||||
struct uv_udp_s {
|
||||
UV_HANDLE_FIELDS
|
||||
/* read-only */
|
||||
/* Total size of buffers queued for sending. May send
|
||||
* actually less since udp packets are truncated to the MTU size.
|
||||
*/
|
||||
size_t send_queue_size;
|
||||
/* Total count of sends currently in the queue awaiting to
|
||||
* be processed.
|
||||
*/
|
||||
size_t send_queue_count;
|
||||
UV_UDP_PRIVATE_FIELDS
|
||||
};
|
||||
|
||||
|
||||
@ -82,6 +82,9 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
||||
req->send_cb(req, -ECANCELED);
|
||||
}
|
||||
|
||||
handle->send_queue_size = 0;
|
||||
handle->send_queue_count = 0;
|
||||
|
||||
/* Now tear down the handle. */
|
||||
handle->recv_cb = NULL;
|
||||
handle->alloc_cb = NULL;
|
||||
@ -127,6 +130,8 @@ static void uv__udp_run_pending(uv_udp_t* handle) {
|
||||
* why we don't handle partial writes. Just pop the request
|
||||
* off the write queue and onto the completed queue, done.
|
||||
*/
|
||||
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
|
||||
handle->send_queue_count--;
|
||||
QUEUE_REMOVE(&req->queue);
|
||||
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||
}
|
||||
@ -433,6 +438,8 @@ int uv__udp_send(uv_udp_send_t* req,
|
||||
return -ENOMEM;
|
||||
|
||||
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
|
||||
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
|
||||
handle->send_queue_count++;
|
||||
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
|
||||
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||
uv__handle_start(handle);
|
||||
@ -531,6 +538,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
|
||||
uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
|
||||
handle->alloc_cb = NULL;
|
||||
handle->recv_cb = NULL;
|
||||
handle->send_queue_size = 0;
|
||||
handle->send_queue_count = 0;
|
||||
uv__io_init(&handle->io_watcher, uv__udp_io, -1);
|
||||
QUEUE_INIT(&handle->write_queue);
|
||||
QUEUE_INIT(&handle->write_completed_queue);
|
||||
|
||||
@ -129,6 +129,8 @@ int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
|
||||
handle->activecnt = 0;
|
||||
handle->func_wsarecv = WSARecv;
|
||||
handle->func_wsarecvfrom = WSARecvFrom;
|
||||
handle->send_queue_size = 0;
|
||||
handle->send_queue_count = 0;
|
||||
|
||||
uv_req_init(loop, (uv_req_t*) &(handle->recv_req));
|
||||
handle->recv_req.type = UV_UDP_RECV;
|
||||
@ -402,6 +404,8 @@ static int uv__send(uv_udp_send_t* req,
|
||||
/* Request queued by the kernel. */
|
||||
req->queued_bytes = uv__count_bufs(bufs, nbufs);
|
||||
handle->reqs_pending++;
|
||||
handle->send_queue_size += req->queued_bytes;
|
||||
handle->send_queue_count++;
|
||||
REGISTER_HANDLE_REQ(loop, handle, req);
|
||||
} else {
|
||||
/* Send failed due to an error. */
|
||||
@ -524,6 +528,11 @@ void uv_process_udp_send_req(uv_loop_t* loop, uv_udp_t* handle,
|
||||
|
||||
assert(handle->type == UV_UDP);
|
||||
|
||||
assert(handle->send_queue_size >= req->queued_bytes);
|
||||
assert(handle->send_queue_count >= 1);
|
||||
handle->send_queue_size -= req->queued_bytes;
|
||||
handle->send_queue_count--;
|
||||
|
||||
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
||||
|
||||
if (req->cb) {
|
||||
|
||||
@ -353,6 +353,9 @@ TEST_IMPL(getsockname_udp) {
|
||||
|
||||
ASSERT(getsocknamecount == 2);
|
||||
|
||||
ASSERT(udp.send_queue_size == 0);
|
||||
ASSERT(udpServer.send_queue_size == 0);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -91,6 +91,9 @@ TEST_IMPL(udp_multicast_interface) {
|
||||
ASSERT(sv_send_cb_called == 1);
|
||||
ASSERT(close_cb_called == 1);
|
||||
|
||||
ASSERT(client.send_queue_size == 0);
|
||||
ASSERT(server.send_queue_size == 0);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -159,6 +159,8 @@ TEST_IMPL(udp_open) {
|
||||
ASSERT(send_cb_called == 1);
|
||||
ASSERT(close_cb_called == 1);
|
||||
|
||||
ASSERT(client.send_queue_size == 0);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -206,6 +206,9 @@ TEST_IMPL(udp_send_and_recv) {
|
||||
ASSERT(sv_recv_cb_called == 1);
|
||||
ASSERT(close_cb_called == 2);
|
||||
|
||||
ASSERT(client.send_queue_size == 0);
|
||||
ASSERT(server.send_queue_size == 0);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user