1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-skip_until.hpp
6 
7     \brief Make new observable with items skipped until on_next occurs on the trigger observable or until the specified time.
8            skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)
9 
    \tparam  TriggerSource  the type of the trigger observable, or a TimePoint convertible to rxsc::scheduler::clock_type::time_point.
    \tparam  Coordination   the type of the scheduler (optional).

    \param  t  an observable that has to emit an item before the source observable's elements begin to be mirrored by the resulting observable, or the time point to skip until.
    \param  cn  the scheduler to use for scheduling the items (optional).
15 
16     \return  An observable that skips items from the source observable until the second observable emits an item or the time runs out, then emits the remaining items.
17 
18     \sample
19     \snippet skip_until.cpp skip_until sample
20     \snippet output.txt skip_until sample
21 
22     \sample
23     \snippet skip_until.cpp threaded skip_until sample
24     \snippet output.txt threaded skip_until sample
25 */
26 
27 #if !defined(RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP)
28 #define RXCPP_OPERATORS_RX_SKIP_UNTIL_HPP
29 
30 #include "../rx-includes.hpp"
31 
32 namespace rxcpp {
33 
34 namespace operators {
35 
36 namespace detail {
37 
38 template<class... AN>
39 struct skip_until_invalid_arguments {};
40 
41 template<class... AN>
42 struct skip_until_invalid : public rxo::operator_base<skip_until_invalid_arguments<AN...>> {
43     using type = observable<skip_until_invalid_arguments<AN...>, skip_until_invalid<AN...>>;
44 };
45 template<class... AN>
46 using skip_until_invalid_t = typename skip_until_invalid<AN...>::type;
47 
48 template<class T, class Observable, class TriggerObservable, class Coordination>
49 struct skip_until : public operator_base<T>
50 {
51     typedef rxu::decay_t<Observable> source_type;
52     typedef rxu::decay_t<TriggerObservable> trigger_source_type;
53     typedef rxu::decay_t<Coordination> coordination_type;
54     typedef typename coordination_type::coordinator_type coordinator_type;
55     struct values
56     {
valuesrxcpp::operators::detail::skip_until::values57         values(source_type s, trigger_source_type t, coordination_type sf)
58             : source(std::move(s))
59             , trigger(std::move(t))
60             , coordination(std::move(sf))
61         {
62         }
63         source_type source;
64         trigger_source_type trigger;
65         coordination_type coordination;
66     };
67     values initial;
68 
skip_untilrxcpp::operators::detail::skip_until69     skip_until(source_type s, trigger_source_type t, coordination_type sf)
70         : initial(std::move(s), std::move(t), std::move(sf))
71     {
72     }
73 
74     struct mode
75     {
76         enum type {
77             skipping,  // no messages from trigger
78             clear,     // trigger completed
79             triggered, // trigger sent on_next
80             errored,   // error either on trigger or on observable
81             stopped    // observable completed
82         };
83     };
84 
85     template<class Subscriber>
on_subscriberxcpp::operators::detail::skip_until86     void on_subscribe(Subscriber s) const {
87 
88         typedef Subscriber output_type;
89         struct state_type
90             : public std::enable_shared_from_this<state_type>
91             , public values
92         {
93             state_type(const values& i, coordinator_type coor, const output_type& oarg)
94                 : values(i)
95                 , mode_value(mode::skipping)
96                 , coordinator(std::move(coor))
97                 , out(oarg)
98             {
99                 out.add(trigger_lifetime);
100                 out.add(source_lifetime);
101             }
102             typename mode::type mode_value;
103             composite_subscription trigger_lifetime;
104             composite_subscription source_lifetime;
105             coordinator_type coordinator;
106             output_type out;
107         };
108 
109         auto coordinator = initial.coordination.create_coordinator();
110 
111         // take a copy of the values for each subscription
112         auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
113 
114         auto trigger = on_exception(
115             [&](){return state->coordinator.in(state->trigger);},
116             state->out);
117         if (trigger.empty()) {
118             return;
119         }
120 
121         auto source = on_exception(
122             [&](){return state->coordinator.in(state->source);},
123             state->out);
124         if (source.empty()) {
125             return;
126         }
127 
128         auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
129         // share parts of subscription
130             state->out,
131         // new lifetime
132             state->trigger_lifetime,
133         // on_next
134             [state](const typename trigger_source_type::value_type&) {
135                 if (state->mode_value != mode::skipping) {
136                     return;
137                 }
138                 state->mode_value = mode::triggered;
139                 state->trigger_lifetime.unsubscribe();
140             },
141         // on_error
142             [state](rxu::error_ptr e) {
143                 if (state->mode_value != mode::skipping) {
144                     return;
145                 }
146                 state->mode_value = mode::errored;
147                 state->out.on_error(e);
148             },
149         // on_completed
150             [state]() {
151                 if (state->mode_value != mode::skipping) {
152                     return;
153                 }
154                 state->mode_value = mode::clear;
155                 state->trigger_lifetime.unsubscribe();
156             }
157         );
158         auto selectedSinkTrigger = on_exception(
159             [&](){return state->coordinator.out(sinkTrigger);},
160             state->out);
161         if (selectedSinkTrigger.empty()) {
162             return;
163         }
164         trigger->subscribe(std::move(selectedSinkTrigger.get()));
165 
166         source.get().subscribe(
167         // split subscription lifetime
168             state->source_lifetime,
169         // on_next
170             [state](T t) {
171                 if (state->mode_value != mode::triggered) {
172                     return;
173                 }
174                 state->out.on_next(t);
175             },
176         // on_error
177             [state](rxu::error_ptr e) {
178                 if (state->mode_value > mode::triggered) {
179                     return;
180                 }
181                 state->mode_value = mode::errored;
182                 state->out.on_error(e);
183             },
184         // on_completed
185             [state]() {
186                 if (state->mode_value != mode::triggered) {
187                     return;
188                 }
189                 state->mode_value = mode::stopped;
190                 state->out.on_completed();
191             }
192         );
193     }
194 };
195 
196 }
197 
198 /*! @copydoc rx-skip_until.hpp
199 */
200 template<class... AN>
skip_until(AN &&...an)201 auto skip_until(AN&&... an)
202     ->     operator_factory<skip_until_tag, AN...> {
203     return operator_factory<skip_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
204 }
205 
206 }
207 
208 template<>
209 struct member_overload<skip_until_tag>
210 {
211     template<class Observable, class TimePoint,
212         class Enabled = rxu::enable_if_all_true_type_t<
213             is_observable<Observable>,
214             std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
215         class SourceValue = rxu::value_type_t<Observable>,
216         class Timer = typename rxu::defer_type<rxs::detail::timer, identity_one_worker>::type,
217         class TimerValue = rxu::value_type_t<Timer>,
218         class TriggerObservable = observable<TimerValue, Timer>,
219         class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>,
220         class Value = rxu::value_type_t<SkipUntil>,
221         class Result = observable<Value, SkipUntil>>
memberrxcpp::member_overload222     static Result member(Observable&& o, TimePoint&& when) {
223         auto cn = identity_current_thread();
224         return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
225     }
226 
227     template<class Observable, class TimePoint, class Coordination,
228         class Enabled = rxu::enable_if_all_true_type_t<
229             is_observable<Observable>,
230             is_coordination<Coordination>,
231             std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
232         class SourceValue = rxu::value_type_t<Observable>,
233         class Timer = typename rxu::defer_type<rxs::detail::timer, rxu::decay_t<Coordination>>::type,
234         class TimerValue = rxu::value_type_t<Timer>,
235         class TriggerObservable = observable<TimerValue, Timer>,
236         class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>,
237         class Value = rxu::value_type_t<SkipUntil>,
238         class Result = observable<Value, SkipUntil>>
memberrxcpp::member_overload239     static Result member(Observable&& o, TimePoint&& when, Coordination cn) {
240         return Result(SkipUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
241     }
242 
243     template<class Observable, class TriggerObservable,
244         class Enabled = rxu::enable_if_all_true_type_t<
245             all_observables<Observable, TriggerObservable>>,
246         class SourceValue = rxu::value_type_t<Observable>,
247         class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>,
248         class Value = rxu::value_type_t<SkipUntil>,
249         class Result = observable<Value, SkipUntil>>
memberrxcpp::member_overload250     static Result member(Observable&& o, TriggerObservable&& t) {
251         return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread()));
252     }
253 
254      template<class Observable, class TriggerObservable, class Coordination,
255         class Enabled = rxu::enable_if_all_true_type_t<
256             all_observables<Observable, TriggerObservable>,
257             is_coordination<Coordination>>,
258         class SourceValue = rxu::value_type_t<Observable>,
259         class SkipUntil = rxo::detail::skip_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
260         class Value = rxu::value_type_t<SkipUntil>,
261         class Result = observable<Value, SkipUntil>>
memberrxcpp::member_overload262     static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
263         return Result(SkipUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
264     }
265 
266     template<class... AN>
memberrxcpp::member_overload267     static operators::detail::skip_until_invalid_t<AN...> member(AN...) {
268         std::terminate();
269         return {};
270         static_assert(sizeof...(AN) == 10000, "skip_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
271     }
272 };
273 
274 }
275 
276 #endif
277