在 libuv 内部,对所有IO操作进行了统一的抽象,在底层的操作系统IO操作基础上,结合事件循环机制,实现了IO观察者,对应结构体 uv__io_s。其他 handle 通过内嵌I/O观察者的方式获得IO的监测能力,例如有 uv_stream_tuv_udp_tuv_poll_tuv_stream_t 的派生类型 uv_tcp_tuv_pipe_tuv_tty_tuv_stream_t 是一个抽象基类,一般不直接使用。内嵌IO观察者的 handle 本身就是IO观察者,和面向对象语言中常提到的 has-a 是一样的,所以,可以说 uv_stream_t 是一个I/O观察者,它的派生类也是I/O观察者。这类似于多重继承,即继承 uv_handle_t,又继承了 uv__io_t,所以即是 handle,又是 I/O观察者

应用程序 通过创建并初始化 handlerequest 的方式创建并初始化并注册I/O观察者,注册后被插入到 loop-watchers 队列中,并在 Start 后将I/O观察者标记为准备状态,在事件循环启动后对 loop-watchers 队列中所有I/O观察者关注的文件描述符进行轮询(linux下使用epoll)并在I/O事件触发时调用对应的回调函数。

libuv 的 15handle 类型中,属于I/O观察者就有 7 个,还有几个类型的 handle 虽然不直接内嵌I/O观察者,但内部实现依然依赖I/O观察者。实际上,只有 uv_timer_tuv_prepare_tuv_check_tuv_idle_t4handle 与I/O观察者无直接关系。足以见I/O观察者在 libuv 中的重要性,理解I/O观察者对于理解 libuv 甚至是 node.js 都是十分重要的。

下面开始讲其实现细节。

uv__io_s

uv__io_s 结构体 即为 I/O观察者 的抽象数据结构:

1
2
3
4
5
6
7
8
9
10
11
typedef struct uv__io_s uv__io_t;

struct uv__io_s {
uv__io_cb cb;
void* pending_queue[2];
void* watcher_queue[2];
unsigned int pevents; /* Pending event mask i.e. mask at next tick. */
unsigned int events; /* Current event mask. */
int fd;
UV_IO_PRIVATE_PLATFORM_FIELDS
};

主要字段用途介绍:

  • fd:感兴趣的 文件描述符;
  • cb:当文件描述符 fd 上有I/O事件发生时,调用该函数进行处理,注意,该函数的类型 uv__io_cb 是内部类型,并不对外暴露,所以 cb 自然也是内部提供的。libuv针对不同类型的IO观察者实现了多个不同的 cb 函数;
  • watcher_queue:作为队列节点,插入到 loop->watcher_queue 队列中,所有的I/O观察者都会被插入到这个队列中;
  • pending_queue:作为队列节点,插入到 loop->pending_queue 队列中,所有被挂起的I/O观察者都会被插入到这个队列中;
  • pevents:下次事件循环使用的事件掩码;
  • events:当前正在使用的事件掩码。events |= pevents

uv_stream_t(包括 uv_tcp_tuv_pipe_tuv_tty_t)、uv_udp_tuv_poll_t 类型结构体内部都内嵌了 uv__io_t

1
uv__io_t io_watcher;

另外,在 uv_loop_s 中,同样也内嵌了多个 uv__io_t 关联字段:

https://github.com/libuv/libuv/blob/v1.28.0/include/uv/unix.h#L218

1
2
3
4
5
6
7
#define UV_LOOP_PRIVATE_FIELDS                                                \
uv__io_t** watchers; \
unsigned int nwatchers; \
unsigned int nfds; \
uv__io_t async_io_watcher; \
uv__io_t signal_io_watcher; \
UV_PLATFORM_LOOP_FIELDS \

linux 下的 UV_PLATFORM_LOOP_FIELDS 定义:

1
2
3
4
#define UV_PLATFORM_LOOP_FIELDS                                               \
uv__io_t inotify_read_watcher; \
void* inotify_watchers; \
int inotify_fd; \

主要字段用途介绍:

  • watchers:咋看是一个二级指针,实际上是当做数组来使用的,数组的每一项存储了 uv__io_t* 类型的指针,使用示例:w = loop->watchers[i];,所有的I/O观察者都有以文件描述符 fd 作为数组下标保存在这个数组中,可以通过 fd 快速找到对应的I/O观察者。请仔细理解C语言数组和指针的关系,这里不能当做二级指针理解;
    注意:
    • 该指针指向的内存空间是动态分配的,以适应I/O观察者的变化,动态分配算法见函数 maybe_resize
    • 注意:实际分别长度为 nwatchers + 2
  • nwatchers:记录了能容纳的IO观察者的数量,len(watchers) == nwatchers + 2
  • nfds:记录了 watchers 中文件描述符的数量,也是实际使用量;
  • async_io_watcherloop 内嵌的I/O观察者,这个I/O观察者用于其他异步任务(运行在其他线程中)通过IO与主事件循环进行通讯;
  • signal_io_watcherloop 内嵌的I/O观察者,这个I/O观察者用于接收信号IO事件的。在 libuv 中,信号也被异步化到事件循环中去处理了;
  • inotify_read_watcherUV_PLATFORM_LOOP_FIELDS 宏在 linux 平台下展开之后会有一个 inotify_read_watcher,这个是用来在 linux 下支持 uv_fs_event_t的,在其他 uni* 平台上,有对应的 event_watcher

除以上介绍的 handle 外,uv_fs_poll_t 实际通过 uv_async_t 来支持其文件状态轮询的。

至此,我们已经把所有和I/O观察者相关的 handle 都提及到了,可以发现,libuv 中绝大多数 handle 都是I/O相关的,少部分 handle 简介通过I/O观察者实现功能。所以,libuv 可以说是专门为I/O而设计的,正如文档中所诉,它就是专注于异步I/O的程序库。下面,我们将介绍I/O观察者的工作原理,理解了I/O观察者就可以很容易的弄懂大部分相关的 handle 的工作原理。

因为 uv__io_t 是内部结构,并不对外暴露,所以我们以 uv_poll_t 作为入口,探索其内部的 uv__io_t 工作原理。

uv_poll_t 用于监控文件描述符的 可读/可写 状态,和 poll 系统调用的用途类型,不过 uv_poll_t 是异步非阻塞,而操作系统的原生 poll 函数是同步阻塞的。

uv_poll_t 是I/O观察者的简单封装后的应用程序接口,它只关心状态变化并调用用户层代码,I/O事件之后将由外部库处理。相比之下,uv_stream_t 则复杂的多,uv_stream_t 更进一步封装了数据读写操作等方面的能力,并将数据派发给不同类型的派生 handle 处理。

Init

uv_poll_init

使用文件描述符初始化 poll

以下是 uv_poll_init 的具体实现:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/poll.c#L67

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
int err;

if (uv__fd_exists(loop, fd))
return UV_EEXIST;

err = uv__io_check_fd(loop, fd);
if (err)
return err;

/* If ioctl(FIONBIO) reports ENOTTY, try fcntl(F_GETFL) + fcntl(F_SETFL).
* Workaround for e.g. kqueue fds not supporting ioctls.
*/
err = uv__nonblock(fd, 1);
if (err == UV_ENOTTY)
if (uv__nonblock == uv__nonblock_ioctl)
err = uv__nonblock_fcntl(fd, 1);

if (err)
return err;

uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL);
uv__io_init(&handle->io_watcher, uv__poll_io, fd);
handle->poll_cb = NULL;
return 0;
}

以上代代码先是检查了文件描述符是否存在、文件描述符是否已经纳入监控等,然后调用 uv__handle_inituv__io_init 进行基类初始化,uv__io_init 是I/O观察者的初始化函数。

uv__io_init

直接上源码:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L805

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
assert(cb != NULL);
assert(fd >= -1);
QUEUE_INIT(&w->pending_queue);
QUEUE_INIT(&w->watcher_queue);
w->cb = cb;
w->fd = fd;
w->events = 0;
w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
w->rcount = 0;
w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}

uv__io_init 初始化了两个队列,绑定了回调函数和文件描述符,基本工作就做完了。

调用 uv__io_init 时传递的 uv__poll_io 是 libuv 内部实现的,用于在I/O事件到来时调用。

uv__poll_io

如果有I/O事件产生,uv__poll_io 会被 libuv 调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_poll_t* handle;
int pevents;

handle = container_of(w, uv_poll_t, io_watcher);

/*
* As documented in the kernel source fs/kernfs/file.c #780
* poll will return POLLERR|POLLPRI in case of sysfs
* polling. This does not happen in case of out-of-band
* TCP messages.
*
* The above is the case on (at least) FreeBSD and Linux.
*
* So to properly determine a POLLPRI or a POLLERR we need
* to check for both.
*/
if ((events & POLLERR) && !(events & UV__POLLPRI)) {
uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
uv__handle_stop(handle);
handle->poll_cb(handle, UV_EBADF, 0);
return;
}

pevents = 0;
if (events & POLLIN)
pevents |= UV_READABLE;
if (events & UV__POLLPRI)
pevents |= UV_PRIORITIZED;
if (events & POLLOUT)
pevents |= UV_WRITABLE;
if (events & UV__POLLRDHUP)
pevents |= UV_DISCONNECT;

handle->poll_cb(handle, 0, pevents);
}

uv__poll_io I/O事件的时候会被调用,工作逻辑如下:

  1. 首先通过 container_of 获取 handle
  2. 如果是一些异常的I/O事件,则会进入 Stop 流程并调用 handle->poll_cb
  3. 将事件记录到 pevents
  4. 调用 handle->poll_cb

handle->poll_cb 是在 Start 阶段设置的,所以 uv__poll_io 一定是在 uv_poll_start 调用后才能调用的,因为I/O事件在 uv_poll_start 后的下一次事件循环才能被处理。

uv__poll_io 比较简单,可以说就是直接调用用户层提供的回调函数,正如 uv_poll_t 的用途一样,负责监控文件描述符状态变化,但是不负责处理。

除了 uv__poll_io 外,还有多个同样功能的 uv__io_cb 类型的函数存在,他们用于不同的功能,通过全局搜索 uv__io_init 函数即可找到 uv__io_init 调用传递的不同 uv__io_cb 函数,如下:

  • uv__signal_event,用于处理 loop->signal_io_watcher 上的I/O事件;
  • uv__async_io,用于处理 loop->async_io_watcher 上的I/O事件;
  • uv__stream_io,用于处理 stream_handle->io_watcher 上的I/O事件;
  • uv__udp_io,用于处理 udp_handle->io_watcher 上的I/O事件。

