1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 struct event_loop : public scheduler_interface
15 {
16 private:
17     typedef event_loop this_type;
18     event_loop(const this_type&);
19 
20     struct loop_worker : public worker_interface
21     {
22     private:
23         typedef loop_worker this_type;
24         loop_worker(const this_type&);
25 
26         typedef detail::schedulable_queue<
27             typename clock_type::time_point> queue_item_time;
28 
29         typedef queue_item_time::item_type item_type;
30 
31         composite_subscription lifetime;
32         worker controller;
33         std::shared_ptr<const scheduler_interface> alive;
34 
35     public:
~loop_workerrxcpp::schedulers::event_loop::loop_worker36         virtual ~loop_worker()
37         {
38         }
loop_workerrxcpp::schedulers::event_loop::loop_worker39         loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
40             : lifetime(cs)
41             , controller(w)
42             , alive(alive)
43         {
44             auto token = controller.add(cs);
45             cs.add([token, w](){
46                 w.remove(token);
47             });
48         }
49 
nowrxcpp::schedulers::event_loop::loop_worker50         virtual clock_type::time_point now() const {
51             return clock_type::now();
52         }
53 
schedulerxcpp::schedulers::event_loop::loop_worker54         virtual void schedule(const schedulable& scbl) const {
55             controller.schedule(lifetime, scbl.get_action());
56         }
57 
schedulerxcpp::schedulers::event_loop::loop_worker58         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
59             controller.schedule(when, lifetime, scbl.get_action());
60         }
61     };
62 
63     mutable thread_factory factory;
64     scheduler newthread;
65     mutable std::atomic<std::size_t> count;
66     composite_subscription loops_lifetime;
67     std::vector<worker> loops;
68 
69 public:
event_looprxcpp::schedulers::event_loop70     event_loop()
71         : factory([](std::function<void()> start){
72             return std::thread(std::move(start));
73         })
74         , newthread(make_new_thread())
75         , count(0)
76     {
77         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
78         while (remaining--) {
79             loops.push_back(newthread.create_worker(loops_lifetime));
80         }
81     }
event_looprxcpp::schedulers::event_loop82     explicit event_loop(thread_factory tf)
83         : factory(tf)
84         , newthread(make_new_thread(tf))
85         , count(0)
86     {
87         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
88         while (remaining--) {
89             loops.push_back(newthread.create_worker(loops_lifetime));
90         }
91     }
~event_looprxcpp::schedulers::event_loop92     virtual ~event_loop()
93     {
94         loops_lifetime.unsubscribe();
95     }
96 
nowrxcpp::schedulers::event_loop97     virtual clock_type::time_point now() const {
98         return clock_type::now();
99     }
100 
create_workerrxcpp::schedulers::event_loop101     virtual worker create_worker(composite_subscription cs) const {
102         return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
103     }
104 };
105 
make_event_loop()106 inline scheduler make_event_loop() {
107     static scheduler instance = make_scheduler<event_loop>();
108     return instance;
109 }
make_event_loop(thread_factory tf)110 inline scheduler make_event_loop(thread_factory tf) {
111     return make_scheduler<event_loop>(tf);
112 }
113 
114 }
115 
116 }
117 
118 #endif
119