1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_COORDINATION_HPP)
6 #define RXCPP_RX_COORDINATION_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 struct tag_coordinator {};
13 struct coordinator_base {typedef tag_coordinator coordinator_tag;};
14 
15 template<class T, class C = rxu::types_checked>
16 struct is_coordinator : public std::false_type {};
17 
18 template<class T>
19 struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
20     : public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
21 
22 struct tag_coordination {};
23 struct coordination_base {typedef tag_coordination coordination_tag;};
24 
25 namespace detail {
26 
27 template<class T, class C = rxu::types_checked>
28 struct is_coordination : public std::false_type {};
29 
30 template<class T>
31 struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
32     : public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
33 
34 }
35 
36 template<class T, class Decayed = rxu::decay_t<T>>
37 struct is_coordination : detail::is_coordination<Decayed>
38 {
39 };
40 
41 template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
42 using coordination_tag_t = typename DecayedCoordination::coordination_tag;
43 
44 template<class Input>
45 class coordinator : public coordinator_base
46 {
47 public:
48     typedef Input input_type;
49 
50 private:
51     struct not_supported {typedef not_supported type;};
52 
53     template<class Observable>
54     struct get_observable
55     {
56         typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
57     };
58 
59     template<class Subscriber>
60     struct get_subscriber
61     {
62         typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
63     };
64 
65     template<class F>
66     struct get_action_function
67     {
68         typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
69     };
70 
71 public:
72     input_type input;
73 
74     template<class T>
75     struct get
76     {
77         typedef typename std::conditional<
78             rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
79             is_observable<T>::value, get_observable<T>, typename std::conditional<
80             is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
81     };
82 
coordinator(Input i)83     coordinator(Input i) : input(i) {}
84 
get_worker() const85     rxsc::worker get_worker() const {
86         return input.get_worker();
87     }
get_scheduler() const88     rxsc::scheduler get_scheduler() const {
89         return input.get_scheduler();
90     }
91 
92     template<class Observable>
in(Observable o) const93     auto in(Observable o) const
94         -> typename get_observable<Observable>::type {
95         return input.in(std::move(o));
96         static_assert(is_observable<Observable>::value, "can only synchronize observables");
97     }
98 
99     template<class Subscriber>
out(Subscriber s) const100     auto out(Subscriber s) const
101         -> typename get_subscriber<Subscriber>::type {
102         return input.out(std::move(s));
103         static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
104     }
105 
106     template<class F>
act(F f) const107     auto act(F f) const
108         -> typename get_action_function<F>::type {
109         return input.act(std::move(f));
110         static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
111     }
112 };
113 
114 class identity_one_worker : public coordination_base
115 {
116     rxsc::scheduler factory;
117 
118     class input_type
119     {
120         rxsc::worker controller;
121         rxsc::scheduler factory;
122     public:
input_type(rxsc::worker w)123         explicit input_type(rxsc::worker w)
124             : controller(w)
125             , factory(rxsc::make_same_worker(w))
126         {
127         }
get_worker() const128         inline rxsc::worker get_worker() const {
129             return controller;
130         }
get_scheduler() const131         inline rxsc::scheduler get_scheduler() const {
132             return factory;
133         }
now() const134         inline rxsc::scheduler::clock_type::time_point now() const {
135             return factory.now();
136         }
137         template<class Observable>
in(Observable o) const138         auto in(Observable o) const
139             -> Observable {
140             return o;
141         }
142         template<class Subscriber>
out(Subscriber s) const143         auto out(Subscriber s) const
144             -> Subscriber {
145             return s;
146         }
147         template<class F>
act(F f) const148         auto act(F f) const
149             -> F {
150             return f;
151         }
152     };
153 
154 public:
155 
identity_one_worker(rxsc::scheduler sc)156     explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}
157 
158     typedef coordinator<input_type> coordinator_type;
159 
now() const160     inline rxsc::scheduler::clock_type::time_point now() const {
161         return factory.now();
162     }
163 
create_coordinator(composite_subscription cs=composite_subscription ()) const164     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
165         auto w = factory.create_worker(std::move(cs));
166         return coordinator_type(input_type(std::move(w)));
167     }
168 };
169 
identity_immediate()170 inline identity_one_worker identity_immediate() {
171     static identity_one_worker r(rxsc::make_immediate());
172     return r;
173 }
174 
identity_current_thread()175 inline identity_one_worker identity_current_thread() {
176     static identity_one_worker r(rxsc::make_current_thread());
177     return r;
178 }
179 
identity_same_worker(rxsc::worker w)180 inline identity_one_worker identity_same_worker(rxsc::worker w) {
181     return identity_one_worker(rxsc::make_same_worker(w));
182 }
183 
184 class serialize_one_worker : public coordination_base
185 {
186     rxsc::scheduler factory;
187 
188     template<class F>
189     struct serialize_action
190     {
191         F dest;
192         std::shared_ptr<std::mutex> lock;
serialize_actionrxcpp::serialize_one_worker::serialize_action193         serialize_action(F d, std::shared_ptr<std::mutex> m)
194             : dest(std::move(d))
195             , lock(std::move(m))
196         {
197             if (!lock) {
198                 std::terminate();
199             }
200         }
operator ()rxcpp::serialize_one_worker::serialize_action201         auto operator()(const rxsc::schedulable& scbl) const
202             -> decltype(dest(scbl)) {
203             std::unique_lock<std::mutex> guard(*lock);
204             return dest(scbl);
205         }
206     };
207 
208     template<class Observer>
209     struct serialize_observer
210     {
211         typedef serialize_observer<Observer> this_type;
212         typedef rxu::decay_t<Observer> dest_type;
213         typedef typename dest_type::value_type value_type;
214         typedef observer<value_type, this_type> observer_type;
215         dest_type dest;
216         std::shared_ptr<std::mutex> lock;
217 
serialize_observerrxcpp::serialize_one_worker::serialize_observer218         serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
219             : dest(std::move(d))
220             , lock(std::move(m))
221         {
222             if (!lock) {
223                 std::terminate();
224             }
225         }
on_nextrxcpp::serialize_one_worker::serialize_observer226         void on_next(value_type v) const {
227             std::unique_lock<std::mutex> guard(*lock);
228             dest.on_next(v);
229         }
on_errorrxcpp::serialize_one_worker::serialize_observer230         void on_error(rxu::error_ptr e) const {
231             std::unique_lock<std::mutex> guard(*lock);
232             dest.on_error(e);
233         }
on_completedrxcpp::serialize_one_worker::serialize_observer234         void on_completed() const {
235             std::unique_lock<std::mutex> guard(*lock);
236             dest.on_completed();
237         }
238 
239         template<class Subscriber>
makerxcpp::serialize_one_worker::serialize_observer240         static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
241             return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
242         }
243     };
244 
245     class input_type
246     {
247         rxsc::worker controller;
248         rxsc::scheduler factory;
249         std::shared_ptr<std::mutex> lock;
250     public:
input_type(rxsc::worker w,std::shared_ptr<std::mutex> m)251         explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
252             : controller(w)
253             , factory(rxsc::make_same_worker(w))
254             , lock(std::move(m))
255         {
256         }
get_worker() const257         inline rxsc::worker get_worker() const {
258             return controller;
259         }
get_scheduler() const260         inline rxsc::scheduler get_scheduler() const {
261             return factory;
262         }
now() const263         inline rxsc::scheduler::clock_type::time_point now() const {
264             return factory.now();
265         }
266         template<class Observable>
in(Observable o) const267         auto in(Observable o) const
268             -> Observable {
269             return o;
270         }
271         template<class Subscriber>
out(const Subscriber & s) const272         auto out(const Subscriber& s) const
273             -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
274             return      serialize_observer<decltype(s.get_observer())>::make(s, lock);
275         }
276         template<class F>
act(F f) const277         auto act(F f) const
278             ->      serialize_action<F> {
279             return  serialize_action<F>(std::move(f), lock);
280         }
281     };
282 
283 public:
284 
serialize_one_worker(rxsc::scheduler sc)285     explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}
286 
287     typedef coordinator<input_type> coordinator_type;
288 
now() const289     inline rxsc::scheduler::clock_type::time_point now() const {
290         return factory.now();
291     }
292 
create_coordinator(composite_subscription cs=composite_subscription ()) const293     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
294         auto w = factory.create_worker(std::move(cs));
295         std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
296         return coordinator_type(input_type(std::move(w), std::move(lock)));
297     }
298 };
299 
serialize_event_loop()300 inline serialize_one_worker serialize_event_loop() {
301     static serialize_one_worker r(rxsc::make_event_loop());
302     return r;
303 }
304 
serialize_new_thread()305 inline serialize_one_worker serialize_new_thread() {
306     static serialize_one_worker r(rxsc::make_new_thread());
307     return r;
308 }
309 
serialize_same_worker(rxsc::worker w)310 inline serialize_one_worker serialize_same_worker(rxsc::worker w) {
311     return serialize_one_worker(rxsc::make_same_worker(w));
312 }
313 
314 }
315 
316 #endif
317