1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP)
6 #define RXCPP_OPERATORS_RX_ZIP_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 /*! \file rx-zip.hpp
11 
    \brief Takes one item at a time from each of the given observables and selects a value to emit from the new observable that is returned.
13 
14     \tparam AN  types of scheduler (optional), aggregate function (optional), and source observables
15 
16     \param  an  scheduler (optional), aggregation function (optional), and source observables
17 
    \return  Observable that emits the result of combining the items taken, one at a time, from each of the source observables.
19 
20     If scheduler is omitted, identity_current_thread is used.
21 
22     If aggregation function is omitted, the resulting observable returns tuples of emitted items.
23 
24     \sample
25 
26     Neither scheduler nor aggregation function are present:
27     \snippet zip.cpp zip sample
28     \snippet output.txt zip sample
29 
30     Only scheduler is present:
31     \snippet zip.cpp Coordination zip sample
32     \snippet output.txt Coordination zip sample
33 
34     Only aggregation function is present:
35     \snippet zip.cpp Selector zip sample
36     \snippet output.txt Selector zip sample
37 
38     Both scheduler and aggregation function are present:
39     \snippet zip.cpp Coordination+Selector zip sample
40     \snippet output.txt Coordination+Selector zip sample
41 */
42 
43 namespace rxcpp {
44 
45 namespace operators {
46 
47 namespace detail {
48 
49 template<class Observable>
50 struct zip_source_state
51 {
52     using value_type = rxu::value_type_t<Observable>;
zip_source_staterxcpp::operators::detail::zip_source_state53     zip_source_state()
54         : completed(false)
55     {
56     }
57     std::list<value_type> values;
58     bool completed;
59 };
60 
61 struct values_not_empty {
62     template<class Observable>
operator ()rxcpp::operators::detail::values_not_empty63     bool operator()(zip_source_state<Observable>& source) const {
64         return !source.values.empty();
65     }
66 };
67 
68 struct source_completed_values_empty {
69     template<class Observable>
operator ()rxcpp::operators::detail::source_completed_values_empty70     bool operator()(zip_source_state<Observable>& source) const {
71         return source.completed && source.values.empty();
72     }
73 };
74 
75 struct extract_value_front {
76     template<class Observable, class Value = rxu::value_type_t<Observable>>
operator ()rxcpp::operators::detail::extract_value_front77     Value operator()(zip_source_state<Observable>& source) const {
78         auto val = std::move(source.values.front());
79         source.values.pop_front();
80         return val;
81     }
82 };
83 
/*! Empty tag type that carries the argument types of a zip call
    for which no valid overload exists.
*/
template<typename... AN>
struct zip_invalid_arguments
{
};
86 
87 template<class... AN>
88 struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> {
89     using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>;
90 };
91 template<class... AN>
92 using zip_invalid_t = typename zip_invalid<AN...>::type;
93 
94 template<class Selector, class... ObservableN>
95 struct is_zip_selector_check {
96     typedef rxu::decay_t<Selector> selector_type;
97 
98     struct tag_not_valid;
99     template<class CS, class... CON>
100     static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...));
101     template<class CS, class... CON>
102     static tag_not_valid check(...);
103 
104     using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0));
105 
106     static const bool value = !std::is_same<type, tag_not_valid>::value;
107 };
108 
/*! Result of the selector check when the selector does not fit the sources.
    It deliberately has no nested \c type; only \c value (always false) is provided.
*/
template<typename Selector, typename... ObservableN>
struct invalid_zip_selector
{
    static const bool value = false;
};
113 
114 template<class Selector, class... ObservableN>
115 struct is_zip_selector : public std::conditional<
116     is_zip_selector_check<Selector, ObservableN...>::value,
117     is_zip_selector_check<Selector, ObservableN...>,
118     invalid_zip_selector<Selector, ObservableN...>>::type {
119 };
120 
121 template<class Selector, class... ON>
122 using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type;
123 
124 template<class Coordination, class Selector, class... ObservableN>
125 struct zip_traits {
126     typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type;
127     typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type;
128 
129     typedef rxu::decay_t<Selector> selector_type;
130     typedef rxu::decay_t<Coordination> coordination_type;
131 
132     typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type;
133 };
134 
135 template<class Coordination, class Selector, class... ObservableN>
136 struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>>
137 {
138     typedef zip<Coordination, Selector, ObservableN...> this_type;
139 
140     typedef zip_traits<Coordination, Selector, ObservableN...> traits;
141 
142     typedef typename traits::tuple_source_type tuple_source_type;
143     typedef typename traits::tuple_source_values_type tuple_source_values_type;
144 
145     typedef typename traits::selector_type selector_type;
146 
147     typedef typename traits::coordination_type coordination_type;
148     typedef typename coordination_type::coordinator_type coordinator_type;
149 
150     struct values
151     {
valuesrxcpp::operators::detail::zip::values152         values(tuple_source_type o, selector_type s, coordination_type sf)
153             : source(std::move(o))
154             , selector(std::move(s))
155             , coordination(std::move(sf))
156         {
157         }
158         tuple_source_type source;
159         selector_type selector;
160         coordination_type coordination;
161     };
162     values initial;
163 
ziprxcpp::operators::detail::zip164     zip(coordination_type sf, selector_type s, tuple_source_type ts)
165         : initial(std::move(ts), std::move(s), std::move(sf))
166     {
167     }
168 
169     template<int Index, class State>
subscribe_onerxcpp::operators::detail::zip170     void subscribe_one(std::shared_ptr<State> state) const {
171 
172         typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type;
173 
174         composite_subscription innercs;
175 
176         // when the out observer is unsubscribed all the
177         // inner subscriptions are unsubscribed as well
178         state->out.add(innercs);
179 
180         auto source = on_exception(
181             [&](){return state->coordinator.in(std::get<Index>(state->source));},
182             state->out);
183         if (source.empty()) {
184             return;
185         }
186 
187         // this subscribe does not share the observer subscription
188         // so that when it is unsubscribed the observer can be called
189         // until the inner subscriptions have finished
190         auto sink = make_subscriber<source_value_type>(
191             state->out,
192             innercs,
193         // on_next
194             [state](source_value_type st) {
195                 auto& values = std::get<Index>(state->pending).values;
196                 values.push_back(st);
197                 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) {
198                     auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector);
199                     state->out.on_next(selectedResult);
200                 }
201                 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) {
202                     state->out.on_completed();
203                 }
204             },
205         // on_error
206             [state](rxu::error_ptr e) {
207                 state->out.on_error(e);
208             },
209         // on_completed
210             [state]() {
211                 auto& completed = std::get<Index>(state->pending).completed;
212                 completed = true;
213                 if (--state->pendingCompletions == 0) {
214                     state->out.on_completed();
215                 }
216             }
217         );
218         auto selectedSink = on_exception(
219             [&](){return state->coordinator.out(sink);},
220             state->out);
221         if (selectedSink.empty()) {
222             return;
223         }
224         source->subscribe(std::move(selectedSink.get()));
225     }
226     template<class State, int... IndexN>
subscribe_allrxcpp::operators::detail::zip227     void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const {
228         bool subscribed[] = {(subscribe_one<IndexN>(state), true)...};
229         subscribed[0] = (*subscribed); // silence warning
230     }
231 
232     template<class Subscriber>
on_subscriberxcpp::operators::detail::zip233     void on_subscribe(Subscriber scbr) const {
234         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
235 
236         typedef Subscriber output_type;
237 
238         struct zip_state_type
239             : public std::enable_shared_from_this<zip_state_type>
240             , public values
241         {
242             zip_state_type(values i, coordinator_type coor, output_type oarg)
243                 : values(std::move(i))
244                 , pendingCompletions(sizeof... (ObservableN))
245                 , valuesSet(0)
246                 , coordinator(std::move(coor))
247                 , out(std::move(oarg))
248             {
249             }
250 
251             // on_completed on the output must wait until all the
252             // subscriptions have received on_completed
253             mutable int pendingCompletions;
254             mutable int valuesSet;
255             mutable tuple_source_values_type pending;
256             coordinator_type coordinator;
257             output_type out;
258         };
259 
260         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
261 
262         // take a copy of the values for each subscription
263         auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr));
264 
265         subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type());
266     }
267 };
268 
269 }
270 
271 /*! @copydoc rx-zip.hpp
272 */
273 template<class... AN>
zip(AN &&...an)274 auto zip(AN&&... an)
275     ->     operator_factory<zip_tag, AN...> {
276     return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
277 }
278 
279 }
280 
281 template<>
282 struct member_overload<zip_tag>
283 {
284     template<class Observable, class... ObservableN,
285         class Enabled = rxu::enable_if_all_true_type_t<
286             all_observables<Observable, ObservableN...>>,
287         class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
288         class Value = rxu::value_type_t<Zip>,
289         class Result = observable<Value, Zip>>
memberrxcpp::member_overload290     static Result member(Observable&& o, ObservableN&&... on)
291     {
292         return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
293     }
294 
295     template<class Observable, class Selector, class... ObservableN,
296         class Enabled = rxu::enable_if_all_true_type_t<
297             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
298             all_observables<Observable, ObservableN...>>,
299         class ResolvedSelector = rxu::decay_t<Selector>,
300         class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
301         class Value = rxu::value_type_t<Zip>,
302         class Result = observable<Value, Zip>>
memberrxcpp::member_overload303     static Result member(Observable&& o, Selector&& s, ObservableN&&... on)
304     {
305         return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
306     }
307 
308     template<class Coordination, class Observable, class... ObservableN,
309         class Enabled = rxu::enable_if_all_true_type_t<
310             is_coordination<Coordination>,
311             all_observables<Observable, ObservableN...>>,
312         class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
313         class Value = rxu::value_type_t<Zip>,
314         class Result = observable<Value, Zip>>
memberrxcpp::member_overload315     static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on)
316     {
317         return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
318     }
319 
320     template<class Coordination, class Selector, class Observable, class... ObservableN,
321         class Enabled = rxu::enable_if_all_true_type_t<
322             is_coordination<Coordination>,
323             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>,
324             all_observables<Observable, ObservableN...>>,
325         class ResolvedSelector = rxu::decay_t<Selector>,
326         class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>,
327         class Value = rxu::value_type_t<Zip>,
328         class Result = observable<Value, Zip>>
memberrxcpp::member_overload329     static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on)
330     {
331         return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...)));
332     }
333 
334     template<class... AN>
memberrxcpp::member_overload335     static operators::detail::zip_invalid_t<AN...> member(const AN&...) {
336         std::terminate();
337         return {};
338         static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)");
339     }
340 };
341 
342 }
343 
344 #endif
345