1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP)
6 #define RXCPP_RX_SCHEDULER_VIRTUAL_TIME_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 namespace detail {
15 
16 template<class Absolute, class Relative>
17 struct virtual_time_base : std::enable_shared_from_this<virtual_time_base<Absolute, Relative>>
18 {
19 private:
20     typedef virtual_time_base<Absolute, Relative> this_type;
21     virtual_time_base(const virtual_time_base&);
22 
23     mutable bool isenabled;
24 
25 public:
26     typedef Absolute absolute;
27     typedef Relative relative;
28 
~virtual_time_baserxcpp::schedulers::detail::virtual_time_base29     virtual ~virtual_time_base()
30     {
31     }
32 
33 protected:
virtual_time_baserxcpp::schedulers::detail::virtual_time_base34     virtual_time_base()
35         : isenabled(false)
36         , clock_now(0)
37     {
38     }
virtual_time_baserxcpp::schedulers::detail::virtual_time_base39     explicit virtual_time_base(absolute initialClock)
40         : isenabled(false)
41         , clock_now(initialClock)
42     {
43     }
44 
45     mutable absolute clock_now;
46 
47     typedef time_schedulable<long> item_type;
48 
49     virtual absolute add(absolute, relative) const =0;
50 
51     virtual typename scheduler_base::clock_type::time_point to_time_point(absolute) const =0;
52     virtual relative to_relative(typename scheduler_base::clock_type::duration) const =0;
53 
54     virtual item_type top() const =0;
55     virtual void pop() const =0;
56     virtual bool empty() const =0;
57 
58 public:
59 
60     virtual void schedule_absolute(absolute, const schedulable&) const =0;
61 
schedule_relativerxcpp::schedulers::detail::virtual_time_base62     virtual void schedule_relative(relative when, const schedulable& a) const {
63         auto at = add(clock_now, when);
64         return schedule_absolute(at, a);
65     }
66 
is_enabledrxcpp::schedulers::detail::virtual_time_base67     bool is_enabled() const {return isenabled;}
clockrxcpp::schedulers::detail::virtual_time_base68     absolute clock() const {return clock_now;}
69 
startrxcpp::schedulers::detail::virtual_time_base70     void start() const
71     {
72         if (!isenabled) {
73             isenabled = true;
74             rxsc::recursion r;
75             r.reset(false);
76             while (!empty() && isenabled) {
77                 auto next = top();
78                 pop();
79                 if (next.what.is_subscribed()) {
80                     if (next.when > clock_now) {
81                         clock_now = next.when;
82                     }
83                     next.what(r.get_recurse());
84                 }
85             }
86             isenabled = false;
87         }
88     }
89 
stoprxcpp::schedulers::detail::virtual_time_base90     void stop() const
91     {
92         isenabled = false;
93     }
94 
advance_torxcpp::schedulers::detail::virtual_time_base95     void advance_to(absolute time) const
96     {
97         if (time < clock_now) {
98             std::terminate();
99         }
100 
101         if (time == clock_now) {
102             return;
103         }
104 
105         if (!isenabled) {
106             isenabled = true;
107             rxsc::recursion r;
108             while (!empty() && isenabled) {
109                 auto next = top();
110                 if (next.when <= time) {
111                     pop();
112                     if (!next.what.is_subscribed()) {
113                         continue;
114                     }
115                     if (next.when > clock_now) {
116                         clock_now = next.when;
117                     }
118                     next.what(r.get_recurse());
119                 }
120                 else {
121                     break;
122                 }
123             }
124             isenabled = false;
125             clock_now = time;
126         }
127         else {
128             std::terminate();
129         }
130     }
131 
advance_byrxcpp::schedulers::detail::virtual_time_base132     void advance_by(relative time) const
133     {
134         auto dt = add(clock_now, time);
135 
136         if (dt < clock_now) {
137             std::terminate();
138         }
139 
140         if (dt == clock_now) {
141             return;
142         }
143 
144         if (!isenabled) {
145             advance_to(dt);
146         }
147         else {
148             std::terminate();
149         }
150     }
151 
sleeprxcpp::schedulers::detail::virtual_time_base152     void sleep(relative time) const
153     {
154         auto dt = add(clock_now, time);
155 
156         if (dt < clock_now) {
157             std::terminate();
158         }
159 
160         clock_now = dt;
161     }
162 
163 };
164 
165 }
166 
167 template<class Absolute, class Relative>
168 struct virtual_time : public detail::virtual_time_base<Absolute, Relative>
169 {
170     typedef detail::virtual_time_base<Absolute, Relative> base;
171 
172     typedef typename base::item_type item_type;
173 
174     typedef detail::schedulable_queue<
175         typename item_type::time_point_type> queue_item_time;
176 
177     mutable queue_item_time q;
178 
179 public:
~virtual_timerxcpp::schedulers::virtual_time180     virtual ~virtual_time()
181     {
182     }
183 
184 protected:
virtual_timerxcpp::schedulers::virtual_time185     virtual_time()
186     {
187     }
virtual_timerxcpp::schedulers::virtual_time188     explicit virtual_time(typename base::absolute initialClock)
189         : base(initialClock)
190     {
191     }
192 
toprxcpp::schedulers::virtual_time193     virtual item_type top() const {
194         return q.top();
195     }
poprxcpp::schedulers::virtual_time196     virtual void pop() const {
197         q.pop();
198     }
emptyrxcpp::schedulers::virtual_time199     virtual bool empty() const {
200         return q.empty();
201     }
202 
203     using base::schedule_absolute;
204     using base::schedule_relative;
205 
schedule_absoluterxcpp::schedulers::virtual_time206     virtual void schedule_absolute(typename base::absolute when, const schedulable& a) const
207     {
208         // use a separate subscription here so that a's subscription is not affected
209         auto run = make_schedulable(
210             a.get_worker(),
211             composite_subscription(),
212             [a](const schedulable& scbl) {
213                 rxsc::recursion r;
214                 r.reset(false);
215                 if (scbl.is_subscribed()) {
216                     scbl.unsubscribe(); // unsubscribe() run, not a;
217                     a(r.get_recurse());
218                 }
219             });
220         q.push(item_type(when, run));
221     }
222 };
223 
224 
225 
226 }
227 
228 }
229 
230 #endif
231