sui_rust/ch02/s3_thread_safe.rs
1//! 第二章:Rust核心概念
2//! 2.3 Thread Safe
3
4/**
5 ### 理解本地线程,理解并发
6
7 - 并发:同时「应对」很多事的能力
8 - 并行:同时「执行」很多事的能力
9
10 相关类型:
11
12 - [Duration](https://doc.rust-lang.org/std/time/struct.Duration.html)
13 - [JoinHandle](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html)
14
15 ```
16 use std::thread;
17
18 fn main() {
19 // Duration 实现了 Copy、Send、Sync
20 let duration = std::time::Duration::from_millis(3000);
21
22 println!("Main thread");
23
24 let handle = thread::spawn(move || {
25 println!("Sub thread 1");
26
27 // 注意:它的父线程是主线程,而不是线程1
28 let handle2 = thread::spawn( move || {
29 println!("Sub thread 2");
30 thread::sleep(duration);
31 });
32
33 handle2.join().unwrap();
34 thread::sleep(duration);
35 });
36
37 handle.join().unwrap();
38 thread::sleep(duration);
39 }
40*/
41pub fn understand_local_thread() {
42 println!(" 理解本地线程 ");
43}
44
45/**
46 ### 线程间共享数据
47
48 [https://doc.rust-lang.org/std/time/struct.Duration.html](https://doc.rust-lang.org/std/time/struct.Duration.html)
49
50 ```
51 use std::thread;
52
53 fn main() {
54 let mut v = vec![1,2,3];
55 thread::spawn(move || {
56 v.push(4);
57 });
58 // Can no longer access `v` here.
59 }
60 ```
61
62 ```
63 // invalid
64 use std::thread;
65
66 fn main() {
67 let mut v = vec![1,2,3];
68 for i in 0..10 {
69 thread::spawn(move || {
70 v.push(i);
71 });
72 }
73 }
74 ```
75
76 借用检查阻止并发Bug
77
78 ```
79 // invalid
80 fn inner_func(vref: &mut Vec<u32>) {
81 std::thread::spawn(move || {
82 vref.push(3);
83 });
84 }
85
86 fn main() {
87 let mut v = vec![1,2,3];
88 inner_func(&mut v);
89 }
90 ```
91
92 `'static' 与 线程安全
93
94 Note: [曾经的 thread::scoped 会泄漏 JoinGuard 所以被废弃](https://github.com/rust-lang/rust/issues/24292)
95
96 ```
97 use std::fmt;
98 use std::time::Duration;
99 use std::thread;
100
101 struct Foo {
102 string: String,
103 v: Vec<f64>,
104 }
105
106 impl fmt::Display for Foo {
107 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108 write!(f, "{}: {:?}", self.string, self.v)
109 }
110 }
111
112 fn test<T: Send + Sync + fmt::Display + 'static >(val: T) {
113 thread::spawn(move || println!("{}", val));
114 }
115
116 fn main() {
117 test("hello"); // &'static str
118 test(String::from("hello")); // String
119 test(5); // i32
120
121 // Arbitrary struct containing String and Vec<f64>
122 test(Foo {string: String::from("hi"), v: vec![1.2, 2.3]});
123 thread::sleep(Duration::new(1, 0));
124 }
125 ```
126
127 使用 crossbeam::scope 共享数据
128
129 ```rust
130 use crossbeam;
131 use std::{thread, time::Duration};
132
133 fn main() {
134 let mut vec = vec![1, 2, 3, 4, 5];
135
136 crossbeam::scope(|scope| {
137 for e in &vec {
138 scope.spawn(move |_| {
139 println!("{:?}", e);
140 });
141 }
142 })
143 .expect("A child thread panicked");
144
145 println!("{:?}", vec);
146 }
147 ```
148
149 scope thread 修改数据
150
151 ```rust
152 use crossbeam; // 0.6.0
153 use std::{thread, time::Duration};
154
155 fn main() {
156 let mut vec = vec![1, 2, 3, 4, 5];
157
158 crossbeam::scope(|scope| {
159 for e in &mut vec {
160 scope.spawn(move |_| {
161 thread::sleep(Duration::from_secs(1));
162 *e += 1;
163 });
164 }
165 })
166 .expect("A child thread panicked");
167
168 println!("{:?}", vec);
169 }
170 ```
171*/
172pub fn understand_shared_thread() {
173 println!(" 线程间共享数据 ");
174}
175
176/**
177 ### 使用 Arc 和 Mutex 安全共享数据
178
179 ```
180 use std::sync::{Arc, Mutex};
181 use std::thread;
182
183 fn main() {
184 let v = Arc::new(Mutex::new(vec![1,2,3]));
185
186 for i in 0..3 {
187 let cloned_v = v.clone();
188 thread::spawn(move || {
189 cloned_v.lock().unwrap().push(i);
190 });
191 }
192 }
193 ```
194*/
195pub fn understand_safed_shared_thread() {
196 println!(" 线程间安全共享数据 ");
197}
198
199/**
200 ### 构建「无悔」并发系统
201
202 使用 channel 和 condvar : 模拟并行组件
203
204 - [parking_lot](https://github.com/Amanieu/parking_lot)
205 - [crossbeam](https://github.com/crossbeam-rs/crossbeam)
206
207 > 1. Rust 保证安全性上「无畏」,但不保证工程性上的「无悔」。
208 > 2. 但 Rust 有提供帮助我们建立「无悔」并发的「工具」。
209 > 3. 通过这些工具,结合从实际沉淀出来并发模型的最佳默认模式「event-loop」来建立健壮的并发应用。
210 > 4. 拓展阅读:
211 > [Rust concurrency patterns: regret-less concurrency](https://medium.com/@polyglot_factotum/rust-regret-less-concurrency-2238b9e53333)
212
213
214 示例1: 用 channel 模拟 event
215
216 ```text
217
218
219 +--------------+
220 | main thread | send work msg
221 +-----------------------------> | 主 组 件 | +-------------+
222 | receive result msg | | |
223 | +--------------+ work1
224 | |
225 | send result msg |
226 | +-----------------------+ work1
227 | | | |
228 | v | v
229 | result channel | work channel
230 | +---+ | +---+
231 | | | | | |
232 | +---+ | +---+
233 | | | | | |
234 | +---+ +---+----+ +---+
235 | | | | worker | | |
236 | +---+ | thread | +---+
237 | | | | 并 | | |
238 | +---+ | 行 | +---+
239 | | | | 组 | | |
240 | +---+ | 件 | +---+
241 | | | +----+---+ | |
242 | +-+-+ ^ +-+-+
243 | | |receive work msg |
244 | | | |
245 +--------------+ +--------------------------+
246
247 ```
248
249 代码
250
251 ```
252 #[macro_use]
253 extern crate crossbeam_channel;
254 extern crate rayon;
255
256 use crossbeam_channel::unbounded;
257 use std::collections::HashMap;
258 use std::sync::{Arc, Condvar, Mutex};
259 // use parking_lot::{Mutex, Condvar};
260 // use std::sync::Arc;
261 use std::thread;
262
263 // 此消息用于发送到与「主组件」并行运行的其他组件。
264 enum WorkMsg {
265 Work(u8),
266 Exit,
267 }
268
269 // 此消息用于从并行运行的其他组件 发送回「主组件」。
270 enum ResultMsg {
271 Result(u8),
272 Exited,
273 }
274
275 fn main() {
276 let (work_sender, work_receiver) = unbounded();
277 let (result_sender, result_receiver) = unbounded();
278
279 // 生成子线程用于执行另一个并行组件
280 let _ = thread::spawn(move || loop {
281 // 接收并处理消息,直到收到 exit 消息
282 match work_receiver.recv() {
283 Ok(WorkMsg::Work(num)) => {
284 // 执行一些工作,并且发送消息给 Result 队列
285 let _ = result_sender.send(ResultMsg::Result(num));
286 }
287 Ok(WorkMsg::Exit) => {
288 // 发送 exit 确认消息
289 let _ = result_sender.send(ResultMsg::Exited);
290 break;
291 }
292 _ => panic!("Error receiving a WorkMsg."),
293 }
294 });
295
296 let _ = work_sender.send(WorkMsg::Work(0));
297 let _ = work_sender.send(WorkMsg::Work(1));
298 let _ = work_sender.send(WorkMsg::Exit);
299
300 // worker执行计数
301 let mut counter = 0;
302
303 loop {
304 match result_receiver.recv() {
305 Ok(ResultMsg::Result(num)) => {
306 // 断言确保接收和发送的顺序是一致的
307 assert_eq!(num, counter);
308 counter += 1;
309 }
310 Ok(ResultMsg::Exited) => {
311 // 断言确保在接收两条工作消息之后收到退出消息
312 assert_eq!(2, counter);
313 break;
314 }
315 _ => panic!("Error receiving a ResultMsg."),
316 }
317 }
318 }
319 ```
320
321 示例二:引入线程池,工作的顺序将无法确定
322
323 ```text
324 +--------------+
325 | main thread | send work msg
326 +-----------------------------> | 主 组 件 | +---------------+
327 | receive result msg | | |
328 | +--------------+ work1
329 | |
330 | send result msg |
331 | +-----------------------+ work0
332 | | | |
333 | v | v
334 | result channel +--------+------+ work channel
335 | +---+ | | +---+
336 | | | | | | |
337 | +---+ +----+---+ +----+----+ +---+
338 | | | | worker | | worker | | |
339 | +---+ | thread |thread| thread | +---+
340 | | | | 并 | pool| 并 | | |
341 | +---+ | 行 | | 行 | +---+
342 | | | | 组 | | 组 | | |
343 | +---+ | 件 | | 件 | +---+
344 | | | +----+---+ +-----+---+ | |
345 | +---+ ^ ^ +---+
346 | | | | | | |
347 | +-+-+ +receive-work-msg+ +-+-+
348 | | | |
349 | | | |
350 +--------------+ +--------------------------+
351
352 ```
353
354 代码:
355
356 ```rust
357 #[macro_use]
358 extern crate crossbeam_channel;
359 extern crate rayon;
360
361 use crossbeam_channel::unbounded;
362 use std::collections::HashMap;
363 // use std::sync::{Arc, Condvar, Mutex};
364
365 use parking_lot::{Condvar, Mutex};
366 use std::sync::Arc;
367 use std::thread;
368
369 enum WorkMsg {
370 Work(u8),
371 Exit,
372 }
373
374 enum ResultMsg {
375 Result(u8),
376 Exited,
377 }
378
379 fn main() {
380 let (work_sender, work_receiver) = unbounded();
381 let (result_sender, result_receiver) = unbounded();
382 // 引入线程池,开两个工作线程
383 let pool = rayon::ThreadPoolBuilder::new()
384 .num_threads(2)
385 .build()
386 .unwrap();
387
388 let _ = thread::spawn(move || loop {
389 match work_receiver.recv() {
390 Ok(WorkMsg::Work(num)) => {
391 let result_sender = result_sender.clone();
392 // 使用线程池中的线程
393 pool.spawn(move || {
394 // 执行一些工作,并且发送消息给 Result 队列
395 let _ = result_sender.send(ResultMsg::Result(num));
396 });
397 }
398 Ok(WorkMsg::Exit) => {
399 let _ = result_sender.send(ResultMsg::Exited);
400 break;
401 }
402 _ => panic!("Error receiving a WorkMsg."),
403 }
404 });
405
406 let _ = work_sender.send(WorkMsg::Work(0));
407 let _ = work_sender.send(WorkMsg::Work(1));
408 let _ = work_sender.send(WorkMsg::Exit);
409
410 loop {
411 match result_receiver.recv() {
412 Ok(ResultMsg::Result(_)) => {
413 // 不能再断言顺序了
414 }
415 Ok(ResultMsg::Exited) => {
416 // 也不能断言在退出消息之前已经收到了结果
417 break;
418 }
419 _ => panic!("Error receiving a ResultMsg."),
420 }
421 }
422 }
423 ```
424
425 示例3: 确保工作结束再退出
426
427 ```text
428
429 +--------------+
430 | main thread | send work msg
431 +-----------------------------> | 主 组 件 | +--------------------------+
432 | receive result msg | | +
433 | +--------------+ work1
434 | |
435 | send result msg |
436 | +-----------------------+ work0
437 | | | |
438 | v | v
439 | result channel +--------+-------------------------+ work channel
440 | +---+ | thread | +---+
441 | | | | pool | | |
442 | +---+ +----+---+ +----+----+ +---+
443 | | | | worker | | worker | | |
444 | +---+ | thread | pool_res_channel | thread | +---+
445 | | | | 并 +-------------------------+ 并 | | |
446 | +---+ | 行 send msg when job finished 行 | +---+
447 | | | | 组 +-------------------------+ 组 | | |
448 | +---+ | 件 | | 件 | +---+
449 | | | +----+---+ +-----+---+ | |
450 | +---+ ^ ^ +---+
451 | | | | | | |
452 | +-+-+ +receive-work-msg+------------------+ +-+-+
453 | | | |
454 | | | |
455 +--------------+ +-------------------------------------+
456
457
458
459 ```
460
461 代码:
462
463 ```rust
464 #[macro_use]
465 extern crate crossbeam_channel;
466 extern crate rayon;
467
468 use crossbeam_channel::unbounded;
469 use std::collections::HashMap;
470 // use std::sync::{Arc, Condvar, Mutex};
471
472 use parking_lot::{Condvar, Mutex};
473 use std::sync::Arc;
474 use std::thread;
475
476 enum WorkMsg {
477 Work(u8),
478 Exit,
479 }
480
481 enum ResultMsg {
482 Result(u8),
483 Exited,
484 }
485
486 fn main() {
487 let (work_sender, work_receiver) = unbounded();
488 let (result_sender, result_receiver) = unbounded();
489 // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
490 let (pool_result_sender, pool_result_receiver) = unbounded();
491 let mut ongoing_work = 0;
492 let mut exiting = false;
493 // 使用线程池
494 let pool = rayon::ThreadPoolBuilder::new()
495 .num_threads(2)
496 .build()
497 .unwrap();
498
499 let _ = thread::spawn(move || loop {
500 // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
501 select! {
502 recv(work_receiver) -> msg => {
503 match msg {
504 Ok(WorkMsg::Work(num)) => {
505 let result_sender = result_sender.clone();
506 let pool_result_sender = pool_result_sender.clone();
507
508 // 注意,这里正在池上启动一个新的工作单元。
509 ongoing_work += 1;
510
511 pool.spawn(move || {
512 // 1. 发送结果给「主组件」
513 let _ = result_sender.send(ResultMsg::Result(num));
514
515 // 2. 让并行组件知道这里完成了一个工作单元
516 let _ = pool_result_sender.send(());
517 });
518 },
519 Ok(WorkMsg::Exit) => {
520 // N注意,这里接收请求并退出
521 exiting = true;
522
523 // 如果没有正则进行的工作则立即退出
524 if ongoing_work == 0 {
525 let _ = result_sender.send(ResultMsg::Exited);
526 break;
527 }
528 },
529 _ => panic!("Error receiving a WorkMsg."),
530 }
531 },
532 recv(pool_result_receiver) -> _ => {
533 if ongoing_work == 0 {
534 panic!("Received an unexpected pool result.");
535 }
536
537 // 注意,一个工作单元已经被完成
538 ongoing_work -=1;
539
540 // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
541 if ongoing_work == 0 && exiting {
542 let _ = result_sender.send(ResultMsg::Exited);
543 break;
544 }
545 },
546 }
547 });
548
549 let _ = work_sender.send(WorkMsg::Work(0));
550 let _ = work_sender.send(WorkMsg::Work(1));
551 let _ = work_sender.send(WorkMsg::Exit);
552
553 let mut counter = 0;
554
555 loop {
556 match result_receiver.recv() {
557 Ok(ResultMsg::Result(_)) => {
558 // 计数当前完成的工作单元
559 counter += 1;
560 }
561 Ok(ResultMsg::Exited) => {
562 // 断言检测:是在接收到两个请求以后退出的
563 assert_eq!(2, counter);
564 break;
565 }
566 _ => panic!("Error receiving a ResultMsg."),
567 }
568 }
569 }
570 ```
571
572 示例3 重构
573
574 ```rust
575 #[macro_use]
576 extern crate crossbeam_channel;
577 extern crate rayon;
578
579 use crossbeam_channel::unbounded;
580 use std::collections::HashMap;
581 // use std::sync::{Arc, Condvar, Mutex};
582
583 use parking_lot::{Condvar, Mutex};
584 use std::sync::Arc;
585 use std::thread;
586
587 enum WorkMsg {
588 Work(u8),
589 Exit,
590 }
591
592 enum ResultMsg {
593 Result(u8),
594 Exited,
595 }
596
597 struct WorkerState {
598 ongoing: i16,
599 exiting: bool,
600 }
601
602 impl WorkerState {
603 fn init() -> Self {
604 WorkerState{ ongoing: 0, exiting: false }
605 }
606
607 fn set_ongoing(&mut self, count: i16) {
608 self.ongoing += count;
609 }
610
611 fn set_exiting(&mut self, exit_state: bool) {
612 self.exiting = exit_state;
613 }
614
615 fn is_exiting(&self) -> bool {
616 self.exiting == true
617 }
618
619 fn is_nomore_work(&self)-> bool {
620 self.ongoing == 0
621 }
622
623 }
624
625 fn main() {
626 let (work_sender, work_receiver) = unbounded();
627 let (result_sender, result_receiver) = unbounded();
628 // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
629 let (pool_result_sender, pool_result_receiver) = unbounded();
630 let mut worker_state = WorkerState::init();
631
632 // 使用线程池
633 let pool = rayon::ThreadPoolBuilder::new()
634 .num_threads(2)
635 .build()
636 .unwrap();
637
638 let _ = thread::spawn(move || loop {
639 // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
640 select! {
641 recv(work_receiver) -> msg => {
642 match msg {
643 Ok(WorkMsg::Work(num)) => {
644 let result_sender = result_sender.clone();
645 let pool_result_sender = pool_result_sender.clone();
646
647 // 注意,这里正在池上启动一个新的工作单元。
648 worker_state.set_ongoing(1);
649
650 pool.spawn(move || {
651 // 1. 发送结果给「主组件」
652 result_sender.send(ResultMsg::Result(num));
653
654 // 2. 让并行组件知道这里完成了一个工作单元
655 pool_result_sender.send(());
656 });
657 },
658 Ok(WorkMsg::Exit) => {
659 // N注意,这里接收请求并退出
660 // exiting = true;
661 worker_state.set_exiting(true);
662
663 // 如果没有正则进行的工作则立即退出
664 if worker_state.is_nomore_work() {
665 result_sender.send(ResultMsg::Exited);
666 break;
667 }
668 },
669 _ => panic!("Error receiving a WorkMsg."),
670 }
671 },
672 recv(pool_result_receiver) -> _ => {
673 if worker_state.is_nomore_work() {
674 panic!("Received an unexpected pool result.");
675 }
676
677 // 注意,一个工作单元已经被完成
678 worker_state.set_ongoing(-1);
679
680 // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
681 if worker_state.is_nomore_work() && worker_state.is_exiting() {
682 result_sender.send(ResultMsg::Exited);
683 break;
684 }
685 },
686 }
687 });
688
689 work_sender.send(WorkMsg::Work(0));
690 work_sender.send(WorkMsg::Work(1));
691 work_sender.send(WorkMsg::Exit);
692
693 let mut counter = 0;
694
695 loop {
696 match result_receiver.recv() {
697 Ok(ResultMsg::Result(_)) => {
698 // 计数当前完成的工作单元
699 counter += 1;
700 }
701 Ok(ResultMsg::Exited) => {
702 // 断言检测:是在接收到两个请求以后退出的
703 assert_eq!(2, counter);
704 break;
705 }
706 _ => panic!("Error receiving a ResultMsg."),
707 }
708 }
709 }
710 ```
711
712 示例4: 使用缓存共享数据
713
714 ```text
715 +--------------+
716 | main thread | send work msg
717 +-----------------------------> | 主 组 件 | +--------------------------+
718 | receive result msg | | +
719 | +--------------+ work1
720 | |
721 | send result msg |
722 | +------------------------+ work0
723 | | | +
724 | v | |
725 | result channel | |
726 | +---+ +---------+------------------------+ |
727 | | | | thread | |
728 | +---+ | pool | |
729 | | | +----+---+ +----+----+ |
730 | +---+ | worker | | worker | |
731 | | | | thread | | thread | |
732 | +---+ | | | | |
733 | | | | | | | +
734 | +---+ | | | | work channel
735 | | | | |get +--------------+ get| | +---+
736 | +---+ | +--->+ work cache +<----+ | | |
737 | | | | | +--------------+ | | +---+
738 | +-+-+ | | | | | |
739 | | | | pool_res_channel | | +---+
740 | | | 并 +-------------------------+ 并 | | |
741 +--------------+ | 行 send msg when job finished 行 | +---+
742 | 组 +-------------------------+ 组 | | |
743 | 件 | | 件 | +---+
744 +----+---+ +-----+---+ | |
745 ^ ^ +---+
746 | | | |
747 +receive-work-msg+------------------+ +-+-+
748 | |
749 | |
750 +-------------------------------------+
751
752
753 ```
754
755 代码:
756
757 ```rust
758 #[macro_use]
759 extern crate crossbeam_channel;
760 extern crate rayon;
761
762 use crossbeam_channel::unbounded;
763 use std::collections::HashMap;
764 use std::sync::{Arc, Condvar, Mutex};
765
766 // use parking_lot::{Condvar, Mutex};
767 // use std::sync::Arc;
768 use std::thread;
769
770 enum WorkMsg {
771 Work(u8),
772 Exit,
773 }
774
775 enum ResultMsg {
776 Result(u8, WorkPerformed),
777 Exited,
778 }
779
780 struct WorkerState {
781 ongoing: i16,
782 exiting: bool,
783 }
784
785 impl WorkerState {
786 fn init() -> Self {
787 WorkerState{ ongoing: 0, exiting: false }
788 }
789
790 fn set_ongoing(&mut self, count: i16) {
791 self.ongoing += count;
792 }
793
794 fn set_exiting(&mut self, exit_state: bool) {
795 self.exiting = exit_state;
796 }
797
798 fn is_exiting(&self) -> bool {
799 self.exiting == true
800 }
801
802 fn is_nomore_work(&self)-> bool {
803 self.ongoing == 0
804 }
805 }
806
807 #[derive(Debug, Eq, PartialEq)]
808 enum WorkPerformed {
809 FromCache,
810 New,
811 }
812
813 #[derive(Eq, Hash, PartialEq)]
814 struct CacheKey(u8);
815
816 fn main() {
817 let (work_sender, work_receiver) = unbounded();
818 let (result_sender, result_receiver) = unbounded();
819 // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
820 let (pool_result_sender, pool_result_receiver) = unbounded();
821 let mut worker_state = WorkerState::init();
822
823 // 使用线程池
824 let pool = rayon::ThreadPoolBuilder::new()
825 .num_threads(2)
826 .build()
827 .unwrap();
828
829 // 缓存 work ,由 池 中的 worker 共享
830 let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
831
832 let _ = thread::spawn(move || loop {
833 // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
834 select! {
835 recv(work_receiver) -> msg => {
836 match msg {
837 Ok(WorkMsg::Work(num)) => {
838 let result_sender = result_sender.clone();
839 let pool_result_sender = pool_result_sender.clone();
840 // 使用缓存
841 let cache = cache.clone();
842
843 // 注意,这里正在池上启动一个新的工作单元。
844 worker_state.set_ongoing(1);
845
846 pool.spawn(move || {
847 let num = {
848 // 缓存开始
849 let cache = cache.lock().unwrap();
850 let key = CacheKey(num);
851 if let Some(result) = cache.get(&key) {
852 // 从缓存中获得一个结果,并将其发送回去,
853 // 同时带有一个标志,表明是从缓存中获得了它
854 let _ = result_sender.send(ResultMsg::Result(result.clone(), WorkPerformed::FromCache));
855 let _ = pool_result_sender.send(());
856 return;
857 }
858 key.0
859 // 缓存结束
860 };
861
862 // work work work work work work...
863
864 // 返回结果,表明我们必须执行work
865 let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
866
867 // 在缓存中存储“昂贵”的work.
868 let mut cache = cache.lock().unwrap();
869 let key = CacheKey(num.clone());
870 cache.insert(key, num);
871
872 let _ = pool_result_sender.send(());
873 });
874 },
875 Ok(WorkMsg::Exit) => {
876 // N注意,这里接收请求并退出
877 // exiting = true;
878 worker_state.set_exiting(true);
879
880 // 如果没有正则进行的工作则立即退出
881 if worker_state.is_nomore_work() {
882 result_sender.send(ResultMsg::Exited);
883 break;
884 }
885 },
886 _ => panic!("Error receiving a WorkMsg."),
887 }
888 },
889 recv(pool_result_receiver) -> _ => {
890 if worker_state.is_nomore_work() {
891 panic!("Received an unexpected pool result.");
892 }
893
894 // 注意,一个工作单元已经被完成
895 worker_state.set_ongoing(-1);
896
897 // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
898 if worker_state.is_nomore_work() && worker_state.is_exiting() {
899 result_sender.send(ResultMsg::Exited);
900 break;
901 }
902 },
903 }
904 });
905
906 let _ = work_sender.send(WorkMsg::Work(0));
907 // 发送两个相同的work
908 let _ = work_sender.send(WorkMsg::Work(1));
909 let _ = work_sender.send(WorkMsg::Work(1));
910 let _ = work_sender.send(WorkMsg::Exit);
911
912 let mut counter = 0;
913
914 loop {
915 match result_receiver.recv() {
916 Ok(ResultMsg::Result(_, _cached)) => {
917 // 计数当前完成的工作单元
918 counter += 1;
919 }
920 Ok(ResultMsg::Exited) => {
921 // 断言检测:是在接收到两个请求以后退出的
922 assert_eq!(3, counter);
923 break;
924 }
925 _ => panic!("Error receiving a ResultMsg."),
926 }
927 }
928 }
929 ```
930
931 示例5: 确保从缓存中取共享数据的行为是确定的
932
933 ```text
934 +--------------+
935 | main thread | send work msg
936 +-----------------------------> | 主 组 件 | +----------------------------------+
937 | receive result msg | | +
938 | +--------------+ work1
939 | |
940 | send result msg |
941 | +------------------------+ work0
942 | | | +
943 | v | thread |
944 | result channel | pool |
945 | +---+ |---------+--------------------------------| |
946 | | | +--------+ wait +------------+ wait +---------+ |
947 | +---+ | worker +<----------+ inproces +------->+ worker | |
948 | | | | thread | +---- ---+ | thread | |
949 | +---+ | | +--->+ work cache state +<--+ | | |
950 | | | | | | +-----+ +----+ | | | |
951 | +---+ | +--+ | ready | +-+ | |
952 | | | | | notify +----+---++ notify | | +
953 | +---+ | | <-----------+ | +-------->+ | work channel
954 | | | | | v | | +---+
955 | +---+ | | +--------+-----+ | | | |
956 | | | | | | work cache | | | +---+
957 | +-+-+ | | +--------------+ | | | |
958 | | | | pool_res_channel | | +---+
959 | | | 并 +---------------------------------+ 并 | | |
960 +--------------+ | 行 send msg when job finished 行 | +---+
961 | 组 +---------------------------------+ 组 | | |
962 | 件 | | 件 | +---+
963 +----+---+ +-----+---+ | |
964 ^ ^ +---+
965 | | | |
966 +receive-work-msg+--------------------------+ +-+-+
967 | |
968 | |
969 +---------------------------------------------+
970
971 ```
972
973 代码:
974
975 ```
976 #[macro_use]
977 extern crate crossbeam_channel;
978 extern crate rayon;
979
980 use crossbeam_channel::unbounded;
981 use std::collections::HashMap;
982 use std::sync::{Arc, Condvar, Mutex};
983
984 // use parking_lot::{Condvar, Mutex};
985 // use std::sync::Arc;
986 use std::thread;
987
988 enum WorkMsg {
989 Work(u8),
990 Exit,
991 }
992
993 #[derive(Debug, Eq, PartialEq)]
994 enum CacheState {
995 Ready,
996 WorkInProgress,
997 }
998
999 enum ResultMsg {
1000 Result(u8, WorkPerformed),
1001 Exited,
1002 }
1003
1004 struct WorkerState {
1005 ongoing: i16,
1006 exiting: bool,
1007 }
1008
1009 impl WorkerState {
1010 fn init() -> Self {
1011 WorkerState{ ongoing: 0, exiting: false }
1012 }
1013
1014 fn set_ongoing(&mut self, count: i16) {
1015 self.ongoing += count;
1016 }
1017
1018 fn set_exiting(&mut self, exit_state: bool) {
1019 self.exiting = exit_state;
1020 }
1021
1022 fn is_exiting(&self) -> bool {
1023 self.exiting == true
1024 }
1025
1026 fn is_nomore_work(&self)-> bool {
1027 self.ongoing == 0
1028 }
1029 }
1030
1031 #[derive(Debug, Eq, PartialEq)]
1032 enum WorkPerformed {
1033 FromCache,
1034 New,
1035 }
1036
1037 #[derive(Eq, Hash, PartialEq)]
1038 struct CacheKey(u8);
1039
1040 fn main() {
1041 let (work_sender, work_receiver) = unbounded();
1042 let (result_sender, result_receiver) = unbounded();
1043 // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
1044 let (pool_result_sender, pool_result_receiver) = unbounded();
1045 let mut worker_state = WorkerState::init();
1046
1047 // 使用线程池
1048 let pool = rayon::ThreadPoolBuilder::new()
1049 .num_threads(2)
1050 .build()
1051 .unwrap();
1052
1053 // 缓存 work ,由 池 中的 worker 共享
1054 let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
1055
1056 // 增加缓存状态,指示对于给定的key,缓存是否已经准备好被读取。
1057 let cache_state: Arc<Mutex<HashMap<CacheKey, Arc<(Mutex<CacheState>, Condvar)>>>> =
1058 Arc::new(Mutex::new(HashMap::new()));
1059
1060 let _ = thread::spawn(move || loop {
1061 // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
1062 select! {
1063 recv(work_receiver) -> msg => {
1064 match msg {
1065 Ok(WorkMsg::Work(num)) => {
1066 let result_sender = result_sender.clone();
1067 let pool_result_sender = pool_result_sender.clone();
1068 // 使用缓存
1069 let cache = cache.clone();
1070 let cache_state = cache_state.clone();
1071
1072 // 注意,这里正在池上启动一个新的工作单元。
1073 worker_state.set_ongoing(1);
1074
1075 pool.spawn(move || {
1076 let num = {
1077 let (cache_state_lock, cvar) = {
1078 // `cache_state` 临界区开始
1079 let mut state_map = cache_state.lock().unwrap();
1080 &*state_map
1081 .entry(CacheKey(num.clone()))
1082 .or_insert_with(|| {
1083 Arc::new((
1084 Mutex::new(CacheState::Ready),
1085 Condvar::new(),
1086 ))
1087 })
1088 .clone()
1089 // `cache_state` 临界区结束
1090 };
1091
1092 // `state` 临界区开始
1093 let mut state = cache_state_lock.lock().unwrap();
1094
1095 // 注意:使用while循环来防止条件变量的虚假唤醒
1096 while let CacheState::WorkInProgress = *state {
1097 // 阻塞直到状态是 `CacheState::Ready`.
1098 //
1099 // 当唤醒时会自动释放锁
1100 let current_state = cvar
1101 .wait(state)
1102 .unwrap();
1103 state = current_state;
1104 }
1105
1106 // 循环外可以认为state 已经是 Ready 的了
1107 assert_eq!(*state, CacheState::Ready);
1108
1109 let (num, result) = {
1110 // 缓存临界区开始
1111 let cache = cache.lock().unwrap();
1112 let key = CacheKey(num);
1113 let result = match cache.get(&key) {
1114 Some(result) => Some(result.clone()),
1115 None => None,
1116 };
1117 (key.0, result)
1118 // 缓存临界区结束
1119 };
1120
1121 if let Some(result) = result {
1122 // 从缓存中获得一个结果,并将其发送回去,
1123 // 同时带有一个标志,表明是从缓存中获得了它
1124 let _ = result_sender.send(ResultMsg::Result(result, WorkPerformed::FromCache));
1125 let _ = pool_result_sender.send(());
1126
1127 // 不要忘记通知等待线程
1128 cvar.notify_one();
1129 return;
1130 } else {
1131 // 如果缓存里没有找到结果,那么切换状态
1132 *state = CacheState::WorkInProgress;
1133 num
1134 }
1135 // `state` 临界区结束
1136 };
1137
1138 // 在临界区外做更多「昂贵工作」
1139
1140 let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
1141
1142 {
1143 // 缓存临界区开始
1144 // 插入工作结果到缓存中
1145 let mut cache = cache.lock().unwrap();
1146 let key = CacheKey(num.clone());
1147 cache.insert(key, num);
1148 // 缓存临界区结束
1149 }
1150
1151 let (lock, cvar) = {
1152 let mut state_map = cache_state.lock().unwrap();
1153 &*state_map
1154 .get_mut(&CacheKey(num))
1155 .expect("Entry in cache state to have been previously inserted")
1156 .clone()
1157 };
1158 // 重新进入 `state` 临界区
1159 let mut state = lock.lock().unwrap();
1160
1161 // 在这里,由于已经提前设置了state,并且任何其他worker都将等待状态切换回ready,可以确定该状态是“in-progress”。
1162 assert_eq!(*state, CacheState::WorkInProgress);
1163
1164 // 切换状态为 Ready
1165 *state = CacheState::Ready;
1166
1167 // 通知等待线程
1168 cvar.notify_one();
1169
1170 let _ = pool_result_sender.send(());
1171 });
1172 },
1173 Ok(WorkMsg::Exit) => {
1174 // N注意,这里接收请求并退出
1175 // exiting = true;
1176 worker_state.set_exiting(true);
1177
1178 // 如果没有正则进行的工作则立即退出
1179 if worker_state.is_nomore_work() {
1180 result_sender.send(ResultMsg::Exited);
1181 break;
1182 }
1183 },
1184 _ => panic!("Error receiving a WorkMsg."),
1185 }
1186 },
1187 recv(pool_result_receiver) -> _ => {
1188 if worker_state.is_nomore_work() {
1189 panic!("Received an unexpected pool result.");
1190 }
1191
1192 // 注意,一个工作单元已经被完成
1193 worker_state.set_ongoing(-1);
1194
1195 // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
1196 if worker_state.is_nomore_work() && worker_state.is_exiting() {
1197 result_sender.send(ResultMsg::Exited);
1198 break;
1199 }
1200 },
1201 }
1202 });
1203
1204 let _ = work_sender.send(WorkMsg::Work(0));
1205 // 发送两个相同的work
1206 let _ = work_sender.send(WorkMsg::Work(1));
1207 let _ = work_sender.send(WorkMsg::Work(1));
1208 let _ = work_sender.send(WorkMsg::Exit);
1209
1210 let mut counter = 0;
1211
1212 // 当work 是 1 的时候重新计数
1213 let mut work_one_counter = 0;
1214
1215 loop {
1216 match result_receiver.recv() {
1217 Ok(ResultMsg::Result(num, cached)) => {
1218 counter += 1;
1219
1220 if num == 1 {
1221 work_one_counter += 1;
1222 }
1223
1224 // 现在我们可以断言,当收到 num 为 1 的第二个结果时,它已经来自缓存。
1225 if num == 1 && work_one_counter == 2 {
1226 assert_eq!(cached, WorkPerformed::FromCache);
1227 }
1228 }
1229 Ok(ResultMsg::Exited) => {
1230 assert_eq!(3, counter);
1231 break;
1232 }
1233 _ => panic!("Error receiving a ResultMsg."),
1234 }
1235 }
1236 }
1237 ```
1238*/
1239pub fn understand_channel_and_condvar() {
1240 println!(" 线程间安全共享数据 ");
1241}