1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-window_toggle.hpp
6 
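    \brief Return an observable that emits a new window (an observable) each time the openings observable emits a value, and collects items from this observable into every open window until the observable returned by the closing selector for that window emits a value or completes, on the specified scheduler.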
8 
9     \tparam Openings        observable<OT>
10     \tparam ClosingSelector a function of type observable<CT>(OT)
11     \tparam Coordination    the type of the scheduler (optional).
12 
13     \param opens         each value from this observable opens a new window.
14     \param closes        this function is called for each opened window and returns an observable. the first value from the returned observable will close the window.
15     \param coordination  the scheduler for the windows (optional).
16 
17     \return  Observable that emits an observable for each opened window.
18 
19     \sample
20     \snippet window.cpp window toggle+coordination sample
21     \snippet output.txt window toggle+coordination sample
22 
23     \sample
24     \snippet window.cpp window toggle sample
25     \snippet output.txt window toggle sample
26 */
27 
28 #if !defined(RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP)
29 #define RXCPP_OPERATORS_RX_WINDOW_TOGGLE_HPP
30 
31 #include "../rx-includes.hpp"
32 
33 namespace rxcpp {
34 
35 namespace operators {
36 
37 namespace detail {
38 
39 template<class... AN>
40 struct window_toggle_invalid_arguments {};
41 
42 template<class... AN>
43 struct window_toggle_invalid : public rxo::operator_base<window_toggle_invalid_arguments<AN...>> {
44     using type = observable<window_toggle_invalid_arguments<AN...>, window_toggle_invalid<AN...>>;
45 };
46 template<class... AN>
47 using window_toggle_invalid_t = typename window_toggle_invalid<AN...>::type;
48 
49 template<class T, class Openings, class ClosingSelector, class Coordination>
50 struct window_toggle
51 {
52     typedef window_toggle<T, Openings, ClosingSelector, Coordination> this_type;
53 
54     using source_value_type = rxu::decay_t<T>;
55     using coordination_type = rxu::decay_t<Coordination>;
56     using coordinator_type = typename coordination_type::coordinator_type;
57     using openings_type = rxu::decay_t<Openings>;
58     using openings_value_type = typename openings_type::value_type;
59     using closing_selector_type = rxu::decay_t<ClosingSelector>;
60     using closings_type = rxu::result_of_t<closing_selector_type(openings_value_type)>;
61     using closings_value_type = typename closings_type::value_type;
62 
63     struct window_toggle_values
64     {
window_toggle_valuesrxcpp::operators::detail::window_toggle::window_toggle_values65         window_toggle_values(openings_type opens, closing_selector_type closes, coordination_type c)
66             : openings(opens)
67             , closingSelector(closes)
68             , coordination(c)
69         {
70         }
71         openings_type openings;
72         mutable closing_selector_type closingSelector;
73         coordination_type coordination;
74     };
75     window_toggle_values initial;
76 
window_togglerxcpp::operators::detail::window_toggle77     window_toggle(openings_type opens, closing_selector_type closes, coordination_type coordination)
78         : initial(opens, closes, coordination)
79     {
80     }
81 
82     template<class Subscriber>
83     struct window_toggle_observer
84     {
85         typedef window_toggle_observer<Subscriber> this_type;
86         typedef rxu::decay_t<T> value_type;
87         typedef rxu::decay_t<Subscriber> dest_type;
88         typedef observer<T, this_type> observer_type;
89 
90         struct window_toggle_subscriber_values : public window_toggle_values
91         {
window_toggle_subscriber_valuesrxcpp::operators::detail::window_toggle::window_toggle_observer::window_toggle_subscriber_values92             window_toggle_subscriber_values(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
93                 : window_toggle_values(v)
94                 , cs(std::move(cs))
95                 , dest(std::move(d))
96                 , coordinator(std::move(c))
97                 , worker(coordinator.get_worker())
98             {
99             }
100             composite_subscription cs;
101             dest_type dest;
102             coordinator_type coordinator;
103             rxsc::worker worker;
104             mutable std::list<rxcpp::subjects::subject<T>> subj;
105         };
106         std::shared_ptr<window_toggle_subscriber_values> state;
107 
window_toggle_observerrxcpp::operators::detail::window_toggle::window_toggle_observer108         window_toggle_observer(composite_subscription cs, dest_type d, window_toggle_values v, coordinator_type c)
109             : state(std::make_shared<window_toggle_subscriber_values>(window_toggle_subscriber_values(std::move(cs), std::move(d), v, std::move(c))))
110         {
111             auto localState = state;
112 
113             composite_subscription innercs;
114 
115             // when the out observer is unsubscribed all the
116             // inner subscriptions are unsubscribed as well
117             auto innerscope = localState->dest.add(innercs);
118 
119             innercs.add([=](){
120                 localState->dest.remove(innerscope);
121             });
122 
123             localState->dest.add(localState->cs);
124 
125             auto source = on_exception(
126                 [&](){return localState->coordinator.in(localState->openings);},
127                 localState->dest);
128             if (source.empty()) {
129                 return;
130             }
131 
132             // this subscribe does not share the observer subscription
133             // so that when it is unsubscribed the observer can be called
134             // until the inner subscriptions have finished
135             auto sink = make_subscriber<openings_value_type>(
136                 localState->dest,
137                 innercs,
138             // on_next
139                 [localState](const openings_value_type& ov) {
140                     auto closer = localState->closingSelector(ov);
141 
142                     auto it = localState->subj.insert(localState->subj.end(), rxcpp::subjects::subject<T>());
143                     localState->dest.on_next(it->get_observable().as_dynamic());
144 
145                     composite_subscription innercs;
146 
147                     // when the out observer is unsubscribed all the
148                     // inner subscriptions are unsubscribed as well
149                     auto innerscope = localState->dest.add(innercs);
150 
151                     innercs.add([=](){
152                         localState->dest.remove(innerscope);
153                     });
154 
155                     auto source = localState->coordinator.in(closer);
156 
157                     auto sit = std::make_shared<decltype(it)>(it);
158                     auto close = [localState, sit]() {
159                         auto it = *sit;
160                         *sit = localState->subj.end();
161                         if (it != localState->subj.end()) {
162                             it->get_subscriber().on_completed();
163                             localState->subj.erase(it);
164                         }
165                     };
166 
167                     // this subscribe does not share the observer subscription
168                     // so that when it is unsubscribed the observer can be called
169                     // until the inner subscriptions have finished
170                     auto sink = make_subscriber<closings_value_type>(
171                         localState->dest,
172                         innercs,
173                     // on_next
174                         [close, innercs](closings_value_type) {
175                             close();
176                             innercs.unsubscribe();
177                         },
178                     // on_error
179                         [localState](rxu::error_ptr e) {
180                             localState->dest.on_error(e);
181                         },
182                     // on_completed
183                         close
184                     );
185                     auto selectedSink = localState->coordinator.out(sink);
186                     source.subscribe(std::move(selectedSink));
187                 },
188             // on_error
189                 [localState](rxu::error_ptr e) {
190                     localState->dest.on_error(e);
191                 },
192             // on_completed
193                 []() {
194                 }
195             );
196             auto selectedSink = on_exception(
197                 [&](){return localState->coordinator.out(sink);},
198                 localState->dest);
199             if (selectedSink.empty()) {
200                 return;
201             }
202             source->subscribe(std::move(selectedSink.get()));
203         }
204 
on_nextrxcpp::operators::detail::window_toggle::window_toggle_observer205         void on_next(T v) const {
206             auto localState = state;
207             auto work = [v, localState](const rxsc::schedulable&){
208                 for (auto s : localState->subj) {
209                     s.get_subscriber().on_next(v);
210                 }
211             };
212             auto selectedWork = on_exception(
213                 [&](){return localState->coordinator.act(work);},
214                 localState->dest);
215             if (selectedWork.empty()) {
216                 return;
217             }
218             localState->worker.schedule(selectedWork.get());
219         }
220 
on_errorrxcpp::operators::detail::window_toggle::window_toggle_observer221         void on_error(rxu::error_ptr e) const {
222             auto localState = state;
223             auto work = [e, localState](const rxsc::schedulable&){
224                 for (auto s : localState->subj) {
225                     s.get_subscriber().on_error(e);
226                 }
227                 localState->dest.on_error(e);
228             };
229             auto selectedWork = on_exception(
230                 [&](){return localState->coordinator.act(work);},
231                 localState->dest);
232             if (selectedWork.empty()) {
233                 return;
234             }
235             localState->worker.schedule(selectedWork.get());
236         }
237 
on_completedrxcpp::operators::detail::window_toggle::window_toggle_observer238         void on_completed() const {
239             auto localState = state;
240             auto work = [localState](const rxsc::schedulable&){
241                 for (auto s : localState->subj) {
242                     s.get_subscriber().on_completed();
243                 }
244                 localState->dest.on_completed();
245             };
246             auto selectedWork = on_exception(
247                 [&](){return localState->coordinator.act(work);},
248                 localState->dest);
249             if (selectedWork.empty()) {
250                 return;
251             }
252             localState->worker.schedule(selectedWork.get());
253         }
254 
makerxcpp::operators::detail::window_toggle::window_toggle_observer255         static subscriber<T, observer_type> make(dest_type d, window_toggle_values v) {
256             auto cs = composite_subscription();
257             auto coordinator = v.coordination.create_coordinator(d.get_subscription());
258 
259             return make_subscriber<T>(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator))));
260         }
261     };
262 
263     template<class Subscriber>
operator ()rxcpp::operators::detail::window_toggle264     auto operator()(Subscriber dest) const
265         -> decltype(window_toggle_observer<Subscriber>::make(std::move(dest), initial)) {
266         return      window_toggle_observer<Subscriber>::make(std::move(dest), initial);
267     }
268 };
269 
270 }
271 
272 /*! @copydoc rx-window_toggle.hpp
273 */
274 template<class... AN>
window_toggle(AN &&...an)275 auto window_toggle(AN&&... an)
276     ->      operator_factory<window_toggle_tag, AN...> {
277      return operator_factory<window_toggle_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
278 }
279 
280 }
281 
282 template<>
283 struct member_overload<window_toggle_tag>
284 {
285     template<class Observable, class Openings, class ClosingSelector,
286         class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
287         class OpeningsType = rxu::decay_t<Openings>,
288         class OpeningsValueType = typename OpeningsType::value_type,
289         class Enabled = rxu::enable_if_all_true_type_t<
290             all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>>,
291         class SourceValue = rxu::value_type_t<Observable>,
292         class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, identity_one_worker>,
293         class Value = observable<SourceValue>>
memberrxcpp::member_overload294     static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector)
295         -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()))) {
296         return      o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), identity_immediate()));
297     }
298 
299     template<class Observable, class Openings, class ClosingSelector, class Coordination,
300         class ClosingSelectorType = rxu::decay_t<ClosingSelector>,
301         class OpeningsType = rxu::decay_t<Openings>,
302         class OpeningsValueType = typename OpeningsType::value_type,
303         class Enabled = rxu::enable_if_all_true_type_t<
304             all_observables<Observable, Openings, rxu::result_of_t<ClosingSelectorType(OpeningsValueType)>>,
305             is_coordination<Coordination>>,
306         class SourceValue = rxu::value_type_t<Observable>,
307         class WindowToggle = rxo::detail::window_toggle<SourceValue, rxu::decay_t<Openings>, rxu::decay_t<ClosingSelector>, rxu::decay_t<Coordination>>,
308         class Value = observable<SourceValue>>
memberrxcpp::member_overload309     static auto member(Observable&& o, Openings&& openings, ClosingSelector&& closingSelector, Coordination&& cn)
310         -> decltype(o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)))) {
311         return      o.template lift<Value>(WindowToggle(std::forward<Openings>(openings), std::forward<ClosingSelector>(closingSelector), std::forward<Coordination>(cn)));
312     }
313 
314     template<class... AN>
memberrxcpp::member_overload315     static operators::detail::window_toggle_invalid_t<AN...> member(AN...) {
316         std::terminate();
317         return {};
318         static_assert(sizeof...(AN) == 10000, "window_toggle takes (Openings, ClosingSelector, optional Coordination)");
319     }
320 };
321 
322 }
323 
324 #endif
325