1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-subscribe_on.hpp
6 
7     \brief Subscription and unsubscription are queued and delivered using the scheduler from the supplied coordination.
8 
    \tparam Coordination  the type of the coordination.

    \param  cn  the coordination that supplies the scheduler on which subscription actions are performed.
12 
13     \return  The source observable modified so that its subscriptions happen on the specified scheduler.
14 
15     \sample
16     \snippet subscribe_on.cpp subscribe_on sample
17     \snippet output.txt subscribe_on sample
18 
    Invoking the rxcpp::observable::observe_on operator, instead of subscribe_on, gives the following results:
20     \snippet output.txt observe_on sample
21 */
22 
23 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
25 
26 #include "../rx-includes.hpp"
27 
28 namespace rxcpp {
29 
30 namespace operators {
31 
32 namespace detail {
33 
// Empty tag type that records the argument types of a subscribe_on call.
// It carries no data; it is only used as a template argument by
// subscribe_on_invalid.
template<class... AN>
struct subscribe_on_invalid_arguments
{
};
36 
37 template<class... AN>
38 struct subscribe_on_invalid : public rxo::operator_base<subscribe_on_invalid_arguments<AN...>> {
39     using type = observable<subscribe_on_invalid_arguments<AN...>, subscribe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using subscribe_on_invalid_t = typename subscribe_on_invalid<AN...>::type;
43 
44 template<class T, class Observable, class Coordination>
45 struct subscribe_on : public operator_base<T>
46 {
47     typedef rxu::decay_t<Observable> source_type;
48     typedef rxu::decay_t<Coordination> coordination_type;
49     typedef typename coordination_type::coordinator_type coordinator_type;
50     struct subscribe_on_values
51     {
~subscribe_on_valuesrxcpp::operators::detail::subscribe_on::subscribe_on_values52         ~subscribe_on_values()
53         {
54         }
subscribe_on_valuesrxcpp::operators::detail::subscribe_on::subscribe_on_values55         subscribe_on_values(source_type s, coordination_type sf)
56             : source(std::move(s))
57             , coordination(std::move(sf))
58         {
59         }
60         source_type source;
61         coordination_type coordination;
62     private:
63         subscribe_on_values& operator=(subscribe_on_values o) RXCPP_DELETE;
64     };
65     const subscribe_on_values initial;
66 
~subscribe_onrxcpp::operators::detail::subscribe_on67     ~subscribe_on()
68     {
69     }
subscribe_onrxcpp::operators::detail::subscribe_on70     subscribe_on(source_type s, coordination_type sf)
71         : initial(std::move(s), std::move(sf))
72     {
73     }
74 
75     template<class Subscriber>
on_subscriberxcpp::operators::detail::subscribe_on76     void on_subscribe(Subscriber s) const {
77 
78         typedef Subscriber output_type;
79         struct subscribe_on_state_type
80             : public std::enable_shared_from_this<subscribe_on_state_type>
81             , public subscribe_on_values
82         {
83             subscribe_on_state_type(const subscribe_on_values& i, const output_type& oarg)
84                 : subscribe_on_values(i)
85                 , out(oarg)
86             {
87             }
88             composite_subscription source_lifetime;
89             output_type out;
90         private:
91             subscribe_on_state_type& operator=(subscribe_on_state_type o) RXCPP_DELETE;
92         };
93 
94         composite_subscription coordinator_lifetime;
95 
96         auto coordinator = initial.coordination.create_coordinator(coordinator_lifetime);
97 
98         auto controller = coordinator.get_worker();
99 
100         // take a copy of the values for each subscription
101         auto state = std::make_shared<subscribe_on_state_type>(initial, std::move(s));
102 
103         auto sl = state->source_lifetime;
104         auto ol = state->out.get_subscription();
105 
106         auto disposer = [=](const rxsc::schedulable&){
107             sl.unsubscribe();
108             ol.unsubscribe();
109             coordinator_lifetime.unsubscribe();
110         };
111         auto selectedDisposer = on_exception(
112             [&](){return coordinator.act(disposer);},
113             state->out);
114         if (selectedDisposer.empty()) {
115             return;
116         }
117 
118         state->source_lifetime.add([=](){
119             controller.schedule(selectedDisposer.get());
120         });
121 
122         state->out.add([=](){
123             sl.unsubscribe();
124             ol.unsubscribe();
125             coordinator_lifetime.unsubscribe();
126         });
127 
128         auto producer = [=](const rxsc::schedulable&){
129             state->source.subscribe(state->source_lifetime, state->out);
130         };
131 
132         auto selectedProducer = on_exception(
133             [&](){return coordinator.act(producer);},
134             state->out);
135         if (selectedProducer.empty()) {
136             return;
137         }
138 
139         controller.schedule(selectedProducer.get());
140     }
141 private:
142     subscribe_on& operator=(subscribe_on o) RXCPP_DELETE;
143 };
144 
145 }
146 
147 /*! @copydoc rx-subscribe_on.hpp
148 */
149 template<class... AN>
subscribe_on(AN &&...an)150 auto subscribe_on(AN&&... an)
151     ->      operator_factory<subscribe_on_tag, AN...> {
152      return operator_factory<subscribe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
153 }
154 
155 }
156 
157 template<>
158 struct member_overload<subscribe_on_tag>
159 {
160     template<class Observable, class Coordination,
161         class Enabled = rxu::enable_if_all_true_type_t<
162             is_observable<Observable>,
163             is_coordination<Coordination>>,
164         class SourceValue = rxu::value_type_t<Observable>,
165         class SubscribeOn = rxo::detail::subscribe_on<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Coordination>>,
166         class Value = rxu::value_type_t<SubscribeOn>,
167         class Result = observable<Value, SubscribeOn>>
memberrxcpp::member_overload168     static Result member(Observable&& o, Coordination&& cn) {
169         return Result(SubscribeOn(std::forward<Observable>(o), std::forward<Coordination>(cn)));
170     }
171 
172     template<class... AN>
memberrxcpp::member_overload173     static operators::detail::subscribe_on_invalid_t<AN...> member(AN...) {
174         std::terminate();
175         return {};
176         static_assert(sizeof...(AN) == 10000, "subscribe_on takes (Coordination)");
177     }
178 };
179 
180 }
181 
182 #endif
183