# Parallel Iterators

These are some notes on the design of the parallel iterator traits.
This file does not describe how to **use** parallel iterators.

## The challenge

Parallel iterators are more complicated than sequential iterators.
The reason is that they have to be able to split themselves up and
operate in parallel across the two halves.

The current design for parallel iterators has two distinct modes in
which they can be used; as we will see, not all iterators support both
modes (which is why there are two):

- **Pull mode** (the `Producer` and `UnindexedProducer` traits): in this mode,
  the iterator is asked to produce the next item using a call to `next`. This
  is basically like a normal iterator, but with a twist: you can split the
  iterator in half to produce disjoint items in separate threads.
  - In the `Producer` trait, splitting is done with `split_at`, which accepts
    an index where the split should be performed. Only indexed iterators can
    work in this mode, as they know exactly how much data they will produce,
    and how to locate the requested index.
  - In the `UnindexedProducer` trait, splitting is done with `split`, which
    simply requests that the producer divide itself *approximately* in half.
    This is useful when the exact length and/or layout is unknown, as with
    `String` characters, or when the length might exceed `usize`, as with
    `Range<u64>` on 32-bit platforms.
  - In theory, any `Producer` could act unindexed, but we don't currently
    use that possibility. When you know the exact length, a `split` can
    simply be implemented as `split_at(length/2)`.
- **Push mode** (the `Consumer` and `UnindexedConsumer` traits): in
  this mode, the iterator instead is *given* each item in turn, which
  is then processed. This is the opposite of a normal iterator. It's
  more like a `for_each` call: each time a new item is produced, the
  `consume` method is called with that item.
  (The traits themselves are a bit more complex, as they support state
  that can be threaded through and ultimately reduced.) Like producers,
  there are two variants of consumers. The difference is how the split
  is performed:
  - In the `Consumer` trait, splitting is done with `split_at`, which
    accepts an index where the split should be performed. All
    consumers can work in this mode. The resulting halves thus have an
    idea about how much data they expect to consume.
  - In the `UnindexedConsumer` trait, splitting is done with
    `split_off_left`. There is no index: the resulting halves must be
    prepared to process any amount of data, and they don't know where that
    data falls in the overall stream.
  - Not all consumers can operate in unindexed mode. It works for
    `for_each` and `reduce`, for example, but it does not work for
    `collect_into_vec`, since in that case the position of each item is
    important for knowing where it ends up in the target collection.

## How iterator execution proceeds

We'll walk through this example iterator chain to start. This chain
demonstrates more-or-less the full complexity of what can happen.

```rust
vec1.par_iter()
    .zip(vec2.par_iter())
    .flat_map(some_function)
    .for_each(some_other_function)
```

To handle an iterator chain, we start by creating consumers. This
works from the end. In this case, the call to `for_each` is the
final step, so it will create a `ForEachConsumer` that, given an item,
just calls `some_other_function` with that item. (`ForEachConsumer` is
a very simple consumer because it doesn't need to thread any state
between items at all.)

Now, the `for_each` call will pass this consumer to the base iterator,
which is the `flat_map`. It will do this by calling the `drive_unindexed`
method on the `ParallelIterator` trait.
`drive_unindexed` basically says "produce items for this iterator and
feed them to this consumer"; it only works for unindexed consumers.

(As an aside, it is interesting that only some consumers can work in
unindexed mode, but all producers can *drive* an unindexed consumer.
In contrast, only some producers can drive an *indexed* consumer, but
all consumers can be supplied indexes. Isn't variance neat?)

As it happens, `FlatMap` only works with unindexed consumers anyway.
This is because flat-map basically has no idea how many items it will
produce. If you ask flat-map to produce the 22nd item, it can't do it,
at least not without some intermediate state. It doesn't know whether
processing the first item will create 1 item, 3 items, or 100;
therefore, to produce an arbitrary item, it would basically just have
to start at the beginning and execute sequentially, which is not what
we want. But for unindexed consumers, this doesn't matter, since they
don't need to know how much data they will get.

Therefore, `FlatMap` can wrap the `ForEachConsumer` with a
`FlatMapConsumer` that feeds into it. Each time the `FlatMapConsumer`
is given an item, it invokes `some_function` to get a parallel
iterator out, and then asks this new parallel iterator to drive the
`ForEachConsumer`. The `drive_unindexed` method on `flat_map` can then
pass the `FlatMapConsumer` up the chain to the previous item, which is
`zip`. At this point, something interesting happens.

## Switching from push to pull mode

If you think about `zip`, it can't really be implemented as a
consumer, at least not without an intermediate thread and some
channels or something (or maybe coroutines). The problem is that it
has to walk two iterators *in lockstep*. Basically, it can't call two
`drive` methods simultaneously; it can only call one at a time.
So at this point, the `zip` iterator needs to switch from *push mode*
into *pull mode*.

You'll note that `Zip` is only usable if its inputs implement
`IndexedParallelIterator`, meaning that they can produce data starting
at arbitrary points in the stream. This need to switch to pull mode is
exactly why. If we want to split a zip iterator at position 22, we
need to be able to start zipping items from index 22 right away,
without having to start from index 0.

Anyway, so at this point, the `drive_unindexed` method for `Zip` stops
creating consumers. Instead, it creates a *producer* (a `ZipProducer`,
to be exact) and calls the `bridge` function in the `internals`
module. Creating a `ZipProducer` will in turn create producers for
the two iterators being zipped. This is possible because they both
implement `IndexedParallelIterator`.

The `bridge` function will then connect the consumer, which is
handling the `flat_map` and `for_each`, with the producer, which is
handling the `zip` and its predecessors. It will split down until the
chunks seem reasonably small, then pull items from the producer and
feed them to the consumer.

## The base case

The other time that `bridge` gets used is when we bottom out in an
indexed producer, such as a slice or range. There is also a
`bridge_unindexed` equivalent for (you guessed it) unindexed producers,
such as string characters.

<a name="producer-callback"></a>

## What on earth is `ProducerCallback`?

We saw that when you call a parallel action method like
`par_iter.reduce()`, that will create a "reducing" consumer and then
invoke `par_iter.drive_unindexed()` (or `par_iter.drive()`) as
appropriate. This may create yet more consumers as we proceed up the
parallel iterator chain.
But at some point we're going to get to the
start of the chain, or to a parallel iterator (like `zip()`) that has
to coordinate multiple inputs. At that point, we need to start
converting parallel iterators into producers.

The way we do this is by invoking the method `with_producer()`, defined on
`IndexedParallelIterator`. This is a callback scheme. In an ideal world,
it would work like this:

```rust
base_iter.with_producer(|base_producer| {
    // here, `base_producer` is the producer for `base_iter`
});
```

In that case, we could implement a combinator like `map()` by getting
the producer for the base iterator, wrapping it to make our own
`MapProducer`, and then passing that to the callback. Something like
this:

```rust
struct MapProducer<'f, P, F: 'f> {
    base: P,
    map_op: &'f F,
}

impl<I, F> IndexedParallelIterator for Map<I, F>
    where I: IndexedParallelIterator,
          F: MapOp<I::Item>,
{
    fn with_producer<CB>(self, callback: CB) -> CB::Output {
        let map_op = &self.map_op;
        self.base.with_producer(|base_producer| {
            // Here `base_producer` is the producer for `self.base`.
            // Wrap that to make a `MapProducer`.
            let map_producer = MapProducer {
                base: base_producer,
                map_op: map_op,
            };

            // Invoke the callback with the wrapped version; its result
            // becomes the result of `with_producer`.
            callback(map_producer)
        })
    }
}
```

This example demonstrates some of the power of the callback scheme.
It winds up being a very flexible setup. For one thing, it means we
can take ownership of `par_iter`; we can then in turn give away
ownership of its bits and pieces to the producer (this is very useful if
the iterator owns an `&mut` slice, for example), or create shared
references and put *those* in the producer.
In the case of map, for
example, the parallel iterator owns the `map_op`, and we borrow
references to it which we then put into the `MapProducer` (this means
the `MapProducer` can easily split itself and share those references).
The `with_producer` method can also create resources that are needed
during the parallel execution, since the producer does not have to be
returned.

Unfortunately there is a catch. We can't actually use closures the way
I showed you. To see why, think about the type that `map_producer`
would have to have. If we were going to write the `with_producer`
method using a closure, it would have to look something like this:

```rust
pub trait IndexedParallelIterator: ParallelIterator {
    type Producer;
    fn with_producer<CB, R>(self, callback: CB) -> R
        where CB: FnOnce(Self::Producer) -> R;
    // ...
}
```

Note that we had to add this associated type `Producer` so that
we could specify the argument of the callback to be `Self::Producer`.
Now, imagine trying to write the `Map` impl using this style:

```rust
impl<I, F> IndexedParallelIterator for Map<I, F>
    where I: IndexedParallelIterator,
          F: MapOp<I::Item>,
{
    type Producer = MapProducer<'f, I::Producer, F>;
    //                          ^^ wait, what is this `'f`?

    fn with_producer<CB, R>(self, callback: CB) -> R
        where CB: FnOnce(Self::Producer) -> R
    {
        let map_op = &self.map_op;
        //           ^^^^^^^^^^^^ `'f` is (conceptually) the lifetime of this
        //           reference, so it will be different for each call to
        //           `with_producer`!
        // ...
    }
}
```

This may look familiar to you: it's the same problem that we have
trying to define an `Iterable` trait. Basically, the producer type
needs to include a lifetime (here, `'f`) that refers to the body of
`with_producer` and hence is not in scope at the impl level.

If we had [associated type constructors][1598], we could solve this
problem that way.
But there is another solution. We can use a
dedicated callback trait like `ProducerCallback`, instead of `FnOnce`:

[1598]: https://github.com/rust-lang/rfcs/pull/1598

```rust
pub trait ProducerCallback<T> {
    type Output;
    fn callback<P>(self, producer: P) -> Self::Output
        where P: Producer<Item=T>;
}
```

Using this trait, the signature of `with_producer()` looks like this:

```rust
fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output;
```

Notice that this signature **never has to name the producer type**:
there is no associated type `Producer` anymore. This is because the
`callback()` method is generic over **all** producers `P`.

The problem is that now the `||` sugar doesn't work anymore. So we
have to manually create the callback struct, which is a mite tedious.
So our `Map` code looks like this:

```rust
impl<I, F> IndexedParallelIterator for Map<I, F>
    where I: IndexedParallelIterator,
          F: MapOp<I::Item>,
{
    fn with_producer<CB>(self, callback: CB) -> CB::Output
        where CB: ProducerCallback<Self::Item>
    {
        return self.base.with_producer(Callback { callback: callback, map_op: self.map_op });
        //                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        // Manual version of the closure sugar: create an instance
        // of a struct that implements `ProducerCallback`.

        // The struct declaration. Each field is something that we need to
        // capture from the creating scope.
        struct Callback<CB, F> {
            callback: CB,
            map_op: F,
        }

        // Implement the `ProducerCallback` trait. This is pure boilerplate.
        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
            where F: MapOp<T>,
                  CB: ProducerCallback<F::Output>
        {
            type Output = CB::Output;

            fn callback<P>(self, base: P) -> CB::Output
                where P: Producer<Item=T>
            {
                // The body of the closure is here:
                let producer = MapProducer { base: base,
                                             map_op: &self.map_op };
                self.callback.callback(producer)
            }
        }
    }
}
```

OK, a bit tedious, but it works!
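To see the `ProducerCallback` pattern end to end, here is a minimal sketch that compiles on its own, without rayon. It is a sequential toy under stated assumptions, not rayon's implementation. The `Producer`, `ProducerCallback` and `IndexedIter` traits are trimmed-down, hypothetical stand-ins: the real traits carry more methods and `Send` bounds. `SliceIter`, `SliceProducer` and `CollectVec` are made-up names, and a plain `Fn` closure replaces `MapOp`. No threads are spawned. The terminal callback splits once with `split_at`, as a parallel driver might, and then drains both halves in order.

```rust
// Pull-mode producer (hypothetical, simplified): length, split, or iterate sequentially.
trait Producer: Sized {
    type Item;
    type IntoIter: Iterator<Item = Self::Item>;
    fn len(&self) -> usize;
    fn into_iter(self) -> Self::IntoIter;
    fn split_at(self, index: usize) -> (Self, Self);
}

// The callback is generic over *every* producer type `P`.
trait ProducerCallback<T> {
    type Output;
    fn callback<P: Producer<Item = T>>(self, producer: P) -> Self::Output;
}

// Stand-in for `IndexedParallelIterator`: note there is no `Producer` associated type.
trait IndexedIter {
    type Item;
    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output;
}

// Base case: a borrowed slice.
struct SliceIter<'a, T> { slice: &'a [T] }
struct SliceProducer<'a, T> { slice: &'a [T] }

impl<'a, T> Producer for SliceProducer<'a, T> {
    type Item = &'a T;
    type IntoIter = std::slice::Iter<'a, T>;
    fn len(&self) -> usize { self.slice.len() }
    fn into_iter(self) -> Self::IntoIter { self.slice.iter() }
    fn split_at(self, index: usize) -> (Self, Self) {
        let (left, right) = self.slice.split_at(index);
        (SliceProducer { slice: left }, SliceProducer { slice: right })
    }
}

impl<'a, T> IndexedIter for SliceIter<'a, T> {
    type Item = &'a T;
    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
        callback.callback(SliceProducer { slice: self.slice })
    }
}

// `map`, with a plain `Fn` closure standing in for `MapOp`.
struct Map<I, F> { base: I, map_op: F }
struct MapProducer<'f, P, F> { base: P, map_op: &'f F }

impl<'f, P, F, R> Producer for MapProducer<'f, P, F>
where P: Producer, F: Fn(P::Item) -> R
{
    type Item = R;
    type IntoIter = std::iter::Map<P::IntoIter, &'f F>;
    fn len(&self) -> usize { self.base.len() }
    fn into_iter(self) -> Self::IntoIter { self.base.into_iter().map(self.map_op) }
    fn split_at(self, index: usize) -> (Self, Self) {
        let map_op = self.map_op; // a shared reference: both halves reuse it
        let (left, right) = self.base.split_at(index);
        (MapProducer { base: left, map_op }, MapProducer { base: right, map_op })
    }
}

impl<I, F, R> IndexedIter for Map<I, F>
where I: IndexedIter, F: Fn(I::Item) -> R
{
    type Item = R;
    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
        // Manual "closure": a struct holding the captured state.
        struct Callback<CB, F> { callback: CB, map_op: F }
        impl<T, R, F, CB> ProducerCallback<T> for Callback<CB, F>
        where F: Fn(T) -> R, CB: ProducerCallback<R>
        {
            type Output = CB::Output;
            fn callback<P: Producer<Item = T>>(self, base: P) -> CB::Output {
                // The "closure body": wrap the base producer, borrowing `map_op`.
                let producer = MapProducer { base, map_op: &self.map_op };
                self.callback.callback(producer)
            }
        }

        self.base.with_producer(Callback { callback, map_op: self.map_op })
    }
}

// Terminal callback: split once, as a parallel driver might, then drain both halves in order.
struct CollectVec;

impl<T> ProducerCallback<T> for CollectVec {
    type Output = Vec<T>;
    fn callback<P: Producer<Item = T>>(self, producer: P) -> Vec<T> {
        let mid = producer.len() / 2;
        let (left, right) = producer.split_at(mid);
        left.into_iter().chain(right.into_iter()).collect()
    }
}

fn main() {
    let data = [1, 2, 3, 4, 5];
    let out: Vec<i32> = Map { base: SliceIter { slice: &data[..] }, map_op: |x: &i32| x * 2 }
        .with_producer(CollectVec);
    println!("{:?}", out); // [2, 4, 6, 8, 10]
}
```

`Map::with_producer` never names a producer type. Even so, `MapProducer` can borrow `map_op` for exactly as long as the callback runs, because that borrow is created inside `Callback::callback`. Nesting two `Map`s works the same way: each layer wraps the callback it was handed before passing it down.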