/* * uv_stream_t is a subclass of uv_handle_t. * * uv_stream is an abstract class. * * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t and uv_tty_t. */ structuv_stream_s { UV_HANDLE_FIELDS UV_STREAM_FIELDS };
if (loop->emfile_fd == -1) { err = uv__open_cloexec("/dev/null", O_RDONLY); if (err < 0) /* In the rare case that "/dev/null" isn't mounted open "/" * instead. */ err = uv__open_cloexec("/", O_RDONLY); if (err >= 0) loop->emfile_fd = err; }
// 满足读数据条件,进行数据读取,读取成功后继续向下执行,读取需要多久? /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */ if (events & (POLLIN | POLLERR | POLLHUP)) uv__read(stream);
/* Short-circuit iff POLLHUP is set, the user is still interested in read * events and uv__read() reported a partial read but not EOF. If the EOF * flag is set, uv__read() called read_cb with err=UV_EOF and we don't * have to do anything. If the partial read flag is not set, we can't * report the EOF yet because there is still data to read. */ if ((events & POLLHUP) && (stream->flags & UV_HANDLE_READING) && (stream->flags & UV_HANDLE_READ_PARTIAL) && !(stream->flags & UV_HANDLE_READ_EOF)) { uv_buf_t buf = { NULL, 0 }; uv__stream_eof(stream, &buf); }
if (stream->flags & UV_HANDLE_CLOSING) return UV_EINVAL;
if (!(stream->flags & UV_HANDLE_READABLE)) return -ENOTCONN;
/* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ stream->flags |= UV_HANDLE_READING;
/* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. */ assert(uv__stream_fd(stream) >= 0); assert(alloc_cb);
/* The buffers to be written must remain valid until the callback is called. * This is not required for the uv_buf_t array. */ intuv_write(uv_write_t* req, uv_stream_t* handle, constuv_buf_t bufs[], unsignedint nbufs, uv_write_cb cb) { return uv_write2(req, handle, bufs, nbufs, NULL, cb); }
assert(nbufs > 0); assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY) && "uv_write (unix) does not yet support other types of streams");
if (uv__stream_fd(stream) < 0) return UV_EBADF;
if (!(stream->flags & UV_HANDLE_WRITABLE)) return -EPIPE;
if (send_handle) { if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) return UV_EINVAL;
/* XXX We abuse uv_write2() to send over UDP handles to child processes. * Don't call uv__stream_fd() on those handles, it's a macro that on OS X * evaluates to a function that operates on a uv_stream_t with a couple of * OS X specific fields. On other Unices it does (handle)->io_watcher.fd, * which works but only by accident. */ if (uv__handle_fd((uv_handle_t*) send_handle) < 0) return UV_EBADF;
#if defined(__CYGWIN__) || defined(__MSYS__) /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it. See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */ return UV_ENOSYS; #endif }
/* It's legal for write_queue_size > 0 even when the write_queue is empty; * it means there are error-state requests in the write_completed_queue that * will touch up write_queue_size later, see also uv__write_req_finish(). * We could check that write_queue is empty instead but that implies making * a write() syscall when we know that the handle is in error mode. */ empty_queue = (stream->write_queue_size == 0);
/* Append the request to write_queue. */ QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
/* If the queue was empty when this function began, we should attempt to * do the write immediately. Otherwise start the write_watcher and wait * for the fd to become writable. */ if (stream->connect_req) { /* Still connecting, do nothing. */ } elseif (empty_queue) { uv__write(stream); } else { /* * blocking streams should never have anything in the queue. * if this assert fires then somehow the blocking stream isn't being * sufficiently flushed in uv__write. */ assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); }
r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb); if (r != 0) return r;
/* Remove not written bytes from write queue size */ written = uv__count_bufs(bufs, nbufs); if (req.bufs != NULL) req_size = uv__write_req_size(&req); else req_size = 0; written -= req_size; stream->write_queue_size -= req_size;
/* Unqueue request, regardless of immediateness */ QUEUE_REMOVE(&req.queue); uv__req_unregister(stream->loop, &req); if (req.bufs != req.bufsml) uv__free(req.bufs); req.bufs = NULL;
/* Do not poll for writable, if we wasn't before calling this */ if (!has_pollout) { uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); }