1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
6 #define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 namespace rxcpp {
11 
12 namespace schedulers {
13 
14 typedef std::function<std::thread(std::function<void()>)> thread_factory;
15 
16 struct new_thread : public scheduler_interface
17 {
18 private:
19     typedef new_thread this_type;
20     new_thread(const this_type&);
21 
22     struct new_worker : public worker_interface
23     {
24     private:
25         typedef new_worker this_type;
26 
27         typedef detail::action_queue queue_type;
28 
29         new_worker(const this_type&);
30 
31         struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
32         {
33             typedef detail::schedulable_queue<
34                 typename clock_type::time_point> queue_item_time;
35 
36             typedef queue_item_time::item_type item_type;
37 
~new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state38             virtual ~new_worker_state()
39             {
40                 // Ensure that std::thread is no longer joinable,
41                 // otherwise the destructor will call std::terminate.
42                 if (!worker.joinable()) {
43                     return;
44                 }
45                 if (worker.get_id() != std::this_thread::get_id()) {
46                     worker.join();
47                 } else {
48                     worker.detach();
49                 }
50             }
51 
new_worker_staterxcpp::schedulers::new_thread::new_worker::new_worker_state52             explicit new_worker_state(composite_subscription cs)
53                 : lifetime(cs)
54             {
55             }
56 
57             composite_subscription lifetime;
58             mutable std::mutex lock;
59             mutable std::condition_variable wake;
60             mutable queue_item_time q;
61             std::thread worker;
62             recursion r;
63         };
64 
65         std::shared_ptr<new_worker_state> state;
66 
67     public:
~new_workerrxcpp::schedulers::new_thread::new_worker68         virtual ~new_worker()
69         {
70         }
71 
new_workerrxcpp::schedulers::new_thread::new_worker72         explicit new_worker(std::shared_ptr<new_worker_state> ws)
73             : state(ws)
74         {
75         }
76 
new_workerrxcpp::schedulers::new_thread::new_worker77         new_worker(composite_subscription cs, thread_factory& tf)
78             : state(std::make_shared<new_worker_state>(cs))
79         {
80             auto keepAlive = state;
81 
82             state->lifetime.add([keepAlive](){
83                 std::unique_lock<std::mutex> guard(keepAlive->lock);
84                 auto expired = std::move(keepAlive->q);
85                 keepAlive->q = new_worker_state::queue_item_time{};
86                 if (!keepAlive->q.empty()) std::terminate();
87                 keepAlive->wake.notify_one();
88 
89                 // ~new_worker_state cleans up the std::thread
90             });
91 
92             state->worker = tf([keepAlive](){
93 
94                 // take ownership
95                 queue_type::ensure(std::make_shared<new_worker>(keepAlive));
96                 // release ownership
97                 RXCPP_UNWIND_AUTO([]{
98                     queue_type::destroy();
99                 });
100 
101                 for(;;) {
102                     std::unique_lock<std::mutex> guard(keepAlive->lock);
103                     if (keepAlive->q.empty()) {
104                         keepAlive->wake.wait(guard, [keepAlive](){
105                             return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
106                         });
107                     }
108                     if (!keepAlive->lifetime.is_subscribed()) {
109                         break;
110                     }
111                     auto& peek = keepAlive->q.top();
112                     if (!peek.what.is_subscribed()) {
113                         keepAlive->q.pop();
114                         continue;
115                     }
116                     if (clock_type::now() < peek.when) {
117                         keepAlive->wake.wait_until(guard, peek.when);
118                         continue;
119                     }
120                     auto what = peek.what;
121                     keepAlive->q.pop();
122                     keepAlive->r.reset(keepAlive->q.empty());
123                     guard.unlock();
124                     what(keepAlive->r.get_recurse());
125                 }
126             });
127         }
128 
nowrxcpp::schedulers::new_thread::new_worker129         virtual clock_type::time_point now() const {
130             return clock_type::now();
131         }
132 
schedulerxcpp::schedulers::new_thread::new_worker133         virtual void schedule(const schedulable& scbl) const {
134             schedule(now(), scbl);
135         }
136 
schedulerxcpp::schedulers::new_thread::new_worker137         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
138             if (scbl.is_subscribed()) {
139                 std::unique_lock<std::mutex> guard(state->lock);
140                 state->q.push(new_worker_state::item_type(when, scbl));
141                 state->r.reset(false);
142             }
143             state->wake.notify_one();
144         }
145     };
146 
147     mutable thread_factory factory;
148 
149 public:
new_threadrxcpp::schedulers::new_thread150     new_thread()
151         : factory([](std::function<void()> start){
152             return std::thread(std::move(start));
153         })
154     {
155     }
new_threadrxcpp::schedulers::new_thread156     explicit new_thread(thread_factory tf)
157         : factory(tf)
158     {
159     }
~new_threadrxcpp::schedulers::new_thread160     virtual ~new_thread()
161     {
162     }
163 
nowrxcpp::schedulers::new_thread164     virtual clock_type::time_point now() const {
165         return clock_type::now();
166     }
167 
create_workerrxcpp::schedulers::new_thread168     virtual worker create_worker(composite_subscription cs) const {
169         return worker(cs, std::make_shared<new_worker>(cs, factory));
170     }
171 };
172 
make_new_thread()173 inline scheduler make_new_thread() {
174     static scheduler instance = make_scheduler<new_thread>();
175     return instance;
176 }
make_new_thread(thread_factory tf)177 inline scheduler make_new_thread(thread_factory tf) {
178     return make_scheduler<new_thread>(tf);
179 }
180 
181 }
182 
183 }
184 
185 #endif
186