1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-buffer_count.hpp
6 
7     \brief Return an observable that emits connected, non-overlapping buffer, each containing at most count items from the source observable.
8            If the skip parameter is set, return an observable that emits buffers every skip items containing at most count items from the source observable.
9 
10     \param count  the maximum size of each buffers before it should be emitted.
11     \param skip   how many items need to be skipped before starting a new buffers (optional).
12 
13     \return  Observable that emits connected, non-overlapping buffers, each containing at most count items from the source observable.
14              If the skip parameter is set, return an Observable that emits buffers every skip items containing at most count items from the source observable.
15 
16     \sample
17     \snippet buffer.cpp buffer count sample
18     \snippet output.txt buffer count sample
19 
20     \sample
21     \snippet buffer.cpp buffer count+skip sample
22     \snippet output.txt buffer count+skip sample
23 */
24 
25 #if !defined(RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP)
26 #define RXCPP_OPERATORS_RX_BUFFER_COUNT_HPP
27 
28 #include "../rx-includes.hpp"
29 
30 namespace rxcpp {
31 
32 namespace operators {
33 
34 namespace detail {
35 
36 template<class... AN>
37 struct buffer_count_invalid_arguments {};
38 
39 template<class... AN>
40 struct buffer_count_invalid : public rxo::operator_base<buffer_count_invalid_arguments<AN...>> {
41     using type = observable<buffer_count_invalid_arguments<AN...>, buffer_count_invalid<AN...>>;
42 };
43 template<class... AN>
44 using buffer_count_invalid_t = typename buffer_count_invalid<AN...>::type;
45 
46 template<class T>
47 struct buffer_count
48 {
49     typedef rxu::decay_t<T> source_value_type;
50     typedef std::vector<source_value_type> value_type;
51 
52     struct buffer_count_values
53     {
buffer_count_valuesrxcpp::operators::detail::buffer_count::buffer_count_values54         buffer_count_values(int c, int s)
55             : count(c)
56             , skip(s)
57         {
58         }
59         int count;
60         int skip;
61     };
62 
63     buffer_count_values initial;
64 
buffer_countrxcpp::operators::detail::buffer_count65     buffer_count(int count, int skip)
66         : initial(count, skip)
67     {
68     }
69 
70     template<class Subscriber>
71     struct buffer_count_observer : public buffer_count_values
72     {
73         typedef buffer_count_observer<Subscriber> this_type;
74         typedef std::vector<T> value_type;
75         typedef rxu::decay_t<Subscriber> dest_type;
76         typedef observer<value_type, this_type> observer_type;
77         dest_type dest;
78         mutable int cursor;
79         mutable std::deque<value_type> chunks;
80 
buffer_count_observerrxcpp::operators::detail::buffer_count::buffer_count_observer81         buffer_count_observer(dest_type d, buffer_count_values v)
82             : buffer_count_values(v)
83             , dest(std::move(d))
84             , cursor(0)
85         {
86         }
on_nextrxcpp::operators::detail::buffer_count::buffer_count_observer87         void on_next(T v) const {
88             if (cursor++ % this->skip == 0) {
89                 chunks.emplace_back();
90             }
91             for(auto& chunk : chunks) {
92                 chunk.push_back(v);
93             }
94             while (!chunks.empty() && int(chunks.front().size()) == this->count) {
95                 dest.on_next(std::move(chunks.front()));
96                 chunks.pop_front();
97             }
98         }
on_errorrxcpp::operators::detail::buffer_count::buffer_count_observer99         void on_error(rxu::error_ptr e) const {
100             dest.on_error(e);
101         }
on_completedrxcpp::operators::detail::buffer_count::buffer_count_observer102         void on_completed() const {
103             auto done = on_exception(
104                 [&](){
105                     while (!chunks.empty()) {
106                         dest.on_next(std::move(chunks.front()));
107                         chunks.pop_front();
108                     }
109                     return true;
110                 },
111                 dest);
112             if (done.empty()) {
113                 return;
114             }
115             dest.on_completed();
116         }
117 
makerxcpp::operators::detail::buffer_count::buffer_count_observer118         static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_count_values v) {
119             auto cs = d.get_subscription();
120             return make_subscriber<T>(std::move(cs), this_type(std::move(d), std::move(v)));
121         }
122     };
123 
124     template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_count125     auto operator()(Subscriber dest) const
126         -> decltype(buffer_count_observer<Subscriber>::make(std::move(dest), initial)) {
127         return      buffer_count_observer<Subscriber>::make(std::move(dest), initial);
128     }
129 };
130 
131 }
132 
133 /*! @copydoc rx-buffer_count.hpp
134 */
135 template<class... AN>
buffer(AN &&...an)136 auto buffer(AN&&... an)
137     ->      operator_factory<buffer_count_tag, AN...> {
138      return operator_factory<buffer_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
139 }
140 
141 }
142 
143 template<>
144 struct member_overload<buffer_count_tag>
145 {
146     template<class Observable,
147         class Enabled = rxu::enable_if_all_true_type_t<
148             is_observable<Observable>>,
149         class SourceValue = rxu::value_type_t<Observable>,
150         class BufferCount = rxo::detail::buffer_count<SourceValue>,
151         class Value = rxu::value_type_t<BufferCount>>
memberrxcpp::member_overload152     static auto member(Observable&& o, int count, int skip)
153         -> decltype(o.template lift<Value>(BufferCount(count, skip))) {
154         return      o.template lift<Value>(BufferCount(count, skip));
155     }
156 
157      template<class Observable,
158         class Enabled = rxu::enable_if_all_true_type_t<
159             is_observable<Observable>>,
160         class SourceValue = rxu::value_type_t<Observable>,
161         class BufferCount = rxo::detail::buffer_count<SourceValue>,
162         class Value = rxu::value_type_t<BufferCount>>
memberrxcpp::member_overload163     static auto member(Observable&& o, int count)
164         -> decltype(o.template lift<Value>(BufferCount(count, count))) {
165         return      o.template lift<Value>(BufferCount(count, count));
166     }
167 
168     template<class... AN>
memberrxcpp::member_overload169     static operators::detail::buffer_count_invalid_t<AN...> member(AN...) {
170         std::terminate();
171         return {};
172         static_assert(sizeof...(AN) == 10000, "buffer takes (Count, optional Skip)");
173     }
174 };
175 
176 }
177 
178 #endif
179