1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-distinct.hpp
6 
7     \brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
8 
9     \return Observable that emits those items from the source observable that are distinct.
10 
11     \note istinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. new types can be added by specializing rxcpp::filtered_hash<T>
12 
13     \sample
14     \snippet distinct.cpp distinct sample
15     \snippet output.txt distinct sample
16 */
17 
18 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP)
19 #define RXCPP_OPERATORS_RX_DISTINCT_HPP
20 
21 #include "../rx-includes.hpp"
22 
23 namespace rxcpp {
24 
25 namespace operators {
26 
27 namespace detail {
28 
29 template<class... AN>
30 struct distinct_invalid_arguments {};
31 
32 template<class... AN>
33 struct distinct_invalid : public rxo::operator_base<distinct_invalid_arguments<AN...>> {
34     using type = observable<distinct_invalid_arguments<AN...>, distinct_invalid<AN...>>;
35 };
36 template<class... AN>
37 using distinct_invalid_t = typename distinct_invalid<AN...>::type;
38 
39 template<class T>
40 struct distinct
41 {
42     typedef rxu::decay_t<T> source_value_type;
43 
44     template<class Subscriber>
45     struct distinct_observer
46     {
47         typedef distinct_observer<Subscriber> this_type;
48         typedef source_value_type value_type;
49         typedef rxu::decay_t<Subscriber> dest_type;
50         typedef observer<value_type, this_type> observer_type;
51         dest_type dest;
52         mutable std::unordered_set<source_value_type, rxcpp::filtered_hash<source_value_type>> remembered;
53 
distinct_observerrxcpp::operators::detail::distinct::distinct_observer54         distinct_observer(dest_type d)
55                 : dest(d)
56         {
57         }
on_nextrxcpp::operators::detail::distinct::distinct_observer58         void on_next(source_value_type v) const {
59             if (remembered.empty() || remembered.count(v) == 0) {
60                 remembered.insert(v);
61                 dest.on_next(v);
62             }
63         }
on_errorrxcpp::operators::detail::distinct::distinct_observer64         void on_error(rxu::error_ptr e) const {
65             dest.on_error(e);
66         }
on_completedrxcpp::operators::detail::distinct::distinct_observer67         void on_completed() const {
68             dest.on_completed();
69         }
70 
makerxcpp::operators::detail::distinct::distinct_observer71         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
72             return make_subscriber<value_type>(d, this_type(d));
73         }
74     };
75 
76     template<class Subscriber>
operator ()rxcpp::operators::detail::distinct77     auto operator()(Subscriber dest) const
78     -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) {
79         return      distinct_observer<Subscriber>::make(std::move(dest));
80     }
81 };
82 
83 }
84 
85 /*! @copydoc rx-distinct.hpp
86 */
87 template<class... AN>
distinct(AN &&...an)88 auto distinct(AN&&... an)
89     ->     operator_factory<distinct_tag, AN...> {
90     return operator_factory<distinct_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
91 }
92 
93 }
94 
95 template<>
96 struct member_overload<distinct_tag>
97 {
98     template<class Observable,
99             class SourceValue = rxu::value_type_t<Observable>,
100             class Enabled = rxu::enable_if_all_true_type_t<
101                 is_observable<Observable>,
102                 is_hashable<SourceValue>>,
103             class Distinct = rxo::detail::distinct<SourceValue>>
memberrxcpp::member_overload104     static auto member(Observable&& o)
105     -> decltype(o.template lift<SourceValue>(Distinct())) {
106         return  o.template lift<SourceValue>(Distinct());
107     }
108 
109     template<class... AN>
memberrxcpp::member_overload110     static operators::detail::distinct_invalid_t<AN...> member(AN...) {
111         std::terminate();
112         return {};
113         static_assert(sizeof...(AN) == 10000, "distinct takes no arguments");
114     }
115 };
116 
117 }
118 
119 #endif
120