1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_RUN_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_RUN_LOOP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 struct run_loop_state : public std::enable_shared_from_this<run_loop_state>
17 {
18     typedef scheduler::clock_type clock_type;
19 
20     typedef detail::schedulable_queue<
21         clock_type::time_point> queue_item_time;
22 
23     typedef queue_item_time::item_type item_type;
24     typedef queue_item_time::const_reference const_reference_item_type;
25 
~run_loop_staterxcpp::schedulers::detail::run_loop_state26     virtual ~run_loop_state()
27     {
28     }
29 
run_loop_staterxcpp::schedulers::detail::run_loop_state30     run_loop_state()
31     {
32     }
33 
34     composite_subscription lifetime;
35     mutable std::mutex lock;
36     mutable queue_item_time q;
37     recursion r;
38     std::function<void(clock_type::time_point)> notify_earlier_wakeup;
39 };
40 
41 }
42 
43 
44 struct run_loop_scheduler : public scheduler_interface
45 {
46 private:
47     typedef run_loop_scheduler this_type;
48     run_loop_scheduler(const this_type&);
49 
50     struct run_loop_worker : public worker_interface
51     {
52     private:
53         typedef run_loop_worker this_type;
54 
55         run_loop_worker(const this_type&);
56 
57     public:
58         std::weak_ptr<detail::run_loop_state> state;
59 
~run_loop_workerrxcpp::schedulers::run_loop_scheduler::run_loop_worker60         virtual ~run_loop_worker()
61         {
62         }
63 
run_loop_workerrxcpp::schedulers::run_loop_scheduler::run_loop_worker64         explicit run_loop_worker(std::weak_ptr<detail::run_loop_state> ws)
65             : state(ws)
66         {
67         }
68 
nowrxcpp::schedulers::run_loop_scheduler::run_loop_worker69         virtual clock_type::time_point now() const {
70             return clock_type::now();
71         }
72 
schedulerxcpp::schedulers::run_loop_scheduler::run_loop_worker73         virtual void schedule(const schedulable& scbl) const {
74             schedule(now(), scbl);
75         }
76 
schedulerxcpp::schedulers::run_loop_scheduler::run_loop_worker77         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
78             if (scbl.is_subscribed()) {
79                 auto st = state.lock();
80                 std::unique_lock<std::mutex> guard(st->lock);
81                 const bool need_earlier_wakeup_notification = st->notify_earlier_wakeup &&
82                                                               (st->q.empty() || when < st->q.top().when);
83                 st->q.push(detail::run_loop_state::item_type(when, scbl));
84                 st->r.reset(false);
85                 if (need_earlier_wakeup_notification) st->notify_earlier_wakeup(when);
86                 guard.unlock(); // So we can't get attempt to recursively lock the state
87             }
88         }
89     };
90 
91     std::weak_ptr<detail::run_loop_state> state;
92 
93 public:
run_loop_schedulerrxcpp::schedulers::run_loop_scheduler94     explicit run_loop_scheduler(std::weak_ptr<detail::run_loop_state> ws)
95         : state(ws)
96     {
97     }
~run_loop_schedulerrxcpp::schedulers::run_loop_scheduler98     virtual ~run_loop_scheduler()
99     {
100     }
101 
nowrxcpp::schedulers::run_loop_scheduler102     virtual clock_type::time_point now() const {
103         return clock_type::now();
104     }
105 
create_workerrxcpp::schedulers::run_loop_scheduler106     virtual worker create_worker(composite_subscription cs) const {
107         auto lifetime = state.lock()->lifetime;
108         auto token = lifetime.add(cs);
109         cs.add([=](){lifetime.remove(token);});
110         return worker(cs, create_worker_interface());
111     }
112 
create_worker_interfacerxcpp::schedulers::run_loop_scheduler113     std::shared_ptr<worker_interface> create_worker_interface() const {
114         return std::make_shared<run_loop_worker>(state);
115     }
116 };
117 
118 class run_loop
119 {
120 private:
121     typedef run_loop this_type;
122     // don't allow this instance to copy/move since it owns current_thread queue
123     // for the thread it is constructed on.
124     run_loop(const this_type&);
125     run_loop(this_type&&);
126 
127     typedef detail::action_queue queue_type;
128 
129     typedef detail::run_loop_state::item_type item_type;
130     typedef detail::run_loop_state::const_reference_item_type const_reference_item_type;
131 
132     std::shared_ptr<detail::run_loop_state> state;
133     std::shared_ptr<run_loop_scheduler> sc;
134 
135 public:
136     typedef scheduler::clock_type clock_type;
run_loop()137     run_loop()
138         : state(std::make_shared<detail::run_loop_state>())
139         , sc(std::make_shared<run_loop_scheduler>(state))
140     {
141         // take ownership so that the current_thread scheduler
142         // uses the same queue on this thread
143         queue_type::ensure(sc->create_worker_interface());
144     }
~run_loop()145     ~run_loop()
146     {
147         state->lifetime.unsubscribe();
148 
149         std::unique_lock<std::mutex> guard(state->lock);
150 
151         // release ownership
152         queue_type::destroy();
153 
154         auto expired = std::move(state->q);
155         if (!state->q.empty()) std::terminate();
156     }
157 
now() const158     clock_type::time_point now() const {
159         return clock_type::now();
160     }
161 
get_subscription() const162     composite_subscription get_subscription() const {
163         return state->lifetime;
164     }
165 
empty() const166     bool empty() const {
167         return state->q.empty();
168     }
169 
peek() const170     const_reference_item_type peek() const {
171         return state->q.top();
172     }
173 
dispatch() const174     void dispatch() const {
175         std::unique_lock<std::mutex> guard(state->lock);
176         if (state->q.empty()) {
177             return;
178         }
179         auto& peek = state->q.top();
180         if (!peek.what.is_subscribed()) {
181             state->q.pop();
182             return;
183         }
184         if (clock_type::now() < peek.when) {
185             return;
186         }
187         auto what = peek.what;
188         state->q.pop();
189         state->r.reset(state->q.empty());
190         guard.unlock();
191         what(state->r.get_recurse());
192     }
193 
get_scheduler() const194     scheduler get_scheduler() const {
195         return make_scheduler(sc);
196     }
197 
set_notify_earlier_wakeup(std::function<void (clock_type::time_point)> const & f)198     void set_notify_earlier_wakeup(std::function<void(clock_type::time_point)> const& f) {
199         std::unique_lock<std::mutex> guard(state->lock);
200         state->notify_earlier_wakeup = f;
201     }
202 };
203 
make_run_loop(const run_loop & r)204 inline scheduler make_run_loop(const run_loop& r) {
205     return r.get_scheduler();
206 }
207 
208 }
209 
210 }
211 
212 #endif
213