以上这些 uv__io_cb 函数就没有 uv__poll_io 的实现简单了,它们都有更复杂的处理逻辑,如在 uv__stream_io 中,开始对文件描述符进行数据读写。

除了以上几个外,还有 uv_fs_event_t 相关I/O观察者的 uv__poll_io,因各平台实现不同,uv__poll_io 也有不同版本,就不列举了。

这些 handle,在 Init、Start、Stop、Close 等阶段多有都有不同,但是最大的不同还是在于I/O事件的处理函数 uv__io_cb 的实现不同,也就是以上列出的函数不同。

接下来,进入 Start 阶段。

Start

uv_poll_start

开始对文件描述符进行事件轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
int events;

assert((pevents & ~(UV_READABLE | UV_WRITABLE | UV_DISCONNECT |
UV_PRIORITIZED)) == 0);
assert(!uv__is_closing(handle));

uv__poll_stop(handle);

if (pevents == 0)
return 0;

events = 0;
if (pevents & UV_READABLE)
events |= POLLIN;
if (pevents & UV_PRIORITIZED)
events |= UV__POLLPRI;
if (pevents & UV_WRITABLE)
events |= POLLOUT;
if (pevents & UV_DISCONNECT)
events |= UV__POLLRDHUP;

uv__io_start(handle->loop, &handle->io_watcher, events);
uv__handle_start(handle);
handle->poll_cb = poll_cb;

return 0;
}

其他部分不用太多解释了,直接看关键步骤:uv__io_start

uv__io_start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
assert(0 != events);
assert(w->fd >= 0);
assert(w->fd < INT_MAX);

w->pevents |= events;
maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
/* The event ports backend needs to rearm all file descriptors on each and
* every tick of the event loop but the other backends allow us to
* short-circuit here if the event mask is unchanged.
*/
if (w->events == w->pevents)
return;
#endif

if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}

关键步骤如下:

  1. 将参数 events 或到 w->pevents,因为 uv__io_start 可以反复多次调用,相当于更新;
  2. 按需扩容,判断当前的 loop->watchers 没有更多空间容纳 fd 及关联的I/O观察者,如果没有,指数级扩容,并拷贝内容到新的内存空间;
  3. w->watcher_queue 连接到 loop->watcher_queue 队列尾部,所有I/O观察者都被关联了起来。这里有个判断,为了防止重复操作。
  4. w->fd 为下标,将 w 保持到 loop->watchers,并更新引用计数 loop->nfds

至此,完成了I/O观察者的准备工作,供事件循环处理。

Stop

1
2
3
4
5
int uv_poll_stop(uv_poll_t* handle) {
assert(!uv__is_closing(handle));
uv__poll_stop(handle);
return 0;
}
1
2
3
4
5
6
7
static void uv__poll_stop(uv_poll_t* handle) {
uv__io_stop(handle->loop,
&handle->io_watcher,
POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
uv__handle_stop(handle);
uv__platform_invalidate_fd(handle->loop, handle->io_watcher.fd);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
assert(0 != events);

if (w->fd == -1)
return;

assert(w->fd >= 0);

/* Happens when uv__io_stop() is called on a handle that was never started. */
if ((unsigned) w->fd >= loop->nwatchers)
return;

w->pevents &= ~events;

if (w->pevents == 0) {
QUEUE_REMOVE(&w->watcher_queue);
QUEUE_INIT(&w->watcher_queue);

if (loop->watchers[w->fd] != NULL) {
assert(loop->watchers[w->fd] == w);
assert(loop->nfds > 0);
loop->watchers[w->fd] = NULL;
loop->nfds--;
w->events = 0;
}
}
else if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
}

uv__io_stop 实际上就是将I/O观察者从 loop 上移除,避免事件循环继续处理这个I/O观察者。

Close

uv_poll_t 并无 close 方法,但是存在 uv__io_close 方法,实现如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L882

1
2
3
4
5
6
7
8
void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
QUEUE_REMOVE(&w->pending_queue);

/* Remove stale events for this file descriptor */
if (w->fd != -1)
uv__platform_invalidate_fd(loop, w->fd);
}

调用了 uv__io_stop 完成 Close

Run:Poll for I/O

I/O观察者在事件循环启动后才会被真正的处理,主要是在 uv__io_polluv__run_pending 两个函数中处理的。

以下为 uv__io_poll 实现代码(含注释),这部分代码可以说是 libuv 中最核心的代码,因为这部分实现了 libuv 最核心功能异步IO的支持,实现了IO事件的轮询与事件的派发。

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/linux-core.c#L190

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
void uv__io_poll(uv_loop_t* loop, int timeout) {
/* A bug in kernels < 2.6.37 makes timeouts larger than ~30 minutes
* effectively infinite on 32 bits architectures. To avoid blocking
* indefinitely, we cap the timeout and poll again if necessary.
*
* Note that "30 minutes" is a simplification because it depends on
* the value of CONFIG_HZ. The magic constant assumes CONFIG_HZ=1200,
* that being the largest value I have seen in the wild (and only once.)
*/
static const int max_safe_timeout = 1789569;
struct epoll_event events[1024];
struct epoll_event* pe;
struct epoll_event e;
int real_timeout;
QUEUE* q;
uv__io_t* w;
sigset_t sigset;
sigset_t* psigset;
uint64_t base;
int have_signals;
int nevents;
int count;
int nfds;
int fd;
int op;
int i;

// 如果没有任何观察者,直接返回
if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
return;
}

memset(&e, 0, sizeof(e));

// 向 `epoll` 系统注册所有 I/O观察者
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// 获取队列头部,并将队列从 `loop->watcher_queue` 摘除
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);

// 获取 I/O观察者 结构
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);

e.events = w->pevents;
e.data.fd = w->fd;

if (w->events == 0)
op = EPOLL_CTL_ADD;
else
op = EPOLL_CTL_MOD;

// 向 epoll 注册文件描述符及需要监控的IO事件
/* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
* events, skip the syscall and squelch the events after epoll_wait().
*/
if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();

assert(op == EPOLL_CTL_ADD);

// loop->backend_fd 在事件循环初始化时也就是在 `uv_loop_init` 中 通过 `epoll_create` 创建
/* We've reactivated a file descriptor that's been watched before. */
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
abort();
}

// 挂起的 `pevents` 设置为 `events` 将在下次事件循环中生效
w->events = w->pevents;
}

// 如果配置了 `UV_LOOP_BLOCK_SIGPROF`,则需要阻塞该信号
psigset = NULL;
if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
sigemptyset(&sigset);
sigaddset(&sigset, SIGPROF);
psigset = &sigset;
}

assert(timeout >= -1);
base = loop->time;
// `count` 减少到 `0` 下面的循环跳出
count = 48; /* Benchmarks suggest this gives the best throughput. */
real_timeout = timeout;

// 开始进入 `epoll_pwait` 轮询IO事件
// 通常情况下,在 `timeout` 大于 `0` 的情况下,循环不断迭代到 `timeout` 减小到 `0` 时,循环跳出
// 在没有设置定时器的情况下,如果不出现错误,循环将一直不会跳出
// 以下循环主要由 `timeout` 和 `count` 控制是否跳出,符合整个事件循环
for (;;) {
/* See the comment for max_safe_timeout for an explanation of why
* this is necessary. Executive summary: kernel bug workaround.
*/
if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
timeout = max_safe_timeout;

// `epoll_pwait` 在 timeout 为 `0` 时立刻返回,为 `-1` 时会一直阻塞直到有事件发生,为 `正整数` 时则会最长阻塞 `timeout` 毫秒或有事件后返回。
// `nfds` 表示产生IO事件的文件描述符的数量,为 `0` 则为没有事件发生,可能因为超时时间到了,或者 `timeout=0`
// `events` 保存了从内核得到的事件集合,`nfds` 实际上相当于数组内有效数据的长度。
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
psigset);

/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
* operating system didn't reschedule our process while in the syscall.
*/
SAVE_ERRNO(uv__update_time(loop));

// 没有事件发生
if (nfds == 0) {
// `timeout` 一定不为 `-1`
assert(timeout != -1);

// 如果`timeout`为`0`函数直接返回
if (timeout == 0)
return;

/* We may have been inside the system call for longer than |timeout|
* milliseconds so we need to update the timestamp to avoid drift.
*/
// 减少下次 `epoll_pwait` 的 `timeout` 时间
goto update_timeout;
}

// `epoll_wait` 返回错误
if (nfds == -1) {
if (errno != EINTR)
abort();

// 如果`timeout`为`-1`则继续循环
if (timeout == -1)
continue;

// 如果`timeout`为`0`函数直接返回
if (timeout == 0)
return;

/* Interrupted by a signal. Update timeout and poll again. */
// 减少下次 `epoll_pwait` 的 `timeout` 时间
goto update_timeout;
}

have_signals = 0;
nevents = 0;

assert(loop->watchers != NULL);
// `loop->watchers` 的实际长度 为 `loop->nwatchers + 2`,观察者只使用 `loop->watchers` 的 `0` ~ `loop->nwatchers - 1` 项
// `loop->nwatchers` ~ `loop->nwatchers + 1` 被用来存储 `events` 和 `nfds`,`uv__platform_invalidate_fd` 中会使用
// 后面以下两项又被赋值为`NULL`,`for`循环部分又没有代码能够使函数返回,所以看似以下两行并无实际作用
loop->watchers[loop->nwatchers] = (void*) events;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;

// 处理IO事件:获取IO观察者,调用关联的回调函数
for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data.fd;

/* Skip invalidated events, see uv__platform_invalidate_fd */
if (fd == -1)
continue;

assert(fd >= 0);
assert((unsigned) fd < loop->nwatchers);

w = loop->watchers[fd];

// 如果IO观察者已经被移除,则停止轮询这个文件描述符上的IO事件
// 在一次事件循环中,同一IO观察者上可能出现多次IO事件
// 继而调用多次回调函数,某次回调函数中,有可能移除了`w`自己
if (w == NULL) {
/* File descriptor that we've stopped watching, disarm it.
*
* Ignore all errors because we may be racing with another thread
* when the file descriptor is closed.
*/
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
continue;
}

// 进入事件处理

/* Give users only events they're interested in. Prevents spurious
* callbacks when previous callback invocation in this loop has stopped
* the current watcher. Also, filters out events that users has not
* requested us to watch.
*/
pe->events &= w->pevents | POLLERR | POLLHUP;

