1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
/*! \file rx-distinct_until_changed.hpp

    \brief For each item from this observable, filter out consecutively repeated values and emit only changes from the new observable that is returned.

    \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);

    \param pred (optional) the function that implements comparison of two values.

    \return  Observable that emits those items from the source observable that are distinct from their immediate predecessors.

    \sample
    \snippet distinct_until_changed.cpp distinct_until_changed sample
    \snippet output.txt distinct_until_changed sample
*/
19 
20 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP)
21 #define RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
/*! Empty tag type parameterized on the argument types of a rejected
    distinct_until_changed call. It carries no data.
*/
template<typename... AN>
struct distinct_until_changed_invalid_arguments
{
};
33 
34 template<class... AN>
35 struct distinct_until_changed_invalid : public rxo::operator_base<distinct_until_changed_invalid_arguments<AN...>> {
36     using type = observable<distinct_until_changed_invalid_arguments<AN...>, distinct_until_changed_invalid<AN...>>;
37 };
38 template<class... AN>
39 using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid<AN...>::type;
40 
41 template<class T, class BinaryPredicate>
42 struct distinct_until_changed
43 {
44     typedef rxu::decay_t<T> source_value_type;
45     typedef rxu::decay_t<BinaryPredicate> predicate_type;
46 
47     predicate_type pred;
48 
distinct_until_changedrxcpp::operators::detail::distinct_until_changed49     distinct_until_changed(predicate_type p)
50     : pred(std::move(p))
51     {
52     }
53 
54     template<class Subscriber>
55     struct distinct_until_changed_observer
56     {
57         typedef distinct_until_changed_observer<Subscriber> this_type;
58         typedef source_value_type value_type;
59         typedef rxu::decay_t<Subscriber> dest_type;
60         typedef observer<value_type, this_type> observer_type;
61 
62         dest_type dest;
63         predicate_type pred;
64         mutable rxu::detail::maybe<source_value_type> remembered;
65 
distinct_until_changed_observerrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer66         distinct_until_changed_observer(dest_type d, predicate_type pred)
67             : dest(std::move(d))
68             , pred(std::move(pred))
69         {
70         }
on_nextrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer71         void on_next(source_value_type v) const {
72             if (remembered.empty() || !pred(v, remembered.get())) {
73                 remembered.reset(v);
74                 dest.on_next(v);
75             }
76         }
on_errorrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer77         void on_error(rxu::error_ptr e) const {
78             dest.on_error(e);
79         }
on_completedrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer80         void on_completed() const {
81             dest.on_completed();
82         }
83 
makerxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer84         static subscriber<value_type, observer_type> make(dest_type d, predicate_type p) {
85             return make_subscriber<value_type>(d, this_type(d, std::move(p)));
86         }
87     };
88 
89     template<class Subscriber>
operator ()rxcpp::operators::detail::distinct_until_changed90     auto operator()(Subscriber dest) const
91         -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred)) {
92         return      distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred);
93     }
94 };
95 
96 }
97 
98 /*! @copydoc rx-distinct_until_changed.hpp
99 */
100 template<class... AN>
distinct_until_changed(AN &&...an)101 auto distinct_until_changed(AN&&... an)
102     ->      operator_factory<distinct_until_changed_tag, AN...> {
103      return operator_factory<distinct_until_changed_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
104 }
105 
106 }
107 
108 template<>
109 struct member_overload<distinct_until_changed_tag>
110 {
111     template<class Observable,
112             class SourceValue = rxu::value_type_t<Observable>,
113             class Enabled = rxu::enable_if_all_true_type_t<
114                 is_observable<Observable>>,
115             class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, rxu::equal_to<>>>
memberrxcpp::member_overload116     static auto member(Observable&& o)
117     -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()))) {
118         return  o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()));
119     }
120 
121     template<class Observable,
122             class BinaryPredicate,
123             class SourceValue = rxu::value_type_t<Observable>,
124             class Enabled = rxu::enable_if_all_true_type_t<
125             is_observable<Observable>>,
126             class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, BinaryPredicate>>
memberrxcpp::member_overload127     static auto member(Observable&& o, BinaryPredicate&& pred)
128     -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)))) {
129         return  o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)));
130     }
131 
132     template<class... AN>
memberrxcpp::member_overload133     static operators::detail::distinct_until_changed_invalid_t<AN...> member(AN...) {
134         std::terminate();
135         return {};
136         static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)");
137     }
138 };
139 
140 }
141 
142 #endif
143