1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-switch_on_next.hpp
6 
7     \brief Return observable that emits the items emitted by the observable most recently emitted by the source observable.
8 
9     \tparam Coordination  the type of the scheduler (optional).
10 
11     \param cn  the scheduler to synchronize sources from different contexts (optional).
12 
13     \return  Observable that emits the items emitted by the observable most recently emitted by the source observable.
14 
15     \sample
16     \snippet switch_on_next.cpp switch_on_next sample
17     \snippet output.txt switch_on_next sample
18 */
19 
20 #if !defined(RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP)
21 #define RXCPP_OPERATORS_RX_SWITCH_ON_NEXT_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
// Empty tag type that records the argument pack AN... of a switch_on_next
// call for which no valid overload exists.
template<typename... AN>
struct switch_on_next_invalid_arguments
{
};
33 
34 template<class... AN>
35 struct switch_on_next_invalid : public rxo::operator_base<switch_on_next_invalid_arguments<AN...>> {
36     using type = observable<switch_on_next_invalid_arguments<AN...>, switch_on_next_invalid<AN...>>;
37 };
38 template<class... AN>
39 using switch_on_next_invalid_t = typename switch_on_next_invalid<AN...>::type;
40 
41 template<class T, class Observable, class Coordination>
42 struct switch_on_next
43     : public operator_base<rxu::value_type_t<rxu::decay_t<T>>>
44 {
45     //static_assert(is_observable<Observable>::value, "switch_on_next requires an observable");
46     //static_assert(is_observable<T>::value, "switch_on_next requires an observable that contains observables");
47 
48     typedef switch_on_next<T, Observable, Coordination> this_type;
49 
50     typedef rxu::decay_t<T> source_value_type;
51     typedef rxu::decay_t<Observable> source_type;
52 
53     typedef typename source_type::source_operator_type source_operator_type;
54 
55     typedef source_value_type collection_type;
56     typedef typename collection_type::value_type collection_value_type;
57 
58     typedef rxu::decay_t<Coordination> coordination_type;
59     typedef typename coordination_type::coordinator_type coordinator_type;
60 
61     struct values
62     {
valuesrxcpp::operators::detail::switch_on_next::values63         values(source_operator_type o, coordination_type sf)
64             : source_operator(std::move(o))
65             , coordination(std::move(sf))
66         {
67         }
68         source_operator_type source_operator;
69         coordination_type coordination;
70     };
71     values initial;
72 
switch_on_nextrxcpp::operators::detail::switch_on_next73     switch_on_next(const source_type& o, coordination_type sf)
74         : initial(o.source_operator, std::move(sf))
75     {
76     }
77 
78     template<class Subscriber>
on_subscriberxcpp::operators::detail::switch_on_next79     void on_subscribe(Subscriber scbr) const {
80         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
81 
82         typedef Subscriber output_type;
83 
84         struct switch_state_type
85             : public std::enable_shared_from_this<switch_state_type>
86             , public values
87         {
88             switch_state_type(values i, coordinator_type coor, output_type oarg)
89                 : values(i)
90                 , source(i.source_operator)
91                 , pendingCompletions(0)
92                 , coordinator(std::move(coor))
93                 , out(std::move(oarg))
94             {
95             }
96             observable<source_value_type, source_operator_type> source;
97             // on_completed on the output must wait until all the
98             // subscriptions have received on_completed
99             int pendingCompletions;
100             coordinator_type coordinator;
101             composite_subscription inner_lifetime;
102             output_type out;
103         };
104 
105         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
106 
107         // take a copy of the values for each subscription
108         auto state = std::make_shared<switch_state_type>(initial, std::move(coordinator), std::move(scbr));
109 
110         composite_subscription outercs;
111 
112         // when the out observer is unsubscribed all the
113         // inner subscriptions are unsubscribed as well
114         state->out.add(outercs);
115 
116         auto source = on_exception(
117             [&](){return state->coordinator.in(state->source);},
118             state->out);
119         if (source.empty()) {
120             return;
121         }
122 
123         ++state->pendingCompletions;
124         // this subscribe does not share the observer subscription
125         // so that when it is unsubscribed the observer can be called
126         // until the inner subscriptions have finished
127         auto sink = make_subscriber<collection_type>(
128             state->out,
129             outercs,
130         // on_next
131             [state](collection_type st) {
132 
133                 state->inner_lifetime.unsubscribe();
134 
135                 state->inner_lifetime = composite_subscription();
136 
137                 // when the out observer is unsubscribed all the
138                 // inner subscriptions are unsubscribed as well
139                 auto innerlifetimetoken = state->out.add(state->inner_lifetime);
140 
141                 state->inner_lifetime.add(make_subscription([state, innerlifetimetoken](){
142                     state->out.remove(innerlifetimetoken);
143                     --state->pendingCompletions;
144                 }));
145 
146                 auto selectedSource = state->coordinator.in(st);
147 
148                 // this subscribe does not share the source subscription
149                 // so that when it is unsubscribed the source will continue
150                 auto sinkInner = make_subscriber<collection_value_type>(
151                     state->out,
152                     state->inner_lifetime,
153                 // on_next
154                     [state, st](collection_value_type ct) {
155                         state->out.on_next(std::move(ct));
156                     },
157                 // on_error
158                     [state](rxu::error_ptr e) {
159                         state->out.on_error(e);
160                     },
161                 //on_completed
162                     [state](){
163                         if (state->pendingCompletions == 1) {
164                             state->out.on_completed();
165                         }
166                     }
167                 );
168 
169                 auto selectedSinkInner = state->coordinator.out(sinkInner);
170                 ++state->pendingCompletions;
171                 selectedSource.subscribe(std::move(selectedSinkInner));
172             },
173         // on_error
174             [state](rxu::error_ptr e) {
175                 state->out.on_error(e);
176             },
177         // on_completed
178             [state]() {
179                 if (--state->pendingCompletions == 0) {
180                     state->out.on_completed();
181                 }
182             }
183         );
184 
185         auto selectedSink = on_exception(
186             [&](){return state->coordinator.out(sink);},
187             state->out);
188         if (selectedSink.empty()) {
189             return;
190         }
191 
192         source->subscribe(std::move(selectedSink.get()));
193 
194     }
195 };
196 
197 }
198 
199 /*! @copydoc rx-switch_on_next.hpp
200 */
201 template<class... AN>
switch_on_next(AN &&...an)202 auto switch_on_next(AN&&... an)
203     ->     operator_factory<switch_on_next_tag, AN...> {
204     return operator_factory<switch_on_next_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
205 }
206 
207 }
208 
209 template<>
210 struct member_overload<switch_on_next_tag>
211 {
212     template<class Observable,
213         class Enabled = rxu::enable_if_all_true_type_t<
214             is_observable<Observable>>,
215         class SourceValue = rxu::value_type_t<Observable>,
216         class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, identity_one_worker>,
217         class Value = rxu::value_type_t<SourceValue>,
218         class Result = observable<Value, SwitchOnNext>
219     >
memberrxcpp::member_overload220     static Result member(Observable&& o) {
221         return Result(SwitchOnNext(std::forward<Observable>(o), identity_current_thread()));
222     }
223 
224     template<class Observable, class Coordination,
225         class Enabled = rxu::enable_if_all_true_type_t<
226             is_observable<Observable>,
227             is_coordination<Coordination>>,
228         class SourceValue = rxu::value_type_t<Observable>,
229         class SwitchOnNext = rxo::detail::switch_on_next<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
230         class Value = rxu::value_type_t<SourceValue>,
231         class Result = observable<Value, SwitchOnNext>
232     >
memberrxcpp::member_overload233     static Result member(Observable&& o, Coordination&& cn) {
234         return Result(SwitchOnNext(std::forward<Observable>(o), std::forward<Coordination>(cn)));
235     }
236 
237     template<class... AN>
memberrxcpp::member_overload238     static operators::detail::switch_on_next_invalid_t<AN...> member(AN...) {
239         std::terminate();
240         return {};
241         static_assert(sizeof...(AN) == 10000, "switch_on_next takes (optional Coordination)");
242     }
243 };
244 
245 }
246 
247 #endif
248