1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-buffer_time_count.hpp
6 
7     \brief Return an observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first), on the specified scheduler.
8 
9     \tparam Duration      the type of the time interval.
10     \tparam Coordination  the type of the scheduler (optional).
11 
12     \param period        the period of time each buffer collects items before it is emitted and replaced with a new buffer.
13     \param count         the maximum size of each buffer before it is emitted and new buffer is created.
14     \param coordination  the scheduler for the buffers (optional).
15 
16     \return  Observable that emits connected, non-overlapping buffers of items from the source observable that were emitted during a fixed duration of time or when the buffer has reached maximum capacity (whichever occurs first).
17 
18     \sample
19     \snippet buffer.cpp buffer period+count+coordination sample
20     \snippet output.txt buffer period+count+coordination sample
21 
22     \sample
23     \snippet buffer.cpp buffer period+count sample
24     \snippet output.txt buffer period+count sample
25 */
26 
27 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_OR_COUNT_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
38 template<class... AN>
39 struct buffer_with_time_or_count_invalid_arguments {};
40 
41 template<class... AN>
42 struct buffer_with_time_or_count_invalid : public rxo::operator_base<buffer_with_time_or_count_invalid_arguments<AN...>> {
43     using type = observable<buffer_with_time_or_count_invalid_arguments<AN...>, buffer_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using buffer_with_time_or_count_invalid_t = typename buffer_with_time_or_count_invalid<AN...>::type;
47 
48 template<class T, class Duration, class Coordination>
49 struct buffer_with_time_or_count
50 {
51     typedef rxu::decay_t<T> source_value_type;
52     typedef std::vector<source_value_type> value_type;
53     typedef rxu::decay_t<Coordination> coordination_type;
54     typedef typename coordination_type::coordinator_type coordinator_type;
55     typedef rxu::decay_t<Duration> duration_type;
56 
57     struct buffer_with_time_or_count_values
58     {
buffer_with_time_or_count_valuesrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_values59         buffer_with_time_or_count_values(duration_type p, int n, coordination_type c)
60             : period(p)
61             , count(n)
62             , coordination(c)
63         {
64         }
65         duration_type period;
66         int count;
67         coordination_type coordination;
68     };
69     buffer_with_time_or_count_values initial;
70 
buffer_with_time_or_countrxcpp::operators::detail::buffer_with_time_or_count71     buffer_with_time_or_count(duration_type period, int count, coordination_type coordination)
72         : initial(period, count, coordination)
73     {
74     }
75 
76     template<class Subscriber>
77     struct buffer_with_time_or_count_observer
78     {
79         typedef buffer_with_time_or_count_observer<Subscriber> this_type;
80         typedef std::vector<T> value_type;
81         typedef rxu::decay_t<Subscriber> dest_type;
82         typedef observer<value_type, this_type> observer_type;
83 
84         struct buffer_with_time_or_count_subscriber_values : public buffer_with_time_or_count_values
85         {
buffer_with_time_or_count_subscriber_valuesrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer::buffer_with_time_or_count_subscriber_values86             buffer_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
87                 : buffer_with_time_or_count_values(std::move(v))
88                 , cs(std::move(cs))
89                 , dest(std::move(d))
90                 , coordinator(std::move(c))
91                 , worker(coordinator.get_worker())
92                 , chunk_id(0)
93             {
94             }
95             composite_subscription cs;
96             dest_type dest;
97             coordinator_type coordinator;
98             rxsc::worker worker;
99             mutable int chunk_id;
100             mutable value_type chunk;
101         };
102         typedef std::shared_ptr<buffer_with_time_or_count_subscriber_values> state_type;
103         state_type state;
104 
buffer_with_time_or_count_observerrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer105         buffer_with_time_or_count_observer(composite_subscription cs, dest_type d, buffer_with_time_or_count_values v, coordinator_type c)
106             : state(std::make_shared<buffer_with_time_or_count_subscriber_values>(buffer_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
107         {
108             auto new_id = state->chunk_id;
109             auto produce_time = state->worker.now() + state->period;
110             auto localState = state;
111 
112             auto disposer = [=](const rxsc::schedulable&){
113                 localState->cs.unsubscribe();
114                 localState->dest.unsubscribe();
115                 localState->worker.unsubscribe();
116             };
117             auto selectedDisposer = on_exception(
118                 [&](){return localState->coordinator.act(disposer);},
119                 localState->dest);
120             if (selectedDisposer.empty()) {
121                 return;
122             }
123 
124             localState->dest.add([=](){
125                 localState->worker.schedule(selectedDisposer.get());
126             });
127             localState->cs.add([=](){
128                 localState->worker.schedule(selectedDisposer.get());
129             });
130 
131             //
132             // The scheduler is FIFO for any time T. Since the observer is scheduling
133             // on_next/on_error/oncompleted the timed schedule calls must be resheduled
134             // when they occur to ensure that production happens after on_next/on_error/oncompleted
135             //
136 
137             localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
138                 localState->worker.schedule(produce_buffer(new_id, produce_time, localState));
139             });
140         }
141 
produce_bufferrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer142         static std::function<void(const rxsc::schedulable&)> produce_buffer(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
143             auto produce = [id, expected, state](const rxsc::schedulable&) {
144                 if (id != state->chunk_id)
145                     return;
146 
147                 state->dest.on_next(state->chunk);
148                 state->chunk.resize(0);
149                 auto new_id = ++state->chunk_id;
150                 auto produce_time = expected + state->period;
151                 state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
152                     state->worker.schedule(produce_buffer(new_id, produce_time, state));
153                 });
154             };
155 
156             auto selectedProduce = on_exception(
157                 [&](){return state->coordinator.act(produce);},
158                 state->dest);
159             if (selectedProduce.empty()) {
160                 return std::function<void(const rxsc::schedulable&)>();
161             }
162 
163             return std::function<void(const rxsc::schedulable&)>(selectedProduce.get());
164         }
165 
on_nextrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer166         void on_next(T v) const {
167             auto localState = state;
168             auto work = [v, localState](const rxsc::schedulable& self){
169                 localState->chunk.push_back(v);
170                 if (int(localState->chunk.size()) == localState->count) {
171                     produce_buffer(localState->chunk_id, localState->worker.now(), localState)(self);
172                 }
173             };
174             auto selectedWork = on_exception(
175                 [&](){return localState->coordinator.act(work);},
176                 localState->dest);
177             if (selectedWork.empty()) {
178                 return;
179             }
180             localState->worker.schedule(selectedWork.get());
181         }
on_errorrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer182         void on_error(rxu::error_ptr e) const {
183             auto localState = state;
184             auto work = [e, localState](const rxsc::schedulable&){
185                 localState->dest.on_error(e);
186             };
187             auto selectedWork = on_exception(
188                 [&](){return localState->coordinator.act(work);},
189                 localState->dest);
190             if (selectedWork.empty()) {
191                 return;
192             }
193             localState->worker.schedule(selectedWork.get());
194         }
on_completedrxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer195         void on_completed() const {
196             auto localState = state;
197             auto work = [localState](const rxsc::schedulable&){
198                 localState->dest.on_next(localState->chunk);
199                 localState->dest.on_completed();
200             };
201             auto selectedWork = on_exception(
202                 [&](){return localState->coordinator.act(work);},
203                 localState->dest);
204             if (selectedWork.empty()) {
205                 return;
206             }
207             localState->worker.schedule(selectedWork.get());
208         }
209 
makerxcpp::operators::detail::buffer_with_time_or_count::buffer_with_time_or_count_observer210         static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_or_count_values v) {
211             auto cs = composite_subscription();
212             auto coordinator = v.coordination.create_coordinator();
213 
214             return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
215         }
216     };
217 
218     template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_with_time_or_count219     auto operator()(Subscriber dest) const
220         -> decltype(buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
221         return      buffer_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
222     }
223 };
224 
225 }
226 
227 /*! @copydoc rx-buffer_time_count.hpp
228 */
229 template<class... AN>
buffer_with_time_or_count(AN &&...an)230 auto buffer_with_time_or_count(AN&&... an)
231     ->      operator_factory<buffer_with_time_or_count_tag, AN...> {
232      return operator_factory<buffer_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
233 }
234 
235 }
236 
237 template<>
238 struct member_overload<buffer_with_time_or_count_tag>
239 {
240     template<class Observable, class Duration,
241         class Enabled = rxu::enable_if_all_true_type_t<
242             is_observable<Observable>,
243             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
244         class SourceValue = rxu::value_type_t<Observable>,
245         class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
246         class Value = rxu::value_type_t<BufferTimeCount>>
memberrxcpp::member_overload247     static auto member(Observable&& o, Duration&& period, int count)
248         -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
249         return      o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
250     }
251 
252     template<class Observable, class Duration, class Coordination,
253         class Enabled = rxu::enable_if_all_true_type_t<
254             is_observable<Observable>,
255             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
256             is_coordination<Coordination>>,
257         class SourceValue = rxu::value_type_t<Observable>,
258         class BufferTimeCount = rxo::detail::buffer_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
259         class Value = rxu::value_type_t<BufferTimeCount>>
memberrxcpp::member_overload260     static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
261         -> decltype(o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
262         return      o.template lift<Value>(BufferTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
263     }
264 
265     template<class... AN>
memberrxcpp::member_overload266     static operators::detail::buffer_with_time_or_count_invalid_t<AN...> member(AN...) {
267         std::terminate();
268         return {};
269         static_assert(sizeof...(AN) == 10000, "buffer_with_time_or_count takes (Duration, Count, optional Coordination)");
270     }
271 };
272 
273 }
274 
275 #endif
276