/* Work around an epoll quirk where it sometimes reports just the
* EPOLLERR or EPOLLHUP event. In order to force the event loop to
* move forward, we merge in the read/write events that the watcher
* is interested in; uv__read() and uv__write() will then deal with
* the error or hangup in the usual fashion.
*
* Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user
* reads the available data, calls uv_read_stop(), then sometime later
* calls uv_read_start() again. By then, libuv has forgotten about the
* hangup and the kernel won't report EPOLLIN again because there's
* nothing left to read. If anything, libuv is to blame here. The
* current hack is just a quick bandaid; to properly fix it, libuv
* needs to remember the error/hangup event. We should get that for
* free when we switch over to edge-triggered I/O.
*/
if (pe->events == POLLERR || pe->events == POLLHUP)
pe->events |=
w->pevents & (POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);

// 如果存在有效事件
if (pe->events != 0) {
/* Run signal watchers last. This also affects child process watchers
* because those are implemented in terms of signal watchers.
*/
// 如果 `w` 是 `loop->signal_IOWatcher` 在循环之外调用回调,避免重复触发回调
if (w == &loop->signal_IOWatcher)
have_signals = 1;
else
w->cb(loop, w, pe->events);
// `w->cb` 是 `uv__io_cb` 类型的函数指针,对应的实现函数如`uv__async_io`已经在上文介绍
// 这个回调函数指针由 libuv 内部实现的统一入口,在 `cb` 中再进行事件分发,交由特定逻辑处理

nevents++;
}
}

// 如果信号事件触发
if (have_signals != 0)
loop->signal_IOWatcher.cb(loop, &loop->signal_IOWatcher, POLLIN);

// 重新赋值为`NULL`
loop->watchers[loop->nwatchers] = NULL;
loop->watchers[loop->nwatchers + 1] = NULL;

// 如果信号事件触发
if (have_signals != 0)
return; /* Event loop should cycle now so don't poll again. */

// 如果 事件计数器 不为 `0`
if (nevents != 0) {
// 如果 所有 所有文件描述符上都有事件产生 且 `count` 不为 `0`,再循环一次
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}

// 如果`timeout`为`0`函数直接返回
if (timeout == 0)
return;

// 如果`timeout`为`-1`则继续循环
if (timeout == -1)
continue;

// 重新计算 `timeout`
update_timeout:
assert(timeout > 0);

real_timeout -= (loop->time - base);
if (real_timeout <= 0)
return;

// 剩余 `timeout`
timeout = real_timeout;
}
}

在某些情况下,IO观察者绑定的回调函数并不是立即调用的,而是被延迟到下一次事件循环的固定阶段调用的,在 uv_run 中调用的 uv__run_pending 处理这些被延迟的IO观察者,实现如下:

https://github.com/libuv/libuv/blob/view-v1.28.0/src/unix/core.c#L737

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;

if (QUEUE_EMPTY(&loop->pending_queue))
return 0;

QUEUE_MOVE(&loop->pending_queue, &pq);

while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}

return 1;
}

该函数遍历 loop->pending_queue 队列节点,取得I/O观察者后调用 cb,并且指定 events 参数为固定值 POLLOUT(表示可写),因此可以猜测被插入到 loop->pending_queue 队列中的情形都是可写I/O事件。该队列上是用来保存被延迟到下次事件循环中处理的IO观察者。为什么需要延迟呢?

通过搜索可以找到只有 uv__io_feed 中存在向 loop->pending_queue 队列插入节点的代码,如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L892

1
2
3
4
void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (QUEUE_EMPTY(&w->pending_queue))
QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

继续搜索 uv__io_feed 可找到多处调用,此处就不过多介绍了。

至此,整个I/O观察者工作原理已经分析完成了。

I/O观察者是 libuv 中I/O相关的基础抽象,实现了对I/O事件的监控,其他I/O相关的功能基于这个基础抽象,I/O观察者完成了基本的事件派发,事件处理中的I/O数据读写则由更高级的抽象 handlerequest 完成。


查看源文件&nbsp;&nbsp;编辑源文件

在事件循环中,处理的一个 handle 就是计时器,通过 uv__run_timers,我们可以找到 timer.c 文件,里面包含了 timer 的实现。

先来看一下 uv__run_timers 的实现源码:

https://github.com/libuv/libuv/blob/v1.28.0/src/timer.c#L158

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;

for (;;) {
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
break;

handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;

uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle);
}
}

在 libuv 中,timer是按超时时间 timeout 存放在最小堆中的,这样,最小的的堆顶就是 timeout 最小的那个 timer,也就是最先到达超时时间的那个定时任务。

所以,在检查到期的定时任务时,只需要不断的获取堆顶的元素,并与当前时间比对:

  1. 如果没有堆顶元素,则没有任何定时器存在,函数将直接返回。
  2. 如果当前时间小于定时任务的超时间,那么堆顶timer未到到超时时间,非堆顶的timer更没有达到超时时间,整个uv__run_timers也就会退出。
  3. 如果当前时间等于或大于定时任务的超时间,这个timer就是一定达到或超过执行时间的。这时,就可以从timer堆中将其取出,然后调用其回调函数handle->timer_cb(handle)处理定时任务,然后再次重复获取下一个出现在堆顶的timer,直到情况1或2成立。

以下有两个主要注意的点:

  1. 大于或等于实际上包含两种情形,这两种情形对于实际应用程序逻辑来说可能会出现天壤之别。
    1. 如果当前时间等于定时任务的超时间,就是最理想的状态了,因为定时任务会在定时器到来的时候准时被执行,与预期相符合。
    2. 如果当前时间大于定时任务的超时间,则是非理想的状态了,然而这种情形缺是最出现的,因为很难保证当timer的超时时间到来时,程序搞好执行到此。
  2. 如果定时任务的回调函数handle->timer_cb执行时间过长,将会导致整个循环阻塞在此处,从而影响其他定时器的处理,进而也影响到整个时间循环的其他逻辑的处理,因为只有一个线程在处理各类型的回调任务。

Methods

1
2
3
4
5
6
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle)
int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, uint64_t timeout, uint64_t repeat)
int uv_timer_stop(uv_timer_t* handle)
int uv_timer_again(uv_timer_t* handle)
void uv_timer_set_repeat(uv_timer_t* handle, uint64_t repeat)
uint64_t uv_timer_get_repeat(const uv_timer_t* handle)

uv_timer_t
void (uv_timer_cb)(uv_timer_t handle)

1
2
3
4
5
6
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
handle->timer_cb = NULL;
handle->repeat = 0;
return 0;
}

查看源文件&nbsp;&nbsp;编辑源文件

Handle 是 libuv 设计实现的核心部分之一,根据官方描述:Handles 代表长生命周期的对象有能力执行某些操作当它们处于激活状态下

libuv 采用了组合的方式实现代码复用,并且达到了面向对象编程中的继承的效果。

Handle 有很多种不同的类型,这些类型有一个共同的、公共的基础结构 uv_handle_s,因结构体内存布局字节对齐所有子类型都可以强制类型转换成 uv_handle_t 类型,所以所有能够应用在 uv_handle_t 上的基础API都可用于子类型的 handle

在开始对不同类型的 Handle 开始分析之前,将会对 Handle 进行整体分析。

首先,看一下 libuv 中的 Handle 相关的类型声明和定义:

https://github.com/libuv/libuv/blob/v1.28.0/include/uv.h#L201

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/* Handle types. */
typedef struct uv_loop_s uv_loop_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_tty_s uv_tty_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_process_s uv_process_t;
typedef struct uv_fs_event_s uv_fs_event_t;
typedef struct uv_fs_poll_s uv_fs_poll_t;
typedef struct uv_signal_s uv_signal_t;

以上是 libuv 中所有的 handle 声明,并且都起了类型别名,命名规律显而易见,uv_loop_t 也在其中,uv_loop_t 作为所有资源的统一入口同样也是一种资源,而且是生命周期最长的资源。

下面来开一下 uv_handle_s 的结构定义:

https://github.com/libuv/libuv/blob/v1.28.0/include/uv.h#L411

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define UV_HANDLE_FIELDS     \
/* public */ \ 公有字段:
void* data; \ 数据指针,指向实际的数据,可用于指向任意用户自定义数据地址
/* read-only */ \ 只读字段:
uv_loop_t* loop; \ 事件循环对象指针,指向事件循环对象
uv_handle_type type; \ 类型
/* private */ \ 私有字段:
uv_close_cb close_cb; \ 关闭回调,Handle关闭时触发的回调函数
void* handle_queue[2]; \ 队列节点,用于链接前后节点,形成双向队列,该字段是含有两个成员的数组,分别表示next、prev。
union { \ 联合体字段,TODO:待补充作用
int fd; \
void* reserved[4]; \
} u; \
UV_HANDLE_PRIVATE_FIELDS \ 平台相关字段,libuv在各个不同平台上的实现是不同的,为支持不同平台,需针对不同平台进行差异化实现。

UV_HANDLE_FIELDS 被放到了 struct uv_handle_s

https://github.com/libuv/libuv/blob/view-v1.28.0/include/uv.h#L426

1
2
3
4
/* The abstract base class of all handles. */
struct uv_handle_s {
UV_HANDLE_FIELDS
};

如注释所言,struct uv_handle_s 是所有 handle 的抽象基类。

*nix 平台下,UV_HANDLE_PRIVATE_FIELDS 宏定义如下:

https://github.com/libuv/libuv/blob/v1.x/src/uv-common.h#L62

1
2
3
#define UV_HANDLE_PRIVATE_FIELDS     \
uv_handle_t* next_closing; \ 下一个 处于 closing 状态的 handle 的地址,形成一个单向的链表
unsigned int flags; \ handle 的 标识,handle 存在很多标识,通过位运算获取。

flags 可以使用的标识如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Handle flags. Some flags are specific to Windows or UNIX. */
enum {
/* Used by all handles. */
UV_HANDLE_CLOSING = 0x00000001,
UV_HANDLE_CLOSED = 0x00000002,
UV_HANDLE_ACTIVE = 0x00000004,
UV_HANDLE_REF = 0x00000008,
UV_HANDLE_INTERNAL = 0x00000010,
UV_HANDLE_ENDGAME_QUEUED = 0x00000020,

/* Used by streams. */
UV_HANDLE_LISTENING = 0x00000040,
UV_HANDLE_CONNECTION = 0x00000080,
UV_HANDLE_SHUTTING = 0x00000100,
UV_HANDLE_SHUT = 0x00000200,
UV_HANDLE_READ_PARTIAL = 0x00000400,
UV_HANDLE_READ_EOF = 0x00000800,

// ... 其他标识省略标识
};

所有 handle 都具备 UV_HANDLE_ACTIVE UV_HANDLE_CLOSED 等几个公共状态,还有一些特地 handle 特定的状态。

