1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-filter.hpp
6 
7     \brief For each item from this observable use Predicate to select which items to emit from the new observable that is returned.
8 
9     \tparam Predicate  the type of the filter function
10 
11     \param p  the filter function
12 
13     \return  Observable that emits only those items emitted by the source observable that the filter evaluates as true.
14 
15     \sample
16     \snippet filter.cpp filter sample
17     \snippet output.txt filter sample
18 */
19 
20 #if !defined(RXCPP_OPERATORS_RX_FILTER_HPP)
21 #define RXCPP_OPERATORS_RX_FILTER_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
31 template<class... AN>
32 struct filter_invalid_arguments {};
33 
34 template<class... AN>
35 struct filter_invalid : public rxo::operator_base<filter_invalid_arguments<AN...>> {
36     using type = observable<filter_invalid_arguments<AN...>, filter_invalid<AN...>>;
37 };
38 template<class... AN>
39 using filter_invalid_t = typename filter_invalid<AN...>::type;
40 
41 template<class T, class Predicate>
42 struct filter
43 {
44     typedef rxu::decay_t<T> source_value_type;
45     typedef rxu::decay_t<Predicate> test_type;
46     test_type test;
47 
filterrxcpp::operators::detail::filter48     filter(test_type t)
49         : test(std::move(t))
50     {
51     }
52 
53     template<class Subscriber>
54     struct filter_observer
55     {
56         typedef filter_observer<Subscriber> this_type;
57         typedef source_value_type value_type;
58         typedef rxu::decay_t<Subscriber> dest_type;
59         typedef observer<value_type, this_type> observer_type;
60         dest_type dest;
61         mutable test_type test;
62 
filter_observerrxcpp::operators::detail::filter::filter_observer63         filter_observer(dest_type d, test_type t)
64             : dest(std::move(d))
65             , test(std::move(t))
66         {
67         }
68 
69         template <class Value>
on_nextrxcpp::operators::detail::filter::filter_observer70         void on_next(Value&& v) const {
71             auto filtered = on_exception([&](){
72                     return !this->test(rxu::as_const(v));
73                 },
74                 dest);
75             if (filtered.empty()) {
76                 return;
77             }
78             if (!filtered.get()) {
79                 dest.on_next(std::forward<Value>(v));
80             }
81         }
on_errorrxcpp::operators::detail::filter::filter_observer82         void on_error(rxu::error_ptr e) const {
83             dest.on_error(e);
84         }
on_completedrxcpp::operators::detail::filter::filter_observer85         void on_completed() const {
86             dest.on_completed();
87         }
88 
makerxcpp::operators::detail::filter::filter_observer89         static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
90             return make_subscriber<value_type>(d, this_type(d, std::move(t)));
91         }
92     };
93 
94     template<class Subscriber>
operator ()rxcpp::operators::detail::filter95     auto operator()(Subscriber dest) const
96         -> decltype(filter_observer<Subscriber>::make(std::move(dest), test)) {
97         return      filter_observer<Subscriber>::make(std::move(dest), test);
98     }
99 };
100 
101 }
102 
103 /*! @copydoc rx-filter.hpp
104 */
105 template<class... AN>
filter(AN &&...an)106 auto filter(AN&&... an)
107     ->      operator_factory<filter_tag, AN...> {
108      return operator_factory<filter_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110 
111 }
112 
113 template<>
114 struct member_overload<filter_tag>
115 {
116     template<class Observable, class Predicate,
117         class SourceValue = rxu::value_type_t<Observable>,
118         class Filter = rxo::detail::filter<SourceValue, rxu::decay_t<Predicate>>>
memberrxcpp::member_overload119     static auto member(Observable&& o, Predicate&& p)
120         -> decltype(o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)))) {
121         return      o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)));
122     }
123 
124     template<class... AN>
memberrxcpp::member_overload125     static operators::detail::filter_invalid_t<AN...> member(const AN&...) {
126         std::terminate();
127         return {};
128         static_assert(sizeof...(AN) == 10000, "filter takes (Predicate)");
129     }
130 };
131 
132 }
133 
134 #endif
135