1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
6 #define RXCPP_RX_SYNCHRONIZE_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace subjects {
13 
14 namespace detail {
15 
16 template<class T, class Coordination>
17 class synchronize_observer : public detail::multicast_observer<T>
18 {
19     typedef synchronize_observer<T, Coordination> this_type;
20     typedef detail::multicast_observer<T> base_type;
21 
22     typedef rxu::decay_t<Coordination> coordination_type;
23     typedef typename coordination_type::coordinator_type coordinator_type;
24     typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
25 
26     struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state>
27     {
28         typedef rxn::notification<T> notification_type;
29         typedef typename notification_type::type base_notification_type;
30         typedef std::deque<base_notification_type> queue_type;
31 
32         struct mode
33         {
34             enum type {
35                 Invalid = 0,
36                 Processing,
37                 Empty,
38                 Disposed
39             };
40         };
41 
42         mutable std::mutex lock;
43         mutable std::condition_variable wake;
44         mutable queue_type fill_queue;
45         composite_subscription lifetime;
46         mutable typename mode::type current;
47         coordinator_type coordinator;
48         output_type destination;
49 
ensure_processingrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state50         void ensure_processing(std::unique_lock<std::mutex>& guard) const {
51             if (!guard.owns_lock()) {
52                 std::terminate();
53             }
54             if (current == mode::Empty) {
55                 current = mode::Processing;
56                 auto keepAlive = this->shared_from_this();
57 
58                 auto drain_queue = [keepAlive, this](const rxsc::schedulable& self){
59                     RXCPP_TRY {
60                         std::unique_lock<std::mutex> guard(lock);
61                         if (!destination.is_subscribed()) {
62                             current = mode::Disposed;
63                             fill_queue.clear();
64                             guard.unlock();
65                             lifetime.unsubscribe();
66                             return;
67                         }
68                         if (fill_queue.empty()) {
69                             current = mode::Empty;
70                             return;
71                         }
72                         auto notification = std::move(fill_queue.front());
73                         fill_queue.pop_front();
74                         guard.unlock();
75                         notification->accept(destination);
76                         self();
77                     } RXCPP_CATCH(...) {
78                         destination.on_error(rxu::current_exception());
79                         std::unique_lock<std::mutex> guard(lock);
80                         current = mode::Empty;
81                     }
82                 };
83 
84                 auto selectedDrain = on_exception(
85                     [&](){return coordinator.act(drain_queue);},
86                     destination);
87                 if (selectedDrain.empty()) {
88                     return;
89                 }
90 
91                 auto processor = coordinator.get_worker();
92                 processor.schedule(lifetime, selectedDrain.get());
93             }
94         }
95 
synchronize_observer_staterxcpp::subjects::detail::synchronize_observer::synchronize_observer_state96         synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
97             : lifetime(std::move(cs))
98             , current(mode::Empty)
99             , coordinator(std::move(coor))
100             , destination(std::move(scbr))
101         {
102         }
103 
104         template<class V>
on_nextrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state105         void on_next(V v) const {
106             if (lifetime.is_subscribed()) {
107                 std::unique_lock<std::mutex> guard(lock);
108                 fill_queue.push_back(notification_type::on_next(std::move(v)));
109                 ensure_processing(guard);
110             }
111             wake.notify_one();
112         }
on_errorrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state113         void on_error(rxu::error_ptr e) const {
114             if (lifetime.is_subscribed()) {
115                 std::unique_lock<std::mutex> guard(lock);
116                 fill_queue.push_back(notification_type::on_error(e));
117                 ensure_processing(guard);
118             }
119             wake.notify_one();
120         }
on_completedrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state121         void on_completed() const {
122             if (lifetime.is_subscribed()) {
123                 std::unique_lock<std::mutex> guard(lock);
124                 fill_queue.push_back(notification_type::on_completed());
125                 ensure_processing(guard);
126             }
127             wake.notify_one();
128         }
129     };
130 
131     std::shared_ptr<synchronize_observer_state> state;
132 
133 public:
synchronize_observer(coordination_type cn,composite_subscription dl,composite_subscription il)134     synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
135         : base_type(dl)
136     {
137         auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *static_cast<base_type*>(this) ));
138 
139         // creates a worker whose lifetime is the same as the destination subscription
140         auto coordinator = cn.create_coordinator(dl);
141 
142         state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
143     }
144 
get_subscriber() const145     subscriber<T> get_subscriber() const {
146         return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
147     }
148 
149     template<class V>
on_next(V v) const150     void on_next(V v) const {
151         state->on_next(std::move(v));
152     }
on_error(rxu::error_ptr e) const153     void on_error(rxu::error_ptr e) const {
154         state->on_error(e);
155     }
on_completed() const156     void on_completed() const {
157         state->on_completed();
158     }
159 };
160 
161 }
162 
163 template<class T, class Coordination>
164 class synchronize
165 {
166     detail::synchronize_observer<T, Coordination> s;
167 
168 public:
synchronize(Coordination cn,composite_subscription cs=composite_subscription ())169     explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription())
170         : s(std::move(cn), std::move(cs), composite_subscription())
171     {
172     }
173 
has_observers() const174     bool has_observers() const {
175         return s.has_observers();
176     }
177 
get_subscriber() const178     subscriber<T> get_subscriber() const {
179         return s.get_subscriber();
180     }
181 
get_observable() const182     observable<T> get_observable() const {
183         auto keepAlive = s;
184         return make_observable_dynamic<T>([=](subscriber<T> o){
185             keepAlive.add(keepAlive.get_subscriber(), std::move(o));
186         });
187     }
188 };
189 
190 }
191 
192 class synchronize_in_one_worker : public coordination_base
193 {
194     rxsc::scheduler factory;
195 
196     class input_type
197     {
198         rxsc::worker controller;
199         rxsc::scheduler factory;
200         identity_one_worker coordination;
201     public:
input_type(rxsc::worker w)202         explicit input_type(rxsc::worker w)
203             : controller(w)
204             , factory(rxsc::make_same_worker(w))
205             , coordination(factory)
206         {
207         }
get_worker() const208         inline rxsc::worker get_worker() const {
209             return controller;
210         }
get_scheduler() const211         inline rxsc::scheduler get_scheduler() const {
212             return factory;
213         }
now() const214         inline rxsc::scheduler::clock_type::time_point now() const {
215             return factory.now();
216         }
217         template<class Observable>
in(Observable o) const218         auto in(Observable o) const
219             -> decltype(o.publish_synchronized(coordination).ref_count()) {
220             return      o.publish_synchronized(coordination).ref_count();
221         }
222         template<class Subscriber>
out(Subscriber s) const223         auto out(Subscriber s) const
224             -> Subscriber {
225             return s;
226         }
227         template<class F>
act(F f) const228         auto act(F f) const
229             -> F {
230             return f;
231         }
232     };
233 
234 public:
235 
synchronize_in_one_worker(rxsc::scheduler sc)236     explicit synchronize_in_one_worker(rxsc::scheduler sc) : factory(sc) {}
237 
238     typedef coordinator<input_type> coordinator_type;
239 
now() const240     inline rxsc::scheduler::clock_type::time_point now() const {
241         return factory.now();
242     }
243 
create_coordinator(composite_subscription cs=composite_subscription ()) const244     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
245         auto w = factory.create_worker(std::move(cs));
246         return coordinator_type(input_type(std::move(w)));
247     }
248 };
249 
synchronize_event_loop()250 inline synchronize_in_one_worker synchronize_event_loop() {
251     static synchronize_in_one_worker r(rxsc::make_event_loop());
252     return r;
253 }
254 
synchronize_new_thread()255 inline synchronize_in_one_worker synchronize_new_thread() {
256     static synchronize_in_one_worker r(rxsc::make_new_thread());
257     return r;
258 }
259 
260 }
261 
262 #endif
263