1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-debounce.hpp
6 
7     \brief  Return an observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
8 
9     \tparam Duration      the type of the time interval
10     \tparam Coordination  the type of the scheduler
11 
12     \param period        the period of time to suppress any emitted items
13     \param coordination  the scheduler to manage timeout for each event
14 
15     \return  Observable that emits an item if a particular timespan has passed without emitting another item from the source observable.
16 
17     \sample
18     \snippet debounce.cpp debounce sample
19     \snippet output.txt debounce sample
20 */
21 
22 #if !defined(RXCPP_OPERATORS_RX_DEBOUNCE_HPP)
23 #define RXCPP_OPERATORS_RX_DEBOUNCE_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
// Empty tag type that records the (unsupported) argument types a debounce
// call was made with. It carries no data; only its template arguments matter.
template<class... Args>
struct debounce_invalid_arguments
{
};
35 
36 template<class... AN>
37 struct debounce_invalid : public rxo::operator_base<debounce_invalid_arguments<AN...>> {
38     using type = observable<debounce_invalid_arguments<AN...>, debounce_invalid<AN...>>;
39 };
40 template<class... AN>
41 using debounce_invalid_t = typename debounce_invalid<AN...>::type;
42 
43 template<class T, class Duration, class Coordination>
44 struct debounce
45 {
46     typedef rxu::decay_t<T> source_value_type;
47     typedef rxu::decay_t<Coordination> coordination_type;
48     typedef typename coordination_type::coordinator_type coordinator_type;
49     typedef rxu::decay_t<Duration> duration_type;
50 
51     struct debounce_values
52     {
debounce_valuesrxcpp::operators::detail::debounce::debounce_values53         debounce_values(duration_type p, coordination_type c)
54             : period(p)
55             , coordination(c)
56         {
57         }
58 
59         duration_type period;
60         coordination_type coordination;
61     };
62     debounce_values initial;
63 
debouncerxcpp::operators::detail::debounce64     debounce(duration_type period, coordination_type coordination)
65         : initial(period, coordination)
66     {
67     }
68 
69     template<class Subscriber>
70     struct debounce_observer
71     {
72         typedef debounce_observer<Subscriber> this_type;
73         typedef rxu::decay_t<T> value_type;
74         typedef rxu::decay_t<Subscriber> dest_type;
75         typedef observer<T, this_type> observer_type;
76 
77         struct debounce_subscriber_values : public debounce_values
78         {
debounce_subscriber_valuesrxcpp::operators::detail::debounce::debounce_observer::debounce_subscriber_values79             debounce_subscriber_values(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
80                 : debounce_values(v)
81                 , cs(std::move(cs))
82                 , dest(std::move(d))
83                 , coordinator(std::move(c))
84                 , worker(coordinator.get_worker())
85                 , index(0)
86             {
87             }
88 
89             composite_subscription cs;
90             dest_type dest;
91             coordinator_type coordinator;
92             rxsc::worker worker;
93             mutable std::size_t index;
94             mutable rxu::maybe<value_type> value;
95         };
96         typedef std::shared_ptr<debounce_subscriber_values> state_type;
97         state_type state;
98 
debounce_observerrxcpp::operators::detail::debounce::debounce_observer99         debounce_observer(composite_subscription cs, dest_type d, debounce_values v, coordinator_type c)
100             : state(std::make_shared<debounce_subscriber_values>(debounce_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
101         {
102             auto localState = state;
103 
104             auto disposer = [=](const rxsc::schedulable&){
105                 localState->cs.unsubscribe();
106                 localState->dest.unsubscribe();
107                 localState->worker.unsubscribe();
108             };
109             auto selectedDisposer = on_exception(
110                 [&](){ return localState->coordinator.act(disposer); },
111                 localState->dest);
112             if (selectedDisposer.empty()) {
113                 return;
114             }
115 
116             localState->dest.add([=](){
117                 localState->worker.schedule(selectedDisposer.get());
118             });
119             localState->cs.add([=](){
120                 localState->worker.schedule(selectedDisposer.get());
121             });
122         }
123 
produce_itemrxcpp::operators::detail::debounce::debounce_observer124         static std::function<void(const rxsc::schedulable&)> produce_item(std::size_t id, state_type state) {
125             auto produce = [id, state](const rxsc::schedulable&) {
126                 if(id != state->index)
127                     return;
128 
129                 state->dest.on_next(*state->value);
130                 state->value.reset();
131             };
132 
133             auto selectedProduce = on_exception(
134                     [&](){ return state->coordinator.act(produce); },
135                     state->dest);
136             if (selectedProduce.empty()) {
137                 return std::function<void(const rxsc::schedulable&)>();
138             }
139 
140             return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
141         }
142 
on_nextrxcpp::operators::detail::debounce::debounce_observer143         void on_next(T v) const {
144             auto localState = state;
145             auto work = [v, localState](const rxsc::schedulable&) {
146                 auto new_id = ++localState->index;
147                 auto produce_time = localState->worker.now() + localState->period;
148 
149                 localState->value.reset(v);
150                 localState->worker.schedule(produce_time, produce_item(new_id, localState));
151             };
152             auto selectedWork = on_exception(
153                 [&](){return localState->coordinator.act(work);},
154                 localState->dest);
155             if (selectedWork.empty()) {
156                 return;
157             }
158             localState->worker.schedule(selectedWork.get());
159         }
160 
on_errorrxcpp::operators::detail::debounce::debounce_observer161         void on_error(rxu::error_ptr e) const {
162             auto localState = state;
163             auto work = [e, localState](const rxsc::schedulable&) {
164                 localState->dest.on_error(e);
165                 localState->value.reset();
166             };
167             auto selectedWork = on_exception(
168                 [&](){ return localState->coordinator.act(work); },
169                 localState->dest);
170             if (selectedWork.empty()) {
171                 return;
172             }
173             localState->worker.schedule(selectedWork.get());
174         }
175 
on_completedrxcpp::operators::detail::debounce::debounce_observer176         void on_completed() const {
177             auto localState = state;
178             auto work = [localState](const rxsc::schedulable&) {
179                 if(!localState->value.empty()) {
180                     localState->dest.on_next(*localState->value);
181                 }
182                 localState->dest.on_completed();
183             };
184             auto selectedWork = on_exception(
185                 [&](){ return localState->coordinator.act(work); },
186                 localState->dest);
187             if (selectedWork.empty()) {
188                 return;
189             }
190             localState->worker.schedule(selectedWork.get());
191         }
192 
makerxcpp::operators::detail::debounce::debounce_observer193         static subscriber<T, observer_type> make(dest_type d, debounce_values v) {
194             auto cs = composite_subscription();
195             auto coordinator = v.coordination.create_coordinator();
196 
197             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
198         }
199     };
200 
201     template<class Subscriber>
operator ()rxcpp::operators::detail::debounce202     auto operator()(Subscriber dest) const
203         -> decltype(debounce_observer<Subscriber>::make(std::move(dest), initial)) {
204         return      debounce_observer<Subscriber>::make(std::move(dest), initial);
205     }
206 };
207 
208 }
209 
210 /*! @copydoc rx-debounce.hpp
211 */
212 template<class... AN>
debounce(AN &&...an)213 auto debounce(AN&&... an)
214     ->      operator_factory<debounce_tag, AN...> {
215      return operator_factory<debounce_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
216 }
217 
218 }
219 
220 template<>
221 struct member_overload<debounce_tag>
222 {
223     template<class Observable, class Duration,
224         class Enabled = rxu::enable_if_all_true_type_t<
225             is_observable<Observable>,
226             rxu::is_duration<Duration>>,
227         class SourceValue = rxu::value_type_t<Observable>,
228         class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, identity_one_worker>>
memberrxcpp::member_overload229     static auto member(Observable&& o, Duration&& d)
230         -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()))) {
231         return      o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), identity_current_thread()));
232     }
233 
234     template<class Observable, class Coordination, class Duration,
235         class Enabled = rxu::enable_if_all_true_type_t<
236             is_observable<Observable>,
237             is_coordination<Coordination>,
238             rxu::is_duration<Duration>>,
239         class SourceValue = rxu::value_type_t<Observable>,
240         class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload241     static auto member(Observable&& o, Coordination&& cn, Duration&& d)
242         -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
243         return      o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
244     }
245 
246     template<class Observable, class Coordination, class Duration,
247         class Enabled = rxu::enable_if_all_true_type_t<
248             is_observable<Observable>,
249             is_coordination<Coordination>,
250             rxu::is_duration<Duration>>,
251         class SourceValue = rxu::value_type_t<Observable>,
252         class Debounce = rxo::detail::debounce<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>>
memberrxcpp::member_overload253     static auto member(Observable&& o, Duration&& d, Coordination&& cn)
254         -> decltype(o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)))) {
255         return      o.template lift<SourceValue>(Debounce(std::forward<Duration>(d), std::forward<Coordination>(cn)));
256     }
257 
258     template<class... AN>
memberrxcpp::member_overload259     static operators::detail::debounce_invalid_t<AN...> member(const AN&...) {
260         std::terminate();
261         return {};
262         static_assert(sizeof...(AN) == 10000, "debounce takes (optional Coordination, required Duration) or (required Duration, optional Coordination)");
263     }
264 };
265 
266 }
267 
268 #endif
269