1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-amb.hpp
6 
7     \brief For each item from only the first of the given observables deliver from the new observable that is returned, on the specified scheduler.
8 
9            There are 2 variants of the operator:
10            - The source observable emits nested observables, one of the nested observables is selected.
11            - The source observable and the arguments v0...vn are used to provide the observables to select from.
12 
13     \tparam Coordination  the type of the scheduler (optional).
14     \tparam Value0        ... (optional).
15     \tparam ValueN        types of source observables (optional).
16 
17     \param  cn  the scheduler to synchronize sources from different contexts (optional).
18     \param  v0  ... (optional).
19     \param  vn  source observables (optional).
20 
21     \return  Observable that emits the same sequence as whichever of the source observables first emitted an item or sent a termination notification.
22 
23     If scheduler is omitted, identity_current_thread is used.
24 
25     \sample
26     \snippet amb.cpp threaded implicit amb sample
27     \snippet output.txt threaded implicit amb sample
28 
29     \snippet amb.cpp implicit amb sample
30     \snippet output.txt implicit amb sample
31 
32     \snippet amb.cpp amb sample
33     \snippet output.txt amb sample
34 
35     \snippet amb.cpp threaded amb sample
36     \snippet output.txt threaded amb sample
37 */
38 
39 #if !defined(RXCPP_OPERATORS_RX_AMB_HPP)
40 #define RXCPP_OPERATORS_RX_AMB_HPP
41 
42 #include "../rx-includes.hpp"
43 
44 namespace rxcpp {
45 
46 namespace operators {
47 
48 namespace detail {
49 
50 template<class... AN>
51 struct amb_invalid_arguments {};
52 
53 template<class... AN>
54 struct amb_invalid : public rxo::operator_base<amb_invalid_arguments<AN...>> {
55     using type = observable<amb_invalid_arguments<AN...>, amb_invalid<AN...>>;
56 };
57 template<class... AN>
58 using amb_invalid_t = typename amb_invalid<AN...>::type;
59 
60 template<class T, class Observable, class Coordination>
61 struct amb
62     : public operator_base<rxu::value_type_t<T>>
63 {
64     //static_assert(is_observable<Observable>::value, "amb requires an observable");
65     //static_assert(is_observable<T>::value, "amb requires an observable that contains observables");
66 
67     typedef amb<T, Observable, Coordination> this_type;
68 
69     typedef rxu::decay_t<T> source_value_type;
70     typedef rxu::decay_t<Observable> source_type;
71 
72     typedef typename source_type::source_operator_type source_operator_type;
73     typedef typename source_value_type::value_type value_type;
74 
75     typedef rxu::decay_t<Coordination> coordination_type;
76     typedef typename coordination_type::coordinator_type coordinator_type;
77 
78     struct values
79     {
valuesrxcpp::operators::detail::amb::values80         values(source_operator_type o, coordination_type sf)
81             : source_operator(std::move(o))
82             , coordination(std::move(sf))
83         {
84         }
85         source_operator_type source_operator;
86         coordination_type coordination;
87     };
88     values initial;
89 
ambrxcpp::operators::detail::amb90     amb(const source_type& o, coordination_type sf)
91         : initial(o.source_operator, std::move(sf))
92     {
93     }
94 
95     template<class Subscriber>
on_subscriberxcpp::operators::detail::amb96     void on_subscribe(Subscriber scbr) const {
97         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
98 
99         typedef Subscriber output_type;
100 
101         struct amb_state_type
102             : public std::enable_shared_from_this<amb_state_type>
103             , public values
104         {
105             amb_state_type(values i, coordinator_type coor, output_type oarg)
106                 : values(i)
107                 , source(i.source_operator)
108                 , coordinator(std::move(coor))
109                 , out(std::move(oarg))
110                 , pendingObservables(0)
111                 , firstEmitted(false)
112             {
113             }
114             observable<source_value_type, source_operator_type> source;
115             coordinator_type coordinator;
116             output_type out;
117             int pendingObservables;
118             bool firstEmitted;
119             std::vector<composite_subscription> innerSubscriptions;
120         };
121 
122         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
123 
124         // take a copy of the values for each subscription
125         auto state = std::make_shared<amb_state_type>(initial, std::move(coordinator), std::move(scbr));
126 
127         composite_subscription outercs;
128 
129         // when the out observer is unsubscribed all the
130         // inner subscriptions are unsubscribed as well
131         state->out.add(outercs);
132 
133         auto source = on_exception(
134             [&](){return state->coordinator.in(state->source);},
135             state->out);
136         if (source.empty()) {
137             return;
138         }
139 
140         // this subscribe does not share the observer subscription
141         // so that when it is unsubscribed the observer can be called
142         // until the inner subscriptions have finished
143         auto sink = make_subscriber<source_value_type>(
144             state->out,
145             outercs,
146         // on_next
147             [state](source_value_type st) {
148 
149                 if (state->firstEmitted)
150                     return;
151 
152                 composite_subscription innercs;
153 
154                 state->innerSubscriptions.push_back(innercs);
155 
156                 // when the out observer is unsubscribed all the
157                 // inner subscriptions are unsubscribed as well
158                 auto innercstoken = state->out.add(innercs);
159 
160                 innercs.add(make_subscription([state, innercstoken](){
161                     state->out.remove(innercstoken);
162                 }));
163 
164                 auto selectedSource = state->coordinator.in(st);
165 
166                 auto current_id = state->pendingObservables++;
167 
168                 // this subscribe does not share the source subscription
169                 // so that when it is unsubscribed the source will continue
170                 auto sinkInner = make_subscriber<value_type>(
171                     state->out,
172                     innercs,
173                 // on_next
174                     [state, st, current_id](value_type ct) {
175                         state->out.on_next(std::move(ct));
176                         if (!state->firstEmitted) {
177                             state->firstEmitted = true;
178                             auto do_unsubscribe = [](composite_subscription cs) {
179                                 cs.unsubscribe();
180                             };
181                             std::for_each(state->innerSubscriptions.begin(), state->innerSubscriptions.begin() + current_id, do_unsubscribe);
182                             std::for_each(state->innerSubscriptions.begin() + current_id + 1, state->innerSubscriptions.end(), do_unsubscribe);
183                         }
184                     },
185                 // on_error
186                     [state](rxu::error_ptr e) {
187                         state->out.on_error(e);
188                     },
189                 //on_completed
190                     [state](){
191                         state->out.on_completed();
192                     }
193                 );
194 
195                 auto selectedSinkInner = state->coordinator.out(sinkInner);
196                 selectedSource.subscribe(std::move(selectedSinkInner));
197             },
198         // on_error
199             [state](rxu::error_ptr e) {
200                 state->out.on_error(e);
201             },
202         // on_completed
203             [state]() {
204                 if (state->pendingObservables == 0) {
205                     state->out.on_completed();
206                 }
207             }
208         );
209         auto selectedSink = on_exception(
210             [&](){return state->coordinator.out(sink);},
211             state->out);
212         if (selectedSink.empty()) {
213             return;
214         }
215         source->subscribe(std::move(selectedSink.get()));
216     }
217 };
218 
219 }
220 
221 /*! @copydoc rx-amb.hpp
222 */
223 template<class... AN>
amb(AN &&...an)224 auto amb(AN&&... an)
225     ->     operator_factory<amb_tag, AN...> {
226     return operator_factory<amb_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
227 }
228 
229 }
230 
231 template<>
232 struct member_overload<amb_tag>
233 {
234     template<class Observable,
235         class Enabled = rxu::enable_if_all_true_type_t<
236             is_observable<Observable>>,
237         class SourceValue = rxu::value_type_t<Observable>,
238         class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
239         class Value = rxu::value_type_t<SourceValue>,
240         class Result = observable<Value, Amb>
241     >
memberrxcpp::member_overload242     static Result member(Observable&& o) {
243         return Result(Amb(std::forward<Observable>(o), identity_current_thread()));
244     }
245 
246     template<class Observable, class Coordination,
247         class Enabled = rxu::enable_if_all_true_type_t<
248             is_observable<Observable>,
249             is_coordination<Coordination>>,
250         class SourceValue = rxu::value_type_t<Observable>,
251         class Amb = rxo::detail::amb<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
252         class Value = rxu::value_type_t<SourceValue>,
253         class Result = observable<Value, Amb>
254     >
memberrxcpp::member_overload255     static Result member(Observable&& o, Coordination&& cn) {
256         return Result(Amb(std::forward<Observable>(o), std::forward<Coordination>(cn)));
257     }
258 
259     template<class Observable, class Value0, class... ValueN,
260         class Enabled = rxu::enable_if_all_true_type_t<
261             all_observables<Observable, Value0, ValueN...>>,
262         class EmittedValue = rxu::value_type_t<Observable>,
263         class SourceValue = observable<EmittedValue>,
264         class ObservableObservable = observable<SourceValue>,
265         class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, identity_one_worker>::type,
266         class Value = rxu::value_type_t<Amb>,
267         class Result = observable<Value, Amb>
268     >
memberrxcpp::member_overload269     static Result member(Observable&& o, Value0&& v0, ValueN&&... vn) {
270         return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), identity_current_thread()));
271     }
272 
273     template<class Observable, class Coordination, class Value0, class... ValueN,
274         class Enabled = rxu::enable_if_all_true_type_t<
275             all_observables<Observable, Value0, ValueN...>,
276             is_coordination<Coordination>>,
277         class EmittedValue = rxu::value_type_t<Observable>,
278         class SourceValue = observable<EmittedValue>,
279         class ObservableObservable = observable<SourceValue>,
280         class Amb = typename rxu::defer_type<rxo::detail::amb, SourceValue, ObservableObservable, rxu::decay_t<Coordination>>::type,
281         class Value = rxu::value_type_t<Amb>,
282         class Result = observable<Value, Amb>
283     >
memberrxcpp::member_overload284     static Result member(Observable&& o, Coordination&& cn, Value0&& v0, ValueN&&... vn) {
285         return Result(Amb(rxs::from(o.as_dynamic(), v0.as_dynamic(), vn.as_dynamic()...), std::forward<Coordination>(cn)));
286     }
287 
288     template<class... AN>
memberrxcpp::member_overload289     static operators::detail::amb_invalid_t<AN...> member(AN...) {
290         std::terminate();
291         return {};
292         static_assert(sizeof...(AN) == 10000, "amb takes (optional Coordination, optional Value0, optional ValueN...)");
293     }
294 };
295 
296 }
297 
298 #endif
299