以上为 uv_handle_s 定义,其中字段是通过 UV_HANDLE_FIELDS 宏定义和引入的,这样做的目的是为了复用字段定义部分的代码,能有效降低代码量,提升可维护性。相关字段的功能描述见字段后的说明。

uv_handle_t 实际上就是作为所有其他 handle基类存在的,其他 handle 通过组合的方式集成了 uv_handle_t 字段,通过强制类型转换,可以转换为 uv_handle_t,之后在其上应用 uv_handle_t 的相关方法。

stream 为例看一下其类型定义:

https://github.com/libuv/libuv/blob/v1.x/include/uv.h#L461

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define UV_STREAM_FIELDS                        \
/* number of bytes queued for writing */ \
size_t write_queue_size; \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
/* private */ \
UV_STREAM_PRIVATE_FIELDS

/*
* uv_stream_t is a subclass of uv_handle_t.
*
* uv_stream is an abstract class.
*
* uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t and uv_tty_t.
*/
struct uv_stream_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
};

uv_stream_s 结构体中,包含了 uv_handle_s 的宏定义 UV_HANDLE_FIELDSuv_stream_s 类型特定宏定义 UV_STREAM_FIELDSuv_stream_suv_handle_s 在结构体内存布局上存在公共的部分且是以起始地址对齐的,uv_stream_suv_handle_s 多出一块特有的部分, 可以通过强制类型转换将 uv_stream_s 转换为 uv_handle_s

uv_handle_t 定义了所有 handle 公共的部分,作为一个抽象基类存在。uv_handle_t 是不直接使用的,因为它并不能支持用户需求,无实际意义,实际上,在使用其他派生类型时,会间接使用 uv_handle_t。所有派生类型在初始化的时候,也进行了 uv_handle_t 的初始化,这类似于高级语言构造函数在执行时常常需要调用基类构造函数一样。除初始化操作以外,同样还有其他操作需要调用 uv_handle_t 函数的相关操作。

一般来说,派生类型具备如下几个操作:

  • uv_{handle}_init:初始化 handle 结构,把各个字段设置成合理值,并插入 loop->handle_queue 队列;
  • uv_{handle}_start:启动 handle 使其处于 UV_HANDLE_ACTIVE 状态;
  • uv_{handle}_stop:停止 handle 使其处于 UV_HANDLE_CLOSED 状态,并移出 loop->handle_queue 队列。

以上各派生类型的公共操作,体现了 handle 的生命周期,和 loop 生命周期类似,除此之外还包括一些特定 handle 特定处理逻辑。

因为各个派生类型的初始化/启动/停止逻辑都不相同,所以并没有公共的初始化/启动/停止方法,每个派生类型根据需要实现类型特定的初始化/启动/停止函数,同时它们还需要在内部调用基类的方法进行对象 初始化/启动/停止 uv_handle_t,对应的方法为:

  • uv__handle_init
  • uv__handle_start
  • uv__handle_stop

从命名可以看出这些都是不对外暴露的方法。

接下来我就来看一下 handle 的初始化。

Init:uv__handle_init

uv_handle_t 的初始化代码是用宏 uv__handle_init 定义的宏函数,采用宏可实现内联 inline 函数的效果,实现如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L282

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define uv__handle_init(loop_, h, type_)                                      \
do { \
(h)->loop = (loop_); \
(h)->type = (type_); \
(h)->flags = UV_HANDLE_REF; /* Ref the loop when active. */ \
QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue); \
uv__handle_platform_init(h); \
} \
while (0)

#if defined(_WIN32)
# define uv__handle_platform_init(h) ((h)->u.fd = -1)
#else
# define uv__handle_platform_init(h) ((h)->next_closing = NULL)
#endif

uv__handle_init 是一个使用宏定义的宏函数,do{}while(0) 是一种巧妙用法,形成代码块,且调用时后边必须加分号,会被编译器优化掉。

这段代码主要完成以下几个工作:

  1. 关联 loophandle,可以通过 handle 找到对应的 loop
  2. 设置 handle 类型;
  3. 设置 handle 标识为 UV_HANDLE_REF,这个标识位决定了 handle 是否计入引用计数。后续 Start Stop 会看到其用途;
  4. handle 插入 loop->handle_queue 队列的尾部,所有初始化的 handle 就将被插入到这个队列中;
  5. 通过 uv__handle_platform_init 平台特定初始化函数将 handlenext_closing 设置为 NULL,这是一个连接了所有关闭的 handle 的单链表。

如下是 uv_timer_t 的初始化函数 uv_timer_init,它直接引用了 uv__handle_init 初始化 uv_handle_t,其他派生类型也是如此。

https://github.com/libuv/libuv/blob/v1.28.0/src/timer.c#L62

1
2
3
4
5
6
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
handle->timer_cb = NULL;
handle->repeat = 0;
return 0;
}

这样初始化工作就完成了,各个派生结构特定的初始化部分可能很简单,也可能很复杂。

Start:uv__handle_start

uv_handle_t 的启动代码是用宏 uv__handle_start 定义的宏函数,实现如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L239

1
2
3
4
5
6
7
#define uv__handle_start(h)                                                   \
do { \
if (((h)->flags & UV_HANDLE_ACTIVE) != 0) break; \
(h)->flags |= UV_HANDLE_ACTIVE; \
if (((h)->flags & UV_HANDLE_REF) != 0) uv__active_handle_add(h); \
} \
while (0)

uv__handle_start 将 handle 设置为 UV_HANDLE_ACTIVE 状态,并通过 uv__active_handle_add 更新活动的 handle 引用计数。如果不存在 UV_HANDLE_REF 标志位,则不会增加引用计数。

虽然对 handle 进行了 Start 操作,但是实际仅仅是设置了个标志位和增加了一个引用计数而已,看不到任何的 Start,实际上是告诉 libuv 该 handle 准备好了,可以 Go 了。因为更新引用计数间接影响了事件循环的活动状态。

uv_run 才是真正的启动操作,向 libuv 表明 Ready 了之后,uv_run 的时候才会处理这个 handle

Stop:uv__handle_stop

handle 的 Stop: 操作 由 uv__handle_stop 宏实现:

1
2
3
4
5
6
7
#define uv__handle_stop(h)                                                    \
do { \
if (((h)->flags & UV_HANDLE_ACTIVE) == 0) break; \
(h)->flags &= ~UV_HANDLE_ACTIVE; \
if (((h)->flags & UV_HANDLE_REF) != 0) uv__active_handle_rm(h); \
} \
while (0)

uv__handle_stop 将 handle 设置为 ~UV_HANDLE_ACTIVE 状态,并通过 uv__active_handle_rm 更新活动的 handle 引用计数。如果不存在 UV_HANDLE_REF 标志位,则不会减少引用计数。

StopStart 的反向操作,将 handle 修改为非准备状态。

Close:uv_close

对于 handle 来说,还有一个 Close 方法 uv_closeClose 可以认为是 Init 的反向操作,它将 handleloop->handle_queue 移除,清理资源并触发回调。

不同于上面三个方法,uv_close 是对外开放的,适用于所有类型 handle 的方法,在 uv_close 内部根据不同的类型,调用对应的函数处理。

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L107

1
2
3
4
5
6
7
8
9
10
11
12
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
assert(!uv__is_closing(handle));

handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;

switch (handle->type) {
// ...
}

uv__make_close_pending(handle);
}

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L209

1
2
3
4
5
6
void uv__make_close_pending(uv_handle_t* handle) {
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}

loop 上有一个 closing_handles 字段,这是一个单向链表,关联了处于关闭进行中的 handle,这个字段的类型是 uv_handle_t*,指向了 uv_handle_t,而 uv_handle_s 存在了一个 uv_handle_t* 类型的指针 next_closing 指向下一个 handle, 这样就形成一个单向链表。

如下 closing_handles 的声明和初始化:

1
2
#define UV_LOOP_PRIVATE_FIELDS                                                \
uv_handle_t* closing_handles; \
1
2
#define UV_HANDLE_PRIVATE_FIELDS                                              \
uv_handle_t* next_closing; \
1
2
3
4
5
int uv_loop_init(uv_loop_t* loop) {
// ...
loop->closing_handles = NULL;
// ...
}

uv_close 通过调用 uv__make_close_pending 将待关闭的 handle 放到 loop->closing_handles 链表末尾,panding 的含义是延迟到下次事件循环处理。

uv_runCall close callbacks 阶段,通过函数 uv__run_closing_handles 专门负责处理 loop->closing_handles

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L343

1
2
3
4
5
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
while (r != 0 && loop->stop_flag == 0) {
uv__run_closing_handles(loop);
}
}

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L286

1
2
3
4
5
6
7
8
9
10
11
12
13
static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;

p = loop->closing_handles;
loop->closing_handles = NULL;

while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L236

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void uv__finish_close(uv_handle_t* handle) {
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;

switch (handle->type) {
//
}

uv__handle_unref(handle);
QUEUE_REMOVE(&handle->handle_queue);

if (handle->close_cb) {
handle->close_cb(handle);
}
}

首先将 closing_handlesloop 摘除,然后遍历 closing_handles,通过 uv__finish_close 对每个 handle 进行最后的 closehandle 被移除 loop->handle_queue 并调用其关联的 close_cb,至此 handle 彻底没有了和 loop 的关联走完了一个完整的生命周期。

uv_close 的处理过程被拆分成了两段,一段是调用 uv__make_close_pending,另一段是在事件循环中调用 uv__run_closing_handles,关闭的过程是异步的,用户程序无法仅仅是通过 uv_close 返回判断关闭是否完成,需要在 close_cb 中接收异步操作结果。那么问题来了,为什么要拆分成两段而不是一次性处理完呢?

uv_close 一般来说都是在异步回调中被调用的,因为一个 handle 的关闭在逻辑上依赖于 handle 完成相关工作,而异步的逻辑中,完成工作后会调用相应的回调,所以只有在回调中调用 uv_close 才能使逻辑上是同步。

Close 阶段可以看做是 Init 阶段的反向操作。

handle 就这样伴随着事件循环经历了 Init -> Start -> Stop -> Close 等生命周期。

Reference counting:Ref & Unref

上文已经遇到过,handle 有个 UV_HANDLE_REF 标志位,这个状态用于控制 handle 是否计入 loop->active_handles 引用计数,因为 handle 的引用计数影响 loop 活动状态,所以 UV_HANDLE_REF 状态会间接影响 loop 的状态。

接下来,我们看下引用计数相关API:

https://github.com/libuv/libuv/blob/view-v1.28.0/src/uv-common.c#L502

