1 #include "../test.h"
2 
3 namespace detail {
4 
5 template<class Predicate>
6 struct liftfilter
7 {
8     typedef typename std::decay<Predicate>::type test_type;
9     test_type test;
10 
liftfilterdetail::liftfilter11     liftfilter(test_type t)
12         : test(t)
13     {
14     }
15 
16     template<class Subscriber>
17     struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
18     {
19         typedef filter_observer<Subscriber> this_type;
20         typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
21         typedef typename base_type::value_type value_type;
22         typedef typename std::decay<Subscriber>::type dest_type;
23         typedef rx::observer<value_type, this_type> observer_type;
24         dest_type dest;
25         test_type test;
26 
filter_observerdetail::liftfilter::filter_observer27         filter_observer(dest_type d, test_type t)
28             : dest(d)
29             , test(t)
30         {
31         }
on_nextdetail::liftfilter::filter_observer32         void on_next(typename dest_type::value_type v) const {
33             bool filtered = false;
34             RXCPP_TRY {
35                filtered = !test(v);
36             } RXCPP_CATCH(...) {
37                 dest.on_error(rxu::current_exception());
38                 return;
39             }
40             if (!filtered) {
41                 dest.on_next(v);
42             }
43         }
on_errordetail::liftfilter::filter_observer44         void on_error(rxu::error_ptr e) const {
45             dest.on_error(e);
46         }
on_completeddetail::liftfilter::filter_observer47         void on_completed() const {
48             dest.on_completed();
49         }
50 
makedetail::liftfilter::filter_observer51         static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
52             return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
53         }
54     };
55 
56     template<class Subscriber>
operator ()detail::liftfilter57     auto operator()(const Subscriber& dest) const
58         -> decltype(filter_observer<Subscriber>::make(dest, test)) {
59         return      filter_observer<Subscriber>::make(dest, test);
60     }
61 };
62 
63 }
64 
65 namespace {
66 
67 template<class Predicate>
liftfilter(Predicate && p)68 auto liftfilter(Predicate&& p)
69     ->      detail::liftfilter<typename std::decay<Predicate>::type> {
70     return  detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
71 }
72 
IsPrime(int x)73 bool IsPrime(int x)
74 {
75     if (x < 2) return false;
76     for (int i = 2; i <= x/2; ++i)
77     {
78         if (x % i == 0)
79             return false;
80     }
81     return true;
82 }
83 
84 }
85 
86 SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
87     GIVEN("a test hot observable of ints"){
88         auto sc = rxsc::make_test();
89         auto w = sc.create_worker();
90         const rxsc::test::messages<int> on;
91 
92         long invoked = 0;
93 
94         auto xs = sc.make_hot_observable({
95             on.next(110, 1),
96             on.next(180, 2),
97             on.next(230, 3),
98             on.next(270, 4),
99             on.next(340, 5),
100             on.next(380, 6),
101             on.next(390, 7),
102             on.next(450, 8),
103             on.next(470, 9),
104             on.next(560, 10),
105             on.next(580, 11),
106             on.completed(600)
107         });
108 
109         WHEN("filtered to ints that are primes"){
110 
111             auto res = w.start(
__anon3f4dc8d90202() 112                 [&xs, &invoked]() {
113                     return xs
114                         .lift<int>(liftfilter([&invoked](int x) {
115                             invoked++;
116                             return IsPrime(x);
117                         }))
118                         // forget type to workaround lambda deduction bug on msvc 2013
119                         .as_dynamic();
120                 },
121                 400
122             );
123 
124             THEN("the output only contains primes that arrived before disposal"){
125                 auto required = rxu::to_vector({
126                     on.next(230, 3),
127                     on.next(340, 5),
128                     on.next(390, 7)
129                 });
130                 auto actual = res.get_observer().messages();
131                 REQUIRE(required == actual);
132             }
133 
134             THEN("there was one subscription and one unsubscription"){
135                 auto required = rxu::to_vector({
136                     on.subscribe(200, 400)
137                 });
138                 auto actual = xs.subscriptions();
139                 REQUIRE(required == actual);
140             }
141 
142             THEN("where was called until disposed"){
143                 REQUIRE(5 == invoked);
144             }
145         }
146     }
147 }
148 
149 SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
150     GIVEN("a test hot observable of ints"){
151         auto sc = rxsc::make_test();
152         auto w = sc.create_worker();
153         const rxsc::test::messages<int> on;
154 
155         long invoked = 0;
156 
157         auto xs = sc.make_hot_observable({
158             on.next(110, 1),
159             on.next(180, 2),
160             on.next(230, 3),
161             on.next(270, 4),
162             on.next(340, 5),
163             on.next(380, 6),
164             on.next(390, 7),
165             on.next(450, 8),
166             on.next(470, 9),
167             on.next(560, 10),
168             on.next(580, 11),
169             on.completed(600)
170         });
171 
172         WHEN("filtered to ints that are primes"){
173 
174             auto res = w.start(
__anon3f4dc8d90402() 175                 [&xs, &invoked]() {
176                     return xs
177                         >> rxo::lift<int>(liftfilter([&invoked](int x) {
178                             invoked++;
179                             return IsPrime(x);
180                         }))
181                         // forget type to workaround lambda deduction bug on msvc 2013
182                         >> rxo::as_dynamic();
183                 },
184                 400
185             );
186 
187             THEN("the output only contains primes that arrived before disposal"){
188                 auto required = rxu::to_vector({
189                     on.next(230, 3),
190                     on.next(340, 5),
191                     on.next(390, 7)
192                 });
193                 auto actual = res.get_observer().messages();
194                 REQUIRE(required == actual);
195             }
196 
197             THEN("there was one subscription and one unsubscription"){
198                 auto required = rxu::to_vector({
199                     on.subscribe(200, 400)
200                 });
201                 auto actual = xs.subscriptions();
202                 REQUIRE(required == actual);
203             }
204 
205             THEN("where was called until disposed"){
206                 REQUIRE(5 == invoked);
207             }
208         }
209     }
210 }
211 
212 SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
213     GIVEN("a test hot observable of ints"){
214         auto sc = rxsc::make_test();
215         auto w = sc.create_worker();
216         const rxsc::test::messages<int> on;
217 
218         long invoked = 0;
219 
220         auto xs = sc.make_hot_observable({
221             on.next(110, 1),
222             on.next(180, 2),
223             on.next(230, 3),
224             on.next(270, 4),
225             on.next(340, 5),
226             on.next(380, 6),
227             on.next(390, 7),
228             on.next(450, 8),
229             on.next(470, 9),
230             on.next(560, 10),
231             on.next(580, 11),
232             on.completed(600)
233         });
234 
235         WHEN("filtered to ints that are primes"){
236 
237             auto res = w.start(
__anon3f4dc8d90602() 238                 [&xs, &invoked]() {
239                     auto predicate = [&](int x){
240                         invoked++;
241                         return IsPrime(x);
242                     };
243                     return xs
244                         .lift<int>([=](rx::subscriber<int> dest){
245                             // VS2013 deduction issue requires dynamic (type-forgetting)
246                             return rx::make_subscriber<int>(
247                                 dest,
248                                 rx::make_observer_dynamic<int>(
249                                     [=](int n){
250                                         bool pass = false;
251                                         RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());};
252                                         if (pass) {dest.on_next(n);}
253                                     },
254                                     [=](rxu::error_ptr e){dest.on_error(e);},
255                                     [=](){dest.on_completed();}));
256                         })
257                         // forget type to workaround lambda deduction bug on msvc 2013
258                         .as_dynamic();
259                 },
260                 400
261             );
262 
263             THEN("the output only contains primes that arrived before disposal"){
264                 auto required = rxu::to_vector({
265                     on.next(230, 3),
266                     on.next(340, 5),
267                     on.next(390, 7)
268                 });
269                 auto actual = res.get_observer().messages();
270                 REQUIRE(required == actual);
271             }
272 
273             THEN("there was one subscription and one unsubscription"){
274                 auto required = rxu::to_vector({
275                     on.subscribe(200, 400)
276                 });
277                 auto actual = xs.subscriptions();
278                 REQUIRE(required == actual);
279             }
280 
281             THEN("where was called until disposed"){
282                 REQUIRE(5 == invoked);
283             }
284         }
285     }
286 }
287 
288