1 #include "../test.h"
2 #include <rxcpp/operators/rx-take_while.hpp>
3 
4 namespace {
5     class not_equal_to {
6         int value;
7     public:
not_equal_to(int value)8         not_equal_to(int value) : value(value) { }
operator ()(int i) const9         bool operator()(int i) const {
10             return i != value;
11         }
12     };
13 }
14 
15 SCENARIO("take_while not equal to 4", "[take_while][operators]"){
16     GIVEN("a source"){
17         auto sc = rxsc::make_test();
18         auto w = sc.create_worker();
19         const rxsc::test::messages<int> on;
20 
21         auto xs = sc.make_hot_observable({
22             on.next(150, 1),
23             on.next(210, 2),
24             on.next(220, 3),
25             on.next(230, 4),
26             on.next(240, 5),
27             on.completed(250)
28         });
29 
30         WHEN("values before 4 are taken"){
31 
32             auto res = w.start(
__anon6e0104070202() 33                 [xs]() {
34                     return xs
35                         .take_while(not_equal_to(4))
36                         .as_dynamic();
37                 }
38             );
39 
40             THEN("the output only contains items sent while subscribed"){
41                 auto required = rxu::to_vector({
42                     on.next(210, 2),
43                     on.next(220, 3),
44                     on.completed(230)
45                 });
46                 auto actual = res.get_observer().messages();
47                 REQUIRE(required == actual);
48             }
49 
50             THEN("there was 1 subscription/unsubscription to the source"){
51                 auto required = rxu::to_vector({
52                     on.subscribe(200, 230)
53                 });
54                 auto actual = xs.subscriptions();
55                 REQUIRE(required == actual);
56             }
57 
58         }
59     }
60 }
61 
62 SCENARIO("take_while, complete after", "[take_while][operators]"){
63     GIVEN("a source"){
64         auto sc = rxsc::make_test();
65         auto w = sc.create_worker();
66         const rxsc::test::messages<int> on;
67 
68         auto xs = sc.make_hot_observable({
69             on.next(70, 6),
70             on.next(150, 4),
71             on.next(210, 9),
72             on.next(230, 13),
73             on.next(270, 7),
74             on.next(280, 1),
75             on.next(300, -1),
76             on.next(310, 3),
77             on.next(340, 8),
78             on.next(370, 11),
79             on.next(410, 15),
80             on.next(415, 16),
81             on.next(460, 72),
82             on.next(510, 76),
83             on.next(560, 32),
84             on.next(570, -100),
85             on.next(580, -3),
86             on.next(590, 5),
87             on.next(630, 10),
88             on.completed(690)
89         });
90 
91         WHEN("all are taken"){
92 
93             auto res = w.start(
__anon6e0104070302() 94                 [xs]() {
95                     return xs
96                         .take_while(not_equal_to(0))
97                         // forget type to workaround lambda deduction bug on msvc 2013
98                         .as_dynamic();
99                 }
100             );
101 
102             THEN("the output only contains items sent while subscribed"){
103                 auto required = rxu::to_vector({
104                     on.next(210, 9),
105                     on.next(230, 13),
106                     on.next(270, 7),
107                     on.next(280, 1),
108                     on.next(300, -1),
109                     on.next(310, 3),
110                     on.next(340, 8),
111                     on.next(370, 11),
112                     on.next(410, 15),
113                     on.next(415, 16),
114                     on.next(460, 72),
115                     on.next(510, 76),
116                     on.next(560, 32),
117                     on.next(570, -100),
118                     on.next(580, -3),
119                     on.next(590, 5),
120                     on.next(630, 10),
121                     on.completed(690)
122                 });
123                 auto actual = res.get_observer().messages();
124                 REQUIRE(required == actual);
125             }
126 
127             THEN("there was 1 subscription/unsubscription to the source"){
128                 auto required = rxu::to_vector({
129                     on.subscribe(200, 690)
130                 });
131                 auto actual = xs.subscriptions();
132                 REQUIRE(required == actual);
133             }
134 
135         }
136     }
137 }
138 
139 SCENARIO("take_while, complete before", "[take_while][operators]"){
140     GIVEN("a source"){
141         auto sc = rxsc::make_test();
142         auto w = sc.create_worker();
143         const rxsc::test::messages<int> on;
144 
145         auto xs = sc.make_hot_observable({
146             on.next(70, 6),
147             on.next(150, 4),
148             on.next(210, 9),
149             on.next(230, 13),
150             on.next(270, 7),
151             on.next(280, 1),
152             on.next(300, -1),
153             on.next(310, 3),
154             on.next(340, 8),
155             on.next(370, 11),
156             on.next(410, 15),
157             on.next(415, 16),
158             on.next(460, 72),
159             on.next(510, 76),
160             on.next(560, 32),
161             on.next(570, -100),
162             on.next(580, -3),
163             on.next(590, 5),
164             on.next(630, 10),
165             on.completed(690)
166         });
167 
168         WHEN("10 values are taken"){
169 
170             auto res = w.start(
__anon6e0104070402() 171                 [xs]() {
172                     return xs
173                         .take_while(not_equal_to(72))
174                         // forget type to workaround lambda deduction bug on msvc 2013
175                         .as_dynamic();
176                 }
177             );
178 
179             THEN("the output only contains items sent while subscribed"){
180                 auto required = rxu::to_vector({
181                     on.next(210, 9),
182                     on.next(230, 13),
183                     on.next(270, 7),
184                     on.next(280, 1),
185                     on.next(300, -1),
186                     on.next(310, 3),
187                     on.next(340, 8),
188                     on.next(370, 11),
189                     on.next(410, 15),
190                     on.next(415, 16),
191                     on.completed(460)
192                 });
193                 auto actual = res.get_observer().messages();
194                 REQUIRE(required == actual);
195             }
196 
197             THEN("there was 1 subscription/unsubscription to the source"){
198                 auto required = rxu::to_vector({
199                     on.subscribe(200, 460)
200                 });
201                 auto actual = xs.subscriptions();
202                 REQUIRE(required == actual);
203             }
204 
205         }
206     }
207 }
208 
209 SCENARIO("take_while, error after", "[take_while][operators]"){
210     GIVEN("a source"){
211         auto sc = rxsc::make_test();
212         auto w = sc.create_worker();
213         const rxsc::test::messages<int> on;
214 
215         std::runtime_error ex("take_while on_error from source");
216 
217         auto xs = sc.make_hot_observable({
218             on.next(70, 6),
219             on.next(150, 4),
220             on.next(210, 9),
221             on.next(230, 13),
222             on.next(270, 7),
223             on.next(280, 1),
224             on.next(300, -1),
225             on.next(310, 3),
226             on.next(340, 8),
227             on.next(370, 11),
228             on.next(410, 15),
229             on.next(415, 16),
230             on.next(460, 72),
231             on.next(510, 76),
232             on.next(560, 32),
233             on.next(570, -100),
234             on.next(580, -3),
235             on.next(590, 5),
236             on.next(630, 10),
237             on.error(690, ex)
238         });
239 
240         WHEN("all values are taken"){
241 
242             auto res = w.start(
__anon6e0104070502() 243                 [xs]() {
244                     return xs
245                         .take_while(not_equal_to(0))
246                         // forget type to workaround lambda deduction bug on msvc 2013
247                         .as_dynamic();
248                 }
249             );
250 
251             THEN("the output only contains items sent while subscribed and the error"){
252                 auto required = rxu::to_vector({
253                     on.next(210, 9),
254                     on.next(230, 13),
255                     on.next(270, 7),
256                     on.next(280, 1),
257                     on.next(300, -1),
258                     on.next(310, 3),
259                     on.next(340, 8),
260                     on.next(370, 11),
261                     on.next(410, 15),
262                     on.next(415, 16),
263                     on.next(460, 72),
264                     on.next(510, 76),
265                     on.next(560, 32),
266                     on.next(570, -100),
267                     on.next(580, -3),
268                     on.next(590, 5),
269                     on.next(630, 10),
270                     on.error(690, ex)
271                 });
272                 auto actual = res.get_observer().messages();
273                 REQUIRE(required == actual);
274             }
275 
276             THEN("there was 1 subscription/unsubscription to the source"){
277                 auto required = rxu::to_vector({
278                     on.subscribe(200, 690)
279                 });
280                 auto actual = xs.subscriptions();
281                 REQUIRE(required == actual);
282             }
283 
284         }
285     }
286 }
287 
288 SCENARIO("take_while, error same", "[take_while][operators]"){
289     GIVEN("a source"){
290         auto sc = rxsc::make_test();
291         auto w = sc.create_worker();
292         const rxsc::test::messages<int> on;
293 
294         auto xs = sc.make_hot_observable({
295             on.next(70, 6),
296             on.next(150, 4),
297             on.next(210, 9),
298             on.next(230, 13),
299             on.next(270, 7),
300             on.next(280, 1),
301             on.next(300, -1),
302             on.next(310, 3),
303             on.next(340, 8),
304             on.next(370, 11),
305             on.next(410, 15),
306             on.next(415, 16),
307             on.next(460, 72),
308             on.next(510, 76),
309             on.next(560, 32),
310             on.next(570, -100),
311             on.next(580, -3),
312             on.next(590, 5),
313             on.next(630, 10),
314             on.error(690, std::runtime_error("error in unsubscribed stream"))
315         });
316 
317         WHEN("all but one values are taken"){
318 
319             auto res = w.start(
__anon6e0104070602() 320                 [xs]() {
321                     return xs
322                         .take_while(not_equal_to(10))
323                         // forget type to workaround lambda deduction bug on msvc 2013
324                         .as_dynamic();
325                 }
326             );
327 
328             THEN("the output only contains items sent while subscribed"){
329                 auto required = rxu::to_vector({
330                     on.next(210, 9),
331                     on.next(230, 13),
332                     on.next(270, 7),
333                     on.next(280, 1),
334                     on.next(300, -1),
335                     on.next(310, 3),
336                     on.next(340, 8),
337                     on.next(370, 11),
338                     on.next(410, 15),
339                     on.next(415, 16),
340                     on.next(460, 72),
341                     on.next(510, 76),
342                     on.next(560, 32),
343                     on.next(570, -100),
344                     on.next(580, -3),
345                     on.next(590, 5),
346                     on.completed(630)
347                 });
348                 auto actual = res.get_observer().messages();
349                 REQUIRE(required == actual);
350             }
351 
352             THEN("there was 1 subscription/unsubscription to the source"){
353                 auto required = rxu::to_vector({
354                     on.subscribe(200, 630)
355                 });
356                 auto actual = xs.subscriptions();
357                 REQUIRE(required == actual);
358             }
359 
360         }
361     }
362 }
363 
364 SCENARIO("take_while, error before", "[take_while][operators]"){
365     GIVEN("a source"){
366         auto sc = rxsc::make_test();
367         auto w = sc.create_worker();
368         const rxsc::test::messages<int> on;
369 
370         auto xs = sc.make_hot_observable({
371             on.next(70, 6),
372             on.next(150, 4),
373             on.next(210, 9),
374             on.next(230, 13),
375             on.next(270, 7),
376             on.next(280, 1),
377             on.next(300, -1),
378             on.next(310, 3),
379             on.next(340, 8),
380             on.next(370, 11),
381             on.next(410, 15),
382             on.next(415, 16),
383             on.next(460, 72),
384             on.next(510, 76),
385             on.next(560, 32),
386             on.next(570, -100),
387             on.next(580, -3),
388             on.next(590, 5),
389             on.next(630, 10),
390             on.error(690, std::runtime_error("error in unsubscribed stream"))
391         });
392 
393         WHEN("3 values are taken"){
394 
395             auto res = w.start(
__anon6e0104070702() 396                 [xs]() {
397                     return xs
398                         .take_while(not_equal_to(1))
399                         // forget type to workaround lambda deduction bug on msvc 2013
400                         .as_dynamic();
401                 }
402             );
403 
404             THEN("the output only contains items sent while subscribed"){
405                 auto required = rxu::to_vector({
406                     on.next(210, 9),
407                     on.next(230, 13),
408                     on.next(270, 7),
409                     on.completed(280)
410                 });
411                 auto actual = res.get_observer().messages();
412                 REQUIRE(required == actual);
413             }
414 
415             THEN("there was 1 subscription/unsubscription to the source"){
416                 auto required = rxu::to_vector({
417                     on.subscribe(200, 280)
418                 });
419                 auto actual = xs.subscriptions();
420                 REQUIRE(required == actual);
421             }
422 
423         }
424     }
425 }
426 
427 SCENARIO("take_while, dispose before", "[take_while][operators]"){
428     GIVEN("a source"){
429         auto sc = rxsc::make_test();
430         auto w = sc.create_worker();
431         const rxsc::test::messages<int> on;
432 
433         auto xs = sc.make_hot_observable({
434             on.next(70, 6),
435             on.next(150, 4),
436             on.next(210, 9),
437             on.next(230, 13),
438             on.next(270, 7),
439             on.next(280, 1),
440             on.next(300, -1),
441             on.next(310, 3),
442             on.next(340, 8),
443             on.next(370, 11),
444             on.next(410, 15),
445             on.next(415, 16),
446             on.next(460, 72),
447             on.next(510, 76),
448             on.next(560, 32),
449             on.next(570, -100),
450             on.next(580, -3),
451             on.next(590, 5),
452             on.next(630, 10)
453         });
454 
455         WHEN("3 values are taken"){
456 
457             auto res = w.start(
__anon6e0104070802() 458                 [xs]() {
459                     return xs
460                         .take_while(not_equal_to(100))
461                         // forget type to workaround lambda deduction bug on msvc 2013
462                         .as_dynamic();
463                 },
464                 250
465             );
466 
467             THEN("the output only contains items sent while subscribed"){
468                 auto required = rxu::to_vector({
469                     on.next(210, 9),
470                     on.next(230, 13)
471                 });
472                 auto actual = res.get_observer().messages();
473                 REQUIRE(required == actual);
474             }
475 
476             THEN("there was 1 subscription/unsubscription to the source"){
477                 auto required = rxu::to_vector({
478                     on.subscribe(200, 250)
479                 });
480                 auto actual = xs.subscriptions();
481                 REQUIRE(required == actual);
482             }
483 
484         }
485     }
486 }
487 
488 SCENARIO("take_while, dispose after", "[take_while][operators]"){
489     GIVEN("a source"){
490         auto sc = rxsc::make_test();
491         auto w = sc.create_worker();
492         const rxsc::test::messages<int> on;
493 
494         auto xs = sc.make_hot_observable({
495             on.next(70, 6),
496             on.next(150, 4),
497             on.next(210, 9),
498             on.next(230, 13),
499             on.next(270, 7),
500             on.next(280, 1),
501             on.next(300, -1),
502             on.next(310, 3),
503             on.next(340, 8),
504             on.next(370, 11),
505             on.next(410, 15),
506             on.next(415, 16),
507             on.next(460, 72),
508             on.next(510, 76),
509             on.next(560, 32),
510             on.next(570, -100),
511             on.next(580, -3),
512             on.next(590, 5),
513             on.next(630, 10)
514         });
515 
516         WHEN("3 values are taken"){
517 
518             auto res = w.start(
__anon6e0104070902() 519                 [xs]() {
520                     return xs
521                         .take_while(not_equal_to(1))
522                         // forget type to workaround lambda deduction bug on msvc 2013
523                         .as_dynamic();
524                 },
525                 400
526             );
527 
528             THEN("the output only contains items sent while subscribed"){
529                 auto required = rxu::to_vector({
530                     on.next(210, 9),
531                     on.next(230, 13),
532                     on.next(270, 7),
533                     on.completed(280)
534                 });
535                 auto actual = res.get_observer().messages();
536                 REQUIRE(required == actual);
537             }
538 
539             THEN("there was 1 subscription/unsubscription to the source"){
540                 auto required = rxu::to_vector({
541                     on.subscribe(200, 280)
542                 });
543                 auto actual = xs.subscriptions();
544                 REQUIRE(required == actual);
545             }
546 
547         }
548     }
549 }
550 
551