1 #include "../test.h"
2 #include <rxcpp/operators/rx-take.hpp>
3 
4 SCENARIO("take 2", "[take][operators]"){
5     GIVEN("a source"){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto xs = sc.make_hot_observable({
11             on.next(150, 1),
12             on.next(210, 2),
13             on.next(220, 3),
14             on.next(230, 4),
15             on.next(240, 5),
16             on.completed(250)
17         });
18 
19         WHEN("2 values are taken"){
20 
21             auto res = w.start(
__anon4424e56f0102() 22                 [xs]() {
23                     return xs
24                         | rxo::take(2)
25                         // forget type to workaround lambda deduction bug on msvc 2013
26                         | rxo::as_dynamic();
27                 }
28             );
29 
30             THEN("the output only contains items sent while subscribed"){
31                 auto required = rxu::to_vector({
32                     on.next(210, 2),
33                     on.next(220, 3),
34                     on.completed(220)
35                 });
36                 auto actual = res.get_observer().messages();
37                 REQUIRE(required == actual);
38             }
39 
40             THEN("there was 1 subscription/unsubscription to the source"){
41                 auto required = rxu::to_vector({
42                     on.subscribe(200, 220)
43                 });
44                 auto actual = xs.subscriptions();
45                 REQUIRE(required == actual);
46             }
47 
48         }
49     }
50 }
51 
52 SCENARIO("take, complete after", "[take][operators]"){
53     GIVEN("a source"){
54         auto sc = rxsc::make_test();
55         auto w = sc.create_worker();
56         const rxsc::test::messages<int> on;
57 
58         auto xs = sc.make_hot_observable({
59             on.next(70, 6),
60             on.next(150, 4),
61             on.next(210, 9),
62             on.next(230, 13),
63             on.next(270, 7),
64             on.next(280, 1),
65             on.next(300, -1),
66             on.next(310, 3),
67             on.next(340, 8),
68             on.next(370, 11),
69             on.next(410, 15),
70             on.next(415, 16),
71             on.next(460, 72),
72             on.next(510, 76),
73             on.next(560, 32),
74             on.next(570, -100),
75             on.next(580, -3),
76             on.next(590, 5),
77             on.next(630, 10),
78             on.completed(690)
79         });
80 
81         WHEN("20 values are taken"){
82 
83             auto res = w.start(
__anon4424e56f0202() 84                 [xs]() {
85                     return xs
86                         .take(20)
87                         // forget type to workaround lambda deduction bug on msvc 2013
88                         .as_dynamic();
89                 }
90             );
91 
92             THEN("the output only contains items sent while subscribed"){
93                 auto required = rxu::to_vector({
94                     on.next(210, 9),
95                     on.next(230, 13),
96                     on.next(270, 7),
97                     on.next(280, 1),
98                     on.next(300, -1),
99                     on.next(310, 3),
100                     on.next(340, 8),
101                     on.next(370, 11),
102                     on.next(410, 15),
103                     on.next(415, 16),
104                     on.next(460, 72),
105                     on.next(510, 76),
106                     on.next(560, 32),
107                     on.next(570, -100),
108                     on.next(580, -3),
109                     on.next(590, 5),
110                     on.next(630, 10),
111                     on.completed(690)
112                 });
113                 auto actual = res.get_observer().messages();
114                 REQUIRE(required == actual);
115             }
116 
117             THEN("there was 1 subscription/unsubscription to the source"){
118                 auto required = rxu::to_vector({
119                     on.subscribe(200, 690)
120                 });
121                 auto actual = xs.subscriptions();
122                 REQUIRE(required == actual);
123             }
124 
125         }
126     }
127 }
128 
129 SCENARIO("take, complete same", "[take][operators]"){
130     GIVEN("a source"){
131         auto sc = rxsc::make_test();
132         auto w = sc.create_worker();
133         const rxsc::test::messages<int> on;
134 
135         auto xs = sc.make_hot_observable({
136             on.next(70, 6),
137             on.next(150, 4),
138             on.next(210, 9),
139             on.next(230, 13),
140             on.next(270, 7),
141             on.next(280, 1),
142             on.next(300, -1),
143             on.next(310, 3),
144             on.next(340, 8),
145             on.next(370, 11),
146             on.next(410, 15),
147             on.next(415, 16),
148             on.next(460, 72),
149             on.next(510, 76),
150             on.next(560, 32),
151             on.next(570, -100),
152             on.next(580, -3),
153             on.next(590, 5),
154             on.next(630, 10),
155             on.completed(690)
156         });
157 
158         WHEN("17 values are taken"){
159 
160             auto res = w.start(
__anon4424e56f0302() 161                 [xs]() {
162                     return xs
163                         .take(17)
164                         // forget type to workaround lambda deduction bug on msvc 2013
165                         .as_dynamic();
166                 }
167             );
168 
169             THEN("the output only contains items sent while subscribed"){
170                 auto required = rxu::to_vector({
171                     on.next(210, 9),
172                     on.next(230, 13),
173                     on.next(270, 7),
174                     on.next(280, 1),
175                     on.next(300, -1),
176                     on.next(310, 3),
177                     on.next(340, 8),
178                     on.next(370, 11),
179                     on.next(410, 15),
180                     on.next(415, 16),
181                     on.next(460, 72),
182                     on.next(510, 76),
183                     on.next(560, 32),
184                     on.next(570, -100),
185                     on.next(580, -3),
186                     on.next(590, 5),
187                     on.next(630, 10),
188                     on.completed(630)
189                 });
190                 auto actual = res.get_observer().messages();
191                 REQUIRE(required == actual);
192             }
193 
194             THEN("there was 1 subscription/unsubscription to the source"){
195                 auto required = rxu::to_vector({
196                     on.subscribe(200, 630)
197                 });
198                 auto actual = xs.subscriptions();
199                 REQUIRE(required == actual);
200             }
201 
202         }
203     }
204 }
205 
206 SCENARIO("take, complete before", "[take][operators]"){
207     GIVEN("a source"){
208         auto sc = rxsc::make_test();
209         auto w = sc.create_worker();
210         const rxsc::test::messages<int> on;
211 
212         auto xs = sc.make_hot_observable({
213             on.next(70, 6),
214             on.next(150, 4),
215             on.next(210, 9),
216             on.next(230, 13),
217             on.next(270, 7),
218             on.next(280, 1),
219             on.next(300, -1),
220             on.next(310, 3),
221             on.next(340, 8),
222             on.next(370, 11),
223             on.next(410, 15),
224             on.next(415, 16),
225             on.next(460, 72),
226             on.next(510, 76),
227             on.next(560, 32),
228             on.next(570, -100),
229             on.next(580, -3),
230             on.next(590, 5),
231             on.next(630, 10),
232             on.completed(690)
233         });
234 
235         WHEN("10 values are taken"){
236 
237             auto res = w.start(
__anon4424e56f0402() 238                 [xs]() {
239                     return xs
240                         .take(10)
241                         // forget type to workaround lambda deduction bug on msvc 2013
242                         .as_dynamic();
243                 }
244             );
245 
246             THEN("the output only contains items sent while subscribed"){
247                 auto required = rxu::to_vector({
248                     on.next(210, 9),
249                     on.next(230, 13),
250                     on.next(270, 7),
251                     on.next(280, 1),
252                     on.next(300, -1),
253                     on.next(310, 3),
254                     on.next(340, 8),
255                     on.next(370, 11),
256                     on.next(410, 15),
257                     on.next(415, 16),
258                     on.completed(415)
259                 });
260                 auto actual = res.get_observer().messages();
261                 REQUIRE(required == actual);
262             }
263 
264             THEN("there was 1 subscription/unsubscription to the source"){
265                 auto required = rxu::to_vector({
266                     on.subscribe(200, 415)
267                 });
268                 auto actual = xs.subscriptions();
269                 REQUIRE(required == actual);
270             }
271 
272         }
273     }
274 }
275 
276 SCENARIO("take, error after", "[take][operators]"){
277     GIVEN("a source"){
278         auto sc = rxsc::make_test();
279         auto w = sc.create_worker();
280         const rxsc::test::messages<int> on;
281 
282         std::runtime_error ex("take on_error from source");
283 
284         auto xs = sc.make_hot_observable({
285             on.next(70, 6),
286             on.next(150, 4),
287             on.next(210, 9),
288             on.next(230, 13),
289             on.next(270, 7),
290             on.next(280, 1),
291             on.next(300, -1),
292             on.next(310, 3),
293             on.next(340, 8),
294             on.next(370, 11),
295             on.next(410, 15),
296             on.next(415, 16),
297             on.next(460, 72),
298             on.next(510, 76),
299             on.next(560, 32),
300             on.next(570, -100),
301             on.next(580, -3),
302             on.next(590, 5),
303             on.next(630, 10),
304             on.error(690, ex)
305         });
306 
307         WHEN("20 values are taken"){
308 
309             auto res = w.start(
__anon4424e56f0502() 310                 [xs]() {
311                     return xs
312                         .take(20)
313                         // forget type to workaround lambda deduction bug on msvc 2013
314                         .as_dynamic();
315                 }
316             );
317 
318             THEN("the output only contains items sent while subscribed"){
319                 auto required = rxu::to_vector({
320                     on.next(210, 9),
321                     on.next(230, 13),
322                     on.next(270, 7),
323                     on.next(280, 1),
324                     on.next(300, -1),
325                     on.next(310, 3),
326                     on.next(340, 8),
327                     on.next(370, 11),
328                     on.next(410, 15),
329                     on.next(415, 16),
330                     on.next(460, 72),
331                     on.next(510, 76),
332                     on.next(560, 32),
333                     on.next(570, -100),
334                     on.next(580, -3),
335                     on.next(590, 5),
336                     on.next(630, 10),
337                     on.error(690, ex)
338                 });
339                 auto actual = res.get_observer().messages();
340                 REQUIRE(required == actual);
341             }
342 
343             THEN("there was 1 subscription/unsubscription to the source"){
344                 auto required = rxu::to_vector({
345                     on.subscribe(200, 690)
346                 });
347                 auto actual = xs.subscriptions();
348                 REQUIRE(required == actual);
349             }
350 
351         }
352     }
353 }
354 
355 SCENARIO("take, error same", "[take][operators]"){
356     GIVEN("a source"){
357         auto sc = rxsc::make_test();
358         auto w = sc.create_worker();
359         const rxsc::test::messages<int> on;
360 
361         auto xs = sc.make_hot_observable({
362             on.next(70, 6),
363             on.next(150, 4),
364             on.next(210, 9),
365             on.next(230, 13),
366             on.next(270, 7),
367             on.next(280, 1),
368             on.next(300, -1),
369             on.next(310, 3),
370             on.next(340, 8),
371             on.next(370, 11),
372             on.next(410, 15),
373             on.next(415, 16),
374             on.next(460, 72),
375             on.next(510, 76),
376             on.next(560, 32),
377             on.next(570, -100),
378             on.next(580, -3),
379             on.next(590, 5),
380             on.next(630, 10),
381             on.error(690, std::runtime_error("error in unsubscribed stream"))
382         });
383 
384         WHEN("17 values are taken"){
385 
386             auto res = w.start(
__anon4424e56f0602() 387                 [xs]() {
388                     return xs
389                         .take(17)
390                         // forget type to workaround lambda deduction bug on msvc 2013
391                         .as_dynamic();
392                 }
393             );
394 
395             THEN("the output only contains items sent while subscribed"){
396                 auto required = rxu::to_vector({
397                     on.next(210, 9),
398                     on.next(230, 13),
399                     on.next(270, 7),
400                     on.next(280, 1),
401                     on.next(300, -1),
402                     on.next(310, 3),
403                     on.next(340, 8),
404                     on.next(370, 11),
405                     on.next(410, 15),
406                     on.next(415, 16),
407                     on.next(460, 72),
408                     on.next(510, 76),
409                     on.next(560, 32),
410                     on.next(570, -100),
411                     on.next(580, -3),
412                     on.next(590, 5),
413                     on.next(630, 10),
414                     on.completed(630)
415                 });
416                 auto actual = res.get_observer().messages();
417                 REQUIRE(required == actual);
418             }
419 
420             THEN("there was 1 subscription/unsubscription to the source"){
421                 auto required = rxu::to_vector({
422                     on.subscribe(200, 630)
423                 });
424                 auto actual = xs.subscriptions();
425                 REQUIRE(required == actual);
426             }
427 
428         }
429     }
430 }
431 
432 SCENARIO("take, error before", "[take][operators]"){
433     GIVEN("a source"){
434         auto sc = rxsc::make_test();
435         auto w = sc.create_worker();
436         const rxsc::test::messages<int> on;
437 
438         auto xs = sc.make_hot_observable({
439             on.next(70, 6),
440             on.next(150, 4),
441             on.next(210, 9),
442             on.next(230, 13),
443             on.next(270, 7),
444             on.next(280, 1),
445             on.next(300, -1),
446             on.next(310, 3),
447             on.next(340, 8),
448             on.next(370, 11),
449             on.next(410, 15),
450             on.next(415, 16),
451             on.next(460, 72),
452             on.next(510, 76),
453             on.next(560, 32),
454             on.next(570, -100),
455             on.next(580, -3),
456             on.next(590, 5),
457             on.next(630, 10),
458             on.error(690, std::runtime_error("error in unsubscribed stream"))
459         });
460 
461         WHEN("3 values are taken"){
462 
463             auto res = w.start(
__anon4424e56f0702() 464                 [xs]() {
465                     return xs
466                         .take(3)
467                         // forget type to workaround lambda deduction bug on msvc 2013
468                         .as_dynamic();
469                 }
470             );
471 
472             THEN("the output only contains items sent while subscribed"){
473                 auto required = rxu::to_vector({
474                     on.next(210, 9),
475                     on.next(230, 13),
476                     on.next(270, 7),
477                     on.completed(270)
478                 });
479                 auto actual = res.get_observer().messages();
480                 REQUIRE(required == actual);
481             }
482 
483             THEN("there was 1 subscription/unsubscription to the source"){
484                 auto required = rxu::to_vector({
485                     on.subscribe(200, 270)
486                 });
487                 auto actual = xs.subscriptions();
488                 REQUIRE(required == actual);
489             }
490 
491         }
492     }
493 }
494 
495 SCENARIO("take, dispose before", "[take][operators]"){
496     GIVEN("a source"){
497         auto sc = rxsc::make_test();
498         auto w = sc.create_worker();
499         const rxsc::test::messages<int> on;
500 
501         auto xs = sc.make_hot_observable({
502             on.next(70, 6),
503             on.next(150, 4),
504             on.next(210, 9),
505             on.next(230, 13),
506             on.next(270, 7),
507             on.next(280, 1),
508             on.next(300, -1),
509             on.next(310, 3),
510             on.next(340, 8),
511             on.next(370, 11),
512             on.next(410, 15),
513             on.next(415, 16),
514             on.next(460, 72),
515             on.next(510, 76),
516             on.next(560, 32),
517             on.next(570, -100),
518             on.next(580, -3),
519             on.next(590, 5),
520             on.next(630, 10)
521         });
522 
523         WHEN("3 values are taken"){
524 
525             auto res = w.start(
__anon4424e56f0802() 526                 [xs]() {
527                     return xs
528                         .take(3)
529                         // forget type to workaround lambda deduction bug on msvc 2013
530                         .as_dynamic();
531                 },
532                 250
533             );
534 
535             THEN("the output only contains items sent while subscribed"){
536                 auto required = rxu::to_vector({
537                     on.next(210, 9),
538                     on.next(230, 13)
539                 });
540                 auto actual = res.get_observer().messages();
541                 REQUIRE(required == actual);
542             }
543 
544             THEN("there was 1 subscription/unsubscription to the source"){
545                 auto required = rxu::to_vector({
546                     on.subscribe(200, 250)
547                 });
548                 auto actual = xs.subscriptions();
549                 REQUIRE(required == actual);
550             }
551 
552         }
553     }
554 }
555 
556 SCENARIO("take, dispose after", "[take][operators]"){
557     GIVEN("a source"){
558         auto sc = rxsc::make_test();
559         auto w = sc.create_worker();
560         const rxsc::test::messages<int> on;
561 
562         auto xs = sc.make_hot_observable({
563             on.next(70, 6),
564             on.next(150, 4),
565             on.next(210, 9),
566             on.next(230, 13),
567             on.next(270, 7),
568             on.next(280, 1),
569             on.next(300, -1),
570             on.next(310, 3),
571             on.next(340, 8),
572             on.next(370, 11),
573             on.next(410, 15),
574             on.next(415, 16),
575             on.next(460, 72),
576             on.next(510, 76),
577             on.next(560, 32),
578             on.next(570, -100),
579             on.next(580, -3),
580             on.next(590, 5),
581             on.next(630, 10)
582         });
583 
584         WHEN("3 values are taken"){
585 
586             auto res = w.start(
__anon4424e56f0902() 587                 [xs]() {
588                     return xs
589                         .take(3)
590                         // forget type to workaround lambda deduction bug on msvc 2013
591                         .as_dynamic();
592                 },
593                 400
594             );
595 
596             THEN("the output only contains items sent while subscribed"){
597                 auto required = rxu::to_vector({
598                     on.next(210, 9),
599                     on.next(230, 13),
600                     on.next(270, 7),
601                     on.completed(270)
602                 });
603                 auto actual = res.get_observer().messages();
604                 REQUIRE(required == actual);
605             }
606 
607             THEN("there was 1 subscription/unsubscription to the source"){
608                 auto required = rxu::to_vector({
609                     on.subscribe(200, 270)
610                 });
611                 auto actual = xs.subscriptions();
612                 REQUIRE(required == actual);
613             }
614 
615         }
616     }
617 }
618 
619