1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-merge_delay_error.hpp
6 
7         \brief For each given observable subscribe.
8                    For each item emitted from all of the given observables, deliver from the new observable that is returned.
9                    The first error to occure is hold off until all of the given non-error-emitting observables have finished their emission.
10 
11                    There are 2 variants of the operator:
12                    - The source observable emits nested observables, nested observables are merged.
13                    - The source observable and the arguments v0...vn are used to provide the observables to merge.
14 
15         \tparam Coordination  the type of the scheduler (optional).
16         \tparam Value0  ... (optional).
17         \tparam ValueN  types of source observables (optional).
18 
19         \param  cn      the scheduler to synchronize sources from different contexts (optional).
20         \param  v0      ... (optional).
21         \param  vn      source observables (optional).
22 
23         \return                                                                                                              Observable that emits items that are the result of flattening the observables emitted by the source observable.
24 
25         If scheduler is omitted, identity_current_thread is used.
26 
27         \sample
28         \snippet merge_delay_error.cpp threaded implicit merge sample
29         \snippet output.txt threaded implicit merge sample
30 
31         \sample
32         \snippet merge_delay_error.cpp implicit merge sample
33         \snippet output.txt implicit merge sample
34 
35         \sample
36         \snippet merge_delay_error.cpp merge sample
37         \snippet output.txt merge sample
38 
39         \sample
40         \snippet merge_delay_error.cpp threaded merge sample
41         \snippet output.txt threaded merge sample
42 */
43 
44 #if !defined(RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP)
45 #define RXCPP_OPERATORS_RX_MERGE_DELAY_ERROR_HPP
46 
47 #include "rx-merge.hpp"
48 
49 #include "../rx-composite_exception.hpp"
50 
51 namespace rxcpp {
52 
53 namespace operators {
54 
55 namespace detail {
56 
57 template<class T, class Observable, class Coordination>
58 struct merge_delay_error
59         : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
60 {
61         //static_assert(is_observable<Observable>::value, "merge requires an observable");
62         //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
63 
64         typedef merge_delay_error<T, Observable, Coordination> this_type;
65 
66         typedef rxu::decay_t<T> source_value_type;
67         typedef rxu::decay_t<Observable> source_type;
68 
69         typedef typename source_type::source_operator_type source_operator_type;
70         typedef typename source_value_type::value_type value_type;
71 
72         typedef rxu::decay_t<Coordination> coordination_type;
73         typedef typename coordination_type::coordinator_type coordinator_type;
74 
75         struct values
76         {
valuesrxcpp::operators::detail::merge_delay_error::values77                 values(source_operator_type o, coordination_type sf)
78                         : source_operator(std::move(o))
79                         , coordination(std::move(sf))
80                 {
81                 }
82                 source_operator_type source_operator;
83                 coordination_type coordination;
84         };
85         values initial;
86 
merge_delay_errorrxcpp::operators::detail::merge_delay_error87         merge_delay_error(const source_type& o, coordination_type sf)
88                 : initial(o.source_operator, std::move(sf))
89         {
90         }
91 
92         template<class Subscriber>
on_subscriberxcpp::operators::detail::merge_delay_error93         void on_subscribe(Subscriber scbr) const {
94                 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
95 
96                 typedef Subscriber output_type;
97 
98                 struct merge_state_type
99                         : public std::enable_shared_from_this<merge_state_type>
100                         , public values
101                 {
102                         merge_state_type(values i, coordinator_type coor, output_type oarg)
103                                 : values(i)
104                                 , source(i.source_operator)
105                                 , pendingCompletions(0)
106                                 , coordinator(std::move(coor))
107                                 , out(std::move(oarg))
108                         {
109                         }
110                         observable<source_value_type, source_operator_type> source;
111                         // on_completed on the output must wait until all the
112                         // subscriptions have received on_completed
113                         int pendingCompletions;
114                         composite_exception exception;;
115                         coordinator_type coordinator;
116                         output_type out;
117                 };
118 
119                 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
120 
121                 // take a copy of the values for each subscription
122                 auto state = std::make_shared<merge_state_type>(initial, std::move(coordinator), std::move(scbr));
123 
124                 composite_subscription outercs;
125 
126                 // when the out observer is unsubscribed all the
127                 // inner subscriptions are unsubscribed as well
128                 state->out.add(outercs);
129 
130                 auto source = on_exception(
131                         [&](){return state->coordinator.in(state->source);},
132                         state->out);
133                 if (source.empty()) {
134                         return;
135                 }
136 
137                 ++state->pendingCompletions;
138                 // this subscribe does not share the observer subscription
139                 // so that when it is unsubscribed the observer can be called
140                 // until the inner subscriptions have finished
141                 auto sink = make_subscriber<source_value_type>(
142                         state->out,
143                         outercs,
144                 // on_next
145                         [state](source_value_type st) {
146 
147                                 composite_subscription innercs;
148 
149                                 // when the out observer is unsubscribed all the
150                                 // inner subscriptions are unsubscribed as well
151                                 auto innercstoken = state->out.add(innercs);
152 
153                                 innercs.add(make_subscription([state, innercstoken](){
154                                         state->out.remove(innercstoken);
155                                 }));
156 
157                                 auto selectedSource = state->coordinator.in(st);
158 
159                                 ++state->pendingCompletions;
160                                 // this subscribe does not share the source subscription
161                                 // so that when it is unsubscribed the source will continue
162                                 auto sinkInner = make_subscriber<value_type>(
163                                         state->out,
164                                         innercs,
165                                 // on_next
166                                         [state, st](value_type ct) {
167                                                 state->out.on_next(std::move(ct));
168                                         },
169                                 // on_error
170                                         [state](rxu::error_ptr e) {
171                                                 if(--state->pendingCompletions == 0) {
172                                                     state->out.on_error(
173                                                         rxu::make_error_ptr(std::move(state->exception.add(e))));
174                                                 } else {
175                                                         state->exception.add(e);
176                                                 }
177                                         },
178                                 //on_completed
179                                         [state](){
180                                                 if (--state->pendingCompletions == 0) {
181                                                         if(!state->exception.empty()) {
182                                                             state->out.on_error(
183                                                                 rxu::make_error_ptr(std::move(state->exception)));
184                                                         } else {
185                                                                 state->out.on_completed();
186                                                         }
187                                                 }
188                                         }
189                                 );
190 
191                                 auto selectedSinkInner = state->coordinator.out(sinkInner);
192                                 selectedSource.subscribe(std::move(selectedSinkInner));
193                         },
194                 // on_error
195                         [state](rxu::error_ptr e) {
196                             if(--state->pendingCompletions == 0) {
197                                 state->out.on_error(
198                                     rxu::make_error_ptr(std::move(state->exception.add(e))));
199                             } else {
200                                 state->exception.add(e);
201                             }
202                         },
203                 // on_completed
204                         [state]() {
205                             if (--state->pendingCompletions == 0) {
206                                 if(!state->exception.empty()) {
207                                     state->out.on_error(
208                                         rxu::make_error_ptr(std::move(state->exception)));
209                                 } else {
210                                     state->out.on_completed();
211                                 }
212                             }
213                         }
214                 );
215                 auto selectedSink = on_exception(
216                         [&](){return state->coordinator.out(sink);},
217                         state->out);
218                 if (selectedSink.empty()) {
219                         return;
220                 }
221                 source->subscribe(std::move(selectedSink.get()));
222         }
223 };
224 
225 }
226 
227 /*! @copydoc rx-merge-delay-error.hpp
228 */
229 template<class... AN>
merge_delay_error(AN &&...an)230 auto merge_delay_error(AN&&... an)
231         ->         operator_factory<merge_delay_error_tag, AN...> {
232         return operator_factory<merge_delay_error_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234 
235 }
236 
237 template<>
238 struct member_overload<merge_delay_error_tag>
239 {
240         template<class Observable,
241                 class Enabled = rxu::enable_if_all_true_type_t<
242                         is_observable<Observable>>,
243                 class SourceValue = rxu::value_type_t<Observable>,
244                 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
245                 class Value = rxu::value_type_t<SourceValue>,
246                 class Result = observable<Value, Merge>
247         >
memberrxcpp::member_overload248         static Result member(Observable&& o) {
249                 return Result(Merge(std::forward<Observable>(o), identity_current_thread()));
250         }
251 
252         template<class Observable, class Coordination,
253                 class Enabled = rxu::enable_if_all_true_type_t<
254                         is_observable<Observable>,
255                         is_coordination<Coordination>>,
256                 class SourceValue = rxu::value_type_t<Observable>,
257                 class Merge = rxo::detail::merge_delay_error<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
258                 class Value = rxu::value_type_t<SourceValue>,
259                 class Result = observable<Value, Merge>
260         >
memberrxcpp::member_overload261         static Result member(Observable&& o, Coordination&& cn) {
262                 return Result(Merge(std::forward<Observable>(o), std::forward<Coordination>(cn)));
263         }
264 
265         template<class Observable, class Value0, class... ValueN,
266                 class Enabled = rxu::enable_if_all_true_type_t<
267                         all_observables<Observable, Value0, ValueN...>>,
268                 class EmittedValue = rxu::value_type_t<Observable>,
269                 class SourceValue = observable<EmittedValue>,
270                 class ObservableObservable = observable<SourceValue>,
271                 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, identity_one_worker>::type,
272                 class Value = rxu::value_type_t<Merge>,
273                 class Result = observable<Value, Merge>
274         >
memberrxcpp::member_overload275         static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
276                 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
277         }
278 
279         template<class Observable, class Coordination, class Value0, class... ValueN,
280                 class Enabled = rxu::enable_if_all_true_type_t<
281                         all_observables<Observable, Value0, ValueN...>,
282                         is_coordination<Coordination>>,
283                 class EmittedValue = rxu::value_type_t<Observable>,
284                 class SourceValue = observable<EmittedValue>,
285                 class ObservableObservable = observable<SourceValue>,
286                 class Merge = typename rxu::defer_type<rxo::detail::merge_delay_error, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
287                 class Value = rxu::value_type_t<Merge>,
288                 class Result = observable<Value, Merge>
289         >
memberrxcpp::member_overload290         static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
291                 return Result(Merge(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
292         }
293 
294         template<class... AN>
memberrxcpp::member_overload295         static operators::detail::merge_invalid_t<AN...> member(AN...) {
296                 std::terminate();
297                 return {};
298                 static_assert(sizeof...(AN) == 10000, "merge_delay_error takes (optional Coordination, optional Value0, optional ValueN...)");
299         }
300 };
301 
302 }
303 
304 #endif
305