1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-take_while.hpp
6 
7     \brief For the first items fulfilling the predicate from this observable emit them from the new observable that is returned.
8 
9     \tparam Predicate  the type of the predicate
10 
11     \param t  the predicate
12 
13     \return  An observable that emits only the first items emitted by the source Observable fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
14 
15     \sample
16     \snippet take_while.cpp take_while sample
17     \snippet output.txt take_while sample
18 */
19 
20 #if !defined(RXCPP_OPERATORS_RX_TAKE_WHILE_HPP)
21 #define RXCPP_OPERATORS_RX_TAKE_WHILE_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
31 template<class... AN>
32 struct take_while_invalid_arguments {};
33 
34 template<class... AN>
35 struct take_while_invalid : public rxo::operator_base<take_while_invalid_arguments<AN...>> {
36     using type = observable<take_while_invalid_arguments<AN...>, take_while_invalid<AN...>>;
37 };
38 template<class... AN>
39 using take_while_invalid_t = typename take_while_invalid<AN...>::type;
40 
41 template<class T, class Predicate>
42 struct take_while
43 {
44     typedef rxu::decay_t<T> source_value_type;
45     typedef rxu::decay_t<Predicate> test_type;
46     test_type test;
47 
48 
take_whilerxcpp::operators::detail::take_while49     take_while(test_type t)
50         : test(std::move(t))
51     {
52     }
53 
54     template<class Subscriber>
55     struct take_while_observer
56     {
57         typedef take_while_observer<Subscriber> this_type;
58         typedef source_value_type value_type;
59         typedef rxu::decay_t<Subscriber> dest_type;
60         typedef observer<value_type, this_type> observer_type;
61         dest_type dest;
62         test_type test;
63 
take_while_observerrxcpp::operators::detail::take_while::take_while_observer64         take_while_observer(dest_type d, test_type t)
65                 : dest(std::move(d))
66                 , test(std::move(t))
67         {
68         }
on_nextrxcpp::operators::detail::take_while::take_while_observer69         void on_next(source_value_type v) const {
70             if (test(v)) {
71                 dest.on_next(v);
72             } else {
73                 dest.on_completed();
74             }
75         }
on_errorrxcpp::operators::detail::take_while::take_while_observer76         void on_error(rxu::error_ptr e) const {
77             dest.on_error(e);
78         }
on_completedrxcpp::operators::detail::take_while::take_while_observer79         void on_completed() const {
80             dest.on_completed();
81         }
82 
makerxcpp::operators::detail::take_while::take_while_observer83         static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
84             return make_subscriber<value_type>(d, this_type(d, std::move(t)));
85         }
86     };
87 
88     template<class Subscriber>
operator ()rxcpp::operators::detail::take_while89     auto operator()(Subscriber dest) const
90     -> decltype(take_while_observer<Subscriber>::make(std::move(dest), test)) {
91         return  take_while_observer<Subscriber>::make(std::move(dest), test);
92     }
93 };
94 
95 }
96 
97 /*! @copydoc rx-take_while.hpp
98 */
99 template<class... AN>
take_while(AN &&...an)100 auto take_while(AN&&... an)
101     ->      operator_factory<take_while_tag, AN...> {
102         return operator_factory<take_while_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
103     }
104 
105 }
106 
107 template<>
108 struct member_overload<take_while_tag>
109 {
110     template<class Observable, class Predicate,
111             class SourceValue = rxu::value_type_t<Observable>,
112             class TakeWhile = rxo::detail::take_while<SourceValue, rxu::decay_t<Predicate>>>
memberrxcpp::member_overload113     static auto member(Observable&& o, Predicate&& p)
114     -> decltype(o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)))) {
115         return      o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)));
116     }
117 
118     template<class... AN>
memberrxcpp::member_overload119     static operators::detail::take_while_invalid_t<AN...> member(const AN&...) {
120         std::terminate();
121         return {};
122         static_assert(sizeof...(AN) == 10000, "take_while takes (Predicate)");
123     }
124 };
125 
126 
127 }
128 
129 #endif
130