1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-concat_map.hpp
6 
7     \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable.
8            For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned.
9 
10     \tparam CollectionSelector  the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type)
11     \tparam ResultSelector      the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)
12     \tparam Coordination        the type of the scheduler (optional).
13 
14     \param  s   a function that returns an observable for each item emitted by the source observable.
15     \param  rs  a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional).
16     \param  cn  the scheduler to synchronize sources from different contexts. (optional).
17 
18     \return  Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable.
19 
20     Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables.
21 
22     \sample
23     \snippet concat_map.cpp concat_map sample
24     \snippet output.txt concat_map sample
25 
26     \sample
27     \snippet concat_map.cpp threaded concat_map sample
28     \snippet output.txt threaded concat_map sample
29 */
30 
31 #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP)
32 #define RXCPP_OPERATORS_RX_CONCATMAP_HPP
33 
34 #include "../rx-includes.hpp"
35 
36 namespace rxcpp {
37 
38 namespace operators {
39 
40 namespace detail {
41 
42 template<class... AN>
43 struct concat_map_invalid_arguments {};
44 
45 template<class... AN>
46 struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> {
47     using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>;
48 };
49 template<class... AN>
50 using concat_map_invalid_t = typename concat_map_invalid<AN...>::type;
51 
52 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
53 struct concat_traits {
54     typedef rxu::decay_t<Observable> source_type;
55     typedef rxu::decay_t<CollectionSelector> collection_selector_type;
56     typedef rxu::decay_t<ResultSelector> result_selector_type;
57     typedef rxu::decay_t<Coordination> coordination_type;
58 
59     typedef typename source_type::value_type source_value_type;
60 
61     struct tag_not_valid {};
62     template<class CV, class CCS>
63     static auto collection_check(int) -> decltype((*(CCS*)nullptr)(*(CV*)nullptr));
64     template<class CV, class CCS>
65     static tag_not_valid collection_check(...);
66 
67     static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "concat_map CollectionSelector must be a function with the signature observable(concat_map::source_value_type)");
68 
69     typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
70 
71 //#if _MSC_VER >= 1900
72     static_assert(is_observable<collection_type>::value, "concat_map CollectionSelector must return an observable");
73 //#endif
74 
75     typedef typename collection_type::value_type collection_value_type;
76 
77     template<class CV, class CCV, class CRS>
78     static auto result_check(int) -> decltype((*(CRS*)nullptr)(*(CV*)nullptr, *(CCV*)nullptr));
79     template<class CV, class CCV, class CRS>
80     static tag_not_valid result_check(...);
81 
82     static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
83 
84     typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
85 };
86 
87 template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
88 struct concat_map
89     : public operator_base<rxu::value_type_t<concat_traits<Observable, CollectionSelector, ResultSelector, Coordination>>>
90 {
91     typedef concat_map<Observable, CollectionSelector, ResultSelector, Coordination> this_type;
92     typedef concat_traits<Observable, CollectionSelector, ResultSelector, Coordination> traits;
93 
94     typedef typename traits::source_type source_type;
95     typedef typename traits::collection_selector_type collection_selector_type;
96     typedef typename traits::result_selector_type result_selector_type;
97 
98     typedef typename traits::source_value_type source_value_type;
99     typedef typename traits::collection_type collection_type;
100     typedef typename traits::collection_value_type collection_value_type;
101 
102     typedef typename traits::coordination_type coordination_type;
103     typedef typename coordination_type::coordinator_type coordinator_type;
104 
105     struct values
106     {
valuesrxcpp::operators::detail::concat_map::values107         values(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
108             : source(std::move(o))
109             , selectCollection(std::move(s))
110             , selectResult(std::move(rs))
111             , coordination(std::move(sf))
112         {
113         }
114         source_type source;
115         collection_selector_type selectCollection;
116         result_selector_type selectResult;
117         coordination_type coordination;
118     private:
119         values& operator=(const values&) RXCPP_DELETE;
120     };
121     values initial;
122 
concat_maprxcpp::operators::detail::concat_map123     concat_map(source_type o, collection_selector_type s, result_selector_type rs, coordination_type sf)
124         : initial(std::move(o), std::move(s), std::move(rs), std::move(sf))
125     {
126     }
127 
128     template<class Subscriber>
on_subscriberxcpp::operators::detail::concat_map129     void on_subscribe(Subscriber scbr) const {
130         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
131 
132         typedef Subscriber output_type;
133 
134         struct concat_map_state_type
135             : public std::enable_shared_from_this<concat_map_state_type>
136             , public values
137         {
138             concat_map_state_type(values i, coordinator_type coor, output_type oarg)
139                 : values(std::move(i))
140                 , sourceLifetime(composite_subscription::empty())
141                 , collectionLifetime(composite_subscription::empty())
142                 , coordinator(std::move(coor))
143                 , out(std::move(oarg))
144             {
145             }
146 
147             void subscribe_to(source_value_type st)
148             {
149                 auto state = this->shared_from_this();
150 
151                 auto selectedCollection = on_exception(
152                     [&](){return state->selectCollection(st);},
153                     state->out);
154                 if (selectedCollection.empty()) {
155                     return;
156                 }
157 
158                 collectionLifetime = composite_subscription();
159 
160                 // when the out observer is unsubscribed all the
161                 // inner subscriptions are unsubscribed as well
162                 auto innercstoken = state->out.add(collectionLifetime);
163 
164                 collectionLifetime.add(make_subscription([state, innercstoken](){
165                     state->out.remove(innercstoken);
166                 }));
167 
168                 auto selectedSource = on_exception(
169                     [&](){return state->coordinator.in(selectedCollection.get());},
170                     state->out);
171                 if (selectedSource.empty()) {
172                     return;
173                 }
174 
175                 // this subscribe does not share the source subscription
176                 // so that when it is unsubscribed the source will continue
177                 auto sinkInner = make_subscriber<collection_value_type>(
178                     state->out,
179                     collectionLifetime,
180                 // on_next
181                     [state, st](collection_value_type ct) {
182                         auto selectedResult = state->selectResult(st, std::move(ct));
183                         state->out.on_next(std::move(selectedResult));
184                     },
185                 // on_error
186                     [state](rxu::error_ptr e) {
187                         state->out.on_error(e);
188                     },
189                 //on_completed
190                     [state](){
191                         if (!state->selectedCollections.empty()) {
192                             auto value = state->selectedCollections.front();
193                             state->selectedCollections.pop_front();
194                             state->collectionLifetime.unsubscribe();
195                             state->subscribe_to(value);
196                         } else if (!state->sourceLifetime.is_subscribed()) {
197                             state->out.on_completed();
198                         }
199                     }
200                 );
201                 auto selectedSinkInner = on_exception(
202                     [&](){return state->coordinator.out(sinkInner);},
203                     state->out);
204                 if (selectedSinkInner.empty()) {
205                     return;
206                 }
207                 selectedSource->subscribe(std::move(selectedSinkInner.get()));
208             }
209             composite_subscription sourceLifetime;
210             composite_subscription collectionLifetime;
211             std::deque<source_value_type> selectedCollections;
212             coordinator_type coordinator;
213             output_type out;
214         };
215 
216         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
217 
218         // take a copy of the values for each subscription
219         auto state = std::make_shared<concat_map_state_type>(initial, std::move(coordinator), std::move(scbr));
220 
221         state->sourceLifetime = composite_subscription();
222 
223         // when the out observer is unsubscribed all the
224         // inner subscriptions are unsubscribed as well
225         state->out.add(state->sourceLifetime);
226 
227         auto source = on_exception(
228             [&](){return state->coordinator.in(state->source);},
229             state->out);
230         if (source.empty()) {
231             return;
232         }
233 
234         // this subscribe does not share the observer subscription
235         // so that when it is unsubscribed the observer can be called
236         // until the inner subscriptions have finished
237         auto sink = make_subscriber<source_value_type>(
238             state->out,
239             state->sourceLifetime,
240         // on_next
241             [state](source_value_type st) {
242                 if (state->collectionLifetime.is_subscribed()) {
243                     state->selectedCollections.push_back(st);
244                 } else if (state->selectedCollections.empty()) {
245                     state->subscribe_to(st);
246                 }
247             },
248         // on_error
249             [state](rxu::error_ptr e) {
250                 state->out.on_error(e);
251             },
252         // on_completed
253             [state]() {
254                 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
255                     state->out.on_completed();
256                 }
257             }
258         );
259         auto selectedSink = on_exception(
260             [&](){return state->coordinator.out(sink);},
261             state->out);
262         if (selectedSink.empty()) {
263             return;
264         }
265         source->subscribe(std::move(selectedSink.get()));
266 
267     }
268 private:
269     concat_map& operator=(const concat_map&) RXCPP_DELETE;
270 };
271 
272 }
273 
274 /*! @copydoc rx-concat_map.hpp
275 */
276 template<class... AN>
concat_map(AN &&...an)277 auto concat_map(AN&&... an)
278 ->     operator_factory<concat_map_tag, AN...> {
279     return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
280 }
281 
282 /*! @copydoc rx-concat_map.hpp
283 */
284 template<class... AN>
concat_transform(AN &&...an)285 auto concat_transform(AN&&... an)
286 ->     operator_factory<concat_map_tag, AN...> {
287     return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
288 }
289 
290 }
291 
292 template<>
293 struct member_overload<concat_map_tag>
294 {
295     template<class Observable, class CollectionSelector,
296         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
297         class SourceValue = rxu::value_type_t<Observable>,
298         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
299         class ResultSelectorType = rxu::detail::take_at<1>,
300         class Enabled = rxu::enable_if_all_true_type_t<
301             all_observables<Observable, CollectionType>>,
302         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>,
303         class CollectionValueType = rxu::value_type_t<CollectionType>,
304         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
305         class Result = observable<Value, ConcatMap>
306     >
memberrxcpp::member_overload307     static Result member(Observable&& o, CollectionSelector&& s) {
308         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread()));
309     }
310 
311     template<class Observable, class CollectionSelector, class Coordination,
312         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
313         class SourceValue = rxu::value_type_t<Observable>,
314         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
315         class ResultSelectorType = rxu::detail::take_at<1>,
316         class Enabled = rxu::enable_if_all_true_type_t<
317             all_observables<Observable, CollectionType>,
318             is_coordination<Coordination>>,
319         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>,
320         class CollectionValueType = rxu::value_type_t<CollectionType>,
321         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
322         class Result = observable<Value, ConcatMap>
323     >
memberrxcpp::member_overload324     static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) {
325         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn)));
326     }
327 
328     template<class Observable, class CollectionSelector, class ResultSelector,
329         class IsCoordination = is_coordination<ResultSelector>,
330         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
331         class SourceValue = rxu::value_type_t<Observable>,
332         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
333         class Enabled = rxu::enable_if_all_true_type_t<
334             all_observables<Observable, CollectionType>,
335             rxu::negation<IsCoordination>>,
336         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>,
337         class CollectionValueType = rxu::value_type_t<CollectionType>,
338         class ResultSelectorType = rxu::decay_t<ResultSelector>,
339         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
340         class Result = observable<Value, ConcatMap>
341     >
memberrxcpp::member_overload342     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) {
343         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread()));
344     }
345 
346     template<class Observable, class CollectionSelector, class ResultSelector, class Coordination,
347         class CollectionSelectorType = rxu::decay_t<CollectionSelector>,
348         class SourceValue = rxu::value_type_t<Observable>,
349         class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>,
350         class Enabled = rxu::enable_if_all_true_type_t<
351             all_observables<Observable, CollectionType>,
352             is_coordination<Coordination>>,
353         class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>,
354         class CollectionValueType = rxu::value_type_t<CollectionType>,
355         class ResultSelectorType = rxu::decay_t<ResultSelector>,
356         class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>,
357         class Result = observable<Value, ConcatMap>
358     >
memberrxcpp::member_overload359     static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) {
360         return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn)));
361     }
362 
363     template<class... AN>
memberrxcpp::member_overload364     static operators::detail::concat_map_invalid_t<AN...> member(AN...) {
365         std::terminate();
366         return {};
367         static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)");
368     }
369 };
370 
371 }
372 
373 #endif
374