1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-buffer_time.hpp
6 
7     \brief Return an observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
           If the skip parameter is set, return an observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer, on the specified scheduler.
9 
10     \tparam Duration      the type of the time interval
11     \tparam Coordination  the type of the scheduler (optional).
12 
13     \param period        the period of time each buffer collects items before it is emitted.
14     \param skip          the period of time after which a new buffer will be created (optional).
15     \param coordination  the scheduler for the buffers (optional).
16 
    \return  Observable that emits buffers every period time interval and collects items from this observable for period of time into each produced buffer.
             If the skip parameter is set, return an Observable that emits buffers every skip time interval and collects items from this observable for period of time into each produced buffer.
19 
20     \sample
21     \snippet buffer.cpp buffer period+skip+coordination sample
22     \snippet output.txt buffer period+skip+coordination sample
23 
24     \sample
25     \snippet buffer.cpp buffer period+skip sample
26     \snippet output.txt buffer period+skip sample
27 
28     Overlapping buffers are allowed:
29     \snippet buffer.cpp buffer period+skip overlapping sample
30     \snippet output.txt buffer period+skip overlapping sample
31 
32     If no items are emitted, an empty buffer is returned:
33     \snippet buffer.cpp buffer period+skip empty sample
34     \snippet output.txt buffer period+skip empty sample
35 
36     \sample
37     \snippet buffer.cpp buffer period+coordination sample
38     \snippet output.txt buffer period+coordination sample
39 
40     \sample
41     \snippet buffer.cpp buffer period sample
42     \snippet output.txt buffer period sample
43 */
44 
45 #if !defined(RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP)
46 #define RXCPP_OPERATORS_RX_BUFFER_WITH_TIME_HPP
47 
48 #include "../rx-includes.hpp"
49 
50 namespace rxcpp {
51 
52 namespace operators {
53 
54 namespace detail {
55 
// Empty tag type parameterised on the argument types of a rejected
// buffer_with_time call. It has no members and serves only as a type name.
template<typename... AN>
struct buffer_with_time_invalid_arguments
{
};
58 
59 template<class... AN>
60 struct buffer_with_time_invalid : public rxo::operator_base<buffer_with_time_invalid_arguments<AN...>> {
61     using type = observable<buffer_with_time_invalid_arguments<AN...>, buffer_with_time_invalid<AN...>>;
62 };
63 template<class... AN>
64 using buffer_with_time_invalid_t = typename buffer_with_time_invalid<AN...>::type;
65 
66 template<class T, class Duration, class Coordination>
67 struct buffer_with_time
68 {
69     typedef rxu::decay_t<T> source_value_type;
70     typedef std::vector<source_value_type> value_type;
71     typedef rxu::decay_t<Coordination> coordination_type;
72     typedef typename coordination_type::coordinator_type coordinator_type;
73     typedef rxu::decay_t<Duration> duration_type;
74 
75     struct buffer_with_time_values
76     {
buffer_with_time_valuesrxcpp::operators::detail::buffer_with_time::buffer_with_time_values77         buffer_with_time_values(duration_type p, duration_type s, coordination_type c)
78             : period(p)
79             , skip(s)
80             , coordination(c)
81         {
82         }
83         duration_type period;
84         duration_type skip;
85         coordination_type coordination;
86     };
87     buffer_with_time_values initial;
88 
buffer_with_timerxcpp::operators::detail::buffer_with_time89     buffer_with_time(duration_type period, duration_type skip, coordination_type coordination)
90         : initial(period, skip, coordination)
91     {
92     }
93 
94     template<class Subscriber>
95     struct buffer_with_time_observer
96     {
97         typedef buffer_with_time_observer<Subscriber> this_type;
98         typedef std::vector<T> value_type;
99         typedef rxu::decay_t<Subscriber> dest_type;
100         typedef observer<value_type, this_type> observer_type;
101 
102         struct buffer_with_time_subscriber_values : public buffer_with_time_values
103         {
buffer_with_time_subscriber_valuesrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer::buffer_with_time_subscriber_values104             buffer_with_time_subscriber_values(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
105                 : buffer_with_time_values(v)
106                 , cs(std::move(cs))
107                 , dest(std::move(d))
108                 , coordinator(std::move(c))
109                 , worker(coordinator.get_worker())
110                 , expected(worker.now())
111             {
112             }
113             composite_subscription cs;
114             dest_type dest;
115             coordinator_type coordinator;
116             rxsc::worker worker;
117             mutable std::deque<value_type> chunks;
118             rxsc::scheduler::clock_type::time_point expected;
119         };
120         std::shared_ptr<buffer_with_time_subscriber_values> state;
121 
buffer_with_time_observerrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer122         buffer_with_time_observer(composite_subscription cs, dest_type d, buffer_with_time_values v, coordinator_type c)
123             : state(std::make_shared<buffer_with_time_subscriber_values>(buffer_with_time_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
124         {
125             auto localState = state;
126 
127             auto disposer = [=](const rxsc::schedulable&){
128                 localState->cs.unsubscribe();
129                 localState->dest.unsubscribe();
130                 localState->worker.unsubscribe();
131             };
132             auto selectedDisposer = on_exception(
133                 [&](){return localState->coordinator.act(disposer);},
134                 localState->dest);
135             if (selectedDisposer.empty()) {
136                 return;
137             }
138 
139             localState->dest.add([=](){
140                 localState->worker.schedule(selectedDisposer.get());
141             });
142             localState->cs.add([=](){
143                 localState->worker.schedule(selectedDisposer.get());
144             });
145 
146             //
147             // The scheduler is FIFO for any time T. Since the observer is scheduling
148             // on_next/on_error/oncompleted the timed schedule calls must be resheduled
149             // when they occur to ensure that production happens after on_next/on_error/oncompleted
150             //
151 
152             auto produce_buffer = [localState](const rxsc::schedulable&) {
153                 localState->dest.on_next(std::move(localState->chunks.front()));
154                 localState->chunks.pop_front();
155             };
156             auto selectedProduce = on_exception(
157                 [&](){return localState->coordinator.act(produce_buffer);},
158                 localState->dest);
159             if (selectedProduce.empty()) {
160                 return;
161             }
162 
163             auto create_buffer = [localState, selectedProduce](const rxsc::schedulable&) {
164                 localState->chunks.emplace_back();
165                 auto produce_at = localState->expected + localState->period;
166                 localState->expected += localState->skip;
167                 localState->worker.schedule(produce_at, [localState, selectedProduce](const rxsc::schedulable&) {
168                     localState->worker.schedule(selectedProduce.get());
169                 });
170             };
171             auto selectedCreate = on_exception(
172                 [&](){return localState->coordinator.act(create_buffer);},
173                 localState->dest);
174             if (selectedCreate.empty()) {
175                 return;
176             }
177 
178             state->worker.schedule_periodically(
179                 state->expected,
180                 state->skip,
181                 [localState, selectedCreate](const rxsc::schedulable&) {
182                     localState->worker.schedule(selectedCreate.get());
183                 });
184         }
on_nextrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer185         void on_next(T v) const {
186             auto localState = state;
187             auto work = [v, localState](const rxsc::schedulable&){
188                 for(auto& chunk : localState->chunks) {
189                     chunk.push_back(v);
190                 }
191             };
192             auto selectedWork = on_exception(
193                 [&](){return localState->coordinator.act(work);},
194                 localState->dest);
195             if (selectedWork.empty()) {
196                 return;
197             }
198             localState->worker.schedule(selectedWork.get());
199         }
on_errorrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer200         void on_error(rxu::error_ptr e) const {
201             auto localState = state;
202             auto work = [e, localState](const rxsc::schedulable&){
203                 localState->dest.on_error(e);
204             };
205             auto selectedWork = on_exception(
206                 [&](){return localState->coordinator.act(work);},
207                 localState->dest);
208             if (selectedWork.empty()) {
209                 return;
210             }
211             localState->worker.schedule(selectedWork.get());
212         }
on_completedrxcpp::operators::detail::buffer_with_time::buffer_with_time_observer213         void on_completed() const {
214             auto localState = state;
215             auto work = [localState](const rxsc::schedulable&){
216                 on_exception(
217                     [&](){
218                         while (!localState->chunks.empty()) {
219                             localState->dest.on_next(std::move(localState->chunks.front()));
220                             localState->chunks.pop_front();
221                         }
222                         return true;
223                     },
224                     localState->dest);
225                 localState->dest.on_completed();
226             };
227             auto selectedWork = on_exception(
228                 [&](){return localState->coordinator.act(work);},
229                 localState->dest);
230             if (selectedWork.empty()) {
231                 return;
232             }
233             localState->worker.schedule(selectedWork.get());
234         }
235 
makerxcpp::operators::detail::buffer_with_time::buffer_with_time_observer236         static subscriber<T, observer<T, this_type>> make(dest_type d, buffer_with_time_values v) {
237             auto cs = composite_subscription();
238             auto coordinator = v.coordination.create_coordinator();
239 
240             return make_subscriber<T>(cs, this_type(cs, std::move(d), std::move(v), std::move(coordinator)));
241         }
242     };
243 
244     template<class Subscriber>
operator ()rxcpp::operators::detail::buffer_with_time245     auto operator()(Subscriber dest) const
246         -> decltype(buffer_with_time_observer<Subscriber>::make(std::move(dest), initial)) {
247         return      buffer_with_time_observer<Subscriber>::make(std::move(dest), initial);
248     }
249 };
250 
251 }
252 
253 /*! @copydoc rx-buffer_time.hpp
254 */
255 template<class... AN>
buffer_with_time(AN &&...an)256 auto buffer_with_time(AN&&... an)
257     ->      operator_factory<buffer_with_time_tag, AN...> {
258      return operator_factory<buffer_with_time_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
259 }
260 
261 }
262 
263 template<>
264 struct member_overload<buffer_with_time_tag>
265 {
266     template<class Observable, class Duration,
267         class Enabled = rxu::enable_if_all_true_type_t<
268             is_observable<Observable>,
269             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
270         class SourceValue = rxu::value_type_t<Observable>,
271         class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
272         class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload273     static auto member(Observable&& o, Duration period)
274         -> decltype(o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()))) {
275         return      o.template lift<Value>(BufferWithTime(period, period, identity_current_thread()));
276     }
277 
278     template<class Observable, class Duration, class Coordination,
279         class Enabled = rxu::enable_if_all_true_type_t<
280             is_observable<Observable>,
281             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
282             is_coordination<Coordination>>,
283         class SourceValue = rxu::value_type_t<Observable>,
284         class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
285         class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload286     static auto member(Observable&& o, Duration period, Coordination&& cn)
287         -> decltype(o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)))) {
288         return      o.template lift<Value>(BufferWithTime(period, period, std::forward<Coordination>(cn)));
289     }
290 
291     template<class Observable, class Duration,
292         class Enabled = rxu::enable_if_all_true_type_t<
293             is_observable<Observable>,
294             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
295         class SourceValue = rxu::value_type_t<Observable>,
296         class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
297         class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload298     static auto member(Observable&& o, Duration&& period, Duration&& skip)
299         -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()))) {
300         return      o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), identity_current_thread()));
301     }
302 
303     template<class Observable, class Duration, class Coordination,
304         class Enabled = rxu::enable_if_all_true_type_t<
305             is_observable<Observable>,
306             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
307             is_coordination<Coordination>>,
308         class SourceValue = rxu::value_type_t<Observable>,
309         class BufferWithTime = rxo::detail::buffer_with_time<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
310         class Value = rxu::value_type_t<BufferWithTime>>
memberrxcpp::member_overload311     static auto member(Observable&& o, Duration&& period, Duration&& skip, Coordination&& cn)
312         -> decltype(o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)))) {
313         return      o.template lift<Value>(BufferWithTime(std::forward<Duration>(period), std::forward<Duration>(skip), std::forward<Coordination>(cn)));
314     }
315 
316     template<class... AN>
memberrxcpp::member_overload317     static operators::detail::buffer_with_time_invalid_t<AN...> member(AN...) {
318         std::terminate();
319         return {};
320         static_assert(sizeof...(AN) == 10000, "buffer_with_time takes (Duration, optional Duration, optional Coordination)");
321     }
322 };
323 
324 }
325 
326 #endif
327