1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-scan.hpp
6 
7     \brief For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
8 
9     \tparam Seed         the type of the initial value for the accumulator.
10     \tparam Accumulator  the type of the data accumulating function.
11 
12     \param seed  the initial value for the accumulator.
13     \param a     an accumulator function to be invoked on each item emitted by the source observable, whose result will be emitted and used in the next accumulator call.
14 
15     \return  An observable that emits the results of each call to the accumulator function.
16 
17     \sample
18     \snippet scan.cpp scan sample
19     \snippet output.txt scan sample
20 */
21 
22 #if !defined(RXCPP_OPERATORS_RX_SCAN_HPP)
23 #define RXCPP_OPERATORS_RX_SCAN_HPP
24 
25 #include "../rx-includes.hpp"
26 
27 namespace rxcpp {
28 
29 namespace operators {
30 
31 namespace detail {
32 
// Empty tag type that records the argument types of a rejected scan(...) call.
// It is used below as the value type of the placeholder observable.
template<class... AN>
struct scan_invalid_arguments
{
};
35 
36 template<class... AN>
37 struct scan_invalid : public rxo::operator_base<scan_invalid_arguments<AN...>> {
38     using type = observable<scan_invalid_arguments<AN...>, scan_invalid<AN...>>;
39 };
40 template<class... AN>
41 using scan_invalid_t = typename scan_invalid<AN...>::type;
42 
43 template<class T, class Observable, class Accumulator, class Seed>
44 struct scan : public operator_base<rxu::decay_t<Seed>>
45 {
46     typedef rxu::decay_t<Observable> source_type;
47     typedef rxu::decay_t<Accumulator> accumulator_type;
48     typedef rxu::decay_t<Seed> seed_type;
49 
50     struct scan_initial_type
51     {
scan_initial_typerxcpp::operators::detail::scan::scan_initial_type52         scan_initial_type(source_type o, accumulator_type a, seed_type s)
53             : source(std::move(o))
54             , accumulator(std::move(a))
55             , seed(s)
56         {
57         }
58         source_type source;
59         accumulator_type accumulator;
60         seed_type seed;
61     };
62     scan_initial_type initial;
63 
scanrxcpp::operators::detail::scan64     scan(source_type o, accumulator_type a, seed_type s)
65         : initial(std::move(o), a, s)
66     {
67     }
68 
69     template<class Subscriber>
on_subscriberxcpp::operators::detail::scan70     void on_subscribe(Subscriber o) const {
71         struct scan_state_type
72             : public scan_initial_type
73             , public std::enable_shared_from_this<scan_state_type>
74         {
75             scan_state_type(scan_initial_type i, Subscriber scrbr)
76                 : scan_initial_type(i)
77                 , result(scan_initial_type::seed)
78                 , out(std::move(scrbr))
79             {
80             }
81             seed_type result;
82             Subscriber out;
83         };
84         auto state = std::make_shared<scan_state_type>(initial, std::move(o));
85         state->source.subscribe(
86             state->out,
87         // on_next
88             [state](T t) {
89                 state->result = state->accumulator(state->result, t);
90                 state->out.on_next(state->result);
91             },
92         // on_error
93             [state](rxu::error_ptr e) {
94                 state->out.on_error(e);
95             },
96         // on_completed
97             [state]() {
98                 state->out.on_completed();
99             }
100         );
101     }
102 };
103 
104 }
105 
106 /*! @copydoc rx-scan.hpp
107 */
108 template<class... AN>
scan(AN &&...an)109 auto scan(AN&&... an)
110     ->     operator_factory<scan_tag, AN...> {
111     return operator_factory<scan_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
112 }
113 
114 }
115 
116 template<>
117 struct member_overload<scan_tag>
118 {
119     template<class Observable, class Seed, class Accumulator,
120         class Enabled = rxu::enable_if_all_true_type_t<
121             is_observable<Observable>,
122             is_accumulate_function_for<rxu::value_type_t<Observable>, rxu::decay_t<Seed>, rxu::decay_t<Accumulator>>>,
123         class SourceValue = rxu::value_type_t<Observable>,
124         class Scan = rxo::detail::scan<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Accumulator>, rxu::decay_t<Seed>>,
125         class Value = rxu::value_type_t<Scan>,
126         class Result = observable<Value, Scan>>
memberrxcpp::member_overload127     static Result member(Observable&& o, Seed s, Accumulator&& a) {
128         return Result(Scan(std::forward<Observable>(o), std::forward<Accumulator>(a), s));
129     }
130 
131     template<class... AN>
memberrxcpp::member_overload132     static operators::detail::scan_invalid_t<AN...> member(AN...) {
133         std::terminate();
134         return {};
135         static_assert(sizeof...(AN) == 10000, "scan takes (Seed, Accumulator); Accumulator must be a function with the signature Seed(Seed, T)");
136     }
137 };
138 
139 }
140 
141 #endif
142