1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-take_until.hpp
6 
7     \brief For each item from this observable until on_next occurs on the trigger observable or until the specified time, emit them from the new observable that is returned.
8            take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)
9 
10     \tparam  TriggerSource  the type of the trigger observable.
11     \tparam  TimePoint  the type of the time interval.
12     \tparam  Coordination   the type of the scheduler (optional).
13 
14     \param  t   an observable whose first emitted item will stop emitting items from the source observable.
15     \param  when  a time point when the returned observable will stop emitting items.
16     \param  cn  the scheduler to use for scheduling the items (optional).
17 
18     \return  An observable that emits the items emitted by the source observable until trigger observable emitted or the time runs out.
19 
20     \sample
21     \snippet take_until.cpp take_until sample
22     \snippet output.txt take_until sample
23 
24     \sample
25     \snippet take_until.cpp threaded take_until sample
26     \snippet output.txt threaded take_until sample
27 
28     \sample
29     \snippet take_until.cpp take_until time sample
30     \snippet output.txt take_until time sample
31 
32     \sample
33     \snippet take_until.cpp threaded take_until time sample
34     \snippet output.txt threaded take_until time sample
35 */
36 
37 #if !defined(RXCPP_OPERATORS_RX_TAKE_UNTIL_HPP)
38 #define RXCPP_OPERATORS_RX_TAKE_UNTIL_HPP
39 
40 #include "../rx-includes.hpp"
41 
42 namespace rxcpp {
43 
44 namespace operators {
45 
46 namespace detail {
47 
48 template<class... AN>
49 struct take_until_invalid_arguments {};
50 
51 template<class... AN>
52 struct take_until_invalid : public rxo::operator_base<take_until_invalid_arguments<AN...>> {
53     using type = observable<take_until_invalid_arguments<AN...>, take_until_invalid<AN...>>;
54 };
55 template<class... AN>
56 using take_until_invalid_t = typename take_until_invalid<AN...>::type;
57 
58 template<class T, class Observable, class TriggerObservable, class Coordination>
59 struct take_until : public operator_base<T>
60 {
61     typedef rxu::decay_t<Observable> source_type;
62     typedef rxu::decay_t<TriggerObservable> trigger_source_type;
63     typedef rxu::decay_t<Coordination> coordination_type;
64     typedef typename coordination_type::coordinator_type coordinator_type;
65     struct values
66     {
valuesrxcpp::operators::detail::take_until::values67         values(source_type s, trigger_source_type t, coordination_type sf)
68             : source(std::move(s))
69             , trigger(std::move(t))
70             , coordination(std::move(sf))
71         {
72         }
73         source_type source;
74         trigger_source_type trigger;
75         coordination_type coordination;
76     };
77     values initial;
78 
take_untilrxcpp::operators::detail::take_until79     take_until(source_type s, trigger_source_type t, coordination_type sf)
80         : initial(std::move(s), std::move(t), std::move(sf))
81     {
82     }
83 
84     struct mode
85     {
86         enum type {
87             taking,    // no messages from trigger
88             clear,     // trigger completed
89             triggered, // trigger sent on_next
90             errored,   // error either on trigger or on observable
91             stopped    // observable completed
92         };
93     };
94 
95     template<class Subscriber>
on_subscriberxcpp::operators::detail::take_until96     void on_subscribe(Subscriber s) const {
97 
98         typedef Subscriber output_type;
99         struct take_until_state_type
100             : public std::enable_shared_from_this<take_until_state_type>
101             , public values
102         {
103             take_until_state_type(const values& i, coordinator_type coor, const output_type& oarg)
104                 : values(i)
105                 , mode_value(mode::taking)
106                 , coordinator(std::move(coor))
107                 , out(oarg)
108             {
109                 out.add(trigger_lifetime);
110                 out.add(source_lifetime);
111             }
112             typename mode::type mode_value;
113             composite_subscription trigger_lifetime;
114             composite_subscription source_lifetime;
115             coordinator_type coordinator;
116             output_type out;
117         };
118 
119         auto coordinator = initial.coordination.create_coordinator(s.get_subscription());
120 
121         // take a copy of the values for each subscription
122         auto state = std::make_shared<take_until_state_type>(initial, std::move(coordinator), std::move(s));
123 
124         auto trigger = on_exception(
125             [&](){return state->coordinator.in(state->trigger);},
126             state->out);
127         if (trigger.empty()) {
128             return;
129         }
130 
131         auto source = on_exception(
132             [&](){return state->coordinator.in(state->source);},
133             state->out);
134         if (source.empty()) {
135             return;
136         }
137 
138         auto sinkTrigger = make_subscriber<typename trigger_source_type::value_type>(
139         // share parts of subscription
140             state->out,
141         // new lifetime
142             state->trigger_lifetime,
143         // on_next
144             [state](const typename trigger_source_type::value_type&) {
145                 if (state->mode_value != mode::taking) {return;}
146                 state->mode_value = mode::triggered;
147                 state->out.on_completed();
148             },
149         // on_error
150             [state](rxu::error_ptr e) {
151                 if (state->mode_value != mode::taking) {return;}
152                 state->mode_value = mode::errored;
153                 state->out.on_error(e);
154             },
155         // on_completed
156             [state]() {
157                 if (state->mode_value != mode::taking) {return;}
158                 state->mode_value = mode::clear;
159             }
160         );
161         auto selectedSinkTrigger = on_exception(
162             [&](){return state->coordinator.out(sinkTrigger);},
163             state->out);
164         if (selectedSinkTrigger.empty()) {
165             return;
166         }
167         trigger->subscribe(std::move(selectedSinkTrigger.get()));
168 
169         auto sinkSource = make_subscriber<T>(
170         // split subscription lifetime
171             state->source_lifetime,
172         // on_next
173             [state](T t) {
174                 //
175                 // everything is crafted to minimize the overhead of this function.
176                 //
177                 if (state->mode_value < mode::triggered) {
178                     state->out.on_next(t);
179                 }
180             },
181         // on_error
182             [state](rxu::error_ptr e) {
183                 if (state->mode_value > mode::clear) {return;}
184                 state->mode_value = mode::errored;
185                 state->out.on_error(e);
186             },
187         // on_completed
188             [state]() {
189                 if (state->mode_value > mode::clear) {return;}
190                 state->mode_value = mode::stopped;
191                 state->out.on_completed();
192             }
193         );
194         auto selectedSinkSource = on_exception(
195             [&](){return state->coordinator.out(sinkSource);},
196             state->out);
197         if (selectedSinkSource.empty()) {
198             return;
199         }
200         source->subscribe(std::move(selectedSinkSource.get()));
201     }
202 };
203 
204 }
205 
206 /*! @copydoc rx-take_until.hpp
207 */
208 template<class... AN>
take_until(AN &&...an)209 auto take_until(AN&&... an)
210     ->     operator_factory<take_until_tag, AN...> {
211     return operator_factory<take_until_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
212 }
213 
214 }
215 
216 template<>
217 struct member_overload<take_until_tag>
218 {
219     template<class Observable, class TimePoint,
220         class Enabled = rxu::enable_if_all_true_type_t<
221             is_observable<Observable>,
222             std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
223         class SourceValue = rxu::value_type_t<Observable>,
224         class Timer = typename rxu::defer_type<rxs::detail::timer, identity_one_worker>::type,
225         class TimerValue = rxu::value_type_t<Timer>,
226         class TriggerObservable = observable<TimerValue, Timer>,
227         class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, identity_one_worker>,
228         class Value = rxu::value_type_t<TakeUntil>,
229         class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload230     static Result member(Observable&& o, TimePoint&& when) {
231         auto cn = identity_current_thread();
232         return Result(TakeUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
233     }
234 
235     template<class Observable, class TimePoint, class Coordination,
236         class Enabled = rxu::enable_if_all_true_type_t<
237             is_observable<Observable>,
238             is_coordination<Coordination>,
239             std::is_convertible<TimePoint, rxsc::scheduler::clock_type::time_point>>,
240         class SourceValue = rxu::value_type_t<Observable>,
241         class Timer = typename rxu::defer_type<rxs::detail::timer, rxu::decay_t<Coordination>>::type,
242         class TimerValue = rxu::value_type_t<Timer>,
243         class TriggerObservable = observable<TimerValue, Timer>,
244         class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, TriggerObservable, rxu::decay_t<Coordination>>,
245         class Value = rxu::value_type_t<TakeUntil>,
246         class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload247     static Result member(Observable&& o, TimePoint&& when, Coordination cn) {
248         return Result(TakeUntil(std::forward<Observable>(o), rxs::timer(std::forward<TimePoint>(when), cn), cn));
249     }
250 
251     template<class Observable, class TriggerObservable,
252         class Enabled = rxu::enable_if_all_true_type_t<
253             all_observables<Observable, TriggerObservable>>,
254         class SourceValue = rxu::value_type_t<Observable>,
255         class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, identity_one_worker>,
256         class Value = rxu::value_type_t<TakeUntil>,
257         class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload258     static Result member(Observable&& o, TriggerObservable&& t) {
259         return Result(TakeUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), identity_current_thread()));
260     }
261 
262      template<class Observable, class TriggerObservable, class Coordination,
263         class Enabled = rxu::enable_if_all_true_type_t<
264             all_observables<Observable, TriggerObservable>,
265             is_coordination<Coordination>>,
266         class SourceValue = rxu::value_type_t<Observable>,
267         class TakeUntil = rxo::detail::take_until<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<TriggerObservable>, rxu::decay_t<Coordination>>,
268         class Value = rxu::value_type_t<TakeUntil>,
269         class Result = observable<Value, TakeUntil>>
memberrxcpp::member_overload270     static Result member(Observable&& o, TriggerObservable&& t, Coordination&& cn) {
271         return Result(TakeUntil(std::forward<Observable>(o), std::forward<TriggerObservable>(t), std::forward<Coordination>(cn)));
272     }
273 
274     template<class... AN>
memberrxcpp::member_overload275     static operators::detail::take_until_invalid_t<AN...> member(AN...) {
276         std::terminate();
277         return {};
278         static_assert(sizeof...(AN) == 10000, "take_until takes (TriggerObservable, optional Coordination) or (TimePoint, optional Coordination)");
279     }
280 };
281 
282 }
283 
284 #endif
285