/* To avoid deadlock with uv_cancel() it's crucial that the worker * never holds the global mutex and the loop-local mutex at the same time. */ staticvoidworker(void* arg) { structuv__work* w; QUEUE* q; int is_slow_work;
uv_sem_post((uv_sem_t*) arg); arg = NULL;
// 加锁 mutex // 因为只有一个线程能抢占锁,所以多个线程也只能一个接一个的进入循环 // 因为整个线程池中线程创建过程中不会出现其他线程在其他位置抢占并锁定 mutex 的情形出现, // 所以只有该位置会抢占加锁,而后很快释放锁,所以线程池中的线程之后短暂的阻塞在这里。 // 工作线程需要不断的等待处理任务,所以需要进入死循环 uv_mutex_lock(&mutex); for (;;) { /* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O and we're at the threshold for that. */ // 条件满足时,没有任务需要处理,线程进入挂起等待状态,等待被唤醒。 while ( // 任务队列为空 QUEUE_EMPTY(&wq) || // 任务队列非空,但是 // 队列头部被标记为慢速IO任务 // 且该队列中只有run_slow_work_message一个数据节点 // 且正在处理的慢IO任务超过阈值(默认2) // 该一个条件避免太多线程同时都在处理慢IO操作 // 达到阈值后空闲的线程不再接慢IO任务而是挂起,等待非慢IO操作任务 能有机会尽快得到处理 // 正在进行的慢IO任务完成后,阈值限制解除,可以接慢IO任务 // 最终,保证了最多只有 `(nthreads + 1) / 2` 个线程处理慢IO // 区分了快车道和慢车道后,能有效避免慢车堵快车,提升性能 (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { // 进入休息区,注意某线程在执行 while 循环时该线程一定抢占了 mutex,不论是首次还是后续执行 // 线程挂起,等待唤醒 // uv_cond_wait 会使线程挂起等待cond上的信号,为防止多线程同时调用 uv_cond_wait,必须提前加锁 // uv_cond_wait 在挂起前会释放 mutex,其他阻塞在 mutex 上的线程会在 mutex 释放时被唤醒,并在唤醒时重新抢占 mutex,即只能唤醒一个 // 所以,阻塞在for循环外的多个线程中的某一个会重新抢占 mutex 执行到达此处挂起,又继续唤醒其他线程 // 也可能唤醒 阻塞在 uv__work_submit -> post 函数提交任务的抢占锁的位置的线程(通常为主事件循环线程) // 挂起的线程都是空闲的线程,被唤醒后为非空闲的线程,所以需要更新空闲线程计数 idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; // 挂起的线程在被唤醒后,一定不满足再次进入循环的条件,会继续向下执行 } // 进入工作区,一共有三个区间,前后两个区间都有锁,中间的区间执行用户代码无锁 // 线程被唤醒,开始干活
/* If we're at the slow I/O threshold, re-schedule until after all other work in the queue is done. */ // 如果当前运行的慢IO操作的线程数达到阈值(2个线程) // 则将这些操作插入到 wq 队列末尾,延迟处理 // 避免多个线程同时处理慢IO // 临界状态:已经有达到阈值限制个数的线程进入工作区处理慢IO任务,但是还没执行更新慢IO线程计数器代码, // 后续被慢IO任务唤醒的线程线程可能因为慢IO线程计数器未更新而满足进入条件。 // 但是,因为该区间锁定了 mutex,阻塞在 uv_cond_wait 处的代码无法抢占锁无法执行,也就是无法跳出 while 循环, // 到 mutex 释放时,被唤醒的线程能够抢占锁时,计数器已经被更新了,前面所说的进入条件不再满足了。 // 所以,条件满足时不能动,能动了条件又不满足了,本质上,两次判断在同一段锁定区间,所以以下情形应该难以出现,难道还有其他情况? if (slow_io_work_running >= slow_work_thread_threshold()) { QUEUE_INSERT_TAIL(&wq, q); continue; }
/* If we encountered a request to run slow I/O work but there is none to run, that means it's cancelled => Start over. */ // 如果慢IO队列为空,可能任务被取消 if (QUEUE_EMPTY(&slow_io_pending_wq)) continue;
/* If there is more slow I/O work, schedule it to be run as well. */ // 如果还有更多的慢IO操作,则将这些任务插入到 wq 队列末尾,本次只能处理 q 这一个任务 if (!QUEUE_EMPTY(&slow_io_pending_wq)) { QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); // 如果有空闲线程,唤醒 if (idle_threads > 0) uv_cond_signal(&cond); } }
// 因为 loop 在多线程中共享,所以访问 loop 需要加锁 uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ // 将完成的任务插入到 loop->wq 队列中,在主事件循环线程中处理 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); // 发送完成信号,唤醒事件询线程并处理 uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex);
/* Lock `mutex` since that is expected at the start of the next * iteration. */ uv_mutex_lock(&mutex); if (is_slow_work) { /* `slow_io_work_running` is protected by `mutex`. */ slow_io_work_running--; } } }
uv_async_send 已经分析过了,它向事件循环线程发送消息唤醒事件循环线程
主线程中的初始化工作
主线程中的初始化工作是先于线程池初始化的,这部分初始化完成了用于接收 work 线程消息的 AsyncHandle 的初始化工作。
staticvoidpost(QUEUE* q, enum uv__work_kind kind) { uv_mutex_lock(&mutex); if (kind == UV__WORK_SLOW_IO) { /* Insert into a separate queue. */ QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); if (!QUEUE_EMPTY(&run_slow_work_message)) { /* Running slow I/O tasks is already scheduled => Nothing to do here. The worker that runs said other task will schedule this one as well. */ uv_mutex_unlock(&mutex); return; } q = &run_slow_work_message; }
QUEUE_INSERT_TAIL(&wq, q); if (idle_threads > 0) uv_cond_signal(&cond); uv_mutex_unlock(&mutex); }