1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
7 
8 #include "rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 class worker_interface;
15 class scheduler_interface;
16 
17 namespace detail {
18 
19 class action_type;
20 typedef std::shared_ptr<action_type> action_ptr;
21 
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
24 
25 typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
26 typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
27 
28 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
29 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
30 
shared_empty()31 inline action_ptr shared_empty() {
32     static action_ptr shared_empty = std::make_shared<detail::action_type>();
33     return shared_empty;
34 }
35 
36 }
37 
38 // It is essential to keep virtual function calls out of an inner loop.
39 // To make tail-recursion work efficiently the recursion objects create
40 // a space on the stack inside the virtual function call in the actor that
41 // allows the callback and the scheduler to share stack space that records
42 // the request and the allowance without any virtual calls in the loop.
43 
44 /// recursed is set on a schedulable by the action to allow the called
45 /// function to request to be rescheduled.
46 class recursed
47 {
48     bool& isrequested;
49     recursed operator=(const recursed&);
50 public:
recursed(bool & r)51     explicit recursed(bool& r)
52         : isrequested(r)
53     {
54     }
55     /// request to be rescheduled
operator ()() const56     inline void operator()() const {
57         isrequested = true;
58     }
59 };
60 
61 /// recurse is passed to the action by the scheduler.
62 /// the action uses recurse to coordinate the scheduler and the function.
63 class recurse
64 {
65     bool& isallowed;
66     mutable bool isrequested;
67     recursed requestor;
68     recurse operator=(const recurse&);
69 public:
recurse(bool & a)70     explicit recurse(bool& a)
71         : isallowed(a)
72         , isrequested(true)
73         , requestor(isrequested)
74     {
75     }
76     /// does the scheduler allow tail-recursion now?
is_allowed() const77     inline bool is_allowed() const {
78         return isallowed;
79     }
80     /// did the function request to be recursed?
is_requested() const81     inline bool is_requested() const {
82         return isrequested;
83     }
84     /// reset the function request. call before each call to the function.
reset() const85     inline void reset() const {
86         isrequested = false;
87     }
88     /// get the recursed to set into the schedulable for the function to use to request recursion
get_recursed() const89     inline const recursed& get_recursed() const {
90         return requestor;
91     }
92 };
93 
94 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
95 class recursion
96 {
97     mutable bool isallowed;
98     recurse recursor;
99     recursion operator=(const recursion&);
100 public:
recursion()101     recursion()
102         : isallowed(true)
103         , recursor(isallowed)
104     {
105     }
recursion(bool b)106     explicit recursion(bool b)
107         : isallowed(b)
108         , recursor(isallowed)
109     {
110     }
111     /// set whether tail-recursion is allowed
reset(bool b=true) const112     inline void reset(bool b = true) const {
113         isallowed = b;
114     }
115     /// get the recurse to pass into each action being called
get_recurse() const116     inline const recurse& get_recurse() const {
117         return recursor;
118     }
119 };
120 
121 
122 struct action_base
123 {
124     typedef tag_action action_tag;
125 };
126 
127 class schedulable;
128 
129 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
130 class action : public action_base
131 {
132     typedef action this_type;
133     detail::action_ptr inner;
134 public:
action()135     action()
136     {
137     }
action(detail::action_ptr i)138     explicit action(detail::action_ptr i)
139     : inner(std::move(i))
140     {
141     }
142 
143     /// return the empty action
empty()144     inline static action empty() {
145         return action(detail::shared_empty());
146     }
147 
148     /// call the function
149     inline void operator()(const schedulable& s, const recurse& r) const;
150 };
151 
152 struct scheduler_base
153 {
154     typedef std::chrono::steady_clock clock_type;
155     typedef tag_scheduler scheduler_tag;
156 };
157 
158 struct worker_base : public subscription_base
159 {
160     typedef tag_worker worker_tag;
161 };
162 
163 class worker_interface
164     : public std::enable_shared_from_this<worker_interface>
165 {
166     typedef worker_interface this_type;
167 
168 public:
169     typedef scheduler_base::clock_type clock_type;
170 
~worker_interface()171     virtual ~worker_interface() {}
172 
173     virtual clock_type::time_point now() const = 0;
174 
175     virtual void schedule(const schedulable& scbl) const = 0;
176     virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
177 };
178 
179 namespace detail {
180 
181 template<class F>
182 struct is_action_function
183 {
184     struct not_void {};
185     template<class CF>
186     static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
187     template<class CF>
188     static not_void check(...);
189 
190     static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
191 };
192 
193 }
194 
195 class weak_worker;
196 
197 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
198 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
199 /// some inner implementations will impose additional constraints on the execution of items.
200 class worker : public worker_base
201 {
202     typedef worker this_type;
203     detail::worker_interface_ptr inner;
204     composite_subscription lifetime;
205     friend bool operator==(const worker&, const worker&);
206     friend class weak_worker;
207 public:
208     typedef scheduler_base::clock_type clock_type;
209     typedef composite_subscription::weak_subscription weak_subscription;
210 
worker()211     worker()
212     {
213     }
worker(composite_subscription cs,detail::const_worker_interface_ptr i)214     worker(composite_subscription cs, detail::const_worker_interface_ptr i)
215         : inner(std::const_pointer_cast<worker_interface>(i))
216         , lifetime(std::move(cs))
217     {
218     }
worker(composite_subscription cs,worker o)219     worker(composite_subscription cs, worker o)
220         : inner(o.inner)
221         , lifetime(std::move(cs))
222     {
223     }
224 
get_subscription() const225     inline const composite_subscription& get_subscription() const {
226         return lifetime;
227     }
get_subscription()228     inline composite_subscription& get_subscription() {
229         return lifetime;
230     }
231 
232     // composite_subscription
233     //
is_subscribed() const234     inline bool is_subscribed() const {
235         return lifetime.is_subscribed();
236     }
add(subscription s) const237     inline weak_subscription add(subscription s) const {
238         return lifetime.add(std::move(s));
239     }
remove(weak_subscription w) const240     inline void remove(weak_subscription w) const {
241         return lifetime.remove(std::move(w));
242     }
clear() const243     inline void clear() const {
244         return lifetime.clear();
245     }
unsubscribe() const246     inline void unsubscribe() const {
247         return lifetime.unsubscribe();
248     }
249 
250     // worker_interface
251     //
252     /// return the current time for this worker
now() const253     inline clock_type::time_point now() const {
254         return inner->now();
255     }
256 
257     /// insert the supplied schedulable to be run as soon as possible
schedule(const schedulable & scbl) const258     inline void schedule(const schedulable& scbl) const {
259         // force rebinding scbl to this worker
260         schedule_rebind(scbl);
261     }
262 
263     /// insert the supplied schedulable to be run at the time specified
schedule(clock_type::time_point when,const schedulable & scbl) const264     inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
265         // force rebinding scbl to this worker
266         schedule_rebind(when, scbl);
267     }
268 
269     // helpers
270     //
271 
272     /// insert the supplied schedulable to be run at now() + the delay specified
schedule(clock_type::duration when,const schedulable & scbl) const273     inline void schedule(clock_type::duration when, const schedulable& scbl) const {
274         // force rebinding scbl to this worker
275         schedule_rebind(now() + when, scbl);
276     }
277 
278     /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
279     /// this will continue until the worker or schedulable is unsubscribed.
schedule_periodically(clock_type::time_point initial,clock_type::duration period,const schedulable & scbl) const280     inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
281         // force rebinding scbl to this worker
282         schedule_periodically_rebind(initial, period, scbl);
283     }
284 
285     /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
286     /// this will continue until the worker or schedulable is unsubscribed.
schedule_periodically(clock_type::duration initial,clock_type::duration period,const schedulable & scbl) const287     inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
288         // force rebinding scbl to this worker
289         schedule_periodically_rebind(now() + initial, period, scbl);
290     }
291 
292     /// use the supplied arguments to make a schedulable and then insert it to be run
293     template<class Arg0, class... ArgN>
294     auto schedule(Arg0&& a0, ArgN&&... an) const
295         -> typename std::enable_if<
296             (detail::is_action_function<Arg0>::value ||
297             is_subscription<Arg0>::value) &&
298             !is_schedulable<Arg0>::value>::type;
299     template<class... ArgN>
300     /// use the supplied arguments to make a schedulable and then insert it to be run
301     void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
302 
303     /// use the supplied arguments to make a schedulable and then insert it to be run
304     template<class Arg0, class... ArgN>
305     auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
306         -> typename std::enable_if<
307             (detail::is_action_function<Arg0>::value ||
308             is_subscription<Arg0>::value) &&
309             !is_schedulable<Arg0>::value>::type;
310     /// use the supplied arguments to make a schedulable and then insert it to be run
311     template<class... ArgN>
312     void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
313 
314     /// use the supplied arguments to make a schedulable and then insert it to be run
315     template<class Arg0, class... ArgN>
316     auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
317         -> typename std::enable_if<
318             (detail::is_action_function<Arg0>::value ||
319             is_subscription<Arg0>::value) &&
320             !is_schedulable<Arg0>::value>::type;
321     /// use the supplied arguments to make a schedulable and then insert it to be run
322     template<class... ArgN>
323     void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
324 };
325 
operator ==(const worker & lhs,const worker & rhs)326 inline bool operator==(const worker& lhs, const worker& rhs) {
327     return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
328 }
operator !=(const worker & lhs,const worker & rhs)329 inline bool operator!=(const worker& lhs, const worker& rhs) {
330     return !(lhs == rhs);
331 }
332 
333 class weak_worker
334 {
335     detail::worker_interface_weak_ptr inner;
336     composite_subscription lifetime;
337 
338 public:
weak_worker()339     weak_worker()
340     {
341     }
weak_worker(worker & owner)342     explicit weak_worker(worker& owner)
343         : inner(owner.inner)
344         , lifetime(owner.lifetime)
345     {
346     }
347 
lock() const348     worker lock() const {
349         return worker(lifetime, inner.lock());
350     }
351 };
352 
353 class scheduler_interface
354     : public std::enable_shared_from_this<scheduler_interface>
355 {
356     typedef scheduler_interface this_type;
357 
358 public:
359     typedef scheduler_base::clock_type clock_type;
360 
~scheduler_interface()361     virtual ~scheduler_interface() {}
362 
363     virtual clock_type::time_point now() const = 0;
364 
365     virtual worker create_worker(composite_subscription cs) const = 0;
366 };
367 
368 
369 struct schedulable_base :
370     // public subscription_base, <- already in worker base
371     public worker_base,
372     public action_base
373 {
374     typedef tag_schedulable schedulable_tag;
375 };
376 
377 /*!
378     \brief allows functions to be called at specified times and possibly in other contexts.
379 
380     \ingroup group-core
381 
382 */
383 class scheduler : public scheduler_base
384 {
385     typedef scheduler this_type;
386     detail::scheduler_interface_ptr inner;
387     friend bool operator==(const scheduler&, const scheduler&);
388 public:
389     typedef scheduler_base::clock_type clock_type;
390 
scheduler()391     scheduler()
392     {
393     }
scheduler(detail::scheduler_interface_ptr i)394     explicit scheduler(detail::scheduler_interface_ptr i)
395         : inner(std::move(i))
396     {
397     }
scheduler(detail::const_scheduler_interface_ptr i)398     explicit scheduler(detail::const_scheduler_interface_ptr i)
399         : inner(std::const_pointer_cast<scheduler_interface>(i))
400     {
401     }
402 
403     /// return the current time for this scheduler
now() const404     inline clock_type::time_point now() const {
405         return inner->now();
406     }
407     /// create a worker with a lifetime.
408     /// when the worker is unsubscribed all scheduled items will be unsubscribed.
409     /// items scheduled to a worker will be run one at a time.
410     /// scheduling order is preserved: when more than one item is scheduled for
411     /// time T then at time T they will be run in the order that they were scheduled.
create_worker(composite_subscription cs=composite_subscription ()) const412     inline worker create_worker(composite_subscription cs = composite_subscription()) const {
413         return inner->create_worker(cs);
414     }
415 };
416 
417 template<class Scheduler, class... ArgN>
make_scheduler(ArgN &&...an)418 inline scheduler make_scheduler(ArgN&&... an) {
419     return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
420 }
421 
make_scheduler(std::shared_ptr<scheduler_interface> si)422 inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
423     return scheduler(si);
424 }
425 
426 class schedulable : public schedulable_base
427 {
428     typedef schedulable this_type;
429 
430     composite_subscription lifetime;
431     weak_worker controller;
432     action activity;
433     bool scoped;
434     composite_subscription::weak_subscription action_scope;
435 
436     struct detacher
437     {
~detacherrxcpp::schedulers::schedulable::detacher438         ~detacher()
439         {
440             if (that) {
441                 that->unsubscribe();
442             }
443         }
detacherrxcpp::schedulers::schedulable::detacher444         detacher(const this_type* that)
445             : that(that)
446         {
447         }
448         const this_type* that;
449     };
450 
451     class recursed_scope_type
452     {
453         mutable const recursed* requestor;
454 
455         class exit_recursed_scope_type
456         {
457             const recursed_scope_type* that;
458         public:
~exit_recursed_scope_type()459             ~exit_recursed_scope_type()
460             {
461                 if (that != nullptr) {
462                     that->requestor = nullptr;
463                 }
464             }
exit_recursed_scope_type(const recursed_scope_type * that)465             exit_recursed_scope_type(const recursed_scope_type* that)
466                 : that(that)
467             {
468             }
exit_recursed_scope_type(exit_recursed_scope_type && other)469             exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
470                 : that(other.that)
471             {
472                 other.that = nullptr;
473             }
474         };
475     public:
recursed_scope_type()476         recursed_scope_type()
477             : requestor(nullptr)
478         {
479         }
recursed_scope_type(const recursed_scope_type &)480         recursed_scope_type(const recursed_scope_type&)
481             : requestor(nullptr)
482         {
483             // does not aquire recursion scope
484         }
operator =(const recursed_scope_type &)485         recursed_scope_type& operator=(const recursed_scope_type& )
486         {
487             // no change in recursion scope
488             return *this;
489         }
reset(const recurse & r) const490         exit_recursed_scope_type reset(const recurse& r) const {
491             requestor = std::addressof(r.get_recursed());
492             return exit_recursed_scope_type(this);
493         }
is_recursed() const494         bool is_recursed() const {
495             return !!requestor;
496         }
operator ()() const497         void operator()() const {
498             (*requestor)();
499         }
500     };
501     recursed_scope_type recursed_scope;
502 
503 public:
504     typedef composite_subscription::weak_subscription weak_subscription;
505     typedef scheduler_base::clock_type clock_type;
506 
~schedulable()507     ~schedulable()
508     {
509         if (scoped) {
510             controller.lock().remove(action_scope);
511         }
512     }
schedulable()513     schedulable()
514         : scoped(false)
515     {
516     }
517 
518     /// action and worker share lifetime
schedulable(worker q,action a)519     schedulable(worker q, action a)
520         : lifetime(q.get_subscription())
521         , controller(q)
522         , activity(std::move(a))
523         , scoped(false)
524     {
525     }
526     /// action and worker have independent lifetimes
schedulable(composite_subscription cs,worker q,action a)527     schedulable(composite_subscription cs, worker q, action a)
528         : lifetime(std::move(cs))
529         , controller(q)
530         , activity(std::move(a))
531         , scoped(true)
532         , action_scope(controller.lock().add(lifetime))
533     {
534     }
535     /// inherit lifetimes
schedulable(schedulable scbl,worker q,action a)536     schedulable(schedulable scbl, worker q, action a)
537         : lifetime(scbl.get_subscription())
538         , controller(q)
539         , activity(std::move(a))
540         , scoped(scbl.scoped)
541         , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
542     {
543     }
544 
get_subscription() const545     inline const composite_subscription& get_subscription() const {
546         return lifetime;
547     }
get_subscription()548     inline composite_subscription& get_subscription() {
549         return lifetime;
550     }
get_worker() const551     inline const worker get_worker() const {
552         return controller.lock();
553     }
get_worker()554     inline worker get_worker() {
555         return controller.lock();
556     }
get_action() const557     inline const action& get_action() const {
558         return activity;
559     }
get_action()560     inline action& get_action() {
561         return activity;
562     }
563 
empty(worker sc)564     inline static schedulable empty(worker sc) {
565         return schedulable(composite_subscription::empty(), sc, action::empty());
566     }
567 
set_recursed(const recurse & r) const568     inline auto set_recursed(const recurse& r) const
569         -> decltype(recursed_scope.reset(r)) {
570         return      recursed_scope.reset(r);
571     }
572 
573     // recursed
574     //
is_recursed() const575     bool is_recursed() const {
576         return recursed_scope.is_recursed();
577     }
578     /// requests tail-recursion of the same action
579     /// this will exit the process if called when
580     /// is_recursed() is false.
581     /// Note: to improve perf it is not required
582     /// to call is_recursed() before calling this
583     /// operator. Context is sufficient. The schedulable
584     /// passed to the action by the scheduler will return
585     /// true from is_recursed()
operator ()() const586     inline void operator()() const {
587         recursed_scope();
588     }
589 
590     // composite_subscription
591     //
is_subscribed() const592     inline bool is_subscribed() const {
593         return lifetime.is_subscribed();
594     }
add(subscription s) const595     inline weak_subscription add(subscription s) const {
596         return lifetime.add(std::move(s));
597     }
598     template<class F>
add(F f) const599     auto add(F f) const
600     -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
601         return lifetime.add(make_subscription(std::move(f)));
602     }
remove(weak_subscription w) const603     inline void remove(weak_subscription w) const {
604         return lifetime.remove(std::move(w));
605     }
clear() const606     inline void clear() const {
607         return lifetime.clear();
608     }
unsubscribe() const609     inline void unsubscribe() const {
610         return lifetime.unsubscribe();
611     }
612 
613     // scheduler
614     //
now() const615     inline clock_type::time_point now() const {
616         return controller.lock().now();
617     }
618     /// put this on the queue of the stored scheduler to run asap
schedule() const619     inline void schedule() const {
620         if (is_subscribed()) {
621             get_worker().schedule(*this);
622         }
623     }
624     /// put this on the queue of the stored scheduler to run at the specified time
schedule(clock_type::time_point when) const625     inline void schedule(clock_type::time_point when) const {
626         if (is_subscribed()) {
627             get_worker().schedule(when, *this);
628         }
629     }
630     /// put this on the queue of the stored scheduler to run after a delay from now
schedule(clock_type::duration when) const631     inline void schedule(clock_type::duration when) const {
632         if (is_subscribed()) {
633             get_worker().schedule(when, *this);
634         }
635     }
636 
637     // action
638     //
639     /// invokes the action
operator ()(const recurse & r) const640     inline void operator()(const recurse& r) const {
641         if (!is_subscribed()) {
642             return;
643         }
644         detacher protect(this);
645         activity(*this, r);
646         protect.that = nullptr;
647     }
648 };
649 
650 struct current_thread;
651 
652 namespace detail {
653 
654 class action_type
655     : public std::enable_shared_from_this<action_type>
656 {
657     typedef action_type this_type;
658 
659 public:
660     typedef std::function<void(const schedulable&, const recurse&)> function_type;
661 
662 private:
663     function_type f;
664 
665 public:
action_type()666     action_type()
667     {
668     }
669 
action_type(function_type f)670     action_type(function_type f)
671         : f(std::move(f))
672     {
673     }
674 
operator ()(const schedulable & s,const recurse & r)675     inline void operator()(const schedulable& s, const recurse& r) {
676         if (!f) {
677             std::terminate();
678         }
679         f(s, r);
680     }
681 };
682 
683 class action_tailrecurser
684     : public std::enable_shared_from_this<action_type>
685 {
686     typedef action_type this_type;
687 
688 public:
689     typedef std::function<void(const schedulable&)> function_type;
690 
691 private:
692     function_type f;
693 
694 public:
action_tailrecurser()695     action_tailrecurser()
696     {
697     }
698 
action_tailrecurser(function_type f)699     action_tailrecurser(function_type f)
700         : f(std::move(f))
701     {
702     }
703 
operator ()(const schedulable & s,const recurse & r)704     inline void operator()(const schedulable& s, const recurse& r) {
705         if (!f) {
706             std::terminate();
707         }
708         trace_activity().action_enter(s);
709         auto scope = s.set_recursed(r);
710         while (s.is_subscribed()) {
711             r.reset();
712             f(s);
713             if (!r.is_allowed() || !r.is_requested()) {
714                 if (r.is_requested()) {
715                     s.schedule();
716                 }
717                 break;
718             }
719             trace_activity().action_recurse(s);
720         }
721         trace_activity().action_return(s);
722     }
723 };
724 }
725 
operator ()(const schedulable & s,const recurse & r) const726 inline void action::operator()(const schedulable& s, const recurse& r) const {
727     (*inner)(s, r);
728 }
729 
make_action_empty()730 inline action make_action_empty() {
731     return action::empty();
732 }
733 
734 template<class F>
make_action(F && f)735 inline action make_action(F&& f) {
736     static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
737     auto fn = std::forward<F>(f);
738     return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
739 }
740 
741 // copy
make_schedulable(const schedulable & scbl)742 inline auto make_schedulable(
743     const   schedulable& scbl)
744     ->      schedulable {
745     return  schedulable(scbl);
746 }
747 // move
make_schedulable(schedulable && scbl)748 inline auto make_schedulable(
749             schedulable&& scbl)
750     ->      schedulable {
751     return  schedulable(std::move(scbl));
752 }
753 
make_schedulable(worker sc,action a)754 inline schedulable make_schedulable(worker sc, action a) {
755     return schedulable(sc, a);
756 }
make_schedulable(worker sc,composite_subscription cs,action a)757 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
758     return schedulable(cs, sc, a);
759 }
760 
761 template<class F>
make_schedulable(worker sc,F && f)762 auto make_schedulable(worker sc, F&& f)
763     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
764     return schedulable(sc, make_action(std::forward<F>(f)));
765 }
766 template<class F>
make_schedulable(worker sc,composite_subscription cs,F && f)767 auto make_schedulable(worker sc, composite_subscription cs, F&& f)
768     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
769     return schedulable(cs, sc, make_action(std::forward<F>(f)));
770 }
771 template<class F>
make_schedulable(schedulable scbl,composite_subscription cs,F && f)772 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
773     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
774     return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
775 }
776 template<class F>
make_schedulable(schedulable scbl,worker sc,F && f)777 auto make_schedulable(schedulable scbl, worker sc, F&& f)
778     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
779     return schedulable(scbl, sc, make_action(std::forward<F>(f)));
780 }
781 template<class F>
make_schedulable(schedulable scbl,F && f)782 auto make_schedulable(schedulable scbl, F&& f)
783     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
784     return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
785 }
786 
make_schedulable(schedulable scbl,composite_subscription cs)787 inline auto make_schedulable(schedulable scbl, composite_subscription cs)
788     -> schedulable {
789     return schedulable(cs, scbl.get_worker(), scbl.get_action());
790 }
make_schedulable(schedulable scbl,worker sc,composite_subscription cs)791 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
792     -> schedulable {
793     return schedulable(cs, sc, scbl.get_action());
794 }
make_schedulable(schedulable scbl,worker sc)795 inline auto make_schedulable(schedulable scbl, worker sc)
796     -> schedulable {
797     return schedulable(scbl, sc, scbl.get_action());
798 }
799 
800 template<class Arg0, class... ArgN>
schedule(Arg0 && a0,ArgN &&...an) const801 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
802     -> typename std::enable_if<
803         (detail::is_action_function<Arg0>::value ||
804         is_subscription<Arg0>::value) &&
805         !is_schedulable<Arg0>::value>::type {
806     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
807     trace_activity().schedule_enter(*inner.get(), scbl);
808     inner->schedule(std::move(scbl));
809     trace_activity().schedule_return(*inner.get());
810 }
811 template<class... ArgN>
schedule_rebind(const schedulable & scbl,ArgN &&...an) const812 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
813     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
814     trace_activity().schedule_enter(*inner.get(), rescbl);
815     inner->schedule(std::move(rescbl));
816     trace_activity().schedule_return(*inner.get());
817 }
818 
819 template<class Arg0, class... ArgN>
schedule(clock_type::time_point when,Arg0 && a0,ArgN &&...an) const820 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
821     -> typename std::enable_if<
822         (detail::is_action_function<Arg0>::value ||
823         is_subscription<Arg0>::value) &&
824         !is_schedulable<Arg0>::value>::type {
825     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
826     trace_activity().schedule_when_enter(*inner.get(), when, scbl);
827     inner->schedule(when, std::move(scbl));
828     trace_activity().schedule_when_return(*inner.get());
829 }
830 template<class... ArgN>
schedule_rebind(clock_type::time_point when,const schedulable & scbl,ArgN &&...an) const831 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
832     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
833     trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
834     inner->schedule(when, std::move(rescbl));
835     trace_activity().schedule_when_return(*inner.get());
836 }
837 
838 template<class Arg0, class... ArgN>
schedule_periodically(clock_type::time_point initial,clock_type::duration period,Arg0 && a0,ArgN &&...an) const839 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
840     -> typename std::enable_if<
841         (detail::is_action_function<Arg0>::value ||
842         is_subscription<Arg0>::value) &&
843         !is_schedulable<Arg0>::value>::type {
844     schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
845 }
846 template<class... ArgN>
schedule_periodically_rebind(clock_type::time_point initial,clock_type::duration period,const schedulable & scbl,ArgN &&...an) const847 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
848     auto keepAlive = *this;
849     auto target = std::make_shared<clock_type::time_point>(initial);
850     auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
851     auto periodic = make_schedulable(
852         activity,
853         [keepAlive, target, period, activity](schedulable self) {
854             // any recursion requests will be pushed to the scheduler queue
855             recursion r(false);
856             // call action
857             activity(r.get_recurse());
858 
859             // schedule next occurance (if the action took longer than 'period' target will be in the past)
860             *target += period;
861             self.schedule(*target);
862         });
863     trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
864     inner->schedule(*target, periodic);
865     trace_activity().schedule_when_return(*inner.get());
866 }
867 
868 namespace detail {
869 
870 template<class TimePoint>
871 struct time_schedulable
872 {
873     typedef TimePoint time_point_type;
874 
time_schedulablerxcpp::schedulers::detail::time_schedulable875     time_schedulable(TimePoint when, schedulable a)
876         : when(when)
877         , what(std::move(a))
878     {
879     }
880     TimePoint when;
881     schedulable what;
882 };
883 
884 
885 // Sorts time_schedulable items in priority order sorted
886 // on value of time_schedulable.when. Items with equal
887 // values for when are sorted in fifo order.
888 template<class TimePoint>
889 class schedulable_queue {
890 public:
891     typedef time_schedulable<TimePoint> item_type;
892     typedef std::pair<item_type, int64_t> elem_type;
893     typedef std::vector<elem_type> container_type;
894     typedef const item_type& const_reference;
895 
896 private:
897     struct compare_elem
898     {
operator ()rxcpp::schedulers::detail::schedulable_queue::compare_elem899         bool operator()(const elem_type& lhs, const elem_type& rhs) const {
900             if (lhs.first.when == rhs.first.when) {
901                 return lhs.second > rhs.second;
902             }
903             else {
904                 return lhs.first.when > rhs.first.when;
905             }
906         }
907     };
908 
909     typedef std::priority_queue<
910         elem_type,
911         container_type,
912         compare_elem
913     > queue_type;
914 
915     queue_type q;
916 
917     int64_t ordinal;
918 public:
919 
schedulable_queue()920     schedulable_queue()
921         : ordinal(0)
922     {
923     }
924 
top() const925     const_reference top() const {
926         return q.top().first;
927     }
928 
pop()929     void pop() {
930         q.pop();
931     }
932 
empty() const933     bool empty() const {
934         return q.empty();
935     }
936 
push(const item_type & value)937     void push(const item_type& value) {
938         q.push(elem_type(value, ordinal++));
939     }
940 
push(item_type && value)941     void push(item_type&& value) {
942         q.push(elem_type(std::move(value), ordinal++));
943     }
944 };
945 
946 }
947 
948 }
949 namespace rxsc=schedulers;
950 
951 }
952 
953 #include "schedulers/rx-currentthread.hpp"
954 #include "schedulers/rx-runloop.hpp"
955 #include "schedulers/rx-newthread.hpp"
956 #include "schedulers/rx-eventloop.hpp"
957 #include "schedulers/rx-immediate.hpp"
958 #include "schedulers/rx-virtualtime.hpp"
959 #include "schedulers/rx-sameworker.hpp"
960 
961 #endif
962