diff --git a/src/unix/stream.c b/src/unix/stream.c index f5d3041d0..602e94889 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -561,12 +561,19 @@ static void uv__read(uv_stream_t* stream) { struct msghdr msg; struct cmsghdr* cmsg; char cmsg_space[64]; + int count; + + /* Prevent loop starvation when the data comes in as fast as (or faster than) + * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. + */ + count = 32; /* XXX: Maybe instead of having UV_STREAM_READING we just test if * tcp->read_cb is NULL or not? */ - while ((stream->read_cb || stream->read2_cb) && - stream->flags & UV_STREAM_READING) { + while ((stream->read_cb || stream->read2_cb) + && (stream->flags & UV_STREAM_READING) + && (count-- > 0)) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); diff --git a/src/unix/udp.c b/src/unix/udp.c index 523d2823d..3c784962c 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -196,6 +196,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { ssize_t nread; uv_buf_t buf; int flags; + int count; handle = container_of(w, uv_udp_t, read_watcher); assert(handle->type == UV_UDP); @@ -204,8 +205,12 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { assert(handle->recv_cb != NULL); assert(handle->alloc_cb != NULL); + /* Prevent loop starvation when the data comes in as fast as (or faster than) + * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. + */ + count = 32; + do { - /* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */ buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024); assert(buf.len > 0); assert(buf.base != NULL); @@ -246,6 +251,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { } /* recv_cb callback may decide to pause or close the handle */ while (nread != -1 + && count-- > 0 && handle->fd != -1 && handle->recv_cb != NULL); }