pub fn understand_channel_and_condvar()
Expand description
§构建「无悔」并发系统
使用 channel 和 condvar : 模拟并行组件
- Rust 保证安全性上「无畏」,但不保证工程性上的「无悔」。
- 但 Rust 有提供帮助我们建立「无悔」并发的「工具」。
- 通过这些工具,结合从实际沉淀出来并发模型的最佳默认模式「event-loop」来建立健壮的并发应用。
- 拓展阅读: Rust concurrency patterns: regret-less concurrency
示例1: 用 channel 模拟 event
+--------------+
| main thread | send work msg
+-----------------------------> | 主 组 件 | +-------------+
| receive result msg | | |
| +--------------+ work1
| |
| send result msg |
| +-----------------------+ work1
| | | |
| v | v
| result channel | work channel
| +---+ | +---+
| | | | | |
| +---+ | +---+
| | | | | |
| +---+ +---+----+ +---+
| | | | worker | | |
| +---+ | thread | +---+
| | | | 并 | | |
| +---+ | 行 | +---+
| | | | 组 | | |
| +---+ | 件 | +---+
| | | +----+---+ | |
| +-+-+ ^ +-+-+
| | |receive work msg |
| | | |
+--------------+ +--------------------------+
代码
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
// use parking_lot::{Mutex, Condvar};
// use std::sync::Arc;
use std::thread;
// 此消息用于发送到与「主组件」并行运行的其他组件。
enum WorkMsg {
Work(u8),
Exit,
}
// 此消息用于从并行运行的其他组件 发送回「主组件」。
enum ResultMsg {
Result(u8),
Exited,
}
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 生成子线程用于执行另一个并行组件
let _ = thread::spawn(move || loop {
// 接收并处理消息,直到收到 exit 消息
match work_receiver.recv() {
Ok(WorkMsg::Work(num)) => {
// 执行一些工作,并且发送消息给 Result 队列
let _ = result_sender.send(ResultMsg::Result(num));
}
Ok(WorkMsg::Exit) => {
// 发送 exit 确认消息
let _ = result_sender.send(ResultMsg::Exited);
break;
}
_ => panic!("Error receiving a WorkMsg."),
}
});
let _ = work_sender.send(WorkMsg::Work(0));
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Exit);
// worker执行计数
let mut counter = 0;
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(num)) => {
// 断言确保接收和发送的顺序是一致的
assert_eq!(num, counter);
counter += 1;
}
Ok(ResultMsg::Exited) => {
// 断言确保在接收两条工作消息之后收到退出消息
assert_eq!(2, counter);
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}
示例二:引入线程池,工作的顺序将无法确定
+--------------+
| main thread | send work msg
+-----------------------------> | 主 组 件 | +---------------+
| receive result msg | | |
| +--------------+ work1
| |
| send result msg |
| +-----------------------+ work0
| | | |
| v | v
| result channel +--------+------+ work channel
| +---+ | | +---+
| | | | | | |
| +---+ +----+---+ +----+----+ +---+
| | | | worker | | worker | | |
| +---+ | thread |thread| thread | +---+
| | | | 并 | pool| 并 | | |
| +---+ | 行 | | 行 | +---+
| | | | 组 | | 组 | | |
| +---+ | 件 | | 件 | +---+
| | | +----+---+ +-----+---+ | |
| +---+ ^ ^ +---+
| | | | | | |
| +-+-+ +receive-work-msg+ +-+-+
| | | |
| | | |
+--------------+ +--------------------------+
代码:
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
// use std::sync::{Arc, Condvar, Mutex};
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;
use std::thread;
enum WorkMsg {
Work(u8),
Exit,
}
enum ResultMsg {
Result(u8),
Exited,
}
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 引入线程池,开两个工作线程
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
let _ = thread::spawn(move || loop {
match work_receiver.recv() {
Ok(WorkMsg::Work(num)) => {
let result_sender = result_sender.clone();
// 使用线程池中的线程
pool.spawn(move || {
// 执行一些工作,并且发送消息给 Result 队列
let _ = result_sender.send(ResultMsg::Result(num));
});
}
Ok(WorkMsg::Exit) => {
let _ = result_sender.send(ResultMsg::Exited);
break;
}
_ => panic!("Error receiving a WorkMsg."),
}
});
let _ = work_sender.send(WorkMsg::Work(0));
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Exit);
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(_)) => {
// 不能再断言顺序了
}
Ok(ResultMsg::Exited) => {
// 也不能断言在退出消息之前已经收到了结果
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}
示例3: 确保工作结束再退出
+--------------+
| main thread | send work msg
+-----------------------------> | 主 组 件 | +--------------------------+
| receive result msg | | +
| +--------------+ work1
| |
| send result msg |
| +-----------------------+ work0
| | | |
| v | v
| result channel +--------+-------------------------+ work channel
| +---+ | thread | +---+
| | | | pool | | |
| +---+ +----+---+ +----+----+ +---+
| | | | worker | | worker | | |
| +---+ | thread | pool_res_channel | thread | +---+
| | | | 并 +-------------------------+ 并 | | |
| +---+ | 行 send msg when job finished 行 | +---+
| | | | 组 +-------------------------+ 组 | | |
| +---+ | 件 | | 件 | +---+
| | | +----+---+ +-----+---+ | |
| +---+ ^ ^ +---+
| | | | | | |
| +-+-+ +receive-work-msg+------------------+ +-+-+
| | | |
| | | |
+--------------+ +-------------------------------------+
代码:
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
// use std::sync::{Arc, Condvar, Mutex};
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;
use std::thread;
enum WorkMsg {
Work(u8),
Exit,
}
enum ResultMsg {
Result(u8),
Exited,
}
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
let (pool_result_sender, pool_result_receiver) = unbounded();
let mut ongoing_work = 0;
let mut exiting = false;
// 使用线程池
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
let _ = thread::spawn(move || loop {
// 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
select! {
recv(work_receiver) -> msg => {
match msg {
Ok(WorkMsg::Work(num)) => {
let result_sender = result_sender.clone();
let pool_result_sender = pool_result_sender.clone();
// 注意,这里正在池上启动一个新的工作单元。
ongoing_work += 1;
pool.spawn(move || {
// 1. 发送结果给「主组件」
let _ = result_sender.send(ResultMsg::Result(num));
// 2. 让并行组件知道这里完成了一个工作单元
let _ = pool_result_sender.send(());
});
},
Ok(WorkMsg::Exit) => {
// N注意,这里接收请求并退出
exiting = true;
// 如果没有正则进行的工作则立即退出
if ongoing_work == 0 {
let _ = result_sender.send(ResultMsg::Exited);
break;
}
},
_ => panic!("Error receiving a WorkMsg."),
}
},
recv(pool_result_receiver) -> _ => {
if ongoing_work == 0 {
panic!("Received an unexpected pool result.");
}
// 注意,一个工作单元已经被完成
ongoing_work -=1;
// 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
if ongoing_work == 0 && exiting {
let _ = result_sender.send(ResultMsg::Exited);
break;
}
},
}
});
let _ = work_sender.send(WorkMsg::Work(0));
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Exit);
let mut counter = 0;
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(_)) => {
// 计数当前完成的工作单元
counter += 1;
}
Ok(ResultMsg::Exited) => {
// 断言检测:是在接收到两个请求以后退出的
assert_eq!(2, counter);
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}
示例3 重构
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
// use std::sync::{Arc, Condvar, Mutex};
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;
use std::thread;
enum WorkMsg {
Work(u8),
Exit,
}
enum ResultMsg {
Result(u8),
Exited,
}
struct WorkerState {
ongoing: i16,
exiting: bool,
}
impl WorkerState {
fn init() -> Self {
WorkerState{ ongoing: 0, exiting: false }
}
fn set_ongoing(&mut self, count: i16) {
self.ongoing += count;
}
fn set_exiting(&mut self, exit_state: bool) {
self.exiting = exit_state;
}
fn is_exiting(&self) -> bool {
self.exiting == true
}
fn is_nomore_work(&self)-> bool {
self.ongoing == 0
}
}
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
let (pool_result_sender, pool_result_receiver) = unbounded();
let mut worker_state = WorkerState::init();
// 使用线程池
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
let _ = thread::spawn(move || loop {
// 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
select! {
recv(work_receiver) -> msg => {
match msg {
Ok(WorkMsg::Work(num)) => {
let result_sender = result_sender.clone();
let pool_result_sender = pool_result_sender.clone();
// 注意,这里正在池上启动一个新的工作单元。
worker_state.set_ongoing(1);
pool.spawn(move || {
// 1. 发送结果给「主组件」
result_sender.send(ResultMsg::Result(num));
// 2. 让并行组件知道这里完成了一个工作单元
pool_result_sender.send(());
});
},
Ok(WorkMsg::Exit) => {
// N注意,这里接收请求并退出
// exiting = true;
worker_state.set_exiting(true);
// 如果没有正则进行的工作则立即退出
if worker_state.is_nomore_work() {
result_sender.send(ResultMsg::Exited);
break;
}
},
_ => panic!("Error receiving a WorkMsg."),
}
},
recv(pool_result_receiver) -> _ => {
if worker_state.is_nomore_work() {
panic!("Received an unexpected pool result.");
}
// 注意,一个工作单元已经被完成
worker_state.set_ongoing(-1);
// 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
if worker_state.is_nomore_work() && worker_state.is_exiting() {
result_sender.send(ResultMsg::Exited);
break;
}
},
}
});
work_sender.send(WorkMsg::Work(0));
work_sender.send(WorkMsg::Work(1));
work_sender.send(WorkMsg::Exit);
let mut counter = 0;
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(_)) => {
// 计数当前完成的工作单元
counter += 1;
}
Ok(ResultMsg::Exited) => {
// 断言检测:是在接收到两个请求以后退出的
assert_eq!(2, counter);
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}
示例4: 使用缓存共享数据
+--------------+
| main thread | send work msg
+-----------------------------> | 主 组 件 | +--------------------------+
| receive result msg | | +
| +--------------+ work1
| |
| send result msg |
| +------------------------+ work0
| | | +
| v | |
| result channel | |
| +---+ +---------+------------------------+ |
| | | | thread | |
| +---+ | pool | |
| | | +----+---+ +----+----+ |
| +---+ | worker | | worker | |
| | | | thread | | thread | |
| +---+ | | | | |
| | | | | | | +
| +---+ | | | | work channel
| | | | |get +--------------+ get| | +---+
| +---+ | +--->+ work cache +<----+ | | |
| | | | | +--------------+ | | +---+
| +-+-+ | | | | | |
| | | | pool_res_channel | | +---+
| | | 并 +-------------------------+ 并 | | |
+--------------+ | 行 send msg when job finished 行 | +---+
| 组 +-------------------------+ 组 | | |
| 件 | | 件 | +---+
+----+---+ +-----+---+ | |
^ ^ +---+
| | | |
+receive-work-msg+------------------+ +-+-+
| |
| |
+-------------------------------------+
代码:
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
// use parking_lot::{Condvar, Mutex};
// use std::sync::Arc;
use std::thread;
enum WorkMsg {
Work(u8),
Exit,
}
enum ResultMsg {
Result(u8, WorkPerformed),
Exited,
}
struct WorkerState {
ongoing: i16,
exiting: bool,
}
impl WorkerState {
fn init() -> Self {
WorkerState{ ongoing: 0, exiting: false }
}
fn set_ongoing(&mut self, count: i16) {
self.ongoing += count;
}
fn set_exiting(&mut self, exit_state: bool) {
self.exiting = exit_state;
}
fn is_exiting(&self) -> bool {
self.exiting == true
}
fn is_nomore_work(&self)-> bool {
self.ongoing == 0
}
}
#[derive(Debug, Eq, PartialEq)]
enum WorkPerformed {
FromCache,
New,
}
#[derive(Eq, Hash, PartialEq)]
struct CacheKey(u8);
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
let (pool_result_sender, pool_result_receiver) = unbounded();
let mut worker_state = WorkerState::init();
// 使用线程池
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
// 缓存 work ,由 池 中的 worker 共享
let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
let _ = thread::spawn(move || loop {
// 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
select! {
recv(work_receiver) -> msg => {
match msg {
Ok(WorkMsg::Work(num)) => {
let result_sender = result_sender.clone();
let pool_result_sender = pool_result_sender.clone();
// 使用缓存
let cache = cache.clone();
// 注意,这里正在池上启动一个新的工作单元。
worker_state.set_ongoing(1);
pool.spawn(move || {
let num = {
// 缓存开始
let cache = cache.lock().unwrap();
let key = CacheKey(num);
if let Some(result) = cache.get(&key) {
// 从缓存中获得一个结果,并将其发送回去,
// 同时带有一个标志,表明是从缓存中获得了它
let _ = result_sender.send(ResultMsg::Result(result.clone(), WorkPerformed::FromCache));
let _ = pool_result_sender.send(());
return;
}
key.0
// 缓存结束
};
// work work work work work work...
// 返回结果,表明我们必须执行work
let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
// 在缓存中存储“昂贵”的work.
let mut cache = cache.lock().unwrap();
let key = CacheKey(num.clone());
cache.insert(key, num);
let _ = pool_result_sender.send(());
});
},
Ok(WorkMsg::Exit) => {
// N注意,这里接收请求并退出
// exiting = true;
worker_state.set_exiting(true);
// 如果没有正则进行的工作则立即退出
if worker_state.is_nomore_work() {
result_sender.send(ResultMsg::Exited);
break;
}
},
_ => panic!("Error receiving a WorkMsg."),
}
},
recv(pool_result_receiver) -> _ => {
if worker_state.is_nomore_work() {
panic!("Received an unexpected pool result.");
}
// 注意,一个工作单元已经被完成
worker_state.set_ongoing(-1);
// 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
if worker_state.is_nomore_work() && worker_state.is_exiting() {
result_sender.send(ResultMsg::Exited);
break;
}
},
}
});
let _ = work_sender.send(WorkMsg::Work(0));
// 发送两个相同的work
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Exit);
let mut counter = 0;
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(_, _cached)) => {
// 计数当前完成的工作单元
counter += 1;
}
Ok(ResultMsg::Exited) => {
// 断言检测:是在接收到两个请求以后退出的
assert_eq!(3, counter);
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}
示例5: 确保从缓存中取共享数据的行为是确定的
+--------------+
| main thread | send work msg
+-----------------------------> | 主 组 件 | +----------------------------------+
| receive result msg | | +
| +--------------+ work1
| |
| send result msg |
| +------------------------+ work0
| | | +
| v | thread |
| result channel | pool |
| +---+ |---------+--------------------------------| |
| | | +--------+ wait +------------+ wait +---------+ |
| +---+ | worker +<----------+ inproces +------->+ worker | |
| | | | thread | +---- ---+ | thread | |
| +---+ | | +--->+ work cache state +<--+ | | |
| | | | | | +-----+ +----+ | | | |
| +---+ | +--+ | ready | +-+ | |
| | | | | notify +----+---++ notify | | +
| +---+ | | <-----------+ | +-------->+ | work channel
| | | | | v | | +---+
| +---+ | | +--------+-----+ | | | |
| | | | | | work cache | | | +---+
| +-+-+ | | +--------------+ | | | |
| | | | pool_res_channel | | +---+
| | | 并 +---------------------------------+ 并 | | |
+--------------+ | 行 send msg when job finished 行 | +---+
| 组 +---------------------------------+ 组 | | |
| 件 | | 件 | +---+
+----+---+ +-----+---+ | |
^ ^ +---+
| | | |
+receive-work-msg+--------------------------+ +-+-+
| |
| |
+---------------------------------------------+
代码:
#[macro_use]
extern crate crossbeam_channel;
extern crate rayon;
use crossbeam_channel::unbounded;
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
// use parking_lot::{Condvar, Mutex};
// use std::sync::Arc;
use std::thread;
enum WorkMsg {
Work(u8),
Exit,
}
#[derive(Debug, Eq, PartialEq)]
enum CacheState {
Ready,
WorkInProgress,
}
enum ResultMsg {
Result(u8, WorkPerformed),
Exited,
}
struct WorkerState {
ongoing: i16,
exiting: bool,
}
impl WorkerState {
fn init() -> Self {
WorkerState{ ongoing: 0, exiting: false }
}
fn set_ongoing(&mut self, count: i16) {
self.ongoing += count;
}
fn set_exiting(&mut self, exit_state: bool) {
self.exiting = exit_state;
}
fn is_exiting(&self) -> bool {
self.exiting == true
}
fn is_nomore_work(&self)-> bool {
self.ongoing == 0
}
}
#[derive(Debug, Eq, PartialEq)]
enum WorkPerformed {
FromCache,
New,
}
#[derive(Eq, Hash, PartialEq)]
struct CacheKey(u8);
fn main() {
let (work_sender, work_receiver) = unbounded();
let (result_sender, result_receiver) = unbounded();
// 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
let (pool_result_sender, pool_result_receiver) = unbounded();
let mut worker_state = WorkerState::init();
// 使用线程池
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
// 缓存 work ,由 池 中的 worker 共享
let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
// 增加缓存状态,指示对于给定的key,缓存是否已经准备好被读取。
let cache_state: Arc<Mutex<HashMap<CacheKey, Arc<(Mutex<CacheState>, Condvar)>>>> =
Arc::new(Mutex::new(HashMap::new()));
let _ = thread::spawn(move || loop {
// 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
select! {
recv(work_receiver) -> msg => {
match msg {
Ok(WorkMsg::Work(num)) => {
let result_sender = result_sender.clone();
let pool_result_sender = pool_result_sender.clone();
// 使用缓存
let cache = cache.clone();
let cache_state = cache_state.clone();
// 注意,这里正在池上启动一个新的工作单元。
worker_state.set_ongoing(1);
pool.spawn(move || {
let num = {
let (cache_state_lock, cvar) = {
// `cache_state` 临界区开始
let mut state_map = cache_state.lock().unwrap();
&*state_map
.entry(CacheKey(num.clone()))
.or_insert_with(|| {
Arc::new((
Mutex::new(CacheState::Ready),
Condvar::new(),
))
})
.clone()
// `cache_state` 临界区结束
};
// `state` 临界区开始
let mut state = cache_state_lock.lock().unwrap();
// 注意:使用while循环来防止条件变量的虚假唤醒
while let CacheState::WorkInProgress = *state {
// 阻塞直到状态是 `CacheState::Ready`.
//
// 当唤醒时会自动释放锁
let current_state = cvar
.wait(state)
.unwrap();
state = current_state;
}
// 循环外可以认为state 已经是 Ready 的了
assert_eq!(*state, CacheState::Ready);
let (num, result) = {
// 缓存临界区开始
let cache = cache.lock().unwrap();
let key = CacheKey(num);
let result = match cache.get(&key) {
Some(result) => Some(result.clone()),
None => None,
};
(key.0, result)
// 缓存临界区结束
};
if let Some(result) = result {
// 从缓存中获得一个结果,并将其发送回去,
// 同时带有一个标志,表明是从缓存中获得了它
let _ = result_sender.send(ResultMsg::Result(result, WorkPerformed::FromCache));
let _ = pool_result_sender.send(());
// 不要忘记通知等待线程
cvar.notify_one();
return;
} else {
// 如果缓存里没有找到结果,那么切换状态
*state = CacheState::WorkInProgress;
num
}
// `state` 临界区结束
};
// 在临界区外做更多「昂贵工作」
let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
{
// 缓存临界区开始
// 插入工作结果到缓存中
let mut cache = cache.lock().unwrap();
let key = CacheKey(num.clone());
cache.insert(key, num);
// 缓存临界区结束
}
let (lock, cvar) = {
let mut state_map = cache_state.lock().unwrap();
&*state_map
.get_mut(&CacheKey(num))
.expect("Entry in cache state to have been previously inserted")
.clone()
};
// 重新进入 `state` 临界区
let mut state = lock.lock().unwrap();
// 在这里,由于已经提前设置了state,并且任何其他worker都将等待状态切换回ready,可以确定该状态是“in-progress”。
assert_eq!(*state, CacheState::WorkInProgress);
// 切换状态为 Ready
*state = CacheState::Ready;
// 通知等待线程
cvar.notify_one();
let _ = pool_result_sender.send(());
});
},
Ok(WorkMsg::Exit) => {
// N注意,这里接收请求并退出
// exiting = true;
worker_state.set_exiting(true);
// 如果没有正则进行的工作则立即退出
if worker_state.is_nomore_work() {
result_sender.send(ResultMsg::Exited);
break;
}
},
_ => panic!("Error receiving a WorkMsg."),
}
},
recv(pool_result_receiver) -> _ => {
if worker_state.is_nomore_work() {
panic!("Received an unexpected pool result.");
}
// 注意,一个工作单元已经被完成
worker_state.set_ongoing(-1);
// 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
if worker_state.is_nomore_work() && worker_state.is_exiting() {
result_sender.send(ResultMsg::Exited);
break;
}
},
}
});
let _ = work_sender.send(WorkMsg::Work(0));
// 发送两个相同的work
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Work(1));
let _ = work_sender.send(WorkMsg::Exit);
let mut counter = 0;
// 当work 是 1 的时候重新计数
let mut work_one_counter = 0;
loop {
match result_receiver.recv() {
Ok(ResultMsg::Result(num, cached)) => {
counter += 1;
if num == 1 {
work_one_counter += 1;
}
// 现在我们可以断言,当收到 num 为 1 的第二个结果时,它已经来自缓存。
if num == 1 && work_one_counter == 2 {
assert_eq!(cached, WorkPerformed::FromCache);
}
}
Ok(ResultMsg::Exited) => {
assert_eq!(3, counter);
break;
}
_ => panic!("Error receiving a ResultMsg."),
}
}
}