1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-window_time_count.hpp
6 
7     \brief Return an observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first), on the specified scheduler.
8 
9     \tparam Duration      the type of time intervals.
10     \tparam Coordination  the type of the scheduler (optional).
11 
12     \param period        the period of time each window collects items before it is completed and replaced with a new window.
    \param count         the maximum size of each window before it is completed and a new window is created.
14     \param coordination  the scheduler for the windows (optional).
15 
16     \return  Observable that emits connected, non-overlapping windows of items from the source observable that were emitted during a fixed duration of time or when the window has reached maximum capacity (whichever occurs first).
17 
18     \sample
19     \snippet window.cpp window period+count+coordination sample
20     \snippet output.txt window period+count+coordination sample
21 
22     \sample
23     \snippet window.cpp window period+count sample
24     \snippet output.txt window period+count sample
25 */
26 
27 #if !defined(RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP)
28 #define RXCPP_OPERATORS_RX_WINDOW_WITH_TIME_OR_COUNT_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
38 template<class... AN>
39 struct window_with_time_or_count_invalid_arguments {};
40 
41 template<class... AN>
42 struct window_with_time_or_count_invalid : public rxo::operator_base<window_with_time_or_count_invalid_arguments<AN...>> {
43     using type = observable<window_with_time_or_count_invalid_arguments<AN...>, window_with_time_or_count_invalid<AN...>>;
44 };
45 template<class... AN>
46 using window_with_time_or_count_invalid_t = typename window_with_time_or_count_invalid<AN...>::type;
47 
48 template<class T, class Duration, class Coordination>
49 struct window_with_time_or_count
50 {
51     typedef rxu::decay_t<T> source_value_type;
52     typedef observable<source_value_type> value_type;
53     typedef rxu::decay_t<Coordination> coordination_type;
54     typedef typename coordination_type::coordinator_type coordinator_type;
55     typedef rxu::decay_t<Duration> duration_type;
56 
57     struct window_with_time_or_count_values
58     {
window_with_time_or_count_valuesrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_values59         window_with_time_or_count_values(duration_type p, int n, coordination_type c)
60             : period(p)
61             , count(n)
62             , coordination(c)
63         {
64         }
65         duration_type period;
66         int count;
67         coordination_type coordination;
68     };
69     window_with_time_or_count_values initial;
70 
window_with_time_or_countrxcpp::operators::detail::window_with_time_or_count71     window_with_time_or_count(duration_type period, int count, coordination_type coordination)
72         : initial(period, count, coordination)
73     {
74     }
75 
76     template<class Subscriber>
77     struct window_with_time_or_count_observer
78     {
79         typedef window_with_time_or_count_observer<Subscriber> this_type;
80         typedef rxu::decay_t<T> value_type;
81         typedef rxu::decay_t<Subscriber> dest_type;
82         typedef observer<T, this_type> observer_type;
83 
84         struct window_with_time_or_count_subscriber_values : public window_with_time_or_count_values
85         {
window_with_time_or_count_subscriber_valuesrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer::window_with_time_or_count_subscriber_values86             window_with_time_or_count_subscriber_values(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
87                 : window_with_time_or_count_values(std::move(v))
88                 , cs(std::move(cs))
89                 , dest(std::move(d))
90                 , coordinator(std::move(c))
91                 , worker(coordinator.get_worker())
92                 , cursor(0)
93                 , subj_id(0)
94             {
95             }
96             composite_subscription cs;
97             dest_type dest;
98             coordinator_type coordinator;
99             rxsc::worker worker;
100             mutable int cursor;
101             mutable int subj_id;
102             mutable rxcpp::subjects::subject<T> subj;
103         };
104         typedef std::shared_ptr<window_with_time_or_count_subscriber_values> state_type;
105         state_type state;
106 
window_with_time_or_count_observerrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer107         window_with_time_or_count_observer(composite_subscription cs, dest_type d, window_with_time_or_count_values v, coordinator_type c)
108             : state(std::make_shared<window_with_time_or_count_subscriber_values>(window_with_time_or_count_subscriber_values(std::move(cs), std::move(d), std::move(v), std::move(c))))
109         {
110             auto new_id = state->subj_id;
111             auto produce_time = state->worker.now();
112             auto localState = state;
113 
114             auto disposer = [=](const rxsc::schedulable&){
115                 localState->cs.unsubscribe();
116                 localState->dest.unsubscribe();
117                 localState->worker.unsubscribe();
118             };
119             auto selectedDisposer = on_exception(
120                 [&](){return localState->coordinator.act(disposer);},
121                 localState->dest);
122             if (selectedDisposer.empty()) {
123                 return;
124             }
125 
126             localState->dest.add([=](){
127                 localState->worker.schedule(selectedDisposer.get());
128             });
129             localState->cs.add([=](){
130                 localState->worker.schedule(selectedDisposer.get());
131             });
132 
133             //
134             // The scheduler is FIFO for any time T. Since the observer is scheduling
135             // on_next/on_error/oncompleted the timed schedule calls must be resheduled
136             // when they occur to ensure that production happens after on_next/on_error/oncompleted
137             //
138 
139             localState->worker.schedule(produce_time, [new_id, produce_time, localState](const rxsc::schedulable&){
140                 localState->worker.schedule(release_window(new_id, produce_time, localState));
141             });
142         }
143 
release_windowrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer144         static std::function<void(const rxsc::schedulable&)> release_window(int id, rxsc::scheduler::clock_type::time_point expected, state_type state) {
145             auto release = [id, expected, state](const rxsc::schedulable&) {
146                 if (id != state->subj_id)
147                     return;
148 
149                 state->subj.get_subscriber().on_completed();
150                 state->subj = rxcpp::subjects::subject<T>();
151                 state->dest.on_next(state->subj.get_observable().as_dynamic());
152                 state->cursor = 0;
153                 auto new_id = ++state->subj_id;
154                 auto produce_time = expected + state->period;
155                 state->worker.schedule(produce_time, [new_id, produce_time, state](const rxsc::schedulable&){
156                     state->worker.schedule(release_window(new_id, produce_time, state));
157                 });
158             };
159             auto selectedRelease = on_exception(
160                 [&](){return state->coordinator.act(release);},
161                 state->dest);
162             if (selectedRelease.empty()) {
163                 return std::function<void(const rxsc::schedulable&)>();
164             }
165 
166             return std::function<void(const rxsc::schedulable&)>(selectedRelease.get());
167         }
168 
on_nextrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer169         void on_next(T v) const {
170             auto localState = state;
171             auto work = [v, localState](const rxsc::schedulable& self){
172                 localState->subj.get_subscriber().on_next(v);
173                 if (++localState->cursor == localState->count) {
174                     release_window(localState->subj_id, localState->worker.now(), localState)(self);
175                 }
176             };
177             auto selectedWork = on_exception(
178                 [&](){return localState->coordinator.act(work);},
179                 localState->dest);
180             if (selectedWork.empty()) {
181                 return;
182             }
183             localState->worker.schedule(selectedWork.get());
184         }
185 
on_errorrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer186         void on_error(rxu::error_ptr e) const {
187             auto localState = state;
188             auto work = [e, localState](const rxsc::schedulable&){
189                 localState->subj.get_subscriber().on_error(e);
190                 localState->dest.on_error(e);
191             };
192             auto selectedWork = on_exception(
193                 [&](){return localState->coordinator.act(work);},
194                 localState->dest);
195             if (selectedWork.empty()) {
196                 return;
197             }
198             localState->worker.schedule(selectedWork.get());
199         }
200 
on_completedrxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer201         void on_completed() const {
202             auto localState = state;
203             auto work = [localState](const rxsc::schedulable&){
204                 localState->subj.get_subscriber().on_completed();
205                 localState->dest.on_completed();
206             };
207             auto selectedWork = on_exception(
208                 [&](){return localState->coordinator.act(work);},
209                 localState->dest);
210             if (selectedWork.empty()) {
211                 return;
212             }
213             localState->worker.schedule(selectedWork.get());
214         }
215 
makerxcpp::operators::detail::window_with_time_or_count::window_with_time_or_count_observer216         static subscriber<T, observer_type> make(dest_type d, window_with_time_or_count_values v) {
217             auto cs = composite_subscription();
218             auto coordinator = v.coordination.create_coordinator();
219 
220             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
221         }
222     };
223 
224     template<class Subscriber>
operator ()rxcpp::operators::detail::window_with_time_or_count225     auto operator()(Subscriber dest) const
226         -> decltype(window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial)) {
227         return      window_with_time_or_count_observer<Subscriber>::make(std::move(dest), initial);
228     }
229 };
230 
231 }
232 
233 /*! @copydoc rx-window_time_count.hpp
234 */
235 template<class... AN>
window_with_time_or_count(AN &&...an)236 auto window_with_time_or_count(AN&&... an)
237     ->      operator_factory<window_with_time_or_count_tag, AN...> {
238      return operator_factory<window_with_time_or_count_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
239 }
240 
241 }
242 
243 template<>
244 struct member_overload<window_with_time_or_count_tag>
245 {
246     template<class Observable, class Duration,
247         class Enabled = rxu::enable_if_all_true_type_t<
248             is_observable<Observable>,
249             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>>,
250         class SourceValue = rxu::value_type_t<Observable>,
251         class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, identity_one_worker>,
252         class Value = rxu::value_type_t<WindowTimeCount>>
memberrxcpp::member_overload253     static auto member(Observable&& o, Duration&& period, int count)
254         -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()))) {
255         return      o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, identity_current_thread()));
256     }
257 
258     template<class Observable, class Duration, class Coordination,
259         class Enabled = rxu::enable_if_all_true_type_t<
260             is_observable<Observable>,
261             std::is_convertible<Duration, rxsc::scheduler::clock_type::duration>,
262             is_coordination<Coordination>>,
263         class SourceValue = rxu::value_type_t<Observable>,
264         class WindowTimeCount = rxo::detail::window_with_time_or_count<SourceValue, rxu::decay_t<Duration>, rxu::decay_t<Coordination>>,
265         class Value = rxu::value_type_t<WindowTimeCount>>
memberrxcpp::member_overload266     static auto member(Observable&& o, Duration&& period, int count, Coordination&& cn)
267         -> decltype(o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)))) {
268         return      o.template lift<Value>(WindowTimeCount(std::forward<Duration>(period), count, std::forward<Coordination>(cn)));
269     }
270 
271     template<class... AN>
memberrxcpp::member_overload272     static operators::detail::window_with_time_or_count_invalid_t<AN...> member(AN...) {
273         std::terminate();
274         return {};
275         static_assert(sizeof...(AN) == 10000, "window_with_time_or_count takes (Duration, Count, optional Coordination)");
276     }
277 };
278 
279 }
280 
281 #endif
282