• Home
  • History
  • Annotate
Name Date Size #Lines LOC

..--

README.mdD23-Nov-202313.2 KiB316267

mod.rsD23-Nov-202318.4 KiB485228

README.md

1# Parallel Iterators
2
3These are some notes on the design of the parallel iterator traits.
4This file does not describe how to **use** parallel iterators.
5
6## The challenge
7
8Parallel iterators are more complicated than sequential iterators.
9The reason is that they have to be able to split themselves up and
10operate in parallel across the two halves.
11
12The current design for parallel iterators has two distinct modes in
13which they can be used; as we will see, not all iterators support both
14modes (which is why there are two):
15
16- **Pull mode** (the `Producer` and `UnindexedProducer` traits): in this mode,
17  the iterator is asked to produce the next item using a call to `next`. This
18  is basically like a normal iterator, but with a twist: you can split the
19  iterator in half to produce disjoint items in separate threads.
20  - in the `Producer` trait, splitting is done with `split_at`, which accepts
21    an index where the split should be performed. Only indexed iterators can
22    work in this mode, as they know exactly how much data they will produce,
23    and how to locate the requested index.
24  - in the `UnindexedProducer` trait, splitting is done with `split`, which
25    simply requests that the producer divide itself *approximately* in half.
26    This is useful when the exact length and/or layout is unknown, as with
27    `String` characters, or when the length might exceed `usize`, as with
28    `Range<u64>` on 32-bit platforms.
29    - In theory, any `Producer` could act unindexed, but we don't currently
30      use that possibility.  When you know the exact length, a `split` can
31      simply be implemented as `split_at(length/2)`.
32- **Push mode** (the `Consumer` and `UnindexedConsumer` traits): in
33  this mode, the iterator instead is *given* each item in turn, which
34  is then processed. This is the opposite of a normal iterator. It's
35  more like a `for_each` call: each time a new item is produced, the
36  `consume` method is called with that item. (The traits themselves are
37  a bit more complex, as they support state that can be threaded
38  through and ultimately reduced.) Unlike producers, there are two
39  variants of consumers. The difference is how the split is performed:
40  - in the `Consumer` trait, splitting is done with `split_at`, which
41    accepts an index where the split should be performed. All
42    iterators can work in this mode. The resulting halves thus have an
43    idea about how much data they expect to consume.
44  - in the `UnindexedConsumer` trait, splitting is done with
45    `split_off_left`.  There is no index: the resulting halves must be
46    prepared to process any amount of data, and they don't know where that
47    data falls in the overall stream.
48    - Not all consumers can operate in this mode. It works for
49      `for_each` and `reduce`, for example, but it does not work for
50      `collect_into_vec`, since in that case the position of each item is
51      important for knowing where it ends up in the target collection.
52
53## How iterator execution proceeds
54
55We'll walk through this example iterator chain to start. This chain
56demonstrates more-or-less the full complexity of what can happen.
57
58```rust
59vec1.par_iter()
60    .zip(vec2.par_iter())
61    .flat_map(some_function)
62    .for_each(some_other_function)
63```
64
65To handle an iterator chain, we start by creating consumers. This
66works from the end. So in this case, the call to `for_each` is the
67final step, so it will create a `ForEachConsumer` that, given an item,
68just calls `some_other_function` with that item. (`ForEachConsumer` is
69a very simple consumer because it doesn't need to thread any state
70between items at all.)
71
72Now, the `for_each` call will pass this consumer to the base iterator,
73which is the `flat_map`. It will do this by calling the `drive_unindexed`
74method on the `ParallelIterator` trait. `drive_unindexed` basically
75says "produce items for this iterator and feed them to this consumer";
76it only works for unindexed consumers.
77
78(As an aside, it is interesting that only some consumers can work in
79unindexed mode, but all producers can *drive* an unindexed consumer.
80In contrast, only some producers can drive an *indexed* consumer, but
81all consumers can be supplied indexes. Isn't variance neat.)
82
83As it happens, `FlatMap` only works with unindexed consumers anyway.
84This is because flat-map basically has no idea how many items it will
85produce. If you ask flat-map to produce the 22nd item, it can't do it,
86at least not without some intermediate state. It doesn't know whether
87processing the first item will create 1 item, 3 items, or 100;
88therefore, to produce an arbitrary item, it would basically just have
89to start at the beginning and execute sequentially, which is not what
90we want. But for unindexed consumers, this doesn't matter, since they
91don't need to know how much data they will get.
92
93Therefore, `FlatMap` can wrap the `ForEachConsumer` with a
94`FlatMapConsumer` that feeds to it. This `FlatMapConsumer` will be
95given one item. It will then invoke `some_function` to get a parallel
96iterator out. It will then ask this new parallel iterator to drive the
97`ForEachConsumer`. The `drive_unindexed` method on `flat_map` can then
98pass the `FlatMapConsumer` up the chain to the previous item, which is
99`zip`. At this point, something interesting happens.
100
101## Switching from push to pull mode
102
103If you think about `zip`, it can't really be implemented as a
104consumer, at least not without an intermediate thread and some
105channels or something (or maybe coroutines). The problem is that it
106has to walk two iterators *in lockstep*. Basically, it can't call two
107`drive` methods simultaneously, it can only call one at a time. So at
108this point, the `zip` iterator needs to switch from *push mode* into
109*pull mode*.
110
111You'll note that `Zip` is only usable if its inputs implement
112`IndexedParallelIterator`, meaning that they can produce data starting
113at random points in the stream. This need to switch to push mode is
114exactly why. If we want to split a zip iterator at position 22, we
115need to be able to start zipping items from index 22 right away,
116without having to start from index 0.
117
118Anyway, so at this point, the `drive_unindexed` method for `Zip` stops
119creating consumers. Instead, it creates a *producer*, a `ZipProducer`,
120to be exact, and calls the `bridge` function in the `internals`
121module. Creating a `ZipProducer` will in turn create producers for
122the two iterators being zipped. This is possible because they both
123implement `IndexedParallelIterator`.
124
125The `bridge` function will then connect the consumer, which is
126handling the `flat_map` and `for_each`, with the producer, which is
127handling the `zip` and its preecessors. It will split down until the
128chunks seem reasonably small, then pull items from the producer and
129feed them to the consumer.
130
131## The base case
132
133The other time that `bridge` gets used is when we bottom out in an
134indexed producer, such as a slice or range.  There is also a
135`bridge_unindexed` equivalent for - you guessed it - unindexed producers,
136such as string characters.
137
138<a name="producer-callback">
139
140## What on earth is `ProducerCallback`?
141
142We saw that when you call a parallel action method like
143`par_iter.reduce()`, that will create a "reducing" consumer and then
144invoke `par_iter.drive_unindexed()` (or `par_iter.drive()`) as
145appropriate. This may create yet more consumers as we proceed up the
146parallel iterator chain. But at some point we're going to get to the
147start of the chain, or to a parallel iterator (like `zip()`) that has
148to coordinate multiple inputs. At that point, we need to start
149converting parallel iterators into producers.
150
151The way we do this is by invoking the method `with_producer()`, defined on
152`IndexedParallelIterator`. This is a callback scheme. In an ideal world,
153it would work like this:
154
155```rust
156base_iter.with_producer(|base_producer| {
157    // here, `base_producer` is the producer for `base_iter`
158});
159```
160
161In that case, we could implement a combinator like `map()` by getting
162the producer for the base iterator, wrapping it to make our own
163`MapProducer`, and then passing that to the callback. Something like
164this:
165
166```rust
167struct MapProducer<'f, P, F: 'f> {
168    base: P,
169    map_op: &'f F,
170}
171
172impl<I, F> IndexedParallelIterator for Map<I, F>
173    where I: IndexedParallelIterator,
174          F: MapOp<I::Item>,
175{
176    fn with_producer<CB>(self, callback: CB) -> CB::Output {
177        let map_op = &self.map_op;
178        self.base_iter.with_producer(|base_producer| {
179            // Here `producer` is the producer for `self.base_iter`.
180            // Wrap that to make a `MapProducer`
181            let map_producer = MapProducer {
182                base: base_producer,
183                map_op: map_op
184            };
185
186            // invoke the callback with the wrapped version
187            callback(map_producer)
188        });
189    }
190});
191```
192
193This example demonstrates some of the power of the callback scheme.
194It winds up being a very flexible setup. For one thing, it means we
195can take ownership of `par_iter`; we can then in turn give ownership
196away of its bits and pieces into the producer (this is very useful if
197the iterator owns an `&mut` slice, for example), or create shared
198references and put *those* in the producer. In the case of map, for
199example, the parallel iterator owns the `map_op`, and we borrow
200references to it which we then put into the `MapProducer` (this means
201the `MapProducer` can easily split itself and share those references).
202The `with_producer` method can also create resources that are needed
203during the parallel execution, since the producer does not have to be
204returned.
205
206Unfortunately there is a catch. We can't actually use closures the way
207I showed you. To see why, think about the type that `map_producer`
208would have to have. If we were going to write the `with_producer`
209method using a closure, it would have to look something like this:
210
211```rust
212pub trait IndexedParallelIterator: ParallelIterator {
213    type Producer;
214    fn with_producer<CB, R>(self, callback: CB) -> R
215        where CB: FnOnce(Self::Producer) -> R;
216    ...
217}
218```
219
220Note that we had to add this associated type `Producer` so that
221we could specify the argument of the callback to be `Self::Producer`.
222Now, imagine trying to write that `MapProducer` impl using this style:
223
224```rust
225impl<I, F> IndexedParallelIterator for Map<I, F>
226    where I: IndexedParallelIterator,
227          F: MapOp<I::Item>,
228{
229    type MapProducer = MapProducer<'f, P::Producer, F>;
230    //                             ^^ wait, what is this `'f`?
231
232    fn with_producer<CB, R>(self, callback: CB) -> R
233        where CB: FnOnce(Self::Producer) -> R
234    {
235        let map_op = &self.map_op;
236        //  ^^^^^^ `'f` is (conceptually) the lifetime of this reference,
237        //         so it will be different for each call to `with_producer`!
238    }
239}
240```
241
242This may look familiar to you: it's the same problem that we have
243trying to define an `Iterable` trait. Basically, the producer type
244needs to include a lifetime (here, `'f`) that refers to the body of
245`with_producer` and hence is not in scope at the impl level.
246
247If we had [associated type constructors][1598], we could solve this
248problem that way. But there is another solution. We can use a
249dedicated callback trait like `ProducerCallback`, instead of `FnOnce`:
250
251[1598]: https://github.com/rust-lang/rfcs/pull/1598
252
253```rust
254pub trait ProducerCallback<T> {
255    type Output;
256    fn callback<P>(self, producer: P) -> Self::Output
257        where P: Producer<Item=T>;
258}
259```
260
261Using this trait, the signature of `with_producer()` looks like this:
262
263```rust
264fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output;
265```
266
267Notice that this signature **never has to name the producer type** --
268there is no associated type `Producer` anymore. This is because the
269`callback()` method is generically over **all** producers `P`.
270
271The problem is that now the `||` sugar doesn't work anymore. So we
272have to manually create the callback struct, which is a mite tedious.
273So our `MapProducer` code looks like this:
274
275```rust
276impl<I, F> IndexedParallelIterator for Map<I, F>
277    where I: IndexedParallelIterator,
278          F: MapOp<I::Item>,
279{
280    fn with_producer<CB>(self, callback: CB) -> CB::Output
281        where CB: ProducerCallback<Self::Item>
282    {
283        return self.base.with_producer(Callback { callback: callback, map_op: self.map_op });
284        //                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
285        //                             Manual version of the closure sugar: create an instance
286        //                             of a struct that implements `ProducerCallback`.
287
288        // The struct declaration. Each field is something that need to capture from the
289        // creating scope.
290        struct Callback<CB, F> {
291            callback: CB,
292            map_op: F,
293        }
294
295        // Implement the `ProducerCallback` trait. This is pure boilerplate.
296        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
297            where F: MapOp<T>,
298                  CB: ProducerCallback<F::Output>
299        {
300            type Output = CB::Output;
301
302            fn callback<P>(self, base: P) -> CB::Output
303                where P: Producer<Item=T>
304            {
305                // The body of the closure is here:
306                let producer = MapProducer { base: base,
307                                             map_op: &self.map_op };
308                self.callback.callback(producer)
309            }
310        }
311    }
312}
313```
314
315OK, a bit tedious, but it works!
316