1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-pairwise.hpp
6 
7     \brief Take values pairwise from this observable.
8 
9     \return Observable that emits tuples of two the most recent items emitted by the source observable.
10 
11     \sample
12     \snippet pairwise.cpp pairwise sample
13     \snippet output.txt pairwise sample
14 
15     If the source observable emits less than two items, no pairs are emitted  by the source observable:
16     \snippet pairwise.cpp pairwise short sample
17     \snippet output.txt pairwise short sample
18 */
19 
20 #if !defined(RXCPP_OPERATORS_RX_PAIRWISE_HPP)
21 #define RXCPP_OPERATORS_RX_PAIRWISE_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
31 template<class... AN>
32 struct pairwise_invalid_arguments {};
33 
34 template<class... AN>
35 struct pairwise_invalid : public rxo::operator_base<pairwise_invalid_arguments<AN...>> {
36     using type = observable<pairwise_invalid_arguments<AN...>, pairwise_invalid<AN...>>;
37 };
38 template<class... AN>
39 using pairwise_invalid_t = typename pairwise_invalid<AN...>::type;
40 
41 template<class T>
42 struct pairwise
43 {
44     typedef rxu::decay_t<T> source_value_type;
45     typedef std::tuple<source_value_type, source_value_type> value_type;
46 
47     template<class Subscriber>
48     struct pairwise_observer
49     {
50         typedef pairwise_observer<Subscriber> this_type;
51         typedef std::tuple<source_value_type, source_value_type> value_type;
52         typedef rxu::decay_t<Subscriber> dest_type;
53         typedef observer<T, this_type> observer_type;
54         dest_type dest;
55         mutable rxu::detail::maybe<source_value_type> remembered;
56 
pairwise_observerrxcpp::operators::detail::pairwise::pairwise_observer57         pairwise_observer(dest_type d)
58             : dest(std::move(d))
59         {
60         }
on_nextrxcpp::operators::detail::pairwise::pairwise_observer61         void on_next(source_value_type v) const {
62             if (remembered.empty()) {
63                 remembered.reset(v);
64                 return;
65             }
66 
67             dest.on_next(std::make_tuple(remembered.get(), v));
68             remembered.reset(v);
69         }
on_errorrxcpp::operators::detail::pairwise::pairwise_observer70         void on_error(rxu::error_ptr e) const {
71             dest.on_error(e);
72         }
on_completedrxcpp::operators::detail::pairwise::pairwise_observer73         void on_completed() const {
74             dest.on_completed();
75         }
76 
makerxcpp::operators::detail::pairwise::pairwise_observer77         static subscriber<T, observer_type> make(dest_type d) {
78             auto cs = d.get_subscription();
79             return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d))));
80         }
81     };
82 
83     template<class Subscriber>
operator ()rxcpp::operators::detail::pairwise84     auto operator()(Subscriber dest) const
85         -> decltype(pairwise_observer<Subscriber>::make(std::move(dest))) {
86         return      pairwise_observer<Subscriber>::make(std::move(dest));
87     }
88 };
89 
90 }
91 
92 /*! @copydoc rx-pairwise.hpp
93 */
94 template<class... AN>
pairwise(AN &&...an)95 auto pairwise(AN&&... an)
96     ->     operator_factory<pairwise_tag, AN...> {
97     return operator_factory<pairwise_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
98 }
99 
100 }
101 
102 template<>
103 struct member_overload<pairwise_tag>
104 {
105     template<class Observable,
106         class Enabled = rxu::enable_if_all_true_type_t<
107             is_observable<Observable>>,
108         class SourceValue = rxu::value_type_t<Observable>,
109         class Pairwise = rxo::detail::pairwise<SourceValue>,
110         class Value = rxu::value_type_t<Pairwise>>
memberrxcpp::member_overload111     static auto member(Observable&& o)
112     -> decltype(o.template lift<Value>(Pairwise())) {
113         return  o.template lift<Value>(Pairwise());
114     }
115 
116     template<class... AN>
memberrxcpp::member_overload117     static operators::detail::pairwise_invalid_t<AN...> member(AN...) {
118         std::terminate();
119         return {};
120         static_assert(sizeof...(AN) == 10000, "pairwise takes no arguments");
121     }
122 };
123 
124 }
125 
126 #endif
127