1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-observe_on.hpp
6 
7     \brief All values are queued and delivered using the scheduler from the supplied coordination.
8 
9     \tparam Coordination  the type of the scheduler.
10 
11     \param  cn  the scheduler to notify observers on.
12 
13     \return  The source observable modified so that its observers are notified on the specified scheduler.
14 
15     \sample
16     \snippet observe_on.cpp observe_on sample
17     \snippet output.txt observe_on sample
18 
19     Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
20     \snippet output.txt subscribe_on sample
21 */
22 
23 #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
24 #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
25 
26 #include "../rx-includes.hpp"
27 
28 namespace rxcpp {
29 
30 namespace operators {
31 
32 namespace detail {
33 
34 template<class... AN>
35 struct observe_on_invalid_arguments {};
36 
37 template<class... AN>
38 struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
39     using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
40 };
41 template<class... AN>
42 using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
43 
44 template<class T, class Coordination>
45 struct observe_on
46 {
47     typedef rxu::decay_t<T> source_value_type;
48 
49     typedef rxu::decay_t<Coordination> coordination_type;
50     typedef typename coordination_type::coordinator_type coordinator_type;
51 
52     coordination_type coordination;
53 
observe_onrxcpp::operators::detail::observe_on54     observe_on(coordination_type cn)
55         : coordination(std::move(cn))
56     {
57     }
58 
59     template<class Subscriber>
60     struct observe_on_observer
61     {
62         typedef observe_on_observer<Subscriber> this_type;
63         typedef source_value_type value_type;
64         typedef rxu::decay_t<Subscriber> dest_type;
65         typedef observer<value_type, this_type> observer_type;
66 
67         typedef rxn::notification<T> notification_type;
68         typedef typename notification_type::type base_notification_type;
69         typedef std::deque<base_notification_type> queue_type;
70 
71         struct mode
72         {
73             enum type {
74                 Invalid = 0,
75                 Processing,
76                 Empty,
77                 Disposed,
78                 Errored
79             };
80         };
81         struct observe_on_state : std::enable_shared_from_this<observe_on_state>
82         {
83             mutable std::mutex lock;
84             mutable queue_type fill_queue;
85             mutable queue_type drain_queue;
86             composite_subscription lifetime;
87             mutable typename mode::type current;
88             coordinator_type coordinator;
89             dest_type destination;
90 
observe_on_staterxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state91             observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
92                 : lifetime(std::move(cs))
93                 , current(mode::Empty)
94                 , coordinator(std::move(coor))
95                 , destination(std::move(d))
96             {
97             }
98 
finishrxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state99             void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
100                 if (!guard.owns_lock()) {
101                     std::terminate();
102                 }
103                 if (current == mode::Errored || current == mode::Disposed) {return;}
104                 current = end;
105                 queue_type fill_expired;
106                 swap(fill_expired, fill_queue);
107                 queue_type drain_expired;
108                 swap(drain_expired, drain_queue);
109                 RXCPP_UNWIND_AUTO([&](){guard.lock();});
110                 guard.unlock();
111                 lifetime.unsubscribe();
112                 destination.unsubscribe();
113             }
114 
ensure_processingrxcpp::operators::detail::observe_on::observe_on_observer::observe_on_state115             void ensure_processing(std::unique_lock<std::mutex>& guard) const {
116                 if (!guard.owns_lock()) {
117                     std::terminate();
118                 }
119                 if (current == mode::Empty) {
120                     current = mode::Processing;
121 
122                     if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
123                         finish(guard, mode::Disposed);
124                     }
125 
126                     auto keepAlive = this->shared_from_this();
127 
128                     auto drain = [keepAlive, this](const rxsc::schedulable& self){
129                         using std::swap;
130                         RXCPP_TRY {
131                             for (;;) {
132                                 if (drain_queue.empty() || !destination.is_subscribed()) {
133                                     std::unique_lock<std::mutex> guard(lock);
134                                     if (!destination.is_subscribed() ||
135                                         (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
136                                         finish(guard, mode::Disposed);
137                                         return;
138                                     }
139                                     if (drain_queue.empty()) {
140                                         if (fill_queue.empty()) {
141                                             current = mode::Empty;
142                                             return;
143                                         }
144                                         swap(fill_queue, drain_queue);
145                                     }
146                                 }
147                                 auto notification = std::move(drain_queue.front());
148                                 drain_queue.pop_front();
149                                 notification->accept(destination);
150                                 std::unique_lock<std::mutex> guard(lock);
151                                 self();
152                                 if (lifetime.is_subscribed()) break;
153                             }
154                         }
155                         RXCPP_CATCH(...) {
156                             destination.on_error(rxu::current_exception());
157                             std::unique_lock<std::mutex> guard(lock);
158                             finish(guard, mode::Errored);
159                         }
160                     };
161 
162                     auto selectedDrain = on_exception(
163                         [&](){return coordinator.act(drain);},
164                         destination);
165                     if (selectedDrain.empty()) {
166                         finish(guard, mode::Errored);
167                         return;
168                     }
169 
170                     auto processor = coordinator.get_worker();
171 
172                     RXCPP_UNWIND_AUTO([&](){guard.lock();});
173                     guard.unlock();
174 
175                     processor.schedule(selectedDrain.get());
176                 }
177             }
178         };
179         std::shared_ptr<observe_on_state> state;
180 
observe_on_observerrxcpp::operators::detail::observe_on::observe_on_observer181         observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
182             : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
183         {
184         }
185 
on_nextrxcpp::operators::detail::observe_on::observe_on_observer186         void on_next(source_value_type v) const {
187             std::unique_lock<std::mutex> guard(state->lock);
188             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
189             state->fill_queue.push_back(notification_type::on_next(std::move(v)));
190             state->ensure_processing(guard);
191         }
on_errorrxcpp::operators::detail::observe_on::observe_on_observer192         void on_error(rxu::error_ptr e) const {
193             std::unique_lock<std::mutex> guard(state->lock);
194             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
195             state->fill_queue.push_back(notification_type::on_error(e));
196             state->ensure_processing(guard);
197         }
on_completedrxcpp::operators::detail::observe_on::observe_on_observer198         void on_completed() const {
199             std::unique_lock<std::mutex> guard(state->lock);
200             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
201             state->fill_queue.push_back(notification_type::on_completed());
202             state->ensure_processing(guard);
203         }
204 
makerxcpp::operators::detail::observe_on::observe_on_observer205         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
206             auto coor = cn.create_coordinator(d.get_subscription());
207             d.add(cs);
208 
209             this_type o(d, std::move(coor), cs);
210             auto keepAlive = o.state;
211             cs.add([=](){
212                 std::unique_lock<std::mutex> guard(keepAlive->lock);
213                 keepAlive->ensure_processing(guard);
214             });
215 
216             return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
217         }
218     };
219 
220     template<class Subscriber>
operator ()rxcpp::operators::detail::observe_on221     auto operator()(Subscriber dest) const
222         -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
223         return      observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
224     }
225 };
226 
227 }
228 
229 /*! @copydoc rx-observe_on.hpp
230 */
231 template<class... AN>
observe_on(AN &&...an)232 auto observe_on(AN&&... an)
233     ->      operator_factory<observe_on_tag, AN...> {
234      return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
235 }
236 
237 }
238 
239 template<>
240 struct member_overload<observe_on_tag>
241 {
242     template<class Observable, class Coordination,
243         class Enabled = rxu::enable_if_all_true_type_t<
244             is_observable<Observable>,
245             is_coordination<Coordination>>,
246         class SourceValue = rxu::value_type_t<Observable>,
247         class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload248     static auto member(Observable&& o, Coordination&& cn)
249         -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
250         return      o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
251     }
252 
253     template<class... AN>
memberrxcpp::member_overload254     static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
255         std::terminate();
256         return {};
257         static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
258     }
259 };
260 
261 class observe_on_one_worker : public coordination_base
262 {
263     rxsc::scheduler factory;
264 
265     class input_type
266     {
267         rxsc::worker controller;
268         rxsc::scheduler factory;
269         identity_one_worker coordination;
270     public:
input_type(rxsc::worker w)271         explicit input_type(rxsc::worker w)
272             : controller(w)
273             , factory(rxsc::make_same_worker(w))
274             , coordination(factory)
275         {
276         }
get_worker() const277         inline rxsc::worker get_worker() const {
278             return controller;
279         }
get_scheduler() const280         inline rxsc::scheduler get_scheduler() const {
281             return factory;
282         }
now() const283         inline rxsc::scheduler::clock_type::time_point now() const {
284             return factory.now();
285         }
286         template<class Observable>
in(Observable o) const287         auto in(Observable o) const
288             -> decltype(o.observe_on(coordination)) {
289             return      o.observe_on(coordination);
290         }
291         template<class Subscriber>
out(Subscriber s) const292         auto out(Subscriber s) const
293             -> Subscriber {
294             return s;
295         }
296         template<class F>
act(F f) const297         auto act(F f) const
298             -> F {
299             return f;
300         }
301     };
302 
303 public:
304 
observe_on_one_worker(rxsc::scheduler sc)305     explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
306 
307     typedef coordinator<input_type> coordinator_type;
308 
now() const309     inline rxsc::scheduler::clock_type::time_point now() const {
310         return factory.now();
311     }
312 
create_coordinator(composite_subscription cs=composite_subscription ()) const313     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
314         auto w = factory.create_worker(std::move(cs));
315         return coordinator_type(input_type(std::move(w)));
316     }
317 };
318 
observe_on_run_loop(const rxsc::run_loop & rl)319 inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
320     return observe_on_one_worker(rxsc::make_run_loop(rl));
321 }
322 
observe_on_event_loop()323 inline observe_on_one_worker observe_on_event_loop() {
324     static observe_on_one_worker r(rxsc::make_event_loop());
325     return r;
326 }
327 
observe_on_new_thread()328 inline observe_on_one_worker observe_on_new_thread() {
329     static observe_on_one_worker r(rxsc::make_new_thread());
330     return r;
331 }
332 
333 }
334 
335 #endif
336