1
2
3
4
5
6
7
8
9
10
11
12
13
void uv_ref(uv_handle_t* handle) {
uv__handle_ref(handle);
}


void uv_unref(uv_handle_t* handle) {
uv__handle_unref(handle);
}


int uv_has_ref(const uv_handle_t* handle) {
return uv__has_ref(handle);
}

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L255

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#define uv__handle_ref(h)                                                     \
do { \
if (((h)->flags & UV_HANDLE_REF) != 0) break; \
(h)->flags |= UV_HANDLE_REF; \
if (((h)->flags & UV_HANDLE_CLOSING) != 0) break; \
if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_add(h); \
} \
while (0)

#define uv__handle_unref(h) \
do { \
if (((h)->flags & UV_HANDLE_REF) == 0) break; \
(h)->flags &= ~UV_HANDLE_REF; \
if (((h)->flags & UV_HANDLE_CLOSING) != 0) break; \
if (((h)->flags & UV_HANDLE_ACTIVE) != 0) uv__active_handle_rm(h); \
} \
while (0)

#define uv__has_ref(h) \
(((h)->flags & UV_HANDLE_REF) != 0)

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L221

1
2
3
4
5
6
7
8
9
10
11
#define uv__active_handle_add(h)                                              \
do { \
(h)->loop->active_handles++; \
} \
while (0)

#define uv__active_handle_rm(h) \
do { \
(h)->loop->active_handles--; \
} \
while (0)

存在引用计数标志 UV_HANDLE_REF,会计入引用计数,否则不会引用计数。

实现非常简单,在条件满足的情况下,更新 loop->active_handles 值。

在事件循环初始化函数 uv_loop_init 中,loop->child_watcherloop->wq_async 都被 Unref 了,避免影响 loop 的存活状态。

Status:Active & Closing

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L400

1
2
3
int uv_is_active(const uv_handle_t* handle) {
return uv__is_active(handle);
}

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L301

1
2
3
int uv_is_closing(const uv_handle_t* handle) {
return uv__is_closing(handle);
}

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L233

1
2
3
4
5
#define uv__is_active(h)                                                      \
(((h)->flags & UV_HANDLE_ACTIVE) != 0)

#define uv__is_closing(h) \
(((h)->flags & (UV_HANDLE_CLOSING | UV_HANDLE_CLOSED)) != 0)

实现简单,无需解释。

handle 和 loop 的关联关系

handleloop 之间的关联是最为重要的,handle 必须注册到 loop 中的各种结构中才有意义,脱离 loophandle 是毫无用途的,只有关联到 loop 上的 handle 才能在事件循环的过程中被处理。以上 handle 生命周期的核心就是在管理这种关系。除了以上基本的关联之外,handleloop 还有其他关联。

InitClose 操作中,handle 被插入/移除 loop->handle_queue 队列,uv__active_handle_adduv__active_handle_rm 这两个宏函数修改 handle 的引用计数,进而间接修改了 loop 的状态。

loop->handle_queue 外,loop 中还有多个 handle 有关的队列,handle 除了被插入 loop->handle_queue 队列外,还会被插入到类型特定的结构中(如:队列、链表、堆),在 uv_run 的各个阶段,libuv 依赖这些结构完成工作中,下面将逐个来介绍一下都有哪些关联以及都是什么用途。

uv_loop_t 中,有多个相关 handle 队列:

uv_idle_t uv_check_t uv_check_t

https://github.com/libuv/libuv/blob/v1.28.0/include/uv/unix.h#L231

1
2
3
4
5
#define UV_LOOP_PRIVATE_FIELDS                                                \
void* process_handles[2]; \
void* prepare_handles[2]; \
void* check_handles[2]; \
void* idle_handles[2]; \
  • uv_idle_t 还会被插入到 loop->idle_handles 队列头部,队列节点为 handle->queue
  • uv_check_t 还会被插入到 loop->check_handles 队列头部,队列节点为 handle->queue
  • uv_check_t 还会被插入到 loop->prepare_handles 队列头部,队列节点为 handle->queue

队列初始化:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/loop.c#L29

1
2
3
4
5
6
7
8
int uv_loop_init(uv_loop_t* loop) {
// ...
QUEUE_INIT(&loop->idle_handles);
QUEUE_INIT(&loop->async_handles);
QUEUE_INIT(&loop->check_handles);
QUEUE_INIT(&loop->prepare_handles);
// ...
}

队列插入节点:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/loop-watcher.c#L24

1
2
3
4
5
6
7
8
#define UV_LOOP_WATCHER_DEFINE(name, type)                                    \
...
int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
...
QUEUE_INSERT_HEAD(&handle->loop->name##_handles, &handle->queue); \
...
} \
...

uv_async_t

1
2
#define UV_LOOP_PRIVATE_FIELDS                                                \
void* async_handles[2]; \
  • uv_async_t 还会被插入到 loop->async_handles 队列尾部,队列节点为 handle->queue

队列初始化:

https://github.com/libuv/libuv/blob/v1.x/src/unix/loop.c#L29

1
2
3
4
5
int uv_loop_init(uv_loop_t* loop) {
// ...
QUEUE_INIT(&loop->async_handles);
// ...
}

队列插入节点:

https://github.com/libuv/libuv/blob/v1.x/src/unix/async.c#L40

1
2
3
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
}

uv_process_t

1
2
#define UV_LOOP_PRIVATE_FIELDS                                                \
void* process_handles[2]; \

uv_process_t 还会被插入到 loop->process_handles 队列尾部,队列节点为 handle->queue

队列初始化:

https://github.com/libuv/libuv/blob/v1.x/src/unix/loop.c#L29

1
2
3
4
5
6
7
int uv_loop_init(uv_loop_t* loop) {
// ...
uv__handle_unref(&loop->child_watcher);
loop->child_watcher.flags |= UV_HANDLE_INTERNAL;
QUEUE_INIT(&loop->process_handles);
// ...
}

队列插入节点:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/process.c#L410

1
2
3
4
5
6
7
8
9
10
11
int uv_spawn(uv_loop_t* loop,
uv_process_t* process,
const uv_process_options_t* options) {
// ...
/* Only activate this handle if exec() happened successfully */
if (exec_errorno == 0) {
QUEUE_INSERT_TAIL(&loop->process_handles, &process->queue);
uv__handle_start(process);
}
// ...
}

uv_timer_t

1
2
3
4
5
#define UV_LOOP_PRIVATE_FIELDS                                                \
struct { \
void* min; \
unsigned int nelts; \
} timer_heap; \

uv_timer_t 还会被插入到 loop->timer_heap 堆(最小堆)中,堆节点为 handle->heap_node

以上这些针对于不同类型的队列/链表/堆结构,都是为了方便的找到并统一处理相同类型的 handle。除了以上这些类型的 handle 之外,在 loop 上并没有这种特定类型的直接入口,而是通过其他链间接访问到 handle 的。

uv__io_t

loop 中,存在一个 watchers 数组,这个数组的每一项都是一个指向 uv__io_t 结构的指针,uv__io_t 是一个I/O观察者被内嵌到多个IO相关的 handle 结构中,所以所有的内嵌I/O观察者的 handle 都通过 watchers 被关联到 loop 上了。uv__io_t 存在多个子类型,这些子类型都可以被放到 watchers 数组中。

另外,还存在两个I/O观察者队列:

  • watcher_queue:所有的IO观察者都会被插入到这个队列中。
  • pending_queue:所有的挂起IO观察者都会被插入到这个队列中。

uv__io_t 相关字段声明:

https://github.com/libuv/libuv/blob/v1.28.0/include/uv/unix.h#L218

1
2
3
4
5
6
#define UV_LOOP_PRIVATE_FIELDS                                                \
void* pending_queue[2]; \
void* watcher_queue[2]; \
uv__io_t** watchers; \
unsigned int nwatchers; \
unsigned int nfds; \

uv__work

uv__work 是 libuv 中任务的抽象,任务有线程池处理,任务在发起 uv_work_t request 时创建。

loop 中,存在一个任务队列,这个队列中记录了所以经线程池处理完成的任务,队列入口为:loop->wq

https://github.com/libuv/libuv/blob/v1.28.0/include/uv/unix.h#L218

1
2
#define UV_LOOP_PRIVATE_FIELDS                                                \
void* wq[2]; \

队列初始化:

https://github.com/libuv/libuv/blob/view-v1.28.0/src/unix/loop.c#L29

1
2
3
4
5
int uv_loop_init(uv_loop_t* loop) {
// ...
QUEUE_INIT(&loop->wq);
// ...
}

任务被处理完成或者任务取消后,都会被插入到该队里中。

closing_handles

Close 中提到过 closing_handles 关联了所有正在关闭的 handle,这也是一个关联的入口。

handle 可能同时存在于 loop->handle_queue 队列、loop->closing_handles 链表 以及 其他某一个特定类型的 handle 队列中。

通过上面的描述,可以在大脑中勾勒出一幅数据结构图。

除了 通过 loop 可以找到每一个 Handle,每一个 Handle 也可以通过其 loop 字段找到其所在的 loop

以上这些 handle 通过各种方式关联到 loop 上除了 loop 作为资源的统一入口需要管理注册记录所有资源外,还需要使 事件循环 在运行的时候,能够方便的高效的处理这些 handle

Request

Requests 一般代表一个短生命周期的操作,有些 Request 需要通过在 Handle 上执行,有些 Request 则可以直接执行。

在 libuv 中,有如下 Request:

1
2
3
4
5
6
7
8
9
10
/* Request types. */
typedef struct uv_req_s uv_req_t;
typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
typedef struct uv_getnameinfo_s uv_getnameinfo_t;
typedef struct uv_shutdown_s uv_shutdown_t;
typedef struct uv_write_s uv_write_t;
typedef struct uv_connect_s uv_connect_t;
typedef struct uv_udp_send_s uv_udp_send_t;
typedef struct uv_fs_s uv_fs_t;
typedef struct uv_work_s uv_work_t;

uv_getaddrinfo_t uv_getnameinfo_t uv_fs_t uv_work_t 可以直接执行,不依赖于 handle,直接关联到 loop 上。

uv_connect_t uv_write_t uv_udp_send_t uv_shutdown_t 都是跟读写相关的 request,其操作依赖于 uv_stream_t,也就是这些操作作用于 uv_stream_t,这些 request 通过 handle 关联到 loop 上。

handle 相似,request 也有一个基础结构 uv_req_s,其他 request 都通过组合复用 uv_req_s 的字段。

