1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-timeout.hpp
6 
7     \brief Return an observable that terminates with timeout_error if a particular timespan has passed without emitting another item from the source observable.
8 
9     \tparam Duration      the type of time interval.
10     \tparam Coordination  the type of the scheduler (optional).
11 
12     \param period        the period of time wait for another item from the source observable.
13     \param coordination  the scheduler to manage timeout for each event (optional).
14 
15     \return  Observable that terminates with an error if a particular timespan has passed without emitting another item from the source observable.
16 
17     \sample
18     \snippet timeout.cpp timeout sample
19     \snippet output.txt timeout sample
20 */
21 
22 #if !defined(RXCPP_OPERATORS_RX_TIMEOUT_HPP)
23 #define RXCPP_OPERATORS_RX_TIMEOUT_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 class timeout_error: public std::runtime_error
30 {
31     public:
timeout_error(const std::string & msg)32         explicit timeout_error(const std::string& msg):
33             std::runtime_error(msg)
34         {}
35 };
36 
37 namespace operators {
38 
39 namespace detail {
40 
41 template<class... AN>
42 struct timeout_invalid_arguments {};
43 
44 template<class... AN>
45 struct timeout_invalid : public rxo::operator_base<timeout_invalid_arguments<AN...>> {
46     using type = observable<timeout_invalid_arguments<AN...>, timeout_invalid<AN...>>;
47 };
48 template<class... AN>
49 using timeout_invalid_t = typename timeout_invalid<AN...>::type;
50 
51 template<class T, class Duration, class Coordination>
52 struct timeout
53 {
54     typedef rxu::decay_t<T> source_value_type;
55     typedef rxu::decay_t<Coordination> coordination_type;
56     typedef typename coordination_type::coordinator_type coordinator_type;
57     typedef rxu::decay_t<Duration> duration_type;
58 
59     struct timeout_values
60     {
timeout_valuesrxcpp::operators::detail::timeout::timeout_values61         timeout_values(duration_type p, coordination_type c)
62             : period(p)
63             , coordination(c)
64         {
65         }
66 
67         duration_type period;
68         coordination_type coordination;
69     };
70     timeout_values initial;
71 
timeoutrxcpp::operators::detail::timeout72     timeout(duration_type period, coordination_type coordination)
73         : initial(period, coordination)
74     {
75     }
76 
77     template<class Subscriber>
78     struct timeout_observer
79     {
80         typedef timeout_observer<Subscriber> this_type;
81         typedef rxu::decay_t<T> value_type;
82         typedef rxu::decay_t<Subscriber> dest_type;
83         typedef observer<T, this_type> observer_type;
84 
85         struct timeout_subscriber_values : public timeout_values
86         {
timeout_subscriber_valuesrxcpp::operators::detail::timeout::timeout_observer::timeout_subscriber_values87             timeout_subscriber_values(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
88                 : timeout_values(v)
89                 , cs(std::move(cs))
90                 , dest(std::move(d))
91                 , coordinator(std::move(c))
92                 , worker(coordinator.get_worker())
93                 , index(0)
94             {
95             }
96 
97             composite_subscription cs;
98             dest_type dest;
99             coordinator_type coordinator;
100             rxsc::worker worker;
101             mutable std::size_t index;
102         };
103         typedef std::shared_ptr<timeout_subscriber_values> state_type;
104         state_type state;
105 
timeout_observerrxcpp::operators::detail::timeout::timeout_observer106         timeout_observer(composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
107             : state(std::make_shared<timeout_subscriber_values>(timeout_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
108         {
109             auto localState = state;
110 
111             auto disposer = [=](const rxsc::schedulable&){
112                 localState->cs.unsubscribe();
113                 localState->dest.unsubscribe();
114                 localState->worker.unsubscribe();
115             };
116             auto selectedDisposer = on_exception(
117                 [&](){ return localState->coordinator.act(disposer); },
118                 localState->dest);
119             if (selectedDisposer.empty()) {
120                 return;
121             }
122 
123             localState->dest.add([=](){
124                 localState->worker.schedule(selectedDisposer.get());
125             });
126             localState->cs.add([=](){
127                 localState->worker.schedule(selectedDisposer.get());
128             });
129 
130             auto work = [v, localState](const rxsc::schedulable&) {
131                 auto new_id = ++localState->index;
132                 auto produce_time = localState->worker.now() + localState->period;
133 
134                 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
135             };
136             auto selectedWork = on_exception(
137                 [&](){return localState->coordinator.act(work);},
138                 localState->dest);
139             if (selectedWork.empty()) {
140                 return;
141             }
142             localState->worker.schedule(selectedWork.get());
143         }
144 
produce_timeoutrxcpp::operators::detail::timeout::timeout_observer145         static std::function<void(const rxsc::schedulable&)> produce_timeout(std::size_t id, state_type state) {
146             auto produce = [id, state](const rxsc::schedulable&) {
147                 if(id != state->index)
148                     return;
149 
150                 state->dest.on_error(rxu::make_error_ptr(rxcpp::timeout_error("timeout has occurred")));
151             };
152 
153             auto selectedProduce = on_exception(
154                     [&](){ return state->coordinator.act(produce); },
155                     state->dest);
156             if (selectedProduce.empty()) {
157                 return std::function<void(const rxsc::schedulable&)>();
158             }
159 
160             return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
161         }
162 
on_nextrxcpp::operators::detail::timeout::timeout_observer163         void on_next(T v) const {
164             auto localState = state;
165             auto work = [v, localState](const rxsc::schedulable&) {
166                 auto new_id = ++localState->index;
167                 auto produce_time = localState->worker.now() + localState->period;
168 
169                 localState->dest.on_next(v);
170                 localState->worker.schedule(produce_time, produce_timeout(new_id, localState));
171             };
172             auto selectedWork = on_exception(
173                 [&](){return localState->coordinator.act(work);},
174                 localState->dest);
175             if (selectedWork.empty()) {
176                 return;
177             }
178             localState->worker.schedule(selectedWork.get());
179         }
180 
on_errorrxcpp::operators::detail::timeout::timeout_observer181         void on_error(rxu::error_ptr e) const {
182             auto localState = state;
183             auto work = [e, localState](const rxsc::schedulable&) {
184                 localState->dest.on_error(e);
185             };
186             auto selectedWork = on_exception(
187                 [&](){ return localState->coordinator.act(work); },
188                 localState->dest);
189             if (selectedWork.empty()) {
190                 return;
191             }
192             localState->worker.schedule(selectedWork.get());
193         }
194 
on_completedrxcpp::operators::detail::timeout::timeout_observer195         void on_completed() const {
196             auto localState = state;
197             auto work = [localState](const rxsc::schedulable&) {
198                 localState->dest.on_completed();
199             };
200             auto selectedWork = on_exception(
201                 [&](){ return localState->coordinator.act(work); },
202                 localState->dest);
203             if (selectedWork.empty()) {
204                 return;
205             }
206             localState->worker.schedule(selectedWork.get());
207         }
208 
makerxcpp::operators::detail::timeout::timeout_observer209         static subscriber<T, observer_type> make(dest_type d, timeout_values v) {
210             auto cs = composite_subscription();
211             auto coordinator = v.coordination.create_coordinator();
212 
213             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
214         }
215     };
216 
217     template<class Subscriber>
operator ()rxcpp::operators::detail::timeout218     auto operator()(Subscriber dest) const
219         -> decltype(timeout_observer<Subscriber>::make(std::move(dest), initial)) {
220         return      timeout_observer<Subscriber>::make(std::move(dest), initial);
221     }
222 };
223 
224 }
225 
226 /*! @copydoc rx-timeout.hpp
227 */
228 template<class... AN>
timeout(AN &&...an)229 auto timeout(AN&&... an)
230     ->      operator_factory<timeout_tag, AN...> {
231      return operator_factory<timeout_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
232 }
233 
234 }
235 
236 template<>
237 struct member_overload<timeout_tag>
238 {
239     template<class Observable, class Duration,
240         class Enabled = rxu::enable_if_all_true_type_t<
241             is_observable<Observable>,
242             rxu::is_duration<Duration>>,
243         class SourceValue = rxu::value_type_t<Observable>,
244         class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload245     static auto member(Observable&& o, Duration&& d)
246         -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()))) {
247         return      o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), identity_current_thread()));
248     }
249 
250     template<class Observable, class Coordination, class Duration,
251         class Enabled = rxu::enable_if_all_true_type_t<
252             is_observable<Observable>,
253             is_coordination<Coordination>,
254             rxu::is_duration<Duration>>,
255         class SourceValue = rxu::value_type_t<Observable>,
256         class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload257     static auto member(Observable&& o, Coordination&& cn, Duration&& d)
258         -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
259         return      o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
260     }
261 
262     template<class Observable, class Coordination, class Duration,
263         class Enabled = rxu::enable_if_all_true_type_t<
264             is_observable<Observable>,
265             is_coordination<Coordination>,
266             rxu::is_duration<Duration>>,
267         class SourceValue = rxu::value_type_t<Observable>,
268         class Timeout = rxo::detail::timeout<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload269     static auto member(Observable&& o, Duration&& d, Coordination&& cn)
270         -> decltype(o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
271         return      o.template lift<SourceValue>(Timeout(std::forward<Duration>(d), std::forward<Coordination>(cn)));
272     }
273 
274     template<class... AN>
memberrxcpp::member_overload275     static operators::detail::timeout_invalid_t<AN...> member(const AN&...) {
276         std::terminate();
277         return {};
278         static_assert(sizeof...(AN) == 10000, "timeout takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
279     }
280 };
281 
282 }
283 
284 #endif
285