// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
/*! \file rx-delay.hpp

    \brief Return an observable that emits each item emitted by the source observable after the specified delay.

    \tparam Duration      the type of time interval
    \tparam Coordination  the type of the scheduler

    \param period        the period of time each item is delayed
    \param coordination  the scheduler for the delays

    \return  Observable that emits each item emitted by the source observable after the specified delay.

    \sample
    \snippet delay.cpp delay period+coordination sample
    \snippet output.txt delay period+coordination sample
*/
21 
22 #if !defined(RXCPP_OPERATORS_RX_DELAY_HPP)
23 #define RXCPP_OPERATORS_RX_DELAY_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
/// Empty tag type that records the argument pack of an unsupported `delay`
/// call. It holds no data; `delay_invalid` uses it as the value type of the
/// placeholder observable it names.
template<class... AN>
struct delay_invalid_arguments {};
35 
36 template<class... AN>
37 struct delay_invalid : public rxo::operator_base<delay_invalid_arguments<AN...>> {
38     using type = observable<delay_invalid_arguments<AN...>, delay_invalid<AN...>>;
39 };
40 template<class... AN>
41 using delay_invalid_t = typename delay_invalid<AN...>::type;
42 
43 template<class T, class Duration, class Coordination>
44 struct delay
45 {
46     typedef rxu::decay_t<T> source_value_type;
47     typedef rxu::decay_t<Coordination> coordination_type;
48     typedef typename coordination_type::coordinator_type coordinator_type;
49     typedef rxu::decay_t<Duration> duration_type;
50 
51     struct delay_values
52     {
delay_valuesrxcpp::operators::detail::delay::delay_values53         delay_values(duration_type p, coordination_type c)
54             : period(p)
55             , coordination(c)
56         {
57         }
58         duration_type period;
59         coordination_type coordination;
60     };
61     delay_values initial;
62 
delayrxcpp::operators::detail::delay63     delay(duration_type period, coordination_type coordination)
64         : initial(period, coordination)
65     {
66     }
67 
68     template<class Subscriber>
69     struct delay_observer
70     {
71         typedef delay_observer<Subscriber> this_type;
72         typedef rxu::decay_t<T> value_type;
73         typedef rxu::decay_t<Subscriber> dest_type;
74         typedef observer<T, this_type> observer_type;
75 
76         struct delay_subscriber_values : public delay_values
77         {
delay_subscriber_valuesrxcpp::operators::detail::delay::delay_observer::delay_subscriber_values78             delay_subscriber_values(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
79                 : delay_values(v)
80                 , cs(std::move(cs))
81                 , dest(std::move(d))
82                 , coordinator(std::move(c))
83                 , worker(coordinator.get_worker())
84                 , expected(worker.now())
85             {
86             }
87             composite_subscription cs;
88             dest_type dest;
89             coordinator_type coordinator;
90             rxsc::worker worker;
91             rxsc::scheduler::clock_type::time_point expected;
92         };
93         std::shared_ptr<delay_subscriber_values> state;
94 
delay_observerrxcpp::operators::detail::delay::delay_observer95         delay_observer(composite_subscription cs, dest_type d, delay_values v, coordinator_type c)
96             : state(std::make_shared<delay_subscriber_values>(delay_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
97         {
98             auto localState = state;
99 
100             auto disposer = [=](const rxsc::schedulable&){
101                 localState->cs.unsubscribe();
102                 localState->dest.unsubscribe();
103                 localState->worker.unsubscribe();
104             };
105             auto selectedDisposer = on_exception(
106                 [&](){return localState->coordinator.act(disposer);},
107                 localState->dest);
108             if (selectedDisposer.empty()) {
109                 return;
110             }
111 
112             localState->dest.add([=](){
113                 localState->worker.schedule(selectedDisposer.get());
114             });
115             localState->cs.add([=](){
116                 localState->worker.schedule(localState->worker.now() + localState->period, selectedDisposer.get());
117             });
118         }
119 
on_nextrxcpp::operators::detail::delay::delay_observer120         void on_next(T v) const {
121             auto localState = state;
122             auto work = [v, localState](const rxsc::schedulable&){
123                 localState->dest.on_next(v);
124             };
125             auto selectedWork = on_exception(
126                 [&](){return localState->coordinator.act(work);},
127                 localState->dest);
128             if (selectedWork.empty()) {
129                 return;
130             }
131             localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
132         }
133 
on_errorrxcpp::operators::detail::delay::delay_observer134         void on_error(rxu::error_ptr e) const {
135             auto localState = state;
136             auto work = [e, localState](const rxsc::schedulable&){
137                 localState->dest.on_error(e);
138             };
139             auto selectedWork = on_exception(
140                 [&](){return localState->coordinator.act(work);},
141                 localState->dest);
142             if (selectedWork.empty()) {
143                 return;
144             }
145             localState->worker.schedule(selectedWork.get());
146         }
147 
on_completedrxcpp::operators::detail::delay::delay_observer148         void on_completed() const {
149             auto localState = state;
150             auto work = [localState](const rxsc::schedulable&){
151                 localState->dest.on_completed();
152             };
153             auto selectedWork = on_exception(
154                 [&](){return localState->coordinator.act(work);},
155                 localState->dest);
156             if (selectedWork.empty()) {
157                 return;
158             }
159             localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
160         }
161 
makerxcpp::operators::detail::delay::delay_observer162         static subscriber<T, observer_type> make(dest_type d, delay_values v) {
163             auto cs = composite_subscription();
164             auto coordinator = v.coordination.create_coordinator();
165 
166             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
167         }
168     };
169 
170     template<class Subscriber>
operator ()rxcpp::operators::detail::delay171     auto operator()(Subscriber dest) const
172         -> decltype(delay_observer<Subscriber>::make(std::move(dest), initial)) {
173         return      delay_observer<Subscriber>::make(std::move(dest), initial);
174     }
175 };
176 
177 }
178 
179 /*! @copydoc rx-delay.hpp
180 */
181 template<class... AN>
delay(AN &&...an)182 auto delay(AN&&... an)
183     ->      operator_factory<delay_tag, AN...> {
184      return operator_factory<delay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
185 }
186 
187 }
188 
189 template<>
190 struct member_overload<delay_tag>
191 {
192     template<class Observable, class Duration,
193         class Enabled = rxu::enable_if_all_true_type_t<
194             is_observable<Observable>,
195             rxu::is_duration<Duration>>,
196         class SourceValue = rxu::value_type_t<Observable>,
197         class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload198     static auto member(Observable&& o, Duration&& d)
199         -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()))) {
200         return      o.template lift<SourceValue>(delay(std::forward<Duration>(d), identity_current_thread()));
201     }
202 
203     template<class Observable, class Coordination, class Duration,
204         class Enabled = rxu::enable_if_all_true_type_t<
205             is_observable<Observable>,
206             is_coordination<Coordination>,
207             rxu::is_duration<Duration>>,
208         class SourceValue = rxu::value_type_t<Observable>,
209         class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload210     static auto member(Observable&& o, Coordination&& cn, Duration&& d)
211         -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
212         return      o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
213     }
214 
215     template<class Observable, class Coordination, class Duration,
216         class Enabled = rxu::enable_if_all_true_type_t<
217             is_observable<Observable>,
218             is_coordination<Coordination>,
219             rxu::is_duration<Duration>>,
220         class SourceValue = rxu::value_type_t<Observable>,
221         class delay = rxo::detail::delay<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload222     static auto member(Observable&& o, Duration&& d, Coordination&& cn)
223         -> decltype(o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
224         return      o.template lift<SourceValue>(delay(std::forward<Duration>(d), std::forward<Coordination>(cn)));
225     }
226 
227     template<class... AN>
memberrxcpp::member_overload228     static operators::detail::delay_invalid_t<AN...> member(const AN&...) {
229         std::terminate();
230         return {};
231         static_assert(sizeof...(AN) == 10000, "delay takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
232     }
233 };
234 
235 }
236 
237 #endif
238