uv_req_s 结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
#define UV_REQ_FIELDS                   \
/* public */ \
void* data; \
/* read-only */ \
uv_req_type type; \
/* private */ \
void* reserved[6]; \
UV_REQ_PRIVATE_FIELDS \

/* Abstract base class of all requests. */
struct uv_req_s {
UV_REQ_FIELDS
};

request 并不需要在用户代码中显示的初始化,初始过程在相关的实现代码中由核心处理,通用的初始化部分 由 uv__req_init 函数处理,如下代码实现:

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L310

1
2
3
4
5
6
#define uv__req_init(loop, req, typ)                                          \
do { \
UV_REQ_INIT(req, typ); \
uv__req_register(loop, req); \
} \
while (0)
1
2
3
4
5
6
# define UV_REQ_INIT(req, typ)                                                \
do { \
(req)->type = (typ); \
} \
while (0)
#endif

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.h#L205

1
2
3
4
5
6
7
8
9
10
11
12
#define uv__req_register(loop, req)                                           \
do { \
(loop)->active_reqs.count++; \
} \
while (0)

#define uv__req_unregister(loop, req) \
do { \
assert(uv__has_active_reqs(loop)); \
(loop)->active_reqs.count--; \
} \
while (0)

uv__req_init 初始化了 request 的类型,并通过 uv__req_register 更新 request 的引用计数,也可以通过 uv__req_unregister 反注册。

request 的更多内容,将在后续的分析中结合相关功能继续介绍。


查看源文件&nbsp;&nbsp;编辑源文件

事件循环是 libuv 功能的核心部分。它的主要职责是对 I/O 进行轮询然后基于不同的事件源调用它们的回调。

事件循环主体数据结构在 libuv 中用 struct uv_loop_s 或类型别名 uv_loop_t 表示,文中统一使用 loop 表示其实例,它代表了事件循环,实际上它是事件循环所有资源的统一入口,所有在事件循环上运行的各类 Handle/Request 实例都被注册到 uv_loop_s 内部的各类结构中如队列、堆、伸展树等,同一 Handle 实例通常也会被关联到多个不同的结构中,如大多数 Handle 都会同时存在两个队列中。

uv_loop_t 是一种特殊的 Handle,它管理了同一事件循环上的所有资源。

uv_loop_t 实例通常需要经历 InitRunStopClose 这几个生命周期,下面将分别分析几个阶段的实现。

Example

1
2
3
4
5
6
7
8
9
10
11
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

int main() {
uv_loop_t* loop = uv_default_loop();
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
printf("quit.\n");
return 0;
}
1
gcc -luv helloworld.c -o helloworld
1
2
$ ./helloworld
quit.

以上是一个最基本的 libuv 程序代码,通过 uv_run 函数启动了 libuv 事件循环,所以 uv_run 做为事件循环的入口一定是阅读源码的重点,可以 uv_run 为起点,先看看 uv_run 都做了什么。

程序启动之后,打印了 quit.\n 立刻退出了,更具体一点说就是,所以函数调用尤其是 uv_run 函数立即返回了,程序自然就退出了,因为我们实际什么也有做,连上文提到的异步操作以及注册的回调函数都没有。

Init:uv_loop_init

在常见的使用场景中,通常都是直接调用 uv_default_loop 获取已经初始了的全局 uv_loop_t 实例,所以在分析 uv_run 之前,先看一下 uv_loop_t 初始化。

先来看一下 uv_default_loophttps://github.com/libuv/libuv/blob/v1.28.0/src/unix/loop.c#L30

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;


uv_loop_t* uv_default_loop(void) {
if (default_loop_ptr != NULL)
return default_loop_ptr;

if (uv_loop_init(&default_loop_struct))
return NULL;

default_loop_ptr = &default_loop_struct;
return default_loop_ptr;
}

在 libuv 中存在一个全局的、静态的 uv_loop_t 实例 default_loop_struct,首次获取的时候经过 uv_loop_init 进行了初始化。

uv_default_loop 调用 uv_loop_initdefault_loop_struct 进行初始化并将地址赋给了 default_loop_ptr

uv_loop_init 实现如下(含注释):

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/loop.c#L29

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
int uv_loop_init(uv_loop_t* loop) {
void* saved_data;
int err;

// 数据清零
saved_data = loop->data;
memset(loop, 0, sizeof(*loop));
loop->data = saved_data;

// 定时器 uv_timer_t 相关:初始化定时器堆
heap_init((struct heap*) &loop->timer_heap);
// 初始化用于接收线程池中已完成任务的队列
QUEUE_INIT(&loop->wq);
// 初始化 uv_idle_t 队列
QUEUE_INIT(&loop->idle_handles);
// 初始化 uv_async_t 队列
QUEUE_INIT(&loop->async_handles);
// 初始化 uv_check_t 队列
QUEUE_INIT(&loop->check_handles);
// 初始化 uv_prepare_t 队列
QUEUE_INIT(&loop->prepare_handles);
// 初始化 uv_handle_t 队列,所以初始化后的 handle 都会放到此队列中
QUEUE_INIT(&loop->handle_queue);

// 初始化 活跃的 handle 和 request 数量
loop->active_handles = 0;
loop->active_reqs.count = 0;

// 开始初始化I/O观察者相关字段
// 文件描述符数量
loop->nfds = 0;
// I/O观察者数组首地址指针
loop->watchers = NULL;
// I/O观察者数组数量,但是 `loop->watchers` 实际长度为:nwatchers + 2
loop->nwatchers = 0;
// 初始化 挂起的I/O观察者队列,挂起的I/O观察者会被插入此队列延迟处理
QUEUE_INIT(&loop->pending_queue);
// 初始化 I/O观察者队列,所有初始化后的I/O观察者都会被插入此队列
QUEUE_INIT(&loop->watcher_queue);

// 关闭的 handle 队列,单向链表
loop->closing_handles = NULL;
// 初始化计时器 loop->time
uv__update_time(loop);

// uv_async_t
// 初始化 async_io_watcher,它是一个I/O观察者,用于 uv_async_t 唤醒事件循环
loop->async_io_watcher.fd = -1;
// 用于写数据给 async_io_watcher
loop->async_wfd = -1;

// uv_signal_t
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
// epoll_create()
loop->backend_fd = -1;
// EMFILE 错误相关
loop->emfile_fd = -1;

// 定时器计数器
loop->timer_counter = 0;

// 事件循环关闭标识
loop->stop_flag = 0;

// 平台特定初始化:UV_LOOP_PRIVATE_FIELDS
err = uv__platform_loop_init(loop);
if (err)
return err;

// uv_signal_t
// 初始化进程信号
uv__signal_global_once_init();

// uv_proccess_t
// 初始化子进程信号观察者
err = uv_signal_init(loop, &loop->child_watcher);
if (err)
goto fail_signal_init;
// 解引用loop->child_watcher
uv__handle_unref(&loop->child_watcher);
loop->child_watcher.flags |= UV_HANDLE_INTERNAL;
// 初始化子进程 handle 队列
QUEUE_INIT(&loop->process_handles);

// 初始化线程读写锁
err = uv_rwlock_init(&loop->cloexec_lock);
if (err)
goto fail_rwlock_init;

// 初始化线程互斥量锁
err = uv_mutex_init(&loop->wq_mutex);
if (err)
goto fail_mutex_init;

// uv_work_t
// 初始化loop->wq_async,用于结束任务完成信号,并注册处理函数
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
if (err)
goto fail_async_init;
// 解引用
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV_HANDLE_INTERNAL;

return 0;

fail_async_init:
uv_mutex_destroy(&loop->wq_mutex);

fail_mutex_init:
uv_rwlock_destroy(&loop->cloexec_lock);

fail_rwlock_init:
uv__signal_loop_cleanup(loop);

fail_signal_init:
uv__platform_loop_delete(loop);

return err;
}

uv_loop_init 的初始化代码是比较长的,它初始化了 libuv 运行时所有依赖的内容,这包括事件循环自身运行所需的内容,以及各类型 Handle 运行所需的共有内容和特定内容,这些都在 uv_loop_t 实例初始化的时候一并进行了初始化,初始化细节和很多有其他功能相关,当分析其他功能时,还会提及涉及到的该函数的部分代码块。

Run:uv_run

下面我们就来看一下 uv_run 函数都干了什么:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L343

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;

r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);

while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);

timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);

uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);

if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}

r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}

/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;

return r;
}

可以看到 uv_run 内部就是一个 while 循环,在 UV_RUN_ONCEUV_RUN_NOWAIT 两种模式下,循环在执行一次后就会 break,一次性的,实际上没有循环。另外 在 uv__loop_alive(loop) == 0 或者 loop->stop_flag != 0 时 无法进入循环,同样循环结束,uv_run 函数返回。

该函数就如官网给出的流程图一样简单

loop_iteration

再看看几个关键的函数调用:

  1. uv__update_time(loop):对应图中 Update loop time
  2. uv__run_timers(loop):对应图中 Run due timers,用于 uv_timer_t,见Timer
  3. uv__run_pending(loop):对应图中 Call pending callbacks,用于 uv__io_t,见I/O-Watcher
  4. uv__run_idle(loop):对应图中 Run idle handles,用于 uv_idle_t
  5. uv__run_prepare(loop):对应图中 Run prepare handles,用于 uv_prepare_t
  6. uv__io_poll(loop, timeout):对应图中 Poll for I/O,用于 uv__io_t,见I/O-Watcher
  7. uv__run_check(loop):对应图中 Run check handles,用于 uv_check_t
  8. uv__run_closing_handles(loop):对应图中 Call close callbacks,用于 uv_handle_t,见Handle and Requst

以上执行逻辑正好和文档中的各个执行阶段相对应,文档中描述的各个执行阶段分别对应了不同的函数调用。整个循环迭代的不同阶段,对应于不同类型/状态的 handle 处理。除了用于 uv_timer_tuv_idle_tuv_prepare_tuv_check_t 这四种类型的 handle 处理的几个阶段之外,没看到其他 handle 相关内容,倒是有个 uv__io_t 的处理,这是前文所提到的 libuv 内部关于I/O观察者的一个基本抽象,所有其他的 handle 都可以当做是一个I/O观察者,类似于双重继承。

如果这个函数处于一直不断的循环状态,所在进程岂不是会一直占用CPU?实际上不会这样的,因为线程会在 uv__io_poll(loop, timeout) 这个函数内部因为阻塞而挂起,挂起的时间主要由下一次到来的定时器决定。在线程挂起这段时间内,不会占用CPU。

uv_run 启动事件循环,才使所有活动状态的 handle 开始工作,否则所有 handle 都是静止的,这一步就是 libuv 启动的按钮。

