1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-merge.hpp
6 
7     \brief For each given observable subscribe.
8            For each item emitted from all of the given observables, deliver from the new observable that is returned.
9 
10            There are 2 variants of the operator:
11            - The source observable emits nested observables, nested observables are merged.
12            - The source observable and the arguments v0...vn are used to provide the observables to merge.
13 
14     \tparam Coordination  the type of the scheduler (optional).
15     \tparam Value0  ... (optional).
16     \tparam ValueN  types of source observables (optional).
17 
18     \param  cn  the scheduler to synchronize sources from different contexts (optional).
19     \param  v0  ... (optional).
20     \param  vn  source observables (optional).
21 
22     \return  Observable that emits items that are the result of flattening the observables emitted by the source observable.
23 
24     If scheduler is omitted, identity_current_thread is used.
25 
26     \sample
27     \snippet merge.cpp threaded implicit merge sample
28     \snippet output.txt threaded implicit merge sample
29 
30     \sample
31     \snippet merge.cpp implicit merge sample
32     \snippet output.txt implicit merge sample
33 
34     \sample
35     \snippet merge.cpp merge sample
36     \snippet output.txt merge sample
37 
38     \sample
39     \snippet merge.cpp threaded merge sample
40     \snippet output.txt threaded merge sample
41 */
42 
43 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
44 #define RXCPP_OPERATORS_RX_MERGE_HPP
45 
46 #include "../rx-includes.hpp"
47 
48 namespace rxcpp {
49 
50 namespace operators {
51 
52 namespace detail {
53 
54 template<class... AN>
55 struct merge_invalid_arguments {};
56 
57 template<class... AN>
58 struct merge_invalid : public rxo::operator_base<merge_invalid_arguments<AN...>> {
59     using type = observable<merge_invalid_arguments<AN...>, merge_invalid<AN...>>;
60 };
61 template<class... AN>
62 using merge_invalid_t = typename merge_invalid<AN...>::type;
63 
64 template<class T, class Observable, class Coordination>
65 struct merge
66     : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
67 {
68     //static_assert(is_observable<Observable>::value, "merge requires an observable");
69     //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
70 
71     typedef merge<T, Observable, Coordination> this_type;
72 
73     typedef rxu::decay_t<T> source_value_type;
74     typedef rxu::decay_t<Observable> source_type;
75 
76     typedef typename source_type::source_operator_type source_operator_type;
77     typedef typename source_value_type::value_type value_type;
78 
79     typedef rxu::decay_t<Coordination> coordination_type;
80     typedef typename coordination_type::coordinator_type coordinator_type;
81 
82     struct values
83     {
valuesrxcpp::operators::detail::merge::values84         values(source_operator_type o, coordination_type sf)
85             : source_operator(std::move(o))
86             , coordination(std::move(sf))
87         {
88         }
89         source_operator_type source_operator;
90         coordination_type coordination;
91     };
92     values initial;
93 
mergerxcpp::operators::detail::merge94     merge(const source_type& o, coordination_type sf)
95         : initial(o.source_operator, std::move(sf))
96     {
97     }
98 
99     template<class Subscriber>
on_subscriberxcpp::operators::detail::merge100     void on_subscribe(Subscriber scbr) const {
101         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
102 
103         typedef Subscriber output_type;
104 
105         struct merge_state_type
106             : public std::enable_shared_from_this<merge_state_type>
107             , public values
108         {
109             merge_state_type(values i, coordinator_type coor, output_type oarg)
110                 : values(i)
111                 , source(i.source_operator)
112                 , pendingCompletions(0)
113                 , coordinator(std::move(coor))
114                 , out(std::move(oarg))
115             {
116             }
117             observable<source_value_type, source_operator_type> source;
118             // on_completed on the output must wait until all the
119             // subscriptions have received on_completed
120             int pendingCompletions;
121             coordinator_type coordinator;
122             output_type out;
123         };
124 
125         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
126 
127         // take a copy of the values for each subscription
128         auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
129 
130         composite_subscription outercs;
131 
132         // when the out observer is unsubscribed all the
133         // inner subscriptions are unsubscribed as well
134         state->out.add(outercs);
135 
136         auto source = on_exception(
137             [&](){return state->coordinator.in(state->source);},
138             state->out);
139         if (source.empty()) {
140             return;
141         }
142 
143         ++state->pendingCompletions;
144         // this subscribe does not share the observer subscription
145         // so that when it is unsubscribed the observer can be called
146         // until the inner subscriptions have finished
147         auto sink = make_subscriber<source_value_type>(
148             state->out,
149             outercs,
150         // on_next
151             [state](source_value_type st) {
152 
153                 composite_subscription innercs;
154 
155                 // when the out observer is unsubscribed all the
156                 // inner subscriptions are unsubscribed as well
157                 auto innercstoken = state->out.add(innercs);
158 
159                 innercs.add(make_subscription([state, innercstoken](){
160                     state->out.remove(innercstoken);
161                 }));
162 
163                 auto selectedSource = state->coordinator.in(st);
164 
165                 ++state->pendingCompletions;
166                 // this subscribe does not share the source subscription
167                 // so that when it is unsubscribed the source will continue
168                 auto sinkInner = make_subscriber<value_type>(
169                     state->out,
170                     innercs,
171                 // on_next
172                     [state, st](value_type ct) {
173                         state->out.on_next(std::move(ct));
174                     },
175                 // on_error
176                     [state](rxu::error_ptr e) {
177                         state->out.on_error(e);
178                     },
179                 //on_completed
180                     [state](){
181                         if (--state->pendingCompletions == 0) {
182                             state->out.on_completed();
183                         }
184                     }
185                 );
186 
187                 auto selectedSinkInner = state->coordinator.out(sinkInner);
188                 selectedSource.subscribe(std::move(selectedSinkInner));
189             },
190         // on_error
191             [state](rxu::error_ptr e) {
192                 state->out.on_error(e);
193             },
194         // on_completed
195             [state]() {
196                 if (--state->pendingCompletions == 0) {
197                     state->out.on_completed();
198                 }
199             }
200         );
201         auto selectedSink = on_exception(
202             [&](){return state->coordinator.out(sink);},
203             state->out);
204         if (selectedSink.empty()) {
205             return;
206         }
207         source->subscribe(std::move(selectedSink.get()));
208     }
209 };
210 
211 }
212 
213 /*! @copydoc rx-merge.hpp
214 */
215 template<class... AN>
merge(AN &&...an)216 auto merge(AN&&... an)
217     ->     operator_factory<merge_tag, AN...> {
218     return operator_factory<merge_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
219 }
220 
221 }
222 
223 template<>
224 struct member_overload<merge_tag>
225 {
226     template<class Observable,
227         class Enabled = rxu::enable_if_all_true_type_t<
228             is_observable<Observable>>,
229         class SourceValue = rxu::value_type_t<Observable>,
230         class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
231         class Value = rxu::value_type_t<SourceValue>,
232         class Result = observable<Value, Merge>
233     >
memberrxcpp::member_overload234     static Result member(Observable&& o) {
235         return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
236     }
237 
238     template<class Observable, class Coordination,
239         class Enabled = rxu::enable_if_all_true_type_t<
240             is_observable<Observable>,
241             is_coordination<Coordination>>,
242         class SourceValue = rxu::value_type_t<Observable>,
243         class Merge = rxo::detail::merge<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
244         class Value = rxu::value_type_t<SourceValue>,
245         class Result = observable<Value, Merge>
246     >
memberrxcpp::member_overload247     static Result member(Observable&& o, Coordination&& cn) {
248         return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
249     }
250 
251     template<class Observable, class Value0, class... ValueN,
252         class Enabled = rxu::enable_if_all_true_type_t<
253             all_observables<Observable, Value0, ValueN...>>,
254         class EmittedValue = rxu::value_type_t<Observable>,
255         class SourceValue = observable<EmittedValue>,
256         class ObservableObservable = observable<SourceValue>,
257         class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, identity_one_worker>::type,
258         class Value = rxu::value_type_t<Merge>,
259         class Result = observable<Value, Merge>
260     >
memberrxcpp::member_overload261     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
262         return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
263     }
264 
265     template<class Observable, class Coordination, class Value0, class... ValueN,
266         class Enabled = rxu::enable_if_all_true_type_t<
267             all_observables<Observable, Value0, ValueN...>,
268             is_coordination<Coordination>>,
269         class EmittedValue = rxu::value_type_t<Observable>,
270         class SourceValue = observable<EmittedValue>,
271         class ObservableObservable = observable<SourceValue>,
272         class Merge = typename rxu::defer_type<rxo::detail::merge, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
273         class Value = rxu::value_type_t<Merge>,
274         class Result = observable<Value, Merge>
275     >
memberrxcpp::member_overload276     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
277         return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
278     }
279 
280     template<class... AN>
memberrxcpp::member_overload281     static operators::detail::merge_invalid_t<AN...> member(AN...) {
282         std::terminate();
283         return {};
284         static_assert(sizeof...(AN) == 10000, "merge takes (optional Coordination, optional Value0, optional ValueN...)");
285     }
286 };
287 
288 }
289 
290 #endif
291