1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-concat.hpp
6 
7     \brief For each item from this observable subscribe to one at a time, in the order received.
8            For each item from all of the given observables deliver from the new observable that is returned.
9 
10            There are 2 variants of the operator:
11            - The source observable emits nested observables, nested observables are concatenated.
12            - The source observable and the arguments v0...vn are used to provide the observables to concatenate.
13 
14     \tparam Coordination  the type of the scheduler (optional).
15     \tparam Value0  ... (optional).
16     \tparam ValueN  types of source observables (optional).
17 
18     \param  cn  the scheduler to synchronize sources from different contexts (optional).
19     \param  v0  ... (optional).
20     \param  vn  source observables (optional).
21 
22     \return  Observable that emits the items emitted by each of the Observables emitted by the source observable, one after the other, without interleaving them.
23 
24     \sample
25     \snippet concat.cpp implicit concat sample
26     \snippet output.txt implicit concat sample
27 
28     \sample
29     \snippet concat.cpp threaded implicit concat sample
30     \snippet output.txt threaded implicit concat sample
31 
32     \sample
33     \snippet concat.cpp concat sample
34     \snippet output.txt concat sample
35 
36     \sample
37     \snippet concat.cpp threaded concat sample
38     \snippet output.txt threaded concat sample
39 */
40 
41 #if !defined(RXCPP_OPERATORS_RX_CONCAT_HPP)
42 #define RXCPP_OPERATORS_RX_CONCAT_HPP
43 
44 #include "../rx-includes.hpp"
45 
46 namespace rxcpp {
47 
48 namespace operators {
49 
50 namespace detail {
51 
52 template<class... AN>
53 struct concat_invalid_arguments {};
54 
55 template<class... AN>
56 struct concat_invalid : public rxo::operator_base<concat_invalid_arguments<AN...>> {
57     using type = observable<concat_invalid_arguments<AN...>, concat_invalid<AN...>>;
58 };
59 template<class... AN>
60 using concat_invalid_t = typename concat_invalid<AN...>::type;
61 
62 template<class T, class Observable, class Coordination>
63 struct concat
64     : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
65 {
66     typedef concat<T, Observable, Coordination> this_type;
67 
68     typedef rxu::decay_t<T> source_value_type;
69     typedef rxu::decay_t<Observable> source_type;
70     typedef rxu::decay_t<Coordination> coordination_type;
71 
72     typedef typename coordination_type::coordinator_type coordinator_type;
73 
74     typedef typename source_type::source_operator_type source_operator_type;
75     typedef source_value_type collection_type;
76     typedef typename collection_type::value_type value_type;
77 
78     struct values
79     {
valuesrxcpp::operators::detail::concat::values80         values(source_operator_type o, coordination_type sf)
81             : source_operator(std::move(o))
82             , coordination(std::move(sf))
83         {
84         }
85         source_operator_type source_operator;
86         coordination_type coordination;
87     };
88     values initial;
89 
concatrxcpp::operators::detail::concat90     concat(const source_type& o, coordination_type sf)
91         : initial(o.source_operator, std::move(sf))
92     {
93     }
94 
95     template<class Subscriber>
on_subscriberxcpp::operators::detail::concat96     void on_subscribe(Subscriber scbr) const {
97         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98 
99         typedef Subscriber output_type;
100 
101         struct concat_state_type
102             : public std::enable_shared_from_this<concat_state_type>
103             , public values
104         {
105             concat_state_type(values i, coordinator_type coor, output_type oarg)
106                 : values(i)
107                 , source(i.source_operator)
108                 , sourceLifetime(composite_subscription::empty())
109                 , collectionLifetime(composite_subscription::empty())
110                 , coordinator(std::move(coor))
111                 , out(std::move(oarg))
112             {
113             }
114 
115             void subscribe_to(collection_type st)
116             {
117                 auto state = this->shared_from_this();
118 
119                 collectionLifetime = composite_subscription();
120 
121                 // when the out observer is unsubscribed all the
122                 // inner subscriptions are unsubscribed as well
123                 auto innercstoken = state->out.add(collectionLifetime);
124 
125                 collectionLifetime.add(make_subscription([state, innercstoken](){
126                     state->out.remove(innercstoken);
127                 }));
128 
129                 auto selectedSource = on_exception(
130                     [&](){return state->coordinator.in(std::move(st));},
131                     state->out);
132                 if (selectedSource.empty()) {
133                     return;
134                 }
135 
136                 // this subscribe does not share the out subscription
137                 // so that when it is unsubscribed the out will continue
138                 auto sinkInner = make_subscriber<value_type>(
139                     state->out,
140                     collectionLifetime,
141                 // on_next
142                     [state, st](value_type ct) {
143                         state->out.on_next(ct);
144                     },
145                 // on_error
146                     [state](rxu::error_ptr e) {
147                         state->out.on_error(e);
148                     },
149                 //on_completed
150                     [state](){
151                         if (!state->selectedCollections.empty()) {
152                             auto value = state->selectedCollections.front();
153                             state->selectedCollections.pop_front();
154                             state->collectionLifetime.unsubscribe();
155                             state->subscribe_to(value);
156                         } else if (!state->sourceLifetime.is_subscribed()) {
157                             state->out.on_completed();
158                         }
159                     }
160                 );
161                 auto selectedSinkInner = on_exception(
162                     [&](){return state->coordinator.out(sinkInner);},
163                     state->out);
164                 if (selectedSinkInner.empty()) {
165                     return;
166                 }
167                 selectedSource->subscribe(std::move(selectedSinkInner.get()));
168             }
169             observable<source_value_type, source_operator_type> source;
170             composite_subscription sourceLifetime;
171             composite_subscription collectionLifetime;
172             std::deque<collection_type> selectedCollections;
173             coordinator_type coordinator;
174             output_type out;
175         };
176 
177         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
178 
179         // take a copy of the values for each subscription
180         auto state = std::make_shared<concat_state_type>(initial, std::move(coordinator), std::move(scbr));
181 
182         state->sourceLifetime = composite_subscription();
183 
184         // when the out observer is unsubscribed all the
185         // inner subscriptions are unsubscribed as well
186         state->out.add(state->sourceLifetime);
187 
188         auto source = on_exception(
189             [&](){return state->coordinator.in(state->source);},
190             state->out);
191         if (source.empty()) {
192             return;
193         }
194 
195         // this subscribe does not share the observer subscription
196         // so that when it is unsubscribed the observer can be called
197         // until the inner subscriptions have finished
198         auto sink = make_subscriber<collection_type>(
199             state->out,
200             state->sourceLifetime,
201         // on_next
202             [state](collection_type st) {
203                 if (state->collectionLifetime.is_subscribed()) {
204                     state->selectedCollections.push_back(st);
205                 } else if (state->selectedCollections.empty()) {
206                     state->subscribe_to(st);
207                 }
208             },
209         // on_error
210             [state](rxu::error_ptr e) {
211                 state->out.on_error(e);
212             },
213         // on_completed
214             [state]() {
215                 if (!state->collectionLifetime.is_subscribed() && state->selectedCollections.empty()) {
216                     state->out.on_completed();
217                 }
218             }
219         );
220         auto selectedSink = on_exception(
221             [&](){return state->coordinator.out(sink);},
222             state->out);
223         if (selectedSink.empty()) {
224             return;
225         }
226         source->subscribe(std::move(selectedSink.get()));
227     }
228 };
229 
230 }
231 
232 /*! @copydoc rx-concat.hpp
233 */
234 template<class... AN>
concat(AN &&...an)235 auto concat(AN&&... an)
236     ->     operator_factory<concat_tag, AN...> {
237     return operator_factory<concat_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
238 }
239 
240 }
241 
242 template<>
243 struct member_overload<concat_tag>
244 {
245     template<class Observable,
246         class Enabled = rxu::enable_if_all_true_type_t<
247             is_observable<Observable>>,
248         class SourceValue = rxu::value_type_t<Observable>,
249         class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
250         class Value = rxu::value_type_t<SourceValue>,
251         class Result = observable<Value, Concat>
252     >
memberrxcpp::member_overload253     static Result member(Observable&& o) {
254         return Result(Concat(std::forward<Observable>(o), identity_current_thread()));
255     }
256 
257     template<class Observable, class Coordination,
258         class Enabled = rxu::enable_if_all_true_type_t<
259             is_observable<Observable>,
260             is_coordination<Coordination>>,
261         class SourceValue = rxu::value_type_t<Observable>,
262         class Concat = rxo::detail::concat<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
263         class Value = rxu::value_type_t<SourceValue>,
264         class Result = observable<Value, Concat>
265     >
memberrxcpp::member_overload266     static Result member(Observable&& o, Coordination&& cn) {
267         return Result(Concat(std::forward<Observable>(o), std::forward<Coordination>(cn)));
268     }
269 
270     template<class Observable, class Value0, class... ValueN,
271         class Enabled = rxu::enable_if_all_true_type_t<
272             all_observables<Observable, Value0, ValueN...>>,
273         class EmittedValue = rxu::value_type_t<Observable>,
274         class SourceValue = observable<EmittedValue>,
275         class ObservableObservable = observable<SourceValue>,
276         class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, identity_one_worker>::type,
277         class Value = rxu::value_type_t<Concat>,
278         class Result = observable<Value, Concat>
279     >
memberrxcpp::member_overload280     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
281         return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
282     }
283 
284     template<class Observable, class Coordination, class Value0, class... ValueN,
285         class Enabled = rxu::enable_if_all_true_type_t<
286             all_observables<Observable, Value0, ValueN...>,
287             is_coordination<Coordination>>,
288         class EmittedValue = rxu::value_type_t<Observable>,
289         class SourceValue = observable<EmittedValue>,
290         class ObservableObservable = observable<SourceValue>,
291         class Concat = typename rxu::defer_type<rxo::detail::concat, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
292         class Value = rxu::value_type_t<Concat>,
293         class Result = observable<Value, Concat>
294     >
memberrxcpp::member_overload295     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
296         return Result(Concat(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
297     }
298 
299     template<class... AN>
memberrxcpp::member_overload300     static operators::detail::concat_invalid_t<AN...> member(AN...) {
301         std::terminate();
302         return {};
303         static_assert(sizeof...(AN) == 10000, "concat takes (optional Coordination, optional Value0, optional ValueN...)");
304     }
305 };
306 
307 }
308 
309 #endif
310