事件循环自身存在存活状态,通过 uv__loop_alive 判断,实现如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L331

1
2
3
4
5
static int uv__loop_alive(const uv_loop_t* loop) {
return uv__has_active_handles(loop) ||
uv__has_active_reqs(loop) ||
loop->closing_handles != NULL;
}

uv__loop_alive 判断 loop 是否是存活状态,满足以下三种条件之一即是存活状态:

  • 存在活跃的 handle
  • 存在活跃的 request
  • 正在关闭的 handle 列表不为空

所以,若想成功事件事件循环一直不断的运行而不退出,必须在 uv_run 之前想事件循环里放入处于活跃状态的 handlerequest

uv_loop_t 结构中,存在记录处于活动状态的 handlerequest 的计数器,所以通过简单的判断数量即可,实现如下:

1
2
#define uv__has_active_handles(loop)                                          \
((loop)->active_handles > 0)
1
2
#define uv__has_active_reqs(loop)                                             \
((loop)->active_reqs.count > 0)

另外,除了存活状态之外,loop 还存在一个 stop_flag 字段 标识 loop 是否处于关闭状态。

所以,当 loop 中没有活动的 handlerequest 时 或者 关闭标识开启时,事件循环跳出。

libuv 在运行时 有三种模式:对应模式的用途看文档上对应的描述即可,uv_run

在 Run 的过程中,多次调用 uv__update_time 来更新时间

https://github.com/libuv/libuv/blob/v1.x/src/unix/internal.h#L288

1
2
3
4
5
UV_UNUSED(static void uv__update_time(uv_loop_t* loop)) {
/* Use a fast time source if available. We only need millisecond precision.
*/
loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000;
}
1
2
3
uint64_t uv__hrtime(uv_clocktype_t type) {
return gethrtime();
}

这个函数通过调用 gethrtime 获取系统当前时间,精度非常高,单位是纳秒(ns),1纳秒等于十亿分之一秒。除 1000000 后的时间单位为 毫秒(ms)。

时间对 libuv 来说非常重要,很多机制依赖于这个时间,比如定时器,后续的分析中,我们将会看到相关的利用。

在事件循环中,还有一个 timeout,这个值用于控制 uv__io_poll(loop, timeout) 的挂起时长,这个变量的值是通过 uv_backend_timeout 来获取的,源码如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/unix/core.c#L311

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int uv_backend_timeout(const uv_loop_t* loop) {
if (loop->stop_flag != 0)
return 0;

if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;

if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;

if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;

if (loop->closing_handles)
return 0;

return uv__next_timeout(loop);
}

uv_backend_timeout 在多个情况下都返回 0,这些情况表明不需要等待超时,如果前面的条件都不满足,会通过 uv__next_timeout 计算 timeout,源码如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/timer.c#L137

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;

heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
return -1; /* block indefinitely */

handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout <= loop->time)
return 0;

diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;

return (int) diff;
}

uv__next_timeout 有两种 情况:

  • 堆为空,返回 -1
  • 堆非空,返回 堆顶定时器当前时间的差值,但是差值不能越界。

综合在一起,uv_backend_timeout 有可能返回 -1 0 正整数

可以看到 timeout 作为参数传递给了 uv__io_poll,而 timeout 正好作为 epoll_pwait 的超时时间,所以,这个 timeout 的作用主要是使 epoll_pwait 能够有一个合理的超时时间:

  • timeout-1 的时候这个函数会无限期的阻塞下去;
  • timeout0 的时候,就算没有任何事件,也会立刻返回,当没有任何需要等待的资源时,timeout 刚好为 0
  • timeout 等于 正整数 的时候,将会阻塞 timeout 毫秒,或有I/O事件产生。

epoll_pwait 要在定时器时间到来时返回以进入下一次事件循环处理定时器,如果不能返回,将会导致定时任务不能按时得到处理,即使是按时返回,也不一定能够那是处理,因为 uv__io_poll 之后还有其他逻辑代码要执行,甚至是有可能是耗时计算,所以,Node.js 中定时器是不精确的,浏览器中类似。

UV_RUN_ONCE 模式下,因为循环会直接跳出,不会再次进入循环处理定时器,所以需要在这种模式下,需要处理额外处理定时器。

至此,事件循环的大逻辑已经分析完成了,后续,将会在各类型的 handle 的处理逻辑中展开对事件循环各阶段的内容分析。

Stop

Stop 将使事件循环在下一次循环因不满条件而无法进入,源码如下:

https://github.com/libuv/libuv/blob/v1.28.0/src/uv-common.c#L517

1
2
3
void uv_stop(uv_loop_t* loop) {
loop->stop_flag = 1;
}

调用 uv_stop 后,事件循环同样无法进入,程序退出。

以上,我们分析了事件循环执行的流程。


查看源文件&nbsp;&nbsp;编辑源文件

简介

libuv 是一个专注于异步I/O的跨平台的程序库,它主要是用于支持 Node.js,但是也被如 Luvit、Julia、pyuv 等很多其他的库使用。它使得 异步I/O 变的简单。

libuv 对于经常接触 c 语言程序开发的开发人员来说,应该是非常容易的,但是对于没有没有相关经验的开发人员来说,阅读源码就比较吃力了,但是如果能阅读并理解 libuv,才能更深入的了解 Node.js 是怎样工作的,以及 Node.js 中的事件驱动、非阻塞异步I/O是怎么一回事儿,它是怎么解决这些问题的,它有哪些不足。

基础要求

阅读源代码需要 c 语言相关的知识和一定的系统编程基础,需要了解常见的系统API调用及相关机制。主要有以下几个方面:

  1. libuv 主要是用 c 实现的,所以需要有一定的 c 语言基础,尤其是宏、结构体、指针、函数指针、数组等内容,对 c 语言程序的内存布局有一定了解;
  2. libuv 内部包含 队列 等基础的数据结构,需要有一定的了解;
  3. libuv 内部使用了很多系统API,所以需要对相应系统平台系统编程和操作系统原理有一定了解,了解进程、线程、文件系统、网络等,了解常见的异步IO模型,最好阅读过 APUE

libuv 内部实现采用的大量的来复用代码,并配合使用强制类型转换等机制实现类似于高级语言中继承的效果,这依赖于 c 语言结构体内存布局,本身是非常不容易阅读的,增加了阅读源代码的难度。同时内部进行一定的抽象,需要一定的理解能力。

libuv 支持多个平台,尤其是支持 *nixwin 两种设计实现差异很大的操作系统,其内部也大量使用宏进行条件编译,且部分功能在不同环境下的实现有可能完全不同,所以在阅读源码时只关注某一平台代码。本源码分析也仅针对 linux 环境的相关代码的分析。

设计概述

本部分内容主要来源于官方文档,建议对照官方文档 Design overview 阅读。

libuv 最初是为 NodeJS 编写的跨平台支持库。它是围绕事件驱动的异步I/O模型设计的。提到异步非阻塞I/O模型,有经验的开发人员应该能很快意识到这是一种很常见的实现高并发的模型,这是一种不同于多进程/多线程的并发模型。并发模型可以参考知乎专栏并发模型之间的比较或其他资料。

libuv 不仅仅在多种不同I/O轮询机制之上提供了的简单抽象——I/O观察者,更通过 handlesstreams 为套接字和其他实体提供了更高级别的抽象,同时也提供了跨平台I/O和线程管理能力,还包含一些其他功能。

下图说明了 libuv 的不同组成部分以及它们与哪些子系统相关:

architecture

从图中可以看到,libuv 主要部分都是和I/O相关的,主要包括网络I/O和文件I/O,其中文件I/O和其他少部分功能基于线程池实现,网络I/O在 *nix 平台基于 uv__io_t(内部抽象的I/O观察者)实现,uv__io_t 又基于不同环境采用了不同的底层机制,网络I/O在 win 平台基于 IOCP 机制实现。

Handles and requests

libuv 为用户提供了两个与实践循环结合使用的抽象:handlesrequests

Handles 代表长生命周期的对象有能力执行某些操作当它们处于激活状态下。例如:

  • prepare handle 在激活时,每次事件循环迭代都会调用一次它的回调;
  • TCP server handle 在每一次有一个新的 connection 进来的时候都会调用一次它的回调。

Requests 一般代表一个短生命周期的操作。这些操作可以通过 handle 执行:write requests 用于在 handle 上写数据,所以 requesthandle 可能存在一定的数据关联;或者也可以独立执行:getaddrinfo requests 不需要一个 handle,他们直接运行在事件循环中。

相关的代码分析在Handle and Requst 部分进行。

The I/O loop

I/O loop 也就是事件循环(Event Loop)是 libuv 的核心组成部分。它为所有 I/O 操作建立内容,实际上这意味着事件循环被绑定到一个单一的线程。可以运行多个不同的事件循环只要它们在不同的线程中。除非另有说明,事件循环(任何涉及事件循环和 handle 的API)并不是线程安全的。

所有的异步操作的结果最终在事件循环中被处理,也就是通常所说的回调函数,在事件循环中被调用。因为事件循环线程是单线程模式,所以所有用户代码都在一个线程中运行,libuv 更像是一个调度器,在合适的时候调度用户代码运行。此时,如果用户代码CPU密集型的耗时运算,将会阻塞事件循环。

事件循环是非常常见的单线程异步I/O的处理方法:所有(网络)I/O都在非阻塞的套接字上执行,这些套接字使用给定平台上可用的最佳机制进行轮询:Linux 上使用 epoll,OSX 和其他 BSDs 系统上使用 kqueue,SunOS 使用 event ports,Windows 上使用 IOCP。以上I/O轮询作为事件循环迭代的一部分,事件循环将会被阻塞在I/O轮询(例如:linux 上的 epoll_pwait 调用),直到被添加到轮询器中的套接字有IO活动(事件),事件循环线程将会在有IO事件时被唤醒,关联的回调函数将会被调用表明套接字有新的连接,然后便可以在 handles 上进行读、写或其他想要进行的操作 requests

下图显示了事件循环的所有阶段:

loop_iteration

图中主要有七个阶段,

其中 idlepreparecheck 的实现完全相同,调用时间不同,类似于生命周期勾子,这几个阶段目的是允许开发者在事件循环的特定阶段执行代码,在 Node.js 主要用于性能信息收集。这三个阶段的实现代码比较简单,很容易理解。因源代码几乎完全使用宏实现,所以编辑器无法跳转到对应实现,搜索关键字也无法匹配,这里给出源文件路径:src/unix/loop-watcher.c,便于读者找到源文件。

