1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-combine_latest.hpp
6 
7     \brief For each item from all of the observables select a value to emit from the new observable that is returned.
8 
9     \tparam AN  types of scheduler (optional), aggregate function (optional), and source observables
10 
11     \param  an  scheduler (optional), aggregation function (optional), and source observables
12 
13     \return  Observable that emits items that are the result of combining the items emitted by the source observables.
14 
15     If scheduler is omitted, identity_current_thread is used.
16 
17     If aggregation function is omitted, the resulting observable returns tuples of emitted items.
18 
19     \sample
20 
    Neither the scheduler nor the aggregation function is present:
22     \snippet combine_latest.cpp combine_latest sample
23     \snippet output.txt combine_latest sample
24 
25     Only scheduler is present:
26     \snippet combine_latest.cpp Coordination combine_latest sample
27     \snippet output.txt Coordination combine_latest sample
28 
29     Only aggregation function is present:
30     \snippet combine_latest.cpp Selector combine_latest sample
31     \snippet output.txt Selector combine_latest sample
32 
33     Both scheduler and aggregation function are present:
34     \snippet combine_latest.cpp Coordination+Selector combine_latest sample
35     \snippet output.txt Coordination+Selector combine_latest sample
36 */
37 
38 #if !defined(RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP)
39 #define RXCPP_OPERATORS_RX_COMBINE_LATEST_HPP
40 
41 #include "../rx-includes.hpp"
42 
43 namespace rxcpp {
44 
45 namespace operators {
46 
47 namespace detail {
48 
// Empty marker type that records the argument types of a combine_latest call
// that matched none of the supported overloads. It is used as the value type
// of the observable named by combine_latest_invalid.
template<class... ArgN>
struct combine_latest_invalid_arguments
{
};
51 
52 template<class... AN>
53 struct combine_latest_invalid : public rxo::operator_base<combine_latest_invalid_arguments<AN...>> {
54     using type = observable<combine_latest_invalid_arguments<AN...>, combine_latest_invalid<AN...>>;
55 };
56 template<class... AN>
57 using combine_latest_invalid_t = typename combine_latest_invalid<AN...>::type;
58 
59 template<class Selector, class... ObservableN>
60 struct is_combine_latest_selector_check {
61     typedef rxu::decay_t<Selector> selector_type;
62 
63     struct tag_not_valid;
64     template<class CS, class... CON>
65     static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
66     template<class CS, class... CON>
67     static tag_not_valid check(...);
68 
69     using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
70 
71     static const bool value = !std::is_same<type, tag_not_valid>::value;
72 };
73 
// Result of is_combine_latest_selector when the selector check fails.
// It deliberately has no nested `type`, only a false `value`.
template<class /*Selector*/, class... /*ObservableN*/>
struct invalid_combine_latest_selector
{
    static const bool value = false;
};
78 
79 template<class Selector, class... ObservableN>
80 struct is_combine_latest_selector : public std::conditional<
81     is_combine_latest_selector_check<Selector, ObservableN...>::value,
82     is_combine_latest_selector_check<Selector, ObservableN...>,
83     invalid_combine_latest_selector<Selector, ObservableN...>>::type {
84 };
85 
86 template<class Selector, class... ON>
87 using result_combine_latest_selector_t = typename is_combine_latest_selector<Selector, ON...>::type;
88 
89 template<class Coordination, class Selector, class... ObservableN>
90 struct combine_latest_traits {
91 
92     typedef std::tuple<ObservableN...> tuple_source_type;
93     typedef std::tuple<rxu::detail::maybe<typename ObservableN::value_type>...> tuple_source_value_type;
94 
95     typedef rxu::decay_t<Selector> selector_type;
96     typedef rxu::decay_t<Coordination> coordination_type;
97 
98     typedef typename is_combine_latest_selector<selector_type, ObservableN...>::type value_type;
99 };
100 
101 template<class Coordination, class Selector, class... ObservableN>
102 struct combine_latest : public operator_base<rxu::value_type_t<combine_latest_traits<Coordination, Selector, ObservableN...>>>
103 {
104     typedef combine_latest<Coordination, Selector, ObservableN...> this_type;
105 
106     typedef combine_latest_traits<Coordination, Selector, ObservableN...> traits;
107 
108     typedef typename traits::tuple_source_type tuple_source_type;
109     typedef typename traits::tuple_source_value_type tuple_source_value_type;
110 
111     typedef typename traits::selector_type selector_type;
112 
113     typedef typename traits::coordination_type coordination_type;
114     typedef typename coordination_type::coordinator_type coordinator_type;
115 
116     struct values
117     {
valuesrxcpp::operators::detail::combine_latest::values118         values(tuple_source_type o, selector_type s, coordination_type sf)
119             : source(std::move(o))
120             , selector(std::move(s))
121             , coordination(std::move(sf))
122         {
123         }
124         tuple_source_type source;
125         selector_type selector;
126         coordination_type coordination;
127     };
128     values initial;
129 
combine_latestrxcpp::operators::detail::combine_latest130     combine_latest(coordination_type sf, selector_type s, tuple_source_type ts)
131         : initial(std::move(ts), std::move(s), std::move(sf))
132     {
133     }
134 
135     template<int Index, class State>
subscribe_onerxcpp::operators::detail::combine_latest136     void subscribe_one(std::shared_ptr<State> state) const {
137 
138         typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
139 
140         composite_subscription innercs;
141 
142         // when the out observer is unsubscribed all the
143         // inner subscriptions are unsubscribed as well
144         state->out.add(innercs);
145 
146         auto source = on_exception(
147             [&](){return state->coordinator.in(std::get<Index>(state->source));},
148             state->out);
149         if (source.empty()) {
150             return;
151         }
152 
153         // this subscribe does not share the observer subscription
154         // so that when it is unsubscribed the observer can be called
155         // until the inner subscriptions have finished
156         auto sink = make_subscriber<source_value_type>(
157             state->out,
158             innercs,
159         // on_next
160             [state](source_value_type st) {
161                 auto& value = std::get<Index>(state->latest);
162 
163                 if (value.empty()) {
164                     ++state->valuesSet;
165                 }
166 
167                 value.reset(st);
168 
169                 if (state->valuesSet == sizeof... (ObservableN)) {
170                     auto values = rxu::surely(state->latest);
171                     auto selectedResult = rxu::apply(values, state->selector);
172                     state->out.on_next(selectedResult);
173                 }
174             },
175         // on_error
176             [state](rxu::error_ptr e) {
177                 state->out.on_error(e);
178             },
179         // on_completed
180             [state]() {
181                 if (--state->pendingCompletions == 0) {
182                     state->out.on_completed();
183                 }
184             }
185         );
186         auto selectedSink = on_exception(
187             [&](){return state->coordinator.out(sink);},
188             state->out);
189         if (selectedSink.empty()) {
190             return;
191         }
192         source->subscribe(std::move(selectedSink.get()));
193     }
194     template<class State, int... IndexN>
subscribe_allrxcpp::operators::detail::combine_latest195     void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
196         bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
197         subscribed[0] = (*subscribed); // silence warning
198     }
199 
200     template<class Subscriber>
on_subscriberxcpp::operators::detail::combine_latest201     void on_subscribe(Subscriber scbr) const {
202         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
203 
204         typedef Subscriber output_type;
205 
206         struct combine_latest_state_type
207             : public std::enable_shared_from_this<combine_latest_state_type>
208             , public values
209         {
210             combine_latest_state_type(values i, coordinator_type coor, output_type oarg)
211                 : values(std::move(i))
212                 , pendingCompletions(sizeof... (ObservableN))
213                 , valuesSet(0)
214                 , coordinator(std::move(coor))
215                 , out(std::move(oarg))
216             {
217             }
218 
219             // on_completed on the output must wait until all the
220             // subscriptions have received on_completed
221             mutable int pendingCompletions;
222             mutable int valuesSet;
223             mutable tuple_source_value_type latest;
224             coordinator_type coordinator;
225             output_type out;
226         };
227 
228         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
229 
230         // take a copy of the values for each subscription
231         auto state = std::make_shared<combine_latest_state_type>(initial, std::move(coordinator), std::move(scbr));
232 
233         subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
234     }
235 };
236 
237 }
238 
239 /*! @copydoc rx-combine_latest.hpp
240 */
241 template<class... AN>
combine_latest(AN &&...an)242 auto combine_latest(AN&&... an)
243     ->     operator_factory<combine_latest_tag, AN...> {
244     return operator_factory<combine_latest_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
245 }
246 
247 }
248 
249 template<>
250 struct member_overload<combine_latest_tag>
251 {
252     template<class Observable, class... ObservableN,
253         class Enabled = rxu::enable_if_all_true_type_t<
254             all_observables<Observable, ObservableN...>>,
255         class combine_latest = rxo::detail::combine_latest<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
256         class Value = rxu::value_type_t<combine_latest>,
257         class Result = observable<Value, combine_latest>>
memberrxcpp::member_overload258     static Result member(Observable&& o, ObservableN&&... on)
259     {
260         return Result(combine_latest(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
261     }
262 
263     template<class Observable, class Selector, class... ObservableN,
264         class Enabled = rxu::enable_if_all_true_type_t<
265             operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
266             all_observables<Observable, ObservableN...>>,
267         class ResolvedSelector = rxu::decay_t<Selector>,
268         class combine_latest = rxo::detail::combine_latest<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
269         class Value = rxu::value_type_t<combine_latest>,
270         class Result = observable<Value, combine_latest>>
memberrxcpp::member_overload271     static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
272     {
273         return Result(combine_latest(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
274     }
275 
276     template<class Coordination, class Observable, class... ObservableN,
277         class Enabled = rxu::enable_if_all_true_type_t<
278             is_coordination<Coordination>,
279             all_observables<Observable, ObservableN...>>,
280         class combine_latest = rxo::detail::combine_latest<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
281         class Value = rxu::value_type_t<combine_latest>,
282         class Result = observable<Value, combine_latest>>
memberrxcpp::member_overload283     static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
284     {
285         return Result(combine_latest(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
286     }
287 
288     template<class Coordination, class Selector, class Observable, class... ObservableN,
289         class Enabled = rxu::enable_if_all_true_type_t<
290             is_coordination<Coordination>,
291             operators::detail::is_combine_latest_selector<Selector, Observable, ObservableN...>,
292             all_observables<Observable, ObservableN...>>,
293         class ResolvedSelector = rxu::decay_t<Selector>,
294         class combine_latest = rxo::detail::combine_latest<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
295         class Value = rxu::value_type_t<combine_latest>,
296         class Result = observable<Value, combine_latest>>
memberrxcpp::member_overload297     static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
298     {
299         return Result(combine_latest(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
300     }
301 
302     template<class... AN>
memberrxcpp::member_overload303     static operators::detail::combine_latest_invalid_t<AN...> member(const AN&...) {
304         std::terminate();
305         return {};
306         static_assert(sizeof...(AN) == 10000, "combine_latest takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
307     }
308 };
309 
310 }
311 
312 #endif
313