AP计算机科学A(APcomputer science A)复习备考攻略视频教程
44201 人在学
我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。
好久没更新了,今天写个笔记。
我们都知道 Node.js 是基于事件循环来运行的,本质上是一个生产者 / 消费者模型,所以就少不了任务的管理机制,不过本文不是介绍事件循环中的任务管理,而是 C++ 层的任务管理。本文主要介绍 SetImmediate、SetImmediateThreadsafe、RequestInterrupt、AddCleanupHook 这四个 API 产生的任务。时间关系,随便写写,权当笔记。
任务管理机制的初始化
首先来看一下 Node.js 启动的过程中,和任务管理相关的逻辑。
复制
1. uv_check_start(immediate_check_handle(), CheckImmediate)
2. uv_async_init(
3. event_loop(),
4. &task_queues_async_,
5. [](uv_async_t* async) {
6. Environment* env = ContainerOf(&Environment::task_queues_async_, async);
7. env->RunAndClearNativeImmediates();
8. })
CheckImmediate 是在 check 阶段执行的函数,task_queues_async_ 则用于线程间通信,即当子线程往主线程提交任务时,通过 task_queues_async_ 通知主线程,然后主线程执行 uv_async_init 注册的回调。上面的代码就是消费者的逻辑。后面再详细分析里面的处理流程。
提交任务
接下来逐个看一下生产者的逻辑。
复制
1. template
2. void Environment::SetImmediate(Fn&& cb, CallbackFlags::Flags flags) {
3. auto callback = native_immediates_.CreateCallback(std::move(cb), flags);
4. native_immediates_.Push(std::move(callback));
5. // ...
6. }
SetImmediate 用于同线程的代码提交任务。
复制
1. template
2. void Environment::SetImmediateThreadsafe(Fn&& cb, CallbackFlags::Flags flags) {
3. auto callback = native_immediates_threadsafe_.CreateCallback(
4. std::move(cb), flags);
5. {
6. Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
7. native_immediates_threadsafe_.Push(std::move(callback));
8. if (task_queues_async_initialized_)
9. uv_async_send(&task_queues_async_);
10. }
11. }
SetImmediateThreadsafe 用于子线程给主线程提交任务,所以需要加锁。
复制
1. template
2. void Environment::RequestInterrupt(Fn&& cb) {
3. auto callback = native_immediates_interrupts_.CreateCallback(
4. std::move(cb), CallbackFlags::kRefed);
5. {
6. Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
7. native_immediates_interrupts_.Push(std::move(callback));
8. if (task_queues_async_initialized_)
9. uv_async_send(&task_queues_async_);
10. }
11. RequestInterruptFromV8();
12. }
RequestInterrupt 用于子线程给主线程提交代码,他和 SetImmediateThreadsafe 有一个很重要的区别是调用了 RequestInterruptFromV8。
复制
1. void Environment::RequestInterruptFromV8() {
2. isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
3. std::unique_ptr<environment*> env_ptr { static_cast<environment**>(data) };
4. Environment* env = *env_ptr;
5. env->RunAndClearInterrupts();
6. }, interrupt_data);
7. }
RequestInterrupt 可以使得提交的代码在 JS 代码死循环时依然会被执行。接着看 AddCleanupHook。
复制
1. void Environment::AddCleanupHook(CleanupQueue::Callback fn, void* arg) {
2. cleanup_queue_.Add(fn, arg);
3. }
AddCleanupHook 用于注册线程退出前的回调。生产者的逻辑都比较简单,就是往任务队列里插入一个任务,如果是涉及到线程间的任务,则通知主线程。
消费者
接下来看一下消费者的逻辑,根据前面的分析可以知道,消费者有几个:CheckImmediate,task_queues_async_ 的处理函数、RequestInterrupt 注册的函数、退出前回调处理函数。先看 CheckImmediate。
复制
1. void Environment::CheckImmediate(uv_check_t* handle) {
2. Environment* env = Environment::from_immediate_check_handle(handle);
3. env->RunAndClearNativeImmediates();
4. }
5.
6. void Environment::RunAndClearNativeImmediates(bool only_refed) {
7. RunAndClearInterrupts();
8.
9. auto drain_list = [&](NativeImmediateQueue* queue) {
10. while (auto head = queue->Shift()) {
11. head->Call(this);
12. }
13. return false;
14. };
15. while (drain_list(&native_immediates_)) {}
16. NativeImmediateQueue threadsafe_immediates;
17. if (native_immediates_threadsafe_.size() > 0) {
18. Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
19. threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
20. }
21. while (drain_list(&threadsafe_immediates)) {}
22. }
23.
24. void Environment::RunAndClearInterrupts() {
25. while (native_immediates_interrupts_.size() > 0) {
26. NativeImmediateQueue queue;
27. {
28. Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
29. queue.ConcatMove(std::move(native_immediates_interrupts_));
30. }
31. while (auto head = queue.Shift())
32. head->Call(this);
33. }
34. }
CheckImmediate 函数中处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。但是如果主线程阻塞在 Poll IO 阶段时,只有子线程提交任务时会唤醒主线程,具体是通过 task_queues_async_ 结构体,看一下处理函数。
复制
1. env->RunAndClearNativeImmediates();
可以看到这时候也是处理了SetImmediate、SetImmediateThreadsafe 和 RequestInterrupt 产生的任务。最后来看一下处理退出前回调的函数,具体时机是 FreeEnvironment 函数中的 env->RunCleanup()。
复制
1. void Environment::RunCleanup() {
2. RunAndClearNativeImmediates(true);
3. while (!cleanup_queue_.empty() || principal_realm_->HasCleanupHooks() ||
4. native_immediates_.size() > 0 ||
5. native_immediates_threadsafe_.size() > 0 ||
6. native_immediates_interrupts_.size() > 0) {
7. // 见 CleanupQueue::Drain
8. cleanup_queue_.Drain();
9. RunAndClearNativeImmediates(true);
10. }
11. }
12.
13. // cleanup_queue_.Drain();
14. void CleanupQueue::Drain() {
15. std::vector callbacks(cleanup_hooks_.begin(),
16. cleanup_hooks_.end());
17. std::sort(callbacks.begin(),
18. callbacks.end(),
19. [](const CleanupHookCallback& a, const CleanupHookCallback& b) {
20. return a.insertion_order_counter_ > b.insertion_order_counter_;
21. });
22.
23. for (const CleanupHookCallback& cb : callbacks) {
24. cb.fn_(cb.arg_);
25. cleanup_hooks_.erase(cb);
26. }
27. }
RunCleanup 中同时处理了 SetImmediate、SetImmediateThreadsafe、 RequestInterrupt 产生的任务和注册的退出前回调。
来源: 编程杂技
>>>>>>点击进入编程语言专题
共46节 · 9小时5分钟
智慧养老院管理系统+小程序(附vue+springboot项目源码)视频教程
¥799.00159人在学
共35节 · 7小时48分钟
智慧养老院管理系统(附vue+springboot项目源码)视频教程
¥499.00163人在学
共36节 · 9小时13分钟
¥599.00852人在学
共27节 · 7小时36分钟
房屋出租管理系统(附vue+springboot项目源码)视频教程
¥399.001143人在学