其余剩下的阶段就主要有 Call pending callbacks Poll for I/O Call close callbacks,这三个阶段主要用于处理IO操作等异步操作结果,阅读源码也主要是围绕着这三个阶段的代码展开的。

各阶段用途描述:

  1. Run due timers:处理定时任务;
  2. Call pending callbacks:处理上一轮事件循环中因出现错误或者逻辑需要等原因挂起的任务;
  3. Run idle handles;
  4. Run prepare handles;
  5. Poll for I/O:事件循环 则有可能 因为 epoll_wait 而阻塞在这里,这取决于 timeout 参数是否为 0,但是通常情况下,会阻塞到有关注的IO事件发送时回,这也直接避免了时间循环一直工作导致占用CPU的问题。这个阶段是整个 libuv 事件循环最重要的阶段。libuv 的大部分 handle (都是I/O相关的)都依赖该阶段实现。
  6. Run check handles:
  7. Call close callbacks:清理被关闭的 handles

更多详细描述,请直接阅读The I/O loop的详细描述。

相关的代码分析在EventLoop 部分进行。

File I/O

不同于网络IO,目前没有 libuv 可以依赖的平台特定(异步)文件IO机制,所以当前的方式是在线程池中运行阻塞式的文件IO操作。

libuv 目前采用一个全局的线程池,所有事件循环都可以在向其任务队列提交任务,目前有3种类型的操作运行在线程中:

  • 文件系统操作:readwrite
  • DNS功能操作:getaddrinfogetnameinfo
  • 通过 uv_queue_work() 提交的用户特定的任务

有关跨平台文件I/O现状的详尽说明:asynchronous disk I/O

线程池中的线程在工作完成之后,会向事件循环线程发送数据,然后再主线程中触发回调。

Reactor模式

通过以上的介绍和描述,我们可以获知几个关键概念:Event-LoopAsync I/O, HandleRequest

  • Event-Loop 是一个程序结构,用于在程序中 等待派发 消息和事件。它实际是一个运行时概念,表示一个运行时逻辑。Event-Loop 是实现 Event-Driven 编程的基本结构。
  • Async I/O 相比 Sync I/OAsync I/O 一个典型的特征是不会阻塞,因为I/O操作通常比较慢,同步I/O操作会导致程序阻塞在I/O操作上,进而导致程序响应很慢。Async I/O 就是为了避免阻塞的。

Event-LoopAsync I/O 是实现高性能I/O的常见手段,这些都始于 C10k 问题,单服务器同时服务 10000 个客户端,当然现在远不止这些了。早期的服务器是基于进程/线程模型,每新来一个连接,就分配一个进程(线程)去处理这个连接,这是一个非常大的开销,容易达到系统软硬件瓶颈,另外,进程/线程切换上下文的成本也非常高。

一般而言,大多数网络应用服务器端软件都是I/O密集型系统,服务器系统大部分的时间都花费在等待数据的输入输出上了,而不是计算,如果CPU把时间花在等待I/O操作上,就白白浪费了CPU的处理能力了,更重要的是,此时可能还有大量的客户端请求需要处理,而CPU却在等待I/O无法脱身。最终,以此方式工作的服务器吞吐量极低,需要更多的服务支撑业务,导致成本升高。

为了充分利用CPU的计算能力,就需要避免让CPU等待I/O操作完成能够抽出身来做其他工作,例如,接收更多请求,等I/O操作完成之后再来进一步处理。这就有了 非阻塞I/O。异步I/O给编程带来了一定的麻烦,因为同步思维对于人来说更自然、更容易,也不易于调试,但是实际上限时世界原本偏向异步的。如何在I/O操作完成后能够让CPU回来继续完成工作也需要更复杂的流程逻辑控制,这些都带来了一定的设计难度。不过幸好,聪明的开发者设计了 Event-Driven 编程模型解决了此问题。基于此模型,也衍生出高性能IO常见实现模式:Reactor 模式,该模式有很多变体。Redis、Nginx、Netty、java.NIO都采用类似的模式来解决高并发的问题,libuv 自然也不例外,实现了事件驱动的异步IO。Reactor 模式采用同步I/O,Proactor 是一个采用异步I/O的模式。

该部分内容参考:

libevent vs libev vs libuv

  • libevent:名气最大,应用最广泛,历史悠久的跨平台事件库;
  • libev:较 libevent 而言,设计更简练,性能更好,但对 Windows 支持不够好;
  • libuv:开发 node 的过程中需要一个跨平台的事件库,他们首选了 libev,但又要支持 Windows,故重新封装了一套,linux 下用 libev 实现,Windows下用 IOCP 实现;

libevent、libev、libuv 都是 c 语言实现的事件驱动(Event-Driven)的异步I/O(Async I/O)库。

异步I/O(Async I/O)库本质上是提供异步I/O事件通知(Asynchronous Event Notification,AEN)的。异步事件通知机制就是根据发生的事件,调用相应的回调函数进行处理。

  • 事件(Event):事件是通知机制的核心,比如I/O事件、定时器事件、信号事件事件。有时候也称事件为事件处理器(EventHandler),这个名称更形象,因为 Handler 本身表示了包含处理所需数据(或数据的地址)和处理的方法(回调函数),更像是面向对象思想中的称谓。
  • 事件循环(EventLoop):是事件驱动(Event-Driven)的核心,等待并分发事件。事件循环用于管理事件。

对于应用程序来说,这些只是异步事件库提供的API,封装了异步事件库跟操作系统的交互,异步事件库会选择一种操作系统提供的机制来实现某一种事件,比如利用 Unix/Linux 平台的 epoll 机制实现网络IO事件,在同时存在多种机制可以利用时,异步事件库会采用最优机制。

该部分内容参考:

源码

版本

1
2
3
$ git checkout v1.28.0
Previous HEAD position was a4fc9a66 2019.03.17, Version 1.27.0 (Stable)
HEAD is now at 7bf8fabf 2019.04.16, Version 1.28.0 (Stable)

目录结构

以下为源码的文件结构,删掉了无关的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
$ tree
.
├── README.md
├── docs/
├── include
│ ├── uv
│ │ ├── aix.h
│ │ ├── android-ifaddrs.h
│ │ ├── bsd.h
│ │ ├── darwin.h
│ │ ├── errno.h
│ │ ├── linux.h
│ │ ├── os390.h
│ │ ├── posix.h
│ │ ├── stdint-msvc2008.h
│ │ ├── sunos.h
│ │ ├── threadpool.h
│ │ ├── tree.h
│ │ ├── unix.h
│ │ ├── version.h
│ │ └── win.h
│ └── uv.h
├── src
│ ├── fs-poll.c
│ ├── heap-inl.h
│ ├── idna.c
│ ├── idna.h
│ ├── inet.c
│ ├── queue.h
│ ├── strscpy.c
│ ├── strscpy.h
│ ├── threadpool.c
│ ├── timer.c
│ ├── uv-common.c
│ ├── uv-common.h
│ ├── uv-data-getter-setters.c
│ ├── version.c
│ ├── unix/
│ └── win/
├── test/
├── samples/
├── uv.gyp
└── vcbuild.bat

源码主要存在于以下几个目录:

  • include:存放 .h 文件,这些文件主要用于对外暴露 c API

    • include/uv.h 文件存放平台无关的头文件,该文件需要被 include 依赖项目的源码当中。
    • include/uv/*.h 路径下的文件则是针对不同平台进行的不同相关类型的声明定义等。

    include/uv.h 会根据不同的环境 include uv/win.huv/unix.huv/unix.hinclude *nix 系的其他系统相关头文件。如同通常的c库一样,uv.h 不仅作为入口文件,同时还具备文档的作用,阅读源码自然适合从此文件开始。

    include/tree.h 是个例外,该文件内通过 宏实现了 伸展树红黑树,而 同样采用实现的 队列 存放在 src/queue.h 文件中

  • src:存放 .c 文件,和一些 不对外暴露的 .h 文件

    • uv-common.h/uv-common.c 包含部分公共的内部数据结构、函数的声明和实现,会被 src 内部大部分其他文件 包含
    • timer.c 对应于 定时器 的实现
    • threadpool.c 实现了线程池,对应的线程管理实现存在于 src/[unix|win]/thread.c 文件中
    • queue.h 基于宏实现的简单的队列
    • heap-inl.h 最小二叉堆实现,未采用宏实现
    • fs-poll.c 文件系统轮询相关实现
    • idna.h/idna.c IDNA Punycode 相关实现代码
    • unix/ *nix 平台相关实现
    • win/ win 平台相关实现
  • test:存放一些 单元测试 代码,这里面的很多代码可以作为参考示例

  • samples:存放 示例代码,其中 samples/socks5-proxy 是一个基于 libuv 实现的 sock5 代理

命名风格

libuv 所有函数、结构体都采用了统一的前缀 uv_,名称格式为:uv_ + namename 可以以下划线开头,表示内部成员,例如:

  • 公开名称:uv_loop_t = uv_ + loop_t uv_loop_start = uv_ + loop_start
  • 内部名称:uv__io_t = uv_ + _io_t uv__io_poll = uv_ + _io_poll

数据结构

libuv 中的数据结构(队列,堆)采用被称为侵入式的实现方式实现,下图为 Linux 内核 /include/linux/list.h 的实现示意图:

该队列与通常实现最大的不同是指向兄弟节点的 prevnext 指针存储的并非兄弟节点数据的首地址,而是兄弟节点的某个成员(数据结构某个成员,图中是 list_head)的地址,如果需要拿到完整的数据结构,需要获得数据结构的首地址,已知条件是已知 list_head 的地址,c 语言程序中结构体的内存布局是对齐的,所以可以计算出 list_head 相对数据结构首地址的偏移量,这样就可以算出数据结构首地址了,container_of 就是完成这一换算的。这种方式实现的数据结构,图中 data structure 1data structure 2data structure 3 可以是不同的类型,所以这种方式实现的数据结构可以很通用。

container_of 定义如下:

1
2
#define container_of(ptr, type, member) \
((type *) ((char *) (ptr) - offsetof(type, member)))

在 linux 内核中也定义了这个宏,但是略微不同,宏定义如下:

https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/linux/kernel.h?id=refs/tags/v4.10.13#n842

1
2
3
4
5
6
7
8
9
10
/**
* container_of - cast a member of a structure out to the containing structure
* @ptr: the pointer to the member.
* @type: the type of the container struct this is embedded in.
* @member: the name of the member within the struct.
*
*/
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})

关于 container_of 的实现原理可参考:

接下来,开始详细分析 libuv 源码的各个部分内容,请见下文。


查看源文件&nbsp;&nbsp;编辑源文件

0%