1 #include "../test.h"
2 #include <rxcpp/operators/rx-map.hpp>
3 #include <rxcpp/operators/rx-take_until.hpp>
4 
5 SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
6     GIVEN("2 sources"){
7         auto sc = rxsc::make_test();
8         auto w = sc.create_worker();
9         const rxsc::test::messages<int> on;
10 
11         auto xs = sc.make_hot_observable({
12             on.next(150, 1),
13             on.next(210, 2),
14             on.next(220, 3),
15             on.next(230, 4),
16             on.next(240, 5),
17             on.completed(250)
18         });
19 
20         auto ys = sc.make_hot_observable({
21             on.next(150, 1),
22             on.next(225, 99),
23             on.completed(230)
24         });
25 
26         WHEN("one is taken until the other emits a marble"){
27 
28             auto res = w.start(
__anon56d2ed3a0102() 29                 [xs, ys]() {
30                 return xs
31                     | rxo::take_until(ys)
32                     // forget type to workaround lambda deduction bug on msvc 2013
33                     | rxo::as_dynamic();
34             }
35             );
36 
37             THEN("the output only contains items sent while subscribed"){
38                 auto required = rxu::to_vector({
39                     on.next(210, 2),
40                     on.next(220, 3),
41                     on.completed(225)
42                 });
43                 auto actual = res.get_observer().messages();
44                 REQUIRE(required == actual);
45             }
46 
47             THEN("there was 1 subscription/unsubscription to the source"){
48                 auto required = rxu::to_vector({
49                     on.subscribe(200, 225)
50                 });
51                 auto actual = xs.subscriptions();
52                 REQUIRE(required == actual);
53             }
54 
55             THEN("there was 1 subscription/unsubscription to the trigger"){
56                 auto required = rxu::to_vector({
57                     on.subscribe(200, 225)
58                 });
59                 auto actual = ys.subscriptions();
60                 REQUIRE(required == actual);
61             }
62 
63         }
64     }
65 }
66 
67 SCENARIO("take_until, preempt some data next", "[take_until][take][operators]"){
68     GIVEN("2 sources"){
69         auto sc = rxsc::make_test();
70         auto w = sc.create_worker();
71         const rxsc::test::messages<int> on;
72 
73         auto l = sc.make_hot_observable({
74             on.next(150, 1),
75             on.next(210, 2),
76             on.next(220, 3),
77             on.next(230, 4),
78             on.next(240, 5),
79             on.completed(250)
80         });
81 
82         auto r = sc.make_hot_observable({
83             on.next(150, 1),
84             on.next(225, 99),
85             on.completed(230)
86         });
87 
88         WHEN("one is taken until the other emits a marble"){
89 
90             auto res = w.start(
__anon56d2ed3a0202() 91                 [l, r]() {
92                 return l
93                     .take_until(r)
94                     // forget type to workaround lambda deduction bug on msvc 2013
95                     .as_dynamic();
96             }
97             );
98 
99             THEN("the output only contains items sent while subscribed"){
100                 auto required = rxu::to_vector({
101                     on.next(210, 2),
102                     on.next(220, 3),
103                     on.completed(225)
104                 });
105                 auto actual = res.get_observer().messages();
106                 REQUIRE(required == actual);
107             }
108 
109             THEN("there was 1 subscription/unsubscription to the source"){
110                 auto required = rxu::to_vector({
111                     on.subscribe(200, 225)
112                 });
113                 auto actual = l.subscriptions();
114                 REQUIRE(required == actual);
115             }
116 
117             THEN("there was 1 subscription/unsubscription to the trigger"){
118                 auto required = rxu::to_vector({
119                     on.subscribe(200, 225)
120                 });
121                 auto actual = r.subscriptions();
122                 REQUIRE(required == actual);
123             }
124 
125         }
126     }
127 }
128 
129 SCENARIO("take_until, preempt some data error", "[take_until][take][operators]"){
130     GIVEN("2 sources"){
131         auto sc = rxsc::make_test();
132         auto w = sc.create_worker();
133         const rxsc::test::messages<int> on;
134 
135         std::runtime_error ex("take_until on_error from source");
136 
137         auto l = sc.make_hot_observable({
138             on.next(150, 1),
139             on.next(210, 2),
140             on.next(220, 3),
141             on.next(230, 4),
142             on.next(240, 5),
143             on.completed(250)
144         });
145 
146         auto r = sc.make_hot_observable({
147             on.next(150, 1),
148             on.error(225, ex)
149         });
150 
151         WHEN("one is taken until the other emits a marble"){
152 
153             auto res = w.start(
__anon56d2ed3a0302() 154                 [l, r]() {
155                 return l
156                     .take_until(r)
157                     // forget type to workaround lambda deduction bug on msvc 2013
158                     .as_dynamic();
159             }
160             );
161 
162             THEN("the output only contains items sent while subscribed"){
163                 auto required = rxu::to_vector({
164                     on.next(210, 2),
165                     on.next(220, 3),
166                     on.error(225, ex)
167                 });
168                 auto actual = res.get_observer().messages();
169                 REQUIRE(required == actual);
170             }
171 
172             THEN("there was 1 subscription/unsubscription to the source"){
173                 auto required = rxu::to_vector({
174                     on.subscribe(200, 225)
175                 });
176                 auto actual = l.subscriptions();
177                 REQUIRE(required == actual);
178             }
179 
180             THEN("there was 1 subscription/unsubscription to the trigger"){
181                 auto required = rxu::to_vector({
182                     on.subscribe(200, 225)
183                 });
184                 auto actual = r.subscriptions();
185                 REQUIRE(required == actual);
186             }
187 
188         }
189     }
190 }
191 
192 SCENARIO("take_until, no-preempt some data empty", "[take_until][take][operators]"){
193     GIVEN("2 sources"){
194         auto sc = rxsc::make_test();
195         auto w = sc.create_worker();
196         const rxsc::test::messages<int> on;
197 
198         auto l = sc.make_hot_observable({
199             on.next(150, 1),
200             on.next(210, 2),
201             on.next(220, 3),
202             on.next(230, 4),
203             on.next(240, 5),
204             on.completed(250)
205         });
206 
207         auto r = sc.make_hot_observable({
208             on.next(150, 1),
209             on.completed(225)
210         });
211 
212         WHEN("one is taken until the other emits a marble"){
213 
214             auto res = w.start(
__anon56d2ed3a0402() 215                 [l, r]() {
216                 return l
217                     .take_until(r)
218                     // forget type to workaround lambda deduction bug on msvc 2013
219                     .as_dynamic();
220             }
221             );
222 
223             THEN("the output only contains items sent while subscribed"){
224                 auto required = rxu::to_vector({
225                     on.next(210, 2),
226                     on.next(220, 3),
227                     on.next(230, 4),
228                     on.next(240, 5),
229                     on.completed(250)
230                 });
231                 auto actual = res.get_observer().messages();
232                 REQUIRE(required == actual);
233             }
234 
235             THEN("there was 1 subscription/unsubscription to the source"){
236                 auto required = rxu::to_vector({
237                     on.subscribe(200, 250)
238                 });
239                 auto actual = l.subscriptions();
240                 REQUIRE(required == actual);
241             }
242 
243             THEN("there was 1 subscription/unsubscription to the trigger"){
244                 auto required = rxu::to_vector({
245                     on.subscribe(200, 225)
246                 });
247                 auto actual = r.subscriptions();
248                 REQUIRE(required == actual);
249             }
250 
251         }
252     }
253 }
254 
255 SCENARIO("take_until, no-preempt some data never", "[take_until][take][operators]"){
256     GIVEN("2 sources"){
257         auto sc = rxsc::make_test();
258         auto w = sc.create_worker();
259         const rxsc::test::messages<int> on;
260 
261         auto l = sc.make_hot_observable({
262             on.next(150, 1),
263             on.next(210, 2),
264             on.next(220, 3),
265             on.next(230, 4),
266             on.next(240, 5),
267             on.completed(250)
268         });
269 
270         auto r = sc.make_hot_observable({
271             on.next(150, 1)
272         });
273 
274         WHEN("one is taken until the other emits a marble"){
275 
276             auto res = w.start(
__anon56d2ed3a0502() 277                 [l, r]() {
278                 return l
279                     .take_until(r)
280                     // forget type to workaround lambda deduction bug on msvc 2013
281                     .as_dynamic();
282             }
283             );
284 
285             THEN("the output only contains items sent while subscribed"){
286                 auto required = rxu::to_vector({
287                     on.next(210, 2),
288                     on.next(220, 3),
289                     on.next(230, 4),
290                     on.next(240, 5),
291                     on.completed(250)
292                 });
293                 auto actual = res.get_observer().messages();
294                 REQUIRE(required == actual);
295             }
296 
297             THEN("there was 1 subscription/unsubscription to the source"){
298                 auto required = rxu::to_vector({
299                     on.subscribe(200, 250)
300                 });
301                 auto actual = l.subscriptions();
302                 REQUIRE(required == actual);
303             }
304 
305             THEN("there was 1 subscription/unsubscription to the trigger"){
306                 auto required = rxu::to_vector({
307                     on.subscribe(200, 250)
308                 });
309                 auto actual = r.subscriptions();
310                 REQUIRE(required == actual);
311             }
312 
313         }
314     }
315 }
316 
317 SCENARIO("take_until, preempt never next", "[take_until][take][operators]"){
318     GIVEN("2 sources"){
319         auto sc = rxsc::make_test();
320         auto w = sc.create_worker();
321         const rxsc::test::messages<int> on;
322 
323         auto l = sc.make_hot_observable({
324             on.next(150, 1)
325         });
326 
327         auto r = sc.make_hot_observable({
328             on.next(150, 1),
329             on.next(225, 2), //!
330             on.completed(250)
331         });
332 
333         WHEN("one is taken until the other emits a marble"){
334 
335             auto res = w.start(
__anon56d2ed3a0602() 336                 [l, r]() {
337                 return l
338                     .take_until(r)
339                     // forget type to workaround lambda deduction bug on msvc 2013
340                     .as_dynamic();
341             }
342             );
343 
344             THEN("the output only contains items sent while subscribed"){
345                 auto required = rxu::to_vector({
346                     on.completed(225)
347                 });
348                 auto actual = res.get_observer().messages();
349                 REQUIRE(required == actual);
350             }
351 
352             THEN("there was 1 subscription/unsubscription to the source"){
353                 auto required = rxu::to_vector({
354                     on.subscribe(200, 225)
355                 });
356                 auto actual = l.subscriptions();
357                 REQUIRE(required == actual);
358             }
359 
360             THEN("there was 1 subscription/unsubscription to the trigger"){
361                 auto required = rxu::to_vector({
362                     on.subscribe(200, 225)
363                 });
364                 auto actual = r.subscriptions();
365                 REQUIRE(required == actual);
366             }
367 
368         }
369     }
370 }
371 
372 SCENARIO("take_until, preempt never error", "[take_until][take][operators]"){
373     GIVEN("2 sources"){
374         auto sc = rxsc::make_test();
375         auto w = sc.create_worker();
376         const rxsc::test::messages<int> on;
377 
378         std::runtime_error ex("take_until on_error from source");
379 
380         auto l = sc.make_hot_observable({
381             on.next(150, 1)
382         });
383 
384         auto r = sc.make_hot_observable({
385             on.next(150, 1),
386             on.error(225, ex)
387         });
388 
389         WHEN("one is taken until the other emits a marble"){
390 
391             auto res = w.start(
__anon56d2ed3a0702() 392                 [l, r]() {
393                 return l
394                     .take_until(r)
395                     // forget type to workaround lambda deduction bug on msvc 2013
396                     .as_dynamic();
397             }
398             );
399 
400             THEN("the output only contains items sent while subscribed"){
401                 auto required = rxu::to_vector({
402                     on.error(225, ex)
403                 });
404                 auto actual = res.get_observer().messages();
405                 REQUIRE(required == actual);
406             }
407 
408             THEN("there was 1 subscription/unsubscription to the source"){
409                 auto required = rxu::to_vector({
410                     on.subscribe(200, 225)
411                 });
412                 auto actual = l.subscriptions();
413                 REQUIRE(required == actual);
414             }
415 
416             THEN("there was 1 subscription/unsubscription to the trigger"){
417                 auto required = rxu::to_vector({
418                     on.subscribe(200, 225)
419                 });
420                 auto actual = r.subscriptions();
421                 REQUIRE(required == actual);
422             }
423 
424         }
425     }
426 }
427 
428 SCENARIO("take_until, no-preempt never empty", "[take_until][take][operators]"){
429     GIVEN("2 sources"){
430         auto sc = rxsc::make_test();
431         auto w = sc.create_worker();
432         const rxsc::test::messages<int> on;
433 
434         auto l = sc.make_hot_observable({
435             on.next(150, 1)
436         });
437 
438         auto r = sc.make_hot_observable({
439             on.next(150, 1),
440             on.completed(225)
441         });
442 
443         WHEN("one is taken until the other emits a marble"){
444 
445             auto res = w.start(
__anon56d2ed3a0802() 446                 [l, r]() {
447                 return l
448                     .take_until(r)
449                     // forget type to workaround lambda deduction bug on msvc 2013
450                     .as_dynamic();
451             }
452             );
453 
454             THEN("the output only contains items sent while subscribed"){
455                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
456                 auto actual = res.get_observer().messages();
457                 REQUIRE(required == actual);
458             }
459 
460             THEN("there was 1 subscription/unsubscription to the source"){
461                 auto required = rxu::to_vector({
462                     on.subscribe(200, 1000 /* can't dispose prematurely, could be in flight to dispatch OnError */)
463                 });
464                 auto actual = l.subscriptions();
465                 REQUIRE(required == actual);
466             }
467 
468             THEN("there was 1 subscription/unsubscription to the trigger"){
469                 auto required = rxu::to_vector({
470                     on.subscribe(200, 225)
471                 });
472                 auto actual = r.subscriptions();
473                 REQUIRE(required == actual);
474             }
475 
476         }
477     }
478 }
479 
480 SCENARIO("take_until, no-preempt never never", "[take_until][take][operators]"){
481     GIVEN("2 sources"){
482         auto sc = rxsc::make_test();
483         auto w = sc.create_worker();
484         const rxsc::test::messages<int> on;
485 
486         auto l = sc.make_hot_observable({
487             on.next(150, 1)
488         });
489 
490         auto r = sc.make_hot_observable({
491             on.next(150, 1)
492         });
493 
494         WHEN("one is taken until the other emits a marble"){
495 
496             auto res = w.start(
__anon56d2ed3a0902() 497                 [l, r]() {
498                 return l
499                     .take_until(r)
500                     // forget type to workaround lambda deduction bug on msvc 2013
501                     .as_dynamic();
502             }
503             );
504 
505             THEN("the output only contains items sent while subscribed"){
506                 auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
507                 auto actual = res.get_observer().messages();
508                 REQUIRE(required == actual);
509             }
510 
511             THEN("there was 1 subscription/unsubscription to the source"){
512                 auto required = rxu::to_vector({
513                     on.subscribe(200, 1000)
514                 });
515                 auto actual = l.subscriptions();
516                 REQUIRE(required == actual);
517             }
518 
519             THEN("there was 1 subscription/unsubscription to the trigger"){
520                 auto required = rxu::to_vector({
521                     on.subscribe(200, 1000)
522                 });
523                 auto actual = r.subscriptions();
524                 REQUIRE(required == actual);
525             }
526 
527         }
528     }
529 }
530 
531 SCENARIO("take_until, preempt before first produced", "[take_until][take][operators]"){
532     GIVEN("2 sources"){
533         auto sc = rxsc::make_test();
534         auto w = sc.create_worker();
535         const rxsc::test::messages<int> on;
536 
537         auto l = sc.make_hot_observable({
538             on.next(150, 1),
539             on.next(230, 2),
540             on.completed(240)
541         });
542 
543         auto r = sc.make_hot_observable({
544             on.next(150, 1),
545             on.next(210, 2), //!
546             on.completed(220)
547         });
548 
549         WHEN("one is taken until the other emits a marble"){
550 
551             auto res = w.start(
__anon56d2ed3a0a02() 552                 [l, r]() {
553                 return l
554                     .take_until(r)
555                     // forget type to workaround lambda deduction bug on msvc 2013
556                     .as_dynamic();
557             }
558             );
559 
560             THEN("the output only contains items sent while subscribed"){
561                 auto required = rxu::to_vector({
562                     on.completed(210)
563                 });
564                 auto actual = res.get_observer().messages();
565                 REQUIRE(required == actual);
566             }
567 
568             THEN("there was 1 subscription/unsubscription to the source"){
569                 auto required = rxu::to_vector({
570                     on.subscribe(200, 210)
571                 });
572                 auto actual = l.subscriptions();
573                 REQUIRE(required == actual);
574             }
575 
576             THEN("there was 1 subscription/unsubscription to the trigger"){
577                 auto required = rxu::to_vector({
578                     on.subscribe(200, 210)
579                 });
580                 auto actual = r.subscriptions();
581                 REQUIRE(required == actual);
582             }
583 
584         }
585     }
586 }
587 
588 SCENARIO("take_until, preempt before first produced, remain silent and proper unsubscribed", "[take_until][take][operators]"){
589     GIVEN("2 sources"){
590         auto sc = rxsc::make_test();
591         auto w = sc.create_worker();
592         const rxsc::test::messages<int> on;
593 
594         bool sourceNotDisposed = false;
595 
596         auto l = sc.make_hot_observable({
597             on.next(150, 1),
598             on.error(215, std::runtime_error("error in unsubscribed stream")), // should not come
599             on.completed(240)
600         });
601 
602         auto r = sc.make_hot_observable({
603             on.next(150, 1),
604             on.next(210, 2), //!
605             on.completed(220)
606         });
607 
608         WHEN("one is taken until the other emits a marble"){
609 
610             auto res = w.start(
__anon56d2ed3a0b02() 611                 [l, r, &sourceNotDisposed]() {
612                 return l
613                     .map([&sourceNotDisposed](int v){sourceNotDisposed = true; return v; })
614                     .take_until(r)
615                     // forget type to workaround lambda deduction bug on msvc 2013
616                     .as_dynamic();
617             }
618             );
619 
620             THEN("the output only contains items sent while subscribed"){
621                 auto required = rxu::to_vector({
622                     on.completed(210)
623                 });
624                 auto actual = res.get_observer().messages();
625                 REQUIRE(required == actual);
626             }
627 
628             THEN("signal disposed"){
629                 auto required = false;
630                 auto actual = sourceNotDisposed;
631                 REQUIRE(required == actual);
632             }
633 
634         }
635     }
636 }
637 
638 SCENARIO("take_until, no-preempt after last produced, proper unsubscribe signal", "[take_until][take][operators]"){
639     GIVEN("2 sources"){
640         auto sc = rxsc::make_test();
641         auto w = sc.create_worker();
642         const rxsc::test::messages<int> on;
643 
644         bool signalNotDisposed = false;
645 
646         auto l = sc.make_hot_observable({
647             on.next(150, 1),
648             on.next(230, 2),
649             on.completed(240)
650         });
651 
652         auto r = sc.make_hot_observable({
653             on.next(150, 1),
654             on.next(250, 2),
655             on.completed(260)
656         });
657 
658         WHEN("one is taken until the other emits a marble"){
659 
660             auto res = w.start(
__anon56d2ed3a0d02() 661                 [l, r, &signalNotDisposed]() {
662                 return l
663                     .take_until(r
664                     .map([&signalNotDisposed](int v){signalNotDisposed = true; return v; }))
665                     // forget type to workaround lambda deduction bug on msvc 2013
666                     .as_dynamic();
667             }
668             );
669 
670             THEN("the output only contains items sent while subscribed"){
671                 auto required = rxu::to_vector({
672                     on.next(230, 2),
673                     on.completed(240)
674                 });
675                 auto actual = res.get_observer().messages();
676                 REQUIRE(required == actual);
677             }
678 
679             THEN("signal disposed"){
680                 auto required = false;
681                 auto actual = signalNotDisposed;
682                 REQUIRE(required == actual);
683             }
684 
685         }
686     }
687 }
688 
689 SCENARIO("take_until, error some", "[take_until][take][operators]"){
690     GIVEN("2 sources"){
691         auto sc = rxsc::make_test();
692         auto w = sc.create_worker();
693         const rxsc::test::messages<int> on;
694 
695         std::runtime_error ex("take_until on_error from source");
696 
697         auto l = sc.make_hot_observable({
698             on.next(150, 1),
699             on.error(225, ex)
700         });
701 
702         auto r = sc.make_hot_observable({
703             on.next(150, 1),
704             on.next(240, 2)
705         });
706 
707         WHEN("one is taken until the other emits a marble"){
708 
709             auto res = w.start(
__anon56d2ed3a0f02() 710                 [l, r]() {
711                 return l
712                     .take_until(r)
713                     // forget type to workaround lambda deduction bug on msvc 2013
714                     .as_dynamic();
715             }
716             );
717 
718             THEN("the output only contains items sent while subscribed"){
719                 auto required = rxu::to_vector({
720                     on.error(225, ex)
721                 });
722                 auto actual = res.get_observer().messages();
723                 REQUIRE(required == actual);
724             }
725 
726             THEN("there was 1 subscription/unsubscription to the source"){
727                 auto required = rxu::to_vector({
728                     on.subscribe(200, 225)
729                 });
730                 auto actual = l.subscriptions();
731                 REQUIRE(required == actual);
732             }
733 
734             THEN("there was 1 subscription/unsubscription to the trigger"){
735                 auto required = rxu::to_vector({
736                     on.subscribe(200, 225)
737                 });
738                 auto actual = r.subscriptions();
739                 REQUIRE(required == actual);
740             }
741 
742         }
743     }
744 }
745 
746 SCENARIO("take_until trigger on time point", "[take_until][take][operators]"){
747     GIVEN("a source and a time point"){
748         auto sc = rxsc::make_test();
749         auto so = rx::synchronize_in_one_worker(sc);
750         auto w = sc.create_worker();
751         const rxsc::test::messages<int> on;
752 
753         auto xs = sc.make_hot_observable({
754             on.next(150, 1),
755             on.next(210, 2),
756             on.next(220, 3),
757             on.next(230, 4),
758             on.next(240, 5),
759             on.completed(250)
760         });
761 
762         auto t = sc.to_time_point(225);
763 
764         WHEN("invoked with a time point"){
765 
766             auto res = w.start(
__anon56d2ed3a1002() 767                 [&]() {
768                 return xs
769                     | rxo::take_until(t, so)
770                     // forget type to workaround lambda deduction bug on msvc 2013
771                     | rxo::as_dynamic();
772             }
773             );
774 
775             THEN("the output only contains items sent while subscribed"){
776                 auto required = rxu::to_vector({
777                     on.next(211, 2),
778                     on.next(221, 3),
779                     on.completed(226)
780                 });
781                 auto actual = res.get_observer().messages();
782                 REQUIRE(required == actual);
783             }
784 
785             THEN("there was 1 subscription/unsubscription to the source"){
786                 auto required = rxu::to_vector({
787                     on.subscribe(200, 226)
788                 });
789                 auto actual = xs.subscriptions();
790                 REQUIRE(required == actual);
791             }
792         }
793     }
794 }
795