sui_rust/ch02/
s3_thread_safe.rs

1//! 第二章:Rust核心概念
2//! 2.3 Thread Safe
3
4/**
5    ### 理解本地线程,理解并发
6
7    - 并发:同时「应对」很多事的能力
8    - 并行:同时「执行」很多事的能力
9
10    相关类型:
11
12    - [Duration](https://doc.rust-lang.org/std/time/struct.Duration.html)
13    - [JoinHandle](https://doc.rust-lang.org/std/thread/struct.JoinHandle.html)
14
15    ```
16    use std::thread;
17
18    fn main() {
19        // Duration 实现了 Copy、Send、Sync
20        let duration = std::time::Duration::from_millis(3000);
21
22        println!("Main thread");
23
24        let handle  = thread::spawn(move || {
25            println!("Sub thread 1");
26
27            // 注意:它的父线程是主线程,而不是线程1
28            let handle2 = thread::spawn( move || {
29                println!("Sub thread 2");
30                thread::sleep(duration);
31            });
32
33            handle2.join().unwrap();
34            thread::sleep(duration);
35        });
36
37        handle.join().unwrap();
38        thread::sleep(duration);
39    }
40*/
41pub fn understand_local_thread() {
42    println!(" 理解本地线程 ");
43}
44
45/**
46    ### 线程间共享数据
47
48    [https://doc.rust-lang.org/std/time/struct.Duration.html](https://doc.rust-lang.org/std/time/struct.Duration.html)
49
50    ```
51    use std::thread;
52
53    fn main() {
54        let mut v = vec![1,2,3];
55        thread::spawn(move || {
56            v.push(4);
57        });
58        // Can no longer access `v` here.
59    }
60    ```
61
62    ```
63    // invalid
64    use std::thread;
65
66    fn main() {
67        let mut v = vec![1,2,3];
68        for i in 0..10 {
69            thread::spawn(move || {
70                v.push(i);
71            });
72        }
73    }
74    ```
75
76    借用检查阻止并发Bug
77
78    ```
79    // invalid
80    fn inner_func(vref: &mut Vec<u32>) {
81        std::thread::spawn(move || {
82        vref.push(3);
83        });
84    }
85
86    fn main() {
87        let mut v = vec![1,2,3];
88        inner_func(&mut v);
89    }
90    ```
91
92    `'static' 与 线程安全
93
94    Note: [曾经的 thread::scoped 会泄漏 JoinGuard 所以被废弃](https://github.com/rust-lang/rust/issues/24292)
95
96    ```
97    use std::fmt;
98    use std::time::Duration;
99    use std::thread;
100
101    struct Foo {
102        string: String,
103        v: Vec<f64>,
104    }
105
106    impl fmt::Display for Foo {
107        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108            write!(f, "{}: {:?}", self.string, self.v)
109        }
110    }
111
112    fn test<T: Send + Sync + fmt::Display + 'static >(val: T) {
113        thread::spawn(move || println!("{}", val));
114    }
115
116    fn main() {
117        test("hello");                // &'static str
118        test(String::from("hello"));  // String
119        test(5);                      // i32
120
121        // Arbitrary struct containing String and Vec<f64>
122        test(Foo {string: String::from("hi"), v: vec![1.2, 2.3]});
123        thread::sleep(Duration::new(1, 0));
124    }
125    ```
126
127    使用 crossbeam::scope 共享数据
128
129    ```rust
130    use crossbeam;
131    use std::{thread, time::Duration};
132
133    fn main() {
134        let mut vec = vec![1, 2, 3, 4, 5];
135
136        crossbeam::scope(|scope| {
137            for e in &vec {
138                scope.spawn(move |_| {
139                    println!("{:?}", e);
140                });
141            }
142        })
143        .expect("A child thread panicked");
144
145        println!("{:?}", vec);
146    }
147    ```
148
149    scope thread 修改数据
150
151    ```rust
152    use crossbeam; // 0.6.0
153    use std::{thread, time::Duration};
154
155    fn main() {
156        let mut vec = vec![1, 2, 3, 4, 5];
157
158        crossbeam::scope(|scope| {
159            for e in &mut vec {
160                scope.spawn(move |_| {
161                    thread::sleep(Duration::from_secs(1));
162                    *e += 1;
163                });
164            }
165        })
166        .expect("A child thread panicked");
167
168        println!("{:?}", vec);
169    }
170    ```
171*/
172pub fn understand_shared_thread() {
173    println!(" 线程间共享数据 ");
174}
175
176/**
177    ### 使用 Arc 和 Mutex 安全共享数据
178
179    ```
180    use std::sync::{Arc, Mutex};
181    use std::thread;
182
183    fn main() {
184        let v = Arc::new(Mutex::new(vec![1,2,3]));
185
186        for i in 0..3 {
187            let cloned_v = v.clone();
188            thread::spawn(move || {
189                cloned_v.lock().unwrap().push(i);
190            });
191        }
192    }
193    ```
194*/
195pub fn understand_safed_shared_thread() {
196    println!(" 线程间安全共享数据 ");
197}
198
199/**
200    ### 构建「无悔」并发系统
201
202    使用 channel 和 condvar : 模拟并行组件
203
204    - [parking_lot](https://github.com/Amanieu/parking_lot)
205    - [crossbeam](https://github.com/crossbeam-rs/crossbeam)
206
207    > 1. Rust 保证安全性上「无畏」,但不保证工程性上的「无悔」。
208    > 2. 但 Rust 有提供帮助我们建立「无悔」并发的「工具」。
209    > 3. 通过这些工具,结合从实际沉淀出来并发模型的最佳默认模式「event-loop」来建立健壮的并发应用。
210    > 4. 拓展阅读:
211    > [Rust concurrency patterns: regret-less concurrency](https://medium.com/@polyglot_factotum/rust-regret-less-concurrency-2238b9e53333)
212
213
214    示例1: 用 channel 模拟 event
215
216    ```text
217
218
219                                    +--------------+
220                                    | main thread  |      send work msg
221    +-----------------------------> |    主 组 件     |  +-------------+
222    |           receive result msg  |              |                  |
223    |                               +--------------+                  work1
224    |                                                                 |
225    |                       send result msg                           |
226    |              +-----------------------+                          work1
227    |              |                       |                          |
228    |              v                       |                          v
229    |        result channel                |                      work channel
230    |            +---+                     |                         +---+
231    |            |   |                     |                         |   |
232    |            +---+                     |                         +---+
233    |            |   |                     |                         |   |
234    |            +---+                 +---+----+                    +---+
235    |            |   |                 | worker |                    |   |
236    |            +---+                 | thread |                    +---+
237    |            |   |                 |   并    |                   |   |
238    |            +---+                 |   行    |                   +---+
239    |            |   |                 |   组    |                   |   |
240    |            +---+                 |   件    |                   +---+
241    |            |   |                 +----+---+                    |   |
242    |            +-+-+                      ^                        +-+-+
243    |              |                        |receive work msg          |
244    |              |                        |                          |
245    +--------------+                        +--------------------------+
246
247    ```
248
249    代码
250
251    ```
252    #[macro_use]
253    extern crate crossbeam_channel;
254    extern crate rayon;
255
256    use crossbeam_channel::unbounded;
257    use std::collections::HashMap;
258    use std::sync::{Arc, Condvar, Mutex};
259    // use parking_lot::{Mutex, Condvar};
260    // use std::sync::Arc;
261    use std::thread;
262
263    // 此消息用于发送到与「主组件」并行运行的其他组件。
264    enum WorkMsg {
265        Work(u8),
266        Exit,
267    }
268
269    // 此消息用于从并行运行的其他组件 发送回「主组件」。
270    enum ResultMsg {
271        Result(u8),
272        Exited,
273    }
274
275    fn main() {
276        let (work_sender, work_receiver) = unbounded();
277        let (result_sender, result_receiver) = unbounded();
278
279        // 生成子线程用于执行另一个并行组件
280        let _ = thread::spawn(move || loop {
281            // 接收并处理消息,直到收到 exit 消息
282            match work_receiver.recv() {
283                Ok(WorkMsg::Work(num)) => {
284                    // 执行一些工作,并且发送消息给 Result 队列
285                    let _ = result_sender.send(ResultMsg::Result(num));
286                }
287                Ok(WorkMsg::Exit) => {
288                    // 发送 exit 确认消息
289                    let _ = result_sender.send(ResultMsg::Exited);
290                    break;
291                }
292                _ => panic!("Error receiving a WorkMsg."),
293            }
294        });
295
296        let _ = work_sender.send(WorkMsg::Work(0));
297        let _ = work_sender.send(WorkMsg::Work(1));
298        let _ = work_sender.send(WorkMsg::Exit);
299
300        // worker执行计数
301        let mut counter = 0;
302
303        loop {
304            match result_receiver.recv() {
305                Ok(ResultMsg::Result(num)) => {
306                    // 断言确保接收和发送的顺序是一致的
307                    assert_eq!(num, counter);
308                    counter += 1;
309                }
310                Ok(ResultMsg::Exited) => {
311                    // 断言确保在接收两条工作消息之后收到退出消息
312                    assert_eq!(2, counter);
313                    break;
314                }
315                _ => panic!("Error receiving a ResultMsg."),
316            }
317        }
318    }
319    ```
320
321    示例二:引入线程池,工作的顺序将无法确定
322
323    ```text
324                                    +--------------+
325                                    | main thread  |      send work msg
326    +-----------------------------> |    主 组 件   |  +---------------+
327    |           receive result msg  |              |                  |
328    |                               +--------------+                  work1
329    |                                                                 |
330    |                       send result msg                           |
331    |              +-----------------------+                          work0
332    |              |                       |                          |
333    |              v                       |                          v
334    |        result channel       +--------+------+               work channel
335    |            +---+            |               |                  +---+
336    |            |   |            |               |                  |   |
337    |            +---+       +----+---+      +----+----+             +---+
338    |            |   |       | worker |      |  worker |             |   |
339    |            +---+       | thread |thread|  thread |             +---+
340    |            |   |       |   并    | pool|    并    |             |   |
341    |            +---+       |   行    |     |    行    |             +---+
342    |            |   |       |   组    |     |    组    |             |   |
343    |            +---+       |   件    |     |    件    |             +---+
344    |            |   |       +----+---+      +-----+---+             |   |
345    |            +---+            ^                ^                 +---+
346    |            |   |            |                |                 |   |
347    |            +-+-+            +receive-work-msg+                 +-+-+
348    |              |                        |                          |
349    |              |                        |                          |
350    +--------------+                        +--------------------------+
351
352    ```
353
354    代码:
355
356    ```rust
357    #[macro_use]
358    extern crate crossbeam_channel;
359    extern crate rayon;
360
361    use crossbeam_channel::unbounded;
362    use std::collections::HashMap;
363    // use std::sync::{Arc, Condvar, Mutex};
364
365    use parking_lot::{Condvar, Mutex};
366    use std::sync::Arc;
367    use std::thread;
368
369    enum WorkMsg {
370        Work(u8),
371        Exit,
372    }
373
374    enum ResultMsg {
375        Result(u8),
376        Exited,
377    }
378
379    fn main() {
380        let (work_sender, work_receiver) = unbounded();
381        let (result_sender, result_receiver) = unbounded();
382        // 引入线程池,开两个工作线程
383        let pool = rayon::ThreadPoolBuilder::new()
384            .num_threads(2)
385            .build()
386            .unwrap();
387
388        let _ = thread::spawn(move || loop {
389            match work_receiver.recv() {
390                Ok(WorkMsg::Work(num)) => {
391                    let result_sender = result_sender.clone();
392                    // 使用线程池中的线程
393                    pool.spawn(move || {
394                        // 执行一些工作,并且发送消息给 Result 队列
395                        let _ = result_sender.send(ResultMsg::Result(num));
396                    });
397                }
398                Ok(WorkMsg::Exit) => {
399                    let _ = result_sender.send(ResultMsg::Exited);
400                    break;
401                }
402                _ => panic!("Error receiving a WorkMsg."),
403            }
404        });
405
406        let _ = work_sender.send(WorkMsg::Work(0));
407        let _ = work_sender.send(WorkMsg::Work(1));
408        let _ = work_sender.send(WorkMsg::Exit);
409
410        loop {
411            match result_receiver.recv() {
412                Ok(ResultMsg::Result(_)) => {
413                    // 不能再断言顺序了
414                }
415                Ok(ResultMsg::Exited) => {
416                    // 也不能断言在退出消息之前已经收到了结果
417                    break;
418                }
419                _ => panic!("Error receiving a ResultMsg."),
420            }
421        }
422    }
423    ```
424
425    示例3: 确保工作结束再退出
426
427    ```text
428
429                                        +--------------+
430                                        | main thread  |      send work msg
431        +-----------------------------> |    主 组 件   |  +--------------------------+
432        |           receive result msg  |              |                             +
433        |                               +--------------+                             work1
434        |                                                                            |
435        |                       send result msg                                      |
436        |              +-----------------------+                                     work0
437        |              |                       |                                     |
438        |              v                       |                                     v
439        |        result channel       +--------+-------------------------+       work channel
440        |            +---+            |              thread              |          +---+
441        |            |   |            |               pool               |          |   |
442        |            +---+       +----+---+                         +----+----+     +---+
443        |            |   |       | worker |                         |  worker |     |   |
444        |            +---+       | thread |     pool_res_channel    |  thread |     +---+
445        |            |   |       |   并   +-------------------------+    并    |     |   |
446        |            +---+       |   行    send msg when job finished     行   |     +---+
447        |            |   |       |   组   +-------------------------+    组    |     |   |
448        |            +---+       |   件   |                         |    件    |     +---+
449        |            |   |       +----+---+                         +-----+---+     |   |
450        |            +---+            ^                                   ^         +---+
451        |            |   |            |                                   |         |   |
452        |            +-+-+            +receive-work-msg+------------------+         +-+-+
453        |              |                        |                                     |
454        |              |                        |                                     |
455        +--------------+                        +-------------------------------------+
456
457
458
459    ```
460
461    代码:
462
463    ```rust
464    #[macro_use]
465    extern crate crossbeam_channel;
466    extern crate rayon;
467
468    use crossbeam_channel::unbounded;
469    use std::collections::HashMap;
470    // use std::sync::{Arc, Condvar, Mutex};
471
472    use parking_lot::{Condvar, Mutex};
473    use std::sync::Arc;
474    use std::thread;
475
476    enum WorkMsg {
477        Work(u8),
478        Exit,
479    }
480
481    enum ResultMsg {
482        Result(u8),
483        Exited,
484    }
485
486    fn main() {
487        let (work_sender, work_receiver) = unbounded();
488        let (result_sender, result_receiver) = unbounded();
489        // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
490        let (pool_result_sender, pool_result_receiver) = unbounded();
491        let mut ongoing_work = 0;
492        let mut exiting = false;
493        // 使用线程池
494        let pool = rayon::ThreadPoolBuilder::new()
495            .num_threads(2)
496            .build()
497            .unwrap();
498
499        let _ = thread::spawn(move || loop {
500            // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
501            select! {
502                recv(work_receiver) -> msg => {
503                    match msg {
504                        Ok(WorkMsg::Work(num)) => {
505                            let result_sender = result_sender.clone();
506                            let pool_result_sender = pool_result_sender.clone();
507
508                            // 注意,这里正在池上启动一个新的工作单元。
509                            ongoing_work += 1;
510
511                            pool.spawn(move || {
512                                // 1. 发送结果给「主组件」
513                                let _ = result_sender.send(ResultMsg::Result(num));
514
515                                // 2. 让并行组件知道这里完成了一个工作单元
516                                let _ = pool_result_sender.send(());
517                            });
518                        },
519                        Ok(WorkMsg::Exit) => {
520                            // N注意,这里接收请求并退出
521                            exiting = true;
522
523                            // 如果没有正则进行的工作则立即退出
524                            if ongoing_work == 0 {
525                                let _ = result_sender.send(ResultMsg::Exited);
526                                break;
527                            }
528                        },
529                        _ => panic!("Error receiving a WorkMsg."),
530                    }
531                },
532                recv(pool_result_receiver) -> _ => {
533                    if ongoing_work == 0 {
534                        panic!("Received an unexpected pool result.");
535                    }
536
537                    // 注意,一个工作单元已经被完成
538                    ongoing_work -=1;
539
540                    // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
541                    if ongoing_work == 0 && exiting {
542                        let _ = result_sender.send(ResultMsg::Exited);
543                        break;
544                    }
545                },
546            }
547        });
548
549        let _ = work_sender.send(WorkMsg::Work(0));
550        let _ = work_sender.send(WorkMsg::Work(1));
551        let _ = work_sender.send(WorkMsg::Exit);
552
553        let mut counter = 0;
554
555        loop {
556            match result_receiver.recv() {
557                Ok(ResultMsg::Result(_)) => {
558                    // 计数当前完成的工作单元
559                    counter += 1;
560                }
561                Ok(ResultMsg::Exited) => {
562                    // 断言检测:是在接收到两个请求以后退出的
563                    assert_eq!(2, counter);
564                    break;
565                }
566                _ => panic!("Error receiving a ResultMsg."),
567            }
568        }
569    }
570    ```
571
572    示例3 重构
573
574    ```rust
575    #[macro_use]
576    extern crate crossbeam_channel;
577    extern crate rayon;
578
579    use crossbeam_channel::unbounded;
580    use std::collections::HashMap;
581    // use std::sync::{Arc, Condvar, Mutex};
582
583    use parking_lot::{Condvar, Mutex};
584    use std::sync::Arc;
585    use std::thread;
586
587    enum WorkMsg {
588        Work(u8),
589        Exit,
590    }
591
592    enum ResultMsg {
593        Result(u8),
594        Exited,
595    }
596
597    struct WorkerState {
598        ongoing: i16,
599        exiting: bool,
600    }
601
602    impl WorkerState {
603        fn init() -> Self {
604            WorkerState{ ongoing: 0, exiting: false }
605        }
606
607        fn set_ongoing(&mut self, count: i16) {
608            self.ongoing += count;
609        }
610
611        fn set_exiting(&mut self, exit_state: bool) {
612            self.exiting = exit_state;
613        }
614
615        fn is_exiting(&self) -> bool {
616            self.exiting == true
617        }
618
619        fn is_nomore_work(&self)-> bool {
620            self.ongoing == 0
621        }
622
623    }
624
625    fn main() {
626        let (work_sender, work_receiver) = unbounded();
627        let (result_sender, result_receiver) = unbounded();
628        // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
629        let (pool_result_sender, pool_result_receiver) = unbounded();
630        let mut worker_state = WorkerState::init();
631
632        // 使用线程池
633        let pool = rayon::ThreadPoolBuilder::new()
634            .num_threads(2)
635            .build()
636            .unwrap();
637
638        let _ = thread::spawn(move || loop {
639            // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
640            select! {
641                recv(work_receiver) -> msg => {
642                    match msg {
643                        Ok(WorkMsg::Work(num)) => {
644                            let result_sender = result_sender.clone();
645                            let pool_result_sender = pool_result_sender.clone();
646
647                            // 注意,这里正在池上启动一个新的工作单元。
648                            worker_state.set_ongoing(1);
649
650                            pool.spawn(move || {
651                                // 1. 发送结果给「主组件」
652                                result_sender.send(ResultMsg::Result(num));
653
654                                // 2. 让并行组件知道这里完成了一个工作单元
655                                pool_result_sender.send(());
656                            });
657                        },
658                        Ok(WorkMsg::Exit) => {
659                            // N注意,这里接收请求并退出
660                            // exiting = true;
661                            worker_state.set_exiting(true);
662
663                            // 如果没有正则进行的工作则立即退出
664                            if worker_state.is_nomore_work() {
665                                result_sender.send(ResultMsg::Exited);
666                                break;
667                            }
668                        },
669                        _ => panic!("Error receiving a WorkMsg."),
670                    }
671                },
672                recv(pool_result_receiver) -> _ => {
673                    if worker_state.is_nomore_work() {
674                        panic!("Received an unexpected pool result.");
675                    }
676
677                    // 注意,一个工作单元已经被完成
678                    worker_state.set_ongoing(-1);
679
680                    // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
681                    if worker_state.is_nomore_work() && worker_state.is_exiting() {
682                        result_sender.send(ResultMsg::Exited);
683                        break;
684                    }
685                },
686            }
687        });
688
689        work_sender.send(WorkMsg::Work(0));
690        work_sender.send(WorkMsg::Work(1));
691        work_sender.send(WorkMsg::Exit);
692
693        let mut counter = 0;
694
695        loop {
696            match result_receiver.recv() {
697                Ok(ResultMsg::Result(_)) => {
698                    // 计数当前完成的工作单元
699                    counter += 1;
700                }
701                Ok(ResultMsg::Exited) => {
702                    // 断言检测:是在接收到两个请求以后退出的
703                    assert_eq!(2, counter);
704                    break;
705                }
706                _ => panic!("Error receiving a ResultMsg."),
707            }
708        }
709    }
710    ```
711
712    示例4: 使用缓存共享数据
713
714    ```text
715                                        +--------------+
716                                        | main thread  |      send work msg
717        +-----------------------------> |    主 组 件   |  +--------------------------+
718        |           receive result msg  |              |                             +
719        |                               +--------------+                             work1
720        |                                                                            |
721        |                       send result msg                                      |
722        |              +------------------------+                                    work0
723        |              |                        |                                    +
724        |              v                        |                                    |
725        |        result channel                 |                                    |
726        |            +---+            +---------+------------------------+           |
727        |            |   |            |              thread              |           |
728        |            +---+            |               pool               |           |
729        |            |   |       +----+---+                         +----+----+      |
730        |            +---+       | worker |                         |  worker |      |
731        |            |   |       | thread |                         |  thread |      |
732        |            +---+       |        |                         |         |      |
733        |            |   |       |        |                         |         |      +
734        |            +---+       |        |                         |         |  work channel
735        |            |   |       |        |get +--------------+  get|         |     +---+
736        |            +---+       |        +--->+  work cache  +<----+         |     |   |
737        |            |   |       |        |    +--------------+     |         |     +---+
738        |            +-+-+       |        |                         |         |     |   |
739        |              |         |        |     pool_res_channel    |         |     +---+
740        |              |         |   并    +-------------------------+    并   |     |   |
741        +--------------+         |   行    send msg when job finished     行   |     +---+
742                                 |   组    +-------------------------+    组    |    |   |
743                                 |   件    |                         |    件    |    +---+
744                                 +----+---+                         +-----+---+     |   |
745                                      ^                                   ^         +---+
746                                      |                                   |         |   |
747                                      +receive-work-msg+------------------+         +-+-+
748                                                |                                     |
749                                                |                                     |
750                                                +-------------------------------------+
751
752
753    ```
754
755    代码:
756
757    ```rust
758    #[macro_use]
759    extern crate crossbeam_channel;
760    extern crate rayon;
761
762    use crossbeam_channel::unbounded;
763    use std::collections::HashMap;
764    use std::sync::{Arc, Condvar, Mutex};
765
766    // use parking_lot::{Condvar, Mutex};
767    // use std::sync::Arc;
768    use std::thread;
769
770    enum WorkMsg {
771        Work(u8),
772        Exit,
773    }
774
775    enum ResultMsg {
776        Result(u8, WorkPerformed),
777        Exited,
778    }
779
780    struct WorkerState {
781        ongoing: i16,
782        exiting: bool,
783    }
784
785    impl WorkerState {
786        fn init() -> Self {
787            WorkerState{ ongoing: 0, exiting: false }
788        }
789
790        fn set_ongoing(&mut self, count: i16) {
791            self.ongoing += count;
792        }
793
794        fn set_exiting(&mut self, exit_state: bool) {
795            self.exiting = exit_state;
796        }
797
798        fn is_exiting(&self) -> bool {
799            self.exiting == true
800        }
801
802        fn is_nomore_work(&self)-> bool {
803            self.ongoing == 0
804        }
805    }
806
807    #[derive(Debug, Eq, PartialEq)]
808    enum WorkPerformed {
809        FromCache,
810        New,
811    }
812
813    #[derive(Eq, Hash, PartialEq)]
814    struct CacheKey(u8);
815
816    fn main() {
817        let (work_sender, work_receiver) = unbounded();
818        let (result_sender, result_receiver) = unbounded();
819        // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
820        let (pool_result_sender, pool_result_receiver) = unbounded();
821        let mut worker_state = WorkerState::init();
822
823        // 使用线程池
824        let pool = rayon::ThreadPoolBuilder::new()
825            .num_threads(2)
826            .build()
827            .unwrap();
828
829        // 缓存 work ,由 池 中的 worker 共享
830        let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
831
832        let _ = thread::spawn(move || loop {
833            // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
834            select! {
835                recv(work_receiver) -> msg => {
836                    match msg {
837                        Ok(WorkMsg::Work(num)) => {
838                            let result_sender = result_sender.clone();
839                            let pool_result_sender = pool_result_sender.clone();
840                            // 使用缓存
841                            let cache = cache.clone();
842
843                            // 注意,这里正在池上启动一个新的工作单元。
844                            worker_state.set_ongoing(1);
845
846                            pool.spawn(move || {
847                                let num = {
848                                    // 缓存开始
849                                    let cache = cache.lock().unwrap();
850                                    let key = CacheKey(num);
851                                    if let Some(result) = cache.get(&key) {
852                                        // 从缓存中获得一个结果,并将其发送回去,
853                                        // 同时带有一个标志,表明是从缓存中获得了它
854                                        let _ = result_sender.send(ResultMsg::Result(result.clone(), WorkPerformed::FromCache));
855                                        let _ = pool_result_sender.send(());
856                                        return;
857                                    }
858                                    key.0
859                                    // 缓存结束
860                                };
861
862                                // work work work work work work...
863
864                                // 返回结果,表明我们必须执行work
865                                let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
866
867                                // 在缓存中存储“昂贵”的work.
868                                let mut cache = cache.lock().unwrap();
869                                let key = CacheKey(num.clone());
870                                cache.insert(key, num);
871
872                                let _ = pool_result_sender.send(());
873                            });
874                        },
875                        Ok(WorkMsg::Exit) => {
876                            // N注意,这里接收请求并退出
877                            // exiting = true;
878                            worker_state.set_exiting(true);
879
880                            // 如果没有正则进行的工作则立即退出
881                            if worker_state.is_nomore_work() {
882                                result_sender.send(ResultMsg::Exited);
883                                break;
884                            }
885                        },
886                        _ => panic!("Error receiving a WorkMsg."),
887                    }
888                },
889                recv(pool_result_receiver) -> _ => {
890                    if worker_state.is_nomore_work() {
891                        panic!("Received an unexpected pool result.");
892                    }
893
894                    // 注意,一个工作单元已经被完成
895                    worker_state.set_ongoing(-1);
896
897                    // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
898                    if worker_state.is_nomore_work() && worker_state.is_exiting() {
899                        result_sender.send(ResultMsg::Exited);
900                        break;
901                    }
902                },
903            }
904        });
905
906        let _ = work_sender.send(WorkMsg::Work(0));
907        // 发送两个相同的work
908        let _ = work_sender.send(WorkMsg::Work(1));
909        let _ = work_sender.send(WorkMsg::Work(1));
910        let _ = work_sender.send(WorkMsg::Exit);
911
912        let mut counter = 0;
913
914        loop {
915            match result_receiver.recv() {
916                Ok(ResultMsg::Result(_, _cached)) => {
917                    // 计数当前完成的工作单元
918                    counter += 1;
919                }
920                Ok(ResultMsg::Exited) => {
921                    // 断言检测:是在接收到两个请求以后退出的
922                    assert_eq!(3, counter);
923                    break;
924                }
925                _ => panic!("Error receiving a ResultMsg."),
926            }
927        }
928    }
929    ```
930
931    示例5: 确保从缓存中取共享数据的行为是确定的
932
933    ```text
934                                        +--------------+
935                                        | main thread  |      send work msg
936        +-----------------------------> |    主 组 件   |  +----------------------------------+
937        |           receive result msg  |              |                                     +
938        |                               +--------------+                                     work1
939        |                                                                                    |
940        |                       send result msg                                              |
941        |              +------------------------+                                            work0
942        |              |                        |                                            +
943        |              v                        |          thread                            |
944        |        result channel                 |           pool                             |
945        |            +---+            |---------+--------------------------------|           |
946        |            |   |       +--------+   wait    +------------+  wait  +---------+      |
947        |            +---+       | worker +<----------+  inproces  +------->+  worker |      |
948        |            |   |       | thread |       +----            ---+     |  thread |      |
949        |            +---+       |        |  +--->+ work cache state  +<--+ |         |      |
950        |            |   |       |        |  |    +-----+        +----+   | |         |      |
951        |            +---+       |        +--+          |  ready |        +-+         |      |
952        |            |   |       |        |      notify +----+---++  notify |         |      +
953        |            +---+       |        | <-----------+    |    +-------->+         |  work channel
954        |            |   |       |        |                  v              |         |     +---+
955        |            +---+       |        |         +--------+-----+        |         |     |   |
956        |            |   |       |        |         |  work cache  |        |         |     +---+
957        |            +-+-+       |        |         +--------------+        |         |     |   |
958        |              |         |        |     pool_res_channel            |         |     +---+
959        |              |         |   并    +---------------------------------+    并   |     |   |
960        +--------------+         |   行        send msg when job finished         行   |     +---+
961                                 |   组    +---------------------------------+    组   |     |   |
962                                 |   件    |                                 |    件   |     +---+
963                                 +----+---+                                  +-----+---+    |   |
964                                      ^                                           ^         +---+
965                                      |                                           |         |   |
966                                      +receive-work-msg+--------------------------+         +-+-+
967                                                |                                             |
968                                                |                                             |
969                                                +---------------------------------------------+
970
971    ```
972
973    代码:
974
975    ```
976    #[macro_use]
977    extern crate crossbeam_channel;
978    extern crate rayon;
979
980    use crossbeam_channel::unbounded;
981    use std::collections::HashMap;
982    use std::sync::{Arc, Condvar, Mutex};
983
984    // use parking_lot::{Condvar, Mutex};
985    // use std::sync::Arc;
986    use std::thread;
987
988    enum WorkMsg {
989        Work(u8),
990        Exit,
991    }
992
993    #[derive(Debug, Eq, PartialEq)]
994    enum CacheState {
995        Ready,
996        WorkInProgress,
997    }
998
999    enum ResultMsg {
1000        Result(u8, WorkPerformed),
1001        Exited,
1002    }
1003
1004    struct WorkerState {
1005        ongoing: i16,
1006        exiting: bool,
1007    }
1008
1009    impl WorkerState {
1010        fn init() -> Self {
1011            WorkerState{ ongoing: 0, exiting: false }
1012        }
1013
1014        fn set_ongoing(&mut self, count: i16) {
1015            self.ongoing += count;
1016        }
1017
1018        fn set_exiting(&mut self, exit_state: bool) {
1019            self.exiting = exit_state;
1020        }
1021
1022        fn is_exiting(&self) -> bool {
1023            self.exiting == true
1024        }
1025
1026        fn is_nomore_work(&self)-> bool {
1027            self.ongoing == 0
1028        }
1029    }
1030
1031    #[derive(Debug, Eq, PartialEq)]
1032    enum WorkPerformed {
1033        FromCache,
1034        New,
1035    }
1036
1037    #[derive(Eq, Hash, PartialEq)]
1038    struct CacheKey(u8);
1039
1040    fn main() {
1041        let (work_sender, work_receiver) = unbounded();
1042        let (result_sender, result_receiver) = unbounded();
1043        // 添加一个新的Channel,Worker使用它来通知“并行”组件已经完成了一个工作单元
1044        let (pool_result_sender, pool_result_receiver) = unbounded();
1045        let mut worker_state = WorkerState::init();
1046
1047        // 使用线程池
1048        let pool = rayon::ThreadPoolBuilder::new()
1049            .num_threads(2)
1050            .build()
1051            .unwrap();
1052
1053        // 缓存 work ,由 池 中的 worker 共享
1054        let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));
1055
1056        // 增加缓存状态,指示对于给定的key,缓存是否已经准备好被读取。
1057        let cache_state: Arc<Mutex<HashMap<CacheKey, Arc<(Mutex<CacheState>, Condvar)>>>> =
1058            Arc::new(Mutex::new(HashMap::new()));
1059
1060        let _ = thread::spawn(move || loop {
1061            // 使用 corssbeam 提供的 select! 宏 选择一个就绪工作
1062            select! {
1063                recv(work_receiver) -> msg => {
1064                    match msg {
1065                        Ok(WorkMsg::Work(num)) => {
1066                            let result_sender = result_sender.clone();
1067                            let pool_result_sender = pool_result_sender.clone();
1068                            // 使用缓存
1069                            let cache = cache.clone();
1070                            let cache_state = cache_state.clone();
1071
1072                            // 注意,这里正在池上启动一个新的工作单元。
1073                            worker_state.set_ongoing(1);
1074
1075                            pool.spawn(move || {
1076                                let num = {
1077                                    let (cache_state_lock, cvar) = {
1078                                        //  `cache_state` 临界区开始
1079                                        let mut state_map = cache_state.lock().unwrap();
1080                                        &*state_map
1081                                            .entry(CacheKey(num.clone()))
1082                                            .or_insert_with(|| {
1083                                                Arc::new((
1084                                                    Mutex::new(CacheState::Ready),
1085                                                    Condvar::new(),
1086                                                ))
1087                                            })
1088                                            .clone()
1089                                        //  `cache_state` 临界区结束
1090                                    };
1091
1092                                    //  `state` 临界区开始
1093                                    let mut state = cache_state_lock.lock().unwrap();
1094
1095                                    // 注意:使用while循环来防止条件变量的虚假唤醒
1096                                    while let CacheState::WorkInProgress = *state {
1097                                        // 阻塞直到状态是 `CacheState::Ready`.
1098                                        //
1099                                        // 当唤醒时会自动释放锁
1100                                        let current_state = cvar
1101                                            .wait(state)
1102                                            .unwrap();
1103                                        state = current_state;
1104                                    }
1105
1106                                    // 循环外可以认为state 已经是 Ready 的了
1107                                    assert_eq!(*state, CacheState::Ready);
1108
1109                                    let (num, result) = {
1110                                        // 缓存临界区开始
1111                                        let cache = cache.lock().unwrap();
1112                                        let key = CacheKey(num);
1113                                        let result = match cache.get(&key) {
1114                                            Some(result) => Some(result.clone()),
1115                                            None => None,
1116                                        };
1117                                        (key.0, result)
1118                                        // 缓存临界区结束
1119                                    };
1120
1121                                    if let Some(result) = result {
1122                                        // 从缓存中获得一个结果,并将其发送回去,
1123                                        // 同时带有一个标志,表明是从缓存中获得了它
1124                                        let _ = result_sender.send(ResultMsg::Result(result, WorkPerformed::FromCache));
1125                                        let _ = pool_result_sender.send(());
1126
1127                                        // 不要忘记通知等待线程
1128                                        cvar.notify_one();
1129                                        return;
1130                                    } else {
1131                                        // 如果缓存里没有找到结果,那么切换状态
1132                                        *state = CacheState::WorkInProgress;
1133                                        num
1134                                    }
1135                                    // `state` 临界区结束
1136                                };
1137
1138                                // 在临界区外做更多「昂贵工作」
1139
1140                                let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));
1141
1142                                {
1143                                    // 缓存临界区开始
1144                                    // 插入工作结果到缓存中
1145                                    let mut cache = cache.lock().unwrap();
1146                                    let key = CacheKey(num.clone());
1147                                    cache.insert(key, num);
1148                                    // 缓存临界区结束
1149                                }
1150
1151                                let (lock, cvar) = {
1152                                    let mut state_map = cache_state.lock().unwrap();
1153                                    &*state_map
1154                                        .get_mut(&CacheKey(num))
1155                                        .expect("Entry in cache state to have been previously inserted")
1156                                        .clone()
1157                                };
1158                                // 重新进入 `state` 临界区
1159                                let mut state = lock.lock().unwrap();
1160
1161                                // 在这里,由于已经提前设置了state,并且任何其他worker都将等待状态切换回ready,可以确定该状态是“in-progress”。
1162                                assert_eq!(*state, CacheState::WorkInProgress);
1163
1164                                // 切换状态为 Ready
1165                                *state = CacheState::Ready;
1166
1167                                // 通知等待线程
1168                                cvar.notify_one();
1169
1170                                let _ = pool_result_sender.send(());
1171                            });
1172                        },
1173                        Ok(WorkMsg::Exit) => {
1174                            // N注意,这里接收请求并退出
1175                            // exiting = true;
1176                            worker_state.set_exiting(true);
1177
1178                            // 如果没有正则进行的工作则立即退出
1179                            if worker_state.is_nomore_work() {
1180                                result_sender.send(ResultMsg::Exited);
1181                                break;
1182                            }
1183                        },
1184                        _ => panic!("Error receiving a WorkMsg."),
1185                    }
1186                },
1187                recv(pool_result_receiver) -> _ => {
1188                    if worker_state.is_nomore_work() {
1189                        panic!("Received an unexpected pool result.");
1190                    }
1191
1192                    // 注意,一个工作单元已经被完成
1193                    worker_state.set_ongoing(-1);
1194
1195                    // 如果没有正在进行的工作,并且接收到了退出请求,那么就退出
1196                    if worker_state.is_nomore_work() && worker_state.is_exiting() {
1197                        result_sender.send(ResultMsg::Exited);
1198                        break;
1199                    }
1200                },
1201            }
1202        });
1203
1204        let _ = work_sender.send(WorkMsg::Work(0));
1205        // 发送两个相同的work
1206        let _ = work_sender.send(WorkMsg::Work(1));
1207        let _ = work_sender.send(WorkMsg::Work(1));
1208        let _ = work_sender.send(WorkMsg::Exit);
1209
1210        let mut counter = 0;
1211
1212        // 当work 是 1 的时候重新计数
1213        let mut work_one_counter = 0;
1214
1215        loop {
1216            match result_receiver.recv() {
1217                Ok(ResultMsg::Result(num, cached)) => {
1218                    counter += 1;
1219
1220                    if num == 1 {
1221                        work_one_counter += 1;
1222                    }
1223
1224                    // 现在我们可以断言,当收到 num 为 1 的第二个结果时,它已经来自缓存。
1225                    if num == 1 && work_one_counter == 2 {
1226                        assert_eq!(cached, WorkPerformed::FromCache);
1227                    }
1228                }
1229                Ok(ResultMsg::Exited) => {
1230                    assert_eq!(3, counter);
1231                    break;
1232                }
1233                _ => panic!("Error receiving a ResultMsg."),
1234            }
1235        }
1236    }
1237    ```
1238*/
1239pub fn understand_channel_and_condvar() {
1240    println!(" 线程间安全共享数据 ");
1241}