1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-time_interval.hpp
6 
7     \brief Returns an observable that emits indications of the amount of time lapsed between consecutive emissions of the source observable.
8            The first emission from this new Observable indicates the amount of time lapsed between the time when the observer subscribed to the Observable and the time when the source Observable emitted its first item.
9 
10     \tparam Coordination  the type of the scheduler.
11 
12     \param coordination  the scheduler for time intervals.
13 
14     \return  Observable that emits a time_duration to indicate the amount of time lapsed between pairs of emissions.
15 
16     \sample
17     \snippet time_interval.cpp time_interval sample
18     \snippet output.txt time_interval sample
19 */
20 
21 #if !defined(RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP)
22 #define RXCPP_OPERATORS_RX_TIME_INTERVAL_HPP
23 
24 #include "../rx-includes.hpp"
25 
26 namespace rxcpp {
27 
28 namespace operators {
29 
30 namespace detail {
31 
32 template<class... AN>
33 struct time_interval_invalid_arguments {};
34 
35 template<class... AN>
36 struct time_interval_invalid : public rxo::operator_base<time_interval_invalid_arguments<AN...>> {
37     using type = observable<time_interval_invalid_arguments<AN...>, time_interval_invalid<AN...>>;
38 };
39 template<class... AN>
40 using time_interval_invalid_t = typename time_interval_invalid<AN...>::type;
41 
42 template<class T, class Coordination>
43 struct time_interval
44 {
45     typedef rxu::decay_t<T> source_value_type;
46     typedef rxu::decay_t<Coordination> coordination_type;
47 
48     struct time_interval_values {
time_interval_valuesrxcpp::operators::detail::time_interval::time_interval_values49         time_interval_values(coordination_type c)
50             : coordination(c)
51         {
52         }
53 
54         coordination_type coordination;
55     };
56     time_interval_values initial;
57 
time_intervalrxcpp::operators::detail::time_interval58     time_interval(coordination_type coordination)
59         : initial(coordination)
60     {
61     }
62 
63     template<class Subscriber>
64     struct time_interval_observer
65     {
66         typedef time_interval_observer<Subscriber> this_type;
67         typedef source_value_type value_type;
68         typedef rxu::decay_t<Subscriber> dest_type;
69         typedef observer<value_type, this_type> observer_type;
70         typedef rxsc::scheduler::clock_type::time_point time_point;
71         dest_type dest;
72         coordination_type coord;
73         mutable time_point last;
74 
time_interval_observerrxcpp::operators::detail::time_interval::time_interval_observer75         time_interval_observer(dest_type d, coordination_type coordination)
76             : dest(std::move(d)),
77               coord(std::move(coordination)),
78               last(coord.now())
79         {
80         }
81 
on_nextrxcpp::operators::detail::time_interval::time_interval_observer82         void on_next(source_value_type) const {
83             time_point now = coord.now();
84             dest.on_next(now - last);
85             last = now;
86         }
on_errorrxcpp::operators::detail::time_interval::time_interval_observer87         void on_error(rxu::error_ptr e) const {
88             dest.on_error(e);
89         }
on_completedrxcpp::operators::detail::time_interval::time_interval_observer90         void on_completed() const {
91             dest.on_completed();
92         }
93 
makerxcpp::operators::detail::time_interval::time_interval_observer94         static subscriber<value_type, observer_type> make(dest_type d, time_interval_values v) {
95             return make_subscriber<value_type>(d, this_type(d, v.coordination));
96         }
97     };
98 
99     template<class Subscriber>
operator ()rxcpp::operators::detail::time_interval100     auto operator()(Subscriber dest) const
101         -> decltype(time_interval_observer<Subscriber>::make(std::move(dest), initial)) {
102         return      time_interval_observer<Subscriber>::make(std::move(dest), initial);
103     }
104 };
105 
106 }
107 
108 /*! @copydoc rx-time_interval.hpp
109 */
110 template<class... AN>
time_interval(AN &&...an)111 auto time_interval(AN&&... an)
112     ->      operator_factory<time_interval_tag, AN...> {
113      return operator_factory<time_interval_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
114 }
115 
116 }
117 
118 template<>
119 struct member_overload<time_interval_tag>
120 {
121     template<class Observable,
122         class Enabled = rxu::enable_if_all_true_type_t<
123             is_observable<Observable>>,
124         class SourceValue = rxu::value_type_t<Observable>,
125         class TimeInterval = rxo::detail::time_interval<SourceValue, identity_one_worker>,
126         class Value = typename rxsc::scheduler::clock_type::time_point::duration>
memberrxcpp::member_overload127     static auto member(Observable&& o)
128         -> decltype(o.template lift<Value>(TimeInterval(identity_current_thread()))) {
129         return      o.template lift<Value>(TimeInterval(identity_current_thread()));
130     }
131 
132     template<class Observable, class Coordination,
133         class Enabled = rxu::enable_if_all_true_type_t<
134             is_observable<Observable>,
135             is_coordination<Coordination>>,
136         class SourceValue = rxu::value_type_t<Observable>,
137         class TimeInterval = rxo::detail::time_interval<SourceValue, rxu::decay_t<Coordination>>,
138         class Value = typename rxsc::scheduler::clock_type::time_point::duration>
memberrxcpp::member_overload139     static auto member(Observable&& o, Coordination&& cn)
140         -> decltype(o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)))) {
141         return      o.template lift<Value>(TimeInterval(std::forward<Coordination>(cn)));
142     }
143 
144     template<class... AN>
memberrxcpp::member_overload145     static operators::detail::time_interval_invalid_t<AN...> member(AN...) {
146         std::terminate();
147         return {};
148         static_assert(sizeof...(AN) == 10000, "time_interval takes (optional Coordination)");
149     }
150 };
151 
152 }
153 
154 #endif
155