1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_CURRENT_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 struct action_queue
17 {
18     typedef action_queue this_type;
19 
20     typedef scheduler_base::clock_type clock;
21     typedef time_schedulable<clock::time_point> item_type;
22 
23 private:
24     typedef schedulable_queue<item_type::time_point_type> queue_item_time;
25 
26 public:
27     struct current_thread_queue_type {
28         std::shared_ptr<worker_interface> w;
29         recursion r;
30         queue_item_time q;
31     };
32 
33 private:
34 #if defined(RXCPP_THREAD_LOCAL)
current_thread_queuerxcpp::schedulers::detail::action_queue35      static current_thread_queue_type*& current_thread_queue() {
36          static RXCPP_THREAD_LOCAL current_thread_queue_type* q;
37          return q;
38      }
39 #else
40     static rxu::thread_local_storage<current_thread_queue_type>& current_thread_queue() {
41         static rxu::thread_local_storage<current_thread_queue_type> q;
42         return q;
43     }
44 #endif
45 
46 public:
47 
ownedrxcpp::schedulers::detail::action_queue48     static bool owned() {
49         return !!current_thread_queue();
50     }
get_worker_interfacerxcpp::schedulers::detail::action_queue51     static const std::shared_ptr<worker_interface>& get_worker_interface() {
52         return current_thread_queue()->w;
53     }
get_recursionrxcpp::schedulers::detail::action_queue54     static recursion& get_recursion() {
55         return current_thread_queue()->r;
56     }
emptyrxcpp::schedulers::detail::action_queue57     static bool empty() {
58         if (!current_thread_queue()) {
59             std::terminate();
60         }
61         return current_thread_queue()->q.empty();
62     }
toprxcpp::schedulers::detail::action_queue63     static queue_item_time::const_reference top() {
64         if (!current_thread_queue()) {
65             std::terminate();
66         }
67         return current_thread_queue()->q.top();
68     }
poprxcpp::schedulers::detail::action_queue69     static void pop() {
70         auto& state = current_thread_queue();
71         if (!state) {
72             std::terminate();
73         }
74         state->q.pop();
75         if (state->q.empty()) {
76             // allow recursion
77             state->r.reset(true);
78         }
79     }
pushrxcpp::schedulers::detail::action_queue80     static void push(item_type item) {
81         auto& state = current_thread_queue();
82         if (!state) {
83             std::terminate();
84         }
85         if (!item.what.is_subscribed()) {
86             return;
87         }
88         state->q.push(std::move(item));
89         // disallow recursion
90         state->r.reset(false);
91     }
ensurerxcpp::schedulers::detail::action_queue92     static std::shared_ptr<worker_interface> ensure(std::shared_ptr<worker_interface> w) {
93         if (!!current_thread_queue()) {
94             std::terminate();
95         }
96         // create and publish new queue
97         current_thread_queue() = new current_thread_queue_type();
98         current_thread_queue()->w = w;
99         return w;
100     }
createrxcpp::schedulers::detail::action_queue101     static std::unique_ptr<current_thread_queue_type> create(std::shared_ptr<worker_interface> w) {
102         std::unique_ptr<current_thread_queue_type> result(new current_thread_queue_type());
103         result->w = std::move(w);
104         return result;
105     }
setrxcpp::schedulers::detail::action_queue106     static void set(current_thread_queue_type* q) {
107         if (!!current_thread_queue()) {
108             std::terminate();
109         }
110         // publish new queue
111         current_thread_queue() = q;
112     }
destroyrxcpp::schedulers::detail::action_queue113     static void destroy(current_thread_queue_type* q) {
114         delete q;
115     }
destroyrxcpp::schedulers::detail::action_queue116     static void destroy() {
117         if (!current_thread_queue()) {
118             std::terminate();
119         }
120 #if defined(RXCPP_THREAD_LOCAL)
121          destroy(current_thread_queue());
122 #else
123         destroy(current_thread_queue().get());
124 #endif
125         current_thread_queue() = nullptr;
126     }
127 };
128 
129 }
130 
131 struct current_thread : public scheduler_interface
132 {
133 private:
134     typedef current_thread this_type;
135     current_thread(const this_type&);
136 
137     typedef detail::action_queue queue_type;
138 
139     struct derecurser : public worker_interface
140     {
141     private:
142         typedef current_thread this_type;
143         derecurser(const this_type&);
144     public:
derecurserrxcpp::schedulers::current_thread::derecurser145         derecurser()
146         {
147         }
~derecurserrxcpp::schedulers::current_thread::derecurser148         virtual ~derecurser()
149         {
150         }
151 
nowrxcpp::schedulers::current_thread::derecurser152         virtual clock_type::time_point now() const {
153             return clock_type::now();
154         }
155 
schedulerxcpp::schedulers::current_thread::derecurser156         virtual void schedule(const schedulable& scbl) const {
157             queue_type::push(queue_type::item_type(now(), scbl));
158         }
159 
schedulerxcpp::schedulers::current_thread::derecurser160         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
161             queue_type::push(queue_type::item_type(when, scbl));
162         }
163     };
164 
165     struct current_worker : public worker_interface
166     {
167     private:
168         typedef current_thread this_type;
169         current_worker(const this_type&);
170     public:
current_workerrxcpp::schedulers::current_thread::current_worker171         current_worker()
172         {
173         }
~current_workerrxcpp::schedulers::current_thread::current_worker174         virtual ~current_worker()
175         {
176         }
177 
nowrxcpp::schedulers::current_thread::current_worker178         virtual clock_type::time_point now() const {
179             return clock_type::now();
180         }
181 
schedulerxcpp::schedulers::current_thread::current_worker182         virtual void schedule(const schedulable& scbl) const {
183             schedule(now(), scbl);
184         }
185 
schedulerxcpp::schedulers::current_thread::current_worker186         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
187             if (!scbl.is_subscribed()) {
188                 return;
189             }
190 
191             {
192                 // check ownership
193                 if (queue_type::owned()) {
194                     // already has an owner - delegate
195                     queue_type::get_worker_interface()->schedule(when, scbl);
196                     return;
197                 }
198 
199                 // take ownership
200                 queue_type::ensure(std::make_shared<derecurser>());
201             }
202             // release ownership
203             RXCPP_UNWIND_AUTO([]{
204                 queue_type::destroy();
205             });
206 
207             const auto& recursor = queue_type::get_recursion().get_recurse();
208             std::this_thread::sleep_until(when);
209             if (scbl.is_subscribed()) {
210                 scbl(recursor);
211             }
212             if (queue_type::empty()) {
213                 return;
214             }
215 
216             // loop until queue is empty
217             for (
218                 auto next = queue_type::top().when;
219                 (std::this_thread::sleep_until(next), true);
220                 next = queue_type::top().when
221             ) {
222                 auto what = queue_type::top().what;
223 
224                 queue_type::pop();
225 
226                 if (what.is_subscribed()) {
227                     what(recursor);
228                 }
229 
230                 if (queue_type::empty()) {
231                     break;
232                 }
233             }
234         }
235     };
236 
237     std::shared_ptr<current_worker> wi;
238 
239 public:
current_threadrxcpp::schedulers::current_thread240     current_thread()
241         : wi(std::make_shared<current_worker>())
242     {
243     }
~current_threadrxcpp::schedulers::current_thread244     virtual ~current_thread()
245     {
246     }
247 
is_schedule_requiredrxcpp::schedulers::current_thread248     static bool is_schedule_required() { return !queue_type::owned(); }
249 
is_tail_recursion_allowedrxcpp::schedulers::current_thread250     inline bool is_tail_recursion_allowed() const {
251         return queue_type::empty();
252     }
253 
nowrxcpp::schedulers::current_thread254     virtual clock_type::time_point now() const {
255         return clock_type::now();
256     }
257 
create_workerrxcpp::schedulers::current_thread258     virtual worker create_worker(composite_subscription cs) const {
259         return worker(std::move(cs), wi);
260     }
261 };
262 
make_current_thread()263 inline const scheduler& make_current_thread() {
264     static scheduler instance = make_scheduler<current_thread>();
265     return instance;
266 }
267 
268 }
269 
270 }
271 
272 #endif
273