1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-sequence_equal.hpp
6 
7     \brief Determine whether two Observables emit the same sequence of items.
8 
9     \tparam OtherSource      the type of the other observable.
10     \tparam BinaryPredicate  the type of the value comparing function (optional). The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
11     \tparam Coordination  the type of the scheduler (optional).
12 
13     \param t     the other Observable that emits items to compare.
14     \param pred  the function that implements comparison of two values (optional).
15     \param cn    the scheduler (optional).
16 
17     \return  Observable that emits true only if both sequences terminate normally after emitting the same sequence of items in the same order; otherwise it will emit false.
18 
19     \sample
20     \snippet sequence_equal.cpp sequence_equal sample
21     \snippet output.txt sequence_equal sample
22 */
23 
24 #if !defined(RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP)
25 #define RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP
26 
27 #include "../rx-includes.hpp"
28 
29 namespace rxcpp {
30 
31 namespace operators {
32 
33 namespace detail {
34 
// Empty tag type parameterised on the argument types of a rejected
// sequence_equal() call. It carries no data; it only gives the "invalid"
// overload a distinct type per argument list.
template<class... AN>
struct sequence_equal_invalid_arguments
{
};
37 
38 template<class... AN>
39 struct sequence_equal_invalid : public rxo::operator_base<sequence_equal_invalid_arguments<AN...>> {
40     using type = observable<sequence_equal_invalid_arguments<AN...>, sequence_equal_invalid<AN...>>;
41 };
42 template<class... AN>
43 using sequence_equal_invalid_t = typename sequence_equal_invalid<AN...>::type;
44 
45 template<class T, class Observable, class OtherObservable, class BinaryPredicate, class Coordination>
46 struct sequence_equal : public operator_base<bool>
47 {
48     typedef rxu::decay_t<Observable> source_type;
49     typedef rxu::decay_t<T> source_value_type;
50     typedef rxu::decay_t<OtherObservable> other_source_type;
51     typedef typename other_source_type::value_type other_source_value_type;
52     typedef rxu::decay_t<BinaryPredicate> predicate_type;
53     typedef rxu::decay_t<Coordination> coordination_type;
54     typedef typename coordination_type::coordinator_type coordinator_type;
55 
56     struct values {
valuesrxcpp::operators::detail::sequence_equal::values57         values(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
58                 : source(std::move(s))
59                 , other(std::move(t))
60                 , pred(std::move(pred))
61                 , coordination(std::move(sf))
62         {
63         }
64 
65         source_type source;
66         other_source_type other;
67         predicate_type pred;
68         coordination_type coordination;
69     };
70 
71     values initial;
72 
sequence_equalrxcpp::operators::detail::sequence_equal73     sequence_equal(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
74         : initial(std::move(s), std::move(t), std::move(pred), std::move(sf))
75     {
76     }
77 
78     template<class Subscriber>
on_subscriberxcpp::operators::detail::sequence_equal79     void on_subscribe(Subscriber s) const {
80 
81         typedef Subscriber output_type;
82 
83         struct state_type
84             : public std::enable_shared_from_this<state_type>
85             , public values
86         {
87             state_type(const values& vals, coordinator_type coor, const output_type& o)
88                 : values(vals)
89                 , coordinator(std::move(coor))
90                 , out(o)
91                 , source_completed(false)
92                 , other_completed(false)
93             {
94                 out.add(other_lifetime);
95                 out.add(source_lifetime);
96             }
97 
98             composite_subscription other_lifetime;
99             composite_subscription source_lifetime;
100             coordinator_type coordinator;
101             output_type out;
102 
103             mutable std::list<source_value_type> source_values;
104             mutable std::list<other_source_value_type> other_values;
105             mutable bool source_completed;
106             mutable bool other_completed;
107         };
108 
109         auto coordinator = initial.coordination.create_coordinator();
110         auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
111 
112         auto other = on_exception(
113             [&](){ return state->coordinator.in(state->other); },
114             state->out);
115         if (other.empty()) {
116             return;
117         }
118 
119         auto source = on_exception(
120             [&](){ return state->coordinator.in(state->source); },
121             state->out);
122         if (source.empty()) {
123             return;
124         }
125 
126         auto check_equal = [state]() {
127             if(!state->source_values.empty() && !state->other_values.empty()) {
128                 auto x = std::move(state->source_values.front());
129                 state->source_values.pop_front();
130 
131                 auto y = std::move(state->other_values.front());
132                 state->other_values.pop_front();
133 
134                 if (!state->pred(x, y)) {
135                     state->out.on_next(false);
136                     state->out.on_completed();
137                 }
138             } else {
139                 if((!state->source_values.empty() && state->other_completed) ||
140                    (!state->other_values.empty() && state->source_completed)) {
141                     state->out.on_next(false);
142                     state->out.on_completed();
143                 }
144             }
145         };
146 
147         auto check_complete = [state]() {
148             if(state->source_completed && state->other_completed) {
149                 state->out.on_next(state->source_values.empty() && state->other_values.empty());
150                 state->out.on_completed();
151             }
152         };
153 
154         auto sinkOther = make_subscriber<other_source_value_type>(
155             state->out,
156             state->other_lifetime,
157             // on_next
158             [state, check_equal](other_source_value_type t) {
159                 auto& values = state->other_values;
160                 values.push_back(t);
161                 check_equal();
162             },
163             // on_error
164             [state](rxu::error_ptr e) {
165                 state->out.on_error(e);
166             },
167             // on_completed
168             [state, check_complete]() {
169                 auto& completed = state->other_completed;
170                 completed = true;
171                 check_complete();
172             }
173         );
174 
175         auto selectedSinkOther = on_exception(
176             [&](){ return state->coordinator.out(sinkOther); },
177             state->out);
178         if (selectedSinkOther.empty()) {
179             return;
180         }
181         other->subscribe(std::move(selectedSinkOther.get()));
182 
183         source.get().subscribe(
184             state->source_lifetime,
185             // on_next
186             [state, check_equal](source_value_type t) {
187                 auto& values = state->source_values;
188                 values.push_back(t);
189                 check_equal();
190             },
191             // on_error
192             [state](rxu::error_ptr e) {
193                 state->out.on_error(e);
194             },
195             // on_completed
196             [state, check_complete]() {
197                 auto& completed = state->source_completed;
198                 completed = true;
199                 check_complete();
200             }
201         );
202     }
203 };
204 
205 }
206 
207 /*! @copydoc rx-sequence_equal.hpp
208 */
209 template<class... AN>
sequence_equal(AN &&...an)210 auto sequence_equal(AN&&... an)
211     ->     operator_factory<sequence_equal_tag, AN...> {
212     return operator_factory<sequence_equal_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
213 }
214 
215 }
216 
217 template<>
218 struct member_overload<sequence_equal_tag>
219 {
220     template<class Observable, class OtherObservable,
221         class Enabled = rxu::enable_if_all_true_type_t<
222             is_observable<Observable>,
223             is_observable<OtherObservable>>,
224         class SourceValue = rxu::value_type_t<Observable>,
225         class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, identity_one_worker>,
226         class Value = rxu::value_type_t<SequenceEqual>,
227         class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload228     static Result member(Observable&& o, OtherObservable&& t) {
229         return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), identity_current_thread()));
230     }
231 
232     template<class Observable, class OtherObservable, class BinaryPredicate,
233         class IsCoordination = is_coordination<BinaryPredicate>,
234         class Enabled = rxu::enable_if_all_true_type_t<
235             is_observable<Observable>,
236             is_observable<OtherObservable>,
237             rxu::negation<IsCoordination>>,
238         class SourceValue = rxu::value_type_t<Observable>,
239         class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, identity_one_worker>,
240         class Value = rxu::value_type_t<SequenceEqual>,
241         class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload242     static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred) {
243         return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), identity_current_thread()));
244     }
245 
246     template<class Observable, class OtherObservable, class Coordination,
247         class Enabled = rxu::enable_if_all_true_type_t<
248             is_observable<Observable>,
249             is_observable<OtherObservable>,
250             is_coordination<Coordination>>,
251         class SourceValue = rxu::value_type_t<Observable>,
252         class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, rxu::decay_t<Coordination>>,
253         class Value = rxu::value_type_t<SequenceEqual>,
254         class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload255     static Result member(Observable&& o, OtherObservable&& t, Coordination&& cn) {
256         return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), std::forward<Coordination>(cn)));
257     }
258 
259     template<class Observable, class OtherObservable, class BinaryPredicate, class Coordination,
260         class Enabled = rxu::enable_if_all_true_type_t<
261             is_observable<Observable>,
262             is_observable<OtherObservable>,
263             is_coordination<Coordination>>,
264         class SourceValue = rxu::value_type_t<Observable>,
265         class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<Coordination>>,
266         class Value = rxu::value_type_t<SequenceEqual>,
267         class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload268     static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred, Coordination&& cn) {
269         return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), std::forward<Coordination>(cn)));
270     }
271 
272     template<class... AN>
memberrxcpp::member_overload273     static operators::detail::sequence_equal_invalid_t<AN...> member(const AN&...) {
274         std::terminate();
275         return {};
276         static_assert(sizeof...(AN) == 10000, "sequence_equal takes (OtherObservable, optional BinaryPredicate, optional Coordination)");
277     }
278 };
279 
280 }
281 
282 #endif
283