1 use super::plumbing::*;
2 use super::ParallelIterator;
3
4 use super::private::Try;
5 use std::sync::atomic::{AtomicBool, Ordering};
6
try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T> where PI: ParallelIterator<Item = T>, R: Fn(T::Ok, T::Ok) -> T + Sync, T: Try + Send,7 pub(super) fn try_reduce_with<PI, R, T>(pi: PI, reduce_op: R) -> Option<T>
8 where
9 PI: ParallelIterator<Item = T>,
10 R: Fn(T::Ok, T::Ok) -> T + Sync,
11 T: Try + Send,
12 {
13 let full = AtomicBool::new(false);
14 let consumer = TryReduceWithConsumer {
15 reduce_op: &reduce_op,
16 full: &full,
17 };
18 pi.drive_unindexed(consumer)
19 }
20
/// Consumer state for `try_reduce_with`: two shared borrows, so the value is
/// cheap to hand to every split of the parallel iterator.
struct TryReduceWithConsumer<'r, R> {
    /// The caller's closure that combines two `Ok` payloads.
    reduce_op: &'r R,
    /// Flag read by `full()`; folders store `true` into it after an error.
    full: &'r AtomicBool,
}
25
26 impl<'r, R> Copy for TryReduceWithConsumer<'r, R> {}
27
28 impl<'r, R> Clone for TryReduceWithConsumer<'r, R> {
clone(&self) -> Self29 fn clone(&self) -> Self {
30 *self
31 }
32 }
33
34 impl<'r, R, T> Consumer<T> for TryReduceWithConsumer<'r, R>
35 where
36 R: Fn(T::Ok, T::Ok) -> T + Sync,
37 T: Try + Send,
38 {
39 type Folder = TryReduceWithFolder<'r, R, T>;
40 type Reducer = Self;
41 type Result = Option<T>;
42
split_at(self, _index: usize) -> (Self, Self, Self)43 fn split_at(self, _index: usize) -> (Self, Self, Self) {
44 (self, self, self)
45 }
46
into_folder(self) -> Self::Folder47 fn into_folder(self) -> Self::Folder {
48 TryReduceWithFolder {
49 reduce_op: self.reduce_op,
50 opt_result: None,
51 full: self.full,
52 }
53 }
54
full(&self) -> bool55 fn full(&self) -> bool {
56 self.full.load(Ordering::Relaxed)
57 }
58 }
59
60 impl<'r, R, T> UnindexedConsumer<T> for TryReduceWithConsumer<'r, R>
61 where
62 R: Fn(T::Ok, T::Ok) -> T + Sync,
63 T: Try + Send,
64 {
split_off_left(&self) -> Self65 fn split_off_left(&self) -> Self {
66 *self
67 }
68
to_reducer(&self) -> Self::Reducer69 fn to_reducer(&self) -> Self::Reducer {
70 *self
71 }
72 }
73
74 impl<'r, R, T> Reducer<Option<T>> for TryReduceWithConsumer<'r, R>
75 where
76 R: Fn(T::Ok, T::Ok) -> T + Sync,
77 T: Try,
78 {
reduce(self, left: Option<T>, right: Option<T>) -> Option<T>79 fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
80 let reduce_op = self.reduce_op;
81 match (left, right) {
82 (None, x) | (x, None) => x,
83 (Some(a), Some(b)) => match (a.into_result(), b.into_result()) {
84 (Ok(a), Ok(b)) => Some(reduce_op(a, b)),
85 (Err(e), _) | (_, Err(e)) => Some(T::from_error(e)),
86 },
87 }
88 }
89 }
90
91 struct TryReduceWithFolder<'r, R, T: Try> {
92 reduce_op: &'r R,
93 opt_result: Option<Result<T::Ok, T::Error>>,
94 full: &'r AtomicBool,
95 }
96
97 impl<'r, R, T> Folder<T> for TryReduceWithFolder<'r, R, T>
98 where
99 R: Fn(T::Ok, T::Ok) -> T,
100 T: Try,
101 {
102 type Result = Option<T>;
103
consume(self, item: T) -> Self104 fn consume(self, item: T) -> Self {
105 let reduce_op = self.reduce_op;
106 let result = match self.opt_result {
107 None => item.into_result(),
108 Some(Ok(a)) => match item.into_result() {
109 Ok(b) => reduce_op(a, b).into_result(),
110 Err(e) => Err(e),
111 },
112 Some(Err(e)) => Err(e),
113 };
114 if result.is_err() {
115 self.full.store(true, Ordering::Relaxed)
116 }
117 TryReduceWithFolder {
118 opt_result: Some(result),
119 ..self
120 }
121 }
122
complete(self) -> Option<T>123 fn complete(self) -> Option<T> {
124 let result = self.opt_result?;
125 Some(match result {
126 Ok(ok) => T::from_ok(ok),
127 Err(error) => T::from_error(error),
128 })
129 }
130
full(&self) -> bool131 fn full(&self) -> bool {
132 self.full.load(Ordering::Relaxed)
133 }
134 }
135